Machine Learning on `dask`

The data science team at Comtravo uses dask to coordinate fairly complex machine learning workloads, both for training and running them in production. Our NLP pipeline has a lot of cross-dependencies between the different predictive models and I find it really useful to have an easy, lightweight, and purely ‘pythonic’ way of encoding and executing model dependencies. The direct use of the dask computational graph was not something I was familiar with before joining Comtravo. I hope you’ll enjoy this overview of what the graph is and how to use it to coordinate custom workloads.

What is dask?

Dask is a Python library usually marketed as “out-of-core pandas”, meaning that dask is able to run computations on pandas data frames that do not fit into memory. I won’t talk about that here, as there are lots of tutorials that demonstrate that use case (see References at the bottom of the article).

I want to highlight that dask is much more than just out-of-core pandas. It also offers dask.bag, which is a Map-Reduce-like abstraction over any data on disk (remote or local). I’ve found dask.bag especially useful for log parsing; I can read a large number of log files directly from S3 into a dask.bag. There’s also dask.array, which is an out-of-core abstraction over large numpy arrays.

So, in general, dask provides out-of-core abstractions over existing functionality in pandas and numpy. The part I find really interesting is the way in which these out-of-core abstractions are implemented. Rather than reimplementing a lot of the pandas API, dask creates wrappers that split certain operations into aggregates that work on small chunks of the original data. For instance, the arithmetic mean can easily be parallelised, since the sum and the count of values can be computed on each chunk separately and then aggregated together. Below is a visualisation of what that looks like for a dask.dataframe with 3 partitions:

While the arithmetic mean is rather trivial to parallelise, other computations are not. The exact way in which some specific computation should be parallelised is encoded into a computational graph; the visualisation above is an example of one.
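To make the chunked mean concrete, here is a minimal plain-Python sketch of the idea. It is not dask’s implementation; the function names and the three example partitions are made up for illustration.

```python
from typing import List, Sequence, Tuple


def chunk_summary(chunk: Sequence[float]) -> Tuple[float, int]:
    """Reduce one chunk to the two numbers the mean needs: its sum and its count."""
    return sum(chunk), len(chunk)


def combine(summaries: List[Tuple[float, int]]) -> float:
    """Aggregate the per-chunk (sum, count) pairs into the overall mean."""
    total = sum(s for s, _ in summaries)
    count = sum(n for _, n in summaries)
    return total / count


# three hypothetical partitions of one column, deliberately of different sizes
partitions = [[1.0, 2.0, 3.0], [4.0, 5.0], [6.0, 7.0, 8.0, 9.0]]

# each summary is independent of the others, so it could run on a different core
summaries = [chunk_summary(p) for p in partitions]
mean = combine(summaries)
```

Note that the chunks hand back sums and counts rather than their own means. Averaging the per-chunk means would weight every chunk equally and give a different answer when the chunks differ in size.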

Computational Graphs 101

A computational graph is a data structure that represents operations (functions) and dependencies between operations. Specifically, the graph needs to be directed and acyclic, in other words a DAG. Each node in the graph is a unit of computation that can be executed individually. Formally, computational graphs are a part of λ-calculus (lambda-calculus), which is a form of function abstraction. λ-calculus is a universal model of computation (in the sense of Turing complete), so although many of the examples below look simple, the paradigm they follow is very powerful.
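The reason the graph has to be acyclic is that a DAG always has at least one valid execution order, in which every node runs after everything it depends on. The sketch below shows that with the standard library’s graphlib module (Python 3.9+); the node names are hypothetical and nothing here is dask-specific.

```python
from graphlib import CycleError, TopologicalSorter

# map every node to the set of nodes it depends on (hypothetical node names)
dependencies = {
    'load': set(),
    'clean': {'load'},
    'features': {'clean'},
    'train': {'features'},
    'report': {'train', 'features'},
}

# an order in which every node comes after all of its dependencies
order = list(TopologicalSorter(dependencies).static_order())


def has_cycle(graph: dict) -> bool:
    """Return True if no valid execution order exists for `graph`."""
    try:
        list(TopologicalSorter(graph).static_order())
    except CycleError:
        return True
    return False
```

A graph in which two nodes depend on each other has no such order, which is what has_cycle detects.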

The foundational layer of dask is the computational graph. This encodes the work to be done (executable functions) and the dependencies between those pieces of work. Let’s see how a simple dependency graph is encoded in dask. Since dask is pure python, the graph is just encoded as a python dict. The example below is taken from the dask documentation:

def inc(i):
    return i+1

def add(a, b):
    return a + b

dsk = {'x': 1,
       'y': (inc, 'x'),
       'z': (add, 'x', 'y')}

Let’s unpack what’s happening in the code above. The graph has three nodes named x, y and z. The node x just contains data, in this case the integer 1. Node y is an executable function inc which takes one parameter. Notice how the syntax dask has adopted for executable functions in the graph resembles a regular python function call with the opening bracket moved left by one token, i.e. inc('x') becomes (inc, 'x').

The function inc should probably take an integer, though, not a string. The parameter, or parameters, for an executable function are interpreted by dask. If the parameter is a name that refers back to the graph, like x in this case, dask will de-reference that argument and pass in the output of the named node. In this case, since x just contains the integer 1, it’ll be passed into y when y is called.

The node named z is where interesting things start to happen. It’s also an executable function and takes two parameters: x and y. The x we’re already familiar with; the value of the node named y, on the other hand, is an executable, so dask will call the function and pass the output of that function call as the second parameter to z. This chaining of nodes allows us to encode very complex dependencies.

Let’s get back to machine learning.

Machine learning pipelines on dask.

Machine learning tends to break down into easily parallelisable tasks. For instance, cross-validation and hyperparameter search are both cases where each individual task can be executed independently of any other task: computing the cross-validation results of fold n is independent of computing the cross-validation results of any other fold. There are also a number of machine learning algorithms that are inherently parallel. An obvious example is model ensembles such as random forests, where each individual tree is trained on a separate data sample. As a side note, there’s dask-ml, which has parallel implementations of a number of machine learning algorithms.

Let’s look at how cross-validation can easily be parallelised using low-level dask operations. First, we do some imports and set up the data structures:

from typing import Iterable

import numpy as np
import sklearn.base
import sklearn.linear_model
import sklearn.metrics
import sklearn.model_selection

# X (features) and y (labels) are assumed to be numpy arrays that already exist
dsk = {}        # create an empty graph
dsk['X'] = X    # add training data (features)
dsk['y'] = y    # add training data (labels)
dsk['SVM'] = sklearn.linear_model.SGDClassifier(loss='hinge', penalty='l2', alpha=1e-5, max_iter=25)

def fit_model(mdl: sklearn.base.BaseEstimator,
              X: np.ndarray,
              y: np.ndarray) -> sklearn.base.BaseEstimator:
    """Take an unfitted model, and fit its weights on data `X` and `y`."""
    # clone first so the shared, unfitted model under 'SVM' is never modified
    return sklearn.base.clone(mdl).fit(X, y)

def predict(mdl: sklearn.base.BaseEstimator,
            X: np.ndarray) -> Iterable[bool]:
    """Take a fitted model and produce predictions for the `X`."""
    return mdl.predict(X)

def evaluate(y_true: Iterable[bool],
             y_pred: Iterable[bool]) -> str:
    """Evaluate the quality of predictions `y_pred` against the ground truth `y_true`."""
    return sklearn.metrics.classification_report(y_true, y_pred)

Then we create some number of cross-validation folds and add fit_model, predict and evaluate nodes for every fold into the graph.

# random_state only has an effect (and is only accepted) when shuffle=True
kfold = sklearn.model_selection.KFold(n_splits=3, shuffle=True, random_state=348347)
for i_fold, (trn, tst) in enumerate(kfold.split(X)):
    # add the train / test splits of each fold into the graph
    dsk[f'X/cv/transform-trn-{i_fold:02d}'] = X[trn]
    dsk[f'y/cv/transform-trn-{i_fold:02d}'] = y[trn]
    dsk[f'X/cv/transform-tst-{i_fold:02d}'] = X[tst]
    dsk[f'y/cv/transform-tst-{i_fold:02d}'] = y[tst]

    cv_model_key = f'svm/cv/fit-{i_fold:02d}'               # output is a binary blob (the fitted model)
    cv_pred_trn_key = f'svm/cv/transform-trn-{i_fold:02d}'  # output is a numpy array of predictions on the training data
    cv_pred_tst_key = f'svm/cv/transform-tst-{i_fold:02d}'  # output is a numpy array of predictions on the test data
    cv_report_trn_key = f'svm/cv/report-trn-{i_fold:02d}'   # a textual report of the classifier on the training data
    cv_report_tst_key = f'svm/cv/report-tst-{i_fold:02d}'   # a textual report of the classifier on the test data

    dsk[cv_model_key] = (fit_model, 'SVM', f'X/cv/transform-trn-{i_fold:02d}', f'y/cv/transform-trn-{i_fold:02d}')
    dsk[cv_pred_trn_key] = (predict, cv_model_key, f'X/cv/transform-trn-{i_fold:02d}')
    dsk[cv_pred_tst_key] = (predict, cv_model_key, f'X/cv/transform-tst-{i_fold:02d}')
    # evaluate expects the ground truth first and the predictions second
    dsk[cv_report_trn_key] = (evaluate, f'y/cv/transform-trn-{i_fold:02d}', cv_pred_trn_key)
    dsk[cv_report_tst_key] = (evaluate, f'y/cv/transform-tst-{i_fold:02d}', cv_pred_tst_key)

# the production model is fitted on all of the data
dsk['svm/prod'] = (fit_model, 'SVM', 'X', 'y')

This somewhat cumbersome looking code creates a graph that parallelises 3-fold cross-validation of an SVM classifier. The key to how the graph encodes the dependencies is in the last five lines of code inside the for loop. The first node cv_model_key just calls the fit_model function with an unfitted model and some data. However, notice that the data we pass in comes from the *-trn-* key for that fold, i.e. the training data.

The next two nodes cv_pred_trn_key and cv_pred_tst_key take the trained model from the previous step and produce predictions from it on the training data and the test data respectively. Finally, the cv_report_trn_key and cv_report_tst_key nodes take the predictions from the previous two steps and the ground truth data and evaluate the quality of the predictions.

This might seem like a lot of work for something that sklearn can already parallelise. The key, however, is that this method is much more general than sklearn’s parallel cross-validation operators. The above graph could contain hundreds of nodes and given the compute infrastructure, dask could execute the workload on a distributed cluster.

There are a few things to note here. Normally, to fit a sklearn model you would call svm.fit(X, y); here we’re calling fit_model(svm, X, y). We’ve stored the base model under the key 'SVM', but we can’t encode in the graph a node 'SVM'.fit because the string 'SVM' does not have a method .fit. This unfortunately requires us to introduce proxies for each callable we wish to execute.
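One way to cut down on hand-written proxies is a single generic one that looks the method up by name. The sketch below is an assumption on my part rather than something the pipeline above uses; call_method and the Doubler class are hypothetical names for illustration.

```python
from typing import Any


def call_method(obj: Any, method_name: str, *args: Any) -> Any:
    """Look up `method_name` on `obj` and call it with `args`."""
    return getattr(obj, method_name)(*args)


class Doubler:
    """Toy stand-in for a fitted model (hypothetical, for illustration only)."""

    def predict(self, values):
        return [2 * v for v in values]


# a task tuple in the same (function, *arguments) form used in the graph
task = (call_method, Doubler(), 'predict', [1, 2, 3])

# unpack and call it by hand, the way a scheduler would after de-referencing
func, *args = task
result = func(*args)
```

Two caveats apply. The method name is passed as a plain string, so it must not collide with a key in the graph, otherwise it would be de-referenced like any other key. And a generic proxy around .fit would mutate the shared model in place, which is exactly what the clone inside fit_model avoids.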

Execute wherever

With the graph now encoded, we can execute those jobs on a local machine using all the cores the computer has.

from dask.distributed import Client, LocalCluster

cluster = LocalCluster()
client = Client(cluster)

# only retrieve the fitted model from the first fold
svm_00 = client.get(dsk, keys='svm/cv/fit-00')

# retrieve the fitted models from all the folds (returned as a list, in the order of the keys)
svms = client.get(dsk, keys=['svm/cv/fit-00', 'svm/cv/fit-01', 'svm/cv/fit-02'])

We’re not constrained to the local machine, though. We could just as easily use an existing Kubernetes or YARN cluster.

from dask_kubernetes import KubeCluster
from dask.distributed import Client

cluster = KubeCluster.from_yaml('worker-spec.yml')
client = Client(cluster)

These additional cluster implementations are external projects, usually not developed by the core dask team, and the level of support and maturity will vary. I haven’t used dask_kubernetes (yet), but it was developed by the UK Met Office.

A few tricks to ease the pain

By default, dask will hold results that are still required by other tasks in memory. This can become problematic, but luckily there’s an easy workaround. Since all the nodes in the graph are essentially Python callables, we can wrap them into a caching callable that serialises and deserialises the inputs and outputs of tasks and only hands over the path of those results to dask.

Wrapping the results in a thin caching layer has another benefit. Dask will try to move computation to where the data resides, as the callable functions tend to be faster to move over the network than large data sets. In fact, dask tracks the size of the results and tries to make trade-offs between moving data or moving computation. However, there are cases where moving even large data sets of several hundred megabytes around is not an issue, e.g. in machine learning cases where the model fitting can easily take several hours or even days. Caching the results to disk allows dask to distribute the computational load more evenly, as the “result” of each computation is just a path reference. Furthermore, the cached results can be used to great effect when restarting a partially completed computation.

Some References

Written by

Dr. Matti Lyra

Research Engineer in NLP

Matti has been an active member of the data science community in Berlin and has led the PyData Berlin committee since 2016. Matti holds a PhD in Natural Language Processing from the University of Sussex. His thesis explored using topical bias in large text corpora to build better ensemble models for text classification. Beyond his work at Comtravo he's interested in language and linguistics, the philosophy of AI, cycling, photography and coffee.