Tracing the Breadcrumbs: Apache Spark Workload Diagnostics


Have you ever hit mysterious random process hangs, performance regressions, or OOM errors that leave barely any useful traces, yet are hard or expensive to reproduce? No matter how tricky the bugs are, they always leave some breadcrumbs along the way. All you need are the skills, tools, and knowledge to trace them. At Databricks, millions of clusters consisting of tens of millions of instances are launched every month to host our customers' workloads. While exciting, this is also a perfect environment for bugs to lurk in either our customers' workloads or our own platform. This talk brings you some lessons and case studies we learned from real-life bug-hunting experiences.


Video Transcript

– Hi, welcome to our talk on Spark diagnostics.

So I'm Kris, and I'm a software engineer here at Databricks, working on Spark, mostly on Spark SQL-related topics. In the past, I've also worked on a couple of JVM implementations. I have with me Cheng, who is also a software engineer here at Databricks. He is a Spark PMC member and also a Parquet committer.

About us

Switching over to Cheng. – Okay, thanks Kris for the introduction. Hi everybody, this is Cheng. I'm super excited to be able to see all of you here. Databricks provides a unified data analytics platform for accelerating innovation across data science, data engineering, and business analytics.

A lot of innovation has been happening recently inside the Spark community. We are very excited to provide a solution for our customers around Apache Spark, Delta Lake, MLflow, and projects like Koalas, to simplify everybody's daily workflow. – Distributed applications are hard. You can hit all the issues that happen locally on a single node, plus a lot more.

Distributed applications are hard

Distributed applications are inherently hard to develop, to tune, and to diagnose, and various factors contribute to this. When it comes to diagnostics, one of the biggest hurdles is that useful information may be scattered around or simply lost. You need to follow the breadcrumbs to gradually discover how the dots are connected.

At Databricks, millions of clusters consisting of tens of millions of instances are launched every month to host our customers' workloads. While it's exciting, this is also a perfect environment for bugs to lurk, in either our customers' workloads or our own platform. This talk brings you some of the lessons that we've learned and case studies from our real-life bug-hunting experiences. – Thank you Kris. So one of the common issues you can hit is performance regression.

A performance regression shows up as symptoms like a query running slower after you switch to a newer Spark version, or sometimes you are still on the same Spark version but some characteristics of your underlying data have changed and you still observe a regression. At this point, the most intuitive thing you can do is check the Spark SQL query plan by running an explain command. If you observe that the query plan changed before and after you switched the Spark version or the underlying data, then the regression likely comes from a Catalyst optimizer change, or from Catalyst choosing a new plan according to the characteristics of your underlying data. The second case may happen when, for example, the data volume changes and Catalyst decides to use a different kind of join or to change the join order.

And if the query plan stayed the same before and after the change, then the regression could be coming from various sources, like query optimization and compilation delays, task scheduling delays, or even network operations and JVM GC issues.
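As a concrete, minimal sketch of how you might capture the plan for comparison on both sides of an upgrade (the table name and query below are made up for illustration, not taken from the talk):

```scala
// Hypothetical sketch: capture the full query plan so it can be diffed between
// the old and the new Spark/DBR version. The table name (sales) and the query
// are illustrative assumptions.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("plan-diff").getOrCreate()

val query = spark.sql(
  """SELECT region, SUM(amount) AS total
    |FROM sales
    |GROUP BY region""".stripMargin)

// Prints the parsed, analyzed, optimized, and physical plans. Save this output
// from both environments and diff them to see whether the plan itself changed.
query.explain(true)
```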

So here are a few tools and methodologies we can use. First, you may want to build a benchmark so that you can reproduce the performance regression, and in this case you usually want to minimize your benchmark so that you can reproduce the issue as fast as possible. Sometimes this is not feasible, and without it it's still okay.
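As an illustration only, a minimal reproduction harness might look like the sketch below; the table, query, and iteration count are assumptions, not the benchmark actually used by the Databricks benchmarking team.

```scala
// Hypothetical minimal benchmark harness: run the suspect query a few times
// and print wall-clock timings, so the regression is easy to reproduce and a
// profiler can be attached. The table and query are illustrative only.
import org.apache.spark.sql.SparkSession

object RegressionRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("regression-repro").getOrCreate()

    def timeQuery(label: String, sql: String, iterations: Int = 5): Unit = {
      for (i <- 1 to iterations) {
        val start = System.nanoTime()
        spark.sql(sql).foreach(_ => ())  // force full execution without collecting results
        val elapsedMs = (System.nanoTime() - start) / 1e6
        println(f"$label iteration $i: $elapsedMs%.1f ms")
      }
    }

    timeQuery("aggregation-repro", "SELECT region, SUM(amount) FROM sales GROUP BY region")
    spark.stop()
  }
}
```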

Once you have a benchmark, you usually want to attach a profiler to it, to record traces and provide data for later analysis.

Case study

So here is a real case we hit during Spark 2.4 development.

Inside Databricks, we have a benchmarking team, who does a performance sign-off before each Databricks Runtime release. Databricks Runtime (DBR) is a proprietary fork of Apache Spark that runs inside Databricks, with better performance and other extensions. In the 5.0-beta version, we found a significant performance regression versus DBR 4.3, and multiple TPC-DS queries were slower. One of the examples is query 67.

FlameGraph: DBR 4.3 (Spark 2.3-based)

The tools we are using here are flame graphs and async-profiler. One of the key skills you might want to master while debugging a distributed application is to try to reduce a distributed application issue into a single-node issue, and async-profiler is one of the tools you can leverage for that. By recording the execution on one executor side, we can sample the execution traces of the JVM process and then produce such a flame graph.

Usually, we want to record at least two flame graphs: one before the change and one after the change.
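As a rough sketch of how one might wire async-profiler into the executors to capture such a profile (the agent path, output file, and option string are assumptions that depend on the async-profiler version installed on your nodes, not necessarily the setup used in the talk):

```scala
// Hypothetical sketch: load async-profiler as a JVM agent on each executor so
// CPU samples can be recorded and rendered as a flame graph. The library path,
// output file, and agent options are assumptions for illustration.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("flamegraph-capture")
  .config(
    "spark.executor.extraJavaOptions",
    "-agentpath:/opt/async-profiler/libasyncProfiler.so=start,event=cpu,file=/tmp/executor-profile.html")
  .getOrCreate()

// Run the benchmark workload here, once on the old version and once on the new
// version, then compare the two flame graphs side by side.
```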

FlameGraph: DBR 5.0-beta (Spark 2.4-SNAPSHOT-based)

And you can see that these two flame graphs differ slightly. Those changed parts are usually where the bugs are lurking.

Zoom in on the difference in hot spot

So now we are zooming in on the difference in the hot spot. The figure above is the one produced from DBR 4.3, which is the good version, and the image below is the one produced from 5.0-beta. You can see that there are some differences between these two images.

The key difference here is that while doing an aggregation, the good version, DBR 4.3, runs a hot loop calling a monomorphic function, and in this way we avoid extra buffer copies. In the newer version, 5.0-beta, this code itself hasn't changed, but it is now calling a polymorphic function instead, and an extra buffer copy is introduced, so the whole aggregation process is a lot slower.
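To illustrate why call-site morphism matters in a hot loop, here is a small, self-contained toy example, not taken from Spark's aggregation code: the JIT can devirtualize and inline a call site that has only ever seen one implementation, while a call site that has seen several implementations pays for virtual dispatch. A naive timing like this is noisy, so treat it as a sketch; a real measurement would use a harness such as JMH.

```scala
// Toy illustration (not Spark code): a hot loop whose call site sees only one
// Accumulator implementation can be devirtualized and inlined by the JIT,
// while the same loop tends to slow down once the call site has seen several
// implementations (megamorphic dispatch).
trait Accumulator { def add(x: Long): Unit; def value: Long }

final class SumAccumulator extends Accumulator {
  private var sum = 0L
  def add(x: Long): Unit = sum += x
  def value: Long = sum
}

final class MaxAccumulator extends Accumulator {
  private var max = Long.MinValue
  def add(x: Long): Unit = if (x > max) max = x
  def value: Long = max
}

final class CountAccumulator extends Accumulator {
  private var n = 0L
  def add(x: Long): Unit = n += 1
  def value: Long = n
}

object MorphismDemo {
  // acc.add is a single call site; its cost depends on how many concrete
  // Accumulator classes the JIT has observed flowing through it.
  def hotLoop(acc: Accumulator, iters: Int): Long = {
    var i = 0
    while (i < iters) { acc.add(i); i += 1 }
    acc.value
  }

  def time(label: String)(body: => Unit): Unit = {
    val start = System.nanoTime()
    body
    println(f"$label: ${(System.nanoTime() - start) / 1e6}%.1f ms")
  }

  def main(args: Array[String]): Unit = {
    val iters = 20000000
    time("call site has seen one implementation")(hotLoop(new SumAccumulator, iters))
    // Pollute the call site with two more implementations (now megamorphic).
    hotLoop(new MaxAccumulator, iters)
    hotLoop(new CountAccumulator, iters)
    time("call site has seen three implementations")(hotLoop(new SumAccumulator, iters))
  }
}
```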

Next, I'm gonna share another kind of issue that happens commonly, which is a job hang.

The symptom of a job hang is quite annoying. Sometimes you observe that a job finishes very fast for the first 90% or 95% of its progress, but the last five percent, maybe just one or two tasks, gets stuck there almost forever.

Tools and methodologies demonstrated

So here are some tools and methodologies you can leverage. First of all, the thread dump from the Spark UI is always very useful, because when you are hitting a job hang, usually you are hitting a deadlock, or some threads are busy doing something not useful, and in that case you can observe them from this thread dump. There can also be network issues; maybe your Spark job is actually contacting some remote resources or services, for example, reading from remote storage such as Azure storage. The third tool is JVM debugging, which you sometimes need in order to dig into low-level details. And the last one, which is very interesting, is log exploration and visualization. Using this, we can often find interesting insights about the whole distributed application from an overview.

Case study 1

The first case study I'm gonna share is a shuffle fetch from a dead executor that caused the entire cluster to hang. Quoting from the customer's report: the customer found a lot of exceptions in the Spark log, something like a timeout exception saying we cannot receive any reply from some host in 120 seconds. During about the same time, a roughly 10-minute gap when the exceptions happened, the entire cluster hung, and the customer complained that the cluster usage was extremely low, and this was a pretty large cluster.

Early triaging questions

And here are some early triaging questions.

When a cluster hangs, is there anything special happening right before the hang? That thing might be the root cause of the hang. And while the cluster is hung, is anything happening during the hang? Is it completely silent, or is it busy doing something? During such cluster hangs, sometimes a deadlock happens and the executors just sit there silently, and sometimes they are busy doing something not useful, burning CPU for no good reason.

In order to answer these questions, we can use a few tools. The first one is the Spark history server, where you can check event visualizations of executors, jobs, stages, and tasks. The second one, inside Databricks, is the Spark logs in Delta Lake: within Databricks, we have an ETL pipeline that extracts Spark logs into Delta tables for fast exploration. And of course, all of this is done with the consent of our customers. An interactive notebook environment is also essential, together with Delta Lake, for interactive log exploration and visualization.

Spark History Server – Historical Spark UI

From the Spark UI, we can see that one executor got removed right before the cluster hung, and a new executor was added at about the same time the cluster hang stopped.

Spark logs exploration and visualization

One way we can try to determine whether a node is active or not is to check the logs that node is producing during a certain period. Here, we are checking a time period that includes the whole cluster hang plus two minutes after it. As you can see, with this query we restrict to that specific time range, look only at the Spark driver node, and aggregate on a per-minute basis to count the total number of log messages. It turned out that during the whole cluster hang period, the driver was very quiet and not producing a lot of logs.
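The transcript does not show the actual query, but a sketch of this kind of per-minute log count might look like the following; the table name (spark_logs), its columns, and the time range are assumptions for illustration.

```scala
// Hypothetical sketch of the per-minute log-count query described above. The
// spark_logs table and its columns (timestamp, role, message) are assumptions;
// the real Databricks-internal schema may differ.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("log-exploration").getOrCreate()
import spark.implicits._

val driverLogRate = spark.table("spark_logs")
  .where($"timestamp".between("2020-05-01 10:00:00", "2020-05-01 10:12:00"))  // hang window + 2 minutes
  .where($"role" === "driver")                                                // driver node only
  .groupBy(window($"timestamp", "1 minute"))                                  // per-minute buckets
  .count()                                                                    // log messages per minute
  .orderBy("window")

driverLogRate.show(truncate = false)  // a near-flat, low count suggests a quiet driver
```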

Let's do the same trick on the executor side, and you can see that during the cluster hang, there is some mild activity on the executors' side. What are they really doing? Let's zoom into it.

After zooming into the exact cluster hang period, we can try to explore the actual logs. When you are exploring the logs, you will find a lot of log lines that you don't really care about: for example, connection retry logs, timeout errors, or heartbeat messages.

During the exploration, you can gradually find those patterns and filter them out, so that you can find the actual Spark job, stage, or task activities. After filtering all of them out, we found that this whole query actually returns an empty result set, which means that no new tasks were scheduled during this time. But already-scheduled tasks could still be running quietly here.
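Continuing the assumed spark_logs schema from the sketch above, the filtering step might look roughly like this; the regex patterns are illustrative, not the talk's actual query.

```scala
// Hypothetical sketch: drop known-noisy log patterns and keep lines that look
// like real task activity. Table, columns, and regex patterns are assumptions.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("log-filtering").getOrCreate()
import spark.implicits._

val taskActivity = spark.table("spark_logs")
  .where($"timestamp".between("2020-05-01 10:00:00", "2020-05-01 10:10:00"))    // hang window
  .where($"role" === "executor")
  .where(!$"message".rlike("(?i)retry|timeout|heartbeat"))                      // filter out noise
  .where($"message".rlike("Running task|Finished task|TaskSetManager"))         // keep task activity

// An empty result here suggests no new tasks started in this window, even
// though already-scheduled tasks may still be running quietly.
println(s"matching log lines: ${taskActivity.count()}")
```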

Now, let's do the same log message count trick, just for the executors during this hang period. In the visualization, we can clearly observe a pattern that repeats three times, once every 120 seconds. Comparing this pattern with the actual log lines, we can find out that these are actually executor-side shuffle connection timeout events.

Later investigation revealed that a very large stage consisting of 2,000 tasks happened to be occupying all the CPU cores during the so-called cluster hang period, which means that no new tasks could be scheduled. But all those 2,000 already-scheduled tasks were actually running there quietly, not producing any logs, while burning the CPUs and doing real work.

And this makes the cluster appear to be hanging. Secondly, after executor 29 got lost, other active executors tried to reconnect to the lost executor three times, once every 120 seconds, and these two numbers actually match the default values of the two Spark configurations related to shuffle connection retries.
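The transcript does not name those configurations, so the following is an assumption: two stock defaults that would produce a "three retries, roughly 120 seconds apart" pattern are the shuffle I/O retry count and the network timeout.

```scala
// Hypothetical illustration only: the configuration names are not given in the
// transcript. These values are simply the stock defaults that match a
// 3 x 120 s retry pattern.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("shuffle-retry-defaults")
  .config("spark.shuffle.io.maxRetries", "3")  // default: 3 fetch retries
  .config("spark.network.timeout", "120s")     // default: 120 s network timeout
  .getOrCreate()
```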

In short, this whole behavior is totally expected. It's just that the customer happened to hit a case where, unfortunately, the retries ran all the way to their maximum without anything significant happening, and thus no significant signals were produced to indicate the activity of the cluster, so the customer thought the cluster was hanging there for no good reason.

Case study 2

– So, now let's switch gears and look at a second case study. In this case, a customer reported that a Spark SQL query had been stuck for multiple hours, and they gave us permission to do live debugging on the running cluster. Through the Spark UI, we were able to determine that the query was almost done and the only tasks that were still running were all in the final stage, writing out results.

Get thread dump from executor via Spark UI

Now that we know that only a couple of tasks were still running, we can focus on one of them, find its executor, and from the executors list click on the thread dump button, which leads us to the list of threads and their stack traces. By looking at the thread name, we can easily find out which thread is actually running our stuck task; in this case, it's doing an I/O flush, and it's doing it asynchronously. It's pretty easy to find the other thread that's actually doing that I/O operation in a thread pool, and we can see at the bottom that this other thread is waiting on a socket read.

So it got stuck for multiple hours, which leads us to thinking, "Are there any zombie connections involved?" By running netstat -o, we can see that one of the connections was in the ESTABLISHED state with its keepalive timer off.

Now that we know there is a thread doing network I/O that got stuck, and there is a zombie connection, are these two actually related? We need some hard evidence. For that, we can use the command-line HotSpot Debugger (CLHSDB) to inspect the internal state of our JVM.

CLHSDB session example

There are multiple ways we can introspect the JVM state. We can look at its heap, we can look at its threads and the stack frames within each thread, and usually people will start with the jstack or pstack commands in CLHSDB to figure out where they are. But in this case, we want more programmatic access to these stack frames. So what I'm using here is the jseval command, which allows me to run some JavaScript in a string and gives me very flexible access to the JVM state. Looking at the list of threads here, I can tell that the thread at index four is the thread that I'm looking for.

And then I can run another command that allows me to introspect all the stack frames of this thread four, and I can see that socket read here.

And then I wanna know what is the state of that socket input stream. So by running this other, very complicated command, I can introspect into the inner guts of this object and I can see that there is another indirection for the implementation of the actual socket.

And now, here it is. I found the SocksSocketImpl object, which tells me the local port is 36332, matching the port that we saw in the netstat output. The other interesting thing from this object is that it shows the timeout is set to zero. When you have a timeout equal to zero, plus no keep-alive configured for this connection, it will just hang there forever if the server side never responds.
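As a minimal sketch (not the customer's code) of what a zero timeout means at the java.net.Socket level, and how an explicit read timeout turns an indefinite hang into a fast failure:

```scala
// Minimal sketch: with soTimeout left at its default of 0, a read() on a
// dead-but-ESTABLISHED connection blocks forever; an explicit timeout makes
// the stuck read fail fast instead. Host and ports are placeholders.
import java.net.{InetSocketAddress, Socket, SocketTimeoutException}

val socket = new Socket()
socket.connect(new InetSocketAddress("example.com", 80), 5000)  // 5 s connect timeout

println(s"default soTimeout: ${socket.getSoTimeout}")  // 0 means block indefinitely

// Setting a read timeout bounds how long a read can hang.
socket.setSoTimeout(30000)  // 30 s
try {
  val in = socket.getInputStream
  in.read()  // throws SocketTimeoutException if no data arrives within 30 s
} catch {
  case _: SocketTimeoutException => println("read timed out instead of hanging")
} finally {
  socket.close()
}
```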

GDB example

We can also cross-check with GDB. It turns out modern OpenJDK implementations have very nice integration with GDB, and you can actually see the Java stack along with the native stack from GDB itself. So by running GDB and attaching to our target Java process, we can run t a a bt, which is short for thread apply all backtrace, to see the stack traces of all the threads in that process. And again, we can find our thread and see this interesting bit at the bottom right, which tells us that, yes, this socket read function was called with timeout zero, and that leads us to further investigation down into the actual underlying issue. But this was the very breakthrough that we needed to solve the puzzle.

So to recap, what we did was use the Spark UI to identify which stage a query is running and which tasks are still running. That gets us through the distributed part and down into the single-node part. Then we can still use the Spark UI to get a thread dump on the executor that's running one of the stuck tasks, to get an idea of what it's doing. When the task seemed to be stuck on network I/O, we used netstat to check whether there are connections in a weird state. And then, to understand the JVM state beyond what simple thread stacks tell us, for instance to find out what value a certain field of a certain object instance holds, we can use various tools; the one we showcased here is the CLHSDB tool that comes with OpenJDK. You can also use jhsdb, which is the officially supported version since JDK 9. And we can cross-check our findings using GDB as well. In this case, the issue was actually caused by a bug inside the JDK, and we were able to pinpoint it and then submit it to OpenJDK for fixing. All right, that's the end of our talk, and thank you so much for joining. Now it's time for Q and A.

About Cheng Lian

Databricks

Cheng has been working with Spark since late 2013 and joined Databricks in early 2014 as one of the main developers behind Spark SQL. Now he's a committer of Apache Spark and Apache Parquet. His current areas of interest include databases and programming languages.

About Kris Mok

Databricks

Kris Mok is a software engineer at Databricks. He works on various components of Spark SQL, with an interest in the optimizer and code generation. Previously, he worked on JVM implementations, including the OpenJDK HotSpot VM at Alibaba and Oracle and the Zing VM at Azul, and has a broad interest in programming language design and implementation.