Getting Started

To get started, add the following dependency to your pom.xml:

<dependency>
  <groupId>org.apache.mahout</groupId>
  <artifactId>mahout-flink_2.10</artifactId>
  <version>0.12.0</version>
</dependency>
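
If you build with sbt rather than Maven, the equivalent dependency would look like this (a sketch; the plain % is used because the Scala version is already encoded in the artifact id):

libraryDependencies += "org.apache.mahout" % "mahout-flink_2.10" % "0.12.0"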

Here is how to use the Flink backend:

import org.apache.flink.api.scala._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.flinkbindings._

object ReadCsvExample {

  def main(args: Array[String]): Unit = {
    val filePath = "path/to/the/input/file"

    // Obtain the Flink execution environment and wrap it in a Mahout
    // distributed context, so that DRM operations execute on Flink.
    val env = ExecutionEnvironment.getExecutionEnvironment
    implicit val ctx = new FlinkDistributedContext(env)

    // Read a tab-separated file into a distributed row matrix (DRM),
    // skipping lines that start with "#".
    val drm = readCsv(filePath, delim = "\t", comment = "#")

    // Compute C = A' * A and bring the result in-core.
    val C = drm.t %*% drm
    println(C.collect)
  }

}
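
collect materializes the result as an in-core matrix. To persist a DRM to the distributed filesystem instead and read it back later, the DRM I/O calls from the math-scala module can be used (cf. MAHOUT-1734 below); a minimal sketch with an illustrative path:

// Continuing the example above, with the same implicit FlinkDistributedContext:
C.dfsWrite("path/to/output")          // persist the DRM to DFS
val C2 = drmDfsRead("path/to/output") // read it back as a DRM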

Current Status

The top-level JIRA for the Flink backend is MAHOUT-1570, which has been fully implemented.

Implemented

  • MAHOUT-1701 Mahout DSL for Flink: implement AtB, ABt and AtA operators (a usage sketch of several of these DSL operations follows this list)
  • MAHOUT-1702 implement element-wise operators (like A + 2 or A + B)
  • MAHOUT-1703 implement cbind and rbind
  • MAHOUT-1709 implement slicing (like A(1 to 10, ::))
  • MAHOUT-1710 implement right in-core matrix multiplication (A %*% B when B is in-core)
  • MAHOUT-1711 implement broadcasting
  • MAHOUT-1712 implement operators At, Ax, Atx (Ax and At are implemented)
  • MAHOUT-1734 implement I/O - should be able to read results of Flink bindings
  • MAHOUT-1747 add support for different types of indexes (String, Long, etc.); now supports Int, Long and String
  • MAHOUT-1748 switch to Flink Scala API
  • MAHOUT-1749 Implement Atx
  • MAHOUT-1750 Implement ABt
  • MAHOUT-1751 Implement AtA
  • MAHOUT-1755 Flush intermediate results to FS - Flink, unlike Spark, does not store intermediate results in memory.
  • MAHOUT-1764 Add standard backend tests for Flink
  • MAHOUT-1765 Add documentation about Flink backend
  • MAHOUT-1776 Refactor common Engine agnostic classes to Math-Scala module
  • MAHOUT-1777 move HDFSUtil classes into the HDFS module
  • MAHOUT-1804 Implement drmParallelizeWithRowLabels(..) in Flink
  • MAHOUT-1805 Implement allReduceBlock(..) in Flink bindings
  • MAHOUT-1809 Failing tests in flink-bindings: dals and dspca
  • MAHOUT-1810 Failing test in flink-bindings: A + B Identically partitioned (mapBlock Checkpointing issue)
  • MAHOUT-1812 Implement drmParallelizeWithEmptyLong(..) in flink bindings
  • MAHOUT-1814 Implement drm2intKeyed in flink bindings
  • MAHOUT-1815 dsqDist(X,Y) and dsqDist(X) failing in flink tests
  • MAHOUT-1816 Implement newRowCardinality in CheckpointedFlinkDrm
  • MAHOUT-1817 Implement caching in Flink Bindings
  • MAHOUT-1818 dals test failing in Flink Bindings
  • MAHOUT-1819 Set the default Parallelism for Flink execution in FlinkDistributedContext
  • MAHOUT-1820 Add a method to generate Tuple<PartitionId, Partition elements count> to support Flink backend
  • MAHOUT-1821 Use a mahout-flink-conf.yaml configuration file for Mahout specific Flink configuration
  • MAHOUT-1822 Update NOTICE.txt, License.txt to add Apache Flink
  • MAHOUT-1823 Modify MahoutFlinkTestSuite to implement FlinkTestBase
  • MAHOUT-1824 Optimize FlinkOpAtA to use upper triangular matrices
  • MAHOUT-1825 Add List of Flink algorithms to Mahout wiki page
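
Several of the operations listed above can be illustrated with a short program. The following is a minimal sketch, not taken from the Mahout codebase; the matrix values and partition count are illustrative:

import org.apache.flink.api.scala._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.flinkbindings._

object DslOpsSketch {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    implicit val ctx = new FlinkDistributedContext(env)

    // A small DRM; in practice this would be read from DFS.
    val A = drmParallelize(dense((1, 2), (3, 4), (5, 6)), numPartitions = 2)

    val B = A + 2.0                      // element-wise scalar op (MAHOUT-1702)
    val S = A(0 until 2, ::)             // slicing (MAHOUT-1709)
    val G = A.t %*% A                    // AtA (MAHOUT-1701, MAHOUT-1751)
    val C = A %*% dense((1, 0), (0, 1))  // right in-core multiplication (MAHOUT-1710)

    println(G.collect)
  }

}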

Tests

There is a set of standard tests that all engines should pass (see MAHOUT-1764).

  • DistributedDecompositionsSuite
  • DrmLikeOpsSuite
  • DrmLikeSuite
  • RLikeDrmOpsSuite

In addition, there are Flink-backend-specific tests, e.g.

  • DrmLikeOpsSuite for operations like norm, rowSums, rowMeans
  • RLikeOpsSuite for basic linear algebra like A.t %*% A, A.t %*% x, etc
  • LATestSuite tests for specific operators like AtB, Ax, etc
  • UseCasesSuite has more complex examples, like power iteration, ridge regression, etc.; a power-iteration sketch follows this list
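
As a flavor of what UseCasesSuite covers, here is a minimal power-iteration sketch against the Flink backend; the matrix, the initial vector and the iteration count are illustrative, and the code is not taken from the suite itself:

import org.apache.flink.api.scala._
import org.apache.mahout.math.Vector
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.flinkbindings._

object PowerIterationSketch {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    implicit val ctx = new FlinkDistributedContext(env)

    // A small symmetric matrix; in practice A would be a large DRM.
    val A = drmParallelize(dense((2, 1, 0), (1, 3, 1), (0, 1, 2)))

    var x: Vector = dvec(1, 1, 1)         // initial guess
    for (_ <- 1 to 10) {                  // iteration count is illustrative
      val ax = (A %*% x).collect(::, 0)   // distributed A * x, collected in-core
      x = ax / ax.norm(2)                 // normalize
    }
    println(x)  // approximates the dominant eigenvector of A
  }

}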

Environment

For development, the minimal supported configuration is Scala 2.10, as the mahout-flink_2.10 artifact id indicates.

When using Mahout, please import the following modules:

  • mahout-math
  • mahout-math-scala
  • mahout-flink_2.10
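
In Maven coordinates these modules correspond roughly to the following (a sketch; the _2.10 suffix on mahout-math-scala is an assumption based on the 0.12.0 release artifacts):

<dependency>
  <groupId>org.apache.mahout</groupId>
  <artifactId>mahout-math</artifactId>
  <version>0.12.0</version>
</dependency>
<dependency>
  <groupId>org.apache.mahout</groupId>
  <artifactId>mahout-math-scala_2.10</artifactId>
  <version>0.12.0</version>
</dependency>
<dependency>
  <groupId>org.apache.mahout</groupId>
  <artifactId>mahout-flink_2.10</artifactId>
  <version>0.12.0</version>
</dependency>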