
Dynamic Task Routing in Celery

How to programmatically route tasks using a custom TaskRouter class

Published on June 05, 2018
Estimated reading time: 3 minutes
The full source code is available on https://github.com/ZoomerAnalytics/python-celery-dynamic-task-routing

In our previous blog post we looked into custom queues and task routing. There, we had to configure for each individual task which queue we wanted Celery to route it to. While this approach works well for a simple setup, it does not scale well for applications and microservices where many Celery tasks need to be routed to a number of different worker queues.

Step 1: Celery task_routes config

Instead of listing in task_routes, task by task, which queue each one should be routed to, you can tell Celery to use a custom router class by specifying its dotted path (see also the Celery routing docs at http://docs.celeryproject.org/en/latest/userguide/routing.html):

app = Celery(__name__)
app.conf.update({
    'broker_url': os.environ['CELERY_BROKER_URL'],
    'imports': (
        'tasks',
    ),
    'task_routes': ('task_router.TaskRouter',),
    'task_serializer': 'json',
    'result_serializer': 'json',
    'accept_content': ['json']})

Step 2: Define TaskRouter class

As per our task_routes value above, we need to define the custom TaskRouter class in the module task_router.py. Celery expects the router to implement a route_for_task method, which receives the task name as its first argument. Note how the method returns a dict that looks exactly like the one used for manual task routing.

class TaskRouter:
    def route_for_task(self, task, *args, **kwargs):
        if ':' not in task:
            return {'queue': 'default'}

        namespace, _ = task.split(':')
        return {'queue': namespace}
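To see the routing logic in action, here is a minimal, self-contained sketch (the class is repeated inline so the snippet runs on its own, without a Celery app or broker):

```python
# Same routing rule as in task_router.py.
class TaskRouter:
    def route_for_task(self, task, *args, **kwargs):
        if ':' not in task:
            return {'queue': 'default'}

        namespace, _ = task.split(':')
        return {'queue': namespace}


router = TaskRouter()

# Namespaced task names land on the queue named by their prefix...
print(router.route_for_task('feeds:fetch_bitcoin_price_index'))   # {'queue': 'feeds'}
print(router.route_for_task('filters:calculate_moving_average'))  # {'queue': 'filters'}
# ...while anything without a prefix falls back to the default queue.
print(router.route_for_task('cleanup_temp_files'))                # {'queue': 'default'}
```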

The idea is to route a task based on its name; specifically, we assume that our task names follow the pattern queue:taskname. In our previous blog post's example, we had a task named fetch_bitcoin_price_index that we wanted routed to a queue called feeds. We rename this task to feeds:fetch_bitcoin_price_index.

@app.task(bind=True, name='feeds:fetch_bitcoin_price_index')
def fetch_bitcoin_price_index(self, start_date, end_date):
   ...


@app.task(bind=True, name='filters:calculate_moving_average')
def calculate_moving_average(self, args, window):
    ...

We need to run two Celery workers. One subscribes to the feeds queue, the other to the filters queue:

celery worker --app=worker.app --hostname=worker.feeds@%h --queues=feeds
celery worker --app=worker.app --hostname=worker.filters@%h --queues=filters

Note the --queues command line arguments: they make each worker subscribe to a particular queue. To subscribe to more than one queue, use a comma-separated list, like so: --queues=feeds,filters. For further information, have a look at the Celery docs.

Step 3: Ready for action

Bring up the docker-compose stack:

docker-compose up -d

And run the example.py script:

docker-compose exec worker-feeds python example.py --start_date=2018-01-01 --end_date=2018-05-29 --window=3

The script invokes a Celery chain that consists of two tasks: fetch_bitcoin_price_index fetches Bitcoin Price Index data from the Coindesk API and is routed via the feeds queue to the worker-feeds Celery worker. When the task completes successfully, its result is passed on to calculate_moving_average, which is routed via the filters queue to the worker-filters Celery worker.

Check the docker-compose logs to follow the task flow through the two workers:

docker-compose logs -f

The docker-compose.yml stack also comes with a Flower instance. Flower is a tool for monitoring Celery workers and tasks. Open http://localhost:5555 in your browser to check it out.

Summary

In this blog post you learned how to configure Celery to route tasks using a custom task router. This solution scales well when using many tasks across many queues and workers.
