The source code used in this blog post is available on GitHub.

By default, Celery routes all tasks to a single queue and all workers consume this default queue. With Celery queues, you can control which Celery workers process which tasks. This can be useful if you have a slow and a fast task and you want the slow tasks not to interfere with the fast tasks. Or if you need to send tasks from one microservice to another microservice.

Step 1: Configure Celery via task_routes

Celery can be configured on a per-task basis to control which queue each task gets sent to:

import os

from celery import Celery

app = Celery(__name__)
app.conf.update({
    'broker_url': os.environ['CELERY_BROKER_URL'],
    'imports': ('tasks',),
    'task_routes': {
        'fetch_bitcoin_price_index': {'queue': 'feeds'},
        'calculate_moving_average': {'queue': 'filters'}},
    'task_serializer': 'json',
    'result_serializer': 'json',
    'accept_content': ['json']})
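To build an intuition for what task_routes does, here is a simplified, self-contained illustration of name-based routing: the task name is looked up in the routes mapping to pick the destination queue, falling back to the default queue when no route matches. This is a sketch of the idea, not Celery's actual implementation; the route_task helper is hypothetical.

```python
# Simplified illustration of name-based task routing (not Celery's code).
TASK_ROUTES = {
    'fetch_bitcoin_price_index': {'queue': 'feeds'},
    'calculate_moving_average': {'queue': 'filters'},
}


def route_task(task_name, default_queue='celery'):
    """Return the queue a task would be routed to under TASK_ROUTES.

    Tasks without an explicit route fall back to the default queue,
    which is named 'celery' out of the box.
    """
    route = TASK_ROUTES.get(task_name)
    return route['queue'] if route else default_queue


print(route_task('fetch_bitcoin_price_index'))  # feeds
print(route_task('calculate_moving_average'))   # filters
print(route_task('some_unrouted_task'))         # celery
```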

Step 2: Make worker subscribe to a queue

We run one Celery worker that subscribes to the feeds queue and processes fetch_bitcoin_price_index tasks.

# start celery worker
~$ celery worker --hostname=worker.feeds@%h --queues=feeds

The other worker subscribes to the filters queue and processes calculate_moving_average tasks:

~$ celery worker --hostname=worker.filters@%h --queues=filters

Note the --queues command line argument. It makes your workers subscribe to particular queues. To subscribe to more than one queue, pass a comma-separated list, like so: --queues=feeds,filters. For further information, have a look at the Celery docs.

Step 3: Give it a go

Bring up the docker-compose stack and execute

# start docker stack
~$ docker-compose up -d

# execute
~$ docker-compose exec worker-feeds python --start_date=2018-01-01 --end_date=2018-05-29 --window=3

Here, we re-use the Celery task chain from my previous blog post. Inside, we invoke a Celery chain that consists of two tasks: fetch_bitcoin_price_index fetches Bitcoin Price Index data from the Coindesk API and is routed via the feeds queue to the worker-feeds Celery worker.

When the task completes successfully, its result is passed on to calculate_moving_average, which is routed via the filters queue to the worker-filters Celery worker.

import argparse

from celery import chain
from tasks import fetch_bitcoin_price_index, calculate_moving_average

parser = argparse.ArgumentParser()
parser.add_argument('--start_date', default='2018-01-01')
parser.add_argument('--end_date', default='2018-05-29')
parser.add_argument('--window', default=3, type=int)
args = parser.parse_args()

# chain the two tasks: the result of fetch_bitcoin_price_index is
# passed on as the first argument of calculate_moving_average
task = chain(
    fetch_bitcoin_price_index.s(args.start_date, args.end_date),
    calculate_moving_average.s(window=args.window)
).delay()
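To make the second step of the chain concrete, here is a rough sketch of the kind of computation calculate_moving_average performs: a plain simple moving average over a list of daily prices. The function name, input shape, and output format below are assumptions for illustration only, not the actual task body from the previous post.

```python
def simple_moving_average(prices, window=3):
    """Sketch of a simple moving average over a list of daily prices.

    NOTE: illustration only -- the real calculate_moving_average task
    receives the Coindesk price index data from fetch_bitcoin_price_index
    via the filters queue; its exact input format is not shown here.
    """
    if window < 1 or window > len(prices):
        raise ValueError('window must be between 1 and len(prices)')
    # average each window of `window` consecutive prices
    return [
        sum(prices[i:i + window]) / window
        for i in range(len(prices) - window + 1)
    ]


print(simple_moving_average([1.0, 2.0, 3.0, 4.0], window=3))  # [2.0, 3.0]
```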

Check the docker-compose logs to follow the task flow through the two workers:

# print logs
~$ docker-compose logs -f

The docker-compose.yml stack also comes with a Flower instance. Flower is a tool for monitoring Celery workers and tasks. Point your browser to http://localhost:5555 to check it out.


In this blog post you learned how to configure Celery to route tasks to dedicated queues and how to make Celery workers subscribe to certain queues.

In order to achieve that, you need to define a route per task. This approach works well for a simple setup. However, it does not scale well for an application with many Celery tasks, or for a microservices/Docker environment where multiple services communicate via the same message broker.

In the next blog post, you will learn all about dynamic task routing, a programmatic and scalable solution that overcomes the limitations described above.

The source code used in this blog post is available on GitHub.