Spark, Turn Off the Light on the CLI

There was a popular post by Adam Drake a couple of years ago about the performance gains from using simple command line tools instead of an EMR Hadoop cluster. In the end the command line approach took 12 seconds while EMR took 26 minutes. I very much agree with Adam that using distributed computing tools on small to medium data is overkill and counter-productive.

That said, I thought it would be interesting to see how Spark compares against the command line and Hadoop. One of the great benefits of Spark is the ability to run virtually the same code on an automatically spawned local cluster and on a distributed cluster.

The problem at hand is to analyze chess game data, which is available in this GitHub project.

The goal is to count the number of wins by white, wins by black, and draws.
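For reference, each game in a .pgn file starts with a block of tag pairs, one of which records the result. The most common forms of the Result tag (as defined by the PGN standard, shown here purely for illustration) are:

[Result "1-0"]      white wins
[Result "0-1"]      black wins
[Result "1/2-1/2"]  draw

Both the command line and Spark solutions below key off these lines.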

Command Line

The command line solution is:

# bash
find . -type f -name '*.pgn' -print0 |
  xargs -0 -n4 -P4 mawk '/Result/ {
    split($0, a, "-");
    res = substr(a[1], length(a[1]), 1);
    if (res == 1) white++; if (res == 0) black++; if (res == 2) draw++
  } END { print white+black+draw, white, black, draw }' |
  mawk '{ games += $1; white += $2; black += $3; draw += $4 }
    END { print games, white, black, draw }'

This takes 18 seconds to run on my laptop with a cleared file cache.

Local Spark

Setting up Spark to run locally is fairly easy. My environment of choice is Spark Notebook, along with IntelliJ IDEA for more intensive coding.

A fairly recent version I’ll use for this is here:
http://spark-notebook.io/dl/zip/0.6.2/2.10/1.5.2/2.4.0/false/true

Next we need to start up the Spark Notebook server:

# bash
cd spark-notebook-0.6.2-scala-2.10.4-spark-1.5.2-hadoop-2.4.with-parquet
chmod +x ./bin/spark-notebook
./bin/spark-notebook

Going to localhost:9000 will let you access Spark Notebook:

Now simply create a new local cluster notebook:

The code that replicates the command line pipeline is below; we'll go through it line by line in a bit:

sc.textFile("/Users/marcin.mejran/Downloads/ChessData-master/*/*.pgn")
  .filter(_.contains("Result"))
  .map(
    l => {
      val s = l.split("-")
      s(0).substring(s(0).length-1,  s(0).length)
    }
  )
  .countByValue()
  .toSeq

The results aren't too bad: 48 seconds and a nice tabular output:

Now let's go through what happened in more detail.

The first step is to read the .pgn files. sc is a reference to the SparkContext created by Spark Notebook, which lets us read files; in this case we read any file ending in .pgn in the subdirectories. The end result is an RDD with one record for every line in the input files:

sc.textFile("/Users/marcin.mejran/Downloads/ChessData-master/*/*.pgn")

The next step is to filter the RDD down to only the lines that contain the "Result" text, which indicates the result of a game. The "_" is Scala syntactic sugar that lets you write an anonymous function with less boilerplate; it is equivalent to "line => line.contains("Result")":

.filter(_.contains("Result"))

Then, for each matching line, we split on "-" and take the last character of the first piece, i.e. the character just before the "-". That character is 1 for a win by white, 0 for a win by black, and 2 for a draw (since draws are recorded as 1/2-1/2):

  .map(
    l => {
      val s = l.split("-")
      s(0).substring(s(0).length-1,  s(0).length)
    }
  )

Next we count the number of times each value shows up:

  .countByValue()

The result of countByValue is a Map; however, Maps don't show up nicely in a tabular format in Spark Notebook, so we convert it to a Seq (i.e., a list), which does:

  .toSeq
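As a quick illustration of what that last conversion produces (a toy example on made-up values, not the chess data):

// Toy example: countByValue returns a Map of value -> count on the driver,
// and toSeq turns it into a sequence of (value, count) pairs that Spark Notebook
// can render as a table.
sc.parallelize(Seq("1", "0", "1", "2", "1")).countByValue().toSeq
// => something like Seq(("1",3), ("0",1), ("2",1)), in no particular order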

Local Spark Improved

If you go to localhost:4040 you get the Spark application UI, which shows more detail on what Spark actually did to run the job. Looking at the Stages tab shows that we ran a lot of tasks for this job: 3496!

This is a lot, so let’s look at the source code behind sc.textFile.

  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString)
  }

The underlying reader uses a TextInputFormat, which creates a single task per file it reads. That's not very efficient when you have a lot of files, especially small ones that are fast to read. There is a newer reader called CombineTextInputFormat that packs multiple files into each task and seems worth trying. We can use the "mapred.max.split.size" setting with it to control how many bytes each task aims to read. CombineTextInputFormat is a Hadoop 2.x feature, so we need to use the newAPIHadoopFile reader rather than hadoopFile.

import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

// Aim for roughly 100MB of input per task
sc.hadoopConfiguration.set("mapred.max.split.size", "100000000")

val rdd = sc.newAPIHadoopFile("/Users/marcin.mejran/Downloads/ChessData-master/*/*.pgn",
    classOf[CombineTextInputFormat], classOf[LongWritable], classOf[Text])
  .map(_._2.toString)
  .filter(_.contains("Result"))
  .map(
    l => {
      val s = l.split("-")
      s(0).substring(s(0).length - 1, s(0).length)
    }
  ).countByValue().toSeq

Let’s look at the results:

32 seconds and 65 tasks; that's a significant improvement! The drop in task count makes sense given that each task now reads up to the ~100MB split size we set.

Distributed Spark

One of the nice things about Spark is that you can run nearly the same code on a distributed cluster as you run locally. A previous set of posts goes a bit into the infrastructure we use for distributed computing. I'm going to gloss over everything the infrastructure needs, as it would take a whole series of posts to describe it all (which is, in fact, what we're doing!).

I'm going to spin up a cluster of two spot-priced c3.8xlarge instances with a c3.large master node. This costs us around 75 cents per hour to run and takes around 5 minutes to spin up:

# bash
python spark_ec2.py -s 2 -k tm-spark -i "" -m c3.large -t c3.8xlarge \
  --region=us-west-2 --zone=us-west-2b --spark-version=1.5.2 \
  --hadoop-major-version=yarn --vpc-id="" --subnet-id="" \
  --authorized-address="10.0.0.1/32" --additional-security-group="" \
  --delete-groups --spot-price="0.4" --route53-domain="spark" \
  --route53-record-set="" --spot-master --no-ganglia --ami="" \
  --minimal-modules launch marcin-spark

Now we can connect to the Spark Notebook running on an AWS instance at drive-spark.spark:9000:

Now we create a standalone Spark cluster notebook:

The data was put on S3, and we set up the correct S3 keys in Spark Notebook:

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId","")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey","")

The only change needed in the code itself is where we’re reading data from:

import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

sc.hadoopConfiguration.set("mapred.max.split.size", "50000000")

val rdd = sc.newAPIHadoopFile("s3n://tm-chess/*/*.pgn",
    classOf[CombineTextInputFormat], classOf[LongWritable], classOf[Text])
  .map(_._2.toString)
  .filter(_.contains("Result"))
  .map(
    l => {
      val s = l.split("-")
      s(0).substring(s(0).length - 1, s(0).length)
    }
  ).countByValue().toSeq

The performance isn't quite as good, but it's not too bad for something that reads all its data over the network from S3.

Distributed Spark Improved

In reality it's unlikely that we'll run only a single command on the data; rather, a whole set of commands and analyses gets run. So the real question isn't how fast a single command is, but how fast subsequent commands are as well. To help with this we can take advantage of Spark's in-memory persistence and the fact that our distributed cluster has a lot of memory. Simply adding .cache() to the command that creates an RDD causes it to be persisted in memory:

import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

sc.hadoopConfiguration.set("mapred.max.split.size", "50000000")

val rdd = sc.newAPIHadoopFile("s3n://tm-chess/*/*.pgn",
    classOf[CombineTextInputFormat], classOf[LongWritable], classOf[Text])
  .map(_._2.toString)
  .cache()

rdd.count() // Forces the RDD to be read and cached

rdd
  .filter(_.contains("Result"))
  .map(
    l => {
      val s = l.split("-")
      s(0).substring(s(0).length - 1, s(0).length)
    }
  ).countByValue().toSeq

The reading and caching took 96 seconds; however, the actual computation was much faster at under 2 seconds:

We did end up using almost 25GB of memory for this:
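As an aside, cache() is shorthand for persist(StorageLevel.MEMORY_ONLY), which simply drops partitions that don't fit in memory (they get re-read from S3 when needed again). If the data were larger than the cluster's memory, a sketch of an alternative would be to persist with a level that spills to local disk instead:

import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.storage.StorageLevel

// Same read as above, but partitions that don't fit in memory spill to local disk
// rather than being dropped, so later queries never have to go back to S3.
val lines = sc.newAPIHadoopFile("s3n://tm-chess/*/*.pgn",
    classOf[CombineTextInputFormat], classOf[LongWritable], classOf[Text])
  .map(_._2.toString)
  .persist(StorageLevel.MEMORY_AND_DISK)
lines.count() // forces the read so the data is persisted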

Advanced Topics

Since we're running this in Scala we get access to any JVM library, which means that as the computations get more complicated we can still do them efficiently. We can, for example, use the PGN Parser Java library to parse the PGN files:

import com.supareno.pgnparser.PGNParser
import collection.JavaConversions._

sc.hadoopConfiguration.set("mapred.max.split.size", "100000000")

// Wrap the parser in an object so each executor creates its own instance
// rather than trying to serialize one from the driver.
object Parser { val parser = new PGNParser() }

val rdd = sc.wholeTextFiles("s3n://tm-chess/*/*.pgn", 1000)
  .filter(_._1.endsWith(".pgn")) // the key is the file path, the value its contents
  .flatMap { case (_, contents) =>
    // Split each file into individual games, re-adding the "[Event " prefix the split
    // strips off (the first chunk of a file keeps its original prefix).
    val games = contents.split("\n\\[Event ")
    games.head +: games.tail.map("[Event " + _)
  }
  .repartition(300)
  .flatMap(Parser.parser.parseContents(_).getGame)

We can then convert the data to a more standard case class data format:

import scala.collection.JavaConversions._
case class Game(event: String,
                result: String,
                whiteElo: String,
                blackElo: String,
                eventDate: String,
                moves: Array[String])
val rdd2 = rdd.map(g =>
  Game(g.getEvent, g.getResult, g.getWhiteElo, g.getBlackElo, g.getEventDate,
       g.getHits.getHit.map(_.getContent).toArray)
).cache()
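Once the games are in this form, simple questions become one-liners. For example (an illustrative query, not from the original analysis):

// Illustrative: count the games that white won outright
rdd2.filter(_.result == "1-0").count()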

This can be used to compute the most popular winning opening move: we key each game by its result and the first move of the winning side (white's first move for 1-0, black's first move otherwise), count the games per key, and then keep the most common move for each result:

rdd2.filter(_.moves.length >= 2)
  // key by (result, first move of the winning side): white's first move for 1-0,
  // black's first move otherwise
  .keyBy(g => (g.result, if (g.result == "1-0") g.moves(0) else g.moves(1)))
  .mapValues(_ => 1)
  .reduceByKey(_ + _)                               // count games per (result, move)
  .map(r => (r._1._1, (r._1._2, r._2)))             // re-key by result alone
  .reduceByKey((a, b) => if (a._2 > b._2) a else b) // keep the most common move per result
  .map(r => (r._1, r._2._1, r._2._2))
  .collect()

We can also use Spark SQL to compute the average Elo rating of each side by game result:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.sql.functions._

// Build a DataFrame of games straight from the parsed RDD
val df = rdd.map(g => (g.getEvent, g.getResult, g.getWhiteElo, g.getBlackElo, g.getEventDate,
                 g.getHits.getHit.map(_.getContent).toList))
  .toDF("event", "result", "whiteElo", "blackElo", "date", "moves")

df.groupBy("result")
  .agg(count("*").as("c"), avg("whiteElo"), avg("blackElo"))
  .where(col("c") > 10)
  .collect()
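The same aggregation could also be written as a literal SQL query by registering the DataFrame as a temporary table (a sketch using the Spark 1.5-era API; the table name is arbitrary):

// Sketch: the same group-by expressed as SQL against a registered temp table
df.registerTempTable("games")
sqlContext.sql("""
  SELECT result, COUNT(*) AS c, AVG(whiteElo), AVG(blackElo)
  FROM games
  GROUP BY result
  HAVING COUNT(*) > 10
""").collect()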

Conclusion

Spark won't beat an optimized command line or local tool, but it can come very close. At the same time it provides a lot more flexibility in the analyses and coding you can do without bringing additional tools to bear. The same code can be run on a local "cluster" for testing and on a remote distributed cluster in production. Even if you never run it on a remote cluster, Spark can be extremely useful for local data analysis and ETL work thanks to its speed and easy-to-use APIs.
