Become a Celery expert TODAY! Sign up for my newsletter.
Tell me where to send your free Celery Bootcamp lessons.

Quandl Timeseries With Celery and Falcon

A step-by-step tutorial to building a Celery app with a Falcon front-of-the-house

Published on July 10, 2018
Estimated reading time: 10 minutes
The full source code is available on https://github.com/ZoomerAnalytics/python-celery-falcon

Falcon is a bare-metal, minimalist Python library for building fast web APIs. Falcon shines with a clean design that embraces HTTP and the REST architectural style. In this blog post, you will learn you to integrate Falcon with Celery. I will show you how to structure your Falcon/Celery project so that is easily maintainable and reusable. And, you will learn how to fetch economic timeseries data from Quandl via Celery, store it in a Postgres database and retrieve it via a Falcon REST API.

Project structure

Clean separation of concerns between the Falcon and Celery apps is essential for maintainability, reusability and scalability. When designing your application, think of Falcon and Celery as two different apps (that most commonly share only the models): app.py (Falcon app) and worker.py (Celery app). The models folder contains the SQLAlchemy models which is common codebase for both the Falcon and Celery apps.

The middleware and resources folders are Falcon-specific. Falcon resources are all the things in your API that can be accessed by a URL. Middleware components provide a way to execute code before and after each request. The tasks folder contains our Celery tasks and belongs exclusively to the Celery sphere.

.
+-- app
|   +-- app.py
|   +-- worker.py
|   +-- alembic
|       +-- versions
|           +-- 20180710_init.py
|       +-- env.py
|       +-- script.py.mako
|   +-- models
|       +-- __init__.py
|       +-- timeseries.py
|   +-- middleware
|       +-- __init__.py
|       +-- sqla_sessionmanager.py
|   +-- resources
|       +-- __init__.py
|       +-- timeseries.py
|   +-- tasks
|       +-- __init__.py
|       +-- base.py
|       +-- quandl.py
|   +-- requirements.txt
+-- Dockerfile
+-- docker-compose.yml
+-- README.md

Falcon app

The Falcon app requires only minimum boilerplate code. We need to register the routes and their respective resources as well as any middleware components. Make sure to checkout the Falcon docs for further details.

#app/app.py

import falcon
import argparse
import sqlalchemy

from sqlalchemy.orm import sessionmaker
from .middleware import SQLAlchemySessionManager
from .resources import TimeSeriesResource


engine = sqlalchemy.create_engine('postgresql://user:password@postgres/5432')
session_factory = sessionmaker(bind=engine)

app = falcon.API(middleware=[SQLAlchemySessionManager(session_factory)])
app.add_route('/timeseries', TimeSeriesResource())


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--debug', help='Run debug server', action='store_true')
    args = parser.parse_args()
    if args.debug:
        from werkzeug.serving import run_simple
        run_simple('0.0.0.0', 8000, app, use_debugger=True, use_reloader=True)

For easy database SQLAlchemy session management, we create a middleware component so that we can easily access the session object from within our resources.

#app/middleware/sqla_sessionmanager.py
from sqlalchemy.orm import scoped_session

class SQLAlchemySessionManager:

    def __init__(self, session_factory):
        self.session_factory = session_factory

    def process_resource(self, req, resp, resource, params):
        resource.session = scoped_session(self.session_factory)

    def process_response(self, req, resp, resource, req_succeeded):
        if hasattr(resource, 'session'):
            resource.session.remove()

Finally, we define our timeseries resource to GET data from the database. As the request goes through the SQLAlchemySessionManager, the session instance is automatically available on the TimeSeriesResource.

We return a JSON response by asiggning the serialized content to resp.body. If you want to go the extra mile and follow the DRY principle, you can create another middleware component to do the json.dumps(...) bit. I left this out for the sake of simplicity but it is a good excercise to do it yourself.

#app/resources/timeseries.py
import falcon
import uuid
import json

from models import TimeSeries
from tasks.quandl import fetch_data_from_quandl


class TimeSeriesResource:

    def on_get(self, req, resp):
        if 'id' in req.params:
            timeseries = self.session.query(TimeSeries).filter(TimeSeries.id == req.params['id']).one()
            resp.status = falcon.HTTP_200
            resp.body = json.dumps(timeseries.data)
        else:
            timeseries = [{
                'id': str(timeseries.id),
                'database_code': timeseries.database_code,
                'dataset_code': timeseries.dataset_code
             } for timeseries in self.session.query(TimeSeries).all()]
            resp.status = falcon.HTTP_200
            resp.body = json.dumps(timeseries)
    
    def on_post(self, req, resp):
        body = req.stream.read()
        payload = json.loads(body.decode('utf-8'))
        database_code = payload['database_code']
        dataset_code = payload['dataset_code']

        timeseries = TimeSeries(
            id=uuid.uuid4(),
            database_code=database_code,
            dataset_code=dataset_code,
            data={},
            status='pending'
        )

        self.session.add(timeseries)
        self.session.commit()

        fetch_data_from_quandl.s(timeseries_id=timeseries.id).delay()
        resp.status = falcon.HTTP_201
        resp.body = json.dumps({
            'id': str(timeseries.id),
            'status': timeseries.status
        })

I also define the POST method here to fetch the data from Quandl asynchronously via Celery. This is the only interaction between our Falcon and Celery world. Here, I directly import the task but if you have a good reason to keep it absolutely separate, you can create another abstraction layer and, for example, call the Celery task by its name.

Also, I recommend creating a dedicated middleware component to deserialize the request body for the POST and PUT methods (which I deliberately skipped here).

Celery app

Just like the Falcon app, the Celery app requires minimal boiler plate code: broker_url specifies the URL to the message broker (here RabbitMQ) and imports tells Celery where to look for the tasks.

#app/worker.py
from celery import Celery

app = Celery('app')
app.conf.update({
    'broker_url': 'amqp://user:password@rabbitmq:5672',
    'imports': (
        'tasks',
    ),
    'task_serializer': 'json',
    'result_serializer': 'json',
    'accept_content': ['json'],
    'result_compression': 'gzip',
    'timezone': 'UTC'})

Similarly to the Falcon middleware, we define a custom Celery base task to manage the SQLAlchemy database session so that we can readily access the session object in our Celery tasks.

#app/tasks/base.py
import celery
import sqlalchemy

from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session


class Task(celery.Task):

    def __call__(self, *args, **kwargs):
        self.engine = sqlalchemy.create_engine('postgresql://user:password@postgres/5432')
        session_factory = sessionmaker(bind=self.engine)
        self.session = scoped_session(session_factory)
        return super().__call__(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        if hasattr(self, 'session'):
            self.session.remove()
        if hasattr(self, 'engine'):
            self.engine.engine.dispose()

Now we have everything in place to create the Celery task that fetches data from Quandl:

#app/tasks/quandl.py
import requests

from models import TimeSeries
from worker import app
from .base import Task


@app.task(bind=True, base=Task, name='fetch_data_from_quandl')
def fetch_data_from_quandl(self, timeseries_id):
    timeseries = self.session.query(TimeSeries).filter_by(id=timeseries_id).one()
    url = f'https://www.quandl.com/api/v3/datasets/{timeseries.database_code}/{timeseries.dataset_code}/data.json'
    response = requests.get(url)
    if not response.ok:
        self.session.query(TimeSeries).filter_by(id=timeseries_id).update({'status': 'error'})
        self.session.commit()
        raise ValueError(f'GET {url} returned unexpected response code: {response.status_code}')

    self.session.query(TimeSeries).filter_by(id=timeseries_id).update({
        'data': response.json(),
        'status': 'success'
    })
    self.session.commit()

Showtime

Start the entire stack, including RabbitMQ and Postgres with docker-compose up -d. The Falcon app becomes available on port 8000. Request a financial timeseries from Quandl:

curl -d '{"database_code":"WIKI", "dataset_code":"FB"}' -H "Content-Type: application/json" -X POST http://localhost:8000

This should return something like this:

{
    "id": "4484d077-cee3-4f2c-bf18-50cb5d3573fe",
    "status": "pending"
}

Retrieve the result from the database:

curl http://localhost:8000/timeseries?id=4484d077-cee3-4f2c-bf18-50cb5d3573fe

This, in turn, returns the Quandl result (assuming it’s finished already), which is now persisted in our database”

{
    "dataset_data": {
        "data": [
            [
                "2018-03-27",
                156.31,
                162.85,
                150.75,
                152.19,
                76787884,
                0,
                1,
                156.31,
                162.85,
                150.75,
                152.19,
                76787884
            ],
            ...

Summary

In this blog post you learned how to structure your Celery powered Falcon app. We built a nice little REST app that asynchronously fetches data from Quandl and writes it into our internal Postgres database.

Key takeaway is to keep the Falcon and Celery apps separate. This allows you (and any other developer involved) to follow the workflow which in turn makes it an easily maintainable and scalable application.

Posted on July 10, 2018