Fully Utilizing Spark for Data Validation

May 28, 2021 11:40 AM (PT)


Data validation is becoming more important as companies have increasingly interconnected data pipelines. Validation serves as a safeguard to prevent existing pipelines from failing without notice. Currently, the most widely adopted data validation framework is Great Expectations. It supports both Pandas and Spark workflows (with the same API). Great Expectations is a robust data validation library with a lot of features. For example, it always keeps track of how many records are failing a validation, and stores examples of failing records. It can also profile data after validations and output data documentation.

These features can be very useful, but if a user does not need them, they are expensive to generate. What are the options if we need a more lightweight framework? Pandas has some data validation frameworks that are designed to be lightweight. Pandera is one example. Is it possible to use a lightweight Pandas-based framework on Spark? In this talk, we’ll show how this is possible with a library called Fugue. Fugue is an open-source framework that lets users port native Python code or Pandas code to Spark. We will show an interactive demo of how to extend Pandera (or any other Pandas-based data validation library) to a Spark workflow.

There is also a deficiency in the current frameworks we will address in the demo. With big data, there is a need to apply different validation rules for each partition. For example, data that encompasses a lot of geographic regions may have different acceptable ranges of values (think of currency). Since the current frameworks are designed to apply a validation rule to the whole DataFrame, this can’t be done. Using Fugue and Pandera, we can apply different validation rules on each partition of data.

In this session watch:
Kevin Kho, Open Source Community Engineer, Prefect

 

Transcript

Kevin Kho: Hey, everyone. My name is Kevin Kho. I’m an Open Source Community Engineer at Prefect, where I work on data orchestration and workflow orchestration. And today I’ll be talking about fully utilizing Spark for Data Validation. So we will begin by talking about Data Validation, what it is and when you should use it, and then we will go over two of the most popular Data Validation frameworks, Great Expectations and Pandera. Pandera is only available in Pandas at the moment, but for our use case, we want to use it on Spark, so that’s where we will introduce Fugue. Fugue is an abstraction layer that will allow us to use Python and Pandas packages on Spark. And then interestingly, by combining Fugue and Pandera, we will be able to achieve validation on the partition level, which is something that Data Validation frameworks don’t allow for yet today.
So for this Data Validation topic, we are going to be going over a case study with this fictitious company called FoodSloth. And FoodSloth is a food delivery company that is interested in having demand pricing. The plan is to update the price of the service for each location every 10 minutes. But we want to make sure that the prices that are being displayed on the app are reasonable and that nothing is too ridiculous for customers. So to execute this, first we will have a model training pipeline, and this can run on a weekly or monthly basis. The point is that this pipeline is not particularly time sensitive. And on the other hand, we will have an inference pipeline that will be running every 10 minutes to output the new prices. And for this one, we need a lightweight Data Validation system that will execute within a reasonable amount of time.
So on the model training side, Data Validation is normally done when we load the data from the source. We check it to see if certain assumptions that we have are true before we start our model training. This guarantees that the model training is accurate and nothing surprising will happen when we deploy that model. On the price inference side, we are going to put Data Validation right before we push the prices out to the app. So we load the data from the source, we run our model and get the new prices. And then we run the Data Validation to just check if those prices are reasonable before we update the app. So for the first, weekly pipeline, the Data Validation framework is not time sensitive. And for the second pipeline, the inference pipeline, our Data Validation needs to be as lightweight as possible.
So some common validations that are used: first, we can check if null values exist in the data. If there are null values that can’t be handled and will break things downstream, we can catch these ahead of time. We can check the Schema. Does the dataset have the columns needed? Are they in the right data types, so that nothing will break downstream? We can check the shape. So we want to make sure that we are not losing any records when we perform our data transformations and that our data still matches the shape it had when it entered our pipeline. And lastly, for our particular use case, we want to make sure that the numeric values are within a certain range. So for our use case, we want to make sure that the price that is being predicted is within a reasonable range before it goes out to production.
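As a rough, framework-agnostic sketch of what these four checks might look like on a Pandas DataFrame (the column names and expected row count here are hypothetical):

```python
import pandas as pd

def basic_checks(df: pd.DataFrame, expected_rows: int) -> None:
    # 1. Null check: no missing prices
    assert df["price"].notnull().all(), "price column contains nulls"
    # 2. Schema check: required columns exist and price is numeric
    assert {"location", "price"} <= set(df.columns), "missing required columns"
    assert pd.api.types.is_numeric_dtype(df["price"]), "price must be numeric"
    # 3. Shape check: no records were lost during transformation
    assert len(df) == expected_rows, "row count changed unexpectedly"
    # 4. Range check: predicted prices fall within a reasonable band
    assert df["price"].between(5, 10).all(), "price out of the expected range"
```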
And with that, we will talk about the first Data Validation framework that we have today, Great Expectations. So Great Expectations is a framework that has the same interface for both Pandas and Spark, and it’s the most popular Data Validation framework today. Here, we will show a code snippet on how to use Great Expectations in a notebook. First, we import Great Expectations and the SparkDFDataset, and then we pass our Spark DataFrame into the SparkDFDataset. This SparkDFDataset inherits from the Spark DataFrame, so it has all of the operations you can do on a Spark DataFrame. But on top of those operations, we also now have these expectations that come along with the SparkDFDataset. So in the last line of code here, we are using the expectation expect_column_values_to_be_between. And then we specify the column to execute this on, along with the minimum and maximum value and the mostly parameter. The mostly parameter is a very interesting part of Great Expectations.
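A minimal sketch of the snippet being described, assuming a small illustrative Spark DataFrame with a price column:

```python
from pyspark.sql import SparkSession
from great_expectations.dataset import SparkDFDataset

spark = SparkSession.builder.getOrCreate()
# illustrative data; in practice this would come from the pipeline
spark_df = spark.createDataFrame([(6.0,), (7.5,), (96.0,)], ["price"])

# SparkDFDataset wraps the Spark DataFrame and adds the expectation methods
ge_df = SparkDFDataset(spark_df)

result = ge_df.expect_column_values_to_be_between(
    column="price",
    min_value=5,
    max_value=10,
    mostly=0.95,               # still a success if at least 95% of records are in range
    result_format="COMPLETE",  # the most detailed result output
)
```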
Here we specify that mostly equals 0.95. What this means is that 95% of the records should fulfill this expectation. If less than 95% of the records fulfill this, then that means we failed. But if only one or two percent of records fail this expectation, then it is still treated as a success. And the last thing to note is the result format. There are varying levels of detail that we can get with the result format, and I will show an example of that in the next slide. Here’s an example of the detailed results that came out of the expectation that we just ran. First, you will see on the top line that we failed the expectation, success is false, and then a bit lower you will see the result.
The result gives us information about the expectation and how it failed. For example, it gives us what percent of records failed specifically, how many there were, and what the missing count in that column was. What are some examples of records that failed? That comes in the partial unexpected list at the bottom, where you see 96, 98, 96. So those are values that were not in the range that was expected. One really nice thing about Great Expectations is that under the hood, it’s already generating an expectation suite, a config, when you’re running your expectations. This expectation suite can then be output into Data Documentation so that you can keep track of what expectations were run, how your dataset performed, and whether or not it passed or failed those expectations.
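For reference, the detailed result being described has roughly this shape (the keys below come from Great Expectations’ result format; the values are placeholders, apart from the example out-of-range prices mentioned above):

```python
{
    "success": False,
    "result": {
        "element_count": ...,                     # total records checked
        "unexpected_count": ...,                  # how many records failed
        "unexpected_percent": ...,                # what percent of records failed
        "missing_count": ...,                     # nulls in the column
        "partial_unexpected_list": [96, 98, 96],  # sample failing values
    },
}
```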
If you’re starting from scratch, you can also use the data profiler tool that Great Expectations has to generate this Data Documentation. What it does is go into a dataset, look at the statistics and the quantiles, and try to get some summary statistics over your data so that you have a baseline set of expectations that you can start with. So after profiling, you get a basic set of expectations that you can then use in the future as a starting point. So Great Expectations in general has a very robust ecosystem, with a lot of tools that you can use and a lot of features. But of course this comes with a price. There is some overhead to using it. And for our use case, this makes a lot of sense in the training pipeline, but it doesn’t make as much sense in the inference pipeline, where we need lower latency.
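As a rough sketch of the profiling step, reusing the ge_df from the earlier snippet (this uses the legacy profiling API that was current at the time of the talk; treat the exact call as an assumption):

```python
from great_expectations.profile.basic_dataset_profiler import BasicDatasetProfiler

# profile the wrapped DataFrame to generate a baseline expectation suite,
# plus the validation results from running that suite against the data
expectation_suite, validation_result = BasicDatasetProfiler.profile(ge_df)
```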
So, is there a more lightweight framework? To answer this question, we’ll take a look at what Data Validation frameworks are available in Pandas, and that’s where we run into Pandera. So Pandera is only available in Pandas. It’s more lightweight than Great Expectations. There are a lot of built-in validations, similar to Great Expectations, and there are also a lot of statistical validations like t-tests. One nice feature about Pandera is that because the API is lighter weight, it’s much easier to extend. And given how lightweight it is, we want to apply Pandera on Spark. But before we dive into that, first we’ll show what the Pandera sample code looks like.
So here we import Pandera on the top line, and we import Column, Check, and DataFrameSchema. With Pandera, you’re defining a DataFrameSchema, and the DataFrameSchema is composed of this Column class. And this Column class is composed of multiple checks. So here we create a Schema that’s going to apply to the column called price. And we’ll pass in this Column class with the in_range check. This is the check that checks that the values are between a minimum and a maximum. So in this specific use case, we’re saying check that the column is within the range, with a minimum value of five and a maximum value of 10.
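A minimal sketch of that Schema, including the validate call described next (the float dtype and the example data are assumptions):

```python
import pandas as pd
from pandera import Check, Column, DataFrameSchema

# schema with a single range check on the "price" column
price_check = DataFrameSchema({
    "price": Column(float, Check.in_range(min_value=5, max_value=10)),
})

df = pd.DataFrame({"price": [6.0, 7.5, 9.9]})
validated = price_check.validate(df)  # raises a SchemaError if any check fails
```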
After we make the Schema, we can simply call the .validate method and apply it on a DataFrame. So here we say price_check.validate(df), and it will look for the price column in this DataFrame and execute that check. So just a quick comparison of the validation frameworks. On one hand, we have Great Expectations, which has Spark support. It has a very nice feature in that the success criteria can be flexible with the mostly parameter, and you can specify what percent of records can fail and still be a success. There are very detailed outputs and Data Documentation. You can also have notifications in production, like Slack messages, if your Data Validation pipeline fails. And they also have a CLI that you can use to set up your validations. Great Expectations has a lot of features, definitely more than can be covered in this talk. In general, it’s just a very robust and very big ecosystem of tools. Pandera, on the other hand, is only available in Pandas.
It’s more lightweight. It’s less invasive towards your code. You can easily make Schemas and use decorators in your code to apply them. So on one hand, we have Great Expectations, which makes a lot of sense for training, where it’s not time sensitive. But for the pipeline that is expected to run every 10 minutes, where we need a more lightweight framework, we want to use Pandera, but Pandera does not have Spark support. And that brings us to the next topic.
How can we use Pandera and Pandas-based libraries in Spark? That brings us to Fugue. So Fugue is an abstraction layer that allows Python and Pandas users to take their code and apply it to Spark and Dask. In general, the interface is friendlier than Spark user-defined functions, and I’ll show an example of the interface later. The goal of Fugue is to allow users to decouple their logic and execution. So users should only have to define their logic once in Python or Pandas, and then at runtime determine the execution engine that they want to run it on, whether it’s Pandas, Spark or Dask. The goal is to allow users to write code once in a scale-agnostic way and allow them to scale seamlessly when needed, when there’s more data or when they need to take advantage of distributed compute.
So the Fugue architecture is as follows. There is a Python API and a SQL API, and users will write code in either Python or SQL, and this gets translated. Fugue is an abstraction layer that takes this code and maps it to the execution engine, such as Pandas, Spark or Dask. One important note is that the Blazing SQL extension for Fugue was released recently. So now you can use standard SQL on top of Fugue and run it on Spark SQL or Blazing SQL. You can also run it on Dask, so Fugue provides one of the SQL interfaces for Dask.
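As a rough sketch of what the SQL API looks like (the path and column names are illustrative; the engine string selects where the query runs):

```python
from fugue_sql import fsql

fsql("""
df = LOAD "/tmp/prices.parquet"
SELECT location, price FROM df WHERE price > 5
PRINT
""").run("spark")  # swap "spark" for "dask", or omit the engine to run on Pandas
```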
So here we have an example of a basic code snippet with Fugue. You can see on the first line that we define the Schema: we are outputting all of the columns that came in and creating a new column called filled. The filled column is of type double. This Schema hint is enforced during execution. Then we define a native Python function called fillna. In practice, we won’t be using base Python for fillna because Pandas has its own, and it’s very easy to use, but this is just to show that we can create a function with base Python. It takes in a df of type Iterable, and it outputs an Iterable as well. Then in this function, we loop through the rows of the DataFrame and we create a new item in the dictionary. We create this filled item and we fill it with either the value that existed or the value that was passed in. And then we yield the row.
Notice that this function is purely Python, and it’s not coupled to any framework. It does not rely on Pandas or Spark. And then in the last two lines, we define the FugueWorkflow context manager, and we pass in the Spark execution engine. Passing in the Spark execution engine makes our code run on Spark. If we wanted to run it on Pandas, we could pass in the native execution engine. And on the last line, we load the parquet file and we transform it with this fillna function. Notice that the content inside the context manager is also not tied to any framework, and it’s agnostic to scale. So the only line that tells us the scale or the execution engine is the FugueWorkflow.
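A minimal sketch of that snippet, assuming a hypothetical parquet path and a column named value to be filled:

```python
from typing import Any, Dict, Iterable

from fugue import FugueWorkflow
from fugue_spark import SparkExecutionEngine

# schema: *,filled:double
def fillna(df: Iterable[Dict[str, Any]], value: float = 0.0) -> Iterable[Dict[str, Any]]:
    for row in df:
        # keep the existing value if present, otherwise use the fill value passed in
        row["filled"] = row["value"] if row["value"] is not None else value
        yield row

with FugueWorkflow(SparkExecutionEngine()) as dag:
    df = dag.load("/tmp/data.parquet")                # hypothetical path
    df = df.transform(fillna, params={"value": 0.0})  # FugueWorkflow() with no engine runs on Pandas
```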
So now, how do we combine Fugue and Pandera to bring Pandera into Spark? The first three lines should be familiar. That was the price check we did earlier with Pandera, in which we created a DataFrameSchema and we put a check on the column price. The min value was five and the max value was 10. And then we create a Fugue transformer function with the schema hint, as seen in the next lines. And then we create a new function called price_validation. As I said earlier, we can pass in Pandas DataFrames for input and output types. So we pass in an input type of Pandas DataFrame and an output type of Pandas DataFrame. And then we run the validate method on the df. Now, with the last two lines, it’s just a matter of executing it with the Spark execution engine.
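A sketch of that combined snippet, under the same assumptions as before (hypothetical path, float dtype):

```python
import pandas as pd
from pandera import Check, Column, DataFrameSchema

from fugue import FugueWorkflow
from fugue_spark import SparkExecutionEngine

# the same Pandera schema as before
price_check = DataFrameSchema({
    "price": Column(float, Check.in_range(min_value=5, max_value=10)),
})

# schema: *
def price_validation(df: pd.DataFrame) -> pd.DataFrame:
    # run the Pandera check on the Pandas partition; raises on failure
    return price_check.validate(df)

with FugueWorkflow(SparkExecutionEngine()) as dag:
    df = dag.load("/tmp/prices.parquet")  # hypothetical path
    df = df.transform(price_validation)
```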
We set up our Fugue workflow and pass in the Spark execution engine, and then we apply the transform method to the df, df = df.transform(price_validation), with the price_validation function that we defined. Now, this applies the Pandera check in the Spark execution environment. But interestingly, by combining Fugue and Pandera, we can also achieve validation by partition. We can have different validation Schemas for each logical partition of data. And here is an example use case of that. So let’s say FoodSloth was in one location, and all our data was validated for that location’s prices, let’s say Florida. Now our company is expanding and we have services provided in three states. These three states will have different acceptable ranges of prices. So for example, Florida may be lower than California. So we need different validation rules for each geographical region. So here is an example of what that data will look like.
First, we take note of the location column. We have three Florida locations and four California locations. In general, the Florida prices will be lower, and so we want to apply a different validation on the Florida set of data and the California set of data. So the plan is to partition based on location and apply a different validation for each of those locations. So here’s how we can execute the validation by partition with Fugue. The first six lines of code should be very familiar. They’re the price checks again, with the DataFrameSchema where we create a check with the min value and the max value and apply it on the column price. We just have one check for Florida and one check for California. And then we create a dictionary called price_checks, where the key is the state and the value is the Schema.
Then we modify our price_validation function from earlier. This time, we have to pull in the location that we are applying this to. And then, from our dictionary, we pass in the location to get the appropriate DataFrameSchema. After we get the DataFrameSchema, it’s just a matter of running the validate method on the DataFrame to execute that check. Now in the last two lines, we can set up our Fugue workflow and pass in the Spark execution engine again. And then, for the last line, notice that we partition by location. So in the earlier DataFrame, we’re splitting that big DataFrame into smaller DataFrames. And by the time it hits the price_validation function, it’s already partitioned by location. So by the time it enters the price_validation function, there is a California DataFrame and there is a Florida DataFrame. And that’s why we can pull the location by getting the first value under the location column.
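A sketch of the validation-by-partition version (the state labels and the California price range are illustrative; the transcript only specifies that California’s range sits higher than Florida’s):

```python
import pandas as pd
from pandera import Check, Column, DataFrameSchema

from fugue import FugueWorkflow
from fugue_spark import SparkExecutionEngine

# one schema per state; the exact ranges here are example values
price_checks = {
    "FL": DataFrameSchema({"price": Column(float, Check.in_range(min_value=5, max_value=10))}),
    "CA": DataFrameSchema({"price": Column(float, Check.in_range(min_value=8, max_value=15))}),
}

# schema: *
def price_validation(df: pd.DataFrame) -> pd.DataFrame:
    # each partition holds a single location, so read it off the first row
    location = df["location"].iloc[0]
    return price_checks[location].validate(df)

with FugueWorkflow(SparkExecutionEngine()) as dag:
    df = dag.load("/tmp/prices.parquet")  # hypothetical path
    # split the DataFrame by location before the validation function runs
    df = df.partition(by=["location"]).transform(price_validation)
    # no print/show/save here, so with the Spark engine this stays lazy
```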
After we partition by location, we just transform with the price validation. Spark users may notice that this is a lazy execution at the moment, because we don’t have any print, show, or save statement. By default, the Spark execution engine with a FugueWorkflow will respect the lazy evaluation of Spark. If this were the Pandas execution engine, then it would be evaluated immediately. And with that, I just want to summarize my talk.
So today we talked about Data Validation, what it is and when you need to use it. Data Validation is important to make sure that our data pipelines are running as expected and that they don’t break unexpectedly if bad data comes in, or if there are modifications to the pipeline. We went over two Data Validation frameworks, Great Expectations and Pandera. Great Expectations has a very robust ecosystem with a lot of features, whereas Pandera is a more lightweight option. Pandera does not have a Spark API, but we can combine Pandera and Fugue to get Pandera running on Spark. And then interestingly, we were able to achieve validation by partition, which is currently a weak point for Data Validation frameworks. None of the Data Validation frameworks today have an option for a group-by validation, and we can achieve this using Fugue and Pandera. Just to be clear, Fugue can also work with Great Expectations and apply Great Expectations on each partition. In general, Fugue is a library that will allow you to bring Pandas-based libraries into the Spark environment and allow you to apply them on each partition.
So thank you for coming. And I hope you check out the Fugue project and both of these Data Validation frameworks. Thank you.

Kevin Kho

Kevin Kho is an Open Source Community Engineer at Prefect, where he helps users with workflow orchestration. He was previously a data scientist for 4 years, most recently at Paylocity. Outside of work...