The data science team at Comtravo uses `dask` to coordinate fairly complex machine learning workloads, both for training and for running them in production. Our NLP pipeline has many cross-dependencies between the different predictive models, and I find it really useful to have an easy, lightweight, and purely ‘pythonic’ way of encoding and executing model dependencies. The direct use of the `dask` computational graph was not something I was familiar with before joining Comtravo. I hope you’ll enjoy this overview of what the graph is and how to use it to coordinate custom workloads.

# What is `dask`?

Dask is a Python library usually marketed as “out-of-core `pandas`”, meaning that `dask` is able to run computations on `pandas` data frames that do not fit into memory. I won’t talk about that here, as there are lots of tutorials that demonstrate that use case (see References at the bottom of the article).

I want to highlight that `dask` is much more than just out-of-core `pandas`. It also offers `dask.bag`, which is a Map-Reduce-like abstraction over any data on disk (remote or local). I’ve found `dask.bag` especially useful for log parsing; I can read a large number of log files directly from S3 into a `dask.bag`. There’s also `dask.array`, which is an out-of-core abstraction over large `numpy` arrays.

So, in general, `dask` provides out-of-core abstractions over existing functionality in `pandas` and `numpy`. The part I find really interesting is the way in which these out-of-core abstractions are built. Rather than reimplementing a lot of the `pandas` API, `dask` creates wrappers that split certain operations into aggregates that work on small chunks of the original data. For instance, the arithmetic mean (\(\frac{1}{N}\sum_{i=1}^N a_i\)) can easily be parallelised, since partial sums as well as the counts of values can be computed on small chunks and then aggregated together. Below is a visualisation of what that looks like for a `dask.dataframe` with 3 partitions:

While the arithmetic mean is rather trivial to parallelise, other computations are not. The exact way in which a specific computation should be parallelised is encoded into a computational graph, an example of which is shown above.
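To make the chunked-mean idea concrete, here is a small pure-`numpy` sketch (not `dask`’s actual implementation) of computing a global mean from per-partition sums and counts:

```python
import numpy as np

# A hypothetical dataset split into 3 partitions, as dask.dataframe would do.
partitions = [np.array([1.0, 2.0]), np.array([3.0, 4.0, 5.0]), np.array([6.0])]

# Map step: each partition independently reports its sum and its count.
partials = [(chunk.sum(), chunk.size) for chunk in partitions]

# Reduce step: aggregate the partial sums and counts, then divide once.
total = sum(s for s, _ in partials)
count = sum(n for _, n in partials)
mean = total / count

assert mean == np.concatenate(partitions).mean()  # same as the global mean
```

Each map step only needs one partition in memory at a time, which is exactly what makes the operation out-of-core friendly.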

# Computational Graphs 101

A computational graph is a data structure that represents operations (functions) and dependencies between operations. Specifically, the graph needs to be directed and acyclic, in other words, a DAG. Each node in the graph is a unit of computation that can be executed individually. Formally, computational graphs are a part of \(\lambda\)-calculus (lambda-calculus), which is a form of function abstraction. \(\lambda\)-calculus is a universal model of computation (in the sense of Turing completeness), so although many of the examples below look simple, the paradigm they follow is very powerful.

The foundational layer of `dask` is the computational graph. It encodes the work to be done (executable functions) and the dependencies between those pieces of work. Let’s see how a simple dependency graph is encoded in `dask`. Since `dask` is pure Python, the graph is just a Python `dict`. The example below is taken from the `dask` documentation:

```
def inc(i):
    return i + 1

def add(a, b):
    return a + b

dsk = {'x': 1,
       'y': (inc, 'x'),
       'z': (add, 'x', 'y')}
```

Let’s unpack what’s happening in the code above. The graph has three nodes named `x`, `y` and `z`. The node `x` just contains data, in this case the integer `1`. Node `y` is an executable function, `inc`, which takes one parameter. Notice how the syntax `dask` has adopted for executable functions in the graph resembles a regular Python function call with the opening bracket moved left by one token, i.e., `inc('x')` \(\rightarrow\) `(inc, 'x')`.

The function `inc` should probably take an integer, though, not a string. The parameter, or parameters, of an executable function are interpreted by `dask`. If a parameter is a name that refers back to the graph, like `x` in this case, `dask` will de-reference that argument and pass in the output of the named node. In this case, since `x` just contains the integer `1`, that value will be passed into `y` when `y` is called.

The node named `z` is where interesting things start to happen. It’s also an executable function and takes two parameters: `x` and `y`. The `x` we’re already familiar with; the value of the node named `y`, on the other hand, is an executable, so `dask` will call that function and pass its output as the second parameter to `z`. This chaining of nodes allows us to encode very complex dependencies.
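To see these semantics end to end, here is a toy recursive evaluator. This is a sketch of what a scheduler does, not `dask`’s actual code:

```python
def inc(i):
    return i + 1

def add(a, b):
    return a + b

dsk = {'x': 1,
       'y': (inc, 'x'),
       'z': (add, 'x', 'y')}

def get(graph, key):
    """Toy single-threaded evaluator for a dask-style graph."""
    node = graph[key]
    # A tuple whose first element is callable is a task ...
    if isinstance(node, tuple) and callable(node[0]):
        func, *args = node
        # ... whose arguments are de-referenced against the graph first.
        return func(*(get(graph, a) if a in graph else a for a in args))
    return node  # a plain data node

print(get(dsk, 'z'))  # x=1, y=inc(1)=2, z=add(1, 2)=3
```

Unlike this sketch, `dask`’s schedulers compute each node only once and share the result between all of its dependents.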

Let’s get back to machine learning.

# Machine learning pipelines on `dask`

Machine learning tends to break down into easily parallelisable tasks. For instance, cross-validation and hyperparameter search are both cases where each individual task can be executed independently of any other task: computing the cross-validation results of fold `n` is independent of computing the cross-validation results of any other fold. There are also a number of machine learning algorithms that are inherently parallel. An obvious example is model ensembles such as random forests, where each individual tree is trained on a separate data sample. As a side note, there’s `dask-ml` (docs), which has parallel implementations of a number of machine learning algorithms.

Let’s look at how cross-validation can easily be parallelised using low-level `dask` operations. First, we do some imports and set up the data structures:

```
from typing import Iterable

import numpy as np
import sklearn.base
import sklearn.linear_model
import sklearn.metrics
import sklearn.model_selection

dsk = {}      # create an empty graph
dsk['X'] = X  # add training data (features)
dsk['y'] = y  # add training data (labels)
dsk['SVM'] = sklearn.linear_model.SGDClassifier(
    loss='hinge', penalty='l2', alpha=1e-5, max_iter=25)

def fit_model(mdl: sklearn.base.BaseEstimator,
              X: np.ndarray,
              y: np.ndarray) -> sklearn.base.BaseEstimator:
    """Take an unfitted model, and fit its weights on data `X` and `y`."""
    return sklearn.base.clone(mdl).fit(X, y)

def predict(mdl: sklearn.base.BaseEstimator,
            X: np.ndarray) -> Iterable[bool]:
    """Take a fitted model and produce predictions for `X`."""
    return mdl.predict(X)

def evaluate(y_true: Iterable[bool],
             y_pred: Iterable[bool]) -> str:
    """Evaluate the quality of predictions `y_pred` against the ground truth `y_true`."""
    return sklearn.metrics.classification_report(y_true, y_pred)
```

Then we create some number of cross-validation folds and add `fit_model`, `predict` and `evaluate` nodes for every fold into the graph.

```
kfold = sklearn.model_selection.KFold(n_splits=3, shuffle=True, random_state=348347)
for i_fold, (trn, tst) in enumerate(kfold.split(X)):
    # add the train / test splits of each fold into the graph
    dsk[f'X/cv/transform-trn-{i_fold:02d}'] = X[trn]
    dsk[f'X/cv/transform-tst-{i_fold:02d}'] = X[tst]
    dsk[f'y/cv/transform-trn-{i_fold:02d}'] = y[trn]
    dsk[f'y/cv/transform-tst-{i_fold:02d}'] = y[tst]

    cv_model_key = f'svm/cv/fit-{i_fold:02d}'               # output is a binary blob (the fitted model)
    cv_pred_trn_key = f'svm/cv/transform-trn-{i_fold:02d}'  # output is a numpy array of predictions on the training data
    cv_pred_tst_key = f'svm/cv/transform-tst-{i_fold:02d}'  # output is a numpy array of predictions on the test data
    cv_report_trn_key = f'svm/cv/report-trn-{i_fold:02d}'   # a textual report on the training data
    cv_report_tst_key = f'svm/cv/report-tst-{i_fold:02d}'   # a textual report on the test data

    dsk[cv_model_key] = (fit_model, 'SVM', f'X/cv/transform-trn-{i_fold:02d}', f'y/cv/transform-trn-{i_fold:02d}')
    dsk[cv_pred_trn_key] = (predict, cv_model_key, f'X/cv/transform-trn-{i_fold:02d}')
    dsk[cv_pred_tst_key] = (predict, cv_model_key, f'X/cv/transform-tst-{i_fold:02d}')
    dsk[cv_report_trn_key] = (evaluate, f'y/cv/transform-trn-{i_fold:02d}', cv_pred_trn_key)
    dsk[cv_report_tst_key] = (evaluate, f'y/cv/transform-tst-{i_fold:02d}', cv_pred_tst_key)

dsk['svm/prod'] = (fit_model, 'SVM', 'X', 'y')  # a production model fitted on all the data
```

This somewhat cumbersome-looking code creates a graph that parallelises 3-fold cross-validation of an SVM classifier. The key to how the graph encodes the dependencies is in the last five lines of code inside the for loop. The first node, `cv_model_key`, just calls the `fit_model` function with an unfitted model and some data. However, notice that the data we pass in comes from the `*-trn-*` keys for that fold, i.e. the training data.

The next two nodes, `cv_pred_trn_key` and `cv_pred_tst_key`, take the trained model from the previous step and produce predictions on the training and test data respectively. Finally, the `cv_report_trn_key` and `cv_report_tst_key` nodes take the predictions from the previous two steps and the ground truth data and evaluate the quality of the predictions.

This might seem like a lot of work for something that `sklearn` can already parallelise. The key, however, is that this method is much more general than `sklearn`’s parallel cross-validation operators. The above graph could contain hundreds of nodes and, given the compute infrastructure, `dask` could execute the workload on a distributed cluster.

There are a few things to note here. Normally, to fit an `sklearn` model you would call `svm.fit(X, y)`; here we’re calling `fit_model(svm, X, y)`. We’ve stored the base model under the key `'SVM'`, but we can’t encode a node `'SVM'.fit` in the graph because the string `'SVM'` does not have a method `.fit`. This unfortunately requires us to introduce proxies for each callable we wish to execute.
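One generic way to avoid writing a proxy per method is a dispatcher that looks the method up by name. The helper `call_method` and the `Doubler` class below are made up for illustration; this is not something `dask` provides:

```python
def call_method(obj, method_name, *args, **kwargs):
    """Generic proxy: look up `method_name` on `obj` and call it with `args`."""
    return getattr(obj, method_name)(*args, **kwargs)

# A toy estimator to demonstrate the dispatch.
class Doubler:
    def transform(self, xs):
        return [2 * x for x in xs]

# With such a helper, a graph node could be written as
#   dsk[cv_model_key] = (call_method, 'SVM', 'fit', 'X', 'y')
# instead of defining a dedicated fit_model proxy.
result = call_method(Doubler(), 'transform', [1, 2, 3])  # → [2, 4, 6]
```

Note that `fit_model` above also clones the estimator before fitting; a generic dispatcher like this would fit the shared model in place, so the cloning step would still need to live somewhere.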

# Execute wherever

With the graph now encoded, we can execute those jobs on a local machine using all the cores the computer has.

```
from dask.distributed import Client, LocalCluster

cluster = LocalCluster()
client = Client(cluster)

# only retrieve the fitted model from the first fold
svm_00 = client.get(dsk, keys=['svm/cv/fit-00'])
# retrieve the fitted models from all the folds
svms = client.get(dsk, keys=['svm/cv/fit-00', 'svm/cv/fit-01', 'svm/cv/fit-02'])
```

We’re not constrained to the local machine, though. We could just as easily use an existing Kubernetes or YARN cluster.

```
from dask_kubernetes import KubeCluster
from dask.distributed import Client
cluster = KubeCluster.from_yaml('worker-spec.yml')
client = Client(cluster)
```

These additional cluster implementations are external projects usually not developed by the core `dask` team, and the level of support and maturity will vary. I haven’t used `dask_kubernetes` (yet), but it was developed by the UK Met Office.

# A few tricks to ease the pain

By default, `dask` will hold results that are still required by other tasks in memory. This can become problematic, but luckily there’s an easy workaround. Since all the nodes in the graph are essentially Python callables, we can wrap them in a caching callable that serialises and deserialises the inputs/outputs of tasks and only hands the paths of those results over to `dask`.
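Here is a minimal sketch of such a caching wrapper, assuming pickle-able results and a filesystem all workers can see. The helper names `cached` and `load` are made up for illustration, and a real implementation would also need care around key hashing and concurrent writers:

```python
import os
import pickle
import tempfile

CACHE_DIR = tempfile.mkdtemp()  # in practice: a shared, persistent directory

def load(path):
    """Read a cached result back from disk."""
    with open(path, 'rb') as fh:
        return pickle.load(fh)

def cached(func, cache_key):
    """Wrap `func` so it writes its result to disk and returns only the path."""
    def wrapper(*args):
        path = os.path.join(CACHE_DIR, f'{cache_key}.pkl')
        if not os.path.exists(path):  # reuse results when restarting a run
            # De-reference arguments that are themselves cached paths.
            loaded = [load(a) if isinstance(a, str) and a.startswith(CACHE_DIR)
                      else a for a in args]
            with open(path, 'wb') as fh:
                pickle.dump(func(*loaded), fh)
        return path  # the "result" dask moves around is just a short string
    return wrapper

# e.g. dsk['y'] = (cached(inc, 'y'), 'x') stores inc's output on disk;
# downstream nodes receive the path and load the value on demand.
```

Because each wrapped node checks the cache before computing, a partially completed graph can be re-run and only the missing nodes are recomputed.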

Wrapping the results in a thin caching layer has an additional benefit: `dask` will try to move computation to where data resides, as callable functions tend to be faster to move over the network than large data sets. In fact, `dask` tracks the size of the results and tries to make trade-offs between moving data and moving computation. However, there are cases where moving even large, several-hundred-megabyte data sets around is not an issue, e.g. in machine learning cases where the model fitting can easily take several hours or even days. Caching the results to disk allows `dask` to distribute the computational load more evenly, as the “result” of each computation is just a path reference. Furthermore, the cached results can be used to great effect when restarting a partially completed computation.

# Some References

- dask
- dask-ml
- Matthew Rocklin on Dask at PyData Berlin 2015
- Scaling PyData with Dask by Martin Durant and Jim Crist
- Scalable Machine Learning on Dask by Tom Augspurger
- Matthew Rocklin comparing `dask` and `celery`