Over the past three years, Apache Spark has transitioned from an experiment to the dominant production compute engine at LinkedIn. Within the past year alone, we have seen a 3x growth in daily Spark applications. Nowadays, it powers many use cases ranging from AI to data engineering to analytics. 1,000+ active Spark users launch tens of thousands of Spark jobs on our clusters, processing petabytes of data on a daily basis. Throughout this journey, we have faced multiple challenges in scaling our Spark compute infrastructure and empowering our fast-growing user base to develop working and efficient Spark applications:
- Remove the major infrastructure scaling bottlenecks by optimizing core Spark components such as shuffle and the Spark History Server
- Balance the limited compute resources against users’ ever-increasing compute demands by improving the cluster resource scheduler
- Improve users’ development productivity, without falling deep into the ‘support trap’, by automating job failure root cause analysis
- Boost the efficiency of users’ Spark jobs without the loss of development agility that comes with repeated tuning of the jobs
In this talk, we will share the work we have done to tackle these challenges and what we have learned in the process.
– Good afternoon everyone. Thanks for attending our session. In the next 30 minutes we will be sharing our experiences of operating Spark at LinkedIn and tackling some of the biggest scaling challenges. I hope you will find this useful, and we look forward to the conversations we might have after this session. We will start with an overview of Spark at LinkedIn, and after that, we will deep dive into two areas to share the problems we face and the solutions we have. So, to realize LinkedIn’s vision to create economic opportunity for every member of the global workforce, LinkedIn’s platform has become a digital representation of the global economy, known as the Economic Graph. This representation not only powers multiple products on LinkedIn’s platform, it also surfaces unique insights into the global economy. Analyzing such a massive representation, in order to surface insights and to engage with LinkedIn’s members, requires a massive amount of infrastructure. At LinkedIn, Apache Spark is the dominant compute engine in our offline data infrastructure.
The Spark system at LinkedIn has two characteristics that stand out. On one hand, it is a massive-scale infrastructure: across our two largest and busiest Hadoop clusters, we have 10,000 nodes, with about 1 million CPU vcores and more than two petabytes of memory. Every day, about 30,000 Spark applications run on our clusters. These Spark applications contribute about 70% of our total compute resource usage, and they generate close to five petabytes of shuffled data daily. This massive-scale infrastructure also grows very fast: we saw more than 3x growth in our daily Spark application count in just the last year. On the other hand, Spark at LinkedIn is also a diversified ecosystem. Our Spark users interact with our clusters either through Azkaban for scheduled workflows, or through Jupyter notebooks and the Spark shell for ad-hoc interactive queries. We have thousands of internal Spark users developing applications using Spark’s rich collection of APIs. About 60% of our Spark applications are Spark SQL apps, using APIs such as DataFrame, Dataset, and SQL. Our Spark applications power a wide range of use cases at LinkedIn, including AI, data analysis, data warehousing, A/B testing, and metrics reporting. So this fast-growing, massive infrastructure and this diversified ecosystem with many moving parts present multiple scaling challenges at different layers of the compute stack.
At the bottom, we saw a resource management challenge. Meeting the resource requirements in a cluster with thousands of users poses some real operational challenges. On top of this, there are also scaling challenges for the compute engines. The fast usage growth requires a scalable compute engine to ensure stability; otherwise, the unstable infrastructure would significantly increase operational overhead and leave users unhappy. Furthermore, we also saw a user productivity dilemma. When Spark usage grows, more support requests are generated. To ensure our Spark users’ productivity, more of the team’s effort is then put into support. However, this also reduces the effort spent on improving the infrastructure itself, which just leads to more support requests in the end. Such a vicious cycle becomes a support trap for the team. At LinkedIn, we have made a systematic attempt to address these challenges across the compute stack.
At the resource management layer, our effort has largely been to decouple the cluster admins from the business of resource management. We created org-centric resource queue structures that enable individual orgs to self-manage their queues. We have also enhanced the resource scheduler to automatically manage queue elasticity by assigning the cluster’s idle resources to the queues most in need. As a result, the mundane resource management processes are largely automated or delegated to our users. At the compute engine layer, we have spent major effort optimizing the Spark shuffle service, which in our experience is the biggest scaling bottleneck at this layer. In addition, we have also been experimenting with multiple SQL optimization techniques, including adaptive query execution. SQL optimizations have the nice property of reducing the compute workload, which further relieves the pressure on the compute engine from fast adoption growth. At the user productivity layer, our aim has been to get out of the support trap by enabling automated Spark job understandability to answer the three most common customer support questions we face. Basically they are: Why did my job fail? Why is my job slow? And how can I make it faster? Due to the time constraint, in the rest of this talk we’ll only cover our user productivity and shuffle optimization efforts. If our other efforts also interest you, do let us know. We’ll be happy to share experiences after the talk. Now I will hand over to my co-speaker Zoe to first talk about our efforts in scaling user productivity. – Thanks Min for giving us an overview of scaling Spark at LinkedIn. As Min just mentioned, scaling our user productivity and the ability to understand Spark jobs is critical. Given that Spark is a very complicated compute engine, it can be painful to debug and tune applications.
Considering the massive scale we have at LinkedIn, doing user support is a huge distraction for our Spark team, as we only have limited resources within the team. And if we spend too much energy on supporting individual users and firefighting here and there, we probably won’t be able to really focus on improving the underlying infrastructure in a systematic way. This leads to a negative feedback loop, and we will eventually fall into the support trap. As Spark is growing really fast in the community, many other companies might potentially suffer from this problem as well. We would like to share our experience of breaking away from the loop.
As already mentioned, here are some of the most typical questions Spark users ask: Why did my job fail? Why is my job slow? And how do I make it run faster? In Spark, failures can happen anywhere, and it takes at least a few clicks to gather all the logs. Sometimes it’s very hard to find the root cause of a failure even with the logs. And oftentimes, as a Spark user, you finally get the job running without failing, but are left frustrated, wondering why your job is running slow and how to make it run faster. We developed a set of solutions to automatically support our users, including automatic failure root cause analysis, GridBench for performance analysis, and various tuning recommendations. With all these solutions, we are able to greatly boost our users’ productivity and, at the same time, reduce the burden on us, the Spark team. In the next few slides, I’ll walk you through the solutions, and at the end, I will discuss the infrastructure behind them.
Let’s start with the first question: why did my job fail, and how can our automatic failure analysis help? As a Spark user at LinkedIn, a typical scenario in the case of a job failure would be: you go to our workflow manager Azkaban to check the job. Instead of digging for the failure root cause in different logs, you can simply go to the exceptions tab, which lists all the exceptions found in all the available logs. The failure root cause summary in the exceptions tab is the exception category, which here indicates that this specific job failed because of out of memory. It’s followed by detailed diagnostic info, and you can click the links to check the full logs.
In addition to the direct benefits to Spark users from the UI, with automatic failure analysis we also have a few failure dashboards, which can benefit different teams that are using Spark. Here’s an example of the failure breakdown for a specific team. For this team, the dominant failure type is input missing, and there are also some access control issues and out of memory issues. With this breakdown, teams can understand where the hotspots are, and take actions accordingly to reduce the failure rate.
The automatic failure analysis can also benefit infra teams. For example, here’s the trend line about different types of infra failures. We can see network issues, data shuffle issues, and so on. This trend monitoring is critical for our cluster health and helps us to detect abnormal behaviors early on and identify potential problems and opportunities for improvements in the clusters.
For the second question, why is my job slow and how do I analyze job performance, we have GridBench. GridBench generates various reports to provide summarized views of Spark application performance metrics for users. It can compare sets of application runs to identify bottlenecks, detect performance regressions, and benchmark our infrastructure. Here’s an example of a regression analysis report for a specific job. It compares a set of recent executions with a set of past executions to determine whether the job’s performance has regressed or improved. Here, there is very likely a regression in the executor CPU time, and users can pay more attention to it when optimizing the job.
To answer the third question, how to make my job run faster and how to tune it, we have actionable tuning recommendations. We designed a set of heuristics to check job configuration, resource settings, job skew, and GC. If a heuristic is shown in red, it indicates room for improvement; if it’s green, it’s healthy. For each heuristic, there’s a corresponding recommendation. For example, here the executor memory is set too high, and the recommendation is to set it to a lower value to avoid resource wastage.
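To make this concrete, a memory heuristic of this kind might be sketched as follows. This is a minimal illustration, not LinkedIn's actual rule set: the function name, thresholds, and the 20% headroom factor are all assumptions.

```python
import math

def check_executor_memory(requested_gb, peak_used_gb, headroom=1.2):
    """Hypothetical tuning heuristic: flag over-provisioned executor memory.

    Compares the requested executor memory against the observed peak
    usage plus a headroom factor. Returns ("red", recommendation) when
    the setting wastes resources, ("green", message) when it is healthy.
    """
    suggested_gb = max(1, math.ceil(peak_used_gb * headroom))
    if requested_gb > suggested_gb:
        return ("red",
                f"executor memory {requested_gb}g is too high; "
                f"peak usage was {peak_used_gb}g, try {suggested_gb}g")
    return ("green", "executor memory setting looks healthy")
```

For instance, an executor requesting 16 GB while peaking at 4 GB would be flagged red with a suggestion to drop to roughly 5 GB; a real system would of course derive peak usage from collected metrics rather than take it as an argument.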
After demoing the solutions we have, let’s talk about how we gather the data and build all the solutions to scale our user productivity. Metrics are central to our work. Our Spark metrics dataset tracks detailed information for every single Spark application running in our clusters. The metrics are then served to various consumers including the failure root cause analysis, GridBench, tuning recommendations, ad-hoc user queries, and so on. But where do all the metrics come from? We have the Spark history server. It parses the history logs of all Spark applications. It has a web UI for Spark users to check their application details. It also has REST APIs, which are leveraged by our tracking service.
However, the history server has some serious scaling problems when dealing with our massive workload. This not only hurts engineers’ productivity, but also hurts our tracking service’s stability. There are two major problems with the history server. First, it’s not able to handle high-volume concurrent requests. Second, parsing history files is very time consuming, especially for large log files. To address the first problem, we designed a distributed Spark history server with one proxy server and a few worker servers. With multiple servers, we are able to horizontally scale our history server.
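One simple way such a proxy could route requests to workers is to hash the application ID, so repeated requests for the same application always land on the worker that has already parsed its logs. This is only a sketch of the idea; LinkedIn's actual routing logic may differ.

```python
import hashlib

def pick_worker(app_id: str, workers: list) -> str:
    """Deterministically route an application's requests to one worker.

    Hashing the application ID means every request for the same app
    hits the same worker server, so the parsed history for that app
    stays cached in one place instead of being re-parsed everywhere.
    """
    digest = hashlib.sha256(app_id.encode("utf-8")).hexdigest()
    return workers[int(digest, 16) % len(workers)]
```

A production proxy would also need health checks and rebalancing when workers join or leave (e.g. consistent hashing), which this sketch omits.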
To solve the second problem, we have incremental parsing. Traditionally, the history server only starts parsing a history file after the application finishes, so the latency of log parsing is pretty high. To address this problem, we incrementally parse the history file while the application is still running, so that very soon after the application finishes, the metrics are available for the end users to consume. With incremental parsing, we are able to bring the 99th percentile of log parsing down from one hour to 18 seconds, and we’re seeing a 30x reduction in loading delay.
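The core of incremental parsing is to remember how far into the event log the last pass got, and to process only the events appended since. The sketch below illustrates that idea with a toy event format; real Spark event logs are JSON event records, and the state and metric names here are hypothetical.

```python
def parse_incrementally(log_lines, state):
    """Parse only the event-log lines appended since the last pass.

    `state` carries the offset of the last line already consumed plus
    running event counts. Calling this repeatedly while the job runs
    keeps the metrics nearly current, so only a small tail remains to
    parse when the application finishes.
    """
    for line in log_lines[state["offset"]:]:
        event_type = line.split()[0]  # toy format: "TaskEnd <id>"
        state["counts"][event_type] = state["counts"].get(event_type, 0) + 1
    state["offset"] = len(log_lines)
    return state
```

Each call is cheap because the work is proportional to the newly appended events, not the full log, which is what turns an hour-long post-hoc parse into seconds of catch-up.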
With the scalable Spark history server, we are able to achieve low latency in our metrics pipeline. The Kafka- and Samza-based Spark tracking service is the component that tracks the application metrics data. As you might already know, Kafka and Samza are stream messaging and processing frameworks open sourced by LinkedIn, and here they enable our low-latency analytics. The Spark tracking service first reads the stream from the resource manager to get the application ID and relevant RM information. It then queries the Spark history server to get each Spark application’s raw metrics, and further compacts them and generates derived metrics. Finally, the tracking service publishes the metrics to the Spark metrics dataset. With this low-latency dataset, we’re able to accomplish the various pieces of work mentioned previously, and it also has the potential to achieve much more in the future. Now I’ll hand over to Min to dive deeper into scaling the Spark infra. Thanks.
– Thanks Zoe for giving a summary of our effort in scaling user productivity. With these tools, the team’s development effort can finally be reserved for the core work, which is to improve the compute engine itself. But which components should be improved first? As we experienced the fast Spark usage growth, the Spark shuffle service became one of the first Spark components to hit scaling bottlenecks. At LinkedIn, we deploy Spark on YARN and enable the external Spark shuffle service to manage shuffle files. Such a configuration is necessary to enable Spark dynamic resource allocation, which is critical to ensure fair resource sharing among Spark apps in multi-tenant clusters. In such a setup, each compute node in the cluster has one Spark shuffle service instance deployed.
Each Spark executor registers with its local shuffle service upon starting up. The shuffle map tasks generate shuffle files, each consisting of multiple blocks corresponding to different shuffle partitions. These shuffle files are managed by the external shuffle services, and when reduce tasks start running, they fetch the corresponding shuffle blocks from the remote shuffle services. In a busy production cluster, a single shuffle service could easily receive thousands of concurrent connections from reduce tasks coming from tens of different applications. Due to the shared nature of the Spark shuffle services, we experienced a few major issues operating them at scale.
The first issue we saw is a reliability issue. In our production clusters, we have noticed somewhat unreliable connection establishment to the shuffle services during peak hours. This results in shuffle fetch failures, which can also lead to expensive stage retries. This can be pretty disruptive, leading to workflow SLA breakage and job failures. The second issue we saw is an efficiency issue. At LinkedIn, we store shuffle files on hard disk drives. Since the reducers’ block fetch requests arrive in random order, the shuffle service also accesses the data in the shuffle files randomly. If the shuffle block size is small, the small random reads generated by the shuffle services can severely impact the disks’ throughput, which extends the shuffle fetch wait time. The third issue we saw is a scaling issue. An abusive job shuffling many small shuffle blocks could easily choke a shuffle service, leading to performance degradation. This impacts not only the abusive job itself, but also all the neighboring jobs sharing the same shuffle service. It’s not always easy to tune these jobs to shuffle larger blocks, and whenever this happens, it leads to unpredictable delays for other jobs behaving normally. The small shuffle block issue is also illustrated in this graph, which shows a downsample of 5,000 shuffle reduce stages, plotting their average block size against the per-task shuffle fetch wait time. This data was gathered from our production cluster in late 2019. From this graph, we can see that the majority of the stages experiencing long fetch delays are also the ones with small shuffle blocks, as illustrated by the long strip at the bottom of the graph. This shows the correlation between block size and shuffle fetch delay. So now that we have seen these multiple issues with the shuffle services, what have we done?
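The arithmetic behind small shuffle blocks is worth spelling out: each of M map tasks writes one block per reduce partition, so total shuffle bytes get divided by M x R. A hypothetical helper makes the point:

```python
def avg_block_size(shuffle_bytes, num_map_tasks, num_partitions):
    """Average shuffle block size in bytes.

    Every map task writes one block per reduce partition, so the
    shuffled bytes are spread across M x R blocks. Even a job shuffling
    terabytes can produce tiny blocks when M and R are large, and tiny
    blocks turn shuffle reads into throughput-killing random disk I/O.
    """
    return shuffle_bytes / (num_map_tasks * num_partitions)
```

For example, a stage shuffling 1 TB across 10,000 map tasks and 10,000 reduce partitions averages only 10 KB per block, which is exactly the regime where hard-drive throughput collapses under random reads.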
So to tackle these issues, the first thing we did was to harden the Netty server backing the Spark shuffle service. As shown in this graph, which also comes from the cluster failure dashboard Zoe mentioned earlier, the dominant type of shuffle failure, surprisingly, is not related to network connectivity, but is a server-side timeout on processing authentication requests. This issue, as shown in the graph, averages around 1,000 occurrences per day in one of our production clusters, and it can even peak at close to 6,000 on certain days. Upon deeper investigation, we found that this is actually due to a Netty server issue, where the lightweight control plane RPCs, such as these authentication requests, are not isolated from the much heavier data plane RPCs. As more time is naturally spent processing the data plane RPCs, which transfer the shuffle data, the lightweight control plane RPCs get timed out. We fixed this issue in two upstream tickets. After deploying those fixes, we immediately saw cluster-wide impact, significantly reducing the occurrence of this type of issue. Now, we see this issue occurring fewer than 10 times per day on our clusters. This helped us significantly improve the reliability of our shuffle infrastructure. So far, so good, but there were also other issues that needed to be resolved. In the next stage, we started looking into keeping the shuffle-abusive jobs under control. To help our shuffle services sustain the fast usage growth, it is necessary to keep those abusive jobs in check. We discovered that the biggest impact coming from these abusive shuffle jobs is their block fetch rate. Since these abusive jobs generate a large number of small shuffle blocks, when their shuffle reducers start to fetch the blocks, the shuffle service can easily exhaust the available disk bandwidth with all the small random reads.
In our production clusters, we have seen abusive jobs significantly increase the shuffle fetch delays of their neighboring jobs. Based on this finding, we developed a shuffle service throttling mechanism, which tracks the per-application shuffle block fetch rate and throttles applications going over the threshold by telling them to exponentially back off. The reducers receiving such responses react either by reducing the block fetch rate or by reducing the number of concurrent block fetch streams. We have recently enabled this feature on our production clusters. In the metrics we gathered after the enablement, we observed that the spikiness of the block fetch rate across all nodes in the cluster was significantly reduced. This means the abusive jobs are under control, and their impact on neighboring jobs is much more limited. This is also shown in this graph: to the right of the red dotted line is when we enabled the throttling feature, and we no longer see those high spikes indicating a very high block fetch rate. In the meantime, we also observed that the overall shuffle data transfer rate, in terms of bytes per second, shows no visible change. This means we’re not hurting the overall shuffle infrastructure’s data throughput by throttling the most abusive jobs. This is also shown in this graph: to the right of the red dotted line, you can barely see any difference compared to when throttling was not enabled. So although throttling helps protect the shuffle services from abusive jobs, it doesn’t really solve the fundamental problem of small shuffle blocks. The jobs being throttled might be high-priority, tight-SLA jobs that simply cannot afford to be throttled. And as mentioned earlier, it’s not always easy to tune these jobs to increase the shuffle block size, as doing so also increases the task input size, which might negatively impact other parts of the job.
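The per-application rate tracking described above can be sketched with a simple fixed-window counter. This is only an illustration of the idea, with an assumed one-second window and a made-up class name; the real shuffle service's accounting, thresholds, and backoff protocol are more involved.

```python
import time

class FetchThrottler:
    """Sketch of per-application block fetch rate throttling.

    Counts block fetches per application within a one-second window.
    When an app exceeds its limit, allow_fetch() returns False, which
    stands in for the server telling that app's reducers to back off
    exponentially (or reduce their concurrent fetch streams).
    """

    def __init__(self, max_blocks_per_sec):
        self.limit = max_blocks_per_sec
        self.windows = {}  # app_id -> (window_start, blocks_in_window)

    def allow_fetch(self, app_id, now=None):
        now = time.monotonic() if now is None else now
        start, blocks = self.windows.get(app_id, (now, 0))
        if now - start >= 1.0:  # window expired: start a fresh one
            start, blocks = now, 0
        self.windows[app_id] = (start, blocks + 1)
        return blocks + 1 <= self.limit
```

Because the counter is keyed by application, one abusive job hitting its limit does not affect the fetch budget of well-behaved neighbors on the same shuffle service.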
Solving this problem requires a more fundamental improvement to the shuffle process. For this reason, we have further designed and implemented Magnet, a novel push-based shuffle service. Our work on Magnet has recently been accepted by the VLDB 2020 conference, and we have also put up an upstream SPIP in this ticket. Due to the time constraint, we will only cover some of the high-level ideas behind Magnet. If you’re interested in more details, in addition to referring to the paper and the upstream ticket, we’ll also be writing a blog post about our shuffle efforts, so stay tuned. At a high level, what Magnet does is adopt a push-merge style of shuffle.
In Magnet, in addition to generating the shuffle files consisting of multiple shuffle blocks, the mappers now have the additional responsibility of dividing the generated blocks into multiple groups. Each group usually contains a few megabytes of blocks that are contiguous in the shuffle file. A separate thread then reads an entire group at once and transmits the individual blocks to remote shuffle services to be merged per shuffle partition. The Spark driver selects a list of shuffle services at the beginning of the shuffle map stage, and each mapper receives the same list. This guarantees that blocks belonging to the same shuffle partition always end up being sent to the same shuffle service. On the shuffle service side, the incoming blocks are merged into per-partition merged shuffle files in a best-effort manner. When this push-merge process finishes, in addition to the sizes and locations of the original unmerged blocks, the Spark driver also receives the same metadata for these per-partition merged shuffle files.
When the reduce tasks start running, they query the Spark driver for the locations of their input shuffle blocks. The driver now takes into consideration the locations of the merged shuffle files, so the reducers no longer need to fetch the individual blocks and thus can avoid the small shuffle block issue. Since the merge process is best effort, as mentioned earlier, a reducer might not get its entire input from a single merged shuffle file; for any blocks that are not merged, reducers fetch the original unmerged blocks. In addition, since the majority of a reducer’s input data now gets merged at a single location, this also creates a natural locality preference for running the reduce tasks. The Spark driver in our design takes this into consideration to schedule reducers with much improved shuffle data locality.
So to recap: with the Magnet shuffle service, the mapper-generated shuffle blocks now get pushed to remote shuffle services to be merged per shuffle partition.
This process helps convert the small random reads in the shuffle into large sequential reads, which significantly improves disk efficiency. In addition, the merge process effectively creates a second replica of the intermediate shuffle data, which helps further improve shuffle reliability. And the locality-aware scheduling of reduce tasks, co-locating the reducers with the merged shuffle partitions, helps further improve shuffle performance. With Magnet, we have already observed significant performance improvements of Spark jobs at LinkedIn, and it also helps improve the shuffle behavior of otherwise abusive jobs, so they avoid being throttled. We’re currently rolling it out to more production jobs at LinkedIn. As a reminder, you can always refer to the paper or the upstream ticket for more details on Magnet. So this concludes our talk. Some of the key takeaways are: scaling challenges can hit all the layers of the compute stack if you’re experiencing rapid usage growth, and you will need a holistic solution to ensure your infrastructure as a whole can scale. To that end, it’s also worthwhile to invest in automated user productivity tooling, so that you can focus more on solving the core issues. We didn’t get to talk about all our efforts, but if you’re interested in the others, do let us know. And if you’re interested in reading the Magnet paper and have more detailed questions, you can also reach out to us. We’re also looking to contribute our shuffle work upstream, including both Magnet and the shuffle throttling. We hope to collaborate with some of you in the community on these efforts. We will also have an engineering blog post about our shuffle efforts coming soon, so stay tuned.
Min Shen is a tech lead at LinkedIn. His team's focus is to build and scale LinkedIn's general purpose batch compute engine based on Apache Spark. The team empowers multiple use cases at LinkedIn ranging from data explorations, data engineering, to ML model training. Prior to this, Min mainly worked on Apache YARN. He holds a PhD degree in Computer Science from University of Illinois at Chicago.
Zoe is a software engineer on LinkedIn's Spark team, where she supports Spark use cases at LinkedIn and tackles various platform challenges, mostly focusing on Spark tracking, metrics, and tuning. Previously, she attended UC Berkeley and Carnegie Mellon University.