Apache Spark: Scala vs. Java vs. Python vs. R vs. SQL

Apache Spark is an open source distributed computing platform released in 2010 by Berkeley's AMPLab. It has since become one of the core technologies used for large-scale data processing. One of its selling points is the cross-language API that allows you to write Spark code in Scala, Java, Python, R or SQL (with others supported unofficially). However, not all language APIs are created equal, and in this post we'll look at the differences from both a syntax and a performance point of view. As a note, this post focuses on the DataFrame/Dataset APIs rather than the older, lower-level RDD APIs.

Syntax

Scala

Spark is written in Scala and, as a result, Scala is the de facto interface for Spark. Scala is the only language that supports the typed Dataset functionality and, along with Java, allows one to write proper UDAFs (User Defined Aggregation Functions); a minimal UDAF sketch follows the example below.

import org.apache.spark.sql.functions.{udf, mean}
import spark.implicits._

def uppercase = udf((string: String) => string.toUpperCase())

df.
  where($"taxonomy".isin("animal","plant")).
  select(uppercase($"name").alias("name"),$"legs").
  groupBy("name").
  agg(mean($"legs").alias("legs"))
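Since UDAF support is called out as a Scala/Java-only feature, here is a minimal sketch of one using the Spark 2.x UserDefinedAggregateFunction API. It simply re-implements the mean aggregation from the example above for illustration; the LegMean name is made up and this is not code from the benchmarks later in the post.

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// Toy UDAF re-implementing mean(): tracks a running sum and count per group.
class LegMean extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("legs", DoubleType) :: Nil)
  def bufferSchema: StructType = StructType(
    StructField("sum", DoubleType) :: StructField("count", LongType) :: Nil)
  def dataType: DataType = DoubleType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0.0; buffer(1) = 0L }
  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getDouble(0) + input.getDouble(0)
      buffer(1) = buffer.getLong(1) + 1L
    }
  def merge(b1: MutableAggregationBuffer, b2: Row): Unit = {
    b1(0) = b1.getDouble(0) + b2.getDouble(0)
    b1(1) = b1.getLong(1) + b2.getLong(1)
  }
  def evaluate(buffer: Row): Double =
    if (buffer.getLong(1) == 0L) 0.0 else buffer.getDouble(0) / buffer.getLong(1)
}

val legMean = new LegMean
df.groupBy("name").agg(legMean(col("legs")).alias("legs"))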

Java

Scala is somewhat interoperable with Java and the Spark team has made sure to bridge the remaining gaps. The limitations of Java mean that the APIs aren't always as concise as in Scala; however, that has improved since Java 8's lambda support. Furthermore, the typed Dataset API is not available and interactive notebook environments do not support Java. One definite upside of Java support is that other JVM languages such as Kotlin can use it to run Spark seamlessly.

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.types.DataTypes;

spark.udf().register("uppercase", (String string) -> string.toUpperCase(), DataTypes.StringType);

df.
    where(col("taxonomy").isin("animal","plant")).
    select(callUDF("uppercase", col("name")).alias("name"), col("legs")).
    groupBy("name").
    agg(mean(col("legs")).alias("legs"));

Python

Python is one of the de facto languages of Data Science and as a result a lot of effort has gone into making Spark work seamlessly with Python despite Spark itself running on the JVM. The Python API, however, is not very pythonic and is instead a very close clone of the Scala API. Since Spark 2.3 there is experimental support for Vectorized UDFs, which leverage Apache Arrow to increase the performance of UDFs written in Python. As a note, Vectorized UDFs have many limitations, including restrictions on what types can be returned and the potential for out of memory errors.

from pyspark.sql.functions import udf, col, mean
from pyspark.sql.types import StringType

@udf(StringType())
def uppercase(string):
  return string.upper()

df. \
  where(col("taxonomy").isin("animal","plant")). \
  select(uppercase(col("name")).alias("name"),col("legs")). \
  groupBy("name"). \
  agg(mean(col("legs")).alias("legs"))

R

Support for R is relatively new and in the past support for various APIs has lagged behind Scala/Python; however, there is now relative parity. The R API is also idiomatic R rather than a clone of the Scala API as in Python, which lowers the barrier to entry for existing R users. Since Spark 2.3 the new Structured Streaming API is available in R, which finally allows for stream processing support.

schema <- structType(
  structField("name", "string"), 
  structField("legs", "int"))

uppercase <- function(d) {
  d$name = sapply(d$name, toupper)
  d
}

df.filtered <- df[df$taxonomy %in% c("animal","plant"), c("name", "legs")]

df.upper <- dapply(df.filtered, uppercase, schema)

agg(groupBy(df.upper, "name"), legs = "mean")

SQL

Spark is capable of running SQL commands and is generally compatible with the Hive SQL syntax (including UDFs). One nice feature is that you can write custom SQL UDFs in Scala, Java, Python or R. Given how closely the DataFrame API matches up with SQL it's easy to switch between SQL and non-SQL APIs.

select uppercase(name) as name, mean(legs) as legs
from df
where taxonomy in ('animal','plant')
group by uppercase(name)
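As a rough sketch of how the pieces fit together (assuming a SparkSession named spark and the DataFrame from the earlier examples), the uppercase UDF can be written in Scala, the DataFrame registered as a temporary view, and the query above run as SQL:

// Register a Scala function as a SQL-callable UDF and expose the DataFrame as a view.
spark.udf.register("uppercase", (s: String) => s.toUpperCase)
df.createOrReplaceTempView("df")

val result = spark.sql("""
  select uppercase(name) as name, mean(legs) as legs
  from df
  where taxonomy in ('animal','plant')
  group by uppercase(name)
""")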

Performance

The DataFrame interface abstracts away most performance differences so in comparing performance we'll be focusing on custom UDFs. To remove the impact of disk reads all input DataFrames were cached. Two types of UDFs will be compared:

  • UDFs that take in a single value and return a single value
  • UDFs which take in all the rows for a group and return a subset of those rows

The tests were done using the following:

  • 2016 15" Macbook Pro 2.6ghz 16gb ram (4 cores, 8 with hyperthreading)
  • Spark 2.3.0
  • Python 3.5.3
  • PyPy3.5 6.0.0
  • Scala 2.11.8
  • Wikipedia ClickStream data from April 2018 (available here: https://dumps.wikimedia.org/other/clickstream/)

All the code is available on GitHub.

Simple UDFs

Overview

Three relatively simple custom UDFs were compared:

  • String: Takes a string column and selects the first 5 characters
  • Numeric: Takes an int, caps it at 10 and takes the log
  • NoOp: Takes an int and returns it unmodified

In each case a where clause and a count are used to defeat any optimizations that might otherwise allow Spark to skip processing the full table.
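To make the setup concrete, here is a hedged Scala sketch of what the simple UDFs and the where-plus-count harness might look like. The column names (to, clicks), the assumption that clicks is a long, and the exact predicates are illustrative; the actual benchmark code is on GitHub.

import org.apache.spark.sql.functions.udf
import spark.implicits._

// String: first five characters of a string column
val firstFive = udf((s: String) => s.substring(0, math.min(5, s.length)))
// Numeric: cap at 10, then take the log
val cappedLog = udf((n: Long) => math.log(math.min(n, 10L)))
// NoOp: return the input unchanged
val noOp = udf((n: Long) => n)

// A where clause plus a count forces every row through the UDF
df.where(firstFive($"to") =!= "").count()
df.where(cappedLog($"clicks") > 0.0).count()
df.where(noOp($"clicks") >= 0L).count()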

Approaches

The following approaches were tested:

  • Native/SQL: There are many built-in functions for DataFrames and presumably they are optimized for performance. For these experiments I purposefully chose UDFs that can also be expressed in native Spark SQL, to provide a benchmark; in real life that may not always be the case.
  • Scala/Java: Spark is written in Scala and runs on the JVM; however, DataFrames are a custom columnar abstraction, so performance is not necessarily guaranteed.
  • Scala, DataSet: The Dataset API provides a type-safe way of working with DataFrames within Scala (see the sketch after this list).
  • Python: Spark is written in Scala and support for Python is achieved by serializing/deserializing data between a Python worker process and the main Spark JVM process. This incurs overhead in the serialization on top of the usual overhead of using Python.
  • Python, Vectorized UDFs: Vectorized UDFs are a new feature in Spark that leverages Apache Arrow to quickly serialize/deserialize data between Spark and Python in batches. Instead of running a UDF on a single input value you run it on a column of data which corresponds to some subset of the original column.
  • Python, PyPy: PyPy is an optimized JIT-based runtime for Python which allows for faster code execution than regular Python. One unfortunate side effect of this is that it breaks low-level API compatibility with regular Python, which means that not all libraries will work with it. Spark, however, is PyPy-compatible and every release is tested to ensure it remains so.
  • R: Like Python, the R support uses serialization to/from an R worker process. Its UDF methods are more limited and require passing all the columns of the DataFrame into the UDF. To test the impact of this, a test was also done with only a single column being passed in, although this limits the power of the UDF functionality as the other columns are effectively dropped.
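For the Dataset approach mentioned above, the idea is roughly the following; the Click case class and its field names are assumptions about the schema rather than the actual benchmark code.

// Typed Dataset sketch: map a plain Scala function over typed rows.
case class Click(from: String, to: String, clicks: Long)

import spark.implicits._
val ds = df.as[Click]

// The numeric UDF expressed as ordinary Scala over typed objects
ds.map(c => math.log(math.min(c.clicks, 10L).toDouble))
  .filter(_ > 0.0)
  .count()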

Results

Test      Native/SQL  Scala/Java  Scala DataSet  Python  Python PyPy  Python Vectorized  R
NoOp      0.318s      0.419s      3.169s         7.52s   10.01s       3.30s              2880s
Numeric   0.448s      0.435s      3.048s         30.3s   10.8s        3.37s              6144s
String    1.06s       1.66s       3.75s          13.4s   16.9s        8.90s              >10000s

[Chart: Simple UDF performance]

Some takeaways from the results:

  • Native/SQL is generally the fastest as it has the most optimized code
  • Scala/Java does very well, narrowly beating SQL for the numeric UDF
  • The Scala DataSet API has some overhead however it's not large
  • Python is slow and while the vectorized UDF alleviates some of this there is still a large gap compared to Scala or SQL
  • PyPy had mixed results, slowing down the string UDF but speeding up the Numeric UDF. Given the NoOp results this seems to be caused by some slowness in the Spark-PyPy interface. However, it did worse than the Vectorized UDF and given the hassle of setting up PyPy (it's not supported out of the box by cloud Spark providers) it's likely not worth the effort.
  • R is very, very slow, to the point where I gave up on trying to time the string method. Its need to serialize all columns for its apply method is likely partially at fault for this.

Group UDFs

Overview

While a simple UDF that takes in a set of columns and outputs a new column is often enough, there are cases where more functionality is needed. One example is taking in the results of a group by and, for each group, returning one or more rows of results; in other words, a variant of a UDAF or UDTF.

Once again String, Numeric and NoOp computations are performed:

  • String: Group by the first five characters of the to column; within each group, find the 3 most popular from values (based on their first five characters) and return all rows matching those entries.
  • Numeric: For each group (keyed on the first five characters of the to column), compute the mean and standard deviation of the clicks and then, if the standard deviation is above some threshold, standardize all the click values for that group.
  • NoOp: Group by the first five characters of to and return all the rows unmodified.

Approaches

The following approaches were tested:

  • Native/SQL: Using joins and window functions, the same functionality as the UDFs can be achieved; however, these are expensive operations to perform within Spark.
  • Scala/Java, collect_list: It's possible to do a collect_list(struct("*")) aggregation for each group, which generates a list of all the rows in each group. This list can then be passed into a UDF (see the sketch after this list).
  • Scala/Java, RDD: It's possible to convert a DataFrame into an RDD, which in turn has methods for applying a function to each group.
  • Python, collect_list: The same basic approach as Scala except the UDF will be written in Python.
  • Python, Vectorized UDF: The new Vectorized UDF supports taking all the rows of a group as a Pandas DataFrame and outputting another Pandas DataFrame.
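To make the collect_list approach from the list above concrete, here is a rough Scala sketch of the String variant. The column names (from, to, clicks) and the exact ranking logic are assumptions based on the description earlier, not the actual benchmark code.

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{collect_list, explode, struct, substring, udf}
import spark.implicits._

// For each group, keep only the rows whose "from" value is among the 3 most-clicked.
val topFrom = udf((rows: Seq[Row]) => {
  val top = rows
    .groupBy(_.getAs[String]("from"))
    .mapValues(_.map(_.getAs[Long]("clicks")).sum)
    .toSeq.sortBy(-_._2).take(3).map(_._1).toSet
  rows.collect {
    case r if top.contains(r.getAs[String]("from")) =>
      (r.getAs[String]("from"), r.getAs[String]("to"), r.getAs[Long]("clicks"))
  }
})

df.groupBy(substring($"to", 1, 5).alias("key"))
  .agg(collect_list(struct($"from", $"to", $"clicks")).alias("rows"))
  .select(explode(topFrom($"rows")).alias("row"))
  .select($"row._1".alias("from"), $"row._2".alias("to"), $"row._3".alias("clicks"))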

The following were not tested:

  • Scala, UDAF: Given that we are working with the whole set of rows for each group, a custom UDAF would simply replicate the collect_list approach, so it was not tested.
  • R: Given the performance of R for the simple UDF tests it didn't seem worth testing it further.

Results

Test      Native/SQL  Python  Python PyPy  Python Vectorized  Scala RDD  Scala/Java
NoOp      n/a         110s    139s         430s               62.9s      33.8s
Numeric   27.3s       163s    171s         493s               72.2s      33.6s
String    59.1s       112s    124s         787s               63.9s      34.8s

[Chart: Group UDF performance]

Some takeaways from the results:

  • Scala/Java, again, performs the best although the Native/SQL Numeric approach beat it (likely because the join and group by both used the same key).
  • RDD conversion has a relatively high cost.
  • PyPy performs worse than regular Python across the board likely driven by Spark-PyPy overhead (given the NoOp results).
  • The Python Vectorized UDF performed significantly worse than expected. I'm not sure if I used it incorrectly or if the relatively small size of each group just didn't play to its strengths. I've verified that a no-op UDF (one that simply returns its input DataFrame) takes over 400s to run on my laptop, and on the Databricks cloud the results were similarly slow. I also hit some out of memory issues while running the code, which eventually went away.

Conclusions

  • If performance matters, use either native functions, Scala or Java
  • Avoid custom UDFs in R if at all possible
  • PyPy comes with some overhead and doesn't necessarily improve performance
  • Vectorized UDFs are promising (the SCALAR variant at least) but still lag quite a bit behind Scala in performance

If you liked this post be sure to follow us, reach out on Twitter, or comment.
