Getting Started with Apache Spark on Kubernetes

Community adoption of Kubernetes (instead of YARN) as a scheduler for Apache Spark has been accelerating since the major improvements in the Spark 3.0 release. Companies choose to run Spark on Kubernetes to use a single cloud-agnostic technology across their entire stack, and to benefit from improved isolation and resource sharing for concurrent workloads. In this talk, the founders of Data Mechanics, a serverless Spark platform powered by Kubernetes, will show how to easily get started with Spark on Kubernetes.

We will go through an end-to-end example of building, deploying and maintaining a data pipeline. This will be a code-heavy session with many tips to help beginner and intermediate Spark developers be successful with Spark on Kubernetes, and live demos running on the Data Mechanics platform.

Included topics:
– Setting up your environment (data access, node pools)
– Sizing your applications (pod sizes, dynamic allocation)
– Boosting your performance through critical disk and I/O optimizations
– Monitoring your application logs and metrics for debugging and reporting

Speakers: Jean-Yves Stephan and Julien Dumazert

Transcript

– Hello everyone, thanks for watching our session about how to get started with Spark on Kubernetes. Let’s introduce ourselves. We’re the co-founders of Data Mechanics, a Y Combinator backed startup building a very easy to use Spark platform. So it’s an alternative to services like EMR, Databricks, Dataproc, Hortonworks, and so forth. I’m JY, and prior to Data Mechanics I was a software engineer at Databricks leading their Spark infrastructure team. So I have experience with Spark as an infrastructure provider. Julien, our CTO who will talk later, worked with Spark as a data scientist and data engineer at Contentsquare and BlaBlaCar. So he’s experienced with Spark as an application developer. Now that we’ve talked about us, we’d love to know who you are, so when this talk is broadcast you should see a live poll to cast your votes. So we’d like to know: what is your experience with Spark on Kubernetes? One, never used it but curious about it; two, prototyped it but not using it in production; or three, using it in production? We look forward to seeing the results. Let’s now see how we’re gonna spend the next 25 minutes together. First, we’ll go over the motivations for running Spark on Kubernetes, then we’ll give you concrete advice to help you get started with it. We’ll try to give you a look and feel of the dev workflow on Kubernetes through a demo running on the Data Mechanics platform, and finally, we’ll talk a bit about the future of Spark on Kubernetes. But first I’ll say a few words about Data Mechanics so you understand our background with Spark on Kubernetes. So Data Mechanics is a serverless Spark platform. What do we mean by that? Well, three things. First, it has an autopilot mode. Our platform automatically tunes the infrastructure parameters and Spark configurations of the pipelines running on it. That covers memory and CPU allocations, instance type and disk settings, and Spark configurations around parallelism, memory management and shuffle.
This makes ETL jobs more cost efficient and more stable. Second, it’s fully containerized. Just submit your application as a Docker image and we’ll manage the underlying servers for you. And third, our fee is based on Spark compute time instead of server uptime, because server uptime is often wasted. On our platform it’s our responsibility to manage servers efficiently to reduce our customers’ cloud costs. Now, on the architecture side, our platform is deployed on a Kubernetes cluster in our customers’ cloud accounts. Why? Because this means our customers’ sensitive data does not leave their cloud accounts. In fact the Kubernetes cluster can be made private. Now inside this cluster, there’s one service running all the time that we call the Data Mechanics gateway, and this is the entry point for starting Spark applications. You can either connect with a Jupyter notebook or submit applications programmatically through our REST API or our Airflow connector. Once the application hits the gateway, we’re going to deploy, configure and scale the corresponding Spark application. We’ll also manage the underlying Kubernetes nodes to scale the cluster up and down based on load. So as you can see, our entire company is built on top of Spark on Kubernetes, which is why we’re giving this talk. So what’s the difference with open source Spark on Kubernetes? Well, we offer an intuitive UI to make it easy to use, we offer dynamic optimizations to make it cost efficient, and we offer a managed service to make it setup and maintenance free. Now that you have a bit of background on us, let’s look at the main motivations for running Spark on Kubernetes. So Kubernetes is a cluster manager for Spark. It’s an alternative to Hadoop YARN, which is by far the most common cluster manager for Spark today. Since version 2.3, Spark natively runs on Kubernetes. What does that mean? When you submit a Spark application, your request is understood directly by the Kubernetes master.
The Kubernetes master then starts a Spark driver container, which is called a pod in Kubernetes language, and then the Spark driver and the Kubernetes master will directly communicate with each other to request and start Spark executors. In fact, if you enable dynamic allocation, this communication is gonna continue during the life of the Spark application to add and remove executors based on the load. Now what are the motivations for deploying Spark on Kubernetes instead of YARN? Well, first you get the advantages of a shared YARN cluster without its disadvantages. The advantage is you get the cost efficiency of a shared infrastructure, okay? And in fact Kubernetes can reallocate capacity between concurrent apps in just a few seconds, so really quickly. And the disadvantage you don’t get is the lack of isolation: you have the full stability of fully isolated applications. So each Spark app can have its own Spark version, its own Python version, its own dependencies. The second main motivation for running Spark on Kubernetes is the fact that Kubernetes is a cloud-agnostic infrastructure that is very popular and may already be used at your company. By running Spark on Kubernetes, you’ll benefit for free from a rich ecosystem of Kubernetes compatible tools. You may also use the same Kubernetes cluster to run non-Spark workloads, like pure Python ETL jobs, or to serve machine learning models. Last but not least, you’ll be able to use Docker to package all your dependencies and iterate on your code quickly. We’ll show you what this looks like later on in the demo. Okay, so we’re convinced, now how do we get started? Well, if you’d like to start a free trial of the Data Mechanics platform, just check our website, schedule a demo with our team, and we’ll be happy to help you. But here is a bit of a guide if you would like to get started in a do-it-yourself way. So what should you do?
First, obviously, create the Kubernetes cluster with proper networking and proper node pools. We also recommend installing the Spark operator, which is an open source project that makes Spark application management much simpler, and the cluster autoscaler, which is another open source project that will make your Kubernetes cluster dynamically scale up and down based on load. On the monitoring side, you’ll wanna have at least access to the Spark UI as well as to some system metrics. And then there are a few critical optimizations that you should look into. Typically shuffle performance is super important, so we recommend using local SSDs. The use of spot nodes can also have a huge impact on costs. And there is also how to configure your pod sizes for optimal bin-packing so you don’t waste any capacity. So in this talk, we won’t have the time to cover everything, but we wrote a blog post with technical details on each of these topics. So we’ll send you the link. And now let’s zoom in on the monitoring side. So, the main tool that people use for monitoring is the Spark UI. You can easily access the Spark UI of live applications running on Kubernetes. You just need to do a port forwarding between your laptop and the Spark driver of your choice. Now for terminated Spark applications, it’s a bit more work. You need to host a Spark history server. So to do it yourself, you should configure Spark to write its event logs to a persistent storage like S3, and then you should install the Spark history server on Kubernetes and configure it to read the logs from this persistent storage. This is the hard way, but we’re super happy to announce that as of this week, we’ll offer you an easier way. We have open sourced a Spark agent which will run inside your Spark application and which will stream the necessary metrics to our backend. And then we’ll give you access to the Spark UI hosted on our website. And this is completely free and this will work anywhere, in the cloud or on-premise.
In fact it also works on YARN, it works on EMR, Databricks and more platforms. So check out our GitHub page for installation instructions. You may wonder why we’re doing this? Well, this is our first milestone towards releasing a Spark UI replacement that we call Data Mechanics Delight. And so Delight is a monitoring tool that aims to make the Spark developer experience delightful, and so it will let you visualize memory and CPU metrics alongside your Spark jobs, stages, and tasks. And again, it will be free and cross-platform. All right, this is just the first milestone, so it’s just a Spark UI, but in January we’re targeting a release with more features. We’re also looking for feedback and for beta testers, so check out our website if you’re interested. Okay, I’ll now leave the floor to our CTO Julien, who will give you tips on how to configure your Kubernetes nodes. Thanks.
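The do-it-yourself history server setup described in this part of the talk comes down to two pieces of configuration: the application writes its event logs to persistent storage, and the history server reads them from the same place. A minimal Python sketch follows. The bucket path `s3a://acme-spark-logs/events` and the helper names are assumptions made up for illustration; `spark.eventLog.enabled`, `spark.eventLog.dir` and `spark.history.fs.logDirectory` are standard Spark properties.

```python
from typing import Dict, List

# Hypothetical bucket path; replace with your own persistent storage location.
EVENT_LOG_DIR = "s3a://acme-spark-logs/events"


def event_log_conf(log_dir: str) -> Dict[str, str]:
    """Spark properties that make an application write its event logs."""
    return {
        "spark.eventLog.enabled": "true",
        "spark.eventLog.dir": log_dir,
    }


def history_server_conf(log_dir: str) -> Dict[str, str]:
    """Spark property telling the history server where to read event logs."""
    return {"spark.history.fs.logDirectory": log_dir}


def to_submit_args(conf: Dict[str, str]) -> List[str]:
    """Render a property dict as spark-submit `--conf key=value` arguments."""
    args: List[str] = []
    for key in sorted(conf):
        args.extend(["--conf", f"{key}={conf[key]}"])
    return args


if __name__ == "__main__":
    # Arguments to append to a spark-submit command line.
    print(" ".join(to_submit_args(event_log_conf(EVENT_LOG_DIR))))
    # Property to set on the history server side.
    print(history_server_conf(EVENT_LOG_DIR))
```

For a live application, the port forwarding mentioned above is typically done with `kubectl port-forward`, pointing at the driver pod and the Spark UI port (4040 by default).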

– Thanks JY. Another important aspect when setting up the Kubernetes cluster for use with Spark is support for multiple node pools. This is important because the different pods in your cluster have different needs. The system pods, if you will, like the Spark operator pod, must run on a node pool with small on-demand nodes: small to reduce costs, as the Spark operator pod must always be up, and on-demand to guarantee availability. The Spark driver pods must run on on-demand nodes of medium size. This node pool should be able to scale from zero so that you do not pay for resources when no Spark application is running. It’s important to use on-demand nodes here because a spot kill on the driver would cause the application to fail. And finally the Spark executor pods should run on larger spot nodes, since the Spark application can recover from a lost executor. Let’s see how to set this up on EKS. First, you need to install the cluster-autoscaler. This is an open source project, and it consists of a pod that will run on the cluster and interact with the cloud provider to request more nodes when there’s no room left on the cluster for pending pods. Then you want to define the node labeling scheme so that you can select nodes with label selectors and assign, for instance, executor pods to spot nodes. Below we’ve defined the label acme-lifecycle, which conveys whether the node is spot or on-demand, and the label acme-instance with the instance type of the node. Then to make the labeling scheme work for node pools that scale down to zero, you will need to manage node pools manually as Auto Scaling groups and add Auto Scaling group tags matching your node labels. This is shown below. This way the cluster-autoscaler knows from the tags on the right that when a node from a given Auto Scaling group is launched, it will have the node labels on the left. Okay, we’re finally ready to use node pools with Spark.
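The labeling scheme above can be sketched in a few lines of Python. The slides are not part of this transcript, so the label values (`spot`, `r5.2xlarge`) and the helper names are assumptions. The tag prefix is the one the cluster-autoscaler documents for AWS Auto Scaling groups that scale from zero, and `spark.kubernetes.node.selector.[labelKey]` is Spark's global node selector property, which applies to both driver and executor pods.

```python
from typing import Dict

# Tag prefix the cluster-autoscaler reads on AWS Auto Scaling groups to learn
# which labels the nodes of a scaled-to-zero group will carry once launched.
ASG_LABEL_TAG_PREFIX = "k8s.io/cluster-autoscaler/node-template/label/"

# Assumed label values: the talk names the labels but not their values.
EXECUTOR_POOL_LABELS = {
    "acme-lifecycle": "spot",
    "acme-instance": "r5.2xlarge",
}


def asg_tags_for(node_labels: Dict[str, str]) -> Dict[str, str]:
    """Auto Scaling group tags that mirror the given Kubernetes node labels."""
    return {ASG_LABEL_TAG_PREFIX + name: value for name, value in node_labels.items()}


def node_selector_conf(node_labels: Dict[str, str]) -> Dict[str, str]:
    """Spark properties for a global node selector (driver and executors)."""
    return {
        f"spark.kubernetes.node.selector.{name}": value
        for name, value in node_labels.items()
    }


if __name__ == "__main__":
    for tag, value in asg_tags_for(EXECUTOR_POOL_LABELS).items():
        print(f"{tag} = {value}")
    for prop, value in node_selector_conf(EXECUTOR_POOL_LABELS).items():
        print(f"--conf {prop}={value}")
```

Keeping both mappings derived from one label dictionary is a design choice in this sketch: it keeps the Auto Scaling group tags and the Spark selector in sync if a label is renamed.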
Below we show how to configure Spark to use the node pools we’ve just defined. When running spark-submit, you can add a global node selector for all Spark pods with this Spark configuration. This is slightly limited, as the node selector will be applied to both driver and executors. Another option here is to use pod templates, which have a richer API. If you’re using the Spark operator, you can define the node selector in a style very similar to Kubernetes. Okay, now that we’ve set up a cluster, we’re ready to walk through a full example of application development in the context of Spark on Kubernetes. We use Docker to package our Spark application. This is actually great progress that was enabled by the use of Kubernetes as a Spark scheduler. Before Docker, you always had this fear that when you pushed your application to the cluster, it wouldn’t run as expected. Most of the problems came from dependency management. In Java and Scala, you need to make sure that your job dependencies have matching versions with the Spark distribution on your cluster. On Kubernetes, the Docker image includes the Spark distribution. So if your app runs locally, it will run on your production cluster. No bootstrap crisis. In Python, you needed to package all your custom modules and third-party libraries in a zip file, a wheel or an egg, push it to HDFS or an object store, and refer to it in your Spark submission. This was super brittle for non-pure-Python libraries like NumPy, which uses C code in the background. Now, everything is in the Docker image and matches the target operating system by design. And finally Docker makes iteration cycles very short thanks to its smart layer caching. So we’re gonna show you an example of this Docker dev workflow: we’ll run an image locally, then on the Kubernetes cluster, iterate a little bit on the code, and optimize performance at scale. We’ll use an example data set from Echo Nest, a company that was acquired by Spotify.
The data set is a million songs with acoustic features attached. We’ll use those features to create harmonic playlists, hope you enjoy it. Here’s a local folder with the code of my Spark application. Thanks to Docker, the project can adopt the standard layout for Python projects. There’s a requirements file with third-party dependencies. There’s a custom module with a few functions I wrote to analyze and compare songs. And there’s the main script, where most of the code is. It’s a Spark application where we load songs from a data source and compute a signature for every song. We isolate the songs that we want to use as seeds for our playlists, we compare every seed song with every other song in the data set, and we keep the 10 most similar songs to every seed song to form playlists. And eventually we collect everything in the driver and print the playlists to the driver logs. The Dockerfile is very simple: it uses as base image the PySpark image offered by Data Mechanics. It contains connectors to the object stores of the major cloud providers. The Dockerfile installs the libraries from the requirements file and copies over the source files to the working directory. Let’s build the image. This should run rather fast, as it’s not the first time I’m building the image. Okay, let’s now run the image locally to make sure there’s no bug in the code. There’s one interesting thing to see here. I’ve mounted the working directory as a volume into the Docker container so that we don’t need to rebuild the image every time we make a change to the source code. So let’s run the application now. Okay, we have our playlists, and it took about 2 minutes and 20 seconds to run the application locally. Now let’s run it at scale on the Data Mechanics cluster. Let me run the command first and then explain. First the image is built again, and then the command makes a call to the Data Mechanics cluster REST API to submit the application. And here’s the call. So here I wrap the operations in the just command.
Just is a replacement for make that’s easier to read and write. So here the method of interest is run cluster, and as you can see, it’s a wrapper around the curl request that’s made to the API. And here you can see the arguments that we’re passing to the API. Okay, the application is live and we can follow the live log stream from the cluster. Jump forward in time: I’ve waited a little and I’ve opened the dashboard of the Data Mechanics platform. It seems that this Spark application has failed after a quarter of an hour. Let’s dig into the app. Here’s the configuration that has been selected by the platform for this application, and below are the stack trace and the driver logs. The stack trace is fairly explicit. It shows that we have an issue with singular matrices. This is the life of a data engineer. There are some issues that you just run into at scale. You need to process enough data to encounter all the edge cases. So let’s fix this in the code. We’ll protect this part of the code against singular matrices or any other error. This should be enough. Okay, let’s run it again. As you can see, our changes are picked up and pushed to the Docker registry. Okay, the application is live again. Jump forward in time again. This time the Spark application has completed. So let’s have a look at the results. We can see the playlists in the driver log, and they make sense to some extent. We have blues songs with blues songs, rock songs with rock songs, and classical music with classical music. Okay, what about the performance then? It took more than 50 minutes to run the application, so there might be something we can improve here. Let’s have a look at the Spark UI. Most of the time was spent in one large job, which has one large stage with more than 600 tasks. The tasks are a little bit skewed, but this is in the data, so there’s not much that we can do at this point. And it seems that the time spent in tasks is actual compute.
One thing that we can’t tell from the Spark UI is whether the application is I/O bound or CPU bound. Here, we used eight cores for each executor; on the Data Mechanics cluster this maps to an r5.2xlarge instance with eight cores. This instance type actually has the same network throughput as the r5.xlarge, which has only four cores. So if the application is I/O bound, an idea is to use smaller executors, but more of them. So let’s have another try of the application with four cores per executor to test the hypothesis. So let me change it here and run the application again. This time the application has completed in 30 minutes. Let’s go to the jobs panel of the Data Mechanics dashboard to compare this run of the application and the previous one. The previous run took about 50 minutes when the last one took only 30 minutes; this is a 40% improvement. The only change we made was to reduce the number of cores per executor from eight to four, which divided the time spent in Spark tasks by almost two, from 73 hours to 39 hours. We did this tuning manually in this demo to showcase the API and the dev workflow, but keep in mind that this type of performance tuning is completely automated in the Data Mechanics platform. We automatically adjust infrastructure parameters like the container sizes and the disk settings, and Spark configurations around memory, shuffle and parallelism. The goal is to make recurring Spark jobs stable and cost efficient.
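The figures quoted in the demo can be checked with a short Python sketch. The durations (50 and 30 minutes, 73 and 39 hours of task time) come from the talk. The total of 64 cores is an assumption picked only to illustrate "smaller executors, but more of them", since the talk does not state the cluster size, and the function names are made up for this example.

```python
def percent_reduction(before: float, after: float) -> float:
    """Relative reduction from `before` to `after`, as a percentage."""
    if before <= 0:
        raise ValueError("before must be positive")
    return 100.0 * (before - after) / before


def executors_for(total_cores: int, cores_per_executor: int) -> int:
    """Executors needed to reach at least `total_cores` (ceiling division)."""
    if cores_per_executor <= 0:
        raise ValueError("cores_per_executor must be positive")
    return -(-total_cores // cores_per_executor)


if __name__ == "__main__":
    # Wall-clock duration of the two runs, in minutes (from the talk).
    print(f"duration reduction: {percent_reduction(50, 30):.1f}%")
    # Cumulated time spent in Spark tasks, in hours (from the talk).
    print(f"task time reduction: {percent_reduction(73, 39):.1f}%")
    print(f"task time ratio: {73 / 39:.2f}x")
    # Hypothetical 64-core budget: halving executor size doubles their count.
    print(executors_for(64, 8), "executors of 8 cores")
    print(executors_for(64, 4), "executors of 4 cores")
```

The cores-per-executor setting changed in the demo corresponds to the standard `spark.executor.cores` property. Whether the talk's API exposes it under that exact name is not shown in the transcript.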

– Thanks Julien, we hope this demo has given you the look and feel of what it’s like to run Spark on Kubernetes. In this last part, we’ll now go over projects that are being worked on right now for Spark on Kubernetes. Let’s go through a timeline first. Early 2018, native support for running Spark on Kubernetes was added with Spark version 2.3. Support was pretty bare-bones then, and then in November of 2018, Spark 2.4 brought essential improvements like client mode support and volume mounts. In June 2020, Spark 3.0 was released with the much awaited support for dynamic allocation. So let’s zoom in on that a little bit. Dynamic allocation means that Spark can adjust its number of executors at run time based on its needs. This mechanism is particularly powerful when combined with the speed at which Kubernetes can start new containers, okay. Kubernetes can start a container in a couple of seconds when there is capacity, and if it needs to get a new node from the cloud provider, maybe one to two minutes. So if dynamic allocation gets its new executors really quickly, thanks to Kubernetes being fast, it means the application can get just the right number of executors it needs. Note however that this feature is available via a mechanism called shuffle tracking, which means that the Spark driver tracks which executors are storing active shuffle files. If an executor is storing an active shuffle file, then this executor cannot be scaled down, cannot be removed. Otherwise it would lose the shuffle files, and so we would need to recompute them later. However, it’s still a great feature. So as you can see, Spark on Kubernetes has been getting much more mature in the past few years, and many companies are in fact using it in production. And as a result, in the upcoming version of Spark, version 3.1, which is planned for December, the experimental label will be officially dropped from Spark on Kubernetes.
So Spark on Kubernetes will be officially generally available. One of the features that’s coming up in Spark 3.1, and that I’m particularly looking forward to, is better handling of node shutdown. So what does that mean? It’s a mechanism that will kick in when a Spark application loses a Spark executor because of dynamic allocation, or just because of a node going down. Maybe that’s because of maintenance or a spot kill. So instead of abruptly losing the executor, Spark will be warned maybe 30 seconds early that the node is going away. And we will use this termination notice to gracefully decommission the executor. What does that mean? It means that the Spark driver will stop scheduling new tasks on the executor which is about to disappear, and that the shuffle and cache data stored on this executor will be copied to a surviving executor. And so as a result the Spark application just keeps going unimpacted, because we didn’t lose any shuffle files. And so this will make the use of spot instances much more stable with Spark, and it will also make dynamic allocation more powerful. So okay, what’s beyond 3.1 then? One interesting project is to use a remote storage like S3 to persist shuffle data. So like the feature we just described, this will make Spark more cost efficient and more resilient by fully separating the compute infrastructure from the storage layer. We don’t have an exact timeline for that, but it’s a work in progress. Okay, I think this is it. I wanted to thank you very much for watching our session; we hope it has given you some good advice to help you get started with Spark on Kubernetes. If you’re interested, check out the blog on our website for a lot more content on Spark on Kubernetes, or book a demo with us to start a free trial on the Data Mechanics platform. Thank you guys.
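The dynamic allocation and shuffle tracking behavior described in this last segment can be sketched in Python. The property names are standard Spark 3.0 settings. The `Executor` class and the `removable_executors` function are a toy model written for this example, not Spark's actual implementation, and the executor bounds are arbitrary.

```python
from dataclasses import dataclass
from typing import Dict, List


def dynamic_allocation_conf(min_executors: int, max_executors: int) -> Dict[str, str]:
    """Spark properties enabling dynamic allocation with shuffle tracking."""
    return {
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.shuffleTracking.enabled": "true",
        "spark.dynamicAllocation.minExecutors": str(min_executors),
        "spark.dynamicAllocation.maxExecutors": str(max_executors),
    }


@dataclass
class Executor:
    """Toy view of an executor as the driver might track it (illustrative)."""
    executor_id: str
    idle: bool
    holds_active_shuffle: bool


def removable_executors(executors: List[Executor]) -> List[str]:
    """Idle executors that store no active shuffle file can be scaled down."""
    return [
        e.executor_id
        for e in executors
        if e.idle and not e.holds_active_shuffle
    ]


if __name__ == "__main__":
    fleet = [
        Executor("exec-1", idle=True, holds_active_shuffle=False),
        Executor("exec-2", idle=True, holds_active_shuffle=True),
        Executor("exec-3", idle=False, holds_active_shuffle=False),
    ]
    print(dynamic_allocation_conf(1, 20))
    print(removable_executors(fleet))
```

In this model `exec-2` is idle but is kept alive because removing it would lose shuffle files, which is the limitation the talk points out.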


 
About Jean-Yves Stephan

Data Mechanics

Jean-Yves is the Co-Founder & CEO of Data Mechanics, a cloud-native Spark platform available on AWS, GCP, and Azure. Their mission is to make Spark more developer friendly and cost-effective for data engineering teams. They are active contributors to open-source projects such as the Spark-on-Kubernetes operator and Data Mechanics Delight. Prior to Data Mechanics, Jean-Yves was a software engineer at Databricks, where he led the Spark infrastructure team.

About Julien Dumazert

Data Mechanics

Julien is the co-founder and CTO of Data Mechanics, a Y Combinator-backed startup building a cloud-native data engineering platform. Their solution is deployed on a managed Kubernetes cluster inside their customers’ cloud accounts. Prior to Data Mechanics, Julien was a passionate Spark user as a data scientist and data engineer at the ride-sharing platform BlaBlaCar and the user analytics platform ContentSquare.