Apache Spark has proven an efficient and accessible platform for distributed computation. In some areas, it almost approaches the Holy Grail of making parallelization “automagic” — something we human programmers appreciate precisely because we are rarely good at it.
Nonetheless, although it is easy to get something to run on Spark, it is not always easy to tell whether it's running optimally, nor — if we get a sense that something isn't right — how to fix it.
For example, a classic Spark puzzler is the batch job that runs the same code on the same sort of cluster and similar data night after night … but every so often it seems to take much longer to finish. What could be going on?
In this article, I am going to show how to identify some common Spark issues the easy way: by looking at a particularly informative graphical report that is built into the Spark Web UI. The Web UI Stage Detail view is my go-to page for tuning and troubleshooting, and is also one of the most information-dense spots in the whole UI.
Let's briefly describe a “stage” and how to find the relevant UI screen. Then we'll look at the data in each part of that page.
In Apache Spark execution terminology, operations that physically move data in order to produce some result are called “jobs.” Some jobs are triggered by user API calls (so-called “Action” APIs, such as “.count” to count records). Other jobs live behind the scenes and are implicitly triggered — e.g., data schema inference requires Spark to physically inspect some data, hence it requires a job of its own.
Jobs are decomposed into “stages” by separating where a shuffle is required. The shuffle is essential to the “reduce” part of the parallel computation — it is the part that is not fully parallel but where we must, in general, move data in order to complete the current phase of computation. For example, to sort a distributed set of numbers, it's not enough to locally sort partitions of the data … sooner or later we need to impose a global ordering and that requires comparisons of data records from all over the cluster, necessitating a shuffle.
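The need for a shuffle can be sketched in a few lines of plain Python (a toy simulation, not Spark): locally sorting each partition is the fully parallel part, but producing a global order requires routing every record to the partition that owns its value range, which is the data movement a shuffle performs.

```python
# Toy illustration (plain Python, not Spark) of a two-stage distributed sort.

def local_sort(partitions):
    # Stage 1: fully parallel -- each partition is sorted independently.
    return [sorted(p) for p in partitions]

def shuffle_by_range(partitions, boundaries):
    # The "shuffle": every record is routed to the partition owning its
    # range; in Spark this is the step that moves data across the cluster.
    out = [[] for _ in range(len(boundaries) + 1)]
    for p in partitions:
        for x in p:
            dest = sum(1 for b in boundaries if x >= b)
            out[dest].append(x)
    return out

def distributed_sort(partitions, boundaries):
    # Stage 2: after the shuffle, one more local sort per partition
    # yields a global ordering across partitions.
    shuffled = shuffle_by_range(local_sort(partitions), boundaries)
    return [sorted(p) for p in shuffled]

parts = [[9, 1, 5], [2, 8, 4], [7, 3, 6]]
result = distributed_sort(parts, boundaries=[4, 7])
# Concatenating the output partitions now gives a globally sorted sequence.
```

Notice that the first local sort alone is never enough: without the exchange in the middle, no partition could know where its records fall in the global order.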
So, Spark's stages represent segments of work that run from data input (or data read from a previous shuffle) through a set of operations called tasks — one task per data partition — all the way to a data output or a write into a subsequent shuffle.
Start by opening a browser to the Spark Web UI.
Unless you already know the precise details of jobs and stages running on your Spark cluster, it's probably useful to navigate via the “Jobs” tab at the top of the UI, which provides a clear drill-down by job (rather than the “Stages” tab, which lists all stages but doesn't clearly distinguish them by job).
From the “Jobs” tab, you can locate the job you're interested in, and click its “Description” link to get to the Job Detail view, which lists all of the stages in your job, along with some useful stats.
We get to our final destination by clicking on a stage's “Description” link. This link leads to the Stage Detail view, which is the report we're analyzing today.
We'll look at major parts of this report, proceeding from top to bottom on the page.
One of my favorite parts of the Stage Detail view is initially hidden behind the “Event Timeline” dropdown. Click that dropdown link to get a large, colored timeline graph showing each of the tasks in the stage, plotted by start time (horizontally) and grouped by executor (vertically).
Within each task's colored bar — representing time — the full duration is further broken down via colored segments to show how the time was spent.
There is exactly one colored bar per task, so we can see how many tasks there are, and get a feel for whether there are too many or too few tasks. Since tasks are one-to-one with data partitions, this really helps us answer the question: How many partitions should I have?
More precisely: What would this graph look like if there are too few partitions (and tasks)?
In the most extreme case, we might see fewer tasks than the cluster has cores — perhaps we have 40 cores across our cluster but see only 32 tasks, leaving cores idle for the whole stage. That is usually not what we want, and it's easy to identify and fix.
But, more subtly, what if we have more tasks than cores, yet still too few for optimal performance? How would we recognize that situation?
Look at the right-hand edge of the graph and locate the last one or two tasks to complete. Those tasks are effectively limiting the progress of the job, because they have to complete before Spark can move on. Look at the timescale and see whether the span between those tasks' end times and the previous few tasks' end times is significant (hundreds of milliseconds, or perhaps much more). What we're looking at is the period at the end of the stage when the cluster's cores are underutilized. If this period is substantial, it's an indicator of too few partitions/tasks, or of skew in data size, compute time, or both.
On the opposite end, we might have too many partitions, leading to too many tasks. How would this situation appear? As lots of very short tasks, dominated by time spent on non-compute activities.
These tasks come from a stage with too many partitions. Notice how many of them show only a small slice of green “Computing Time” … certainly less than 70%.
The green color in the task indicates “Executor Computing Time” and we would ideally like this to make up at least 70% of the time spent on the task. If you see many tasks filled up with other colors, representing non-compute activities such as “Task Deserialization,” and only a small slice of green “Computing Time,” that is an indicator that you may have too many tasks/partitions or, equivalently, that the partitions are too small (data) or require too little work (compute) to be optimally efficient.
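The 70% heuristic is easy to apply programmatically. In this sketch the per-task breakdown uses illustrative field names (not real Spark UI or API fields): we flag any task whose compute share of total time falls below the threshold.

```python
# Sketch: flag tasks whose "green" compute time is a small share of the
# total. Field names below are hypothetical, for illustration only.

def low_compute_tasks(tasks, threshold=0.70):
    flagged = []
    for t in tasks:
        total = sum(t.values())
        if total and t["compute_ms"] / total < threshold:
            flagged.append(t)
    return flagged

tasks = [
    {"compute_ms": 180, "deserialize_ms": 10, "shuffle_ms": 10},  # 90% green
    {"compute_ms": 5,   "deserialize_ms": 40, "shuffle_ms": 5},   # 10% green
    {"compute_ms": 6,   "deserialize_ms": 30, "shuffle_ms": 14},  # 12% green
]
# If most tasks are flagged, partitions are likely too small or too numerous.
```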
Note two gotchas about these timeline graphs in general:
Next on page we find the Summary Metrics, showing various metrics at the 0 (Min), 25th, 50th (Median), 75th, and 100th (Max) percentiles (among the tasks in the stage). More metrics can be revealed by selecting checkboxes hidden under “Show Additional Metrics” earlier on the page.
How do we make sense of this part of the report?
In a perfect world, where our computation is completely symmetric across tasks, we would see all of the statistics clustered tightly around the 50th-percentile value: minimal variance, and only a small distance between the Min and Max values. In the real world things don't always work out that way, but this table shows us how far off they are — and gives us an idea of why.
Suppose the 25th–75th percentile spread isn't too wide, but some Max figures are substantially higher than the corresponding 75th-percentile figures. That suggests a number of “straggler” tasks, taking too much time to compute (or triggering excess GC), and/or operating over partitions skewed with larger amounts of data.
On the other end, suppose that the distribution is reasonable, except that we have a bunch of Min values at or close to zero. That suggests we have empty (or near empty) partitions and/or tasks that aren't computing anything (our compute logic might not be the same for all sorts of records, so we could have large partitions whose tasks do no work).
Summary Metrics corresponding to the task timeline view above, which had suggested skew. Note that the Max task took 10x the time, and read about 10x the data, of the 75th-percentile task. There is skew at the low end as well: Min time and data are zero, and the 25th-percentile data read is around 1% of the median.
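Both heuristics above — stragglers at the top of the distribution, empty partitions at the bottom — can be sketched over raw task durations. The percentile computation and the thresholds (3x for stragglers, 5% for near-empty) are illustrative choices, not anything Spark itself uses.

```python
# Sketch: reproduce the five Summary Metrics percentiles and apply the
# two skew heuristics. Thresholds are illustrative, not Spark's.

def percentiles(values):
    xs = sorted(values)
    def pct(p):
        # Nearest-rank style percentile over the sorted values.
        return xs[min(len(xs) - 1, int(p * (len(xs) - 1) + 0.5))]
    return {"min": xs[0], "p25": pct(0.25), "p50": pct(0.50),
            "p75": pct(0.75), "max": xs[-1]}

def diagnose(task_durations_ms):
    s = percentiles(task_durations_ms)
    findings = []
    if s["p75"] and s["max"] > 3 * s["p75"]:
        findings.append("stragglers: Max >> 75th percentile")
    if s["min"] == 0 or (s["p50"] and s["p25"] < 0.05 * s["p50"]):
        findings.append("empty/near-empty partitions at the low end")
    return findings

durations = [0, 2, 95, 100, 105, 110, 115, 1200]
# diagnose(durations) reports both skew patterns for this stage.
```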
The next segment of the report is a set of summarized statistics, this time collected by executor.
How is this helpful? In theory, given the vicissitudes of scheduling on threads and across a network, we would expect that when running a job several times — or running a long job with many stages and tasks — we would see similar statistics across all our executors. There's that perfect world again. What could go wrong?
If we see one or more executors consistently showing worse metrics than most, it could indicate several possible situations:
All of those possibilities can be mitigated, and the report gives us hints about what to inspect so that we can do so.
The last section of the Stage Detail view is a grid containing a row for every single task in the stage. The data shown for each task is similar to the data shown in the graphical timeline, but adds a few fields, such as the quantity of data read/written and — something not shown anywhere else — the specific data locality level at which each task ran.
Assuming you know something about where your data is located at this stage of the computation, the locality info will tell you whether tasks are generally being scheduled in a way that minimizes transporting data over the network.
Let's return by way of example to our mysterious scenario from the start of the article — a job that is in all respects similar each night, but every so often takes much longer to finish. Perhaps your Spark cluster is colocated with your HDFS data “most of the time,” but occasionally gets launched with poor proximity to the data block replicas it needs, and so it runs successfully but much more slowly on those occasions. Comparing the locality info in this part of the report to the observed behavior from other runs will give you an indication of what has happened.
Since a stage could easily have thousands of tasks, this is probably a good time to mention that the Spark UI data are also accessible through a REST API. So, if you want to monitor and plot locality in your stages, you don't need to read or scrape the thousands of task table rows.
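For instance, locality levels can be tallied from the monitoring API's task list rather than from the UI table. The URL below follows the shape of Spark's documented REST endpoints, but the host, application ID, and stage ID are placeholders you would substitute for your own; here we tally a canned response instead of hitting a live server.

```python
# Sketch: tally task locality levels via the Spark monitoring REST API.
# Application/stage IDs below are placeholders for illustration.

import json
from collections import Counter
from urllib.request import urlopen

def fetch_task_list(ui_host, app_id, stage_id, attempt=0):
    url = (f"http://{ui_host}/api/v1/applications/{app_id}"
           f"/stages/{stage_id}/{attempt}/taskList?length=10000")
    with urlopen(url) as resp:
        return json.load(resp)

def locality_counts(tasks):
    # Each task object carries a "taskLocality" field, e.g. PROCESS_LOCAL,
    # NODE_LOCAL, RACK_LOCAL, ANY.
    return Counter(t["taskLocality"] for t in tasks)

# Against a live UI you would call something like:
#   tasks = fetch_task_list("localhost:4040", "app-20240101-0001", 3)
# Here we tally a canned sample response instead:
sample = [{"taskLocality": "NODE_LOCAL"}, {"taskLocality": "NODE_LOCAL"},
          {"taskLocality": "ANY"}]
counts = locality_counts(sample)
```

A nightly tally like this, logged per run, is exactly what would expose the “occasionally slow because occasionally remote” pattern described above.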
Finally, a couple of rules of thumb around partition sizing and task duration. Two of the most common questions in the Spark classes and workshops I teach for ProTech are: “How many partitions should I have? And how long should tasks execute?”
The proper answer is that it depends on so many variables — workload, cluster configuration, data sources — that it always requires hands-on tuning. However, new users are understandably desperate to have some a priori numbers to start with, so — purely as a bootstrapping mechanism — I suggest starting with code and configuration that causes each partition to contain 100-200MB of data and each task to take 50-200ms.
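As a sketch of that bootstrapping rule, an initial partition count can be derived from the dataset size. The 128MB default here is my illustrative pick from within the suggested 100–200MB band (it also happens to match a common HDFS block size); it is a starting point for tuning, not a target.

```python
# Sketch: choose an initial partition count so each partition holds
# roughly 100-200MB of data. Starting point only, not a final answer.

def initial_partitions(total_bytes, target_mb=128):
    target = target_mb * 1024 * 1024
    return max(1, -(-total_bytes // target))  # ceiling division

# e.g. a 10GB dataset:
n = initial_partitions(10 * 1024**3)
```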
Those numbers are not a final goal, but once you have your app running, you can use the knowledge from this article to start to tune your partition counts and improve the speed and consistency of your Spark application.