This tutorial will show you how to play with Mahout's Scala DSL for linear algebra and its Spark shell. Please keep in mind that this code is still in a very early experimental stage.
(Edited for 0.10.2)
We'll use an excerpt of a publicly available dataset about cereals. The dataset lists the protein, fat, carbohydrate and sugar content (in milligrams) of a set of cereals, as well as a customer rating for each cereal. Our aim in this example is to fit a linear model that infers the customer rating from the ingredients.
Name | protein | fat | carbo | sugars | rating |
---|---|---|---|---|---|
Apple Cinnamon Cheerios | 2 | 2 | 10.5 | 10 | 29.509541 |
Cap’n’Crunch | 1 | 2 | 12 | 12 | 18.042851 |
Cocoa Puffs | 1 | 1 | 12 | 13 | 22.736446 |
Froot Loops | 2 | 1 | 11 | 13 | 32.207582 |
Honey Graham Ohs | 1 | 2 | 12 | 11 | 21.871292 |
Wheaties Honey Gold | 2 | 1 | 16 | 8 | 36.187559 |
Cheerios | 6 | 2 | 17 | 1 | 50.764999 |
Clusters | 3 | 2 | 13 | 7 | 40.400208 |
Great Grains Pecan | 3 | 3 | 13 | 4 | 45.811716 |
We describe how to do a quick toy setup of Spark & Mahout on your local machine, so that you can run this example and play with the shell:

1. Get Apache Spark and build it by running `sbt/sbt assembly` in the Spark directory.
2. Check out Mahout with `git clone https://github.com/apache/mahout mahout`.
3. Change into the `mahout` directory and build Mahout with `mvn -DskipTests clean install`.
4. Start Spark locally by running `sbin/start-all.sh` from the Spark directory.
5. From the Mahout directory, run `bin/mahout spark-shell`; you should see the shell starting and get the prompt `mahout>`. Check the FAQ for further troubleshooting.

We'll use the shell to interactively play with the data and incrementally implement a simple linear regression algorithm. Let's first load the dataset. Usually, we wouldn't need Mahout unless we were processing a large dataset stored in a distributed filesystem. But for the sake of this example, we'll use our tiny toy dataset and “pretend” it was too big to fit onto a single machine.
Note: You can incrementally follow the example by copy-and-pasting the code into your running Mahout shell.
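If you just want to verify that the shell works before touching the cereal data, you can evaluate a tiny in-core expression first. This is only a hypothetical sanity check, assuming the shell's default imports bring the DSL's in-core helpers such as dvec into scope:

```scala
// a small dense in-memory vector created with the DSL's dvec helper
val v = dvec(1.0, 2.0, 3.0)
// its Euclidean (L2) norm -- sqrt(1 + 4 + 9), roughly 3.74
v.norm(2)
```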
Mahout’s linear algebra DSL has an abstraction called DistributedRowMatrix (DRM) which models a matrix that is partitioned by rows and stored in the memory of a cluster of machines. We use dense()
to create a dense in-memory matrix from our toy dataset and use drmParallelize
to load it into the cluster, “mimicking” a large, partitioned dataset.
```scala
val drmData = drmParallelize(dense(
  (2, 2, 10.5, 10, 29.509541),  // Apple Cinnamon Cheerios
  (1, 2, 12,   12, 18.042851),  // Cap'n'Crunch
  (1, 1, 12,   13, 22.736446),  // Cocoa Puffs
  (2, 1, 11,   13, 32.207582),  // Froot Loops
  (1, 2, 12,   11, 21.871292),  // Honey Graham Ohs
  (2, 1, 16,   8,  36.187559),  // Wheaties Honey Gold
  (6, 2, 17,   1,  50.764999),  // Cheerios
  (3, 2, 13,   7,  40.400208),  // Clusters
  (3, 3, 13,   4,  45.811716)), // Great Grains Pecan
  numPartitions = 2)
```
Have a look at this matrix. The first four columns represent the ingredients (our features) and the last column (the rating) is the target variable for our regression. Linear regression assumes that the target variable \(\mathbf{y}\) is generated by the linear combination of the feature matrix \(\mathbf{X}\) with the parameter vector \(\boldsymbol{\beta}\) plus the noise \(\boldsymbol{\varepsilon}\), summarized in the formula \(\mathbf{y}=\mathbf{X}\boldsymbol{\beta}+\boldsymbol{\varepsilon}\). Our goal is to find an estimate of the parameter vector \(\boldsymbol{\beta}\) that explains the data well.
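To make this concrete, here is what the model claims for the first row of our table (Apple Cinnamon Cheerios): its rating is a weighted sum of its four ingredient values plus a noise term,

\(29.509541 = 2\,\beta_1 + 2\,\beta_2 + 10.5\,\beta_3 + 10\,\beta_4 + \varepsilon_1\)

and an analogous equation, each with its own noise term, holds for every other cereal.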
As a first step, we extract \(\mathbf{X}\) and \(\mathbf{y}\) from our data matrix. We get X by slicing: we take all rows (denoted by ::) and the first four columns, which contain the ingredients in milligrams. Note that the result is again a DRM. The shell will not execute this code yet; it records the history of operations and defers execution until we actually access a result. Mahout's DSL automatically optimizes and parallelizes all operations on DRMs and runs them on Apache Spark.
```scala
val drmX = drmData(::, 0 until 4)
```
Next, we extract the target variable vector y, the fifth column of the data matrix. We assume it fits into the memory of our driver machine, so we fetch it using collect:
```scala
val y = drmData.collect(::, 4)
```
Now we are ready to think about a mathematical way to estimate the parameter vector \(\boldsymbol{\beta}\). A simple textbook approach is ordinary least squares (OLS), which minimizes the sum of squared residuals between the true target variable and the prediction of the target variable. In OLS, there is even a closed-form expression for estimating \(\boldsymbol{\beta}\) as \(\left(\mathbf{X}^{\top}\mathbf{X}\right)^{-1}\mathbf{X}^{\top}\mathbf{y}\).
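In case you are wondering where this expression comes from: OLS minimizes the squared error \(\|\mathbf{y}-\mathbf{X}\boldsymbol{\beta}\|^{2}\), and setting its gradient with respect to \(\boldsymbol{\beta}\) to zero yields the normal equations

\(\mathbf{X}^{\top}\mathbf{X}\,\boldsymbol{\beta}=\mathbf{X}^{\top}\mathbf{y}\)

whose solution (assuming \(\mathbf{X}^{\top}\mathbf{X}\) is invertible) is exactly the estimate above.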
The first thing we compute for this is \(\mathbf{X}^{\top}\mathbf{X}\). The code for doing this in Mahout's Scala DSL maps directly to the mathematical formula: the operation .t transposes a matrix and, analogous to R, %*% denotes matrix multiplication.
```scala
val drmXtX = drmX.t %*% drmX
```
The same is true for computing \(\mathbf{X}^{\top}\mathbf{y}\). We can simply type the math into the shell as Scala expressions. Here, X lives in the cluster, while y is in the memory of the driver, and the result is again a DRM.
```scala
val drmXty = drmX.t %*% y
```
We're nearly done. The next step is to fetch \(\mathbf{X}^{\top}\mathbf{X}\) and \(\mathbf{X}^{\top}\mathbf{y}\) into the memory of our driver machine (we are targeting feature matrices that are tall and skinny, so we can assume that \(\mathbf{X}^{\top}\mathbf{X}\) is small enough to fit in). Then, we hand them to an in-memory solver (Mahout provides an analog to R's solve() for that) which computes beta, our OLS estimate of the parameter vector \(\boldsymbol{\beta}\).
```scala
val XtX = drmXtX.collect
val Xty = drmXty.collect(::, 0)
val beta = solve(XtX, Xty)
```
That's it! We have implemented a distributed linear regression algorithm on Apache Spark. I hope you agree that we didn't have to worry much about parallelization and distributed systems. The goal of Mahout's linear algebra DSL is to abstract away the ugliness of programming a distributed system as much as possible, while still retaining decent performance and scalability.
We can now check how well our model fits its training data. First, we multiply the feature matrix \(\mathbf{X}\) by our estimate of \(\boldsymbol{\beta}\). Then, we look at the difference (via the L2-norm) between the target variable \(\mathbf{y}\) and the fitted target variable:
```scala
val yFitted = (drmX %*% beta).collect(::, 0)
(y - yFitted).norm(2)
```
We hope we have shown that Mahout's shell lets you write algorithms interactively and incrementally. We have entered a lot of individual commands, one by one, until we got the desired results. We can now refactor a little by wrapping our statements into easy-to-use functions. Function definitions follow standard Scala syntax.
We put all the commands for ordinary least squares into a function ols.
```scala
def ols(drmX: DrmLike[Int], y: Vector) =
  solve(drmX.t %*% drmX, drmX.t %*% y)(::, 0)
```
Note that the DSL declares an implicit collect if coercion rules require an in-core argument. Hence, we can simply skip explicit collects.
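As a quick check, calling the function on the matrices we already have should reproduce the estimate we computed step by step above (the name betaFromOls below is just illustrative, not part of the original walkthrough):

```scala
// should match the beta we obtained from the manual computation above
val betaFromOls = ols(drmX, y)
```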
Next, we define a function goodnessOfFit that tells how well a model fits the target variable:
```scala
def goodnessOfFit(drmX: DrmLike[Int], beta: Vector, y: Vector) = {
  val fittedY = (drmX %*% beta).collect(::, 0)
  (y - fittedY).norm(2)
}
```
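For example, evaluating the bias-free model we fitted earlier (using the beta from the manual computation) should give the same residual norm we saw before:

```scala
// same quantity as (y - yFitted).norm(2) from the earlier step
goodnessOfFit(drmX, beta, y)
```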
So far we have left out an important aspect of a standard linear regression model. Usually there is a constant bias (intercept) term added to the model. Without it, our model is forced through the origin and can only learn the slope of the regression, not its intercept. An easy way to add such a bias term to our model is to add a column of ones to the feature matrix \(\mathbf{X}\). The corresponding weight in the parameter vector will then be the bias term. Here is how we add a bias column:
```scala
val drmXwithBiasColumn = drmX cbind 1
```
Now we can give the newly created DRM drmXwithBiasColumn to our model fitting method ols and see how well the resulting model fits the training data with goodnessOfFit. You should see a large improvement in the result.
```scala
val betaWithBiasTerm = ols(drmXwithBiasColumn, y)
goodnessOfFit(drmXwithBiasColumn, betaWithBiasTerm, y)
```
As a further optimization, we can make use of the DSL's caching functionality. We use drmXwithBiasColumn repeatedly as input to a computation, so it might be beneficial to cache it in memory. This is achieved by calling checkpoint(). In the end, we remove it from the cache with uncache():
```scala
val cachedDrmX = drmXwithBiasColumn.checkpoint()

val betaWithBiasTerm = ols(cachedDrmX, y)
val goodness = goodnessOfFit(cachedDrmX, betaWithBiasTerm, y)

cachedDrmX.uncache()

goodness
```
Liked what you saw? Check out Mahout's overview of the Scala and Spark bindings.