Tell me where to send your free Celery Bootcamp lessons.

# Parallel Monte Carlo with Dask

## An introduction to parallel computing with Dask

Published on September 25, 2018

Today, we will look at how to implement a Monte Carlo simulation to value an Asian option in a Jupyter notebook; and how to distribute that Monte Carlo simluation across a cluster using Dask. Don’t worry if you are not a Quantitative Analyst, this blog is more about Dask, `from dask import delayed` and exciting cluster stuff than about financial mathematics.

## Asian options

An Asian option is an exotic type of financial option contract. The payoff at expiry depends on the average underlying price over some pre-set period of time - different to a plain vanilla option where the payoff depends on the underlying price at expiry (fun fact: they are called Asian options because legend has it that Bankers Trust traded the first option linked to the average price of crude oil in in Tokyo back in 1987).

The payoff of a fixed-strike Asian call option at expiry is defined as: `C(T) = max(0, A(T) - K)`. There is no closed form analytical formula to determine the fair value any time before expiry for this kind of option. But we can determine the fair value by simulating a large number of paths for the underlying asset price and calculate the payout at expiry for each path. The fair value is then the discounted average over all these Monte Carlo simulations.

## Random walk

We assume that the current value of the underlying asset price is composed of the past value and a determinstic upward trend plus an error term defined as a white noise (a normal variable with zero mean and variance one). This is called a random walk with drift. Given a start price `s0`, a constant drift `mu` and volatility `sigma`, we can simulate the price path over `days` number of days like this:

``````import numpy as np

def random_walk(s0, mu, sigma, days):
dt = 1/365.
prices = np.zeros(days)
shocks = np.zeros(days)
prices = s0
for i in range(1, days):
e = np.random.normal(loc=mu * dt, scale=sigma * np.sqrt(dt))
prices[i] = prices[i-1] * (1 + e)
return prices
``````

For any random path, we can now calculate the average price and, given the strike price `K`, the payoff at expiry:

``````days = 365 * 4  # days to expiry
s0 = 100  # current underlying price
mu = 0.02  # drift
sigma = 0.2  # volatility
K = 100  # strike price

A = np.average(random_walk(s0, mu, sigma, days)
C = max(0, A - K)
``````

## Monte Carlo Simulation

We need to generate a large number of random price paths for the underlying. For each price path we calculate the associated payoff. These payoffs are averaged and discounted to today. This result is the value of the option. The actual number of simulations `n` required depends on the confidence level you are comfortable with for the estimate. That, in turn, depends on your option’s parameters. I won’t go into details here, so let’s just assume that `n = 10000`. We need to generate `n` random paths and calculate the average value of the option at expiry:

``````n = 10000
%time np.average([max(0, np.average(random_walk(s0, mu, sigma, days)) - K) for i in range(0, n)])

Wall time: 1min 10s
11.673350929139112
``````

If you have more than one CPU at your disposal, you can bring down the calculation time by distributing the random walk generation across multiple CPUs. This is where Dask comes in. Dask is a flexible library for parallel computing in Python, optimised for interactive computational workloads. Instead of running `n` random walks in a single Jupyter notebook process, we make Dask distribute these calculations across a number of processes.

Distributing these calculations is the job of the `task scheduler`. Think of the task scheduler as a server that manages where and how calculations as executed. There are different types of schedulers: single machine scheduler (limited to local machine) and distributed scheduler (distributed across a cluster). Here, I will use a simple single machine scheduler.

In a new shell/command window, start a local `dask.distributed` process scheduler:

``````Python 3.6.0 |Continuum Analytics, Inc.| (default, Dec 23 2016, 11:57:41) [MSC v.1900 64 bit (AMD64)] on win32
>>> client = Client(scheduler_port=8786)
``````

You can interrogate the `client` object to confirm it runs on port 8786 on localhost. By default, it spins up as many processes as you have CPUs on your computer, in my case I have 4 cores:

``````>>> client
<Client: scheduler='tcp://127.0.0.1:8786' processes=4 cores=4>
``````

We need to instruct our `random_walk` and `np.average` function calls to execute lazily. Instead of executing the functions immediately, we want to defer execution via the Dask task scheduler. We can achieve this by wrapping functions in `dask.delayed`. Alternatively, you can decorate your functions with `@dask.delayed` (not covered in this blog post, though have a look at http://dask.pydata.org/en/latest/delayed.html#decorator). This delays the execution of the function and generates a task graph instead.

``````from dask import delayed

s0 = 100
K = 100
mu = 0.02
sigma = 0.2
days = 365*4
n = 10000

result = delayed(np.average)([
delayed(max)(
0,
delayed(np.average)(random_walk(s0, mu, sigma, days)) - K
) for i in range(0, n)
])
``````

`result` is now a `Delayed` object and contains the task graph. The task graph is a directed acyclic graph (DAG) and models the dependencies between the `np.average`, `max` and `random_walk` function calls - without executing them. This allows Dask to optimise its task execution strategy. We have everything in place now to execute the actual calculation of `result`. Remember that `result` up to this point is just a Delayed object. Execute the computation:
``````%time result.compute()