Tale of Scaling Zeus to Petabytes of Shuffle Data @Uber

May 27, 2021 12:10 PM (PT)

Zeus is an efficient, highly scalable, and distributed shuffle-as-a-service that powers all data processing (Spark and Hive) at Uber. Uber runs one of the largest Spark and Hive clusters on top of YARN in the industry, which leads to many issues such as hardware failures (burnt-out disks) and reliability and scalability challenges. Last year, we discussed Zeus’s service architecture and early results at this forum. Since then we have made great progress: we open-sourced Zeus and deployed it to all of our analytics clusters.

In this talk, we want to cover how we scaled the Zeus service to all Spark workloads, to billions of shuffle messages and petabytes of shuffle data at Uber. We will also talk about the strategies we used to roll out Zeus at this massive scale without users noticing any difference or any service disruption. We also want to talk about further improvements that are on the horizon for Zeus, as well as the performance and reliability improvements planned for future releases.

In this session watch:
Mayank Bansal, Staff Engineer, Uber Inc

 

Transcript

Mayank Bansal: This is Mayank Bansal, from the Data Infra team at Uber. Today we’re going to talk about how we scaled our remote shuffle service, Zeus, to petabyte scale. Let’s have a little bit of context about Uber. Uber is a small company with 15 billion total trips and 18 million trips per day. We operate on 6 continents, in 69 countries and 10,000 cities. We have 103 million monthly active users and around 5 million active drivers. We have 22,000 employees worldwide and 3,700 developers worldwide. These numbers are a little old, so there may have been some changes during the pandemic; those are not reflected in these stats.
Data and ML use cases at Uber: Uber Eats, ETAs, customer support, driver/rider matching, personalization, demand modeling. Pretty much every use case at Uber is data and ML driven; the whole Uber app runs on data and ML. Pretty much every decision is made using data and online and offline models, to better serve our users. So data and ML are the backbone of Uber.
Let’s talk about Uber’s data stack. We have many events coming in from different sources: mobile app events, device events, events from the car. These events come through different online services and are then captured and ingested into Kafka. We have the biggest Kafka installation in the world. We take these events from Kafka and ingest them into in-memory databases, like Pinot and AresDB, or into storage, hot and warm storage on HDFS, and we also archive in the cloud. So we actually have a tiered data lake which holds all of it.
We have a compute fabric on top of it, where we run YARN, Mesos, and Peloton. We also have stream processing with Flink, as well as batch processing with Spark, Tez, and MapReduce, which runs on this compute fabric. On top of that, we have realtime and pre-aggregated analytics, which we do through AthenaX, and then we have query engines, such as Presto and Vertica, as well as batch and complex batch systems, like Hive. On top of it we have data preparation tools like Piper and uWorc, and dashboards, like Summary and Dashbuilder, which do reporting on top of it. We also have an ad hoc query interface, which we call QueryBuilder, where anybody can go and write a query against any of these query engines. We also have BI tools, such as Tableau and DSW.
Let’s talk about Uber’s ML stack, which is Michelangelo. We ingest all the data from Kafka and then we do data preparation through stream processing with Flink and batch processing with Hive. The batch processing is powered by the data lake on HDFS, while the stream processing gets data directly from Kafka. We then do prototyping with Jupyter Notebook and Spark Magic, and training through TensorFlow, PyTorch, XGBoost, and Spark ML. For inference, we have the realtime prediction service and batch prediction. Pretty much all this processing happens on the compute fabric, which is powered by YARN and, in some cases, by Peloton or Mesos.
Let’s talk about Apache Spark at Uber. Apache Spark is the primary analytics execution engine that teams use at Uber. At Uber, 95% of the batch and ML jobs run on top of Spark. We run Spark on YARN and on Peloton or Mesos. We also use the external shuffle service for the shuffle data. Let’s talk about how the Spark shuffle service works. As we already know, the Spark external shuffle service is a separate daemon which runs on each host, alongside the NodeManager in the case of YARN, or as a separate daemon in the case of Mesos. What happens when a mapper task processes data and then needs to pass that data to the shuffle, to the reducer? It writes the data to the local disk, so that the reducer can come and read that data.
The mapper writes the data through the local shuffle service and generates an index file and the partitioned data on the local disk. Each mapper does that. Then, when the reducer wants to take its data and do the aggregation and so on, it goes to each machine, talks to the local shuffle service, gets the data for the partitions it is responsible for, and merges the data in a memory buffer. The reducer task spills the data to disk if the data sizes are big, then iterates through it and produces the final output to HDFS. So this is, very simply put, how the local shuffle service works in the current environment.
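For reference, these are roughly the Spark settings involved when running with the built-in external shuffle service on YARN. The keys are standard Spark configuration options; the values shown here are only illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Minimal sketch: a Spark job relying on the built-in external shuffle service on YARN.
// These are standard Spark configuration keys; the values are only illustrative.
val spark = SparkSession.builder()
  .appName("external-shuffle-example")
  .config("spark.shuffle.service.enabled", "true")   // executors fetch shuffle blocks via the node-local daemon
  .config("spark.dynamicAllocation.enabled", "true") // dynamic allocation relies on the external shuffle service
  .config("spark.local.dir", "/data/tmp")            // mapper index and data files land on this local disk
  .getOrCreate()

// Any wide transformation (groupBy, join, repartition, ...) triggers a shuffle whose
// map output is written to spark.local.dir and served to reducers by the shuffle daemon.
spark.range(0, 1000000L).groupBy(col("id") % 100).count().show()
```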
So there are certain limitations of the Apache Spark external shuffle service. One of the issues we found in our production systems comes from the fact that we use these basic machines, which have a one-terabyte disk, some CPUs, and memory. Given the amount of data our production services process and the amount of data the shuffle service writes to disk, we found that our disks were wearing out. The reason is that every SSD comes with a DWPD rating, drive writes per day, and if you write more data than it is rated for, the life of the SSD gets reduced.
As you can see in this graph, we are writing four to five terabytes of data every day to these disks, and these disks come with one DWPD. So we are actually wearing out our SSDs four to five times faster than we should be. A machine that is generally expected to last three years actually wears out within six months to one year, so we would have to replace our whole fleet every six months to one year if we didn’t fix this.
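To make that arithmetic concrete, here is a back-of-the-envelope sketch of the wear calculation, using the approximate numbers quoted above rather than exact fleet measurements.

```scala
// Back-of-the-envelope SSD wear estimate using the approximate numbers quoted above.
val diskCapacityTB    = 1.0   // 1 TB SSD per host
val ratedDWPD         = 1.0   // rated for one full drive write per day
val shuffleWritesTB   = 4.5   // ~4-5 TB of shuffle data written per day

val wearMultiple      = shuffleWritesTB / (diskCapacityTB * ratedDWPD) // ~4.5x the rated write load
val ratedLifeYears    = 3.0
val expectedLifeYears = ratedLifeYears / wearMultiple                  // ~0.67 years, i.e. roughly 6-12 months

println(f"Writing $wearMultiple%.1fx the rated load; expected SSD life ~${expectedLifeYears * 12}%.0f months")
```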
The other limitation of the Apache Spark shuffle service is reliability. We had a huge number of failures in our production system due to the Apache Spark shuffle service. As you can see, we have around 20,000 task failures per day; on average, around 14,000 to 15,000 tasks fail per day for different shuffle-service-related reasons, which is a big cause of our reliability challenges in Spark. Let’s talk about what these different categories of issues are. The first is shuffle failures. We often find that our jobs or tasks fail due to fetch failures: the reducers go to the mappers, and in many cases the mapper machine is not available or is very busy, so the reducer cannot talk to the mapper machine. In that case a fetch failed exception happens, which is a very generic exception, and then your job or stage fails.
Similarly, if the machines are busy or there is a network issue, we see these failures as well. So there are a lot of failures in our production system because of these shuffle failures. Second, the noisy neighbor issue. As I told you earlier, our services write a lot of data to disk, and we have these basic machines with a one-terabyte disk, which is pretty much the bottleneck, in my opinion. We run around 50 to 60 containers per machine, and any of these jobs or containers can write a lot more data to the disk than the other containers, which causes multiple issues. One issue is space: it actually happens that the job which is writing a lot more data fills up the whole disk, and then the other jobs running on the same machine fail because they don’t have space to write their data. That is one category of failure.
The second category of failure is throughput. One job can write a lot more data and consume the whole IO bandwidth, and because of that, the other jobs on the machine get slowed down. The third issue is that we write so much data, around five terabytes per day as I showed you earlier, that the machine’s IO bandwidth is pretty much saturated and there is no headroom left for any other operation that wants to write to disk. When that happens, you can’t even SSH to that machine; nothing can happen on it and it becomes unresponsive. Because of that, a lot of failures happen. So this is also one of the major issues for us.
We have a shuffle service running on YARN as well as on Mesos, and it is pretty unreliable. We have seen many cases where it runs out of memory and crashes; the YARN NodeManager crashes because of the shuffle service. On Mesos, the reliability of the shuffle service is very, very low. We actually have to run a daemon to monitor it and restart it whenever it crashes. So the reliability of these shuffle services is really shaky, and we see a lot of failures because of it.
The fourth, and one of the main issues, is scalability. We have use cases which need to write more shuffle data to disk than the disk can hold. Say we have a one-terabyte disk, as you’ve seen, and one job needs to write more than one terabyte of shuffle data on that machine. Those are legitimate use cases, and because of this we can’t run those jobs on Spark, because they are bottlenecked by the shuffle service. So these are a few of the reliability challenges we see today in our clusters.
The other limitation is that we wanted to move our compute stack to Kubernetes, because Kubernetes is the compute platform we want to adopt at Uber for our batch processing as well. Right now there is no dynamic allocation support there, and we want to migrate our YARN workloads to K8s. We have a huge amount of YARN workload, and without dynamic allocation it would be a challenge for us in terms of resources to run it on a Kubernetes cluster, because we pretty much would not have enough resources. So this is one of the big challenges for us in running Spark on Kubernetes at Uber.
The other big challenge is collocation. As I said earlier, we write so much data to the disk that we can’t collocate stateless and batch compute, or storage and batch compute, on the same machine. What that means is: we have a lot of stateless services, microservices, and there is a lot of CPU and memory available on those machines to consume during many times of the day. We want to consume that, because we don’t want to waste resources. We could run batch compute on top of those machines, and the utilization level of those machines would be high.
However, because we write so much data to the disk, it is not feasible for microservices or storage services to run with batch. The reason is that if the machine is writing so much that it becomes unresponsive and the IO throughput is fully consumed, then the stateless jobs cannot run. For storage too, if you’re sharing the same disk, it is a challenge for storage machines to collocate with batch. This is a very big challenge for us, because we want to increase the utilization level of those stateless and storage machines, but because we write so much data to the disk, it’s not possible yet.
So I’ll talk briefly about the architecture of the remote shuffle service. At last year’s summit we presented, in detail, how we arrived at this architecture. I’m not going to explain all of that again; I’ll just go over it very briefly here, and you can go back to our previous year’s Spark presentation to see how we arrived at this architecture and what the benefits are.
At a very high level, we have these two machines: the mapper host and the reducer host. The mapper task runs and talks to the shuffle manager. We extracted our shuffle manager out to remote shuffle, so we actually use the same API as Apache Spark. The shuffle manager, instead of talking to the local service, talks to the remote shuffle service. All the mappers for the same partition write to one shuffle server, and then the reducer which needs the data for that partition goes to that one server and fetches the data.
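For illustration, plugging a custom shuffle manager into Spark looks roughly like this. The class name and registry property below are assumptions based on the open-source Uber RSS project; treat them as placeholders and check the project’s README for the exact keys.

```scala
import org.apache.spark.sql.SparkSession

// A sketch of swapping Spark's pluggable shuffle manager for the remote one.
// The class name and registry key below are assumptions based on the open-source
// Uber RSS project; treat them as placeholders and check the project's README.
val spark = SparkSession.builder()
  .appName("remote-shuffle-example")
  // Point Spark's shuffle API at the RSS client-side shuffle manager
  .config("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager")
  // Hypothetical property: where the driver discovers remote shuffle servers
  .config("spark.shuffle.rss.serviceRegistry.zookeeper.servers", "zk1:2181,zk2:2181")
  .getOrCreate()
```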
How does it help? Because you’re writing sequentially and reading sequentially, and you have actually inverted the MapReduce model, where the mappers write to one machine and the reducer goes to one machine and gets the data. Because of this inversion, we actually improved latencies a lot. These machines are pretty much stateless; all the remote shuffle service machines are stateless. We use bigger machines where many mappers and many partitions can talk to one server at a time, but there is no state shared between servers. So you can scale out easily.
As I said, it’s a horizontally scalable design, where we have multiple mappers for the same partition talking to the same machine, and there is no shared state between shuffle servers. In this figure you can see all the mappers talking to one machine for partition one and partition two, and reducer one, which is responsible for a partition, goes to only one machine and fetches the data. Because there is no shared state, we can horizontally scale this model, with all the writes and reads happening remotely. And because of this inverted MapReduce paradigm, we actually improved our latencies a lot, even though we are writing remotely. So we have similar runtimes for our jobs even when running remotely. What that means is, a Spark job using the external shuffle service, which writes locally, and a Spark job writing through the remote shuffle service have very similar runtimes.
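As a rough sketch of why this scales horizontally: the client side only needs a deterministic mapping from partition to shuffle server, so there is nothing to coordinate between servers. The helper below is hypothetical, not the actual RSS code.

```scala
// Hypothetical sketch: a deterministic partition -> shuffle-server mapping.
// Every mapper computes the same assignment, so all writers of a partition converge
// on one server and the reducer knows exactly where to read from; servers share no state.
case class ShuffleServer(host: String, port: Int)

def serverForPartition(partitionId: Int, servers: IndexedSeq[ShuffleServer]): ShuffleServer =
  servers(math.abs(partitionId) % servers.length)

val servers = IndexedSeq(ShuffleServer("rss-1", 9338), ShuffleServer("rss-2", 9338))
// Mapper side: stream partition 7's records to serverForPartition(7, servers).
// Reducer side: fetch partition 7 from that same server; no cross-server merge is needed.
println(serverForPartition(7, servers))
```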
We have done a lot of network optimizations. We use Netty, which is a high-performance asynchronous server framework. We also do a lot of asynchronous processing in reading and writing; there are asynchronous reads and writes on the client side as well as on the server side. We also wrote our own RSS binary network protocol, which has efficient encoding and compression and improves our writing to the network. So we do a lot there, and because of that we can scale more. We also do a lot of performance optimization for disk reads and writes, because we write sequentially and read sequentially, which helps in two ways: one, we can read data faster; second, if you read and write to the SSD sequentially, you effectively improve the DWPD situation of the disk.
We also use client-side compression. We do that because we wanted to scale out the compression compared to doing it on the server side: if we compress on the client side, the work scales out across the many clients, whereas on the server side it would become a bottleneck. Compression also reduces the network transport size, which helps us reduce the bytes on the network. We do parallel serialization to both disk and network, which helps improve latencies, and we have asynchronous shuffle data commit.
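Here is a minimal sketch of the client-side compression idea, using the lz4-java codec that Spark itself ships with. This is an illustration, not the actual RSS code; the real protocol would also need to carry the uncompressed length.

```scala
import net.jpountz.lz4.LZ4Factory // lz4-java, the same LZ4 codec Spark ships with

// Minimal sketch of the client-side compression idea: each mapper compresses its
// shuffle bytes before sending them to the remote shuffle server, so the CPU cost
// scales out across clients. Illustrative only; the real protocol also has to carry
// the uncompressed length so the reader can decompress.
object ClientSideCompression {
  private val factory = LZ4Factory.fastestInstance()

  def compress(shuffleBytes: Array[Byte]): Array[Byte] =
    factory.fastCompressor().compress(shuffleBytes)

  def decompress(compressed: Array[Byte], uncompressedLength: Int): Array[Byte] =
    factory.fastDecompressor().decompress(compressed, uncompressedLength)
}
```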
Let’s talk about fault tolerance for the shuffle servers. We have ZooKeeper as a server registry. Each Spark driver goes to ZooKeeper to figure out which shuffle server to use for which partition, and that information is passed from the Spark driver to the Spark executors. The Spark executors then use those shuffle servers to read and write data. We do this because in any distributed system your servers can go down at any time, and you don’t want your jobs to fail because of that. That’s the reason we have this architecture where you can discover your server, and if the server is down, you can discover another server to write to and read from.
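A minimal sketch of that discovery step, using Apache Curator against a hypothetical registry path; the actual RSS registry layout may differ.

```scala
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import scala.jdk.CollectionConverters._

// Hypothetical sketch: the driver lists live shuffle servers registered in ZooKeeper.
// The path "/rss/servers" is illustrative; the actual RSS registry layout may differ.
val zk = CuratorFrameworkFactory.newClient("zk1:2181,zk2:2181",
  new ExponentialBackoffRetry(1000, 3))
zk.start()

val liveServers: Seq[String] = zk.getChildren.forPath("/rss/servers").asScala.toSeq
// e.g. Seq("rss-1:9338", "rss-2:9338"): the driver assigns partitions to these servers
// and ships the assignment to executors; if a server disappears, re-discover and retry.
zk.close()
```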
The other thing we do is shuffle data replication. We have server replication groups, which means one client can write to multiple servers, in this case two. If machine one, shuffle server one, goes down, the reducer can still go to shuffle server two to read the data. We enabled it for many important jobs, because if a shuffle server is down for any of several reasons, you can still recover your jobs without any user intervention or retries.
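A hypothetical sketch of that replication-group behavior; the helpers here are illustrative, not the RSS client API.

```scala
// Hypothetical sketch of a replication group: write every shuffle block to all
// replicas, and on the read path fall back to the next replica if one is down.
case class Replica(host: String, port: Int)
case class ReplicationGroup(replicas: Seq[Replica])

def writeBlock(group: ReplicationGroup, block: Array[Byte],
               send: (Replica, Array[Byte]) => Unit): Unit =
  group.replicas.foreach(r => send(r, block)) // duplicate the block to each replica

def readBlock(group: ReplicationGroup,
              fetch: Replica => Option[Array[Byte]]): Option[Array[Byte]] =
  group.replicas.view.flatMap(fetch).headOption // first healthy replica wins; others are not contacted
```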
So let’s see, we talked about a lot of problems. How did we do? Did we solve these problems or not? Here are the disk writes now: our disk writes are reduced by 60 to 70% in our production environment. Now we are writing pretty much 1.5 GB, sorry, 1.5 terabytes in the max case, and because we are reading and writing sequentially, the same machines can handle that much DWPD. Our shuffle task failure count is reduced by 90 to 95%. As you can see, our task failure count was 20K, and now it is reduced by pretty much 95%.
Collocation. We are doing two levels of collocation in production. We are collocating HDFS with YARN: we are running YARN on HDFS machines, and because the shuffle output now goes to the remote service over the network, we are not interfering with any local disk writes. That helps us collocate HDFS with YARN. Similarly, we are also collocating stateless and batch workloads by running YARN on Peloton. Peloton is a scheduler which runs on the stateless servers, YARN is for batch, and we run them together to collocate batch and stateless workloads on the same cluster. By doing this collocation, we are saving tens of millions of dollars, and that’s a huge improvement for us.
Uber uses Kubernetes. RSS already has a K8s implementation; this is the PR, which was contributed by Bo from Apple. In this architecture, RSS is deployed as a StatefulSet on K8s. We have removed ZooKeeper from the K8s implementation: clients discover RSS servers statically, Kubernetes makes sure those RSS servers get restarted, and clients can move from one server to another if they find that the first RSS server is not up. The rest of the RSS architecture is pretty much the same, and the good news is that we can now do dynamic allocation on K8s.
Let’s talk about the Zeus scale at Uber. All Uber data clusters are using Zeus as the shuffle service today. All Uber analytics workloads, Spark and Hive on Spark, use Zeus for shuffle. We run Spark as an analytics workload, as well as Hive on Spark, so pretty much everything, 95% of the workload, uses Zeus for shuffle. In our analytics clusters, per region, we run around 450,000 jobs, and that’s an older number, so it could be more now, and 95% of them use Zeus for shuffle. You can see Zeus is horizontally scalable. We are doing around 1.12 trillion shuffle records per day at the maximum, and we are adding more to it because we are adding more workloads. On average we push 685 billion shuffle records per day through Zeus, and at peak around 3 billion shuffle records per minute.
We are writing around three petabytes of shuffle data per day, and our job latencies are on par, really similar to the external shuffle service. Roadmap. We are still improving RSS a lot. We’ve added map-side aggregation: the runtimes of jobs which have map-side aggregation were not on par, so we added this feature, which was developed internally by our team. It has great results and we are going to upstream it very soon. We also improved the ZooKeeper connection handling, because in our internal production deployment we use ZooKeeper; we went from 16,000 connections to 200, which is a great result. We are working on multi-tenancy, load balancing, and K8s integration internally. Zeus is open source; please use this link and go to the remote shuffle service repo. Please try it out. For any questions, please reach out to me. My email is mabansal@uber.com. Thank you.

Mayank Bansal

Mayank Bansal is currently working as a Staff Engineer at Uber on the data infrastructure team. He is a co-author of Peloton. He is an Apache Hadoop Committer and an Oozie PMC member and Committer. Previously he was wor...