Apache Spark: DataFrames and RDDs

Apache Spark DataFrames have existed, in one form or another, for over three years. They give Spark much more insight into the types of data it's working on and, as a result, allow for significantly better optimizations than the original RDD APIs. Furthermore, the RDD API, while not deprecated, is no longer receiving updates, and users are encouraged to migrate to DataFrames/DataSets.

In this post we'll look in more detail at how DataFrames compare to RDDs in terms of features and performance, and see whether the claims about them hold up.

What are RDDs?

Resilient Distributed Datasets (RDDs) are the core abstraction behind Spark and come from the original Spark paper. RDDs are an immutable, fault-tolerant, distributed, memory-based abstraction over data that allows for complex in-memory computations along with optional disk caching. Within the constraints of Spark they allow for the most flexible data manipulation and storage paradigms. This power comes at a cost: since the stored data is arbitrary and Spark has no usable knowledge of its format, the automatic optimizations it can leverage are limited. Likewise, the Python implementation can only use the JVM to store opaque binary blobs, which means all computations must be done on the slower Python side (and require serialization/deserialization to/from Python).

# Parse the raw pagecounts lines into (pagename, pageviews) pairs,
# keyed by the first 10 characters of the page name.
rdd = sc.textFile("/data/pagecounts-rrd") \
  .map(lambda s: s.split(" ")) \
  .map(lambda s: (s[1], int(s[2]))) \
  .keyBy(lambda r: r[0][0:10]) \
  .mapValues(lambda r: r[1])

# Sum the page views per key.
grouped = rdd.reduceByKey(lambda a, b: a + b)

# Join the totals back onto the per-page pairs and count the rows where
# the total differs from the individual value.
grouped.join(rdd) \
  .filter(lambda r: r[1][0] != r[1][1]) \
  .count()

What are DataFrames?

DataFrames are a newer abstraction of data within Spark, and a structured one (akin to SQL tables). Unlike RDDs they are stored in a column-based fashion in memory, which allows for various optimizations (vectorization, columnar compression, off-heap storage, etc.). Their schema is fairly robust, allowing for arbitrarily nested data structures (e.g. a column can be a list of structs which are composed of other structs). The fact that Spark is aware of the structure of the underlying data allows non-JVM languages (Python, R) to leverage functions written in the faster native JVM implementation. One issue with DataFrames, however, is that they are only runtime type-safe, not compile-time type-safe, which is a significant drawback for a statically typed language like Scala.

from pyspark.sql.functions import col, substring

# Load the Parquet data and truncate page names to their first 10 characters.
df = spark.read.load("/data/pagecounts-parquet") \
  .withColumn("pagename", substring(col("pagename"), 0, 10))

# Sum the page views per page name.
grouped = df \
  .groupBy("pagename") \
  .sum("pageviews")

# Join the totals back onto the raw rows and count the rows where the total
# differs from the individual value.
grouped.join(df, "pagename") \
  .filter(col("sum(pageviews)") != col("pageviews")) \
  .count()
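
As an illustration of the nested schemas mentioned above, here's a minimal sketch; the revisions column and its fields are made up for this example:

from pyspark.sql.types import (StructType, StructField, StringType,
                               LongType, ArrayType)

# A column can itself be an array of structs, and those structs' fields
# can in turn be structs or arrays.
schema = StructType([
    StructField("pagename", StringType()),
    StructField("revisions", ArrayType(StructType([
        StructField("author", StringType()),
        StructField("bytes_changed", LongType()),
    ]))),
])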

DataSets

DataSets are an even newer abstraction and can be thought of as typed DataFrames. In fact, in Scala a DataFrame is simply a DataSet of Row elements (Dataset[Row]). They exist only within Java and Scala, and provide a compile-time type-safe version of DataFrames. I mention them for completeness' sake but will otherwise not delve into them.

Performance

To test some of the performance differences between RDDs and DataFrames, I'm going to use a cluster of two m4.large nodes running the 4.0 Runtime (Spark 2.3) on Databricks Cloud. The reference dataset will be Wikipedia page view counts, and I'll be doing a mix of aggregations, joins, and UDF operations on it. I'm going to compare the results across different ways of running code on both RDDs and DataFrames. The set of operations performed is the one shown in the RDD and DataFrame snippets above.

Admittedly this isn't a complete test suite; hopefully, however, even a simple test like this yields some insights. I'm going to test along the following dimensions:

  • Scala vs. Python: Spark is natively written in Scala, and the Python interface requires data conversion to/from the JVM. Furthermore, Python as a language is slower than Scala, resulting in worse performance whenever Python functions are used (as UDFs, for example).
  • RDD vs. DataFrame from CSV vs. DataFrame from Parquet: Parquet is a column-oriented file storage format which Spark supports natively, and it provides an optimized way to create DataFrames from on-disk files. As a note, the Spark CSV reader has a bug: there is no way to stop it from turning empty string columns into NULLs.
  • UDF vs. Vectorized UDF (Python only) vs. Native DataFrame Function: As mentioned before, Python is slow and UDFs defined in it are likewise slow. Vectorized UDFs are a new Spark feature that passes a pandas Series or DataFrame into the Python UDF, which significantly lowers the number of function calls and also allows native pandas functions to be used (see the sketch after this list).
 

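To make the last dimension concrete, here's a minimal sketch of the three UDF flavours, reusing the df DataFrame from the earlier snippet (the pagename10 column name is illustrative):

from pyspark.sql.functions import col, substring, udf, pandas_udf
from pyspark.sql.types import StringType

# Plain Python UDF: invoked once per row, paying serialization overhead
# on every call.
plain_prefix = udf(lambda s: s[0:10], StringType())

# Vectorized (pandas) UDF: invoked once per batch of rows, receiving and
# returning a pandas Series.
@pandas_udf(StringType())
def pandas_prefix(s):
    return s.str.slice(0, 10)

# All three compute the same truncated column; only the native version
# stays entirely in the JVM.
native = df.withColumn("pagename10", substring(col("pagename"), 0, 10))
plain  = df.withColumn("pagename10", plain_prefix(col("pagename")))
vector = df.withColumn("pagename10", pandas_prefix(col("pagename")))
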
Results

Test                                                 Running Time
Scala RDD                                                     21s
Scala DataFrame from CSV                                      32s
Scala DataFrame from Parquet                                  20s
Scala DataFrame from Parquet with Native UDFs                 18s
Python RDD                                                    81s
Python DataFrame from CSV                                     80s
Python DataFrame from Parquet                                 57s
Python DataFrame from Parquet with Vectorized UDFs            40s
Python DataFrame from Parquet with Native UDFs                17s

(Unscientific) Conclusions

  • Scala is fast; for relatively simple queries DataFrames aren't needed and don't provide much of a performance benefit.
  • Python is slow; with RDDs, having to use Python functions for almost everything causes significant slowdowns. As a result, DataFrames provide a nice performance boost by allowing more native JVM code to be used.
  • Use Parquet where possible; the CSV reader library isn't very fast and, as mentioned before, has a bug relating to empty strings.
  • In Python, use native functions first, then vectorized UDFs, and only then regular UDFs. If performance is critical, consider writing your UDFs in Scala and calling them from Python (see the sketch after this list).
  • In Scala, use whichever API you prefer, although the built-in functions have a slight performance edge.
  • We did not test the pushdown partition pruning that DataFrames allow, which would increase their performance on datasets and queries amenable to it.
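
Following up on the Python point above, here's a minimal sketch of calling a Scala UDF from Python; the com.example.Prefix10 class is hypothetical and would implement Spark's Java UDF1 interface in a JAR on the cluster's classpath:

from pyspark.sql.functions import expr
from pyspark.sql.types import StringType

# Hypothetical class name; the heavy lifting happens in the JVM.
spark.udf.registerJavaFunction("prefix10", "com.example.Prefix10", StringType())

# The registered function can then be used from SQL expressions in Python.
df.withColumn("pagename10", expr("prefix10(pagename)"))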

If you liked this post, be sure to follow us, reach out on Twitter, or comment.
