The data science team at Comtravo uses dask to coordinate fairly complex machine learning workloads, both for training and for running them in production. Our NLP pipeline has a lot of cross-dependencies between the different predictive models, and I find it really useful to have an easy, lightweight, and purely ‘pythonic’ way of encoding and executing model dependencies. The direct use of the dask computational graph was not something I was familiar with before joining Comtravo. I hope you’ll enjoy this overview of what the graph is and how to use it to coordinate custom workloads.
What is dask?
Dask is a Python library usually marketed as “out-of-core pandas”, meaning that dask is able to run computations on pandas data frames that do not fit into memory. I won’t talk about that here, as there are lots of tutorials that demonstrate that use case (see References at the bottom of the article).
I want to highlight that dask is much more than just out-of-core pandas. It also offers dask.bag, which is a Map-Reduce-like abstraction over any data on disk (remote or local). I’ve found dask.bag especially useful for log parsing; I can read a large number of log files directly from S3 into a dask.bag. There’s also dask.array, which is an out-of-core abstraction over large numpy arrays.
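As a rough sketch of what that log-parsing workflow looks like (the bucket path and the filter predicate here are hypothetical):

import dask.bag as db

# Read all log files matching the pattern into a bag, one element per line.
# 's3://my-bucket/logs/*.log' is a placeholder path; reading from S3 needs s3fs.
lines = db.read_text('s3://my-bucket/logs/*.log')

# Keep only error lines and count them; evaluated lazily and in parallel.
n_errors = lines.filter(lambda line: 'ERROR' in line).count().compute()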
So, in general, dask provides out-of-core abstractions over existing functionality in pandas and numpy. The part I find really interesting is the way in which these out-of-core abstractions are done. Rather than reimplementing a lot of the pandas API, dask creates wrappers that split certain operations into aggregates that work on small chunks of the original data. For instance, the arithmetic mean (\(\frac{1}{N}\sum_{i=1}^N a_i\)) can easily be parallelised, since the sum(s) as well as the overall count of values can be computed on small samples and then aggregated together. Below is a visualisation of what that looks like for a dask.dataframe with 3 partitions:
While the arithmetic mean is rather trivial to parallelise, other computations are not. The exact way in which a specific computation should be parallelised is encoded in a computational graph, like the one shown above.
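To make the chunked-mean idea concrete, here is a minimal sketch in plain numpy; this is not dask’s actual implementation, just the same split-then-aggregate pattern:

import numpy as np

data = np.arange(12, dtype=float)
chunks = np.array_split(data, 3)  # 3 "partitions"

# Per-chunk partial results: (sum, count) pairs.
partials = [(chunk.sum(), chunk.size) for chunk in chunks]

# Aggregate the partials into the global mean.
total, count = map(sum, zip(*partials))
assert total / count == data.mean()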
Computational Graphs 101
A computational graph is a data structure that represents operations (functions) and the dependencies between them. Specifically, the graph needs to be directed and acyclic, in other words a DAG. Each node in the graph is a unit of computation that can be executed individually. Formally, computational graphs are closely related to the \(\lambda\)-calculus (lambda calculus), a model of function abstraction and application. The \(\lambda\)-calculus is a universal model of computation (in the sense of Turing completeness), so although many of the examples below look simple, the paradigm they follow is very powerful.
The foundational layer of dask is the computational graph. It encodes the work to be done (executable functions) and the dependencies between those pieces of work. Let’s see how a simple dependency graph is encoded in dask. Since dask is pure Python, the graph is just encoded as a Python dict. The example below is taken from the dask documentation:
def inc(i):
    return i + 1

def add(a, b):
    return a + b

dsk = {'x': 1,
       'y': (inc, 'x'),
       'z': (add, 'x', 'y')}
Let’s unpack what’s happening in the code above. The graph has three nodes named x, y and z. The node x just contains data, in this case the integer 1. Node y is an executable function inc which takes one parameter. Notice how the syntax dask has adopted for executable functions in the graph resembles a regular Python function call with the opening bracket moved left by one token, i.e., inc('x') \(\rightarrow\) (inc, 'x').
The function inc should take an integer, though, not a string. The parameter, or parameters, of an executable function are interpreted by dask: if a parameter is a name that refers back to the graph, like 'x' in this case, dask will de-reference that argument and pass in the output of the named node. Here, since x just contains the integer 1, that value is passed into inc when the node y is executed.
The node named z is where interesting things start to happen. It’s also an executable function and takes two parameters: x and y. The x we’re already familiar with; the value of the node named y, on the other hand, is itself an executable, so dask will call that function and pass its output as the second parameter to z. This chaining of nodes allows us to encode very complex dependencies.
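To actually run the graph, you hand it to one of dask’s schedulers together with the key of the node you want computed. The single-machine threaded scheduler is enough here:

from dask.threaded import get

# Evaluate the node 'z': dask resolves 'x' and 'y' first, then calls add.
result = get(dsk, 'z')
print(result)  # 3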
Let’s get back to machine learning.
Machine learning pipelines on dask
Machine learning tends to break down into easily parallelisable tasks. For instance, cross-validation and hyperparameter search are both cases where each individual task can be executed independently of any other task: computing the cross-validation results of fold n is independent of computing the cross-validation results of any other fold. There are also a number of machine learning algorithms that are inherently parallel. An obvious example is model ensembles such as random forests, where each individual tree is trained on a separate data sample. As a side note, there’s dask-ml (docs), which has parallel implementations of a number of machine learning algorithms.
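For instance, dask-ml ships a drop-in replacement for sklearn’s grid search that schedules the individual fits as dask tasks; a quick sketch (the parameter grid here is made up, and X and y are the training data used in the examples below):

from sklearn.linear_model import SGDClassifier
from dask_ml.model_selection import GridSearchCV

# Each alpha/fold combination becomes an independent task on the cluster.
search = GridSearchCV(SGDClassifier(), {'alpha': [1e-5, 1e-4, 1e-3]})
search.fit(X, y)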
Let’s look at how cross-validation can easily be parallelised using low-level dask operations. First, we do some imports and set up the data structures:
from typing import Iterable

import numpy as np
import sklearn.base
import sklearn.linear_model
import sklearn.metrics
import sklearn.model_selection

dsk = {}         # create an empty graph
dsk['X'] = X     # add training data (X and y are assumed to be in scope)
dsk['y'] = y     # add training data
dsk['SVM'] = sklearn.linear_model.SGDClassifier(loss='hinge', penalty='l2',
                                                alpha=1e-5, max_iter=25)

def fit_model(mdl: sklearn.base.BaseEstimator,
              X: np.ndarray,
              y: np.ndarray) -> sklearn.base.BaseEstimator:
    """Take an unfitted model, and fit its weights on data `X` and `y`."""
    return sklearn.base.clone(mdl).fit(X, y)

def predict(mdl: sklearn.base.BaseEstimator,
            X: np.ndarray) -> Iterable[bool]:
    """Take a fitted model and produce predictions for `X`."""
    return mdl.predict(X)

def evaluate(y_true: Iterable[bool],
             y_pred: Iterable[bool]) -> str:
    """Evaluate the quality of predictions `y_pred` against the ground truth `y_true`."""
    return sklearn.metrics.classification_report(y_true, y_pred)
Then we create some number of cross-validation folds and add fit_model, predict and evaluate nodes for every fold into the graph.
kfold = sklearn.model_selection.KFold(n_splits=3, shuffle=True, random_state=348347)

for i_fold, (trn, tst) in enumerate(kfold.split(X)):
    # add the train / test splits of each fold into the graph
    dsk[f'X/cv/transform-tst-{i_fold:02d}'] = X[tst]
    dsk[f'y/cv/transform-trn-{i_fold:02d}'] = y[trn]
    dsk[f'X/cv/transform-trn-{i_fold:02d}'] = X[trn]
    dsk[f'y/cv/transform-tst-{i_fold:02d}'] = y[tst]

    cv_model_key = f'svm/cv/fit-{i_fold:02d}'                 # output is a binary blob (the fitted model)
    cv_pred_trn_key = f'svm/cv/transform-trn-{i_fold:02d}'    # output is a numpy array of predictions on the training data
    cv_pred_tst_key = f'svm/cv/transform-tst-{i_fold:02d}'    # output is a numpy array of predictions on the test data
    cv_report_trn_key = f'svm/cv/report-trn-{i_fold:02d}'     # a textual report of the trained classifier
    cv_report_tst_key = f'svm/cv/report-tst-{i_fold:02d}'     # a textual report of the trained classifier

    dsk[cv_model_key] = (fit_model, 'SVM', f'X/cv/transform-trn-{i_fold:02d}', f'y/cv/transform-trn-{i_fold:02d}')
    dsk[cv_pred_trn_key] = (predict, cv_model_key, f'X/cv/transform-trn-{i_fold:02d}')
    dsk[cv_pred_tst_key] = (predict, cv_model_key, f'X/cv/transform-tst-{i_fold:02d}')
    dsk[cv_report_trn_key] = (evaluate, f'y/cv/transform-trn-{i_fold:02d}', cv_pred_trn_key)
    dsk[cv_report_tst_key] = (evaluate, f'y/cv/transform-tst-{i_fold:02d}', cv_pred_tst_key)

# finally, a production model fitted on all the data
dsk['svm/prod'] = (fit_model, 'SVM', 'X', 'y')
This somewhat cumbersome-looking code creates a graph that parallelises 3-fold cross-validation of an SVM classifier. The key to how the graph encodes the dependencies is in the last five lines of code inside the for loop. The first node, cv_model_key, just calls the fit_model function with an unfitted model and some data. However, notice that the data we pass in comes from the *-trn-* keys for that fold, i.e. the training data.
The next two nodes, cv_pred_trn_key and cv_pred_tst_key, take the trained model from the previous step and produce predictions on the training and test data respectively. Finally, the cv_report_trn_key and cv_report_tst_key nodes take the predictions from the previous two steps together with the ground truth data and evaluate the quality of the predictions.
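Before running anything, it can help to sanity-check the structure. If you have graphviz installed, dask can render a raw graph dict directly (a quick visual check, not required for execution):

from dask.dot import dot_graph

# Writes an image of the dependency structure (e.g. cv-graph.png) to disk.
dot_graph(dsk, filename='cv-graph')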
This might seem like a lot of work for something that sklearn can already parallelise. The key, however, is that this method is much more general than sklearn’s parallel cross-validation operators. The above graph could contain hundreds of nodes, and given the right compute infrastructure, dask could execute the workload on a distributed cluster.
There are a few things to note here. Normally, to fit an sklearn model you would call svm.fit(X, y); here we’re calling fit_model(svm, X, y). We’ve stored the base model under the key 'SVM', but we can’t encode a node 'SVM'.fit in the graph because the string 'SVM' does not have a method .fit. This unfortunately requires us to introduce proxies for each callable we wish to execute.
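If writing a proxy per method gets tedious, one generic workaround (my own sketch, not part of the dask API) is a helper that looks the method up by name at execution time:

def call_method(obj, method_name, *args):
    """Call `obj.method_name(*args)` once dask has de-referenced `obj`."""
    return getattr(obj, method_name)(*args)

# Roughly equivalent to the fit_model node, minus the clone step, so the
# stored 'SVM' object itself would be mutated by fit.
dsk['svm/fit-generic'] = (call_method, 'SVM', 'fit', 'X', 'y')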
Execute wherever
With the graph now encoded, we can execute those jobs on a local machine using all the cores the computer has.
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
# only retrieve the fitted model from the first fold
svm_00 = client.get(dsk, keys=['svm/cv/fit-00'])
# retrieve the fitted models from all the folds
svms = client.get(dsk, keys=['svm/cv/fit-00', 'svm/cv/fit-01', 'svm/cv/fit-02'])
We’re not constrained to the local machine, though. We could just as easily use an existing Kubernetes or YARN cluster.
from dask_kubernetes import KubeCluster
from dask.distributed import Client
cluster = KubeCluster.from_yaml('worker-spec.yml')
client = Client(cluster)
These additional cluster implementations are external projects, usually not developed by the core dask team, so the level of support and maturity will vary. I haven’t used dask_kubernetes (yet), but it was developed by the UK Met Office.
A few tricks to ease the pain
By default, dask will hold results that are still required by other tasks in memory. This can become problematic, but luckily there’s an easy workaround. Since all the nodes in the graph are essentially Python callables, we can wrap them in a caching callable that serialises and deserialises the inputs/outputs of tasks and only hands the paths of those cached results over to dask.
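Here is a minimal sketch of that idea (the cache directory and the pickle-based serialisation are my own choices, not anything dask prescribes):

import pickle
from pathlib import Path

CACHE_DIR = Path('/tmp/dask-cache')  # hypothetical cache location
CACHE_DIR.mkdir(parents=True, exist_ok=True)

def cached(fn, key):
    """Wrap `fn` so it stores its result on disk and returns only the path."""
    def wrapper(*args):
        # De-reference arguments that are themselves cache paths.
        loaded = [pickle.loads(a.read_bytes()) if isinstance(a, Path) else a
                  for a in args]
        out_path = CACHE_DIR / key.replace('/', '-')
        out_path.write_bytes(pickle.dumps(fn(*loaded)))
        return out_path  # dask only ever moves this small path object around
    return wrapper

# Usage: wrap the node's callable when building the graph, e.g.
# dsk[cv_model_key] = (cached(fit_model, cv_model_key), 'SVM', trn_X_key, trn_y_key)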
Wrapping the results in a thin caching layer has an additional benefit: dask will try to move computation to where the data resides, since callable functions tend to be faster to move over the network than large data sets. In fact, dask tracks the size of the results and tries to trade off moving data against moving computation. However, there are cases where moving even large, several-hundred-megabyte data sets around is not an issue, e.g. in machine learning workloads where the model fitting can easily take several hours or even days. Caching the results to disk allows dask to distribute the computational load more evenly, as the “result” of each computation is just a path reference. Furthermore, the cached results can be used to great effect when restarting a partially completed computation.
Some References
- dask
- dask-ml
- Matthew Rocklin on Dask at PyData Berlin 2015
- Scaling PyData with Dask by Martin Durant and Jim Crist
- Scalable Machine Learning on Dask by Tom Augspurger
- Matthew Rocklin comparing dask and celery