# Parallel Monte Carlo with Dask

## An introduction to parallel computing with Dask

Published on September 25, 2018Estimated reading time: 7 minutes

Today, we will look at how to implement a Monte Carlo simulation
to value an Asian option in a Jupyter notebook; and how to distribute that Monte Carlo
simluation across a cluster using Dask. Don’t worry if you are not a
Quantitative Analyst, this blog is more about Dask, `from dask import delayed`

and exciting
cluster stuff than about financial mathematics.

## Asian options

An Asian option is an exotic type of financial option contract. The payoff at expiry
depends on the average underlying price over some pre-set period of time - different to
a plain vanilla option where the payoff depends on the underlying price at expiry (fun
fact: they are called *Asian* options because legend has it that
Bankers Trust traded the first option linked to the average price of crude oil in in Tokyo
back in 1987).

The payoff of a fixed-strike Asian call option at expiry is defined as: `C(T) = max(0, A(T) - K)`

.
There is no closed form analytical formula to determine the fair value any time before
expiry for this kind of option. But we can determine the fair value by simulating a
large number of paths for the underlying asset price and calculate the payout at expiry
for each path. The fair value is then the discounted average over all these Monte Carlo
simulations.

## Random walk

We assume that the current value of the underlying asset price is composed of the past value and
a determinstic upward trend plus an error term defined as a white noise (a normal variable with zero mean and variance one). This is called a random walk with drift. Given a start price
`s0`

, a constant drift `mu`

and volatility `sigma`

, we can simulate the price path over `days`

number of days like this:

```
import numpy as np
def random_walk(s0, mu, sigma, days):
dt = 1/365.
prices = np.zeros(days)
shocks = np.zeros(days)
prices[0] = s0
for i in range(1, days):
e = np.random.normal(loc=mu * dt, scale=sigma * np.sqrt(dt))
prices[i] = prices[i-1] * (1 + e)
return prices
```

For any random path, we can now calculate the average price and, given
the strike price `K`

, the payoff at expiry:

```
days = 365 * 4 # days to expiry
s0 = 100 # current underlying price
mu = 0.02 # drift
sigma = 0.2 # volatility
K = 100 # strike price
A = np.average(random_walk(s0, mu, sigma, days)
C = max(0, A - K)
```

## Monte Carlo Simulation

We need to generate a large number of random price paths for the underlying. For each price path we calculate the associated payoff. These payoffs are averaged and discounted to today. This result is the value of the option.
The actual number of simulations `n`

required depends on the confidence level you are comfortable with for
the estimate. That, in turn, depends on your option’s parameters. I won’t go into details here, so let’s just assume that `n = 10000`

. We need to generate `n`

random paths and calculate the average value of the option at expiry:

```
n = 10000
%time np.average([max(0, np.average(random_walk(s0, mu, sigma, days)) - K) for i in range(0, n)])
Wall time: 1min 10s
11.673350929139112
```

## Dask task scheduler: dask.distributed

If you have more than one CPU at your disposal, you can bring down the calculation time by
distributing the random walk generation across multiple CPUs. This is where Dask comes in.
Dask is a flexible library for parallel computing in Python, optimised for
interactive computational workloads. Instead of running `n`

random walks in a single
Jupyter notebook process, we make Dask distribute these calculations across
a number of processes.

Distributing these calculations is the job of the `task scheduler`

. Think of the task
scheduler as a server that manages where and how calculations as executed.
There are different types of schedulers: single machine scheduler (limited to
local machine) and distributed scheduler (distributed across a cluster). Here, I will
use a simple single machine scheduler.

In a new shell/command window, start a local `dask.distributed`

process scheduler:

```
Python 3.6.0 |Continuum Analytics, Inc.| (default, Dec 23 2016, 11:57:41) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from dask.distributed import Client
>>> client = Client(scheduler_port=8786)
```

You can interrogate the `client`

object to confirm it runs on port 8786 on localhost.
By default, it spins up as many processes as you have CPUs on your computer, in
my case I have 4 cores:

```
>>> client
<Client: scheduler='tcp://127.0.0.1:8786' processes=4 cores=4>
```

## Wrapping functions with dask.delayed

We need to instruct our `random_walk`

and `np.average`

function calls
to execute lazily. Instead of executing the functions immediately,
we want to defer execution via the Dask task scheduler. We can achieve
this by wrapping functions in `dask.delayed`

. Alternatively, you can decorate
your functions with `@dask.delayed`

(not covered in this blog post, though
have a look at http://dask.pydata.org/en/latest/delayed.html#decorator).
This delays the execution of the function and generates a task graph instead.

```
from dask import delayed
s0 = 100
K = 100
mu = 0.02
sigma = 0.2
days = 365*4
n = 10000
result = delayed(np.average)([
delayed(max)(
0,
delayed(np.average)(random_walk(s0, mu, sigma, days)) - K
) for i in range(0, n)
])
```

`result`

is now a `Delayed`

object and contains the task graph. The task graph is a
directed acyclic graph (DAG) and models the dependencies between the `np.average`

,
`max`

and `random_walk`

function calls - without executing them. This allows Dask to optimise
its task execution strategy.

## Execute the task graph

We have everything in place now to execute the actual calculation of `result`

. Remember
that `result`

up to this point is just a Delayed object. Execute the computation:

```
%time result.compute()
Wall time: 39.9 s
11.617145839123664
```

In essence, the calculation time has roughly halved on 4 cores compared to 1 core. The reason it has not reduced to 25% is that there is some overhead involved in managing and distributing the task themselves. Have a look at the Dask docs about dask.delayed and task scheduling.