Peapod: A Scala and Spark Data Pipeline and Dependency Manager

In a previous post I discussed why anyone would be crazy enough to write yet another dependency and workflow manager. In this post I talk about the one we wrote, Peapod, which aims to provide simple dependency and data pipeline management for Spark workloads.

You can find the code here; please feel free to contribute.

Summary

I've always been told it's better to show rather than tell, so here's an example of Peapod in use:

class AUC(implicit val p: Peapod) extends StorableTask[Double] {
  val pipelineLR = pea(new PipelineLR())
  val pipelineFeature = pea(new PipelineFeature())
  val parsed = pea(new Parsed)
  def generate = {
    val training = parsed.get()
    val transformed = pipelineFeature.get().transform(training)
    val predictions = pipelineLR.get().transform(transformed)
    val evaluator = new BinaryClassificationEvaluator()
    evaluator.evaluate(predictions)
  }
}

implicit val p = new Peapod(
  path="~/pipeline",
  raw="~/")(generic.Spark.sc)

println(p(new AUC()).get())

The code checks whether "generate" was already run; if it was, the value is loaded from disk, otherwise it is generated. When generating, the three dependencies (PipelineLR, PipelineFeature, Parsed) are likewise checked and either loaded from disk or generated. Furthermore, this not only checks that the output exists on disk but also recursively verifies that the versions of all dependencies match those used to generate the stored output. If the version of a dependency is changed then every task downstream of it is also re-generated.
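
Conceptually, a storable task's get() boils down to a load-or-generate check against a path that encodes the recursive versions of the task and its dependencies. The following is a minimal standalone sketch of that idea; the helper names (loadOrGenerate, exists, load, save) are illustrative and are not part of Peapod's API:

// Minimal sketch of the load-or-generate behaviour described above.
// Illustrative only; these helpers do not correspond to Peapod's internals.
def loadOrGenerate[T](path: String,
                      exists: String => Boolean,
                      load: String => T,
                      save: (String, T) => Unit,
                      generate: () => T): T =
  if (exists(path)) {
    load(path)             // output already stored with matching versions
  } else {
    val value = generate() // run the task's generate method
    save(path, value)      // checkpoint for future runs
    value
  }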

Description

The goal was a simple dependency manager that could improve the writing of Machine Learning and Data Processing focused Spark code. The expectation is that the user is working iteratively on code and building up a pipeline over time. Pieces might initially be tested in a Spark Notebook and then transitioned over time into a better-defined pipeline for future reuse.

  • Simple: The code and use of the dependency manager should be concise without much boilerplate
  • Spark: The main use case is to help with complicated Spark data pipelines and workflows
  • Small Team: This isn’t enterprise software aimed at a team of 500 or a production job scheduler. That means there is no need for a centralized data store. In a production environment the whole dependency manager could be running as a single job in a much larger dependency chain or leverage another system for scheduling (like Oozie, Airflow, Luigi, etc).
  • Checkpointing: Components of a pipeline should automatically save/load their output data to disk. This prevents long pipelines from having to be rerun in full when intermediate results could simply be checkpointed to disk at appropriate points.
  • Versioning: Components should be versioned so that if we update a component's logic, not only that component but all downstream components are rerun. Ideally, all versions are checkpointed separately to allow for debugging and code reversion.

Components

There are two basic components in the system from a user's point of view:

  • Task: An individual component of the workflow. A task defines what dependencies it has, what action it performs and the type of its output (i.e. RDD, DataFrame, etc.). Tasks are generally created as Scala classes.
    • EphemeralTask: Generates the output for the task every time the task is run
    • StorableTask: Saves the output of the task to disk and, if it is already available there, skips generating the output by loading it from disk. Implicit conversions ensure that only classes which can be stored (DataFrames, RDDs, Serializable, Writable, Double classes) can make use of StorableTask.
  • Peapod: Manages the state of tasks and their dependencies, and contains various configurations.

Tasks

Tasks are defined as classes which inherit from either StorableTask or EphemeralTask while also defining the type of their output as a generic type annotation (such as DataFrame, RDD, Double, etc.). StorableTasks save their output to permanent storage while EphemeralTasks do not.

class Raw(implicit val p: Peapod) extends EphemeralTask[DataFrame] {
  def generate = ??? // produce the raw DataFrame
}

class Parsed(implicit val p: Peapod) extends StorableTask[DataFrame] {
  val raw = pea(new Raw)
  def generate = ??? // parse the output of Raw into a DataFrame
}

Dependency

Dependencies are defined by wrapping a new Task in a pea() method within another Task.

class Parsed(implicit val p: Peapod) extends EphemeralTask[DataFrame] {
  val raw = pea(new Raw)
}

You can then access the output of the dependency inside the generate method by calling .get() or simply using apply:

class Parsed(implicit val p: Peapod) extends EphemeralTask[DataFrame] {
  val raw = pea(new Raw)
  def generate() = {
    raw.get() // explicit get
    raw()     // equivalent apply syntax
  }
}

Versioning

Each task has a version variable which defines the version of the task in question. StorableTask uses its own version and the versions of all its dependencies to determine whether it can load a cached output from disk. In other words, changing a task's version invalidates the stored data of that task and of everything downstream of it. More specifically, a task stores its output in a directory whose name is derived from the version of the task and all of its dependencies.

class Parsed(implicit val p: Peapod) extends EphemeralTask[DataFrame] {
  override val version = "2"
  val raw = pea(new Raw)
}
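
To make the versioned storage directory concrete, here is a small self-contained sketch of how a recursive version signature could be computed. The names VersionedTask and recursiveVersion are purely illustrative and are not Peapod's internal API:

import java.security.MessageDigest

// Illustrative only: combine a task's own version with the recursive
// signatures of its dependencies, so any upstream change produces a new
// signature and therefore a new storage directory.
case class VersionedTask(name: String, version: String, deps: Seq[VersionedTask] = Nil)

def recursiveVersion(t: VersionedTask): String = {
  val combined = t.version + t.deps.map(recursiveVersion).sorted.mkString("|")
  MessageDigest.getInstance("MD5")
    .digest(combined.getBytes("UTF-8"))
    .map("%02x".format(_))
    .mkString
}

val raw = VersionedTask("Raw", "2")
val parsed = VersionedTask("Parsed", "2", Seq(raw))
// A possible on-disk layout: <path>/Parsed/<recursiveVersion(parsed)>/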

Dependency Graph

The Peapod class can output the current dependency graph in DOT format. Tools such as Graphviz or Gravizo can be used to convert the textual representation into an image. Dashed boxes represent ephemeral tasks while filled-in boxes represent tasks whose output has already been generated on disk.

p(new Test.PipelineFeature()).get()
p(new Test.ParsedEphemeral())
p(new Test.AUC())
println(p.dotFormatDiagram())
println(Util.teachingmachinesDotLink(p.dotFormatDiagram()))

Persisting/Caching

The outputs of Tasks are cached as long as other Tasks still depend on them. In the case of an RDD or DataFrame, the output is persisted or cached using Spark's built-in mechanism if that Task is a dependency of at least two other Tasks.
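
For instance, in the full example below Parsed is a dependency of PipelineFeature, PipelineLR and AUC, so its DataFrame would be persisted rather than recomputed for each consumer. As a rough, purely illustrative sketch (not Peapod's actual code), the decision amounts to:

import org.apache.spark.sql.DataFrame

// Illustrative only: keep an output persisted while at least two downstream
// tasks still need it, so shared intermediates are not recomputed per consumer.
def maybePersist(df: DataFrame, dependentCount: Int): DataFrame =
  if (dependentCount >= 2) df.cache() else df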

Full Example

This example defines a simple pipeline for building, training and testing a machine learning model on some sample data.

class Raw(implicit val p: Peapod) extends StorableTask[RDD[DependencyInput]] {
  override val version = "2"
  def generate = {
    p.sc.textFile("file://" + Resources.getResource("dependency.csv").getPath)
      .map(_.split(","))
      .map(l => new DependencyInput(l(0).toDouble, l(1)))
  }
}

class Parsed(implicit val p: Peapod) extends EphemeralTask[DataFrame] {
  import p.sqlCtx.implicits._
  val raw = pea(new Raw)
  def generate =
    raw.get().toDF()
}

class PipelineFeature(implicit val p: Peapod) extends StorableTask[PipelineModel] {
  val parsed = pea(new Parsed)
  override val version = "2"
  def generate = {
    val training = parsed.get()
    val tokenizer = new Tokenizer()
      .setInputCol("text")
      .setOutputCol("TextTokenRaw")
    val remover = new StopWordsRemover()
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol("TextToken")
    val hashingTF = new HashingTF()
      .setNumFeatures(5)
      .setInputCol(remover.getOutputCol)
      .setOutputCol("features")

    val pipeline = new org.apache.spark.ml.Pipeline()
      .setStages(Array(tokenizer, remover, hashingTF))
    pipeline.fit(training)
  }
}

class PipelineLR(implicit val p: Peapod) extends StorableTask[PipelineModel] {
  val pipelineFeature = pea(new PipelineFeature())
  val parsed = pea(new Parsed)
  def generate = {
    val training = parsed.get()
    val lr = new LogisticRegression()
      .setMaxIter(25)
      .setRegParam(0.01)
      .setFeaturesCol("features")
      .setLabelCol("label")
    val pipeline = new org.apache.spark.ml.Pipeline()
      .setStages(Array(lr))
    pipeline.fit(pipelineFeature.get().transform(training))
  }
}

class AUC(implicit val p: Peapod) extends StorableTask[Double] {
  val pipelineLR = pea(new PipelineLR())
  val pipelineFeature = pea(new PipelineFeature())
  val parsed = pea(new Parsed)
  def generate = {
    val training = parsed.get()
    val transformed = pipelineFeature.get().transform(training)
    val predictions = pipelineLR.get().transform(transformed)
    val evaluator = new BinaryClassificationEvaluator()
    evaluator.evaluate(predictions)
  }
}

// JUnit's TemporaryFolder must be created before newFolder can be called
val tmp = new TemporaryFolder()
tmp.create()
implicit val p = new Peapod(
  fs="file://",
  path=tmp.newFolder("workflow").getAbsolutePath)
//Runs all tasks so this is slow
println(p(new AUC()).get())
//Loads the last task's output from disk so this is nearly instant
println(p(new AUC()).get())

Using

In Maven:

<dependency>
  <groupId>io.teachingmachines</groupId>
  <artifactId>peapod</artifactId>
  <version>0.7-SNAPSHOT</version>
</dependency>
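
For SBT users, the equivalent dependency (assuming the artifact is published without a Scala version suffix, as the Maven coordinates above suggest) would be roughly:

libraryDependencies += "io.teachingmachines" % "peapod" % "0.7-SNAPSHOT"

Since this is a snapshot version, you may also need to add the repository hosting it to your resolvers.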