How to Performance-Tune Apache Spark Applications in Large Clusters


Omkar Joshi offers an overview of how performance challenges were addressed at Uber while rolling out its newly built flagship ingestion system, Marmaray (open-sourced), for data ingestion from various sources like Kafka, MySQL, Cassandra, and Hadoop. This system has been rolled out in production and has been running for over a year now, with more ingestion systems onboarded on top of it. Omkar and team heavily used jvm-profiler during their analysis to gain valuable insights. This new system is built using the Spark framework for data ingestion. It’s designed to ingest billions of Kafka messages per topic from thousands of topics every 30 minutes. The amount of data handled by the pipeline is on the order of hundreds of TBs. At this scale, every byte and millisecond saved counts. Omkar details how to tackle such problems and shares insights into the optimizations already done in production.

Some key highlights are:

  • how to understand your bottlenecks in Spark applications, and whether or not to cache your Spark DAG to avoid rereading your input data
  • how to effectively use accumulators to avoid unnecessary Spark actions
  • how to inspect your heap and non-heap memory usage across hundreds of executors
  • how you can change the layout of your data to save long-term storage cost
  • how to effectively use serializers and compression to save network and disk traffic
  • how to reduce amortized cost of your application by multiplexing your jobs.

They used different techniques for reducing memory footprint, runtime, and on-disk usage for the running applications. In terms of savings, they were able to significantly (~10% – 40%) reduce memory footprint, runtime, and disk usage.

Watch more Spark + AI sessions here
Try Databricks for free

Video Transcript

– Good afternoon, everyone. Welcome to today’s talk on how to performance-tune Spark applications in large clusters. Before we start, I want to set the context of what to expect from this talk. This talk is not about how to set Spark configurations so that you can start running your Spark application; most of those Spark configurations you can find online. This talk is more about what we did differently, beyond what is readily available when you Google, while building Marmaray at Uber, which is a generic ingestion system to ingest data from any source to any sink. That’s pretty much what we’ll be talking about in this presentation. I have divided the presentation into four different improvements. Let’s look at them.

This is a brief introduction about me. I am currently a software engineer at Netflix. Before that, I was a software engineer at Uber, where I architected and authored Marmaray. Prior to that, I worked with a company called Hedvig, a software-defined storage company, where I architected their object storage and NFS solutions.

This is the agenda for today. I’ll briefly talk about the JVM profiler, Hadoop profiler, Spark Listener, and Auto Tune. The rest of the talk will focus on four areas of improvement: storage, CPU/runtime, efficiency, and memory.

JVM Profiler: Distributed

So why is profiling a Spark job very important, and why is it very challenging? Profiling a simple standalone Java application is a simple problem, because you have the application running all the time, so you can connect a remote debugger, or you can even use different profiling techniques to profile an individual API, a certain class, or certain code paths. It becomes very challenging with Spark, primarily because, first of all, it is running on thousands of executors. And second, if you have seen a Spark application running in production, then you know that a Spark job is divided into multiple Spark stages, and every Spark stage is executing different parts of the code. What that means is, when you want to know whether your Spark job is running fine or there is room for improvement, you don’t know, because since it’s spread across so many Spark stages, you don’t know which Spark stages to go after and where there is room for improvement. And that’s why the Spark team at Uber built the JVM profiler, so that all Spark developers can debug Spark applications and performance-tune them. This JVM profiler is available online.

It’s an open source and the team has already presented it at previous Spark Summit. So if you are interested, please go and check out their video.

Hadoop Profiler (Recap)

At a high level, what the Hadoop profiler is: it’s basically a Java agent. When you are defining a Spark launch, in the extra JVM arguments you provide some configurations which are listed on their website. Once you submit a Spark job, whenever an executor comes up, it comes up with this Java agent attached. This Java agent lets you do some profiling. The first is CPU and memory. When we say memory, it also tells you the split of the memory. So let’s say you are consuming some x GB of memory for your heap and the remaining GBs of memory for your off-heap. It will also give you further information: let’s say if you’re using direct byte buffers or memory-mapped buffers, then it gives you pretty detailed information about that memory. This is the specific piece which we used when we were figuring out memory usage for Marmaray jobs, which I’ll be talking about later in this presentation. The second thing is method duration profiling. Let’s say you have some API calls which you are making to the NameNode or DataNode, and you want to profile them; the Hadoop profiler is the way to do it. And the third thing is method argument profiling. Let’s say you are making some remote API calls and you are passing some arguments; the Hadoop profiler lets you profile them as well. Once the information is collected in the Java agent, it needs a way to report it. The default reporters are Kafka and InfluxDB. At Uber, we use Kafka. But if you want to add your own reporter, it’s open source, so feel free to go and add your own.

Spark Listener

We are using Spark Listener at Uber for different reasons. One use case is to detect if there is a node which is slow, because there is, let’s say, a disk issue or a network issue. We want to identify which task is running on that executor and probably kill that task or probably blacklist that executor. So that’s one use case. The second use case is to do task analysis. Let’s say you have a Spark job run, but now you want to know what happened: which task was running on which executor? How much time did it take? Was there a way I could change the Spark scheduling logic so that I could run this task somewhere else? All that information you can capture using Spark Listener. So, let me set the context for what we did as part of Auto Tune. Typically what happens is the data science team has a Spark config, and that is what they use for running all their Hive queries. What we saw as a typical pattern at Uber is, usually they will have a default Spark configuration. When I say default configuration, I’m talking about the memory and number of executors they’re using to run the query. It runs fine till they start accessing bigger Hive tables, and that’s when it starts breaking. So usually they tune that Spark configuration by adding more executors or adding more memory to it. The problem is, once that bigger table query starts running, they check that configuration into their central repository. The downside of that is, it now gets used in all Hive-on-Spark queries which they are running against our production Hive cluster. So let’s say previously they were using only 500 executors to run all their Hive queries; now suddenly they need 2000 or 2500 executors. The problem we noticed is they actually don’t need that many executors to run all those queries. Only a couple of queries need that many executors.
So in order to save money for the company, what we did is we basically looked at the historical pattern of those queries and analyzed: okay, when they specify, let’s say, 500 executors with 16 GB of memory, do they really end up using that? If not, then there is room for improvement. So looking at the historical pattern, we adjusted the number of executors and memory to see if the query finishes; if not, we always fall back to the configuration provided by them. This helped us a lot by reducing the YARN queue usage footprint and thereby saving money for Uber. All these Spark improvements we did while we were building Marmaray. Marmaray is an open source project; it was open sourced in September 2018. I have added a GitHub link, so if you are interested, please go and check it out. There’s also a link to the blog post, so please check that out.
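The historical-pattern approach described here can be sketched roughly like this. This is a minimal illustration, not Uber’s actual Auto Tune code: the function names and the 1.2x headroom factor are assumptions.

```python
# Illustrative sketch (not Uber's Auto Tune implementation): pick a trial
# resource config from the query's historical usage, and always fall back to
# the user-supplied config if the trial run fails.

def suggest_executors(history, user_requested, headroom=1.2):
    """history: peak executor counts actually used by past runs of this query."""
    if not history:
        return user_requested
    trial = int(max(history) * headroom)
    # Never suggest more than the user already asked for.
    return min(trial, user_requested)

def run_with_fallback(run_query, history, user_requested):
    """run_query(n) attempts the query with n executors; returns True on success."""
    trial = suggest_executors(history, user_requested)
    if run_query(trial):
        return trial
    # Trial config failed: fall back to the configuration provided by the user.
    run_query(user_requested)
    return user_requested
```

For example, if a query historically peaked at 210 executors, the trial run uses 252 even if the checked-in config asks for 2000, reclaiming the rest of the YARN queue.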

High-Level Architecture

This is the high-level architecture of Marmaray. I’m not going into too much detail about it; there are many presentations we have given outside where we have talked about it in detail. The only thing I would like to highlight here is that the entire Marmaray ingestion framework is supposed to ingest data from any source to any sink, and it is built on top of Spark. So we use Spark as our execution engine.

Spark job

Now let’s talk about Spark job improvements.

First I’ll be talking about storage improvements.

Effective data layout (parquet)

The entire analytical dataset at Uber is stored in Parquet format. Parquet, as you all know, is a columnar data format. What that means is, let’s say you have 100 rows which you are trying to store in a file, and let’s say every row has four columns; then the way the data is stored in Parquet is, for all 100 rows, column one’s data will be written first, followed by column two’s data for all hundred rows, then column three, and then column four. So we looked at the different types of compression we get when we write the data to Parquet. There are two types of compression you can get. One is columnar compression, and the other is the general-purpose compression of whatever codec you have chosen: it could be Snappy, it could be Gzip, it could be Brotli, or it could be something else.

What we observed is, since it’s a columnar storage format, you get much more benefit if you take advantage of the columnar compression, rather than using different compression types.


Let’s look at the example. Here we are trying to store user information in Parquet. As you can see, we have a total of five columns; user ID is a unique identifier, so its values are never going to repeat. Now let’s say we are sorting the records by user ID and storing that in Parquet. You can see that in the upper table, the rows do not have anything in common in any of the five columns. So if you write this into Parquet, you will not get any columnar compression. All you will get is the compression provided by, let’s say, Snappy or Gzip, so five or 10% of compression is what you will get. But if you regroup or restructure this data, as I have shown in the lower table, let’s say you sort these rows by city or state, then what will happen is all users who are part of the same city are together. Now, let’s say you want to store the bottom table in Parquet; then column three and column four have identical values. In the upper table, if you tried to store it in Parquet, you would have ended up storing roughly about 10 values, five for city and five for state; but if you store it in sorted form, since the values are identical, you will only store two, one for the city, which is Atlanta, and one for the state, which is Georgia. So here you can see that you will get a lot of compression by sorting the records by city and state. One thing I would like to highlight here is, whenever you want to decide what sorting criteria you should be using, take a look at the cardinality of the individual columns. What has worked best for us is: if you know the cardinality of the columns, sort the records by increasing cardinality. That’s where you are most likely going to get better compression. But always try different combinations, because you can have a column which has, let’s say, textual data of a longer length, so it’s taking more storage space. If you start with that, as against sorting by a byte-sized column, you will most likely get better compression.
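The effect of sorting on columnar compression can be demonstrated with a small sketch. The data here is hypothetical, and zlib stands in for Parquet’s compression codec applied on top of a columnar layout:

```python
import random
import zlib

random.seed(0)
# Hypothetical user table: a unique id plus low-cardinality city/state columns.
locations = [("San Francisco", "California"), ("Los Angeles", "California"),
             ("Atlanta", "Georgia"), ("Augusta", "Georgia"), ("Austin", "Texas")]
rows = [(uid,) + random.choice(locations) for uid in range(2000)]

def columnar_size(rows):
    # A columnar format writes each column contiguously; compress per column,
    # the way Parquet applies a codec on top of its columnar layout.
    return sum(len(zlib.compress(",".join(str(v) for v in col).encode()))
               for col in zip(*rows))

unsorted_size = columnar_size(rows)
# Sort by increasing cardinality: state (3 values), then city (5), then id.
sorted_size = columnar_size(sorted(rows, key=lambda r: (r[2], r[1], r[0])))
```

After sorting, the city and state columns become long runs of identical values and compress to almost nothing, while the unique user-id column neither helps nor benefits much — which is why low-cardinality columns should lead the sort order.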

CPU / Runtime

Now let’s talk about CPU and runtime. So in this section, we actually did a lot of improvements.

Custom Spark accumulators

So let’s first talk about Spark accumulators. Spark accumulators are used heavily. If you go to the Spark UI and you look at all the metrics which are populated there, behind the scenes they are all long accumulators.

So traditionally, that’s what a lot of people are using in their Spark jobs. But along with this long accumulator, the Spark framework provides something called AccumulatorV2, which you can see on the right side. This AccumulatorV2 lets you define your own custom accumulators, and that can be a very powerful tool. This is an example of how you can actually use an accumulator to solve a real business use case and at the same time reduce runtime as well as save some resources. The use case we are trying to solve is: let’s say you have a set of rider records, and you want to deduplicate those records. While doing the deduplication, you want to figure out how many duplicates you see per state. So that’s the goal. Let’s start. I’m right now focusing on the left section of the slide. First, we read the records from Parquet. Then, before we do the deduplication, we try to find out the record count per state; I’m talking about a very naive way. Then we dedup the records, which is step three. Step four, after the deduplication, we want to find out how many records we have per state. Step five, the deduplicated records we write to HDFS. Step six, since we have two counts, one before the deduplication and one after, if we take a diff we’ll know how many duplicates we have per state. So if you run this job, you’ll see that there are five Spark stages, and three of them are coming from the counting, which is not required, and it’s definitely something we can improve upon. So let’s look at the right side now. What we have done is we have defined a custom accumulator called RidesPerStateAccumulator. What it takes is basically a map of string to long. The string here is the state ID, and the long is basically the number of rides per state. We will define two such accumulators, one for rides per state before dedup and one for rides per state after dedup. What we’re doing here is, if you look at the argument which is passed to the dedup function in line three, we are just chaining a map operation. And as you know, map is not a Spark action; it’s lazily evaluated. So you’re just chaining some extra functions to be executed for every record whenever this DAG gets executed. You’re adding one such map function, and we are saying that for every record, increment the counter based on its state ID. After the deduplication is done, we chain one more map operation, where we increment the count for the rides-per-state-after-dedup accumulator. Finally, we send that for writing to HDFS. In this entire Spark DAG, the write to HDFS is the only action. Once that gets executed and has finished, on the driver you will have two accumulators populated with their respective counts. Once the action has finished, you can take the diff and you will know how many rides have duplicates per state. The good thing is, if you run this code, you will only have two Spark stages, and it will do exactly the same as what it did on the left side. So you will be able to save a large amount of executor runtime, and thereby you will be able to save cost. Let’s look at the other example.
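The pattern can be sketched in plain Python. The record shape and function names here are made up; generators stand in for Spark’s lazily evaluated map operations, and plain Counters stand in for the custom accumulators:

```python
from collections import Counter

# Sketch of the custom-accumulator pattern (illustrative, not Spark code):
# instead of separate counting passes, chain a side-effecting map over the
# lazy record stream, so counts are populated during the single write pass.

rides_before = Counter()   # stands in for RidesPerStateAccumulator (before dedup)
rides_after = Counter()    # stands in for the after-dedup accumulator

def counted(records, acc):
    # Analogous to chaining a lazy map() into the Spark DAG: no extra pass;
    # the counter is bumped as each record flows through.
    for rec in records:
        acc[rec["state"]] += 1
        yield rec

def dedup(records):
    seen = set()
    for rec in records:
        if rec["id"] not in seen:
            seen.add(rec["id"])
            yield rec

records = [{"id": 1, "state": "GA"}, {"id": 1, "state": "GA"},
           {"id": 2, "state": "GA"}, {"id": 3, "state": "TX"}]

# "Write to HDFS" is the only action in the DAG; list() stands in for it here.
written = list(counted(dedup(counted(records, rides_before)), rides_after))
duplicates_per_state = {s: rides_before[s] - rides_after[s] for s in rides_before}
```

Nothing is counted until the final `list()` drives the pipeline, mirroring how the accumulators are only populated when the single HDFS-write action runs.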

Increasing Kafka read parallelism

So, we had an ingestion system which was ingesting data from Kafka to Hadoop. I’m talking about a case where we have a topic which has 256 partitions, and just for this hypothetical use case, it has 3 million messages per run. We were using the Spark Streaming library to read from Kafka, and the Spark Streaming library has an inherent disadvantage, which is that it tries to pin a Kafka partition to one specific executor. What that means is, let’s say you want to read from the same Kafka partition from two executors; it doesn’t allow you to do that. It basically serializes the operations when you are trying to read from the same Kafka partition. The problem is, we have lots of operations we want to do on this data which we have read from Kafka before writing it to HDFS: sorting the records, group by, order by, some number of operations, right? So for that, we can’t be using just 256 executors; we want to use more executors. So typically, this is what is done in industry: you read from Kafka and then you shuffle. You basically have a repartition stage right after that, you send the data to as many executors as you have, and then you get more parallelism. The problem with this approach is, as you can see, in order to do the repartition, you are doing a shuffle write, which is around 1.3 terabytes, and it is followed by a shuffle read, which is around 1.3 terabytes. So there are two problems: one, CPU cycles are definitely wasted in doing a shuffle write and shuffle read. And the second problem is that you are also wearing out the disks. If you do this frequently, then you are basically reducing the lifetime of the disks, so you have to replace the disks very frequently. This was a problem, and what we did basically is, we tried to remove this restriction from the Spark Streaming library, and we split one Kafka partition into 32 virtual partitions.
We made this configurable, so you can define how many virtual partitions you want to create for every Kafka partition, but what that lets us do is, we can now read the same Kafka partition from more than one executor. Think of virtual partitions this way: from one Kafka partition, let’s say we were earlier reading 400 messages. Now what we are doing is we are saying one virtual partition reads from Kafka partition one, offsets 0 to 100; the second virtual partition will read from 101 to 200, and so on and so forth. In order to achieve that, this is what we did: if you look at the Spark Streaming library code, there is something called KafkaRDD, and in it there is a getPreferredLocations method; that’s how they actually pin a Kafka partition to an executor, and we simply overrode that behavior.
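The offset-splitting itself can be sketched like this. This is illustrative, not the Marmaray implementation; the function name and the even-split policy are assumptions:

```python
# Sketch of splitting one Kafka partition's offset range into N virtual
# partitions, each of which can then be read by a different executor.

def virtual_partitions(start_offset, end_offset, num_virtual):
    """Return (from_offset, until_offset) ranges covering [start, end)."""
    total = end_offset - start_offset
    base, extra = divmod(total, num_virtual)
    ranges, cursor = [], start_offset
    for i in range(num_virtual):
        # Spread any remainder across the first `extra` virtual partitions.
        size = base + (1 if i < extra else 0)
        ranges.append((cursor, cursor + size))
        cursor += size
    return ranges
```

For example, `virtual_partitions(0, 300, 3)` yields `(0, 100)`, `(100, 200)`, `(200, 300)` — matching the “offsets 0 to 100, 101 to 200, …” idea above, with each range assignable to a separate executor.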

Kryo serialization

Another way to improve Spark performance is by using Kryo serialization. If you have not been using it, I highly recommend it. There are lots of advantages with Kryo: it has a smaller memory footprint than the Java serializer, it’s faster, and it supports custom serializers. There is one issue, though, with Kryo, which we fixed in our internal Spark build. It’s not yet open sourced, but very soon we will do it. The issue is, when you are setting some Kryo-specific configurations on the Spark driver, today they are not propagated to all the executors. What that means is, if you look at the Spark UI, you will see that I have set all these Kryo configurations, but they are not actually getting used. This is something we observed; we were debugging Kryo serialization streams when we were performance-tuning Marmaray. So there are some things I have highlighted here. One is, let’s say you are using Spark with Avro GenericRecords; then it’s recommended that you register your Avro schemas, because that really helps you reduce your Kryo serialization stream size. I will be talking about that in the next slide. And let’s say you are using some custom classes; then it’s highly recommended that you register those classes using the Spark Kryo classes-to-register configuration. And if you’re not sure which classes you should be registering, just turn on the registration-required configuration. In the logs it will start spitting out, okay, this is the class I’m not able to find, so you can basically do some N iterations and add all the classes in (indistinct). So now let’s look at the Kryo serialization stream without the schema added into the config and with the schema added into the config. On the left side we have not added the required schemas into the config, and this is how the Kryo serialization stream will look. What Kryo does is, it writes the schema for every record written to the Kryo serialization stream.
So let’s say you’re writing that to disk or sending it over a network; this is how your serialization stream will be. If your schemas are big, which is most likely the case for us, then your record size will be around 4248 bytes. But let’s say you register the schema; then you are just writing a schema identifier, which is four bytes. And let’s say your data remains the same 128 bytes; then you are basically reducing your record size from around 4000 bytes to around 132 bytes. So this is a huge saving you can get, and it actually helps with both disk and network.
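A quick back-of-the-envelope sketch using the numbers above. The 4120-byte schema size is implied by the 4248-byte record total; all figures are illustrative:

```python
# Per-record cost of embedding the Avro schema in the Kryo stream versus
# registering it and writing only a schema identifier.

SCHEMA_BYTES = 4120        # full schema written per record when unregistered
SCHEMA_ID_BYTES = 4        # identifier written per record when registered
DATA_BYTES = 128           # actual record payload

unregistered_record = SCHEMA_BYTES + DATA_BYTES    # schema repeated every record
registered_record = SCHEMA_ID_BYTES + DATA_BYTES   # schema written once, up front

def stream_size(num_records, record_size):
    return num_records * record_size

# Fraction of disk/network traffic saved for a million-record stream.
saving = 1 - (stream_size(1_000_000, registered_record)
              / stream_size(1_000_000, unregistered_record))
```

The per-record size drops from 4248 to 132 bytes, a roughly 97% reduction in stream size, which is why schema registration pays off so dramatically at billions of messages per run.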

Reduce ser/deser time by restructuring payload

Another improvement which we did was by restructuring our payload. Let’s look at the left-hand side. We have a typical Spark payload, which has some custom metadata information, and then there is some extra payload. Here, line six is the custom payload, which is basically a map which has a string as a key and a GenericRecord as a value. The reason I put it here is because, let’s say, this has 100 to 1000 keys, okay? And the Spark payload is used in multiple Spark operations. So we are reading it from some place, then we are doing sort by, we are doing count, we are doing group by, order by, a bunch of operations. And finally we need this data when we are writing it to HDFS. I’m just making up this use case.

The problem with this kind of payload is, whenever you use the Spark payload in any of your Spark operations, Spark has to deserialize the entire Java object. So let’s say this map has a thousand entries, and let’s say this GenericRecord has a super complex schema. What will happen is, Spark will spend a lot of CPU cycles serializing and deserializing this object. That is not required if, as in this use case, you are only looking at the sorting key; then it makes more sense to keep this map serialized, which is what I have done on the right side. So if you have something like this, store it internally in a serialized format, so you can just have a byte array. The advantage is, whenever the Spark payload is deserialized by Spark, all it has to do is read the binary data from disk and put it into this byte array. So the number of CPU cycles spent for this is way less than what is spent on the left side.
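Here is a minimal sketch of the idea, with hypothetical classes and Python’s pickle standing in for Kryo:

```python
import pickle

# Sketch of the payload restructuring: keep the rarely-used map as opaque
# bytes so intermediate sort/group stages never pay to deserialize it;
# decode it only at write time. Class names are illustrative.

class EagerPayload:
    """Left side: the full map is (de)serialized field-by-field at every stage."""
    def __init__(self, sort_key, data):
        self.sort_key = sort_key
        self.data = data

class LazyPayload:
    """Right side: the map rides through the DAG as an opaque byte array."""
    def __init__(self, sort_key, data):
        self.sort_key = sort_key
        self.blob = pickle.dumps(data)   # serialized once, up front

    def materialize(self):
        # Deserialize only at the final write step (e.g. the HDFS write).
        return pickle.loads(self.blob)

p = LazyPayload(sort_key=42, data={f"k{i}": i for i in range(1000)})
# Sorting/grouping stages touch only p.sort_key; p.blob is copied as raw bytes.
```

The intermediate stages still move the same number of bytes, but they skip the expensive object-graph encode/decode on every hop, which is where the CPU savings come from.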

Parallelizing Spark’s iterator

Another improvement which we did was by parallelizing Spark’s iterator. Let’s look at an example. I’ve added a very simple example where we have some mapPartitions function which provides you an iterator on records, and we are just doing: while iterator.hasNext(), parquetWriter.write(iterator.next()). And once I run out of records, then basically I’m going to call parquetWriter.close(). This is a simple way of writing records to Parquet. Now let’s look at what happens as a part of this operation. I’m focusing on the left side. The color coding is this way: orange means everything happens in Spark-land, and green means everything happens in user-land. What happens is, whenever you are performing iterator.hasNext(), Spark has to figure out if there is any record or not. So it is going to read the disk, it will read the stream, it will see, okay, I have a new record, and it will return true. Then whenever you call iterator.next(), it will return you the next record. Once you get the record, you will give it to the columnar Parquet writer. Once your row group is filled, the Parquet writer will do columnar compression, write it to the underlying output stream, and the data will be written to HDFS. So far, so good. The problem we were seeing is, since we were writing a one GB file, this whole operation was taking around 45 minutes. That was not expected; we wanted this to run faster. So what we did is we split the responsibilities. We let the Spark thread read from disk and populate an internal shared memory buffer, and we spawned a new thread which reads from the shared buffer and does the Parquet write operation. The advantage we got with that is, since we now have two threads and they are sharing a buffer, we can read very fast from Spark and also finish this whole operation quite fast. We were able to reduce the runtime from 45 to 25 minutes by just doing this optimization. So if you have a use case something like this, please think about it.
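The two-thread split can be sketched with a standard producer-consumer pattern. Here `queue.Queue` stands in for the shared memory buffer, and the names are illustrative:

```python
import queue
import threading

# Sketch of splitting the read and write responsibilities across two threads:
# the "Spark thread" reads records and fills the shared buffer, while a
# separate writer thread drains it and does the (slow) write, e.g. columnar
# compression + Parquet encoding.

def pipelined_write(records, write_fn, buffer_size=1024):
    buf = queue.Queue(maxsize=buffer_size)   # the shared memory buffer
    SENTINEL = object()                      # end-of-stream marker

    def writer():
        # Consumer thread: drains the buffer and performs the write.
        while True:
            rec = buf.get()
            if rec is SENTINEL:
                return
            write_fn(rec)

    t = threading.Thread(target=writer)
    t.start()
    # Producer (the Spark thread): reads records and fills the buffer.
    for rec in records:
        buf.put(rec)
    buf.put(SENTINEL)
    t.join()

out = []
pipelined_write(range(10), out.append)
```

Because reading and writing overlap instead of alternating, the slow write path no longer stalls the record iterator — the same overlap that cut the job from 45 to 25 minutes.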

Improve utilization by sharing the same Spark resources

Now I will be talking about the efficiency improvements. This is a very simple use case, a very simple Spark DAG that I have written, which I’m sure will never run in production. But what I’m trying to highlight here is, let’s say we have a JavaSparkContext and we are reading from Kafka. We have, let’s say, topic one and then we (indistinct), right? Now, if we try to see what happens when we run this code, this is what you will see. The vertical axis is basically executor IDs, and the horizontal axis denotes time. In between, we have a lot of red markers; those vertical lines basically mark stage boundaries. Now you can see that there are a lot of places where you see only one horizontal line; that horizontal line indicates that there is only one, or maybe two, Spark tasks running, and every other executor is sitting idle. So a lot of white space here indicates that a lot of the time spent running this Spark job, executors were sitting idle. If you now look at the efficiency of this job, it is going to be very, very low. And that’s a very big problem, because finally, when you are running an ingestion system, all you look at is what dollar value you’re going to spend when you are moving one GB or one TB of data from the source system to the sink system. The dollar value is going to be very high for a system like this, because you’re wasting a lot of resources, and there is room for improvement. So what we did is, instead of launching ingestion for one Kafka topic in one Spark job, we clubbed a lot of Kafka topics together. At Uber we launched 32 together, but take a look at your own Spark DAG and you can decide your own configuration; 32 worked very well for us. Now you can see that there is hardly any white space here.
You will only see white right at the end as a vertical line, but that’s expected, because when you are almost finishing the Spark job, there is pretty much nothing left, and that’s why some executors are going to be idle, which is okay. But if you look at the left half, it is completely filled. All these colors mean that the executors are busy working on some task or the other. So finally, I will talk about memory improvements. Here, I’m primarily focusing on off-heap improvements, which we had to do.

Off heap memory improvements (work in progress)

So if you remember, in the beginning of my presentation I was talking about the JVM profiler and how it helped us in memory-profiling our jobs. This is what it helped us with. What was happening is, our jobs were running with seven GB of memory, and they were running fine. Our job would run for, let’s say, 30 minutes; up to 25 minutes everything was fine, and then suddenly we were seeing a couple of executors getting killed. When we looked at the driver logs, this is what we saw: Container killed by YARN for exceeding memory limits. And we were not sure what was going on, because we were not using extra memory; we were not doing any off-heap memory operations. So the JVM profiler helped here. What was happening is, we use persist very heavily in Marmaray; we cache lots of stuff so that we don’t re-compute operations. Let’s say you have executor two, which needs data from a previous stage, and that previous stage’s task did not run on the same executor; it will ask for the data from some other executor. Now when it does that, what Spark was doing until Spark 2.1 is, it used to memory-map the entire file. So let’s say you have some RDD blocks which are one GB or two GB in size, and suddenly some external executor requests those RDD blocks; then Spark will simply start memory-mapping those files. What will happen is, that will simply jump up your physical memory usage, and that’s why YARN, which is monitoring the physical memory usage for every process, will go ahead and kill it. This is definitely not what we wanted. So we had some ideas in mind for how we could improve it. We were thinking about doing a rolling buffer, where instead of memory-mapping the entire file, we could use, say, four MB buffers and keep rolling, so that we reduce the physical memory usage. But in parallel we also had an ongoing task of moving from Spark 2.1 to 2.4.
The good thing here is, if you have switched to Spark 2.4, you will not see this problem. The reason is that in Spark 2.4 they migrated from this memory-mapping approach to using file regions. They did that for a different reason, but it actually solves this problem as well. So if you are hitting this issue and you’re on Spark 2.1, I highly recommend you just move to Spark 2.4. That can save you a lot of memory and a lot of money for your company. If you want to fix it for 2.1, yeah, let’s talk.
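For reference, the rolling-buffer idea mentioned above can be sketched like this. This is an illustration of the concept, not Spark’s actual block-transfer code:

```python
import os
import tempfile

# Rolling-buffer sketch: serve a large cached block in fixed-size chunks
# instead of memory-mapping the whole file, so resident memory stays bounded
# at roughly one chunk rather than the full block size.

def stream_file(path, chunk_size=4 * 1024 * 1024):
    """Yield chunk_size pieces of the file; peak extra memory is ~one chunk."""
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                return
            yield chunk

# Demo on a small temporary file with a small chunk size.
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"x" * 10_000)
    path = f.name
chunks = list(stream_file(path, chunk_size=4096))
os.remove(path)
```

With a 2 GB RDD block and 4 MB chunks, physical memory grows by one chunk at a time instead of by the whole mapped file, which is what keeps the process under YARN’s limit.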

That’s all I have for today. We are hiring. If you are interested and you want to work on interesting problems, please reach out.

About Omkar Joshi

Omkar Joshi is a senior software engineer on Uber's Hadoop platform team, where he's architecting Marmaray. Omkar has a keen interest in solving large-scale distributed systems problems. Previously, he led object store and NFS solutions at Hedvig and was an initial contributor to Hadoop's YARN scheduler.