Scale-Out Using Spark in Serverless Herd Mode!


Spark is a beast of a technology and can do amazing things, especially with large datasets. But some big data pipelines require processing the data in small chunks and running them through a large Spark cluster can be inefficient and expensive.

In this talk we’ll describe a system we’ve built using many independent Spark clusters running in parallel, side by side, in a serverless style. We run them on a Kubernetes cluster, but don’t confuse this with Spark on Kubernetes, which runs one large Spark cluster on Kubernetes. Our system scales up and down on the fly by spinning up and down more independent Spark clusters, and it is capable of processing huge amounts of data at an affordable cost.

We’ll walk you through the reasoning behind this unique serverless Spark architecture, its benefits, and how we went about building it. You’ll learn how to evaluate your own Spark cluster architecture and figure out whether you too should consider using such an approach to save costs and reduce processing time.

Topics include:

  • The task scheduling problem
  • Considerations for a cost-effective task workflow
  • And much more….

Speakers: Opher Dubrovsky and Ilai Malka

Transcript

– Hi everybody. Welcome to this talk about “Scaling Out Spark in a Different Way We Call Serverless Herd Mode”. So first, I want to talk a bit about Spark. Spark, as you know, has superpowers. On the one hand, you can process huge amounts of data with it. On the other hand, which is great, you can easily scale it up by adding more nodes. That’s really awesome, but like every superhero, it has its weaknesses, and we really want to be aware of the Spark kryptonite.

Kryptonite number one is shuffle. Everybody’s aware of shuffle. Shuffle is when the nodes have to exchange data over the network, and that’s something to always be aware of. Kryptonite number two is skewed data. Skewed data is when you have lots of different tasks, some of them large and some of them smaller because of the skew. The small ones finish processing first and then have to wait for the longest task to finish. That creates processing delays and obviously extra costs, because you have all these executors idling away, wasting your money. Kryptonite number three is the whole cluster model. Clusters are built for a reasonable load, and we need to select a cluster size. So for a particular amount of data, let’s say, this is the cluster size we select. As you can see, in some hours of the day this cluster is over-provisioned. We have too much capacity for the data coming in, which means a lot of idle time, and that idle time translates to wasted money because we have resources idling away. The other problem is that when you do get an unexpected load, the cluster size is too small for it, and then you end up with queues and longer wait times for processing the data. So overall you get excessive costs and you get delays.

If we look, on the other hand, at the serverless approach, it says: let’s spin up resources only when they’re needed, which means you only pay for the processing that’s actually being done. When you’ve finished processing, you can tear down the resources and stop paying for them. The nice thing about it is that when you do get that big burst of data, you can just spin up more resources and not have to wait longer for it to process. So you get no delays and it’s very cost efficient.

So in this talk we want to tell you about a serverless approach for scaling out Spark, which we call Spark serverless herd mode. We’ll tell you about a real-life case study of a system we’ve actually built to do this, and we’ll tell you about some use cases to consider, so if you have some of them or something similar, you might want to consider such an approach yourself. Let me introduce ourselves. I’m Opher Dubrovsky, a big data dev lead at Nielsen Marketing Cloud. I love tinkering with systems and constantly improving them. And with me is Ilai Malka. Ilai.

– Hi guys, I’m Ilai Malka. I’m a big data engineer, and for the last two years I’ve worked a lot with serverless, and from there we took the concept for the solution that we are going to show you today.

– So we are basically heavy Spark users, and in addition we do a lot of serverless work, and we really, really wanted for a long time to marry the two approaches together. We finally did it, and this is what our talk is about. To give you some context, I want to tell you what Nielsen Marketing Cloud is. Nielsen Marketing Cloud is part of the Nielsen Corporation. It’s a product that’s basically a DMP, a data management platform, used for marketing. It was acquired from a startup called eXelate in 2015. We use this product to build marketing segments, which can then be used to run campaigns on ad networks and build device graphs. So this is really useful for targeting and business decisions.

To tell you a bit about how much data we run: we’re all cloud native, we’re heavy users of Spark, we love Spark, and we run a lot of Spark in the old cluster-model way. Roughly, we run about 6,000 nodes every day. We process about 60 terabytes of new data every day, and all in all we store about five terabytes of it. The system we’re going to talk about is called Data Out. This system is in charge of taking all this marketing segmentation data and uploading it to our ad platform partners. In order for people to use the segments and run campaigns, they need to go to one of our ad platform partners, select the segments they want to use, and target them with ads. On its heaviest day ever, the system processed about 250 billion events, and it’s running entirely on serverless. We recently did a video on this system with AWS, Amazon Web Services. It’s a great video that gives you an overall view of what the system does, so if you’re interested, go watch it. I encourage you to do so. It’s on bit.ly/TMA-serverless; you can see the link below.

Some numbers about this Data Out platform. As I said, on our top day ever we processed 250 billion events. The events come in files, so we had about 17 million files to process, all in all about 55 terabytes of data. The other interesting thing about it is that the system scales up and down over the day: the peak hours on a normal day have about six terabytes of data and the low hours have about one terabyte. That’s a big difference, and we really like our systems to scale up and down over time. As you can see, this is incredible power and scale, so we really needed good solutions for this.

So let’s look at the old architecture. Originally, this is how it was built. On the left we have files coming in that need to get transformed, and we have about 140 platforms to send data to, with about 600 accounts on them. Each one needs a different transformation, so we have lots of transformations to do. We were using a Spark cluster to do this part of the work. The Spark cluster would transform the data and send the output files to an S3 bucket where they would be stored. Then on the right, we have Lambda functions that would pick up one or more files and send them to the right ad platform. You can notice that on the left, the transform side with Spark was running the cluster model, and the right side, with the Lambda functions, was pure serverless: it was scaling up and down over the day as the work came in. So this part really troubled us; we really wanted to have a seamless serverless model for everything, and for a long time we were looking for a way to merge these two different approaches.
When we looked at the Spark cluster, we saw that the throughput, or the efficiency, of the cluster went down as we increased its size and added instances. You can see on the left side, when we had a few instances in the cluster, we had pretty good throughput, but by the time you get to 14 instances, the throughput had dropped by about 35%, meaning each instance processed roughly a quarter less data. When we got to about 40 instances the drop was even more significant, down to about a 60% drop. This was not great, and every time we had a burst, it was a big challenge for us. You can see that the sweet spot was around 10 instances, so as long as we kept the cluster at around 10 instances, it was okay.

Originally, our solution was to make sure the system was architected in a way that let us run multiple clusters. Every time we got a burst, we would spin up more Spark clusters and run them in parallel. We could run one, two, three clusters, even more, and we made sure each one was isolated and didn’t mess up the work of the other clusters. But this really bugged us. We didn’t really like the solution. It worked quite well, but it was not elegant. So for a long time we really wanted to do something better, and when we had the time, we sat down to think about how we could improve it.

We decided to look for a better solution and set out some goals. Goal number one: it had to scale well. We wanted it to scale up and down really quickly, both to save on cost and to be able to handle the bursts. Second, it had to be really efficient: we wanted to pay only for the processing time we actually use, but also for the total cost to be reasonable. And the last thing, as I mentioned before, it needed to be able to handle bursts really well, in a timely manner. If you think about it, this is really a serverless-like system. So back to the architecture: we really wanted to take the Spark cluster on the left and replace it with something better, but it took us a while to figure out what it should be. To tell you all about it, I’m going to call Ilai and he’ll explain the gory details. Ilai.

– Thank you, Opher. So let’s see what serverless herd mode is and which benefits it can bring you. The main point is that instead of running one big Spark cluster, we run multiple isolated, standalone Spark pods. Each Spark pod is independent of the others, which means that in this solution we don’t have shuffle. We have a task queue, and each Spark pod fetches a task from the task queue and processes it, and when it finishes, it fetches the next available task. Now, the cool part of this solution is that it can scale up and down according to demand. When we have more tasks in the task queue, we can spin up more Spark pods, and when we have fewer tasks in the task queue, we can remove Spark pods.

Let’s see how we did it. The task queue is SQS, which is a queue service by Amazon, and we have a work manager that is responsible for preparing those tasks. It prepares them so that each task contains the proper amount of data to be processed by a single Spark pod. We are running this solution on EKS, which is a managed Kubernetes service by Amazon, and every few pods run on an EC2 machine. We have two levels of scalability. The first level of scalability uses the Horizontal Pod Autoscaler, or HPA, which is responsible for adding and removing Spark pods. The HPA monitors the number of tasks we have in the task queue and decides accordingly how many pods we need to have. The second level of scalability uses the Cluster Autoscaler, which is responsible for adding and removing EC2 machines in order to fit all our Spark pods on the EKS cluster.

Notice that this solution is very similar to the serverless concept, because we are getting computation power according to the amount of work that we need to do, and when we finish this work we tear down all the no-longer-necessary resources. Do you remember kryptonite number one, shuffle? In this solution we don’t have shuffle at all, since every pod is isolated and runs its own independent standalone Spark.
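To make the flow concrete, here is a minimal sketch of what one Spark pod’s work loop could look like, assuming a Python worker that polls SQS with boto3; the queue URL, the task format, and the process_task helper are illustrative placeholders, not the actual production code.

```python
# Hypothetical per-pod work loop: fetch a task, process it, delete it, repeat.
import json
import boto3

QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/transform-tasks"  # placeholder URL
sqs = boto3.client("sqs")

def process_task(task: dict) -> None:
    """Run the per-platform Spark transformation for one task (details omitted)."""
    ...

while True:
    # Long-poll the task queue; each pod pulls work at its own pace, so there is
    # no coordination between pods and therefore no shuffle.
    resp = sqs.receive_message(QueueUrl=QUEUE_URL, MaxNumberOfMessages=1, WaitTimeSeconds=20)
    for msg in resp.get("Messages", []):
        process_task(json.loads(msg["Body"]))
        # Delete the message only after the task succeeds, so a crashed pod's
        # task becomes visible again and another pod can pick it up.
        sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg["ReceiptHandle"])
```

In this sketch a message is deleted only after the work finishes, so how and when messages are acknowledged is a design choice that interacts with the graceful-termination issue discussed later.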

– Regarding kryptonite number two, skewed data: we still have skewed data, because that is the nature of the data in this system, but now it doesn’t have any effect on the system. Since every pod is isolated, we don’t need to wait for the slowest executor to finish the work; we can start a new task immediately and not have idle time.

Now let’s see the results. This is the scalability of the system in action. In the top graph you can see the number of tasks, and in the bottom graph you can see the Spark pods and EC2 machines. You can see that those two graphs move together very closely. When more data comes into our system, we spin up more Spark pods and EC2 machines; when less data comes in, we remove Spark pods and EC2 machines. So we are scaling according to demand. Now let’s see what happens when we have a burst. In the top graph you can see that the number of messages moves between 20 and 60; this is the normal load of the system. Now take a look at that huge data burst coming into our system: the system immediately responds by scaling up and starts dealing with the queue. Do you remember the three goals we set out to meet? We meet all of them: we scale up and down according to demand, we pay for what we use because we remove the resources we don’t need, and we can handle the bursts.

Now let’s take a look at the performance of the system. As we add more and more instances, the number of megabytes processed per instance per hour remains very flat. This is exactly the linear behavior we wished to have. Do you remember how it was with the old solution? As we added more and more instances, the efficiency went down. And you can see that the new solution is better than the old one across the board. For example, if we take a look at 10 instances, we have an improvement of 128%. Also, do you remember where the sweet spot was, around 10 instances? Even at the sweet spot, the new solution is better. Now let’s talk about money. This is a cost comparison between the old system on the left and the new system on the right. In the old system the cost was pretty much fixed, because we were paying for a cluster with a fixed number of instances running 24/7, but in the new solution the cost changes every day according to the amount of data we need to process that day. And of course the new solution is cheaper. This is the saving, actually a 55% saving, which in our case was $15,000 per year.

Now let’s look a bit under the hood and see some extra stuff that might be interesting for you. We are using instance fleets; these are the instances we’re using, and they are similar in cost to the instances we were using in the old system. Each instance has eight cores and enough memory that we can squeeze up to five Spark pods onto each instance, and we give each Spark pod about five gigabytes of memory. In the Spark config you can see that we are running one executor with one core and two gigabytes of memory, and this is a standalone Spark. By looking at those numbers, you can understand that the door is open for more optimizations: for example, we can add more memory to each Spark pod, or squeeze more pods onto each instance, and we are going to play with that in the future. Do you all remember that we are using the Horizontal Pod Autoscaler to spin the Spark pods up and down? Now let’s understand how it works.
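As a rough illustration of the per-pod setup just described (one executor, one core, about two gigabytes of memory), a Spark session for such a pod might be configured roughly like this. The talk mentions standalone Spark; local[1] is used here only as a stand-in, and the exact property values are assumptions.

```python
# Minimal per-pod Spark session: one core and about 2 GB, as described above.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("herd-pod")
    .master("local[1]")                    # a single core per pod; stand-in for the standalone setup
    .config("spark.driver.memory", "2g")   # in local mode the driver process also hosts the lone executor
    .getOrCreate()
)
```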
So this is the formula we use to determine how many pods we want to have: we divide the number of tasks in the queue by two. Let’s see a simple illustration. We have four tasks in the task queue, so the desired number of pods is two. Now let’s assume we have one pod currently running but we want to have two; then we just need to spin up another pod. The reason we divide by two is that this way each task won’t wait longer than two processing cycles.

Now, scaling down is dangerous. Let’s understand why. Let’s assume the desired number of pods is two and the current number of pods is four, so we need to remove two pods. The problem is that we may remove a pod while it is processing a task, so the task will be stopped in the middle of the work and we will have lost tasks. We actually suffered from this problem: we had 240 lost tasks every day, which translated to lost processing hours and to reprocessing those tasks. So obviously we had to find a way to overcome this issue, and we found a way to do graceful termination of the pods. Instead of killing the pod immediately, we found a way to let the pod finish processing the current task and then terminate.

Let’s see how we did it. Let’s assume Kubernetes needs to terminate pod number two. Kubernetes sends it a “get ready to die” signal and then waits until it gets back a “ready to die” signal from the pod. We had to change two places: one in our application code, and the second in the Kubernetes pod lifecycle. I’m not going to go over all the code because we don’t have enough time, but the main point is that we override the preStop command of the pod lifecycle so that it sends a signal to the application and then waits until the application returns a message saying it is done processing the current task, and only then do we terminate the pod.

We can also access the Spark UI of the live pods. In this dashboard, every box is a Spark pod that is running right now, and by clicking on the URL we can access the Spark UI of that live pod. Now we are going to talk about some cool use cases that our solution is good for, and for that I’m going to hand it over to Opher.
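As a sketch of the graceful-termination idea just described: the worker keeps draining tasks until it is asked to stop, and only exits between tasks, never in the middle of one. The signal used and the helper functions below are assumptions for illustration; in the real system the handshake goes through the pod lifecycle’s preStop hook rather than a bare Unix signal.

```python
# Hypothetical graceful shutdown: finish the current task, then exit cleanly.
import signal
import time

shutting_down = False

def request_shutdown(signum, frame):
    # Don't exit immediately; just raise a flag that the loop checks between tasks.
    global shutting_down
    shutting_down = True

signal.signal(signal.SIGTERM, request_shutdown)

def fetch_next_task():
    """Placeholder for the SQS fetch shown in the earlier sketch."""
    return None

def process_task(task):
    """Placeholder for the per-task Spark transformation."""
    pass

while not shutting_down:
    task = fetch_next_task()
    if task is None:
        time.sleep(1)          # queue is empty right now; idle briefly
        continue
    process_task(task)         # a task is never interrupted mid-way
# Reaching this point means any in-flight task has finished, so the pod can
# report that it is "ready to die" and let Kubernetes terminate it.
```

Pairing a check like this with the preStop hook means Kubernetes waits for the worker to reach the end of its current task before actually removing the pod, which is what eliminated the lost tasks described above.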

– All right, thank you, Ilai. So let’s talk about a few use cases to see what this is really good for. Use case number one is machine learning classification pipelines of videos, images, and maybe some clips. Usually these come in at varying rates over the day: maybe in the middle of the day you have more, and at nighttime less. So you really want a system that can scale up and down with the traffic coming in, and this system is perfect for classifying these incoming events or files. The nice thing about it is that not only do you save money, but when you do get that big burst of data, you can guarantee that you can process and classify it really quickly.

Another use case is bursty data pipelines. This is basically the use case we talked about. It’s great when you have pipelines with bursts and varying amounts of data coming in, or with very skewed data; this solution is perfect for that as well. The third use case we want to highlight is machine learning training, where you have very large, divisible datasets. An example: maybe you have to train on millions and millions of images. The common best practice is to divide the image base into lots of smaller image bases and do the training separately on separate machines or clusters. This is perfect for our system as well, because the system can scale up and down with the amount of data coming in. Again, a perfect fit.

I want to summarize what you’ve seen. We talked about a Spark system that we call Spark serverless herd. It’s a system that’s very much serverless-like. It can handle huge bursts and scale up and down with the traffic as it comes in, and what’s nice about it is that it’s very cost-effective: not only can you process data faster, you do it at a much lower cost. We really encourage you to consider trying out this methodology, and if you have interesting new use cases, send us a note; we’d love to hear about them. I want to remind you to fill out the feedback for our talk, so don’t forget to rate and review the session. In case you want to get in touch with us, both Ilai and I are on LinkedIn. I also want to remind you to watch the video about the Data Out system; again, it’s on bit.ly/TMA-serverless, and the link is on the slide. If you want to read our blog, the link is on the slide below. Thank you guys, and thank you for coming to our talk.


 
About Opher Dubrovsky

Nielsen

Opher is a big data team lead at Nielsen. His team builds massive data pipelines that are cost-effective and scalable (~250 billion events/day). Their projects run on AWS, using Spark, serverless Lambda functions, Airflow, OpenFaaS, Kubernetes and more. He is passionate about new technologies, data, algorithms and machine learning. He loves to tackle difficult problems and come up with amazing solutions to them. He holds 4 patents in the area of security, and has lots of ideas for more.

About Ilai Malka

Nielsen

Ilai is a big data developer at Nielsen, responsible for building massive data pipelines that stream huge amounts of data (~250 billion events/day). His projects run on AWS, using Spark on EMR, serverless Lambda functions and Kubernetes. He has a B.Sc. in Computer Science and started his programming career 13 years ago. He then moved into the big data area, which he loves. He is especially passionate about tackling complex problems, building huge pipelines and sharing his knowledge with others.