In the past several years, pandas UDFs have been perhaps the most important change to Apache Spark for Python data science. However, these functionalities have evolved organically, leading to some inconsistencies and confusion among users. In Apache Spark 3.0, pandas UDFs were redesigned by leveraging Python type hints. With type hints, you can naturally express pandas UDFs without requiring additional information such as the evaluation type. Pandas UDFs are now also more ‘Pythonic’: they clearly define what the UDF is supposed to take as input and return as output. Moreover, this brings many benefits such as easier static analysis. In this talk, I will give a technical overview of the redesigned pandas UDFs with type hints in Apache Spark 3.0.
– I am Hyukjin Kwon, and I am a software engineer at Databricks. So today I’m gonna talk about Pandas UDFs and Python type hints. So Pandas is probably one of the most commonly used Python libraries for working with data frames.
So a few years ago, Pandas UDFs were introduced to hugely improve performance. You could just use the pandas APIs directly in the UDF, and that solved lots of problems with the regular Python UDFs in Apache Spark’s Python API. But after a couple of years, we started to face some problems and confusion as usage rapidly grew.
And at the same time, there has been kind of a trend in Python toward type hinting in general, which provides better static analysis along with lots of other benefits. It’s still somewhat premature, but it’s encouraged in general. So this talk will cover a short introduction to Python type hints, the problems in the old Pandas UDFs, and then introduce the new Pandas APIs in Apache Spark 3.0.
Yeah, so, I’m an Apache Spark committer and PMC member, and I’m a software engineer at Databricks. I’m also one of the major contributors to Koalas, which is a Databricks project I’m working on. And I’m pretty active on GitHub, in particular in the Apache Spark repo.
Yeah, so, this is my agenda today. I’m gonna first talk about what Pandas UDFs and Python type hints are, and then the problems in the old Pandas UDFs. Then I’ll talk about how I solved these problems by using Python type hints in Spark. And lastly, I’ll introduce the new Pandas APIs with Python type hints.
So, first of all, to just give you a bit of background, I’ll go through the Pandas UDFs and how they work first.
So, in general, Pandas UDFs, regular Python UDFs, and the other Python APIs serialize data and functions between the JVM and Python. Pandas UDFs specifically use Apache Arrow, which has almost zero cost for serialization and deserialization. Pandas UDFs can also perform vectorized operations, which improves performance hugely. In addition, they allow you to use the rich set of APIs in pandas and NumPy.
So here is an actual example. As you can see, it adds one to each value, and it’s wrapped by the pandas_udf decorator. The UDF evaluation type is specified as scalar, so it’s a scalar Pandas UDF. Inside the function it adds one, and it’s vectorized, so it takes modern CPU design into account, such as SIMD or pipelining. On the right side is how the function is applied against the actual data. The black color is the logical representation of a Spark data frame: it has columns, and the columns have values. Physically they are distributed, but logically they are one data frame. The red color represents the physical units that the function actually takes as input and output, which are pandas Series.
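The scalar UDF described here can be sketched as follows. The decorator lines are shown as comments because they need a running Spark session; the function body alone is plain pandas:

```python
import pandas as pd

# Old-style (Spark 2.x) scalar Pandas UDF sketch. In Spark it would be
# declared with an explicit evaluation type, e.g.:
#
#   from pyspark.sql.functions import pandas_udf, PandasUDFType
#   @pandas_udf('long', PandasUDFType.SCALAR)
def pandas_plus_one(s):
    # Vectorized: adds one to the whole pandas Series at once.
    return s + 1
```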
So, I will talk a little bit more about the internals and how it works. Let’s suppose we have a Spark cluster with three workers; the master and driver are not shown for simplicity here. Usually, Python worker processes are not created when no Python native functions are used. Only when a Python-related operation happens does Spark lazily create the Python processes to interact with. So, once you create a Spark data frame and perform an operation, this is how it actually looks: it loads partitions into the workers and then starts to process them.
So, when they have to execute the Python functions for Pandas UDFs, Spark serializes and sends the data from the JVM to the Python processes. Each partition becomes multiple Arrow batches, which are then transferred to the Python processes.
Each Arrow batch is converted to pandas Series, and then the user functions are finally executed. So the function pandas_plus_one takes a pandas Series and adds one in a vectorized manner by utilizing pandas and NumPy, and then it returns the output Series. After that, the return values of the user function are converted back to Arrow batches, and then they become Spark partitions again.
So, here is the performance difference between the regular Python UDFs and Pandas UDFs. So as you can see in the Pandas UDFs there’s a huge performance improvement when you compare it to regular Python UDFs.
So, this is how the Pandas UDFs work in general and then before we talk about more how Pandas UDFs and the Python type hints work together, I will briefly talk about what are the Python type hints first.
So, Python type hinting is basically a way to express which type is expected for a variable or a function’s input or output. Typical Python code, like the first example, doesn’t have types because of the dynamic nature of Python typing. So usually you don’t know which type is expected before you actually execute the function or read the documentation. The second example shows Python type hints; like all typing in Python, they are completely optional, and with them you can easily tell which types are expected for input and output.
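A small hypothetical example of the difference (the `greet` functions are illustrations, not from the slides):

```python
# Without type hints you can't tell what the function expects or
# returns until you run it or read its docs:
def greet_untyped(name):
    return "Hello, " + name

# With (optional) type hints, the expected input and output types
# are explicit in the signature:
def greet(name: str) -> str:
    return "Hello, " + name
```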
Python type hinting was introduced in Python 3, so it is the standard way of annotating types, and you can use it, for instance, for better static analysis, possibly runtime checking, or potentially code generation.
One of the benefits of Python type hints is IDE support. This is my PyCharm: when functions declare their Python type hints correctly, users can see warnings when they call them with a different type. For example, there’s an API here called merge, and when I use a string for the second argument, which should be a data frame, it shows a warning because the types don’t match. So developers can easily avoid such simple but very common mistakes.
Another benefit is better static analysis and auto-documentation. I use MyPy in the project I’m working on. The warnings you’ve seen in the IDE can become actual errors from static analysis. So again, it makes sure you avoid such basic mistakes automatically, which is not possible in regular Python code without type hinting. So this is one of the benefits, and another one is that it can also automatically document your code. You don’t have to list which types are expected for the input and output, because they are automatically documented. In the second example, Sphinx generates the documentation for the types.
So, Python type hinting is still somewhat premature, but it is encouraged in general because of its many benefits. There are many efforts out there; for instance, pandas itself is trying to add Python type hints, and PySpark has type hinting support as well, although it’s currently a third-party library at this moment.
Right, so now we know what Pandas UDFs and Python type hints are. So I’m going to talk about what kinds of problems we actually faced with Pandas UDFs and why we had to redesign them.
In Spark 2.4 we have three kinds of Pandas UDFs. The first one is the scalar Pandas UDF. It transforms values from a to b using pandas APIs. The second one is the grouped map Pandas UDF, which splits the data into groups, applies the given function to each group, and then combines the results back into a regular Spark data frame. The last one is the grouped aggregate Pandas UDF. It works similarly to the grouped map Pandas UDF, but it’s required to aggregate each group to a single scalar value. Actually, the return values of these three Pandas UDFs are different for each type. In the case of scalar and grouped aggregate, it returns a regular Spark column, the same as any other Spark column, so you can operate on it together with other Spark columns or expressions. However, the grouped map Pandas UDF returns a Spark data frame, so there’s a difference here. I will talk about this a bit more later.
In Spark 3.0, even more new types of Pandas UDFs were implemented. The first one is the scalar iterator Pandas UDF, which allows you to use an iterator within the Pandas UDF. The second one is the map Pandas UDF. This also allows you to use an iterator, but it’s more like map partitions in other APIs.
The last one is the co-grouped map Pandas UDF. This is similar to the grouped map Pandas UDF, but it allows you to co-group two different data frames. In this case, the scalar iterator Pandas UDF returns a Spark column, and the others return a Spark data frame.
So here are the complexity and confusion. The first problem is that the increasing number of Pandas UDF types started to confuse users. For example, the three Pandas UDFs above all do the same thing, but the type of each Pandas UDF is different, and the input and output of the functions are also different, so it’s difficult to tell what is expected. The first scalar UDF takes and outputs a pandas Series, the second one requires an iterator of Series, and the last one expects a data frame for its input and output. So it’s difficult to tell which types are expected in the function. It’s also not very clear how each UDF works and why we should specify each UDF type.
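The three equivalent function bodies can be sketched like this; in Spark 2.4-style code each would additionally carry a pandas_udf decorator with an explicit evaluation type (SCALAR, SCALAR_ITER, GROUPED_MAP), omitted here so the sketch runs without Spark:

```python
from typing import Iterator
import pandas as pd

# Three functions that all "add one", but with different expected
# input/output types, mirroring the three old-style Pandas UDFs.

def plus_one_scalar(s: pd.Series) -> pd.Series:
    return s + 1

def plus_one_iter(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for s in batches:
        yield s + 1

def plus_one_grouped_map(pdf: pd.DataFrame) -> pd.DataFrame:
    return pdf + 1
```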
And then here is the second problem. The first example uses the scalar Pandas UDF and adds a cosine to the result by using Spark expressions. As you can see, it works fine. But the second example uses the grouped map Pandas UDF, and there’s no way to directly add the cosine.
This is because the first Pandas UDF returns a Spark column instance that can be mixed with other expressions or functions, whereas the second Pandas UDF returns a Spark data frame instance. So some of the Pandas UDFs return a Spark column, but some of them return a Spark data frame. Internally, the first example creates an expression, but the second example creates a query execution plan. Both are technically separate concepts, but they were in the same Pandas UDF category.
So the Spark community had to discuss a lot to address these problems, and then finally we made it. So now I’m going to introduce the new Pandas APIs with Python type hints.
So, we came up with two main points for the redesign.
So the first is Python type hints. In the Pandas UDF context, there are three advantages. Firstly, it’s self-descriptive: the user-defined function describes what it expects for input and output, so it’s very easy to tell what the function is supposed to do. The second is static analysis. As I said, IDEs support Python type hints, and other tools like MyPy can detect common typing mistakes that Python developers usually make. And lastly, it automatically documents your Pandas UDFs, so you don’t have to bother documenting which arguments should be which type. So by using Python type hints, the UDFs can be simplified like on the left side.
Then the second point is API separation. Some Pandas UDFs return a Spark column, but the others return a Spark data frame, so they’re now separated. The former stays as a basic Pandas UDF as-is: it still returns a Spark column and can be mixed with other expressions or functions. The latter became a second API group called Pandas Function APIs. They work like regular data frame APIs in general, as query execution plans that hold expressions internally.
So now I’m going to introduce the new Pandas UDFs one by one. The first is the series-to-series Pandas UDF. Previously it was called the scalar Pandas UDF, but now it’s called the series-to-series Pandas UDF. In the new style, you don’t have to specify the Pandas UDF type; the function’s type hints describe the input and output themselves.
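A minimal sketch of the new style, with the Spark decorator shown as a comment since it needs a running session:

```python
import pandas as pd

# New-style (Spark 3.0) series-to-series Pandas UDF: the type hints
# alone tell Spark how to evaluate the function, so no evaluation
# type is required:
#
#   from pyspark.sql.functions import pandas_udf
#   @pandas_udf('long')
def pandas_plus_one(s: pd.Series) -> pd.Series:
    return s + 1
```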
The second one is the iterator Pandas UDF. This is a new Pandas UDF introduced in Spark 3.0. It can be used in both old and new styles because the old style isn’t deprecated for now. The function expects an iterator of Series, and the total lengths of the output and input should be the same.
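A sketch of the iterator-of-series form (in Spark it would also carry a decorator such as `@pandas_udf('long')`):

```python
from typing import Iterator
import pandas as pd

# Iterator-of-series Pandas UDF sketch. The total number of output
# rows across all yielded batches must match the input.
def pandas_plus_one(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for s in batches:
        yield s + 1
```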
This is also iterator support in a Pandas UDF. For the input, it takes a tuple of Series in the iterator. The other characteristics are the same as the other iterator support: the lengths of input and output should be the same, and a StructType is represented as a pandas data frame.
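The tuple variant can be sketched like this; each batch yields one Series per input column (decorator again omitted so it runs without Spark):

```python
from typing import Iterator, Tuple
import pandas as pd

# Iterator of a tuple of series: `a` and `b` correspond to the two
# input columns of each Arrow batch.
def multiply_two(
    batches: Iterator[Tuple[pd.Series, pd.Series]],
) -> Iterator[pd.Series]:
    for a, b in batches:
        yield a * b
```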
These two kinds of iterator support in Pandas UDFs are actually pretty useful, especially when you have an expensive computation or some state to share across batches.
The example on the left side performs the expensive initialization once and then reuses it for the whole iterative process. This could be a common case in machine learning, for instance when you load a model.
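The shared-state pattern can be sketched as follows; `expensive_initialization` is a hypothetical stand-in for something like loading a model:

```python
from typing import Iterator
import pandas as pd

def expensive_initialization():
    # Hypothetical heavy setup (e.g. loading an ML model); returns a
    # callable "model" for illustration.
    return lambda s: s * 2

# The initialization runs once when the UDF starts on a Python worker,
# then the resulting state is reused for every Arrow batch.
def predict(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    model = expensive_initialization()  # done once, shared across batches
    for s in batches:
        yield model(s)
```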
Another example is pre-fetching. For example, you could consume the input iterator in a different thread to save on waiting, and then compute the output in the main thread. This could improve the performance of your Pandas UDF.
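A minimal sketch of that pre-fetching idea, using a background thread and a bounded queue (the structure here is an illustration, not the exact code from the slides):

```python
import queue
import threading
from typing import Iterator
import pandas as pd

# A background thread consumes the input iterator so the next batch is
# already waiting while the main thread computes the output.
def plus_one_prefetched(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    q = queue.Queue(maxsize=2)  # bound the buffer to limit memory use
    sentinel = object()         # marks the end of the input

    def producer():
        for s in batches:
            q.put(s)
        q.put(sentinel)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        s = q.get()
        if s is sentinel:
            return
        yield s + 1  # the actual computation, done in the main thread
```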
The last one is the series-to-scalar Pandas UDF. This was previously the grouped aggregate Pandas UDF, so it expects a series to be aggregated to a scalar value. The scalar value should be a Python primitive type or a NumPy data type. The old style of UDF on the right side can be converted to the new style on the left side.
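A sketch of the series-to-scalar form; in Spark it would be decorated (e.g. `@pandas_udf('double')`) and used inside `groupBy(...).agg(...)`:

```python
import pandas as pd

# Aggregates a whole pandas Series down to one scalar value.
def pandas_mean(s: pd.Series) -> float:
    return s.mean()
```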
This is the new category introduced in Spark 3.0. Pandas Function APIs create a query execution plan internally, whereas Pandas UDFs create expressions. At this moment, the Python type hints in Pandas Function APIs are completely optional.
You can omit the Python type hints but it’s encouraged to use Python type hints.
This is the first one: the grouped map Pandas Function API. It was previously the grouped map Pandas UDF. It splits the data into groups by the given condition, applies the function to each group, and then combines the results back into a Spark data frame. The function expects a pandas data frame as input and output.
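A sketch of the grouped map pattern; the column names `id` and `v` are hypothetical, and the Spark call is shown as a comment:

```python
import pandas as pd

# Grouped map: a pandas DataFrame in, a pandas DataFrame out, applied
# to each group. In Spark 3.0:
#   df.groupby('id').applyInPandas(subtract_mean, schema=df.schema)
def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
    # Center the hypothetical column `v` within its group.
    return pdf.assign(v=pdf.v - pdf.v.mean())
```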
The second one is the map Pandas Function API. This is also introduced in Apache Spark 3.0. It is similar to map partitions in other APIs: you can transform your Spark data frame by using pandas data frames. It shares the same characteristics as the iterator version of the Pandas UDF, but one difference is that it doesn’t have the length limitation, so you can return an output of arbitrary length.
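A sketch of the map function; note the output length may differ from the input (here rows are filtered). The column `v` is hypothetical:

```python
from typing import Iterator
import pandas as pd

# Map over partitions with pandas: iterator of DataFrames in and out.
# In Spark 3.0: df.mapInPandas(filter_positive, schema=df.schema)
def filter_positive(
    batches: Iterator[pd.DataFrame],
) -> Iterator[pd.DataFrame]:
    for pdf in batches:
        # Output length differs from input: only positive rows survive.
        yield pdf[pdf.v > 0]
```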
Lastly, the co-grouped map Pandas Function API. This works similarly to the grouped map Pandas Function API, but it operates on two different data frames. For instance, if you have two data frames grouped by the same condition, then you can actually do something like a join, and then combine them into a single data frame. So it’s pretty useful.
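A sketch of the co-grouped map pattern; the key `id` is hypothetical, and the merge stands in for whatever join-like combination you need:

```python
import pandas as pd

# Co-grouped map: the function receives one pandas DataFrame from each
# input data frame's matching group. In Spark 3.0:
#   df1.groupby('id').cogroup(df2.groupby('id')) \
#      .applyInPandas(merge_groups, schema)
def merge_groups(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    # Combine the two groups, here with a simple inner merge on the key.
    return pd.merge(left, right, on='id')
```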
Okay, so to recap. The Pandas APIs now leverage Python type hints, with many benefits such as static analysis, auto-documentation, and self-descriptive UDFs. The old Pandas UDFs have been separated into two groups: Pandas UDFs and Pandas Function APIs. Support for iterators, and for the co-grouped map and map transformations, is available in Spark 3.0.
Yeah, so that’s it for now.
Hyukjin is a Databricks software engineer, Apache Spark PMC member and committer, working on many different areas in Apache Spark such as Spark SQL, PySpark, SparkR, etc. He is also one of the top contributors in Koalas. He mainly focuses on development, helping discussions, and reviewing many features and changes in Apache Spark and Koalas.