End-to-End Deep Learning with Horovod on Apache Spark

Download Slides

Data processing and deep learning are often split into two pipelines, one for ETL processing, the second for model training. Enabling deep learning frameworks to integrate seamlessly with ETL jobs allows for more streamlined production jobs, with faster iteration between feature engineering and model training. The newly introduced Horovod Spark Estimator API enables TensorFlow and PyTorch models to be trained directly on Spark DataFrames, leveraging Horovod’s ability to scale to hundreds of GPUs in parallel, without any specialized code for distributed training. With the new accelerator aware scheduling and columnar processing APIs in Apache Spark 3.0, a production ETL job can hand off data to Horovod running distributed deep learning training on GPUs within the same pipeline.

This breaks down the barriers between ETL and continuous model training. Operational and management tasks are lower, and data processing and cleansing is more directly connected to model training. This talk covers an end to end pipeline, demonstrating ETL and DL as separate pipelines, and Apache Spark 3.0 ETL with the Horovod Spark Estimator API to enable a single pipeline. We will demonstrate 2 pipelines – one using Databricks with Jupyter notebooks to run ETL and Horovod, the second on YARN to run a single application to transition from ETL to DL using Horovod. The use of accelerators across both pipelines and Horovod features will be discussed.


Try Databricks

Video Transcript

– All right. Hi everyone. I’m Travis Addair. I work at Uber on our machine learning platform. I’m the lead maintainer of the Horovod open source project. And I’m here with Thomas Graves from Nvidia to tell you about how we use Horovod on Spark clusters to train deep learning models end to end, going from feature generation to model training, all the way to inference and production.

End to end Deep Learning with Horovod on Spark clusters

So in this presentation, I’ll start by giving you an overview of the problem that we’re trying to solve, tell you a little bit about the background on the Horovod project itself and how it’s being used to scale deep learning training up to hundreds of GPUs and tell you about the Horovod Estimator API that we’ve recently introduced that enables you to do large scale distributed training of your deep neural networks on top of Spark. And then I’ll hand it off to Thomas, who will tell you about the recent developments in Spark 3.0 for Accelerator-aware scheduling as well as show you a demo of how all this fits together into a single project.

So to start, let’s talk a little bit about how data processing and deep learning fit together.

End to End Pipelines

Now, where I work at Uber, Spark is an integral part of our data science machine learning process. And we have ETLs written in Spark using Spark pipelines that stitch together all of our model training into a single deployable that we then use for inference and production scenarios. What this entails is splitting of the application into separate stages, where we first use Spark to do some amount of processing, extracting data from hive, transforming the data into features. And then we use deep learning with Horovod and GPUs to do the actual model training, and then finally stitch it together into a single Spark pipeline. And you the Horovod Estimator API that I’ll tell you about does is it enables us to fit those two pieces, the Spark ETL and the deep learning together into this single pipeline so that users don’t need to context switch between separate environments, separate resources. And all of this is facilitated by the work that’s been done in Spark 3.0 to enable this accelerator-aware scheduling so that the deep learning can take place in your Spark cluster on GPU without having to switch to a completely different environment.

So to kick things off, let me first tell you a little bit about the Horovod project and what it means for doing large scale distributed training.

Deep Learning Refresher

Now, as a quick refresher, deep learning training is different from most of the problems that Apache Spark attempts to solve. And that it’s not what you would call an embarrassingly parallel problem. So this pink box here that says preprocessing is where your typical Spark ETL would take place where we need to extract data from our data source, do some feature transformation, et cetera. All that can be done at scale very efficiently using your normal Spark operations. But once we get into the deep learning model itself, things get a little bit more complicated. We have to sync global state very frequently, every batch in order to ensure that the model is training consistently. And we see that during the backpropagation phase, there’s actually a very limited opportunity for the kinds of parallelism that Spark is most efficient at taking advantage of. And so we need to think about this problem a little bit differently.

Distributed Deep Learning

Now, when people think about how to distribute deep learning training, the way that Spark allows you to distribute your data transformation, there are two broad ways that people approach the problem. The first is what we would call model parallelism where we think about taking individual layers of the model and splitting them on two separate devices or separate GPUs. And the second is what we would call data parallelism, where we essentially replicate the model across multiple devices, and then we divide the data such that every one of those replicas processes some subset of the data.

Early Distributed Training – Parameter Servers

In practice, the data parallel approach has been the most prominent in industry. One of the earliest examples of data parallel training is parameter servers where every replica communicates with a central storage layer or authoritative source of truth, all the parameter server, that, at each step in the training process, takes all of the weight updates from each of the replicas, aggregates them together and then sends back the final result to each of the replicas so that they update their global state of the model together, and it’s consistent across all the workers.

Now, this approach parameter servers has some various trade offs. Several pros that it has are native fault tolerance. So the parameter server goes down. You might have several replicas set up so that it can be recovered very easily. And it supports a technique called Asynchronous Stochastic Gradient Descent which allows you to have individual workers processing at different frequencies without having to be blocked by slower workers. But there are also a few downsides. The usability has been a historic challenge with parameter servers In the early days, the API is often required writing a lot of custom code to scaffold or set up your parameters server infrastructure. And the scalability has typically been an issue as well. So you have this many-to-one communication problem. So if you’re a network in your cluster is bandwidth-bound or if your processing is bandwidth-bound, I should say, what you typically find is that parameter servers are not the most efficient way to do this operation, they ended up being a major bottleneck. And the asynchronous SGD is, in some ways, an advantageous thing to be able to support. It’s not very commonly used in practice because it leads to typically degradation and the convergence of the model. And so what we find is that even though Asynchronous Stochastic Gradient Descent requires that all the workers proceeded the same frequency, in practice, it tends to be preferred because it leads to better model performance in the end.

Introducing Horovod

And so because of the limitations of existing frameworks, which were mostly parameter server-based at the time, at Uber, we developed the framework called Horovod in order to do fast distributed training in a way that was very easy to use, required very minimal changes to your existing API. So some of the advantages of Horovod are that it’s framework agnostic, it supports TensorFlow, Keras, PyTorch and ApacheMXNet, as well as all of those running on top of Spark. And it supports high-performance features to enable very fast computation such as Nvidia’s nickel framework, GPUDirect, as well as RDMA more broadly, as well as custom features such as tensor fusion that enable you to batch small operations together to make better use of your network. And it’s very easy to use, it just requires five lines of code added to your existing Python script, or if you’re using Horovod Spark Estimator is even fewer as we’ll show you. And it’s open source as part of the Linux foundations, AI Foundation. And it’s very easy to install. Just pip install Horovod and you’re ready to go.

Horovod Technique Allreduce

Now, what makes Horovod different from the parameter server approach at the most high level is the technique that we use to aggregate the updates. Instead of using a parameter server, we use a decentralized technique called Allreduce, where each step at each batch, each of the replicas is going to process their own batch of data and then send the updates around the network to different workers, and what’s typically a ring formation, but depending on how nickel wants to optimize, it can also be a tree formation, et cetera, whatever ends up giving you the best performance. And in the end, you end up with the global update of all the across all the workers as quickly as possible requiring the minimal amount of network bandwidth when using the ring Allreduce operation.

Benchmarking Horovod

In practice, we’ve seen that this gives very good scaling up to very high numbers of GPUs. So well beyond hundreds of GPUs. In fact, we’ve seen people running Horovod on top of the summit supercomputer with 27,000 GPUs achieving 90 plus scaling efficiency.

So now that I’ve introduced the core concepts of Horovod and hopefully convinced you that it performs very well for making your deep learning training scale up to very large numbers of GPUs very efficiently, so to save you lots of time during the model training. Now, I’d like to tell you a little bit about the Horovod Spark Estimator API and how it can be used to speed up deep learning in your Spark applications without any additional code specific to the distributed training.

Deep Learning at Uber: Recent Trends

So at Uber, there’ve been some recent trends in the use of deep learning that have motivated the creation of the Spark Estimator API for Horovod. The first is that deep learning is now achieving state-of-the-art performance with tabular data, whereas it used to be more limited to image classification problems, natural language processing. And for tabular data, XGBoost tended to be the preferred approach. But now that we’re starting to see more papers come out, demonstrating the impact of deep learning, even at some of these other problems, a lot of data scientists are now trying to migrate their existing tabular data of based-Spark applications, Spark classifiers to deep learning. And in these applications, what we typically see is that there are many features, but the features are not as high of quality as the ones you would typically get a natural language processing or image classification problems. And so there’s a lot more iteration between feature engineering and model training. And so having a framework that enables users to quickly iterate between those two steps is very important for our users.

End-to-End Deep Learning at Uber

So to zoom out for a second and look at how end to end deep learning works at Uber, I want to show you this picture of how our total Michelangelo ecosystem looks. Michelangelo is our machine learning platform. So what we see is that we have both real time and batch training as well as prediction that takes place. So in the real time scenario, we have messages coming in from a Kafka queue that gets taken in by our streaming engine and then fed through our feature store to transform into features. And in the batch scenario, we have data coming from our data lakes, so something like Hive and HDFS, but then goes through a Spark transformation process to be converted into features. And then the model training itself occurs, which we’ll discuss in more detail in a moment. So that will be the combination of your Spark application, as well as your deep learning. And then once the model is in a satisfactory state, we publish it to a model store. And then if we’re doing real time inference, that gets put into our online prediction service, which handles real time requests from say the client application, like an app on your smartphone. Or if we’re doing batch prediction, then we will run it using some sort of a crown-like application to schedule say daily computes over entire table’s worth of data. And then we output the predictions back to the data lake to be consumed by a separate process.

Model Training in Production

So then the question becomes, okay, if we want to drive this whole process by Spark, then what do we need to do in order to make deep learning fit into the Apache Spark ecosystem at scale so that we’re not just running say model training with a hundred terabytes of data all on a single driver node or something like that?

And so the first thing we need to understand before we get to the details of how we do the actual deep learning on Spark is that preprocessing often requires its own set of full passes over the data sets. So we have some example dependent preprocessing steps, such as image, color adjustments or imagery sizing, which don’t require such a full cast over the dataset, whereas we have other operations such as string indexing or normalization that require you to actually understand the statistical characteristics of your data set in order to properly compute the result. And so what this means is that we need to have some sort of fit operation, much like what we do when we train a model before we can apply the transformations to the data itself.

Spark ML Pipelines

And so in Spark terms, what this means is that we need to have some concept of an estimator. So an estimator is an object that will take a dataset and then will fit a model to that data set so that it can make the correct transformations on the data after it’s been fit. So for example, first you fit your string indexer, which gives you a transformer that you can then use to transform your data frame with unormalized an unindexed strings into index strings, which can then be fed downstream to your model itself. So ideally, we’d like to fit both these preprocessing steps, as well as the actual training of the model into a single one of these Spark pipelines so that we only have to call fit once. And what that enables us to do is very quickly iterate on the feature transformation and the model training together without having to move between separate contexts or environments in order to do so.

Horovod Spark Estimators

And that’s where Horovod Spark Estimators comes in. So Horovod Spark Estimators are that glue code that essentially takes your Spark application that does your normal feature processing and data transformation, as well as your deep learning model, and then puts that deep learning into just a single estimator that you can stick in a normal pipeline, all fit on it, and get a transformer that will enable you to make predictions on your data.

And so to take a deep dive into this code, let’s first look at the parts that are deep learning specific. So everything that’s highlighted here is just your ordinary Keras code. So you’re importing from TensorFlow, you’re creating a simple model, as well as defining your optimizer and your loss function.

And then the Spark code is all just your normal Spark code. You have your pipeline stages, so some amount of feature transformation, maybe string indexing, normalization, et cetera, followed by the fit call, and then finally, you can make predictions using your fitted pipeline on your test data frame and receive your final predictions. And so what this means is that the only part of the code that’s actually unique to Horovod, in this case, are these two lines here where we first import the horovod.spark.keras package. And then we just had this one line of code where we define our Keras estimator, given our model are optimizing our loss, and then we stick it into the pipeline. And with that one line of code, we’d have all of this powerful distributed training, all executed within our existing Spark application.

Deep Learning in Spark: Performance Challenges

Now, the next thing I want to do is take a little bit of time to deep dive into some of the unique performance challenges that we faced when building this API and some of what you could call secret sauce of what makes the Horovod Spark Estimator unique. So a couple of things that are worth pointing out about performance with doing deep learning on Spark is that one, data frames and RDDs are not very well-suited to deep learning natively. So when you do deep learning, you want to have random access to your data set, but that’s not the way that the RDD is organized in memory, as well as having the ability to run on GPU for your deep learning, as well as CPU for your ETL. So with ETL, you typically want to fan out as broadly as possible, which means using as many resources, and CPU machines tend to be more readily available than GPUs, so it makes sense to do your ETL on CPU. But for deep learning, you really want that powerful ability that GPUs provide in order to do very fast matrix operations. And that’s why, even though you can’t maybe fan out as broadly with GPUs, you still get much, much better performance doing your deep learning on GPU. So we want to be able to blend those two together if possible.

Petastorm: Data Access for Deep Learning Training

So we address the first challenge, we have a framework that was recently opened sourced and developed at Uber called Petastorm that allows you to do data access for deep learning training on the parquet file format that integrates very nicely with Spark. So some of the challenges of training on large data sets that need to be taken into consideration are how you’re going to shard across your different workers, how you’re going to stream the data from say HDFS or S3 into your workers, and how you’re going to shuffle it and buffer it and cache all that data so you can efficiently feed it to your GPU for doing the forward and backward passes without making your training process bottlenecked by I/O. So Petastorm is designed to address all these challenges for you so that you don’t have to think about them. It supports large continuous reads, so it’s HDFS/S3-friendly. Fast access to individual columns because it uses the parquet format. And the nice thing about parquet is that it’s written and read natively by Sparks so it can integrate very nicely into your Spark application without needing to installing any additional dependencies.

Deep Learning in Spark with Horovod # Petastorm

Here’s a quick diagram that shows you what this looks like in terms of Spark application. So on the left, you have your Apache Spark application that writes out your data to the parquet format on say HDFS. And parquet internally stores the data into what it calls row groups. And then when you’re doing your deep learning training, what your individual Horovod workers are going to do is they’re going to spin up a separate instance of this Petastorm background process that will read these row groups based on which row group should be assigned to which workers, which is determined deterministically. And then Petastorm will do the work of splitting apart these row groups into separate individual rows. So that’s the unbatching step. And then we’ll insert it into what we call a shuffle buffer queue so that we can sample these elements to get better uniform distribution of the data so that we deep correlate the samples that might be together in a single row group. And then we form these random samples into batches that we then feed into the actual training process. So this is all the words that Petastorm is doing for you under the hood.

Horovod on Spark 3.0: Accelerator-Aware Scheduling

Now, the second piece of addressing these performance challenges is Horovod on Spark 3.0 with Accelerator-Aware scheduling. So end-to-end training and a single Spark application requires that ETL on CPU should hand off data to Horovod running on GPU. And ideally, we want to be able to support fine grained control over the resource allocation so that tasks assigned on GPUs Spark can have the ownership isolated, so that different processes aren’t stepping on each other using the same GPU. And multiple GPU nodes can then therefore be shared over different applications.

And in Spark 3.0, this has been introduced. You can essentially define where the GPUs are coming from using this concept of discovery script. Define how many GPUs each executor is going to receive, how many GPUs each task is going to receive and do this at varying levels of granularity for whatever makes sense in your environment.

Deep Learning in Spark 3.0 Cluster

And so what this looks like when we fit it into our Horovod training process that we have our CPU-based ETL and feature engineering as before, and then we write the data to HDFS and the parquet format for Petastorm as before. But then when we do the training and the evaluation steps, you see that it’s all running on these GPU devices that give you that performance boost, all of them, the same Spark application. So we don’t have to worry about switching context to a completely different environment in order to get the performance benefits of running on GPU.

And now to tell you more about how this works under the hood and how Spark 3.0 can be leveraged in your environment, here’s Thomas Graves from Nvidia. – Thanks, Travis. I’m Tom Graves. I work at Nvidia on the Spark team. I’m also an Apache Spark mentor and PMC member. I’ve been working on Sparks since 2013. I’m going to go over a few of the Spark 3.0 features, Accelerator-Aware scheduling and some of the columnar processing.

Spark 3.0 Accelerator-Aware Scheduling

So in Spark 3.0, we have the ability to schedule accelerators. So this is generic. You can do GPUs, FPGAs, or any other type of accelerator you want. You can see Spark-24615 for more details if you’re interested. This is supported on YARN, Kubernetes and Standalone. So it requires your cluster manager to also support GPU scheduling, for instance. YARN 3.0 added support for GPUs. And Kubernetes has supported it for a while, I think it’s since like 1.10. And the Spark 3.0 standalone deployment also supports GPU scheduling. So what this does is it gives you the ability to schedule and request GPUs just like you do CPU cores. So I can request each executor, for instance, to have two GPUs each, along with two cores each. You can also request your driver to have accelerators GPUs, for instance. And you specify how many each of your tasks require. So I might have an executor with four cores and four GPUs, and each task needs one of each. And so you could run four tasks in parallel on that. You didn’t need to specify a resource discovery script. And this is because YARN and Kubernetes give you a container, but it doesn’t tell you the addresses of the GPUs or accelerators that it’s given you. So the discovery script runs and it discovers the exact addresses that Spark can use later to assign the tasks.

So there’s also an API for you to get the addresses assigned to you at the task level and at the driver. And like I mentioned, this is supported on YARN, Kubernetes, and Standalone.

GPU Scheduling Example

So here’s an example of GPU scheduling. And so this is running a Spark shell on YARN. And interesting configurations here are, I’m asking for one GPU for the driver. I have to specify the discovery script. I’m also asking for two GPUs pre to executor. And again, I have to specify the discovery script. And each task gets one. And the final thing is on YARN, you pass the discovery script in with the files. And there’s an example discovery script in Apache Spark GitHub that will work on all Nvidia GPUs. If you have a special environment or using some other accelerator, you’ll have to write your own.

Spark 3.0 Accelerator-Aware Scheduling Cont

So here’s the API that’s used to get the addresses of the accelerator that are assigned to you. So the first one is the Task API. So if you run a map partition or something like that, you can get the task context, and then you just get the resources in the context, and you can look at the addresses assigned. Once you have those, you can pass them into TensorFlow or some other AI code that needs those to assign. The driver also has an API that you can get just with a Spark context. You just say resources and specify whichever accelerator you’re interested in. This example here has GPUs and then you get the addresses.

So the next thing is columnar processing. So in Spark 3.0, we’ve added ability for advanced users to kind of add their own plugin to be able to play with and add the ability to do some columnar processing.

Spark 3.0 GPU Columnar Processing

So this is not something that comes with Spark, but it’s an add-on. And we’ve modified Spark just to allow people to plug into it to start playing with it a little more. So at Nvidia, we’ve actually written one of these plugins, it’s called the Rapids for Apache Spark Plugin. And what it does is it allows you to plug into the catalyst optimizer in Spark, and it lets you look at the plan and then you can decide if something in that plan you can do in the columnar. And specifically, we’re running it on the GPU. So for instance, we might see a sort in the plan and we know we can run this in the GPU. So we replace that sort with a GPU sort. And so the plugin also inserts any transitions needed. So Spark normally runs in a row-based format and iterates through each row. And the GPU, you want to be in columnar. So if we replace a sort with a GPU sort, for instance, we will also put in a transition that changes the data from row to columnar and copies it to the GPU.

And all that happens seamlessly for the user. So you don’t need to change any of your code, you just need to add a config and add in the plugin jar, and all that works for you. And so what the plugin does is any operations we know how to run on GPU, it will do, but if it doesn’t, the operation will still run on the CPU just like normal. And this plugin is built on the Rapids cuDF library. And this library is basically a CUDA data frame library. It has a bunch of ETL operations, your sort, enjoins and aggregates and things in it. And so we use that in our plugin. The plugin’s available or will be available Spark summit. And we also have a version that runs on Databricks.

So now I’m going to give you a demo of the end-to-end running of ETL and Horovod. Using the accelerator-aware scheduling. Okay. So I talked about some of the new Spark 3.0 features. Here, I’m going to give a demo of the Horovod Spark Rossman Estimator example. This will show how the GPU resources or scheduling works with this. This example is available in the Harvard repo, and you can find instructions on running it. So if you want to reproduce it, you can go there. Here, I pulled it into a Jupyter notebook just to be able to read it easier. This will be running on a YARN 3.1.3 cluster. It has GPU scheduling enabled. It uses Docker containers and has isolation enabled. The isolation is important so that only your application will be able to see the resources. So no one else will be able to use your GPU at the same time you are. This is running Spark 3.0. So the command I use to launch this is here. It’s running Jupyter with PySpark. The configs of interest are here. So for each executor, I’m requesting four GPUs each. I have to specify a discovery script. This is used by Spark to discover the specific addresses of the GPUs. So YARN will give you a container with the number of GPUs you requested, but it doesn’t tell you the addresses. So this is used by Spark to discover those addresses. It sends the addresses back to the Spark driver, which then can hand them out to individual tasks. There’s a version of this script checked into Spark. You can see it here. This should work on all Nvidia GPUs. If you have a custom environment or some special, you may have to customize this. So other interesting configs here are right here. So now I’m asking for one GPU per task, I’m also asking for one CPU per task. And you’ll notice that I asked for four executor cores. So I have four cores and four GPUs per executor, and the tasks needs one of each. This means I can run four tasks in parallel on each executor.

So if we go back over here to the example, this example runs a bunch of ETL and then passes it into Horovod for training.

We can see here, I specified the ones of the parameters. The number of epochs is less than normal. I think the default here is normally a hundred. I made it 10 just for this example so it would run quicker. From an ETL side, it reads a bunch of CSV files, and then it does a bunch of manipulation on that data. So it does some joins and some tables, it adds some columns and does some data manipulation. Kind of normal ETL processing.

So here, you can see where it’s actually calling the function that prepares the training data frame and the test data frame. So we have a test data set as well. And here, you can see this shows a bunch of columns and some of the data with it. So we scroll down to here where the training will actually start. So here, we’re going to start doing the training.

And you’ll see here is the interesting code. And Travis went over some of this. So here is creating the Horovod Keras Estimator and passing in all the parameters. And then right down here is where it calls the fit function. So we have to do is call fit in the passing in the train data frame. And that will start the training.

So if we jump over to the Spark UI. So some of the interesting things here is, you’ll see I’m on the Executor page for this. So there’s a new dropdown menu here for showing additional columns. One of those is a resources column. And this will show what resources have been acquired by each of the executors. So you’ll notice here, I have four executors. Each one has four cores each and each one has four GPUs. And this shows the specific addresses of those GPUs. Another interesting thing here is if you click on the logs for each executor, you’ll see very similar information. So it will tell you on startup, what script it ran and what resources it required. This is useful if you’re debugging or just trying to figure out what addresses each one is using.

The UI, for the most part, is unchanged, other than that, for the resources. So here’s just the jobs running like your normal stuff. So if we go back to our example, here’s where the training. So the training started and it all put a bunch of things. And you can kind of see TensorFlow takeover. And here, you’ll see that it’s actually using GPUs or trying to get them and acquire them so it can use them. And if we scroll to the bottom, we can see that this training actually finished already. And the root mean square percentage error loss here is 0.18, which is high, but that’s because the epochs was set to 10. Normally that’s a hundred, and this would be more right around 0.1.

This also ran the final prediction using the test data frame. And so you can see here, and it saved the results. I talked about the columnar processing in the slides. I’m not going to demo that here as there’s other talks on it. But if that was enabled here, some of this ETL would run on the GPU as well. You can reference the deep dive into GPU support in Apache Spark 3.0 talk that’s on Thursday.

Okay. So I went over the Spark 3.0 features and resource-aware scheduling. Now I’m going to give you a preview of what’s coming in Spark 3.1. So there’s a new feature that we call stage-level scheduling that is aimed at solving the problem of, I have multiple stages in my job, and they may want different resources per stage. For instance, with the ETL ML example, your ETL runs on the CPU and during that whole application, you have to acquire the GPU as well just for the ML stage. So during the ETL stage, you’re kind of wasting those resource. So stage is a little scheduling aims to fix that and allow users to specify different resource profiles at different stages.

This feature does require dynamic allocation, and that’s because it releases executors and gets new executors with different profiles. That’s the first implementation. Currently, it’s supported on YARN and will be available in Spark 3.1. So the command that I use to start this is very similar to what I showed in the last demo, except for, we’re not actually requesting GPUs to start with. So all we’re doing is we’re saying each executor wants six cores, and then I enable dynamic allocation. So if we go look at the Executors page, we’ll see that it didn’t start with any executors, but now that it’s running the ETL, we have executors and it only has cores. So if we go to the dropdown menu, I can select the resources. And then there’s this new resource profile ID, which is just an ID to go with each of the profiles. And so this profile is ID zero. This is the default profile, and it’s just whatever parameters you passed into your Spark job. So you see that we have tests running, and it’s just using six cores each and no GPUs.

So if we go back here, I kick this off. And so this is running with epochs 4, again, for demo purposes to make this go faster. But it’s doing all the same ETL as a previous demo did.

And what will be interesting is when we get down here to the actual training, we’ll see that the executors change. So I’ve modified Horovod internally to request the different executors using the resource-aware… Oh sorry, using the stage-level scheduling. So here, you can see that, okay, these executors are six core, now I’m not using, and I’ve actually requested new executors. And this profile here, I’ve requested four cores for each and four GPUs each. This is just like the last demo. And you’ll see that these now have a resource profile ID of one. And so if you want to see more information on the resource profiles, you can go to the Environments page. When you click over here, you’ll see there’s a new resource profile section. And this gives you all the details on the different profiles. So this zero is the default one, and it’s just whatever I specified in the command line or the defaults. And this one is what I modified Horovod to request. And so I requested four cores, memory, 20 gigs, memory overhead, 20 gigs, GPUs, four, and then the tasks asked for one. And that’s what we see over here on the Executors page. That all matches.

So we’ll see the training here has finished. We can scroll down and see this error loss is worse. That’s because again, I only did epochs four. And it’s running the final prediction. But here’s the code basically that I modified to be in Horovod. So I added this code. It was hard coded just for this demo. But the API is kind of interesting. So we create a resource profile builder that’s going to build up the resource profile that you want at each stage. You specify both executor and task requirements. So here, I’m just saying executive requirements, cores, four, memory, 20 gigs, memory overhead, 20 gigs. And I want four GPUs per executor. And this is the resource discovery script. And then I can specify the task requirements as well. One CPU and one GPU for each task. And I just tell the builder that I require those and then I say build. And this build creates a resource profile. The resource profile can then be passed into the RDD API to tell it that at this point, I want to use this profile. And so Spark internally will take that profile, and it splits the application into stages, what’s your shuffle boundaries. And at each of those stages, if you have a different profile, it would switch to use them. Currently, it’s a very strict requirement, meaning at that stage, it uses executives with exactly that profile. It doesn’t try to fit them into other ones, that might be something we add in the future. So that’s the API you can use.

Again, all this code is checked into Spark 3.1 is actually available if you build it yourself. Thanks, everybody. I hope you liked the demo and the presentation. Provide feedback for us,

Try Databricks
« back
About Thomas Graves


Thomas Graves is a distributed systems software engineer at NVIDIA, where he concentrates on accelerating Spark. He is a committer and PMC on Apache Spark and Apache Hadoop. Previously worked for Yahoo on the Big Data Platform team working on Apache Spark, Hadoop, YARN, Storm, and Kafka.

About Travis Addair

Uber, Inc. 

Travis Addair is a software engineer at Uber working on the Michelangelo machine learning platform. He leads the Horovod project and chairs its Technical Steering Committee within the Linux Foundation. In the past, he has worked on scaling machine learning systems at Google and Lawrence Livermore National Lab.