6 Steps to Get Top Performance in Apache Spark 2.0



One thing that makes working with Apache Spark so exciting is that the platform doesn't stand still. In fact, that's an understatement: It is fair to say that Spark's architecture and usage best practices are nearly unrecognizable from just 18 months ago.

There are inevitably “headline features” when a new version of Spark comes out, but in this article I'll take you behind those headlines to show you some key performance patterns and anti-patterns that will help you get the most out of Spark 2.0.

 

Step 0: Ditch RDDs; Learn and Use DataFrame and Dataset

This is “Step 0” because it's not really new. But if you've been putting it off, it's time to make some changes. DataFrame (and its more general cousin, Dataset) are the correct ways to program for Apache Spark. More than just a new, easier API, DataFrame/Dataset are the recipients of all of the optimization and performance boosts in Spark over the past two or so years.

As a result, new code should be written against these APIs, and older RDD-based code should be migrated opportunistically: all of the amazing performance work you read about online and hear about at conferences generally applies only to DataFrame/Dataset. Your legacy RDD code will not take advantage of any of this recent R&D.
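
For a concrete sense of the difference, here is a trivial aggregation written both ways. This is a minimal sketch assuming a spark-shell session (so sc, spark, and the implicits are in scope) and made-up data:

import spark.implicits._
import org.apache.spark.sql.functions.sum

// Hypothetical (word, count) pairs
val pairs = Seq(("spark", 3), ("catalyst", 1), ("spark", 2))

// RDD version: opaque lambdas and JVM objects all the way through
val rddCounts = sc.parallelize(pairs).reduceByKey(_ + _)

// DataFrame version: the same logic expressed with columns, which
// Catalyst can analyze, rewrite, and compile
val dfCounts = pairs.toDF("word", "count")
  .groupBy($"word")
  .agg(sum($"count").as("count"))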

That said, these APIs are easy to use but a little trickier to understand deeply, since they allow Spark to optimize and rewrite your operations. Which brings us to:

 

Step 1: Learn the Basics of Catalyst

What is Catalyst? Catalyst is the name of Spark's integral query optimizer and execution planner for Dataset/DataFrame.

Catalyst is where most of the “magic” happens to improve the execution speed of your code. But in any complex system, “magic” is unfortunately not good enough to always guarantee optimal performance. Just as with relational databases, it is valuable to learn a bit about exactly how the optimizer works in order to understand its planning and tune your applications.

In particular, Catalyst can perform sophisticated refactors of complex queries. However, almost all of its optimizations are qualitative and rule-based rather than quantitative and statistics-based. For example, Spark knows how and when to do things like combine filters, or move filters before joins. Spark 2.0 even allows you to define, add, and test out your own additional optimization rules at runtime. [1][2]
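
As a rough illustration of that extension point, here is a toy rule in the spirit of references [1] and [2]. It rewrites multiplication by a literal 1 into the column itself and is registered through the experimental hook Spark 2.0 exposes on the session; treat it as a sketch, not a production-ready rule:

import org.apache.spark.sql.catalyst.expressions.{Literal, Multiply}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Toy optimization: rewrite "column * 1" to just "column"
object RemoveMultiplyByOne extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case Multiply(child, Literal(1, _)) => child
  }
}

// Register the rule with the running SparkSession (Spark 2.0)
spark.experimental.extraOptimizations = Seq(RemoveMultiplyByOne)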

On the other hand, Catalyst is not designed to perform many of the common optimizations that RDBMSs have performed for decades, and that takes some understanding and getting used to.

For example, Spark doesn't “own” any storage, so it does not build on-disk indexes, B-Trees, etc. (although its Parquet file support, if used well, can get you some related features). Spark has been optimized for the volume, variety, etc. of big data — so, traditionally, it has not been designed to maintain and use statistics about a stable dataset. Where an RDBMS might know that a specific filter will eliminate most records, and apply it early in the query, Spark 2.0 does not know this fact and won't perform that optimization.
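
To make the Parquet point concrete: partitioning the files on disk by a column you filter on frequently lets Spark prune whole directories at read time, which is about as close to an index as you get out of the box. The paths and the eventDate column below are hypothetical:

import spark.implicits._

// "events" stands in for any DataFrame that has an eventDate column
val events = spark.read.json("/data/raw_events")

// Lay the data out partitioned by the column you filter on most
events.write.partitionBy("eventDate").parquet("/data/events_by_date")

// A filter on the partition column now skips entire directories
spark.read.parquet("/data/events_by_date")
  .filter($"eventDate" === "2016-10-10")
  .count()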

 

Step 2: Load Your Data Better

When loading data with Spark 2.0, try to leverage the DataFrame/Dataset sources exposed by DataFrameReader. These classes are optimized to support Spark's newest high-performance infrastructure, such as binary, language-neutral encoding. Although you may have legacy code that produces RDDs, which you then convert to DataFrames, the RDD “scans” will likely not be as performant as using a proper source, which can support optimizations like column pruning and predicate pushdown.
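
A quick sketch of the contrast, assuming a hypothetical Parquet dataset at /data/orders with customerId and amount columns: the first plan shows the filter pushed into the scan and only the needed columns read, while the RDD-backed version has to deserialize whole records before Spark ever sees a column.

import spark.implicits._

// Proper source: the Parquet scan can prune columns and push the filter down
spark.read.parquet("/data/orders")
  .filter($"amount" > 100)
  .select($"customerId")
  .explain()   // look for PushedFilters and a narrow ReadSchema on the scan

// Legacy pattern: an RDD conversion hides all of that from the planner
val legacy = sc.textFile("/data/orders.csv")
  .map(_.split(","))
  .map(a => (a(0).toLong, a(1).toDouble))
  .toDF("customerId", "amount")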

If you can't use one of the built-in or third-party pluggable sources, it's easy to write your own using the official DataSource API or the less-official, but straight-to-the-metal FileFormat class. [3][4]

If you are using one of the built-in sources, such as the JSON reader or the CSV reader, which is now bundled directly with Spark 2.0, be wary of clever features like “schema inference.” For exploratory analysis, or a quick hacking session, these sources can be helpful for attempting to figure out the schema of your data by reading the actual records. In production, however, these inference operations can be expensive (even if you tell Spark to merely sample your data). If your business records have a known schema or format, go ahead and tell Spark explicitly what that is (using the DataFrameReader.schema method). The time saved may be substantial or — in the case of streaming applications — critical.
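
For example, a production CSV load might look something like the following; the schema and path are hypothetical, but the point is that declaring the schema up front lets Spark skip the inference pass entirely:

import org.apache.spark.sql.types._

// Known schema for the incoming order records (hypothetical)
val orderSchema = StructType(Seq(
  StructField("orderId", LongType, nullable = false),
  StructField("customerId", LongType, nullable = false),
  StructField("amount", DoubleType, nullable = true),
  StructField("orderDate", TimestampType, nullable = true)
))

val orders = spark.read
  .schema(orderSchema)        // no inference pass over the data
  .option("header", "true")
  .csv("/data/orders/*.csv")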

In the past, Spark had a simple and consistent mechanism for turning block-based storage (such as HDFS data) into partitions of data for computation. Spark 2.0 Dataset uses a new, more sophisticated mechanism that accounts for the number of cores your cluster has available, the quantity of data, and the estimated “cost” of opening additional files to read. Why do these details matter? Because, in general, a Spark 2.0 read will produce a different number of partitions as compared to Spark 1.x. Since partitions correspond to blocks for caching and tasks for computation, and since proper-sized tasks are Spark's mechanism for load balancing, failure recovery, and straggler mitigation, this subtle change will affect a number of behaviors in your programs, including overall cluster utilization.
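
You can see the effect directly, and peek at the settings that feed the new splitting logic; the path below is hypothetical:

// How many partitions did this read actually produce?
val df = spark.read.parquet("/data/events")
println(df.rdd.getNumPartitions)

// Two of the Spark 2.0 knobs behind the new behavior
spark.conf.get("spark.sql.files.maxPartitionBytes")   // target bytes per partition (default 128 MB)
spark.conf.get("spark.sql.files.openCostInBytes")     // estimated cost of opening one more file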

 

Step 3: To Go Fast, Keep Your Foot Off the Java Roundtrip Brake

Spark Dataset is an API that offers the performance and infrastructure benefits of DataFrame, while allowing you to access records as if they were instances of your own custom Scala types, as well as to apply a bit of traditional functional programming in situations where the DataFrame table-style operators won't cut it.

While Dataset can perform just as fast as DataFrame, it opens up a number of opportunities to accidentally “de-optimize” your program by disabling some of Spark's optimizations.

First, don't use the experimental version of Dataset that debuted in Spark 1.6. The details are beyond the scope of this article, but if you're interested in Dataset, you need to be using the version released with Spark 2.0.

Once you start using Datasets, you will notice that they can be accessed just like DataFrames/tables: Spark automatically maps record field names to columns. So a lot of API options open up: you can filter using a lambda (reminiscent of filtering an RDD) or a column (expression). For a select/map operation, you have a lambda, a column, or a user-defined function (UDF) at your disposal.

However, these approaches do not optimize the same way. For example, filtering with a column performs better than with a lambda because you're explicitly telling Spark which attributes and conditions are involved in the filter (so that Spark can access just those fields and may not need to create any Java representation of your record at all, comparing encoded values directly). In contrast, the same filter written as a lambda is “opaque” to Spark. All Spark knows is that you need a (whole) record marshaled as a Scala object in order to return true or false, requiring Spark to do a lot more work to meet that implicit requirement.
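
Here is the difference in miniature, with a made-up Person type; both filters return the same rows, but the plans, and the work Spark must do per record, are not the same:

import spark.implicits._

case class Person(name: String, age: Int)
val people = Seq(Person("Ann", 34), Person("Bob", 19)).toDS()

// Column filter: Catalyst sees the predicate and can evaluate it against
// the encoded binary form, touching only the age field
people.filter($"age" > 21).explain()

// Lambda filter: opaque to Catalyst; each record must be deserialized
// into a Person object before the closure can run
people.filter(p => p.age > 21).explain()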

Learn to read the query planner output and code generation hints as well. You've probably heard by now about Spark 2.0's support for whole-stage codegen, a technique for creating faster, purpose-built code at runtime. Note that — for now at least — some Dataset lambda operations like map and filter can be inlined within this compiled code (albeit subject to the inefficiency described above), whereas others, like flatMap, “break” the whole-stage codegen into two smaller codegen sections with a MapPartitions operation in between. The query planner and codegen output will show you whether and where this is occurring.
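
Reusing the hypothetical people Dataset from above: explain() marks whole-stage-codegen operators with an asterisk, and the debug helpers in org.apache.spark.sql.execution.debug can dump the generated code itself. A quick sketch:

// filter and map can be fused into a single generated function...
people.filter($"age" > 21).map(_.age).explain()

// ...whereas flatMap falls back to a MapPartitions operator,
// splitting the plan into two smaller codegen sections
people.flatMap(p => Seq(p.age, p.age + 1)).explain()

// Dump the generated Java source to see where each stage begins and ends
import org.apache.spark.sql.execution.debug._
people.filter($"age" > 21).debugCodegen()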

Finally, if you've seen a GroupedData object when doing a DataFrame group, then you know how that API provided “safety guardrails” and implicitly tried to prevent the RDD groupBy and groupByKey errors which once dominated the Spark mailing list with tales of Executor OOMs. But beware — Dataset allows you to take those protections off: you can trivially write a groupByKey + flatMapGroups pattern that recreates the classic RDD groupByKey data shaping if you need it. The likely expensive shuffle is not magically obviated. You need to consider your goals and methods carefully.
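
A minimal sketch of that pattern with a made-up Click type; the grouping works, but every value for a key is materialized in one task, exactly as with RDD groupByKey:

import spark.implicits._

case class Click(userId: Long, url: String)
val clicks = Seq(Click(1L, "/home"), Click(1L, "/cart"), Click(2L, "/home")).toDS()

// Guardrails off: all clicks for a user land in a single task, so one
// heavily skewed key can still exhaust an executor's memory
val urlsPerUser = clicks
  .groupByKey(_.userId)
  .flatMapGroups { (user, rows) => rows.map(_.url).toSeq.distinct }

// When a reduction is what you actually need, prefer an aggregation,
// which can combine values on the map side before the shuffle
clicks.groupBy($"userId").count()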

 

Step 4: Invite Your SQL Friends to the Spark Party

For some time, it has been possible to write DataFrame logic flows using pure SQL. But because of some bugs, multistep operations could be difficult and convoluted to write.

In Spark 2.0, a new parser — which supports HiveQL and ANSI SQL:2003 — allows you to create intermediate, zero-weight DataFrame variables through the familiar CREATE OR REPLACE TEMPORARY VIEW myData AS SELECT … syntax.

Subsequent steps can CREATE TEMPORARY VIEW nextData in relation to myData, and so on, allowing multistep flows to be authored clearly and simply, one step at a time. These temporary views are just symbols that represent a query, exactly like a DataFrame/Dataset object handle.
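
In practice, a multistep flow might look like this (the sales data and column names are hypothetical):

// Register an existing Dataset/DataFrame under a name SQL can see
spark.read.parquet("/data/sales").createOrReplaceTempView("sales")

spark.sql("""
  CREATE OR REPLACE TEMPORARY VIEW myData AS
  SELECT region, amount FROM sales WHERE amount > 0
""")

spark.sql("""
  CREATE OR REPLACE TEMPORARY VIEW nextData AS
  SELECT region, SUM(amount) AS total FROM myData GROUP BY region
""")

// Each view is just a named query plan; nothing runs until an action
spark.sql("SELECT * FROM nextData ORDER BY total DESC").show()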

And, as before, the queries don't need to be human-generated. Tools like Tableau can interface directly via JDBC/ODBC.

 

Step 5: Leverage Improved (and Improving) Stats Support

Last, Spark 2.0 is taking some steps toward more realistic calculation of Dataset size via query statistics. For example…

spark.range(10).queryExecution.logical.statistics.sizeInBytes

… returns 80

whereas in Spark 1.6…

sqlContext.range(10).queryExecution.logical.statistics.sizeInBytes

… returns 10485761 (i.e., 10 MB + 1 byte)

Why do we care? Because 10485761 is 1 byte over the default value of spark.sql.autoBroadcastJoinThreshold, the config limit for automatic use of a broadcast (hash) join, a mechanism that can be vastly more performant than the default distributed sort-merge join.
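
You can check the threshold and watch the planner's choice yourself; a small sketch using spark.range so no external data is needed:

// The default threshold is 10 MB (10485760 bytes)
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

// With a realistic 80-byte estimate, the small side is broadcast automatically
val small = spark.range(10).withColumnRenamed("id", "k")
val big   = spark.range(1000000).withColumnRenamed("id", "k")
big.join(small, "k").explain()   // the plan should show a BroadcastHashJoin

// You can also hint the broadcast explicitly, regardless of statistics
import org.apache.spark.sql.functions.broadcast
big.join(broadcast(small), "k").explain()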

Although we started out saying that Spark, optimized for the variety and volume of “big data,” doesn't care too much about statistics, they can clearly make a difference.

Let's wrap up with a related example of how learning the cutting edge of open source Spark can help you optimize your work. At Spark Summit (SF) 2016, a group of engineers presented research on collecting more extensive statistics for Spark SQL datasets and using those statistics to optimize queries. Work is now in progress to implement these optimizations in the core Spark codebase, and they may appear as a significant new feature in Spark 2.1. Read the design doc and follow the progress at https://issues.apache.org/jira/browse/SPARK-16026.

 

Takeaways

As a user — or prospective user — of Apache Spark, you are hopefully impressed by the power, elegance, and relative simplicity of the platform. At the same time, throughout this article I have tried to point out some of the subtler, more technical aspects of operating Spark to help you get the best performance out of the latest version of the platform.

As with any precision-built, high-performance machine, the best results come not just from the machine alone, but from putting a trained driver with some knowledge of its inner workings behind the wheel. That is the sort of knowledge ProTech aims to provide in our Programming for Spark 2.0 and Spark 2.0 for Machine Learning and Data Science courses, which we enjoy sharing in a forum like this, and which we encourage you to check out today to improve your Spark applications.

 

About the Author:

Adam Breindel is the Apache Spark technical lead at ProTech. He is the author of ProTech's Apache Spark 2.0 training courses, has trained enterprise development teams around the globe, and frequently speaks at conferences and tech events on Apache Spark.

 

This article was originally published at data-informed.com on October 10, 2016.

 

References:

  1. http://blog.madhukaraphatak.com/introduction-to-spark-two-part-6/
  2. http://stackoverflow.com/questions/36152173/how-to-extend-spark-catalyst-optimizer-with-custom-rules
  3. See the CSV reader code as an example of implementing BaseRelation + DefaultSource: https://github.com/databricks/spark-csv
  4. Laskowski has a demo extending FileFormat at: https://github.com/jaceklaskowski/spark-workshop/tree/gh-pages/solutions/spark-mf-format