Apache Spark has the ‘speculative execution’ feature to handle the slow tasks in a stage due to environment issues like slow network, disk etc. If one task is running slowly in a stage, Spark driver can launch a speculation task for it on a different host. Between the regular task and its speculation task, Spark system will later take the result from the first successfully completed task and kill the slower one.
When we first enabled the speculation feature for all Spark applications by default on a large cluster of 10K+ nodes at LinkedIn, we observed that the default values set for Spark’s speculation configuration parameters did not work well for LinkedIn’s batch jobs. For example, the system launched too many fruitless speculation tasks (i.e. tasks that were killed later). Besides, the speculation tasks did not help shorten the shuffle stages. In order to reduce the number of fruitless speculation tasks, we tried to find out the root cause, enhanced Spark engine, and tuned the speculation parameters carefully. We analyzed the number of speculation tasks launched, number of fruitful versus fruitless speculation tasks, and their corresponding cpu-memory resource consumption in terms of gigabytes-hours. We were able to reduce the average job response times by 13%, decrease the standard deviation of job elapsed times by 40%, and lower total resource consumption by 24% in a heavily utilized multi-tenant environment on a large cluster. In this talk, we will share our experience on enabling the speculative execution to achieve good job elapsed time reduction at the same time keeping a minimal overhead.
Venkata Sowrira…: Hello, everyone this is Venkata and I have my colleague, Ron, with me for this talk. Today, we are going to present a talk title, Best Practices for Enabling Speculative Execution on Large Scale Platforms. All right. Let’s take a look at the agenda. Today’s agenda we’ll start with background and motivation, talking about some of the issues with enabling speculative execution on a large scale platform. Then we discuss about the enhancements we’ve made to Spark engine with respect to speculative execution. Configuring that with speculative execution. In the second half of the talk, Ron, would talk about speculation metrics and the analysis done before and after enabling speculative execution with the case study. Then we will share some of the learnings we got in the process of enabling speculative execution with user guidance, and finally controlling with the feature work and summary.
Okay. Let’s quickly do a quick primer on speculative execution. For those of us who are not about our speculative execution within Spark. Spark achieves [inaudible] by running tasks in parallel as part of a stage. Within this case, when there are tasks, which run significantly slower than the other similar tasks, Spark launches are duplicated execution of the task on a different host as part of speculative execution. Between the two instances of the same task executions, whichever one that finishes first that can be used for the research. And the other one is killed. This can be further conformed by checking this case study in Spark. Now that we talked about speculative execution. Let’s quickly look at some of the tuning knobs available for tuning. Spark speculation conflict is used to enable or disable speculative execution of a Spark application.
Spark speculation interval conflict determines the frequency at which to check tasks for speculation. The next two conflicts are the key conflicts for speculative execution. Spark speculation multiplayer on Spark speculation quantile, spark speculation multiplayer tunes the aggressiveness of speculative execution within a stage. How many times are task is slower than the median in order to be considered for a speculation. Spark speculation quantile determines the fraction of tasks that must be completed before speculation would kick in for a stage.
Let us now look at the motivation of the talk. All those speculative execution speeds up straggler tasks, but adds additionally overhead. So that doesn’t need to strike a balance between the team. Spark speculative execution default conflicts are generally aggressive especially for the bad tabs, also speculating tasks which are run for few seconds can be wasteful. So in order to avoid this, we also to investigate impact of data skews, overhead shuffle services in the context of speculation. Also impact of enabling speculative execution by default in a multi-tenant large scale cluster needs to be that early study. Now that we have seen the motivation for the talk, let us see some of the improvements we have done to Spark itself in the context of speculative execution. Tasks run for a few seconds, getting speculated this result is unnecessarily. In order to prevent a task that run for a few seconds getting speculated, we internally introduced the new Spark conflict, Spark speculation minRuntime Threshold to prevent tasks from getting speculated before running for a minute threshold time. Currently this is set to 30 seconds in our environment, based on our experiments.
Also similar changes added in Apache Spark later as part of SPARK-33741. Currently there are no more metrics available for speculative execution from Spark. Without enough metrics, it is hard to understand the order or outcome of speculative execution. In order to understand usefulness and overhead by [inaudible] the existing AppStatusListener specifically at the TaskStart in the event. Now that we see many additional metrics with the speculative execution. Let us see what are the additional metrics we have added for speculative execution here. Something like the regular stage level metrics, we have added metrics for speculative execution. So some of the metrics are total number of speculated tasks for a stage, number of successfully speculated tasks for the stage, number of killed and failed speculator tasks for the stage. Now with the newly added metrics we can get a speculation summary in for each stage under the case study as for Spark, which we can see from the screenshot. These metrics helps in further tuning the conflicts whenever needed.
Unfortunately, the default speculation parameters are often too aggressive and useful. LinkedIn’s Spark jobs are mainly offering bad jobs, which means the speculation parameters can be on the concentrated side. Let us see the tune speculation parameters for LinkedIn’s platform. So in our platform. We have speculation multiplier to four, compared to the OS’ 1.5. From our experiments, having speculation multiplier set to 1.5 often leading to premature speculation, leading to [inaudible]. Also we have Spark speculation quantile 0.9, compared to OS’ 1.5.
This is needed because it’s the last 10% of the tasks, which are taking longer than the set of tasks. Also Spark speculation minThreshold is set to 30 seconds as we have seen previously. Meaning that tasks that run for less than 30 seconds, one gets speculated. So now that minThreshold is set to 30 seconds. We can also relax our Spark speculation interval to one second, like we are not speculating tasks that runs at least for 30 seconds. No Ron will further take you through the analysis of the metrics as part of the case study as well as at the platform level. On to you Ron.
Ron: Thank you, Venkata. Now I am going to show you the speculation metrics data we conducted. I will also share with you our performance analysis we care about ROI, return on investment. We want to know the degree of the return or performance gain. For a given performance gain. We also want to know that the degree of investment or the additional overhead, and we measure various metrics for one week on a very large cluster with more than 10,000 machines. Basically our environment is a multi-tenant environment with more than 40,000 Spark applications running daily. And we enable dynamic allocations and the resource sharing and the contention, the job performance varies due to some changing delays or congestion.
Now, let’s look at statistics at task level and for a one week period. And we look at the ratio of all the launch speculation task or the task. By the way, as Venkata mentioned earlier, we set speculation in threshold to 30 seconds. So we will consider those tasks with duration greater than 30 seconds for speculation. And then now let’s look at that duration. The ratio of duration of speculation task over duration of all the regular tasks is just a small percentage in this 0.32%. And also for one week period, we launched a total of 2.73 million speculation task. And now I want to introduce this concept of fruitful speculation task. Our speculation task is fruitful if it can finish earlier than the corresponding regular task. And here we found out that if we divide the total number of fruitful tasks over all the speculation task launched, we get 60%. The success rate at 60% is high. This is because we set conservative values in our configuration parameters.
Now let’s look at the statistics at the stage level. So here we only consider those stages with duration which is at 30 seconds. And also with at least 10 tasks in a stage. We call this stage eligible stages. And among all the eligible stages, 41% of them have launched speculation task. And among those stages that have launched speculation tasks, 76% of them receive some performance benefit. It means that they have got some fruitful task. Let’s look at the statistics at application level. In one week, we had a total of 157 Spark applications. And 38% percent of all the Spark applications launched a speculation task and 87% of them benefit from that speculative execution is that those applications have a least one fruitful speculation tasks.
And over 82% of all Spark applications benefit from the speculation and execution. In our multi-tenant environment, we have many in production jobs, and also we have many development jobs rounding on our many launch cluster. And the jobs varies daily. And then here we just sample a mission critical application. And this application has been run every day, it has a total of 29 Spark application flow. Some spark flow run daily, some spark flows run hourly, and each flow has a well defined SLA. Surface level agreement. And we took measures of all the flows for two weeks before and two weeks after we enable speculation. Here in this diagram, we can see the job elapsed time both before and after enabling speculation. And the in order to give the equal way to each job flow, we compute the geometric mean of all the job elapsed time for all the flow. And that we found out that after we enabling speculation, we were able to reduce job elapsed time by 17%.
We also completed the geometric mean of the standard deviation of all that or the jobs elapsed time. And we found out that after enabling speculation, we were able to reduce the standard deviation of the job elapsed time by 41%. So 41% reduction in the standard deviation is significant. This is because before we enabled the speculation feature, all the job used to run, with a big range of the job elapsed time. So many of them did not meet SLA, surface level agreement. And after we enable the speculation, we are able to reduce the variation of the job elapsed time so that it can all finish within a narrow range so that they can meet SLA. We also look at total resource consumption in gigabyte-hours. Here they gigabyte as a memory consume, that was it hours in this CPU or the duration in hours and gigabyte-hours are a measure of the combined CPU and the memory resource consumption.
We used this formula first driver memory in gigabytes times app duration in hours. So this is total driver resource consumption. For the executer resource consumption, we used executer run time, which is task elapsed time. We basically add all the task elapsed time for all the tasks within stage, we add them together. And then we add all the numbers for all the stages together, and then times the executor memory in gigabytes. Then we can get the total executor resource consumption. We found out that after we enabled in speculation, we were able to decrease the total resource consumption by 24%. Now, this task until what situation speculation can help. The first case is, mapper task is slow because the rounding executor is too busy or there’s some system hang due to hardware or software issues. Before we enabled the speculation, we used to see ‘run away’ task sometimes due to some system hang. Basically, we see the system hang after a Spark driver has launched a task. But Spark driver has been waiting for a long time without ending response.
We call this as a ‘run away’ task. After enabling speculation, we rarely see ‘run away’ task. This is because that ‘run away’ task are later killed since their corresponding speculation task can finish it earlier. There is also another case. The network route is congested. Somewhere. When we launch speculation task, and this speculation task may take a different network route. Also in terms of better locality, regular task normally we will reach NODE.LOCAL or RACK.LOCAL copy. And the speculation task usually reaches the ‘ANY’ data copy. If the initial task was not launched in an optimal way, it’s speculation task can have a better locality. Also to have some situation where a speculation cannot help. The first one is data skew. When a data skew exist, even we launch a speculation task using different data copy, however, data skew still exist. So speculation cannot help the data skew case.
The second case is the overload of shuffle surfaces, which has caused reducer task delay. Because the shuffle service is shared by all the executors within the [inaudible]. And then also there is only one shuffle data copy. So the speculation cannot help this case. The third case, there’s not enough memory and it has caused task to have a disk spill. Basically, Spark driver does not know the root cause why a task is still. It does not know the current condition of all the components of a cluster system. You will launch other speculation task based on the configuration parameters values we set. Now, we want to summarize on what thought. At LinkedIn we further enhanced Spark engine to monitor speculation metrics, and also in this thought, we have shared our configuration settings to manage the speculation execution.
I will said that depending on your performance goal, you’ll need to decide how much overhead you can tolerate. Then you can set your configuration parameters accordingly. I want to point out that, if your workflow is like LinkedIn’s workflow, which is many batch offline and jobs, then you can use our work configuration parameters, setting as reference. In terms of ROI. If speculation parameters are properly set, there are a couple of investment overhead, there are also some returns. The first investment, there is a small increase in network messages. Second investment. There is a small overhead in Spark driver. Also there are some returns. The first return is good saving in the executor resources. The second return is a good reduction in the job elapsed time. The third return is, you can get significant reduction in the variation of the job elapsed time. You can’t have better predictable and more consistent job performance.
In terms of our future work, we will investigate in adding intelligence to Spark driver in order to decide whether or not to launch the speculation task. As I mentioned, there are situations speculation task can help. There are also some cases speculation task cannot help. We need to distinguish between these two cases if we can know the condition of all the components of our cluster system, this can help us distinguish which case we have. Also today, many people run Apache Spark on the cloud. On the cloud, we have unlimited resources. However, we need to factor in the monthly cost. What is not cost in launching additional executors. finally, we want to thank Eric Boideschweiler, Sunitha Beeram and all the members in LinkedIn’s Spark Team for their enlightening discussions and their insightful comments. So this is the end of our presentation. We welcome your feedback. Thank you.
Venkata is a Senior Software Engineer at LinkedIn working on a big data platform based on Apache Spark. Previously worked at Qubole building big data platforms primarily on public service clouds using...
Ron Hu is a Senior Staff Software Engineer at LinkedIn where he works on building a big data analytics platform based on Apache Spark. Prior to joining LinkedIn, he used to work at Teradata, MarkLogic...