Cloud-Native Apache Spark Scheduling with YuniKorn Scheduler


Kubernetes is the most popular container orchestration system and is natively designed for the cloud. At Lyft and Cloudera, we have both built next-generation, cloud-native infrastructure based on Kubernetes that supports a variety of distributed workloads. We embrace Apache Spark for data engineering and machine learning, and by running Spark on Kubernetes we are able to exploit compute power effectively under such a highly elastic, scalable, and decoupled architecture. We have put a lot of effort into enhancing the core resource scheduling in order to bring high-performance, efficient-sharing, and multi-tenancy-oriented capabilities to Spark jobs. In this talk, we will focus on revealing the architecture of the cloud-native infrastructure and how we leverage YuniKorn, an open-source resource scheduler, to redefine resource scheduling on the cloud. We will introduce how YuniKorn manages quotas, resource sharing, and auto-scaling, and ultimately how to schedule large-scale Spark jobs efficiently on Kubernetes in the cloud.


Video Transcript

– Hello, everyone. Welcome to the Spark + AI Summit 2020.

Today I’m going to talk about Cloud-Native Spark Scheduling with YuniKorn Scheduler.

I’m Li Gao, a tech lead engineer on the Databricks Compute Fabric team; previously I was a tech lead in data infrastructure at Lyft. Today I’m speaking together with Weiwei Yang, the tech lead of the Cloudera Compute Platform. He is also an Apache Hadoop committer and PMC member, and previously worked at Alibaba and IBM.

On today’s agenda, I will start by talking about why Lyft chose Spark on Kubernetes to run our Spark workloads, and then about the need for a custom Kubernetes scheduler to optimize Spark running on Kubernetes. Then Weiwei will talk about Spark scheduling with the new Apache YuniKorn scheduler and provide a deep dive into the YuniKorn features. Lastly, we’ll share the community status and the roadmap of the YuniKorn scheduler.

Role of K8s in Lyft’s Data Landscape

We use Kubernetes widely at Lyft for both data and services, and this picture shows the landscape of the data side at a very high level. Our data use cases at Lyft include business intelligence (BI), mostly based on Apache Superset for user interaction; machine learning and deep learning, where data scientists use Jupyter notebooks to interact with our data; and data engineering, where most of our data engineers leverage the open-source Apache Airflow to author their ETL workflows. All of those use cases leverage different data processing engines: we use Spark mainly for batch compute and for many jobs that interact with machine learning and deep learning engines, Presto as our interactive query engine, and Druid for our real-time metrics store and compute. All of these compute engines run on our compute infrastructure, which is mostly on Kubernetes with some still directly on EC2, and all the data in our landscape is kept in cloud storage: S3 for our data lake, with some metadata in RDS.

Why Choose K8s for Spark

Why did we choose Kubernetes for Spark? Kubernetes is currently the most active open-source orchestrator for containers, and the benefit of choosing containerized Spark compute is that it provides shared resources across different machine learning and ETL jobs. It also supports the multiple Spark and Python versions our customers need, and with a containerized approach we can enable version-controlled containers on shared Kubernetes clusters, both for faster dev iterations and for stable production rollbacks and roll-forwards. The Kubernetes engine also gives us the opportunity for unified, advanced observability and resource isolation across our data compute and our microservices. And Kubernetes comes with support for finer-grained access control, which is very beneficial for supporting compliance and security use cases on shared compute resources.

The Spark K8s Infra @ Lyft

Now, let’s dive a little bit deeper into how Spark on Kubernetes is used at Lyft. This is a very high-level overview of how Spark is being used at Lyft.

Our jobs come in through the core data infrastructure gateway, which routes the traffic across many of our compute resources based on labels. The labels shuttle jobs to different cluster pools; within a given cluster pool there can be one or more Kubernetes clusters that share one or more S3 buckets as persistent storage, and some of them may have their own dedicated storage for privacy and security reasons. All those jobs on the compute side also share the same set of metadata: schema metadata, plus lineage and discovery metadata.

Multi-step creation for a Spark K8s job

Now, let’s dive a little deeper into how a Spark job is launched on Kubernetes in the Lyft environment. When a job comes in through the data infrastructure gateway, it first lands on a cluster pool based on its resource labels. Within the cluster pool we have a concept called the cluster pool controller, which routes the job to a specific Kubernetes cluster. Within that specific Kubernetes cluster, we have a concept of namespace groups, and a namespace group controller that can split jobs into different namespaces. Once a job lands on a specific Kubernetes namespace, it materializes a Spark CRD, which is an object supported by the open-source Spark on Kubernetes operator that was open-sourced by GCP, the Google Cloud Platform. Once the CRD has been read by the controller, it materializes the Spark driver and executor pods, and finally the compute can start and access the data.

Problems of existing Spark K8s infrastructure

Even with that design, after running the system in production we noticed problems with this Kubernetes Spark infrastructure. The first noticeable issue is the complexity of the multiple layers of controllers needed to handle the scale of the system. And because of those multiple layers of controllers, latencies in certain cases can be amplified from the lower layers to the upper layers. On top of those two issues, we also noticed as we added more features that there are requirements for different priorities shared between jobs, clusters, and namespaces, which is very difficult to manage and coordinate among the different layers.

Now let’s focus on a single cluster. Even within a single cluster, we noticed that the default scheduler has an issue with high latency. This can happen when we have a very large volume of batch jobs; we observed that in certain cases there can be hundreds of seconds of delay before a Spark pod can be executed on a given node. The next issue we noticed is at the other end: when we have very large batch jobs sharing the same resource pool, FAIR sharing among those pods becomes unpredictable, which is undesirable for our end results. We also have differing queueing requirements: a need for both FIFO and FAIR sharing of resources within a job cluster, which is not well supported by the default scheduler. And as more jobs come in, especially on the data engineering and data science workload side, there is a need for elastic and hierarchical priority management, which the default scheduler lacks. There is also a desire from our customers to port some of the visibility into scheduling behavior that they used to have in the Hadoop YARN world over to the Kubernetes world, and that is lacking with the default scheduler. Lastly, with a custom Kubernetes scheduler we can have a simplified layer of controllers in our implementation.

Now I’m handing over to Weiwei to talk about Spark scheduling with YuniKorn, starting with an introduction to the YuniKorn scheduler.

– Okay, so this is Weiwei. Moving on, I will be introducing Spark scheduling with YuniKorn and a deep dive into some of the YuniKorn features.

Flavors of Running Spark on K8s

So first of all, there are several flavors of running Spark on Kubernetes. One is native Spark on Kubernetes, where the user runs spark-submit, which launches the Spark driver pod, and the driver pod then requests the executor pods. The other approach is to leverage the Spark Kubernetes operator, which is open-sourced by Google: instead of submitting jobs with spark-submit, you create a SparkApplication CRD, and the Spark operator helps manage the lifecycle of the Spark job for you, as sketched below.
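For concreteness, here is a minimal sketch of what the operator approach looks like. The image, jar path, and namespace below are illustrative placeholders; the field names follow the operator's v1beta2 SparkApplication API.

```yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark-jobs                  # illustrative namespace
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v3.0.0"   # illustrative image tag
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.0.0.jar"
  sparkVersion: "3.0.0"
  driver:
    cores: 1
    memory: "1g"
    serviceAccount: spark                # needs RBAC permission to create executor pods
  executor:
    instances: 2
    cores: 1
    memory: "1g"
```

Once this CRD is applied, the operator effectively runs spark-submit on your behalf and tracks the resulting driver and executor pods through the job's lifecycle.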

Resource Scheduling in K8s

And for resource scheduling in Kubernetes: when we run Spark, we first need to understand how resource scheduling works in Kubernetes. The default scheduler workflow is quite simple. In human language, it is like this: the scheduler picks up one pod at a time, finds the best-fit node, then launches the pod on that node. Since Kubernetes 1.15, it provides the scheduler framework, where you can plug a certain number of extensions into the framework; you can do some filtering, you can do some ordering, but overall the workflow is the same.

Spark on K8s: the scheduling challenges

Then let’s look at what kind of scheduling challenges we have when we run Spark on Kubernetes. Our purpose is to move Spark workloads from legacy Hadoop clusters to Kubernetes-based clusters, because this gives us a unified architecture for a compute platform that supports multiple environments: on-prem, cloud, and hybrid cloud. And Spark is a superpower of a compute engine, right? It supports multiple workload types, including ad-hoc queries, batch jobs, workflows, and streaming jobs. Combined together, this is quite a complex runtime for the platform. When we did the work to move Spark to Kubernetes, we noticed some challenges, mainly in two categories. The first is job scheduling. Because we have so many jobs running in a shared environment, possibly from multiple tenants (multiple users, teams, and groups), job ordering becomes very important, and so does job queuing: by default there is no queuing for jobs. Job-level priority is also important to ensure urgent jobs can run, and resource fairness matters when we want to share resources between jobs, namespaces, or queues. And there is gang scheduling: the need for gang scheduling is essential for machine learning workloads as well as for Spark. The second category is resource sharing and utilization. What we have observed is that when we run Spark on Kubernetes, utilization is not that good. Sometimes the Spark driver pods occupy all the resources in a namespace, and sometimes there is resource competition; when that happens, the big jobs get starved. There are also misbehaving jobs, by which I mean jobs that have bad configurations or are picky about nodes; their pending pods will waste cluster resources. Another important factor is throughput. For a batch-oriented system, throughput is very important to ensure latency, and when we move to the cloud it matters even more, because throughput directly impacts cost. In summary, the Kubernetes default scheduler was not created to tackle these challenges. It was built for services, normal long-running services, and when it comes to such a complex scenario, it just doesn't work well.

Apache YuniKorn (Incubating)

That’s when we came up with the idea: why not create another scheduler to fix these gaps? So in early 2019, we started a project called YuniKorn, and right now it is an Apache Incubator project. Simply put, this project is a standalone resource scheduler for Kubernetes, focused on building the scheduling capabilities to empower big data on Kubernetes, where one of the most important parts is supporting Spark. It is very easy to use: it is a stateless service running on Kubernetes, and you can easily bring it up using the Helm charts. It can be used either as a secondary Kubernetes scheduler or as a replacement for the default scheduler, because it is a fully functional Kubernetes scheduler.
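As a rough sketch of what this looks like in practice (the Helm repo URL and chart name are assumptions based on the project's release repository, and the labels are YuniKorn conventions; check the YuniKorn docs for your release):

```yaml
# Installing YuniKorn via Helm (shown as comments; repo URL and chart
# name are assumptions, verify against the YuniKorn documentation):
#   helm repo add yunikorn https://apache.github.io/yunikorn-release
#   helm install yunikorn yunikorn/yunikorn
#
# When YuniKorn runs as a secondary scheduler, a pod opts in by naming it:
apiVersion: v1
kind: Pod
metadata:
  name: sleep-example
  labels:
    applicationId: "app-0001"   # YuniKorn groups pods sharing this label into one application
    queue: "root.sandbox"       # the leaf queue this application should land in
spec:
  schedulerName: yunikorn       # hand this pod to YuniKorn instead of the default scheduler
  containers:
    - name: sleep
      image: alpine:3.12
      command: ["sleep", "3600"]
```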

Then let’s take a look at how resource scheduling is done in YuniKorn for Spark. Remember, the workflow for scheduling in Kubernetes is very simple, three steps. Step one: the user submits a bunch of applications, and the API server creates a bunch of pods for those applications. Step two: within this blue box, the resource scheduler, some scheduling logic determines which node each pod should be allocated to. Step three: bind the pod to that specific node.

When we look at the default scheduler, it works very simply. When pods get created, the scheduler gets notified, and all the pods are put into a queue, a worker queue. The scheduler runs a loop to select a pod from the queue; at the same time, it caches all the node information, does the sorting, and finds the best node for the pod.

And then let’s look at what happens in YuniKorn. In YuniKorn, the major difference is that when pods are found by the scheduler, they are first recognized as applications, and each application is attached to a certain leaf queue in the hierarchy. When the scheduler does the scheduling, it traverses all the queues and finds the queue that desires the most resources; then, within that queue, it traverses all the applications and finds the application that desires the most resources; and then it picks the most important request from that application by priority. At the same time, the nodes are also cached in order to find the best node for the request. The rest of the steps are the same: bind the pod to the node.

So what do we get from this architecture? Basically, YuniKorn has the queue and application concepts, which provide the job scheduling capability as well as fine-grained control over resource quotas.

Main differences (YuniKorn vs. Default Scheduler)

If we compare the default scheduler with YuniKorn, we can list the features where we benefit from the YuniKorn scheduler. The first one is scheduling at the application dimension: in YuniKorn, the application is a first-class citizen, and the YuniKorn scheduler orders applications by, for example, application submission time, priority, or resource usage, based on the policy you choose. Secondly, job ordering is enforced in YuniKorn; it supports the FIFO, FAIR, and Priority ordering policies. Then there is fine-grained resource capacity management: YuniKorn can divide the cluster resources into a hierarchy of queues, and each queue can be configured with min and max resources, giving the user both a guaranteed amount and an enforced maximum (see the configuration sketch below). Resource fairness is enforced across these queues. And more importantly, YuniKorn natively supports big data workloads.
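As a hedged illustration of such a queue hierarchy with min and max resources, a YuniKorn queue configuration might look like the sketch below; the queue names and numbers are made up, and the exact schema and units should be checked against the YuniKorn documentation for your release.

```yaml
partitions:
  - name: default
    queues:
      - name: root
        queues:
          - name: engineering
            resources:
              guaranteed:        # the "min": what this queue is guaranteed to get
                memory: 100000
                vcore: 10000
              max:               # hard cap enforced on the queue
                memory: 200000
                vcore: 20000
          - name: data-science
            resources:
              guaranteed:
                memory: 50000
                vcore: 5000
              max:
                memory: 150000
                vcore: 15000
```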

We have done a lot of optimizations for big data compute engines such as Spark, Flink, and TensorFlow, so users don't need to modify anything on the application side; they can easily work with YuniKorn. And as for scale and performance, YuniKorn is optimized for performance and has much better throughput than the default scheduler.

Run Spark with YuniKorn

Now a little more detail about how Spark runs with YuniKorn. This chart demonstrates how it works underneath. There are three layers: the user's perspective, the Kubernetes cluster's perspective, and what happens inside YuniKorn, and from left to right is the timeline. First, the user submits a Spark job, either via spark-submit or via a SparkApplication CRD. What the user sees is the driver pod getting created on Kubernetes with status pending. YuniKorn is notified that the pod was created; it creates an application for it, and the application is added to one of the leaf queues. In the second stage, YuniKorn runs its scheduling cycle: it traverses the queues, finds the Spark application, finds the driver pod, and schedules the driver pod onto a node. The user still sees the driver pod as pending, but once scheduling is done, YuniKorn asks the API server to bind the pod to that node; in the Kubernetes cluster the pod is then bound, and the user sees the Spark driver running. Once the driver is running, it starts to request executors; on Kubernetes the executor pods are created, all in pending state, and on the YuniKorn side the scheduler is notified that there are new executor pods for this job, so they are added to the application. In the next stage, we run the same scheduling cycle as for the driver: traverse the queues, the application, and its requests, and allocate the pods onto nodes. We repeat this until all the executors are bound. One more stage is skipped by this slide: after the job finishes, the executors are cleaned up by the driver, and then the driver completes.

In the next section, we will deep dive into some of the YuniKorn features and why they're important for Spark, including some performance numbers.

The first thing is job ordering, which we've already mentioned a couple of times.

Job Ordering

So why does it matter? From the user perspective, there are usually three expected use cases. One: if I submit a job earlier, I want that job to run first. Two: I don't want my job to get starved because some other job or some other user is using my resources. Three: I have an urgent job, and I want it to run first.

To support all these scenarios, YuniKorn has pluggable sorting policies, which include FIFO, FAIR, and Priority. Priority is currently a work in progress and will be released in 0.9.
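As an illustration, the ordering policy is configured per queue; a minimal sketch follows (the queue names are illustrative, and the property key follows the YuniKorn docs):

```yaml
partitions:
  - name: default
    queues:
      - name: root
        queues:
          - name: batch
            properties:
              application.sort.policy: fifo   # run applications in submission order
          - name: adhoc
            properties:
              application.sort.policy: fair   # share the queue fairly across applications
```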

The next main feature is resource quota management. Users of existing Kubernetes clusters are very familiar with the namespace ResourceQuota, which defines the resource limits; it is actually enforced by the quota admission controller, not by the scheduler (see the reference example below). But the ResourceQuota is not enough for scenarios where we run Spark. There are several problems with it. One: it is very hard to control quotas when they are over-committed; to reach a decent utilization of the cluster, the namespace quotas are always overbooked, so it is very confusing and hard for users to set up a reasonable quota for a namespace. Two: a user has no guaranteed resources; even if you have quota, it's possible that you cannot get any resources because some other namespace or user is occupying them. Three: the quota can be abused; in a traditional batch system there are always some pending jobs or pending pods, and even pending pods consume quota, so utilization ends up really bad. And it doesn't provide any queuing for the jobs. All of this adds up to problems such as really low utilization, and job latency can be impacted.
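For reference, this is the kind of namespace ResourceQuota being discussed (the namespace and limits are illustrative). Note that it is checked at admission time against resource requests, so pending pods count against it just like running ones:

```yaml
apiVersion: v1
kind: ResourceQuota
metadata:
  name: spark-quota
  namespace: spark-jobs     # illustrative namespace
spec:
  hard:
    requests.cpu: "100"     # cap on total CPU requests across all pods in the namespace
    requests.memory: 200Gi  # cap on total memory requests
    pods: "500"             # pending pods count toward this too
```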

Resource Quota Management: YuniKorn Queue Capacity

The alternative way to manage quota is using the YuniKorn queue capacity, and the queue provides an optimal solution for managing resource quotas. Why do I say that? There are a few reasons. One: a queue can be mapped to one or more namespaces automatically, which gives you zero configuration and zero overhead for queue management (see the placement-rule sketch below). Two: queue capacity in YuniKorn is elastic from min to max, which means the user gets guaranteed resources for the queue and also has a max enforced. Three: the queues honor resource fairness; we enforce resource fairness between queues, which helps avoid starving jobs and users.

Four: quota is only counted for pods that actually consume resources, which means the resource counting for a job is more accurate and helps improve utilization. And lastly, the queue provides job queuing: jobs can be queued in the scheduler while waiting for some condition or for resources, which keeps the client-side logic very simple.
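Here is a sketch of the automatic namespace-to-queue mapping mentioned above, using a YuniKorn placement rule (the rule name follows the YuniKorn docs; treat the rest as illustrative):

```yaml
partitions:
  - name: default
    placementrules:
      - name: tag
        value: namespace    # place each pod into a queue named after its namespace
        create: true        # create that queue on the fly if it doesn't exist yet
```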

Resource Fairness in YuniKorn Queues

This chart is about the resource fairness evaluation. We have done experiments with simulated Kubernetes clusters of 2,000 and 4,000 nodes. This is a very simple example: we set up four queues and submitted lots of different pods to each of the queues, and from the chart we can see that resource fairness is being enforced.

Scheduler Throughput Benchmark

We have also done some testing to benchmark the scheduler throughput. These tests were done in a simulated environment of 2,000 and 4,000 nodes running on 20 physical nodes, and we scheduled 50,000 pods onto the cluster. The red line represents YuniKorn and the green line represents the default scheduler; roughly, we get twice the performance of the default scheduler at either scale.

Fully K8s Compatible

And one more thing which is pretty important: YuniKorn is fully Kubernetes compatible. It supports all the Kubernetes predicates (node selector, pod affinity/anti-affinity, taints and tolerations, and so on), as illustrated below. It also supports volumes, volume bindings, and dynamic provisioning. It is integrated with the Kubernetes event system, so you can see scheduling events directly when you describe a pod. It also works with the cluster autoscaler, which is actually one of the most important use cases for us, and it supports management commands such as cordoning nodes.
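As a quick illustration, a pod scheduled by YuniKorn can keep using the standard Kubernetes constraints unchanged (all values below are illustrative):

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: constrained-example
  labels:
    queue: "root.default"
spec:
  schedulerName: yunikorn
  nodeSelector:
    disktype: ssd                  # standard node selector, honored by YuniKorn
  tolerations:
    - key: dedicated
      operator: Equal
      value: spark
      effect: NoSchedule           # tolerate a tainted, Spark-dedicated node pool
  affinity:
    podAntiAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        - labelSelector:
            matchLabels:
              app: spark-driver
          topologyKey: kubernetes.io/hostname   # spread matching pods across nodes
  containers:
    - name: main
      image: alpine:3.12
      command: ["sleep", "300"]
```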

On the other hand, YuniKorn also provides a management console that gives users an overview of the cluster resource utilization and the applications running in the cluster.

YuniKorn Management Console

This is the summary cluster info page, where you can get an overview of how many applications are running plus some historical info. There is also the nodes page, where you can see how many nodes are running and the resource utilization of each node.

For applications, similarly, we have a list of applications showing on this page, where you can view the applications and their allocations.

You can also filter the applications by searching with keywords. There is also a view of the queues, where you can see the hierarchical queues of the cluster and the current utilization of each queue. This is very useful for admins to manage the cluster.

Compare YuniKorn with other K8s schedulers

Comparing YuniKorn with other Kubernetes schedulers: in this scheduling space, besides the default scheduler, there is another scheduler called kube-batch. We did a simple comparison here, mainly based on functionality. From our perspective, the main features a scheduler needs to cover are resource sharing, resource fairness, preemption, gang scheduling, bin packing, and throughput. Basically, when we compare them, YuniKorn has all of these features built in. For throughput, we have done side-by-side comparisons with the default scheduler, but we haven't done any testing with kube-batch; based on its existing issues, I think it might not be that good.

Then let's talk about the community, a summary, and some of the roadmap.

Current status of the project: YuniKorn was open-sourced in July 2019.

It entered the Apache Incubator in January 2020, and the latest stable version is 0.8, which was released in May. Right now we have a very diverse community, with members from Alibaba, Cloudera, Microsoft, LinkedIn, Apple, Tencent, and Nvidia, and all these members give a lot of good feedback and help us build a comprehensive scheduler that satisfies all the needs we just talked about.

The Community

And from the community side, we already have some early users, and the first one is Lyft. So Li, do you want to introduce the Lyft part?

– Sure. At Lyft, we deployed an early version of the YuniKorn scheduler on non-production Kubernetes clusters in early 2020, and we launch hundreds of jobs per day on these non-production clusters using YuniKorn queues. From our observations, for some of the large jobs, scheduler latency has been reduced by a factor of 3x during peak times. We've also noticed that on these large-job clusters, the overall Kubernetes cluster resource utilization has improved in terms of cost per compute over the default kube-scheduler for mixed workloads. And lastly, the FIFO and FAIR sharing requirements are met more frequently than before compared to the default scheduling. Back to Weiwei.

– So at Cloudera, YuniKorn ships with the Cloudera public cloud offerings. It provides the resource quota management and advanced job scheduling capabilities for Spark, and more importantly, it is responsible for scheduling both microservices and batch jobs, so it is used as a replacement for the default scheduler, running on the cloud with auto-scaling enabled. And at Alibaba: Alibaba is an early community user of YuniKorn and also a very important member of the community. Alibaba has deployed YuniKorn in a pre-production environment with over 100 nodes, measured on-prem, and they plan to deploy on 1,000 more nodes in production this year. Alibaba is leveraging YuniKorn features such as hierarchical queues and resource fairness to run large-scale Flink jobs on Kubernetes, and from their observations, there is an almost four-times improvement in scheduling performance. As for the roadmap: currently 0.8 is the latest stable version, and it already ships with a bunch of features such as hierarchical queues, cross-queue fairness, FAIR/FIFO job ordering policies, FAIR/bin-packing node sorting policies, self queue management, pluggable app discovery, a metrics system, and Prometheus integration. This is a long list, but one simple summary is that YuniKorn can be used as the primary scheduler to support running Spark and big data. Coming in 0.9, we are working on gang scheduling, job and task priority support, and support for Spark dynamic allocation.

Our Vision – Resource Mgmt for Big Data & ML

Putting all those things together, this is our vision. For compute, we want to move big data from legacy Hadoop clusters to Kubernetes; we want these computes to be able to run seamlessly on Kubernetes. That includes data engineering, real-time streaming, and machine learning, with compute engines such as Spark, Flink, Hive, and TensorFlow. The platform also needs to support microservices, batch jobs, long-running workloads, all kinds of workloads that could be running in the cluster. As for targets, from the user perspective, we want to deliver a multi-tenant environment with SLAs enabled, with proper resource sharing, good utilization, good cost management, budgets, and so on. In short, our vision is to build a unified compute platform for big data and machine learning, based on Kubernetes plus the scheduling features of YuniKorn.

So, YuniKorn is still a young project; join us in the community. We have the website listed here and some materials you might want to check out, and we also have community meetings to sync up and discuss these issues.

About Li Gao

Lyft Inc.

Li Gao is the tech lead of the cloud-native data compute team at Lyft, which powers many of Lyft's critical data and ML compute pipelines. Prior to Lyft, Li worked at Salesforce, Fitbit, and a few startups in various technical leadership positions on cloud-native and hybrid cloud data platforms at scale. Besides Spark, Li has scaled and productionized other open-source projects, such as Presto, Apache HBase, Apache Phoenix, Apache Airflow, and Apache Hive. Spark Summit 2019 - Scaling Spark on K8s https://maxmind-databricks.pantheonsite.io/sparkaisummit/sessions-single-2019/?id=46

About Weiwei Yang

Cloudera

Weiwei Yang is a Staff Software Engineer at Cloudera and an Apache Hadoop committer and PMC member. He is focused on evolving large-scale, hybrid computation systems and has extensive experience building mission-critical infrastructure for storage and compute. Before Cloudera, he worked in Alibaba's real-time computation infrastructure team, which serves large-scale big data workloads. Currently, Weiwei leads the efforts for resource scheduling and management on K8s at Cloudera. Weiwei holds a master's degree from Peking University.