Example Python Machine Learning Algorithm on Spark

Combining software frameworks that do machine learning with ones that do parallel cluster computing is tricky. Because the tools are always changing, you want these tasks to be decoupled, but not so decoupled that it’s hard to put them together. Inevitably there will be some glue code in the middle. What follows is an example of glue that runs Python’s scikit-learn code in a Spark parallel environment.

Built on top of Python’s NumPy and SciPy scientific computing libraries, scikit-learn contains implementations of a broad range of machine learning algorithms along with helpful utilities for generating datasets, performing evaluations, working with standard corpora, and so forth. It is well documented and relatively easy to use. Scikit-learn can spread work across the multiple cores of a single machine (many estimators take an n_jobs parameter), but it has no built-in support for distributing work across a cluster.
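To make the single-machine point concrete, here is a minimal sketch (not from the original post) of scikit-learn’s multicore support. The choice of RandomForestClassifier, n_estimators=50 and random_state=0 are illustrative assumptions. n_jobs=-1 asks scikit-learn to use every local core, and that is as far as the parallelism goes.

```python
# Sketch of scikit-learn's single-machine parallelism (illustrative settings).
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

# Synthetic data of the same kind the Spark script uses.
X, y = make_classification(random_state=0)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

# The forest's trees are fit in parallel across local cores, never across machines.
forest = RandomForestClassifier(n_estimators=50, n_jobs=-1, random_state=0)
forest.fit(X_train, y_train)
accuracy = accuracy_score(y_test, forest.predict(X_test))
print(accuracy)
```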

Spark is a successor to Hadoop MapReduce that takes the MapReduce paradigm back to its functional programming roots. Instead of writing Map and Reduce Java classes as you would in Hadoop, you apply anonymous map and reduce functions to a distributed data set. This lets you focus on the data transformations you care about while keeping boilerplate to a minimum. Spark is implemented in Scala, but it also has APIs for other languages such as Java, Python and R.
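The map/reduce style can be sketched locally with Python’s built-in map and functools.reduce. This is only an analogy: nothing here is distributed. With a SparkContext, the same two lambdas would be passed to the data set’s map and reduce methods, as the comment in the code shows.

```python
# Local analogy of Spark's functional map/reduce style (nothing is distributed here).
from functools import reduce
from operator import add

data = range(1, 11)
squares = map(lambda x: x * x, data)  # "map" step: transform each element
total = reduce(add, squares)          # "reduce" step: combine results pairwise
print(total)  # 385

# With a SparkContext sc, the distributed equivalent would be:
#   sc.parallelize(range(1, 11)).map(lambda x: x * x).reduce(add)
```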

Here is a program that puts these two things together to run a basic bagging (bootstrap aggregating) task in Spark.

import numpy as np
from pyspark import SparkContext
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier


def run(sc, n_estimators=10, seed=0):
    X, y = make_classification(random_state=seed)
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=seed)
    n_train, n_test = X_train.shape[0], X_test.shape[0]
    n_ys = np.unique(y_train).size

    def vote_increment(y_est):
        # Test point x class matrix with 1s marking the estimator's predictions
        # (assumes class labels are 0..n_ys-1, as make_classification produces).
        increment = np.zeros((y_est.size, n_ys), dtype=int)
        increment[np.arange(y_est.size), y_est] = 1
        return increment

    def fit_and_predict(index):
        # Fit a fresh tree on one bootstrap sample and apply it to the test data.
        model = DecisionTreeClassifier()
        return model.fit(X_train[index], y_train[index]).predict(X_test)

    # Draw random sub-samples of the training set (only) with replacement, so no
    # test points leak into training. sklearn.cross_validation.Bootstrap is gone.
    rng = np.random.RandomState(seed)
    samples = sc.parallelize(
        [rng.randint(0, n_train, n_train) for _ in range(n_estimators)])
    # Train a model for each sub-sample and sum the one-hot votes.
    vote_tally = samples.map(fit_and_predict).map(vote_increment).fold(
        np.zeros((n_test, n_ys), dtype=int), np.add)
    # Take the learner majority vote.
    y_estimate_vote = np.argmax(vote_tally, axis=1)
    return accuracy_score(y_test, y_estimate_vote)


if __name__ == '__main__':
    print(run(SparkContext("local", "Boost")))

spark_parallel_boost.py randomly generates training and test data sets (the make_classification and train_test_split calls) and draws random subsets of the training set with replacement (the samples collection created with sc.parallelize). It then uses Spark to train an ensemble of decision trees on these subsets and apply each tree to the test set (the first map). Finally, it takes the ensemble majority vote and uses it to calculate a test prediction accuracy (the vote_increment map, the fold, and the final argmax). To see this script in action, set up Spark on your machine, then run it with the spark-submit script that comes with the Spark distribution (older Spark releases also accepted the pyspark script for this). It prints a bunch of logger output followed by an accuracy score between 0 and 1.
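The majority-vote step is the least obvious part of the script, so here is a worked example on a tiny hand-made case. The three prediction vectors are hypothetical, and functools.reduce with a zero matrix stands in for Spark’s fold. vote_increment mirrors the helper of the same name in the script and likewise assumes the class labels are 0..n_ys-1.

```python
from functools import reduce

import numpy as np

n_ys = 2  # number of classes; labels are assumed to be 0..n_ys-1


def vote_increment(y_est):
    # One row per test point, one column per class, with a 1 at the predicted class.
    increment = np.zeros((y_est.size, n_ys), dtype=int)
    increment[np.arange(y_est.size), y_est] = 1
    return increment


# Hypothetical predictions from three trees on four test points.
predictions = [np.array([0, 1, 1, 0]),
               np.array([0, 1, 0, 0]),
               np.array([1, 1, 1, 0])]

# Sum the one-hot matrices starting from a zero matrix, as fold(zero, np.add) does.
vote_tally = reduce(np.add, map(vote_increment, predictions),
                    np.zeros((4, n_ys), dtype=int))
y_estimate_vote = np.argmax(vote_tally, axis=1)
print(vote_tally.tolist())       # [[2, 1], [0, 3], [1, 2], [3, 0]]
print(y_estimate_vote.tolist())  # [0, 1, 1, 0]
```

Note that np.argmax breaks ties in favor of the lowest class index, so with an even number of trees a two-way tie goes to class 0.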

Since the inputs are random, the accuracy results don’t mean anything. Also, all of the code runs on your local machine. (That’s what the “local” argument to the SparkContext constructor means.) Nevertheless, this can serve as a template for applications that do real learning on a cluster, and it illustrates both the advantages and the frustrations of combining these two tools. The advantage is that the machine learning part itself is extremely simple: all the heavy lifting is done by the single fit(...).predict(...) call applied to each bootstrap sample. The frustration is that although scikit-learn already implements various parallelizable techniques (bagging, cross validation, decision forests), it only does so for a single machine, so you have to write Spark versions yourself. (Note that the most complicated part of the code here is the part that takes the majority vote among the learners.) Still, it’s probably easier to roll your own ensemble techniques in Python than in Java Map and Reduce classes, so this is a good place to start.
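For comparison, scikit-learn already ships a single-machine version of the same idea, BaggingClassifier. It fits each base estimator on a bootstrap sample and can spread the fits over local cores with n_jobs. Here is a minimal sketch, in which the data setup, n_estimators=10 and random_state=0 are illustrative assumptions:

```python
from sklearn.datasets import make_classification
from sklearn.ensemble import BaggingClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier

X, y = make_classification(random_state=0)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

# The base estimator is passed positionally because its keyword name changed
# across scikit-learn releases (base_estimator -> estimator).
bagger = BaggingClassifier(DecisionTreeClassifier(), n_estimators=10,
                           bootstrap=True, n_jobs=-1, random_state=0)
bagger.fit(X_train, y_train)  # each tree sees its own bootstrap sample
accuracy = accuracy_score(y_test, bagger.predict(X_test))
print(accuracy)
```

One difference from the Spark script: when the base estimator provides predict_proba, BaggingClassifier predicts the class with the highest mean predicted probability rather than taking a hard vote. For fully grown decision trees the two approaches usually agree.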