Defining customized scalable aggregation logic is one of Apache Spark’s most powerful features. User Defined Aggregate Functions (UDAFs) are a flexible mechanism for extending both Spark data frames and Structured Streaming with new functionality, ranging from specialized summary techniques to building blocks for exploratory data analysis. And yet, as powerful as they are, UDAFs prior to Spark 3.0 have had subtle flaws that can undermine both performance and usability.
In this talk Erik will tell the story about how he met UDAFs and fell in love with their powerful features. He’ll describe how he faced challenges with the UDAF design and its performance properties and how, with the help of the Apache Spark community, he eventually fixed the UDAF design in Spark 3.0 and fell in love all over again. Along the way you’ll learn about how User Defined Aggregation works in Spark, how to write your own UDAF library and how Spark’s newest UDAF features improve both usability and performance. You’ll also hear how Spark’s code review process made these new features even better and learn tips for successfully shepherding a large feature into the Apache Spark upstream community.
– All right, hi everybody and thanks for attending my talk. Today, I’m going to be telling you a story about user-defined aggregation in Spark. How it works, and how you use it, and how it’s changing in Spark 3.0.
It’s not just a story that I’m telling, it’s kind of a love story. Love stories are among the oldest human stories, and they’re all basically the same. Hero meets aggregating API, hero files a Spark JIRA for a performance bug, and then at last, hero merges a Spark 3.0 pull request.
All good stories ought to begin by establishing the plot. To make sure we’re all on the same page, I’m going to be summarizing Spark’s data and compute model.
Spark’s data model is to present the user with a logical view of their data as a simple sequence of data elements.
But under the hood, its physical model is to store data in possibly multiple physical partitions. Here you can see our data has been split into three physical partitions in the example.
What’s it look like to compute on this kind of data? As an example, let’s take a look at summing some numbers. We begin by setting some accumulator to a zero value, and we update our accumulator with the values we see in our partition, and we loop through the partition until we have updated, in this case, summed every piece of data in the partition.
When we’re done we have a partial result, in this case a partial sum for the data just in that one physical partition.
In parallel, Spark is doing the same procedure for every single partition in the data set until it’s finished with all the partitions of your data. Once it has these partial results, it begins to merge them until it comes up with the final answer, which is the sum over all of our logical data.
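The partition-local update followed by a global merge can be sketched in a few lines of plain Python. This is a toy model of the compute pattern just described, not Spark code; the function names are illustrative.

```python
from functools import reduce

# Toy data: one logical sequence 1..9 split into three physical partitions
partitions = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]

def partial_sums(parts):
    # Update phase: fold each partition from the zero value (0).
    # In Spark these folds run in parallel, one per partition.
    return [reduce(lambda acc, x: acc + x, p, 0) for p in parts]

def sum_all(parts):
    # Merge phase: combine the per-partition partial sums into the answer
    return reduce(lambda a, b: a + b, partial_sums(parts), 0)

print(partial_sums(partitions))  # → [6, 9, 30]
print(sum_all(partitions))       # → 45
```

Note that the final answer is the same as summing the logical sequence directly; the partitioning only changes where the intermediate work happens.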
Now sum is an example of a Spark aggregator. Aggregators all share the same kinds of properties. For instance, sum operates on numeric input data. It also has an accumulator, or aggregator, which is also numeric. It defines an initial value of zero, and it also defines how to update the accumulator with new data and how to merge accumulators when it’s done computing partial results. Now, Spark comes with an entire zoo of predefined aggregators; sum is just one. Another one is taking the maximum. You can see here that maximum has all the same properties of an aggregator that sum does, but it fills them differently. Its concept of an initial value is minus infinity.
It uses maximums instead of addition for its update and merge.
Another common aggregator is average, which also operates on numeric data, but its accumulator is a pair of numbers: a sum and a count of elements. You can see that update and merge operate on this pair, not just a single number. Average also defines a final presentation function to return a value to the user, in this case just the sum over the count.
Now that we’ve established the plot, it’s time to introduce the hero to the object of their affection.
As a data scientist, one of my professional interests is efficient data sketching. One particular data sketch that I get a lot of mileage out of is known as a T-Digest. The T-Digest is a sketch that provides a very compact and very fast approximation of a data set’s distribution.
An example of the kind of thing you can do with the T-Digest is compute quantile values from your data very easily, without actually having to store all of your data. Here in this example, you can see how we use a lookup on the compact sketch of our distribution to figure out the 90th percentile of our data.
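To make the quantile lookup concrete, here is a toy version of the query. This is not a T-Digest: a real T-Digest answers the query from a compact sketch without storing the data, while this toy function stores and sorts everything. It only shows the kind of answer the sketch approximates.

```python
def quantile(data, q):
    # Exact empirical quantile: sort the data and index into it.
    # A T-Digest approximates this lookup from a small sketch instead.
    s = sorted(data)
    idx = min(len(s) - 1, int(q * len(s)))
    return s[idx]

samples = list(range(1, 101))
print(quantile(samples, 0.9))  # the 90th percentile of 1..100
```

The whole point of the sketch is to get answers like this at a tiny fraction of the memory cost, which is what makes it viable inside a streaming aggregation.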
I won’t be going into the details of how the T-Digest works today, but if you’re interested in learning more about how it operates and what you can do with it, the link on the left will take you to one of my previous Spark Summit talks, all about implementing and using T-Digests.
So one question we can ask is: does the T-Digest behave like an aggregator? Without going into all the details, we can define an accumulator type for the T-Digest, we can define an empty state that acts like a zero, and we can define methods for updating one of these objects with a single data element, or merging two of them. If we want to, we can also define a final presentation function, for instance extracting a single quantile from the structure.
So the answer is yes, the T-Digest absolutely can behave just like an aggregator.
Now why is that exciting? Well, Spark doesn’t just provide us with predefined aggregators, it also allows us to define our own custom aggregating data structures. Here in this example, I am creating a user-defined aggregator function for my T-Digest, and then I’m also declaring two regular user-defined functions to extract a median and a 90th percentile from that structure.
What can I do with that? If you define one of these user-defined aggregating functions, you are allowed to use it in Spark Structured Streaming exactly as you would any predefined operator. So you can see in this example, I have some streaming numeric data and I’m using the user-defined aggregator that I declared on the previous slide directly in the structured streaming expression, to get a streaming median and a streaming 90th percentile on some data, also grouped by a window. It’s a very powerful concept.
I found the intersection of data sketching, user-defined aggregators, and structured streaming to be so powerful that I eventually gave several Spark Summit talks at various intersection points of these topics. Each item on this slide has a link to the actual talk that I gave.
As we all know, love never goes smoothly forever.
To explain how it all started to go wrong, I have to review the actual anatomy of a user-defined aggregating function. The core of any implementation is the initialize method, which is your empty state, or zero state, and the implementations for your update or merge methods.
I wanna particularly call your attention to the update method. You can see from the function signature that it stores the actual aggregator structure in an aggregation buffer, which is actually just a subclass of the standard data frame row that we’re all familiar with.
In order to tell Spark how to do that, you have to define a user-defined type for your aggregator, which you can see defined here at the bottom in the bufferSchema method.
Let’s look at this user-defined type.
The core of a user-defined type is a schema which describes to Spark how your object is actually stored in the aggregation buffer row, and then methods to pack your structure into that row and then unpack it later, which are serialize and deserialize.
The key take-away from this is that packing and unpacking these structures is generally an expensive operation. In particular, for T-Digest structures, it’s quite expensive. The T-Digest is a fairly structured data type and it costs real compute to pack and unpack this from an aggregation buffer.
One day, as is traditional, I was debugging these serializers, and I put in some print statements to see what I was actually doing with my data structures.
Let me pause here to explain what I was expecting from this experiment.
The only time you should, in theory, need to serialize one of these aggregators is after you’ve finished updating from a single partition so that you can send them across the network to collect them at whatever node is doing the final merge operation. You can see these serializations over on the right. In this example diagram, I would be expecting my print statement to run three times.
Now when I actually ran this in Spark on a test data frame with a thousand rows of data, what actually happened was, I saw that my print statements were executing a thousand times. So they were executing once for every single row of my data, not just once per partition.
This was very puzzling to me, so I started diving into the Spark code, and I found something peculiar about this update method in the user-defined aggregators. To explain what’s going on, let’s take this definition and rewrite it to unpack what it’s doing. The first thing that has to happen is it has to deserialize my object from the aggregation buffer.
Then it does the actual data update, which is the only part of this operation that we really care about, and is generally quite fast.
Then lastly it has to store our aggregator back on the aggregation buffer, and so it has to reserialize the structure.
Now, this is happening, as you can see, on every single row of the data, and you can also see that the necessity of doing all this serialization is baked right into the definition of the structure. It can’t be fixed simply by tweaking some interior code; it’s fundamental to the design of the mechanism itself.
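The per-row cost can be demonstrated with a small sketch. This is not Spark code; `pack` and `unpack` are illustrative stand-ins for the expensive UDT serialize and deserialize steps, with counters so we can see how often they run.

```python
pack_calls = 0
unpack_calls = 0

def pack(acc):
    # Stand-in for the expensive UDT serialize step
    global pack_calls; pack_calls += 1
    return str(acc)

def unpack(buf):
    # Stand-in for the expensive UDT deserialize step
    global unpack_calls; unpack_calls += 1
    return float(buf)

def update(buffer, x):
    # Old-style update: deserialize from the buffer, do the (fast) real
    # update, then reserialize the result back -- once per input row
    return pack(unpack(buffer) + x)

def aggregate(rows):
    buf = pack(0.0)           # initialize the buffer
    for x in rows:
        buf = update(buf, x)  # pays pack + unpack on every row
    return unpack(buf)

total = aggregate([float(x) for x in range(1, 1001)])
print(total, pack_calls, unpack_calls)  # 1000 rows → ~1000 (de)serializations
```

With a thousand rows, the counters show roughly a thousand serializations and deserializations, matching the thousand print statements observed in the experiment above.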
As one does, I filed a Spark JIRA to describe the problem I was seeing, and then we all discussed it on the Spark dev mailing list. As we saw in the previous slide, this problem is not easily fixable; if it’s fixable at all, it would have to be fixed by redesigning the entire mechanism.
We never want to give up on the ones we love, so last summer I filed a pull request for Spark where I proposed a new mechanism for doing user-defined aggregation that didn’t have the serialization necessity.
I’ll be explaining now how that works. In order to do that, I first should explain that Spark actually provides two completely different mechanisms for implementing user-defined aggregation. The first one is one we’ve already seen, the user-defined aggregator. The second one is one you see on the slide here, and it’s simply called aggregator. This slide has an implementation of T-Digests in this new aggregator interface. The first thing I wanna point out is that you can see that the implementation in this case fits entirely on a slide, there is no code that I had to leave out. So the aggregator interface is a much simpler and cleaner interface to work with.
The second thing I wanna point out is that if you look at the reduce method, which is like the update method from the slides we had before, it’s not operating on an aggregation buffer, it’s operating directly on my T-Digest data type. So there’s no serialization or deserialization required.
Thirdly, you can see at the bottom that it uses the encoder interface to describe the serialization. This is a much nicer, more capable, and more versatile interface to work with.
Lastly, the main point is that going forward, this Aggregator interface is the only interface, so there will no longer be two separate ones, which has been a source of design confusion.
As I mentioned, the new update method in this interface requires no serialization for each data element so we’ve now gotten back to the intuitive serialization behavior that we want, which is to say it’s serializing only once per data partition, just like we would expect.
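Revisiting the earlier counter sketch under the new-style semantics shows the difference. Again this is illustrative, not Spark code: reduce works directly on the native accumulator type, and `pack`, the stand-in for the encoder's serialize step, runs only at partition boundaries.

```python
pack_calls = 0

def pack(acc):
    # Stand-in for the encoder's serialize step
    global pack_calls; pack_calls += 1
    return str(acc)

def reduce_partition(rows):
    acc = 0.0
    for x in rows:
        acc = acc + x    # reduce operates on the native type: no per-row cost
    return pack(acc)     # serialize once, at the partition boundary

def aggregate(partitions):
    # Merge phase: deserialize the per-partition partials and combine them
    partials = [float(reduce_partition(p)) for p in partitions]
    return sum(partials)

total = aggregate([[1.0, 2.0], [3.0], [4.0, 5.0]])
print(total, pack_calls)  # 3 partitions → exactly 3 serializations
```

Six input rows across three partitions now trigger only three serializations, one per partition, which is the intuitive behavior from the earlier diagram.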
What does it look like to use this in code? Here’s a short example where I declare one of these T-Digest aggregators, and then if I want to use this in dynamically typed data frames, I call a new SQL function, which is just called UDAF, you can see in the second line, which registers this aggregator as a specialized user-defined function. When I have that, I can use this function in the standard data frame or structured streaming interface.
What has this new interface actually bought us in terms of performance? This is a short example from Spark shell, where on the top I do an aggregation using the old user-defined aggregation function style and then I follow it up with exactly the same aggregation using the new style. In the bold, you can see the resulting timings.
If we do the math on these numbers, we find out that the new interface runs 70 times faster than the old interface. I want to emphasize that this is doing exactly the same aggregation on exactly the same data. The only difference is the new interface allows us to do the aggregation only serializing at the boundaries of the partitions and that’s where all the 70 times speed-up is coming from.
Here’s a good place to mention that one of the reasons this is such a large difference with T-Digest is that, as I mentioned before, the T-Digest is a somewhat expensive data structure to serialize.
If you are operating with aggregators that you’ve written that have cheaper serialization, you may see less than a 70 times speed-up; maybe it’s only 10 times or 5 times. I’ve done experiments with very simple aggregators where the difference is almost zero.
However, generally speaking, all of the simple aggregators already come predefined with Spark. If you’re using custom aggregators, it’s very likely that your data structure is something not trivial, so this should be a big help for you.
That is the end of the main story. This is not just a love story about aggregation and Spark, it’s also an open source love story. I wanted to share some things I’ve learned working with open source upstreams, and in particular, very complicated pull requests, like this one was.
The first is to never give up.
When I submitted this pull request, I eventually ended up rewriting the solution from the ground up three different times before we arrived at a solution that we were all happy with and ready to merge.
It’s easy to get discouraged when you have to go back to the drawing board like that, but if you don’t give up, you can eventually get great results, and you’ll always learn something from the process.
The second one is to be patient. It takes a lot of time to write these implementations and properly test them, and it also takes the reviewers a long time to properly review them.
Between the time I submitted the initial pull request and the time we finally merged it was basically six calendar months.
It’s worth waiting to do the job right, and to get community buy-in, and you know, get the result that you want merged into the code.
Lastly, it is super important that we always respect each other. I received an enormous amount of really great feedback on this pull request and the result that we finally merged was significantly cleaner and easier to use than the solution that I originally proposed last summer.
Having respect for the community of reviewers, and respect for all the people making comments on your pull request is absolutely crucial to submitting a successful improvement to software.
That’s the end of my story.
Erik Erlandson is a Software Engineer at Red Hat, where he investigates analytics use cases and scalable deployments for Apache Spark in the cloud. He also consults on internal data science and analytics projects. Erik is a contributor to Apache Spark and other open source projects in the Spark ecosystem, including the Spark on Kubernetes community project, Algebird and Scala.