
Task Routing in Celery

How to route a Celery task to a dedicated queue

Published on May 29, 2018
The full source code is available on https://github.com/ZoomerAnalytics/python-celery-task-routing

By default, Celery routes all tasks to a single queue, and all workers consume from this default queue. With dedicated queues, you can control which Celery workers process which tasks. This is useful if you have slow and fast tasks and do not want the slow tasks to hold up the fast ones, or if you need to send tasks from one microservice to another.

Step 1: Configure Celery via task_routes

Celery lets you configure, on a per-task basis, which queue a task is sent to:

import os

from celery import Celery

app = Celery(__name__)
app.conf.update({
    'broker_url': os.environ['CELERY_BROKER_URL'],
    'imports': (
        'tasks',
    ),
    # Route each task, identified by its registered name, to a dedicated queue
    'task_routes': {
        'fetch_bitcoin_price_index': {'queue': 'feeds'},
        'calculate_moving_average': {'queue': 'filters'}
    },
    'task_serializer': 'json',
    'result_serializer': 'json',
    'accept_content': ['json']})
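
The keys in task_routes are matched against task names, and any task without a matching route falls back to the default queue (named celery unless configured otherwise). The following is a minimal sketch of that lookup in plain Python, not Celery's actual router. The helper resolve_queue is hypothetical and exists only for illustration. It also accepts glob patterns, which the Celery docs describe as supported in task_routes.

```python
from fnmatch import fnmatchcase

# Same route table as in the configuration above
TASK_ROUTES = {
    'fetch_bitcoin_price_index': {'queue': 'feeds'},
    'calculate_moving_average': {'queue': 'filters'},
}

# Celery's default queue name when task_default_queue is not set
DEFAULT_QUEUE = 'celery'


def resolve_queue(task_name, routes=TASK_ROUTES, default=DEFAULT_QUEUE):
    """Return the queue name for task_name (hypothetical helper).

    Walks the route table in order and returns the queue of the first
    key that matches the task name, either exactly or as a glob pattern.
    """
    for pattern, route in routes.items():
        if fnmatchcase(task_name, pattern):
            return route.get('queue', default)
    # No route matched: the task goes to the default queue
    return default


if __name__ == '__main__':
    for name in ('fetch_bitcoin_price_index',
                 'calculate_moving_average',
                 'some_unrouted_task'):
        print(name, '->', resolve_queue(name))
```

Because the lookup is by name, the tasks in this example have to be registered under exactly the names used as keys in task_routes.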

Step 2: Make worker subscribe to a queue

We run one Celery worker that subscribes to the feeds queue and processes fetch_bitcoin_price_index tasks.

celery --app=worker.app worker --hostname=worker.feeds@%h --queues=feeds

The other worker subscribes to the filters queue and processes calculate_moving_average tasks:

celery --app=worker.app worker --hostname=worker.filters@%h --queues=filters

Note the --queues command line argument. It makes your workers subscribe to particular queues. To subscribe to more than one queue, use a comma-separated list, like so: --queues=feeds,filters. For further information, have a look at the Celery docs.
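
To illustrate what subscribing means, here is a rough in-memory model, assuming one FIFO per queue as a stand-in for the message broker. The names parse_queues, consume and broker are hypothetical and not part of Celery. The point is only that a worker never sees messages in queues it has not subscribed to.

```python
from collections import deque


def parse_queues(option):
    """Split a --queues value such as 'feeds,filters' into queue names."""
    return [name.strip() for name in option.split(',') if name.strip()]


def consume(worker_queues, broker):
    """Drain only the queues this worker subscribes to."""
    received = []
    for name in worker_queues:
        while broker[name]:
            received.append(broker[name].popleft())
    return received


# Hypothetical stand-in for the broker: one FIFO per queue,
# each holding the name of one pending task
broker = {'feeds': deque(), 'filters': deque()}
broker['feeds'].append('fetch_bitcoin_price_index')
broker['filters'].append('calculate_moving_average')

# A worker started with --queues=feeds only picks up the feeds task
feeds_worker_tasks = consume(parse_queues('feeds'), broker)

# A worker started with --queues=feeds,filters picks up whatever is left
# in either queue
both_worker_tasks = consume(parse_queues('feeds,filters'), broker)
```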

Step 3: Give it a go

Bring up the docker-compose stack:

docker-compose up -d

And run the example.py script:

docker-compose exec worker-feeds python example.py --start_date=2018-01-01 --end_date=2018-05-29 --window=3

Here, we re-use the Celery task chain from our previous blog post. Inside example.py, we invoke a Celery chain that consists of two tasks. The first, fetch_bitcoin_price_index, is sent via the feeds queue to the worker-feeds Celery worker, which fetches Bitcoin Price Index data from the Coindesk API.

When the task completes successfully, its result is passed on to calculate_moving_average, which is sent via the filters queue to the worker-filters Celery worker.

import argparse
from celery import chain
from tasks import fetch_bitcoin_price_index, calculate_moving_average

parser = argparse.ArgumentParser()
parser.add_argument('--start_date')
parser.add_argument('--end_date')
# Parse the window as an integer; argparse passes strings by default
parser.add_argument('--window', type=int, default=3)
args = parser.parse_args()

# Chain the two tasks: the result of the first task becomes
# the first argument of the second
task = chain(
    fetch_bitcoin_price_index.s(
        start_date=args.start_date,
        end_date=args.end_date),
    calculate_moving_average.s(window=args.window)
).delay()
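
In a Celery chain, the return value of one task is passed as the first positional argument to the next. The sketch below mimics that data flow in plain Python, without a broker or workers. The function bodies are stand-ins and not the code from the repository: the prices are made-up placeholder values, and run_chain is a hypothetical helper.

```python
def fetch_bitcoin_price_index(start_date, end_date):
    """Stand-in for the real task: returns made-up prices, no API call."""
    return [100.0, 110.0, 120.0, 130.0]


def calculate_moving_average(prices, window):
    """Simple moving average over `window` consecutive values."""
    return [
        sum(prices[i - window + 1:i + 1]) / window
        for i in range(window - 1, len(prices))
    ]


def run_chain(*signatures):
    """Run (function, kwargs) pairs in order (hypothetical helper).

    The first function is called with its kwargs only; every later
    function receives the previous result as its first positional
    argument, followed by its own kwargs.
    """
    result = None
    for index, (func, kwargs) in enumerate(signatures):
        if index == 0:
            result = func(**kwargs)
        else:
            result = func(result, **kwargs)
    return result


averages = run_chain(
    (fetch_bitcoin_price_index,
     {'start_date': '2018-01-01', 'end_date': '2018-05-29'}),
    (calculate_moving_average, {'window': 3}),
)
print(averages)  # [110.0, 120.0]
```

In the real setup, each step runs on a different worker, and the intermediate result travels through the broker rather than through a local variable.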

Check the docker-compose logs to follow the task flow through the two workers:

docker-compose logs -f

The docker-compose.yml stack also comes with a Flower instance. Flower is a tool for monitoring Celery workers and tasks. Open http://localhost:5555 in your browser to check it out.

Summary

In this blog post you learned how to configure Celery to route tasks to dedicated queues and how to make Celery workers subscribe to certain queues.

To achieve that, you need to define a route for each task. This approach works well for a simple setup. However, it does not scale well for an application with many Celery tasks, or for a microservices/Docker environment where multiple services communicate via the same message broker.

In the next blog post, you will learn all about dynamic task routing, a programmatic and scalable solution that overcomes the limitations described above.
