Become a Celery expert TODAY! Sign up for my newsletter.
Tell me where to send your free Celery Bootcamp lessons.

Parallel Monte Carlo with Dask

An introduction to parallel computing with Dask

Published on September 25, 2018
Estimated reading time: 7 minutes

Today, we will look at how to implement a Monte Carlo simulation to value an Asian option in a Jupyter notebook; and how to distribute that Monte Carlo simluation across a cluster using Dask. Don’t worry if you are not a Quantitative Analyst, this blog is more about Dask, from dask import delayed and exciting cluster stuff than about financial mathematics.

Asian options

An Asian option is an exotic type of financial option contract. The payoff at expiry depends on the average underlying price over some pre-set period of time - different to a plain vanilla option where the payoff depends on the underlying price at expiry (fun fact: they are called Asian options because legend has it that Bankers Trust traded the first option linked to the average price of crude oil in in Tokyo back in 1987).

The payoff of a fixed-strike Asian call option at expiry is defined as: C(T) = max(0, A(T) - K). There is no closed form analytical formula to determine the fair value any time before expiry for this kind of option. But we can determine the fair value by simulating a large number of paths for the underlying asset price and calculate the payout at expiry for each path. The fair value is then the discounted average over all these Monte Carlo simulations.

Random walk

We assume that the current value of the underlying asset price is composed of the past value and a determinstic upward trend plus an error term defined as a white noise (a normal variable with zero mean and variance one). This is called a random walk with drift. Given a start price s0, a constant drift mu and volatility sigma, we can simulate the price path over days number of days like this:

import numpy as np

def random_walk(s0, mu, sigma, days):
    dt = 1/365.
    prices = np.zeros(days)
    shocks = np.zeros(days)
    prices[0] = s0
    for i in range(1, days):
        e = np.random.normal(loc=mu * dt, scale=sigma * np.sqrt(dt))
        prices[i] = prices[i-1] * (1 + e)
    return prices

For any random path, we can now calculate the average price and, given the strike price K, the payoff at expiry:

days = 365 * 4  # days to expiry
s0 = 100  # current underlying price
mu = 0.02  # drift
sigma = 0.2  # volatility
K = 100  # strike price

A = np.average(random_walk(s0, mu, sigma, days)
C = max(0, A - K)

Monte Carlo Simulation

We need to generate a large number of random price paths for the underlying. For each price path we calculate the associated payoff. These payoffs are averaged and discounted to today. This result is the value of the option. The actual number of simulations n required depends on the confidence level you are comfortable with for the estimate. That, in turn, depends on your option’s parameters. I won’t go into details here, so let’s just assume that n = 10000. We need to generate n random paths and calculate the average value of the option at expiry:

n = 10000
%time np.average([max(0, np.average(random_walk(s0, mu, sigma, days)) - K) for i in range(0, n)])

Wall time: 1min 10s
11.673350929139112

Dask task scheduler: dask.distributed

If you have more than one CPU at your disposal, you can bring down the calculation time by distributing the random walk generation across multiple CPUs. This is where Dask comes in. Dask is a flexible library for parallel computing in Python, optimised for interactive computational workloads. Instead of running n random walks in a single Jupyter notebook process, we make Dask distribute these calculations across a number of processes.

Distributing these calculations is the job of the task scheduler. Think of the task scheduler as a server that manages where and how calculations as executed. There are different types of schedulers: single machine scheduler (limited to local machine) and distributed scheduler (distributed across a cluster). Here, I will use a simple single machine scheduler.

In a new shell/command window, start a local dask.distributed process scheduler:

Python 3.6.0 |Continuum Analytics, Inc.| (default, Dec 23 2016, 11:57:41) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from dask.distributed import Client
>>> client = Client(scheduler_port=8786)

You can interrogate the client object to confirm it runs on port 8786 on localhost. By default, it spins up as many processes as you have CPUs on your computer, in my case I have 4 cores:

>>> client
<Client: scheduler='tcp://127.0.0.1:8786' processes=4 cores=4>

Wrapping functions with dask.delayed

We need to instruct our random_walk and np.average function calls to execute lazily. Instead of executing the functions immediately, we want to defer execution via the Dask task scheduler. We can achieve this by wrapping functions in dask.delayed. Alternatively, you can decorate your functions with @dask.delayed (not covered in this blog post, though have a look at http://dask.pydata.org/en/latest/delayed.html#decorator). This delays the execution of the function and generates a task graph instead.

from dask import delayed

s0 = 100
K = 100
mu = 0.02
sigma = 0.2
days = 365*4
n = 10000

result = delayed(np.average)([
    delayed(max)(
        0, 
        delayed(np.average)(random_walk(s0, mu, sigma, days)) - K
    ) for i in range(0, n)
])

result is now a Delayed object and contains the task graph. The task graph is a directed acyclic graph (DAG) and models the dependencies between the np.average, max and random_walk function calls - without executing them. This allows Dask to optimise its task execution strategy.

screenshot

Execute the task graph

We have everything in place now to execute the actual calculation of result. Remember that result up to this point is just a Delayed object. Execute the computation:

%time result.compute()

Wall time: 39.9 s
11.617145839123664

In essence, the calculation time has roughly halved on 4 cores compared to 1 core. The reason it has not reduced to 25% is that there is some overhead involved in managing and distributing the task themselves. Have a look at the Dask docs about dask.delayed and task scheduling.

Posted on September 25, 2018