Deep Dive into GPU Support in Apache Spark 3.x


Utilizing accelerators in Apache Spark presents opportunities for significant speedup of ETL, ML and DL applications. In this deep dive, we give an overview of accelerator-aware task scheduling, columnar data processing support, fractional scheduling, and stage-level resource scheduling and configuration. Furthermore, we dive into the Apache Spark 3.x RAPIDS plugin, which enables applications to take advantage of GPU acceleration with no code changes. We review how the Catalyst optimizer's physical plan is modified for GPU-aware scheduling. The talk touches upon how the plugin takes advantage of the RAPIDS libraries cuDF, cuIO and RMM to run tasks on the GPU. Optimizations were also made to the shuffle plugin to take advantage of the GPU using UCX, a unified communication framework that addresses GPU memory intra- and inter-node. A roadmap for further optimizations taking advantage of RDMA and GPUDirect Storage is presented. Industry-standard benchmarks and runs on production datasets are also shared.


 


Video Transcript

– Hi, I’m Jason Lowe, and Bobby Evans and I are here to talk to you today about GPU support in Apache Spark 3.

Deep Dive into GPU Support in Apache Spark 3.x

So, today we're going to cover the GPU features that have been added to Apache Spark 3. We're going to talk about a plugin that we've added to accelerate SQL and DataFrame operations, and also how we've been able to accelerate shuffle when doing GPU operations for ETL. And finally, we're going to conclude with what the next steps are.

So, first let's talk about the GPU features in Apache Spark 3.

First off, let's talk about accelerator-aware scheduling, which means GPUs are now a schedulable resource in Apache Spark.

Accelerator-Aware Scheduling

This is covered by SPARK-24615, the Apache JIRA, and it allows users to request resources at the executor level, driver level, and task level. It also allows those resources to be discovered on the nodes, and to determine which resources were assigned to tasks and to the driver. This feature is supported on YARN, Kubernetes, and Standalone clusters.

GPU Scheduling Example

So, here is a command-line example showing spark-shell being run against a YARN cluster, and we can see the configs here. We set spark.driver.resource.gpu.amount to one, which says we want the driver to have one GPU, and we can also set a discovery script, a shell script used to discover the GPUs on the driver node. We can also set spark.executor.resource.gpu.amount to two, meaning we want each executor to request two GPUs, and correspondingly we can specify a GPU discovery script for the executors. Finally, we can specify spark.task.resource.gpu.amount, meaning we want every task to use one GPU. So, with this configuration in particular, we expect no more than two tasks to run at one time on an executor, because each task is going to use one GPU, and each executor will have at most two GPUs.
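For reference, here is a minimal sketch of the settings just described, expressed as a SparkConf; in the talk they are passed as --conf flags to spark-shell on YARN, and the discovery script path below is a placeholder:

```scala
import org.apache.spark.SparkConf

// Resource configs described above; in practice these are usually passed as
// --conf flags to spark-shell / spark-submit. The script path is a placeholder.
val conf = new SparkConf()
  .set("spark.driver.resource.gpu.amount", "1")
  .set("spark.driver.resource.gpu.discoveryScript", "/opt/spark/getGpusResources.sh")
  .set("spark.executor.resource.gpu.amount", "2")
  .set("spark.executor.resource.gpu.discoveryScript", "/opt/spark/getGpusResources.sh")
  // Each task needs one GPU, so at most two tasks run concurrently per executor.
  .set("spark.task.resource.gpu.amount", "1")
```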

GPU Discovery Script Example

So, speaking of that discovery script, here is an example where we use a shell script to detect what accelerator resources we have on a node. That script just needs to produce a JSON-formatted string that is parsed by Spark to determine the accelerators that have been discovered on the node. This particular example uses the nvidia-smi binary, which is installed with the NVIDIA driver, and its output is then marshalled into a form that Spark can use to detect the GPUs on that system. Once the GPUs have been detected and Spark has scheduled them, it's very convenient to be able to determine what has been assigned.
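For reference, the discovery script only has to print a JSON description of the resource that Spark can parse; a minimal example of the expected output, with illustrative addresses:

```json
{"name": "gpu", "addresses": ["0", "1"]}
```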

GPU Assignments API

So, for example, let's say, as in that example, we're asking for two GPUs and each task is using one GPU; it might be useful to find out which GPU a task was assigned. There's an API for that. Tasks can get, from the TaskContext, a resources map that they can use to look up, by accelerator type, in this case GPU, the addresses that were assigned. Those addresses are strings, and can be passed into TensorFlow or other AI code to determine which indices or addresses were assigned for a particular GPU. Similarly on the driver, you can get that from the SparkContext: there's a resources map, looked up by accelerator type, GPU in this case, and this example shows that the driver was assigned GPU zero.
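A minimal sketch of that lookup on both sides, in Scala, assuming the "gpu" resource configured above:

```scala
import org.apache.spark.TaskContext

// Inside a task: look up the GPU address(es) assigned to this task.
val assigned = sc.range(0, 2).map { _ =>
  val gpuAddrs = TaskContext.get().resources()("gpu").addresses
  // These address strings can be handed to TensorFlow or other GPU code.
  gpuAddrs.mkString(",")
}.collect()

// On the driver: which GPU(s) were assigned to the driver itself?
val driverGpus = sc.resources("gpu").addresses   // e.g. Array("0") in the example above
```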

GPU Scheduling UI

So, this accelerator-aware scheduling has also been hooked into the Spark UI. If you click on the Executors tab in the UI, and then enable the additional Resources metrics checkbox, that opens up a Resources column where we can see which GPUs have been assigned to the driver and executors, so you can actually verify that you're asking for GPUs, and see which GPUs you've actually been assigned for your Spark application.

Stage Level Scheduling

So, another feature is stage-level scheduling, and this solves the problem where a lot of Spark applications are built with an ETL stage that then feeds into a machine learning stage. We're cleaning and prepping data, and then we're going to do some machine learning on that data. Traditionally, that is run with CPUs in the ETL stage and GPUs for the ML stage. The problem today is that the ETL stage is not using GPUs while the machine learning stage is. If you try to run that as a single application, the ETL might run for quite a long time, and if we need GPUs, we'd have to ask for those GPUs for every task. That means during the ETL stage we'd be sitting on scheduled GPUs without using them, which leads to poor GPU utilization and angry users who would like to use those GPUs while they sit idle during the ETL stage. Stage-level scheduling is designed to solve that problem. It's covered by SPARK-27495 in JIRA, and it allows users to specify resource requirements per RDD operation. This means Spark can dynamically allocate containers to meet the resource requirements of particular stages, so you don't have to have a uniform task requirement for the whole application. This is coming soon in Spark 3.1. It just missed the Spark 3.0 cutoff, but the feature is almost complete, and it will be ready in Spark 3.1.
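As a rough sketch of the API shape coming in Spark 3.1 (names may still shift, and the input path and per-stage logic here are placeholders), a ResourceProfile can be attached to just the RDD operations that need GPUs:

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Keep the default (CPU-only) profile for the ETL stage, then attach a GPU
// ResourceProfile only to the stage that needs it. Requires dynamic allocation
// so Spark can acquire GPU executors on demand.
val execReqs = new ExecutorResourceRequests().cores(4).resource("gpu", 1)
val taskReqs = new TaskResourceRequests().resource("gpu", 1)
val gpuProfile = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build()

val etlOutput = sc.textFile("hdfs:///data/raw")   // placeholder input path
  .map(_.toLowerCase)                             // stand-in for the ETL work

// Only this part of the job requests GPU executors and one GPU per task.
val mlStage = etlOutput
  .withResources(gpuProfile)
  .mapPartitions(iter => iter /* stand-in for the GPU training / inference step */)
mlStage.count()
```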

SQL Columnar Processing

So, finally, I'd like to talk about SQL columnar processing. This is in Spark 3.0 and covered by SPARK-27396. Catalyst, Spark's SQL engine, has a plugin interface, and this work extends that interface to allow columnar processing by plugins. Plugins can modify query plans with columnar operations, which means plan nodes can exchange RDDs of ColumnarBatch instead of RDDs of InternalRow. This columnar format enables efficient processing by vectorized accelerators such as SIMD units, FPGAs, and GPUs.
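To give a sense of the extension point (this is not the RAPIDS plugin itself, just an illustrative skeleton), a columnar rule is registered through SparkSessionExtensions and gets a chance to rewrite the physical plan:

```scala
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}

// A plugin supplies a ColumnarRule: hooks that can rewrite the physical plan
// with columnar (e.g. GPU) operators; Spark inserts row<->column transitions.
class MyColumnarRule extends ColumnarRule {
  override def preColumnarTransitions: Rule[SparkPlan] = new Rule[SparkPlan] {
    override def apply(plan: SparkPlan): SparkPlan = {
      // Walk the plan and swap supported nodes for columnar equivalents here.
      plan
    }
  }
}

// Registered via an extensions class named in spark.sql.extensions.
class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(ext: SparkSessionExtensions): Unit =
    ext.injectColumnar(_ => new MyColumnarRule)
}
```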

Spark 3 with Project Hydrogen

So, with all these features put together, the Project Hydrogen initiative is coming to fruition. With Spark 2, as I mentioned before, the traditional machine learning pipeline did data preparation on a CPU cluster orchestrated by Spark, which necessarily got serialized out to shared storage, and then a separate GPU cluster loaded that serialized data back up and trained with XGBoost, TensorFlow, or your favorite AI framework. With Spark 3, we finally have a unified architecture. We have a single pipeline: with stage-level scheduling we can schedule it all as one application, and we can do ingestion, prep, and model training all orchestrated by Spark, a single platform built for AI. The infrastructure is consolidated and simplified, and critically, we can now accelerate the ETL portion with GPUs using that plugin API. And speaking of the plugin API, I'd like to hand it off to my colleague Bobby Evans to cover how we built the plugin to accelerate SQL and DataFrame operations.

Thanks Jason. So yes, as Jason said, I'm going to be talking about the plugin that we wrote to do accelerated DataFrame and SQL processing on the GPU. The big question on everybody's mind is "Can we make ETL fast with the GPU?" It seems pretty obvious that the two would go well together, but why hasn't anybody done it yet? We'll get into some of the details about that. Let's start off with an example. Here are some numbers from some TPCx-BB queries. These queries were run on two DGX-2 nodes, and we devoted all 96 cores of each of those nodes to doing the processing on the CPU side. There's a whole lot of host memory, so they weren't bound by host memory, and there are a lot of NVMe drives on these nodes as well. So, we were giving them as much processing power as possible, but as you can see, we still were able to achieve some really amazing performance speedups.

Deep Learning Recommendation Machines

The next use case we want to talk a little bit about is also a kind of standard use case that people have. It's the Deep Learning Recommendation Machine (DLRM) from Facebook. The DLRM scripts come prebuilt to handle the Criteo dataset, a one-terabyte, seven-day, anonymized click-stream dataset used to build a recommendation model. The really hard part of this is the cardinality of the data.

DLRM wants to have everything in terms of numbers rather than processing strings directly, so we need to do something similar to Spark's string indexing, remapping strings to IDs, but the cardinality is so high that Spark's default implementation just can't handle it. DLRM comes with a single Python script to do this. It's really quite painful because it's single-threaded; it's a fabulous script with a lot of work behind it, but it takes forever. As you can see on the next slide, doing the ETL with the default script took about six days.

DLRM on Criteo Dataset (Past)

We had one person run it, but they didn't actually record the time. They were just running it to verify that we were producing correct results, and they forgot to record the time, and I couldn't convince anybody to run it again, but it took about six days to run that script end to end. So, we wrote a Spark script to do the same sort of processing. And on 96 cores, that Spark script was able to reduce it from six days, 144 hours, down to about 12 hours, which is still a rather long time.

On the training side, training on that same 96-core CPU took about 45 hours. So, this is a fairly typical use case, where you're doing ETL and then you're doing training. And as you can see, the GPU training took about 0.7 hours, a little less than three quarters of an hour, on a single V100 GPU. So the typical state of the art is taking a Spark ETL job, running it very wide on CPUs, and then doing the training on a GPU. That's kind of the state of the art today. But we wanted to see if we could make things even better.

And so, we took the plugin that we've been working on and we threw it at this ETL processing. A single V100 brought that processing time from 12 hours down to 2.3 hours, which is a huge speedup. It's really great, but we wanted to see if we could go even further and scale out as well, because at two hours you're still only going to be able to do a couple of iterations a day. It's not really grab a cup of coffee, grab a lunch break, come back, and your processing is done. So we also tried throwing eight V100s at the same job, and we were able to get the processing time down to about half an hour.

DLRM End-to-End on Criteo Dataset (Present) Spark ETL + Training for Criteo Dataset (1TB)

So you can see here, head to head, how these different things stack up: from the single-threaded CPU-only version, to the Spark CPU-only version, to the state of the art of what people are typically doing today, which is still over 12 hours of processing, all the way down to the GPU-enabled version, which can be done in a little over an hour. With this we're able to speed things up several X over the state of the art, and we can drop the cost drastically as well. Jensen, our CEO, has this quote he loves to say: "The more you buy, the more you save." And I think in this case, it's really true.

RAPIDS Accelerator for Apache Spark (Plugin)

So, with this, we are really starting to show what Project Hydrogen can do: we're able to create a system, with this plugin, the RAPIDS Accelerator for Apache Spark, where we can do all of the processing on a GPU, end to end, machine learning and ETL together.

So, let's get into some of the details of how this plugin works, how the SQL side of it works.

No Code Changes

So, the first thing I want to emphasize is that there are no code changes at all. The way that we built this, it should be a transparent, drop-in replacement for what you're doing. There may be a few config changes to tune things properly, but the code itself should remain completely unchanged. We don't support everything, but we do have a very large library of operators that we support right now.
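To illustrate the "config changes, not code changes" point, enabling the accelerator comes down to putting the plugin and cuDF jars on the classpath and setting a handful of configs. A hedged sketch: the key names follow the plugin's documentation, and the values shown are placeholders to tune per job.

```scala
import org.apache.spark.SparkConf

// Illustrative settings only; the rapids-4-spark and cudf jars must also be
// on the driver and executor classpath.
val conf = new SparkConf()
  .set("spark.plugins", "com.nvidia.spark.SQLPlugin")   // load the RAPIDS accelerator
  .set("spark.rapids.sql.enabled", "true")              // turn GPU SQL planning on
  .set("spark.rapids.sql.concurrentGpuTasks", "2")      // tasks allowed to share each GPU
  .set("spark.rapids.memory.pinnedPool.size", "2g")     // pinned host memory for transfers
```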

What We Support

And we’re working on growing that every day.

Is This a Silver Bullet?

Is it a silver bullet? No, obviously it's not going to solve all problems for all queries. Small amounts of data are one case. If you look at the chart on the side here, it's a log-scale chart of several common, typical IO speeds, all the way from the CPU cache at about a terabyte a second, down to a spinning disk at 160 megabytes a second. The GPU typically sits on a separate PCIe slot, either Gen3 or Gen4, which means that to get your data from main memory onto the GPU for processing, we're moving that data at about 12, maybe 13 gigabytes a second. That's a fair amount of overhead that the GPU has to overcome with its processing to pay for the transfer. So if you're processing relatively small amounts of data, less than a few hundred megabytes per partition, the GPU is going to have a hard time overcoming that. We typically see that once you get above a couple hundred megabytes, the GPU really starts to pay for itself.

Another issue we run into is cache-coherent processing. It's not that the GPU is bad at it, it's that the CPU is very good at it too, and trying to beat the CPU cache at a terabyte a second becomes very difficult. The other thing is data movement. If you're reading your data from spinning disks at 160 megabytes a second, you're probably IO bound to begin with, not compute bound. However, we see in a lot of cases, especially with SSDs and NVMe drives, that people are compute bound more or less the entire time they're processing; the IO has gotten good enough that the CPU can't quite keep up anymore. We also have issues with things like UDFs, where if you're doing a lot of processing in things we can't accelerate on the GPU yet, going back and forth between the CPU and the GPU adds a lot of overhead that has to be paid for. And there are also a few situations where the limited amount of GPU memory we have can cause some issues, but we'll get into those more as we go on.
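One practical takeaway from the partition-size point above is that the GPU generally wants fewer, larger partitions than a CPU cluster would use. A minimal sketch of the kind of knobs involved, with illustrative values only, not recommendations:

```scala
import org.apache.spark.SparkConf

// Illustrative only: push input splits and shuffle partitions toward the
// "couple hundred megabytes per partition" range discussed above.
val conf = new SparkConf()
  .set("spark.sql.files.maxPartitionBytes", "512m")   // larger input splits per task
  .set("spark.sql.shuffle.partitions", "48")          // fewer, bigger shuffle partitions
```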

But It Can Be Amazing: What the SQL Plugin Excels At

But it can be truly amazing in a lot of cases. If you have high-cardinality data, if you're doing very big joins, very big aggregates, crazy sorts, the GPU can eat those up and just go at insane speeds compared to what the CPU can do. If the window that you're operating on is bigger than the CPU cache, for say a sort or a join, you're definitely going to be able to beat it on the GPU. Window operations are another case: a lot of window operations in Spark are N squared, where N is the size of the window, so if you're doing processing on large windows, the GPU can completely destroy the CPU in many cases.

We found that aggregations with lots of distinct operations are something we can process much, much faster than the CPU can. We've seen really weird corner cases where we're talking a 100X speedup, where we had to do a double take and go back and check to be positive that we hadn't messed something up somehow. But we literally saw, for one customer, a 100X speedup on certain distinct aggregate queries. Complicated processing is another thing that we do really, really well. And you may think, "I don't have any complicated processing." Well, if you're writing out to Parquet or ORC, that is incredibly expensive processing: they're doing all kinds of compression, lots of deduplication, and other statistics to get the size of those files as small as possible. And we can see, in some cases, a 20X speedup writing to Parquet or ORC compared to the CPU.

Parsing CSV can also be very expensive. It's a very strange format to be parsing on the GPU, but a lot of really smart people have spent a bunch of time figuring out how to do it in a really parallel way, and we're able to beat the CPU in most cases with CSV parsing as well.

So, how does it work? How do we actually make all of this happen?

Spark SQL & DataFrame Compilation Flow

Well, inside Spark there's a framework called Catalyst, as Jason talked about. In Catalyst, you either input SQL, which goes through a SQL parser and gets turned into a DataFrame object, or, if you're using the DataFrame API directly, you're essentially building up the same abstract syntax tree that the SQL would produce for the DataFrame. As soon as you say go, with a collect or a write or anything that stops the lazy evaluation and actually does processing, Catalyst starts to do a bunch of compiling and optimization. It takes the DataFrame and turns it into a logical plan. That logical plan gets several optimizations and is turned into a physical plan; the physical plan also goes through several optimizations, and finally it gets turned into an RDD of InternalRow. So the actual processing Catalyst does is all done on RDDs, just like any other Spark processing; the stages above it are being compiled down to produce an optimized RDD. What you see in the Spark UI is a representation of that physical plan.

For the plugin, that's where we tie in. All of the stages above the physical plan are the same, so the same optimizations and everything else happen. Then, at one of the very last stages of the physical plan operations, right before code generation and a few things like that, we get a chance to look at the physical plan. We can walk through the physical plan and do a more or less one-to-one mapping from operators that are CPU-enabled in Spark Catalyst to operators in a physical plan that can run on the GPU. Those GPU-enabled columnar operators operate on RDDs of ColumnarBatch, which, when we're done, we convert back to RDDs of InternalRow to provide a clean, transparent transition between the GPU processing and the CPU processing.

So, as an example, here is a single simple query laid out, CPU on one side, GPU on the other. In this query, we're doing a very, very simple hash aggregate. We read in a Parquet file in both cases. Since Parquet is a columnar format, the data is already columnar, so we don't have to do any kind of transition from rows. On the CPU side, however, after the CPU reads in the columnar data, it needs to convert it into rows to do the rest of the processing, so the next step it does is convert those columns into rows so it can start the first stage of the hash aggregate. On the GPU side, we skip that and go directly into a GPU-enabled stage of the hash aggregate. Then we start to do a shuffle. On the CPU side they're shuffling rows; on the GPU side, we've put in optimizations to shuffle columnar data efficiently. With that, though, like I said, one of the things we really try to do is process enough data to pay for the cost of moving that data, so one of the optimization stages we've put in right after the shuffle combines the shuffled data back together into larger chunks, so that the GPU can be more efficient in its processing. After that, both plans go into the second stage of aggregate processing, and then finally we write out the Parquet file.

Now, there's a little bit of oddness in the GPU plan. After we write out the Parquet file, there's a conversion from columns to rows at the end. All stages inside a Spark plan have an output. The Parquet file writer produces no output, but because of things we haven't fixed yet, we're still putting in a transition at the end to translate nothing that's columnar data into nothing that's rows. We have plans to remove that at some point in the future, but you may see it, so don't be alarmed. It's not going to impact performance in any way.
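For concreteness, the query being walked through has essentially this shape (the path and column names are made up); with the accelerator enabled, the physical plan swaps the scan, aggregates, shuffle, and write for their GPU equivalents:

```scala
// Hypothetical query mirroring the plan described above:
// Parquet scan -> partial hash aggregate -> shuffle -> final aggregate -> Parquet write.
val df = spark.read.parquet("/data/input.parquet")   // placeholder path
  .groupBy("customer_id")                            // placeholder column
  .count()

df.explain()   // with the plugin enabled, this shows the GPU versions of these operators

df.write.mode("overwrite").parquet("/data/output.parquet")
```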

ETL Technology Stack

So with that, we've built up an entire stack to do this processing. Each stage in that plan, each of those GPU stages, we've implemented in terms of cuDF processing. cuDF is an open source library that is part of RAPIDS, an open source set of libraries that use Arrow-formatted data on the GPU to communicate with each other. All of these are built on top of CUDA.

cuDF itself provides a Python API that is very compatible with Pandas, and if you're a Pandas shop and you do a lot of processing with it, I would encourage you to go check out cuDF. What we've done is add Java APIs on top of that same base library, and then we've implemented each of those operators in terms of those Java APIs, so that we can get parallel, accelerated processing in Spark. We're going to jump into a demo now, but before we do, I want to explain a little bit about the setup that we have. This setup is on Databricks in AWS, and we pulled up two clusters using the latest version of Databricks for Spark 3.0.

Demo Cluster Setup

On the CPU side, it's fairly standard. We've got an r4.xlarge for the driver, and r4.2xlarge nodes for the workers. On the GPU side, we have a p2.xlarge for the driver. That's actually not needed.

Databricks is still working through some things. The driver for us doesn't require a GPU at all, but on Databricks, because of how the ML pieces are set up, if you set up a GPU cluster, you have to have a GPU driver node. So we threw one in there; it shouldn't make any difference to the total cost or anything, it's just a couple of cents of difference. On the workers, though, we're using p3.2xlarge nodes. So it's more or less the same between the two clusters; we've just added one V100 to each worker node to do the processing.

– This demo is doing data preparation and feature engineering on the 200-gigabyte Fannie Mae single-family loan performance dataset, in preparation for training an XGBoost model. The query itself is pretty standard PySpark. There are two queries here, one for each cluster, the GPU cluster and the CPU cluster. They're identical, except for the location where we store the intermediate data, so they don't collide, and a couple of configs to tune each query optimally, either for the CPU or the GPU. These queries take quite a long time to run, so we're going to get them started, and then we can explain more about the setup and everything else as they're running. We'll start the CPU first, give it a little bit of a head start, and start the GPU notebook right behind it.

The GPU cluster is on the left, the CPU cluster is on the right. We already covered what's in them, so this is just to let you see for yourselves that we brought them up and that they're the same: 12 nodes on each. The only difference is that there's a V100 GPU on each of the nodes in the GPU cluster, and not in the CPU cluster. We're going to put the video in fast-forward playback now; you can watch the timer on the right to see how things are going.

There are two steps in the processing. The first step is to transcode the data from CSV into Parquet. The first reason we're doing this is to reduce the total size of the data, from 200 gigabytes of CSV to about 16 gigabytes of compressed Parquet, which reduces the amount of data that we have to load from S3 as we process. The data is originally stored in S3; we've just mounted it under the Databricks file system here. So, by compressing it, we reduce the total amount of access to S3, and that speeds up the query. The second thing it does is allow for predicate push-down. Parquet is a columnar format: it stores the data for each column separately. So in this complicated query, where we do self-joins several times, it can reduce the total workload, the total amount of data that we have to read, by a significant factor, because we only have to read the columns that we care about. That's why we're doing this first step of transcoding the data into Parquet. We're going to switch back to normal playback speed soon, because the GPU cluster is just about done with all of the processing that it needs to do.
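The transcode step itself is just a read and a write. The demo does it in PySpark; the same idea in Scala, with placeholder paths and options, looks like this:

```scala
// Step 1 of the demo: transcode ~200 GB of CSV into compressed, columnar Parquet
// so the later self-joins read far less data (column pruning + compression).
spark.read
  .option("header", "true")
  .option("inferSchema", "true")      // placeholder; the real notebook supplies a schema
  .csv("dbfs:/mnt/mortgage/csv/")     // hypothetical mount point
  .write
  .mode("overwrite")
  .parquet("dbfs:/mnt/mortgage/parquet/")
```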

Now that the GPU job has completed, let's go take a look at the Spark UI for the GPU cluster, just so you can see what it looks like. We've integrated all the GPU changes pretty seamlessly, and it should be pretty intuitive: on the SQL page, you can see that we have the same type of boxes, the same type of events that you'd expect on a Databricks UI, same as a normal Apache Spark UI. We've even integrated all the metrics that you would expect on each of these boxes. So, if I open that up and we scroll over, you can take a look; there are metrics there that are GPU-specific as well. And now that we're done with the GPU cluster, let's shut it down, and then we'll speed up the rest of the playback so that you don't have to wait around for 20-plus minutes for the CPU to finish.

Even at 20X playback speed, it's going to be kind of slow to wait for the CPU to finish. So, to avoid you getting bored, let's jump straight into some of the results of this run. The GPU ended up being a little over four times faster than the CPU: almost exactly seven minutes versus roughly 29 minutes on the CPU side. And the cost? Well, V100s are kind of expensive, but we still ended up being about 25% cheaper than the CPU version if you're paying for Databricks Enterprise.

I hope you've enjoyed this demo. Thanks for watching. – As you can see, we've gotten some really great speedups by being able to do this. We got a 4X speedup in performance, which is really quite amazing, and also a cost savings. Now, the numbers you saw in the video for the cost savings were for the Enterprise edition. On the Standard edition we still win, with an 18% cost savings, but that's not very much, and we know that. So we decided to try a run with a slightly different set of GPUs.

T4 Cluster Setup

So, T4 GPUs are designed for inference. They have a little less processing power, and they're a lot cheaper than the V100, which is really optimized for the training side of ML/DL. The T4 fits better with SQL. So, to get an idea of what the cost could be like, we spun up a cluster in AWS itself, not on Databricks, and set it up with 12 nodes with T4s. Same number of cores as before, actually less memory, but doing the same processing.

Coming Soon… T4 GPUs on Databricks

And we ran the exact same notebook through. What we found is that the V100 was able to do the processing a little bit faster: instead of 4.1X, the T4s were 3.8X. But they're also much, much cheaper than the V100s, and we were able to see a 50% cost savings just on the AWS side, across the board. That translates into what we would predict would also happen on Databricks. Databricks, from what we hear, is going to start supporting T4s sometime in Q3. I don't know for sure, you'll have to talk to them about it, but for these numbers we just assumed the worst case: that they charge as much in Databricks units as they do for a V100. And even with that, we're able to see a 50% cost savings on this query just by using the T4s.

So, we wanted to see: is this just that one query? After all, that's a query we wrote. How can you trust it? It could be one of those weird benchmarks. So we wanted to see it on other queries as well. We took a couple of TPCx-BB queries and ran them on the exact same cluster to compare the CPU version against the GPU version. And here as well, we saw about a 3.5X speedup and a 40% cost savings on these two TPCx-BB queries on that same setup.

So, now I'm going to turn the time over to Jason to talk a bit more about the second part of the acceleration that we're doing, which is accelerated shuffle. – Thanks Bobby. So, let's talk about accelerated shuffle, and what that means when we're processing on GPUs. When we're talking about shuffle, let's make sure we're on the same page.

Spark Shuffle

When we say shuffle, we're referring to the data exchange between Spark stages. Here's an example showing two stages in a Spark job. The first stage has a parallelism of three, represented by the three tasks, zero, one and two, and the second stage has a parallelism of two, so there are two tasks there. As stage one runs, every task is going to partition its output into two partitions, reflecting the parallelism of the stage after it. When that stage completes, we transfer, or shuffle, all of those partitions to the respective tasks in the subsequent stage. That transfer, that exchange, is the shuffle.

So, let's think about what happens when we start processing ETL operations, SQL and DataFrame operations, on a GPU, and what happens during the traditional shuffle. For this example, let's assume that GPU zero is where some data has been produced and is ready to be shuffled to the next stage. Here we can see that the Spark shuffle is what I would call CPU-centric data movement, which makes sense, because traditionally Spark has always executed on CPUs. Let's say we want to transfer the data. As soon as the stage completes, the data is fetched from GPU zero by the CPU into main host memory, and then written out to disk, just as Spark shuffle works today. That crosses the PCIe bus, as Bobby mentioned earlier, into the CPU and host memory, and then crosses it back down into local storage as we store that data. Then when the next stage runs, let's say on the same node, GPU one runs another task in the next stage and wants to fetch that partition. What happens is the CPU fetches that data from local storage, maybe the page cache in the operating system avoids that read, maybe it doesn't, depending on how much activity has happened on that node in the interim, and then sends it down to the GPU across the PCIe bus again. And what if the next task is running on a GPU on a different node? Then the CPU will fetch the data and write it across the PCIe bus to the network, and the NIC is going to send that data across the network to the remote node, where it crosses the PCIe bus to the CPU on that node, and is fetched back across the PCIe bus again to the GPU on the remote node. So you can see that we're crossing the PCIe bus somewhere between four and six times, depending on whether we get page cache effects or not, as we do these operations. So we want to see if we can improve that.

Can we do something more like GPU-centric data movement? In the same scenario, the same setup, GPU zero has produced the output, but we're going to see if we can do something better than always fetching it into host memory and always writing it to disk. The first thing we do is cache that data on GPU zero, where it was produced, in the hope that we can do something smarter with it. Let's say the next task runs on GPU zero. If we get lucky and it runs on GPU zero, then when it goes to fetch that partition, it's already in GPU zero's memory. That's the best kind of shuffle you could hope for, because it's zero copy; we essentially just hand the buffers from one task to the other.

In the other scenario, let's say GPU one runs the next task and the data is still cached in GPU zero's memory. Often, multiple GPUs in the same node are connected with a fabric called NVLink, a high-speed peer-to-peer interconnect that is not the PCIe bus and is much faster than it, and we can do a direct transfer between the two GPUs without involving the CPU. That's also a very nice outcome. And in the remote case we talked about before, where we need to send this to a GPU on a remote node and it's still cached in GPU zero's memory, we may be able to leverage something called RDMA, or Remote Direct Memory Access, where the data is sent straight from GPU zero's memory to the NIC and across the network, and as it's received on the other end, it goes straight from the receiving NIC into the GPU memory of the receiving node. Neither the sending host's CPU nor the receiving host's CPU is involved in actually copying the data. That's also very nice, because it reduces the amount of PCIe bus traffic involved in the transfer and doesn't involve the CPU, which is then free to do other work. And then finally, if we can't store all of it in GPU zero's memory, because GPU memory is limited, we need to spill somewhere. In the future, we'd like to leverage a technology called GPUDirect Storage, where, similar to the network case, the data could be transferred directly from the GPU to NVMe drives or local storage on the node, rather than being fetched into host memory and then written back down to local storage again.

So, speaking of spilling from GPU memory, let's see how it works today, without GPUDirect Storage. If we cannot hold all of the data in GPU zero's memory, then as GPU zero runs out of memory, we spill on demand: those shuffle buffers, the partitions that need to be shuffled, are spilled to host memory across the PCIe bus. We can configure an amount of host memory to be used as a cache there, and if we can't cache it there, then eventually we spill to local storage using the CPU, as Spark shuffle does today. Now let's cover again the various scenarios for how we can transfer that data to the next stage. Say GPU one wants to fetch the data and it's either on local storage or in host memory. If it's in local storage, the CPU fetches it into host memory, and from there the GPU can fetch it across the PCIe bus, similar to how legacy shuffle works today. If we want to send it over the network, we may still be able to leverage RDMA in that case: once it's in host memory, RDMA can transfer that data directly from host memory across the NIC to the remote node, and importantly, on the receiving end, it won't enter host memory; it goes straight from the NIC into the GPU, avoiding another double crossing. So we can still leverage some of these things even when we actually spill to host memory and then all the way to disk. So with all of these different transfers, we're transferring between GPUs, GPU memory to GPU memory.

UCX Library

We're doing host memory to GPU memory, we're doing GPU with RDMA, we may be doing GPU with TCP; there are all these different transports. How can we manage all of them? That's where UCX comes in. UCX stands for Unified Communication X. It's an open source framework, from an open consortium, that abstracts the communication transports. You set up endpoints and you tell UCX, I want to send from this endpoint to this endpoint, and it selects the best routes available; it might even split across more than one route, if you enable that feature, for higher bandwidth. For example, it supports TCP, RDMA, shared memory, and CUDA IPC for GPU transfers. And importantly, it supports zero-copy GPU transfers over RDMA, as I mentioned before. It's important to note that RDMA does require network support, meaning you'll need something like InfiniBand or RoCE. I highly recommend you visit their website, openucx.org, if you're interested in more information.
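For orientation, turning on the accelerated shuffle means swapping in the plugin's shuffle manager. Treat the following as an assumption based on the plugin's documentation rather than a recipe; the class name is versioned per Spark release, and the UCX transport settings come from the plugin docs.

```scala
import org.apache.spark.SparkConf

// Assumption: the accelerated shuffle is enabled by swapping in the plugin's
// ShuffleManager. The class name is versioned per Spark release ("spark300"
// here is illustrative); check the plugin documentation for your build and
// for the UCX transport and caching settings that go with it.
val conf = new SparkConf()
  .set("spark.shuffle.manager", "com.nvidia.spark.rapids.spark300.RapidsShuffleManager")
```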

So what happens if we put this together and actually start running this shuffle plugin? Again, this is a TPCx-BB-like query, similar to query 22, which is an inventory pricing query. This is running on those two DGX-2 nodes, 192 CPU cores total, and here the shuffle is about 225 gigabytes of compressed data. Running this on the CPU, the query executes in about 228 seconds, something like that. If we just turn on the RAPIDS accelerator plugin without the shuffle accelerator, just using Spark's legacy shuffle, that time cuts down to 45 seconds, which is a nice improvement. But can we do better? If we turn on the accelerated shuffle plugin, where we're trying to cache the data in GPU memory, trying to leverage NVLink, trying to leverage RDMA, because these nodes are connected with InfiniBand, then we see that time cut from 45 seconds down to just over eight seconds, which is a very nice win on top of an already nice win. That's an impressive outcome, and that's what we're looking for in trying to optimize this IO transfer, this data movement. Of course you're asking, "Well, what happens if we can't cache it all in GPU memory? Do we see any improvement at all?" And that's where this comes in. This is the ETL processing for TPCx-BB query five; it's basically the ETL that query five does, to get ready to train a logistic regression model, and that needs to produce, I think, ten input vectors. That model is going to identify what visitors are interested in across different categories, but the ETL is particularly expensive. You can see that just doing the ETL for this, using those 192 cores across the two DGX-2 nodes, the query runs in just over 1,550 seconds on the CPUs. Again we turn on the plugin without the shuffle accelerator, just using regular Spark legacy shuffle, and that cuts the time to 172 seconds, which is a very nice win. And this is shuffling well over 800 gigabytes of compressed data, far more than query 22 did, so we are spilling not only from the GPU to host memory, but spilling quite a bit of the data to disk as well. Even with that, we see that turning on the shuffle plugin drops the GPU time in half. So we took an already nice win from just using the RAPIDS accelerator and doubled its performance by adding this shuffle, again leveraging those high-bandwidth transfers over NVLink when we can, and RDMA even when we have to go through host memory in the worst case.

So, with that, what are the next steps for this project? First off, we're going to open source this accelerator; we're actively in the process of it, and that's imminently going to happen. We're also actively working on nested type support, such as arrays, structs, and maps. We're doing decimal types, and of course we're working on more SQL operators all the time. A little bit further out, we're working on GPUDirect Storage, as I mentioned before, and time zone support for timestamps; right now we only support the UTC time zone. We're working on SQL higher-order functions, and of course UDFs, user-defined functions. You may be wondering, "How can we support UDFs? It's a black box of code. How can we automatically translate that to put it on a GPU?" And yes, that is a very, very difficult problem, which is why it's further out. But we do have a research team that's looking into ways of, not necessarily translating arbitrary UDFs, but looking at certain kinds of UDFs, and being able to digest them, break them down, and translate them automatically into GPU operations, so that we can accelerate even some UDFs.

So, where can you go to get more information? I highly recommend you go to the main landing page we have for this, which is NVIDIA.com/Spark. There you'll find all the information about the RAPIDS accelerator. I recommend you use the contact-us link there, where you can get in touch with the NVIDIA Spark team. You can listen to Adobe's email marketing use case, where they used Databricks and this accelerator plugin to speed up and accelerate their marketing intelligence services pipeline. And finally, there's a free ebook at NVIDIA.com/Spark-book that covers the GPU features coming in Spark 3, the RAPIDS accelerator plugin, and details like that.


 
About Robert Evans

NVIDIA

Robert Evans is a Distinguished Software Engineer at Nvidia. He is a PMC member of Apache Hadoop, Spark, Storm, and Tez. Prior to Nvidia, he worked for Yahoo on the Big Data Platform team on Apache Spark, Hadoop, Storm, and Tez. He also worked on enabling the GNU Linux operating system on ARM processors for mobile devices. Robert holds BS degrees in Computer Science and in Computer Engineering from the University of Utah.

About Jason Lowe

NVIDIA

Jason Lowe is a Distinguished Software Engineer at Nvidia. He is a PMC member of Apache Hadoop and Tez. Prior to Nvidia, he worked for Yahoo on the Big Data Platform team on Apache Hadoop and Tez. He has held positions of Architect for the embedded Linux OS group in Motorola and Principal Programmer at Volition Games. Jason holds a BS in Computer Science from the University of Illinois at Urbana-Champaign.