Distributed Python

September 28, 2018

Custom Celery task states

Posted by Bjoern Stiel

Celery tasks always have a state. If a task finished executing successfully, its state is SUCCESS. If a task execution resulted in an exception, its state is FAILURE. Celery knows six built-in states:

  • PENDING (waiting for execution or unknown task id)

  • STARTED (task has been started)

  • SUCCESS (task executed successfully)

  • FAILURE (task execution resulted in exception)

  • RETRY (task is being retried)

  • REVOKED (task has been revoked)

In case you wonder why you have never come across the STARTED state, it is not reported by default. You have to enable it explicitly via the Celery config, setting task_track_started = True.
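For reference, here is a minimal sketch of what the worker module imported by the examples in this article might look like. The module layout, the app name and the Redis URLs are assumptions for illustration only; the one setting that matters here is task_track_started. A result backend needs to be configured as well, otherwise there is nowhere to store task states.

```python
# worker.py (hypothetical layout; broker and backend URLs are placeholders)
from celery import Celery

app = Celery(
    'worker',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
    include=['tasks'],
)

# Report the STARTED state when a worker begins executing a task
app.conf.task_track_started = True
```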

The update_state method

The Celery task object provides an update_state method. This method lets you do three things:

  • set the task’s state to one of the built-in states

  • provide additional meta data

  • set the task’s state to any custom state you define.

All you need to define your own state is a unique name. It is just a string and does not need to be registered anywhere. For example, if you have a long-running task, you can define a PROGRESS state and publish the progress made via the meta argument:

import time
from worker import app


@app.task(bind=True)
def task(self):
    n = 30
    for i in range(0, n):
        self.update_state(state='PROGRESS', meta={'done': i, 'total': n})
        time.sleep(1)

    return n

This task runs for ~30 seconds and sends a task state update every ~1 second, broadcasting a custom PROGRESS state and the number of total and completed iterations. Let’s execute the task asynchronously, wait for the task to finish and capture the state and meta data while it’s still running:

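import time
import tasks

# Kick off the task; delay() returns an AsyncResult right away
task = tasks.task.s().delay()

# Poll once per second until the task reaches a final built-in state
while not task.ready():
    print(f'State={task.state}, info={task.info}')
    time.sleep(1)

# Print the final state and the task's return value
print(f'State={task.state}, info={task.info}')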

Which produces something like this:

State=PENDING, info=None
State=PROGRESS, info={'done': 0, 'total': 30}
State=PROGRESS, info={'done': 1, 'total': 30}
State=PROGRESS, info={'done': 2, 'total': 30}
State=PROGRESS, info={'done': 3, 'total': 30}
...
State=SUCCESS, info=30

This is a very simple example, but a closer look reveals a few interesting takeaways:

  • any string can be a custom state
  • a custom state is only temporary and is eventually overridden by a Celery built-in state as soon as the task finishes successfully, throws an exception, is retried or is revoked (the same applies if we use update_state with a built-in state but custom meta data: the custom meta data is ultimately overwritten by Celery)
  • while the task is in a custom state, the meta argument we published via update_state is available as the info property of the AsyncResult object (the object .delay() returns on the calling side)
  • when the task is in the built-in SUCCESS state, the info property returns the task result (when the task failed, the info property returns the exception instance and the stack trace is available via the traceback property; try it yourself by throwing an exception in the implementation of the task function above)
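Because the meaning of info depends on the state, code that consumes both usually has to branch on the state first. Here is a minimal sketch of that idea in plain Python, so it runs without a broker. The function name describe and the wording of the status lines are my own choices for illustration; the state names and the done/total keys match the example above.

```python
def describe(state, info):
    """Turn a (state, info) pair, as read from an AsyncResult, into a status line."""
    if state == 'PROGRESS' and isinstance(info, dict):
        # Custom state: info is the meta dict passed to update_state
        done, total = info.get('done', 0), info.get('total')
        if total:
            return f'{100 * done // total}% ({done}/{total})'
        return 'in progress'
    if state == 'SUCCESS':
        # Built-in state: info is the task's return value
        return f'finished, result={info!r}'
    if state == 'FAILURE':
        # Built-in state: info is the exception
        return f'failed: {info!r}'
    # PENDING, STARTED, RETRY, REVOKED or an unknown custom state
    return state.lower()
```

Inside the polling loop from above, this could be called as describe(task.state, task.info).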

Built-in state with manual task result handling

Say you want to provide some additional custom data for a failed task. Unfortunately, as we established above, Celery will overwrite the custom meta data, even if we use a built-in state type. Fortunately, there is a way to prevent this: raising a celery.exceptions.Ignore() exception. This means Celery itself records no state for the task, so whatever we set via update_state stays in place, but the message is still removed from the queue:

from celery import states
from celery.exceptions import Ignore
from worker import app


@app.task(bind=True)
def task(self):
    try:
        raise ValueError('Some error')
    except Exception as ex:
        self.update_state(state=states.FAILURE, meta={'custom': '...'})
        raise Ignore()

This works… at least, kind of. This time Celery does not overwrite the meta data:

>>> import tasks
>>> task = tasks.task.s().delay()
>>> print(task.backend.get(task.backend.get_key_for_task(task.id)))
b'{"status": "FAILURE", "result": {"custom": "..."}, "traceback": null, "children": [], "task_id": "1df4b70c-1206-41e5-bcd3-786295d21267"}'

But, it turns out that, depending on the built-in task state, Celery expects the corresponding meta data dictionary to be in a particular format. And here, the meta data itself is incompatible with the FAILURE state:

>>> print(task.state)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 471, in state
    return self._get_task_meta()['status']
  File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 410, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 359, in get_task_meta
    meta = self._get_task_meta_for(task_id)
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 674, in _get_task_meta_for
    return self.decode_result(meta)
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 278, in decode_result
    return self.meta_from_decoded(self.decode(payload))
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 274, in meta_from_decoded
    meta['result'] = self.exception_to_python(meta['result'])
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 248, in exception_to_python
    from_utf8(exc['exc_type']), __name__)
KeyError: 'exc_type'

We can fix this by adding the exc_type and exc_message keys to our custom meta dictionary, effectively mimicking Celery’s default FAILURE meta structure.

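import traceback

from celery import states
from celery.exceptions import Ignore
from worker import app


@app.task(bind=True)
def task(self):
    try:
        raise ValueError('Some error')
    except Exception as ex:
        # Mimic Celery's own FAILURE meta structure and add a custom field
        self.update_state(
            state=states.FAILURE,
            meta={
                'exc_type': type(ex).__name__,
                'exc_message': traceback.format_exc().split('\n'),
                'custom': '...'
            })
        # Stop Celery from overwriting the state we just stored
        raise Ignore()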

This time we can read the task’s state and info without Celery throwing an exception, and the custom field is stored as well. Note that the custom field only shows up in the raw result, which we have to fetch from the backend via task.backend.get(...): the info property returns what Celery parses out of the result dict, and for the FAILURE state that is the reconstructed exception.

>>> import tasks
>>> task = tasks.task.s().delay()
>>> task.state
'FAILURE'
>>> task.info
ValueError('Traceback (most recent call last):', '  File "/app/tasks.py", line 16, in task', "    raise ValueError('some exception')", 'ValueError: some exception', '')
>>> print(task.backend.get(task.backend.get_key_for_task(task.id)))
b'{"status": "FAILURE", "result": {"exc_type": "ValueError", "exc_message": ["Traceback (most recent call last):", "  File \\"/app/tasks.py\\", line 16, in task", "    raise ValueError(\'some exception\')", "ValueError: some exception", ""], "custom": "..."}, "traceback": null, "children": [], "task_id": "d2f60111-aec6-4c58-83a7-24f0edb7ac5f"}'
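To get at the custom field programmatically, the raw payload can be decoded by hand. The following is a minimal sketch that assumes the JSON result serializer and a key-value backend that returns the raw bytes, as in the transcript above. The helper name custom_fields and the set of reserved keys are my own assumptions, not part of Celery’s API.

```python
import json

# Keys that belong to Celery's FAILURE meta structure (assumed here);
# everything else in the result dict is treated as custom data.
RESERVED_KEYS = frozenset({'exc_type', 'exc_message', 'exc_module'})


def custom_fields(raw):
    """Extract the custom keys from a raw JSON result payload (bytes or str)."""
    payload = json.loads(raw)
    result = payload.get('result')
    if not isinstance(result, dict):
        # e.g. a SUCCESS payload whose result is a plain value
        return {}
    return {key: value for key, value in result.items() if key not in RESERVED_KEYS}
```

With the task from above, custom_fields(task.backend.get(task.backend.get_key_for_task(task.id))) should give back only the custom entry.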

Custom state

We can use the same Ignore() trick from above to instruct Celery to not overwrite our temporary custom state from the initial example:

from celery.exceptions import Ignore
from worker import app


@app.task(bind=True)
def task(self):
    self.update_state(state='SOME-CUSTOM-STATE', meta={'custom': '...'})
    raise Ignore()

This time, the task remains in our custom state. Also, Celery does not assume any specific meta dict structure:

>>> import tasks
>>> task = tasks.task.s().delay()
>>> task.state
'SOME-CUSTOM-STATE'
>>> print(task.info)
{'custom': '...'}
>>> print(task.result)
{'custom': '...'}
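One consequence worth keeping in mind: as far as I can tell, ready() only returns True for the states listed in celery.states.READY_STATES (SUCCESS, FAILURE and REVOKED), so a polling loop like the one from the first example would never terminate for a task parked in a custom final state. Here is a minimal sketch of a loop that treats the custom state as final. The names FINAL_STATES and wait_for_final_state are hypothetical, and the state is read through a callable so the sketch runs without a broker.

```python
import time

# Celery's ready states plus our own final custom state (an assumption of this sketch)
FINAL_STATES = frozenset({'SUCCESS', 'FAILURE', 'REVOKED', 'SOME-CUSTOM-STATE'})


def wait_for_final_state(get_state, timeout=60.0, interval=1.0, sleep=time.sleep):
    """Poll get_state() until it returns a final state or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()
        if state in FINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f'task still in state {state!r} after {timeout}s')
        sleep(interval)
```

With a real task, this could be called as wait_for_final_state(lambda: task.state).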

Conclusion

Celery provides a lot of flexibility when it comes to custom task states and custom meta data. Transient custom states in combination with custom meta data can be used to implement task progress trackers. Or you might have a good reason to implement your own final custom task state, which Celery can equally cater for. You can even enrich the built-in FAILURE task state with additional data. For further information, I encourage you to read the docs and play around with a few code examples.

If there's an aspect that I didn't cover, or you have specific questions (or find something wrong), let me know in the comments - I'll answer them as best as I can.


