Modernize Your Analytics Workloads for Apache Spark 3.0 and Beyond

May 27, 2021 12:10 PM (PT)

Apache Spark 3.0 has been out for almost a year, and it’s a safe bet that you’re running at least some production workloads against it today. However, many production Spark jobs may have evolved over the better part of a decade, and your code, configuration, and architecture may not be taking full advantage of all that Spark 3 has to offer.

In this talk, we’ll discuss changes you might need to make to legacy applications in order to make the most of Apache Spark 3.0. You’ll learn about common sources of technical debt in mature Apache Spark applications and how to pay them down; when to replace hand-tuned configurations with Adaptive Query Execution; how to ensure that your queries can take advantage of columnar processing, including execution on GPUs; and how your Spark analytics workloads can directly incorporate accelerated ML training.

We’ll provide several concrete examples taken from an end-to-end analytics application addressing customer churn modeling, recent experience modernizing Apache Spark applications, and lessons learned while maintaining a library of Apache Spark extensions across three major versions of Apache Spark.

In this session watch:
William Benton, Principal Product Architect, NVIDIA


Transcript

Will Benton: Thanks so much for joining me. My name is Will Benton. And today we’re going to be talking about some changes you can make to your Spark jobs to take advantage of new features in the Spark 3 series. Here’s a little bit about the guy you’re listening to. I work on data science product strategy at NVIDIA. And in this talk, I’m going to mention some work my NVIDIA colleagues have done, and some work that I’ve done, but any opinions in this talk are mine alone. I really enjoyed presenting at Spark Summit and Spark+AI Summit in the past. And it’s an honor to be back this year for Data+AI Summit. I’ve been involved with the Spark community for just over eight years now. In April 2013, I wrote a tutorial about how to get Spark 0.7 up and running on Fedora. And back then, Spark was hosted on sparkproject.org instead of at Apache. Spark SQL and GraphX didn’t exist yet, but you could use Shark or Bagel. And running Spark meant also building Mesos and a patched version of Hadoop.
Spark has had three major releases since I first used it. And while some fundamental APIs, like the RDD, and configuration parameters have been really stable over that time, the changes that you can and should make to older applications so that they work well with newer versions of Spark are really going to be the focus of this talk. So here’s what we’re going to look at. We’re going to place things in historical context. Look at how our configurations might expose leaky abstractions and the trade-offs that we encode when we tune our jobs. We’ll look at how to get around making those trade-offs statically, with adaptive query execution. We’ll look at how we can take advantage of columnar execution in our data frame processing. And then finally, we’ll see a really cool consequence of columnar data frame processing, which is the ability to accelerate query processing with hardware acceleration. So I want to start, though, with some historical context.
Now you might’ve been expecting that historical context would mean RDDs or Hadoop. We’re going to go back even further. This is a portrait of Denis Diderot from 1766. Diderot was a prominent figure in the French enlightenment. Here we see him wearing a decent, but unremarkable robe. He looks sort of content. He’s almost cracking a smile. This is a different portrait of Diderot from 1773. He’s older, wearier, and less content, but he also has a fancier robe. If we’re going to take Diderot’s claims at face value, this robe is actually the source of many of his problems. As you might imagine, being a public intellectual in the years leading up to the French revolution was not a particularly lucrative occupation, and Diderot regularly had trouble with money. But at some point he was given this impressive scarlet robe and it made him realize that by comparison, all of the rest of his possessions were far more ordinary.
And the contrast with everything else he owned, made him replace nearly all of his possessions to live up to the standard of this new robe. Furniture, bookcases, displayed art and more, everything needed to be upgraded to not look shabby in comparison to this new robe. In an essay from 1769 Diderot laments that this gift has led to his dissatisfaction with his possessions. He describes himself as enslaved to the new lifestyle introduced by this fancy robe. Now this phenomenon of consumer behavior in which a single upgraded possession can lead to a cascade of additional purchases, is called the Diderot effect by sociologists. It’s easy to imagine how this can work with possessions, even if you aren’t particularly sartorially inclined. I mean, everyone has bought a gadget that’s turned into a lot of accessory purchases, right? And I think there’s a pair of really interesting analogies in this concept of cascading consumption, and they’re ones that we in the Spark community are familiar with.
So let’s look at the first of those analogies. Now, when we build data processing systems, we have a general concept of what’s possible with the tools, techniques, and architectures that we know about. We have a pretty good sense of whether or not we’ll be able to solve a given problem at a given scale or a given level of precision. When we start with a new, more complex problem or a problem at a never before conquered scale, it seems impossible at first, but improvements can make it possible to solve a problem in a limited way. Instead of solving a problem at scale for example, we might solve it on a subset of our data or compute a more imprecise approximation of the answer. But for many domains, we aren’t happy with approximations or being forced to work on a sample. We’ll want to be able to solve the problem on more of the dataset.
Typically, the next stage in our evolution involves solving a harder problem or the same problem at a more challenging scale, but at great computational expense. Think of problems where you don’t mind that they’re slow because you’re happy to be able to solve them at all. Of course, our contentment with this state of affairs is short lived because we really want not just to get answers, but to get them quickly. But a quick solution for the problem we initially set out to address ultimately leads to further dissatisfaction as well. Once we can solve a problem quickly, we’ll want to solve a bigger version of that problem, whether that means operating at greater scale or whether that means using more complex techniques.
If you’re passionate about solving problems with data in your organization, you probably have the same problem with pipelines, queries, and workflows that Diderot did with his wardrobe and furniture. A fast scalable solution to one problem makes everything that isn’t fast or that doesn’t scale look bad. And it creates this additional desire to solve more problems, bigger problems, to expand along every dimension. You’ll always want to move up this hierarchy from impossible to partially possible, to slow to fast. You’ll want to move as many problems to the right side of this hierarchy as possible. And once you get to fast, you’re going to want to use your new powers to solve bigger problems.
So let’s look at how analytics systems have evolved in the last couple of decades and see echoes of this Diderot effect. Around the turn of the century, you might run business analytics in a pair of databases. Your source of truth would be a transactional database, which you’d put on the fastest machine you could find because you couldn’t scale it out. And that would federate all of your operational data. Your analytics would happen in an analytic database, which would periodically mirror the data in the transactional database. And you could scale this up a lot more. Now you couldn’t scale the transactional database and your analytics were going to lag behind. And this was also a very expensive architecture to operate, and it was difficult to tune. The Hadoop architecture relaxed some of the constraints and capabilities of relational databases in order to make it possible to solve bigger problems at scale.
In this case, our source of truth is a distributed file system. We spread our data on ingest across physical files on different nodes of the file system and compute jobs migrate to the nodes that contain the data that they’re going to operate on. The advantage of this Hadoop architecture is that unlike our relational database, you can scale it out arbitrarily by adding more nodes, you can add more storage and you can add more compute. The downside is that the analytic processing was slow and that we can’t independently scale our compute and storage. Now we can imagine that these eras of analytics correspond roughly to windows over this hierarchy of data processing systems that we looked at earlier. In the transaction-processing/analytic-processing era, many things were impossible or were possible only with caveats. We had to be very careful about the problems we tried to solve. With the Hadoop era, we were able to solve more problems at scale, but typically things weren’t as fast as we would like. And there was a lot of clever engineering going into making it faster.
With Spark, we shift the window more to the right and get more in that desirable part of the space where we’re solving a lot of problems, where we’re solving really difficult problems slowly, and where we’re solving many problems quickly. And the advantage of Spark of course, is that we were able to scale our storage and compute independently because our jobs executed by caching data in worker memory. So you don’t have to have a compute job running on every storage node and your file system is not as important in intermediate communication for jobs as it was in the Hadoop era. Spark offered efficient execution of static compute graphs derived from high-level programs with the RDD API. It offered runtime planning of database queries executed with the data frame and dataset APIs. And as we’ll see, in recent versions Spark also can dynamically reorganize plans in response to the kind of data that they’re actually operating on, with adaptive query execution and cost-based optimization.
But if we look at data frame and dataset execution in the context of how Spark’s execution model has become more dynamic over time, we can actually see another instance of the Diderot effect. And unlike our example earlier, where improved performance on one dimension of one workload left us wanting improvements along other dimensions, or on more workloads, this is an example of the Diderot effect within workloads. In the classic Hadoop era, the speed of spinning disks and gigabit ethernet could be a huge bottleneck and have huge performance impacts on jobs, especially since the distributed file system got hit with every stage of every job. Spark addressed some of these concerns right away by allowing users to cache collections in memory, and encouraging an architecture that disaggregated compute and storage, which made it possible to write compute jobs once again that were compute bound after all. Other changes, like the increasing prevalence of 10 gigabit ethernet and solid state drives, made network and disk latencies not as much of an issue, but really exposed that the CPU parts of our jobs were not as fast as we’d like.
The popularity of the data frame API presented an opportunity to change things at the framework level. Because data frame operations are a higher level of abstraction than RDD operations, they’re much more amenable to optimizations. And Project Tungsten was a family of initiatives to make Spark’s data frame processing faster on the CPU. It entailed a lot of great engineering advancements like columnar processing, native code generation, off-heap memory management and cache-sensitive data structures. These techniques combined to make query processing faster and use less memory. And they also essentially arrived for free, as far as Spark application developers were concerned. The kinds of low-level optimizations that might’ve required you to hire a team with compiler and database experience were available to any job that used data frames. If we want to look at the Diderot effect again, in the context of 2021, we might say that eliminating bottlenecks and making parts of our workloads faster makes the slow parts much more obvious. And that the efficiency we get with one improved component creates new challenges, but also new opportunities.
So we’re going to talk about those challenges and opportunities now. And I want everyone to think about what the oldest Spark application that you have in production is. A lot of people have three year old Spark applications these days. Now, if you have an application that’s been in production for three years, if you were on the cutting edge when you developed it, you were developing against Spark 2.3. If you weren’t on the cutting edge, if you were in a more conservative organization, you were probably developing against an earlier release in the 2.0 series, or maybe even Spark 1.6. If you have an older application, say a five-year-old application that you were developing in mid 2016, you may have developed that against any one of the releases in the 1.0 series.
You know, you probably used data frames, but you may not have used them that effectively. I’ve actually seen Spark applications that are seven years old that are still running. And an application like this is going to have been developed before Spark 1.0 was even released. Think about how much Spark has changed since before version 1.0, and how many great new features there are. And think about how an application that’s basically worked since then may not be taking advantage of those features. The other thing to consider is how the hardware landscape has changed in this time. If we think about hardware refresh cycles as being approximately three years, if we have that application that we developed in mid 2016, we’ve gone through probably two hardware refresh cycles since that time. And details about the systems that we’ve designed our Spark jobs for can leak into the configurations that we run our Spark jobs against. The most obvious place where this can happen is in the number of partitions you’re using.
If you’re writing a Spark job to run on a number of small machines that have a small amount of RAM, you’ll use a lot of partitions. If you’re going to modify that job to run on a cluster with fewer or larger machines, you’ll use larger partitions, but fewer of them. Now, the interesting thing is, the machines we run on may change more often than our configuration does, especially in the public cloud. So one thing to consider is whether or not the default parallelism that we set up when we wrote an app still makes sense for the configuration that we’re deploying it on today. Another thing to consider, is that while we may have configuration parameters that we submit with our job, the partition count that we ran the job with actually lives beyond the configuration. For example, in the number of partitions that we write to disk when we save a file to stable storage.
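To make that concrete, here is a minimal sketch (the paths and partition counts are invented for illustration) of the kinds of settings that tend to encode old hardware assumptions, and of a partition count that outlives the configuration because it is baked into the files written to stable storage:

```python
from pyspark.sql import SparkSession

# Illustrative values only: the right numbers depend on the cluster you run on
# today, not the one the job was originally written for.
spark = (
    SparkSession.builder
    .appName("revisit-partitioning")
    # Default parallelism for RDD operations; often tuned years ago for smaller nodes.
    .config("spark.default.parallelism", "200")
    # Shuffle partition count for DataFrame operations; Spark's built-in default is 200.
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)

events = spark.read.parquet("/data/events")  # hypothetical dataset

# The partition count chosen here outlives the job's configuration: it becomes
# the number of files in stable storage that every downstream reader sees.
events.repartition(64).write.mode("overwrite").parquet("/data/events_out")
```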
Another thing to consider is how the JVM has changed. If you were using Spark three or five years ago, it’s possible that you were developing against a JVM that had a radically different garbage collector, that worked with different heaps, and so on. Similarly, if you were designing an app to work well with task-level, fine-grained scheduling on Mesos in 2014 or 2015, but you need to run it on coarse-grained schedulers in YARN or Kubernetes today, or with stage-level scheduling in newer Spark releases, there are probably some abstractions that have leaked into the way you’ve configured and tuned your application that aren’t going to be optimal for your current environment. There are a lot of lower level things you can think about too. Thresholds and cutoffs, partition counts for shuffles, how much data we want to read from disk into a partition.
These things all can change with our hardware and with the version of Spark that we’re running on. But even more insidiously, these configurations can leak into code. And a lot of Spark applications have configuration parameters spread throughout the code as magic numbers. “I’m going to repartition this to a Fibonacci number because it makes it faster,” is the comment, right? You’ve seen all kinds of things like this in real jobs, I’m sure. The challenge with these kinds of jobs is that you really need to identify the places where you have to update them for contemporary environments. So tuning performance a priori is difficult. It’s better if we can defer some decisions to run time. We talked about how Spark has become more dynamic over the years. How we’ve gone from efficient execution of static compute graphs that are implied by RDD operations to query planning at runtime.
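Before we get to that runtime story, here is a rough sketch of the magic-number anti-pattern and one way to pay it down; the numbers and the fallback value are purely illustrative:

```python
from pyspark.sql import SparkSession, DataFrame

spark = SparkSession.builder.appName("no-magic-numbers").getOrCreate()

def prepare(df: DataFrame) -> DataFrame:
    # Before: a hardware assumption baked into the code as a magic number.
    #   return df.repartition(233)  # "Fibonacci number, makes it faster" -- circa 2016
    #
    # After: pull the decision out of the code so it can change with the environment,
    # or better still, let adaptive query execution coalesce partitions at runtime.
    partitions = int(spark.conf.get("spark.sql.shuffle.partitions", "200"))
    return df.repartition(partitions)
```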
And now what we’re going to talk about is how we can reorganize our data in response to its dynamic distribution. And this is what we’re doing with adaptive query execution. Choosing the right partition size for optimal execution is difficult. And the right partition size at one stage of your job may not be the right partition size later. In this case, if we have a filter operation running on a partitioned data frame, we might eliminate nearly all of the elements in this data frame, but this is actually the best possible case if we’re filtering out a lot of elements from a data frame. Because we might also see something like this, where instead of filtering things more or less uniformly from each partition, we affect some partitions more than others, causing our runtime to be dominated by how long it takes to process the partitions with the most elements, rather than by the average number of elements in each partition, as it would be if we had a more uniform distribution.
Adaptive query execution can help to solve this problem by coalescing the partitions that we use for shuffling. If we recognize that we have a lot of sparse or a lot of very small partitions, we can combine them together into relatively fewer, relatively larger partitions. Another thing that adaptive query execution can do is choose a different join strategy when appropriate. We talked earlier about setting thresholds, and one of the thresholds you can set in a Spark configuration is when to broadcast a relation so that its keys can be used in a join. And just as a quick refresher on how joins work, let’s look at three strategies that you could use to implement a join between two relations. The first one is just where we loop through every element in the first relation and compare the key to every element in the second relation with nested loops. This is extremely inefficient. It’s a quadratic number of comparisons, but it will work.
Another strategy we could use is to sort both relations by the key value and then step through each relation. In this case, if we compare the first element in the left relation to the first element in the right relation and see that the key value on the right is less than the key value on the left, we advance our position in the right relation. This is much more efficient than nested loops because we just step through the relations without doing all of those extra unnecessary comparisons. A way to avoid even more unnecessary comparisons is to broadcast the rows that might be involved in the join as a hash table, so that we can make very efficient comparisons and decide right away which rows are going to be involved in the join. What we do here is basically summarize the keys for each relation and then distribute them. Now, the thing about adaptive query execution is that it can dynamically convert sort-merge joins to broadcast joins when it makes sense to do so. And this can result in some great performance benefits.
But the nice thing is that because we’re collecting these statistics about which keys are in which partitions to do this join more efficiently, we can also decide not to consider some partitions for certain joins at all. And that’s another advantage of adaptive query execution. And in this case, we use that information to say, “Once I know that a certain partition is not going to participate in the join at all, I don’t even have to consider it.” Which can be another advantage. Turning on adaptive query execution is very easy. There are some parameters to tune, but you can get a lot of benefits just with the defaults, which you get by setting spark.sql.adaptive.enabled to true in your Spark configuration. Once you have this enabled, you’ll see, whether you’re inspecting your query plans manually, looking at a Spark event log, or browsing the Spark history server, that there’s a node in your query plan called AdaptiveSparkPlan, which tells you that you’re running with adaptive query execution.
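A minimal configuration sketch, assuming Spark 3.0 or later; the broadcast threshold shown is just the familiar default, not a recommendation:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("aqe-enabled")
    # Turn on adaptive query execution; the defaults alone buy a lot.
    .config("spark.sql.adaptive.enabled", "true")
    # Coalesce many small shuffle partitions into fewer, larger ones at runtime.
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    # Split partitions that are dominated by a few heavy join keys.
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    # The static broadcast threshold still exists, but AQE can also convert a
    # sort-merge join to a broadcast join once it sees actual relation sizes.
    .config("spark.sql.autoBroadcastJoinThreshold", str(10 * 1024 * 1024))
    .getOrCreate()
)

# With AQE on, df.explain() and the Spark history server show an
# AdaptiveSparkPlan node at the root of the physical plan.
```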
Those of you who’ve worked with machine learning or scientific computing know that it’s always more efficient to operate on an array at a time, rather than an element at a time. Columnar execution brings similar benefits to databases and it also opens up a higher level of abstraction. Let’s say that we have data stored in rows, and we want to perform some operation where we filter some rows and then we project out some columns. If the data are stored in rows, we’re going to load as many as we can into memory, perform whatever operations we’re doing on each row, and then perform the projection. But we’ve polluted our cache with all of this row data and we’ve done a lot of extra IO that we don’t need to do. If we reorganize the data in a columnar format, we can take advantage of the fact that in order to project something, we only need to investigate the columns of interest, and we can reconstruct the rows from this columnar representation.
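As a small, hypothetical illustration of that filter-then-project pattern over a columnar Parquet source (the path and column names are invented), the scan only needs to touch the columns the query actually references:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("columnar-projection").getOrCreate()

# Hypothetical columnar dataset; Parquet stores each column separately on disk.
customers = spark.read.parquet("/data/customers")

# Filter some rows, then project a couple of columns: with a columnar source,
# Spark reads only the referenced columns instead of reconstructing whole rows.
long_tenure = (
    customers
    .filter(F.col("tenure_months") > 12)
    .select("customer_id", "monthly_charges")
)

long_tenure.explain()  # the scan node reports the pruned column list
```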
So columnar representations reduce storage demands and IO by compressing repeated values and sequences of values. And we can also have algorithms that operate directly on this columnar representation for faster access and cache friendly execution. One of the big advantages of the columnar representation is that there’s a plugin interface for Spark that lets you delegate columnar processing to accelerated libraries. And there are a few of these for different hardware accelerators. I’m going to talk about the one I know best, which is the one that accelerates query processing with NVIDIA GPUs. And this is the RAPIDS Accelerator for Apache Spark. Basically what the plugin does is it delegates data frame operations that can run on the GPU to the cuDF library, which provides accelerated data frame operations in C++. So what this looks like at the query planner level is that if we have a query graph for an application, some operations will be able to run on the GPU and others will not. But these are transparently handled by the plugin, which schedules some operations to run on the GPU and others to run on the CPU and inserts transfers back and forth.
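Enabling the plugin is mostly a matter of configuration. Here is a sketch, assuming a cluster with one GPU per executor; the jar names, versions, and resource amounts are placeholders and should be matched to the RAPIDS Accelerator release and cluster manager you actually deploy:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("rapids-accelerated")
    # Load the RAPIDS Accelerator plugin so supported DataFrame operations run on the GPU.
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
    .config("spark.rapids.sql.enabled", "true")
    # Tell Spark that each executor owns one GPU and that tasks share it.
    .config("spark.executor.resource.gpu.amount", "1")
    .config("spark.task.resource.gpu.amount", "0.25")
    # Placeholder jar names; use the release that matches your Spark and CUDA versions.
    .config("spark.jars", "rapids-4-spark_2.12-<version>.jar,cudf-<version>.jar")
    .getOrCreate()
)

# Operations the plugin can't handle fall back to the CPU automatically;
# accelerated operators show up in query plans as Gpu* nodes (GpuFilter, GpuProject, ...).
```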
The optimization happens at the level of the query plan and it’s transparent to the user. We put together a case study showing how this works in an end-to-end machine learning workload. We’re going to talk about the parts we implemented in Spark. The machine learning part was in XGBoost, but we had a case where we were modeling and predicting customer churn. The idea is: given a customer’s behavior, is this customer going to renew his or her subscription or not? And so we had basically three applications. We’re going to talk about the one on the left, which is an application that federates data from multiple sources, takes data from our relational schema and puts it into the kind of wide table that a data scientist would expect to have land on her desk. It also does some exploratory analytics on these data. So essentially in this workload, we go from five tables that have individual kinds of observations about a customer, to one wide table that has, for every given customer, everything we know about that customer. We also produce analytic reports about the distribution of field values.
And we do a data cube to identify which features we’ve collected about a customer are most correlated with retention or churn. This is the Spark part of an end-to-end machine learning workflow. Again, we ran the machine learning part in XGBoost, but if we look at this standard Spark code that we ran on GPUs, just by adding GPUs, we were able to get a 4.2x speedup overall on the whole workload. And the analytics part, where we were doing that exploratory analytics, that sort of reporting, was almost seven times faster with GPUs than with CPUs. By using data frames, by using modern features like adaptive query execution, and by relying on columnar execution, we were able to delegate to a hardware accelerator and get a really impressive speedup for free.
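For a rough sense of what that federation step looks like in code, here is a condensed, hypothetical sketch; the table names and columns are invented, and the real workload joined five observation tables and fed XGBoost downstream:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("churn-wide-table").getOrCreate()

# Hypothetical source tables, each holding one kind of observation per customer.
accounts = spark.read.parquet("/data/accounts")   # customer_id, tenure, contract
billing  = spark.read.parquet("/data/billing")    # customer_id, monthly_charges, total_charges
services = spark.read.parquet("/data/services")   # customer_id, internet, phone, streaming

# Federate them into one wide table: one row per customer with everything we
# know about that customer, ready to land on a data scientist's desk.
wide = (
    accounts
    .join(billing, on="customer_id", how="left")
    .join(services, on="customer_id", how="left")
)

wide.write.mode("overwrite").parquet("/data/churn_features")
```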
So today we talked about the Diderot effect in data processing systems. How making one thing fast makes us want to make everything else fast too. And how having everything fast makes us just want to do more. We looked at how Spark has changed over the years. And we talked about how your applications may have some assumptions that made sense when you wrote them that don’t make sense anymore. We looked at adaptive query execution, which provides a way to mitigate the impact of making decisions statically in configuration. And we saw how using high-level interfaces like columnar processing and data frames enables us to benefit from hardware acceleration in our query jobs as well.
So where do we go from here? I’d love it if you kept in touch and let me know about your successes or challenges in this space. You can reach me on Twitter or GitHub @willb, or via email at willb@nvidia.com. Upgrade to Spark 3.1, and look at those configurations. This is a great time for spring cleaning. You should check out the RAPIDS Accelerator for Apache Spark, which is a great way to get transparent acceleration for your Spark jobs on GPUs. And finally, I’d like to recommend two Data+AI Summit talks by my NVIDIA colleagues. Jason Lowe will be talking about accelerating UDFs with the RAPIDS Accelerator for Apache Spark. And Tom Graves will be presenting on how stage level scheduling can help Spark better support heterogeneous workloads. Thanks so much for your time. Please don’t forget to review and rate the sessions, and I’m looking forward to your questions.

William Benton

William Benton is passionate about making it easier for machine learning practitioners to benefit from advanced infrastructure and making it possible for organizations to manage machine learning syste...