Playing with Mahout's Spark Shell

This tutorial will show you how to play with Mahout's Scala DSL for linear algebra and its Spark shell. Please keep in mind that this code is still in a very early experimental stage.

(Edited for 0.10.2)

Intro

We'll use an excerpt of a publicly available dataset about cereals. For each cereal, the dataset lists the protein, fat, carbohydrate and sugar content (in milligrams), as well as a customer rating. Our aim for this example is to fit a linear model which infers the customer rating from the ingredients.

Name                      protein  fat  carbo  sugars  rating
Apple Cinnamon Cheerios   2        2    10.5   10      29.509541
Cap'n'Crunch              1        2    12     12      18.042851
Cocoa Puffs               1        1    12     13      22.736446
Froot Loops               2        1    11     13      32.207582
Honey Graham Ohs          1        2    12     11      21.871292
Wheaties Honey Gold       2        1    16     8       36.187559
Cheerios                  6        2    17     1       50.764999
Clusters                  3        2    13     7       40.400208
Great Grains Pecan        3        3    13     4       45.811716

Installing Mahout & Spark on your local machine

We describe how to do a quick toy setup of Spark & Mahout on your local machine, so that you can run this example and play with the shell.

  1. Download Apache Spark 1.1.1 and unpack the archive file
  2. Change to the directory where you unpacked Spark and type sbt/sbt assembly to build it
  3. Create a directory for Mahout somewhere on your machine, change to it and check out the master branch of Apache Mahout from GitHub: git clone https://github.com/apache/mahout mahout
  4. Change to the mahout directory and build Mahout using: mvn -DskipTests clean install

Starting Mahout's Spark shell

  1. Go to the directory where you unpacked Spark and type sbin/start-all.sh to locally start Spark
  2. Open a browser and point it to http://localhost:8080/ to check whether Spark started successfully. Copy the URL of the Spark master at the top of the page (it starts with spark://)
  3. Define the following environment variables:
    export MAHOUT_HOME=[directory into which you checked out Mahout]
    export SPARK_HOME=[directory where you unpacked Spark]
    export MASTER=[url of the Spark master]
    
  4. Finally, change to the directory into which you checked out Mahout and type bin/mahout spark-shell. You should see the shell starting and get the prompt mahout>. Check the FAQ for further troubleshooting.

Implementation

We'll use the shell to interactively play with the data and incrementally implement a simple linear regression algorithm. Let's first load the dataset. Usually, we wouldn't need Mahout unless we processed a large dataset stored in a distributed filesystem. But for the sake of this example, we'll use our tiny toy dataset and "pretend" it was too big to fit onto a single machine.

Note: You can incrementally follow the example by copy-and-pasting the code into your running Mahout shell.

Mahout's linear algebra DSL has an abstraction called DistributedRowMatrix (DRM) which models a matrix that is partitioned by rows and stored in the memory of a cluster of machines. We use dense() to create a dense in-memory matrix from our toy dataset and use drmParallelize to load it into the cluster, "mimicking" a large, partitioned dataset.

val drmData = drmParallelize(dense(
  (2, 2, 10.5, 10, 29.509541),  // Apple Cinnamon Cheerios
  (1, 2, 12,   12, 18.042851),  // Cap'n'Crunch
  (1, 1, 12,   13, 22.736446),  // Cocoa Puffs
  (2, 1, 11,   13, 32.207582),  // Froot Loops
  (1, 2, 12,   11, 21.871292),  // Honey Graham Ohs
  (2, 1, 16,   8,  36.187559),  // Wheaties Honey Gold
  (6, 2, 17,   1,  50.764999),  // Cheerios
  (3, 2, 13,   7,  40.400208),  // Clusters
  (3, 3, 13,   4,  45.811716)), // Great Grains Pecan
  numPartitions = 2);

Have a look at this matrix. The first four columns represent the ingredients (our features) and the last column (the rating) is the target variable for our regression. Linear regression assumes that the target variable \(\mathbf{y}\) is generated by the linear combination of the feature matrix \(\mathbf{X}\) with the parameter vector \(\boldsymbol{\beta}\) plus the noise \(\boldsymbol{\varepsilon}\), summarized in the formula \(\mathbf{y}=\mathbf{X}\boldsymbol{\beta}+\boldsymbol{\varepsilon}\). Our goal is to find an estimate of the parameter vector \(\boldsymbol{\beta}\) that explains the data very well.

As a first step, we extract \(\mathbf{X}\) and \(\mathbf{y}\) from our data matrix. We get \(\mathbf{X}\) by slicing: we take all rows (denoted by ::) and the first four columns, which contain the ingredients in milligrams. Note that the result is again a DRM. The shell will not execute this code yet; it saves the history of operations and defers the execution until we actually access a result. Mahout's DSL automatically optimizes and parallelizes all operations on DRMs and runs them on Apache Spark.

val drmX = drmData(::, 0 until 4)
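
To make the idea of deferred execution concrete, here is a toy sketch in plain Scala. It is not Mahout code: the names Plan, Source, SliceCols, sourceReads and DeferredPlanSketch are all hypothetical and only illustrate how a slice can be recorded as a plan and evaluated later, when a result is requested.

```scala
// Toy illustration of deferred execution; these types are hypothetical, not Mahout classes.
object DeferredPlanSketch {
  sealed trait Plan
  case class Source(rows: Vector[Vector[Double]]) extends Plan
  case class SliceCols(child: Plan, cols: Range) extends Plan

  // Counts how often the source data is actually read.
  var sourceReads = 0

  // Only this call walks the recorded plan and produces values.
  def collect(plan: Plan): Vector[Vector[Double]] = plan match {
    case Source(rows) =>
      sourceReads += 1
      rows
    case SliceCols(child, cols) =>
      collect(child).map(row => cols.map(i => row(i)).toVector)
  }

  // Two rows of the cereal data, enough to show the effect.
  val data = Source(Vector(
    Vector(2.0, 2.0, 10.5, 10.0, 29.509541),
    Vector(1.0, 2.0, 12.0, 12.0, 18.042851)))

  // Building the slice records the operation but reads nothing.
  val x: Plan = SliceCols(data, 0 until 4)
}
```

In this sketch, sourceReads stays at 0 after x is defined and only becomes 1 once DeferredPlanSketch.collect(DeferredPlanSketch.x) is called. A real optimizer can additionally rewrite the recorded plan before running it, which this toy does not attempt.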

Next, we extract the target variable vector y, the fifth column of the data matrix. We assume this one fits into our driver machine, so we fetch it into memory using collect:

val y = drmData.collect(::, 4)

Now we are ready to think about a mathematical way to estimate the parameter vector \(\boldsymbol{\beta}\). A simple textbook approach is ordinary least squares (OLS), which minimizes the sum of squared residuals between the true target variable and the prediction of the target variable. In OLS, there is even a closed-form expression for estimating \(\boldsymbol{\beta}\) as \(\left(\mathbf{X}^{\top}\mathbf{X}\right)^{-1}\mathbf{X}^{\top}\mathbf{y}\).
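
For readers who want the intermediate step, the closed form follows from setting the gradient of the squared error to zero. This is the standard textbook argument and assumes that \(\mathbf{X}^{\top}\mathbf{X}\) is invertible, i.e. that the feature columns are linearly independent. The symbols \(S\) and \(\hat{\boldsymbol{\beta}}\) are notation introduced here for the squared error and the estimate.

```latex
\begin{aligned}
S(\boldsymbol{\beta})
  &= \lVert \mathbf{y}-\mathbf{X}\boldsymbol{\beta} \rVert_2^2
   = (\mathbf{y}-\mathbf{X}\boldsymbol{\beta})^{\top}(\mathbf{y}-\mathbf{X}\boldsymbol{\beta}) \\
\nabla_{\boldsymbol{\beta}} S
  &= -2\,\mathbf{X}^{\top}(\mathbf{y}-\mathbf{X}\boldsymbol{\beta}) = \mathbf{0} \\
\mathbf{X}^{\top}\mathbf{X}\,\hat{\boldsymbol{\beta}}
  &= \mathbf{X}^{\top}\mathbf{y} \\
\hat{\boldsymbol{\beta}}
  &= \left(\mathbf{X}^{\top}\mathbf{X}\right)^{-1}\mathbf{X}^{\top}\mathbf{y}
\end{aligned}
```

The third line is the system of normal equations. In practice it is solved directly rather than by forming the inverse explicitly.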

The first thing we compute for this is \(\mathbf{X}^{\top}\mathbf{X}\). The code for doing this in Mahout's Scala DSL maps directly to the mathematical formula. The operation .t transposes a matrix and, analogous to R, %*% denotes matrix multiplication.

val drmXtX = drmX.t %*% drmX
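
One way to see why this product suits a row-partitioned matrix: if the rows of \(\mathbf{X}\) are split into blocks, the product decomposes into a sum of small per-block products. The block count \(P\), the blocks \(\mathbf{X}_p\) and the column count \(k\) are notation introduced here for illustration. This is a general identity, not a description of the physical plan that Mahout's optimizer actually chooses.

```latex
\mathbf{X} = \begin{pmatrix}\mathbf{X}_1\\ \vdots\\ \mathbf{X}_P\end{pmatrix}
\quad\Longrightarrow\quad
\mathbf{X}^{\top}\mathbf{X} = \sum_{p=1}^{P}\mathbf{X}_p^{\top}\mathbf{X}_p,
\qquad
\mathbf{X}_p^{\top}\mathbf{X}_p \in \mathbb{R}^{k\times k}
```

Each term is only \(k \times k\) (here \(k = 4\)), no matter how many rows a block holds, so the summed result stays small when the matrix is tall and skinny.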

The same is true for computing \(\mathbf{X}^{\top}\mathbf{y}\). We can simply type the math as Scala expressions into the shell. Here, X lives in the cluster, while y is in the memory of the driver, and the result is a DRM again.

val drmXty = drmX.t %*% y

We're nearly done. The next step is to fetch \(\mathbf{X}^{\top}\mathbf{X}\) and \(\mathbf{X}^{\top}\mathbf{y}\) into the memory of our driver machine (we are targeting feature matrices that are tall and skinny, so we can assume that \(\mathbf{X}^{\top}\mathbf{X}\) is small enough to fit in). Then, we provide them to an in-memory solver (Mahout provides an analog to R's solve() for that) which computes beta, our OLS estimate of the parameter vector \(\boldsymbol{\beta}\).

val XtX = drmXtX.collect
val Xty = drmXty.collect(::, 0)

val beta = solve(XtX, Xty)
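
If you want to cross-check the result without a cluster, the same normal equations can be solved in plain Scala. The following is a minimal sketch, not Mahout's solver: NormalEquationsSketch and all of its members (including its own solve) are hypothetical names, and the Gaussian elimination assumes a square, non-singular system.

```scala
// Plain-Scala sketch (no Mahout): solve (X^T X) beta = X^T y in memory.
object NormalEquationsSketch {
  type Mat = Array[Array[Double]]

  // Feature columns: protein, fat, carbo, sugars (same rows as the tutorial's dataset).
  val x: Mat = Array(
    Array(2.0, 2.0, 10.5, 10.0), Array(1.0, 2.0, 12.0, 12.0), Array(1.0, 1.0, 12.0, 13.0),
    Array(2.0, 1.0, 11.0, 13.0), Array(1.0, 2.0, 12.0, 11.0), Array(2.0, 1.0, 16.0, 8.0),
    Array(6.0, 2.0, 17.0, 1.0), Array(3.0, 2.0, 13.0, 7.0), Array(3.0, 3.0, 13.0, 4.0))
  val y: Array[Double] = Array(29.509541, 18.042851, 22.736446, 32.207582,
    21.871292, 36.187559, 50.764999, 40.400208, 45.811716)

  // X^T X: a k x k matrix, where k is the number of feature columns.
  def xtx(x: Mat): Mat = {
    val k = x.head.length
    Array.tabulate(k, k)((i, j) => x.map(r => r(i) * r(j)).sum)
  }

  // X^T v: a vector of length k.
  def xty(x: Mat, v: Array[Double]): Array[Double] = {
    val k = x.head.length
    Array.tabulate(k)(i => x.indices.map(r => x(r)(i) * v(r)).sum)
  }

  // Gaussian elimination with partial pivoting; assumes a is square and non-singular.
  def solve(a: Mat, b: Array[Double]): Array[Double] = {
    val n = b.length
    val m = a.map(_.clone) // work on copies so the inputs stay untouched
    val rhs = b.clone
    for (col <- 0 until n) {
      // Swap in the row with the largest pivot to limit round-off error.
      val pivot = (col until n).maxBy(r => math.abs(m(r)(col)))
      val tmpRow = m(col); m(col) = m(pivot); m(pivot) = tmpRow
      val tmpRhs = rhs(col); rhs(col) = rhs(pivot); rhs(pivot) = tmpRhs
      for (r <- col + 1 until n) {
        val factor = m(r)(col) / m(col)(col)
        for (c <- col until n) m(r)(c) -= factor * m(col)(c)
        rhs(r) -= factor * rhs(col)
      }
    }
    // Back substitution on the upper-triangular system.
    val beta = new Array[Double](n)
    for (r <- n - 1 to 0 by -1) {
      val known = (r + 1 until n).map(c => m(r)(c) * beta(c)).sum
      beta(r) = (rhs(r) - known) / m(r)(r)
    }
    beta
  }

  val beta: Array[Double] = solve(xtx(x), xty(x, y))

  // Residual vector y - X * beta and its L2 norm.
  val residual: Array[Double] =
    x.indices.map(i => y(i) - x(i).zip(beta).map { case (xi, bi) => xi * bi }.sum).toArray
  val residualNorm: Double = math.sqrt(residual.map(r => r * r).sum)
}
```

NormalEquationsSketch.beta should agree with the beta computed in the shell up to floating-point differences. A quick sanity check is that xty(x, residual) is numerically zero, which is exactly what the normal equations require.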

That's it! We have implemented a distributed linear regression algorithm on Apache Spark. I hope you agree that we didn't have to worry a lot about parallelization and distributed systems. The goal of Mahout's linear algebra DSL is to abstract away the ugliness of programming a distributed system as much as possible, while still retaining decent performance and scalability.

We can now check how well our model fits its training data. First, we multiply the feature matrix \(\mathbf{X}\) by our estimate of \(\boldsymbol{\beta}\). Then, we look at the difference (via L2-norm) of the target variable \(\mathbf{y}\) to the fitted target variable:

val yFitted = (drmX %*% beta).collect(::, 0)
(y - yFitted).norm(2)

We hope this shows that Mahout's shell allows people to write algorithms interactively and incrementally. We have entered a lot of individual commands, one by one, until we got the desired results. We can now refactor a little by wrapping our statements into easy-to-use functions. The definition of functions follows standard Scala syntax.

We put all the commands for ordinary least squares into a function ols.

def ols(drmX: DrmLike[Int], y: Vector) = 
  solve(drmX.t %*% drmX, drmX.t %*% y)(::, 0)

Note that the DSL applies an implicit collect if coercion rules require an in-core argument. Hence, we can simply skip the explicit collects.

Next, we define a function goodnessOfFit that tells how well a model fits the target variable:

def goodnessOfFit(drmX: DrmLike[Int], beta: Vector, y: Vector) = {
  val fittedY = (drmX %*% beta).collect(::, 0)
  (y - fittedY).norm(2)
}

So far we have left out an important aspect of a standard linear regression model. Usually there is a constant bias term added to the model. Without it, our fitted model always passes through the origin, so we can only learn its slope (the angle) and not its offset. An easy way to add such a bias term to our model is to add a column of ones to the feature matrix \(\mathbf{X}\). The corresponding weight in the parameter vector will then be the bias term.

Here is how we add a bias column:

val drmXwithBiasColumn = drmX cbind 1
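
Written out, the extra column turns the bias into one more weight to be estimated. The tilde notation and the symbol \(\beta_0\) for the bias are introduced here for illustration, and the sketch assumes the column of ones is appended as the last column.

```latex
\tilde{\mathbf{X}} = \begin{pmatrix}\mathbf{X} & \mathbf{1}\end{pmatrix},
\quad
\tilde{\boldsymbol{\beta}} = \begin{pmatrix}\boldsymbol{\beta}\\ \beta_0\end{pmatrix}
\quad\Longrightarrow\quad
\mathbf{y} = \tilde{\mathbf{X}}\tilde{\boldsymbol{\beta}} + \boldsymbol{\varepsilon}
           = \mathbf{X}\boldsymbol{\beta} + \beta_0\mathbf{1} + \boldsymbol{\varepsilon}
```

Under that assumption, the last entry of the fitted parameter vector is the bias and the first four entries are the ingredient weights.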

Now we can give the newly created DRM drmXwithBiasColumn to our model fitting method ols and see how well the resulting model fits the training data with goodnessOfFit. You should see a large improvement in the result.

val betaWithBiasTerm = ols(drmXwithBiasColumn, y)
goodnessOfFit(drmXwithBiasColumn, betaWithBiasTerm, y)

As a further optimization, we can make use of the DSL's caching functionality. We use drmXwithBiasColumn repeatedly as input to a computation, so it might be beneficial to cache it in memory. This is achieved by calling checkpoint(). In the end, we remove it from the cache with uncache:

val cachedDrmX = drmXwithBiasColumn.checkpoint()

val betaWithBiasTerm = ols(cachedDrmX, y)
val goodness = goodnessOfFit(cachedDrmX, betaWithBiasTerm, y)

cachedDrmX.uncache()

goodness

Liked what you saw? Check out Mahout's overview for the Scala and Spark bindings.