Simplify Data Conversion from Spark to TensorFlow and PyTorch

May 27, 2021 04:25 PM (PT)


In this talk, I would like to introduce an open-source tool built by our team that simplifies the data conversion from Apache Spark to deep learning frameworks.

Imagine you have a large dataset, say 20 GB, and you want to use it to train a TensorFlow model. Before feeding the data to the model, you need to clean and preprocess it using Spark. Now you have your dataset in a Spark DataFrame. When it comes to the training part, you run into a problem: how can I convert my Spark DataFrame to a format recognized by my TensorFlow model?

The existing data conversion process can be tedious. For example, to convert an Apache Spark DataFrame into a format a TensorFlow Dataset can read, you need to either save the Spark DataFrame on a distributed filesystem in parquet format and load the converted data with third-party tools such as Petastorm, or save it directly in TFRecord files with spark-tensorflow-connector and load it back using TFRecordDataset. Both approaches take more than 20 lines of code to manage the intermediate data files, rely on different parsing syntax, and require extra attention for handling vector columns in the Spark DataFrames. In short, all these engineering frictions greatly reduce data scientists’ productivity.

The Databricks Machine Learning team contributed a new Spark Dataset Converter API to Petastorm to simplify these tedious data conversion steps. With the new API, it takes just a few lines of code to convert a Spark DataFrame to a TensorFlow Dataset or a PyTorch DataLoader with default parameters.

In the talk, I will use an example to show how to use the Spark Dataset Converter to train a TensorFlow model and how simple it is to go from single-node training to distributed training on Databricks.

In this session watch:
Liang Zhang, Developer, Databricks

 

Transcript

Liang Zhang: Hello, everyone. Welcome to our Data + AI Summit session on simplified data conversion from Spark to deep learning frameworks. My name is Liang Zhang. I’m from the machine learning team at Databricks, and I’ve been working on making ML training on Databricks simpler. I’ve been at Databricks for almost two years now. Prior to joining Databricks, I received a master’s degree from Carnegie Mellon University.
First, let’s talk about the agenda for this talk. I want to start off with the motivation for why we should care about the data conversion from Spark to deep learning frameworks. Secondly, I want to highlight some of the pain points in the existing methods for doing the conversion. Then, I want to give a quick overview of our dataset converter and how to use it. After that, I’ll give you a demo of an end-to-end example of using the Spark Dataset Converter with TensorFlow and PyTorch. At the end, I’ll wrap up the talk with some of the best practices for using the converter.
So let’s get started with the motivation for why we should care about the data conversion from Spark to deep learning frameworks. Suppose you have a large dataset of images from your driving camera and you want to detect the traffic lights in the images. The amount of data is as large as terabytes, and there are new images arriving every day. Before feeding your data into machine learning models, you want to do some data cleaning and labeling. Spark is a good tool to address these data preparation tasks. Once you are satisfied with your dataset, you’ll want to use one of the popular deep learning frameworks to train a model, periodically retrain the model with new data, and finally predict the labels of new images. So here we find the problem: how to combine the power of Spark with deep learning frameworks to extract value out of your data.
It would be great to have a convenient tool, in the position of the red dot on the slide, that could seamlessly connect the two parts and increase our productivity. Before diving into the answer, I’d like to illustrate the pain points in the data conversion from Spark to deep learning frameworks. Let’s go through the stages during the development of the traffic light detection project. First, we’ll almost always start by building a single-node training project using a sample of data, so that it’s fast to test the correctness of our code and debug potential errors. We’ll probably collect the data into a pandas DataFrame, or just load a sample of data from the image files. Once we are confident about our single-node code and we’re ready to train the model using all the available data, we need to save the data from the Spark DataFrame into TFRecord files if we’re using TensorFlow, or into other formats that can be parsed by PyTorch.
Now we can identify two pain points in the proposed workflow. First, the data handling in the single-node training code and the distributed training code will be very different. To migrate your model from a single node to a distributed setting, you’re essentially rewriting all the data handling logic. Second, in the distributed training scenario, you need many lines of code to save, load, and parse the intermediate files, and it is very time-consuming yet unrelated to the novelty of your machine learning model. It would be convenient if we had a tool that does these common, boring operations for us and helps us focus on the unique part of our deep learning project. Hopefully that has given you the motivation for why we should care about the data conversion from Spark to deep learning frameworks and the pain points that call for simplifying the conversion.
I want to talk about how you can get rid of these pain points by using the Spark Dataset Converter. At a high level, the Spark Dataset Converter takes the data from Spark and returns a TensorFlow Dataset or a PyTorch DataLoader for you. Here is a minimal example to show you what the converter API looks like. You may notice that the Spark Dataset Converter is built on top of Petastorm and can be found in the petastorm.spark package. For those who are not familiar with Petastorm, it is an open-source library for loading parquet files into deep learning frameworks.
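As a rough illustration of that API, here is a sketch that assumes an existing SparkSession named spark, a DataFrame df_train, and a DBFS cache path; these names are placeholders rather than the exact code on the slide.

from petastorm.spark import SparkDatasetConverter, make_spark_converter

# 1. Configure a parent directory where the converter caches intermediate parquet files.
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF,
               "file:///dbfs/tmp/petastorm/cache")

# 2. Create a converter from the Spark DataFrame (it persists the data as parquet if not already cached).
converter = make_spark_converter(df_train)

# 3a. TensorFlow: open a tf.data.Dataset.
with converter.make_tf_dataset(batch_size=32) as tf_dataset:
    pass  # feed tf_dataset into model.fit(...)

# 3b. PyTorch: open a torch DataLoader.
with converter.make_torch_dataloader(batch_size=32) as dataloader:
    pass  # iterate over dataloader in your training loop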
This graph gives you a closer look at what the converter does for you. The functionality of the converter is shown in red in this graph. It takes the data from the Spark DataFrame, checks whether the data is already cached, that is, persisted in the distributed file system, and persists it if not. Then the converter loads the persisted parquet files into a TF Dataset or a PyTorch DataLoader. Specifically, the Spark Dataset Converter has the following features. First, it caches the intermediate files, and the way it recognizes the same DataFrame is by checking the analyzed query plan. It will also automatically delete the cached files at program exit. Second, it’s very easy to migrate your code from single-node training to distributed training if you are using the Spark Dataset Converter to load your data.
You only need to change two arguments, and we’ll show it in the demo. Third, for those who use the VectorAssembler to extract feature vectors with Spark MLlib, the Spark Dataset Converter will also automatically convert the MLlib vectors to one-dimensional arrays in the TF Dataset or PyTorch DataLoader.
Now, I want to take the time to do a quick demo of how to use the Spark Dataset Converter.
Just for the sake of time, I pre-ran this entire notebook, but I want to walk you through the code here. Here I have some data from the TensorFlow flowers dataset. It contains flower photos organized in folders corresponding to their labels. I ETL’d the image files into a Delta table, which contains two columns: a content column and a label index column. The content column contains the raw bytes of the flower images, and the label index column contains an integer indicating the species of the flower in the image. We can preview the Spark DataFrame and see that the raw bytes are displayed as a base64-encoded string. I randomly split the dataset into two portions and repartitioned the training dataset into two partitions. This is because in a later section I will demonstrate distributed training, which requires the number of partitions to equal the number of workers.
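A rough sketch of this setup; the Delta table path, split ratio, and seed are assumptions based on the narration rather than the exact notebook code.

# Load the flowers Delta table with a content (raw bytes) column and a label_index column.
df = spark.read.format("delta").load("/tmp/flowers_delta")  # hypothetical path

# Random train/validation split, then repartition the training set so the number
# of partitions matches the number of workers used in the distributed section (2).
df_train, df_val = df.randomSplit([0.9, 0.1], seed=12345)
df_train = df_train.repartition(2)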
The goal of my model today is to predict which kind of flower is in the image, using the pre-trained image model MobileNetV2 to extract the features from the image. Here we have the raw bytes of the image, and we need to decode the image into the format that matches the input layer of the MobileNetV2 model.
First, we define a preprocess function that takes the raw bytes of one image, opens it as a PIL image instance, resizes it to 224×224, and then converts it to an array with three channels. Finally, we use the preprocess_input function provided by TensorFlow to rescale the values in the array. Now we need to define a second function specifying how to apply the first function to the entire dataset. The second function should have one input parameter, which will be a pandas DataFrame.
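A sketch of the per-image preprocessing just described; the function and variable names are assumptions.

import io
import numpy as np
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input

def preprocess(content):
    # Decode the raw bytes, resize to the MobileNetV2 input size, and rescale the values.
    image = Image.open(io.BytesIO(content)).resize([224, 224])
    image_array = np.asarray(image, dtype=np.float32)  # shape (224, 224, 3)
    return preprocess_input(image_array)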
You can get a sense of what the input data looks like by taking a few rows from the Spark DataFrame and collecting them with toPandas. You can also test your transform batch function with this sample DataFrame. What it does is apply the element-wise preprocess function to the content column, save the results into a new features column, and drop the content column. After we have the transform batch function, we need to tell our Spark Dataset Converter to use it during the generation of the TensorFlow Dataset. This is done with a TransformSpec instance. It takes the following information: the transform batch function, which we just explained; the data type and shape of the newly added column; and the column names of the output DataFrame. I want to emphasize this part of the TransformSpec instance, because this is how we tell the converter the shape of the multi-dimensional array, and the converter uses this shape information to generate the TF Dataset. Now you have seen everything about the data. Let’s see how we can build the model.
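A sketch of the batch-level transform and the TransformSpec wiring, reusing the hypothetical preprocess helper above; the column names follow the narration.

import numpy as np
from petastorm import TransformSpec

def transform_row(pd_batch):
    # Apply the element-wise preprocess function to every image in a pandas batch,
    # store the result in a new features column, and drop the raw content column.
    pd_batch["features"] = pd_batch["content"].map(preprocess)
    return pd_batch.drop(labels=["content"], axis=1)

transform_spec_fn = TransformSpec(
    transform_row,
    # Declare the dtype and shape of the new multi-dimensional column so the
    # converter can build the TF Dataset correctly.
    edit_fields=[("features", np.float32, (224, 224, 3), False)],
    selected_fields=["features", "label_index"],
)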
We will use the MobileNetV2 model provided by TensorFlow and load the weights pre-trained on the ImageNet dataset. We don’t include the top layer because we’re going to add our own classification layer. So you can see it’s very simple to build a model using transfer learning. The next function returns the compiled model. So far, we have prepared the data and the model. In the next step, we will feed the data into the training function using our Spark Dataset Converter.
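A sketch of the transfer-learning model; the layer choices, learning rate, and the five-class output (the flowers dataset has five species) are assumptions.

from tensorflow import keras

def get_compiled_model(learning_rate=0.001):
    # Pre-trained MobileNetV2 feature extractor without its top classification layer.
    base = keras.applications.MobileNetV2(include_top=False,
                                          weights="imagenet",
                                          input_shape=(224, 224, 3))
    base.trainable = False  # freeze the pre-trained weights
    model = keras.Sequential([
        base,
        keras.layers.GlobalAveragePooling2D(),
        keras.layers.Dense(5, activation="softmax"),  # our own classification layer
    ])
    model.compile(optimizer=keras.optimizers.Adam(learning_rate),
                  loss="sparse_categorical_crossentropy",
                  metrics=["accuracy"])
    return model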
First, we provide a directory to store the intermediate files. Second, we create the converter instances from the Spark DataFrames. Here we created two converters: one for the training data and the other for the validation data. In the train-and-evaluate function, we generate the TensorFlow Dataset with the following syntax. In the make_tf_dataset method, we specify the preprocessing logic in the transform_spec parameter and also specify the batch size. There are many other parameters that you can specify, but here we’ll just use the default values.
By default, the generated dataset returns each record as a named tuple where the field names are the column names of the input Spark DataFrame. Since our Keras model accepts tuples instead of named tuples, we need to convert each row from a named tuple to a tuple. We also calculate steps_per_epoch and use it to control the training process. The same operations are applied to the validation dataset. In the fit function, we specify the number of epochs, but here I want to highlight the best practice for controlling the number of epochs for training. Notice that we don’t specify the number of epochs to the converter, and by default the converter will generate infinite batches of data, looping over the dataset.
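A sketch of the single-node train-and-evaluate flow just described, reusing the hypothetical DataFrames, TransformSpec, and model helpers from the earlier sketches; the batch size and epoch count are placeholders.

from petastorm.spark import make_spark_converter

BATCH_SIZE = 32
NUM_EPOCHS = 5

converter_train = make_spark_converter(df_train)
converter_val = make_spark_converter(df_val)

def train_and_evaluate():
    model = get_compiled_model()
    with converter_train.make_tf_dataset(transform_spec=transform_spec_fn,
                                         batch_size=BATCH_SIZE) as train_ds, \
         converter_val.make_tf_dataset(transform_spec=transform_spec_fn,
                                       batch_size=BATCH_SIZE) as val_ds:
        # The dataset yields named tuples; Keras expects (features, label) tuples.
        train_ds = train_ds.map(lambda x: (x.features, x.label_index))
        val_ds = val_ds.map(lambda x: (x.features, x.label_index))

        # The converter loops over the data indefinitely, so the training length
        # is controlled by steps_per_epoch and epochs in fit().
        steps_per_epoch = len(converter_train) // BATCH_SIZE
        validation_steps = max(1, len(converter_val) // BATCH_SIZE)
        return model.fit(train_ds,
                         steps_per_epoch=steps_per_epoch,
                         epochs=NUM_EPOCHS,
                         validation_data=val_ds,
                         validation_steps=validation_steps)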
We only rely on the fit function to control the number of epochs. This is easier to understand and also benefits the distributed training in the next section. And here is the result of the single-node training. To recap, you only need a few lines of code to convert the Spark DataFrame to a TF Dataset using the Spark Dataset Converter. In the last part, I will show you how to use the converter in distributed training. We use a library called Horovod to do distributed training. For those who are not familiar with Horovod, it essentially replicates the model to different workers in the cluster, aggregates the gradients from each replica, and applies the aggregated gradients to each replica.
Here, we define the function that will be executed on each worker to do the training using one partition of the dataset. Everything in this chunk of code is required by Horovod and is not related to the data handling. You can see that the only difference in the usage of the converter is adding two arguments specifying the current shard and the shard count. Remember that we repartitioned the DataFrame into two partitions and we have two workers in our cluster, so hvd.size() is two and hvd.rank() will be zero or one, depending on which worker the script runs on.
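A sketch of the sharded data loading on each Horovod worker; the full Horovod setup (distributed optimizer, broadcast callbacks, learning-rate scaling) is abbreviated, and the helper names come from the earlier sketches.

import horovod.tensorflow.keras as hvd

def train_and_evaluate_hvd():
    hvd.init()
    model = get_compiled_model()
    # Only these two arguments differ from the single-node code: each worker
    # reads its own shard of the cached parquet data.
    with converter_train.make_tf_dataset(transform_spec=transform_spec_fn,
                                         batch_size=BATCH_SIZE,
                                         cur_shard=hvd.rank(),
                                         shard_count=hvd.size()) as train_ds:
        train_ds = train_ds.map(lambda x: (x.features, x.label_index))
        steps_per_epoch = len(converter_train) // (BATCH_SIZE * hvd.size())
        model.fit(train_ds, steps_per_epoch=steps_per_epoch, epochs=NUM_EPOCHS)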
The rest of the code is exactly the same as the single-node training code. We can see that it takes two and a half minutes and achieves similar accuracy. To recap, you only need to add two arguments to migrate the single-node data loading to distributed data loading using the Spark Dataset Converter. So far, you have seen the end-to-end example of using the converter with TensorFlow. I want to quickly show you how you can do the same thing with PyTorch. The data loading part is the same, and the preprocessing part is also similar, but uses PyTorch transform functions to create the TransformSpec instance. The model creation is also pretty straightforward, and the converter is created in the same way. When it comes to the training code, there is a larger difference in how to use the DataLoader compared to the TensorFlow Dataset.
Let’s just ignore the details in this function and jump to the next cell. In the train-and-evaluate function, we generate the DataLoader from the converter with parameters including the transform spec and batch size, which is very similar to the TensorFlow example. Then we create an iterator over the DataLoader, pass the iterator into a function called train_one_epoch, and use a for loop to control the total number of epochs.
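A sketch of the PyTorch side; transform_spec_torch, the model, and the train_one_epoch helper are placeholders for the notebook’s own functions, and only the data loading pattern is the point here.

def train_torch(model, num_epochs=NUM_EPOCHS):
    with converter_train.make_torch_dataloader(transform_spec=transform_spec_torch,
                                               batch_size=BATCH_SIZE) as train_loader:
        # The loader yields batches indefinitely, so an explicit for loop and a
        # fixed number of steps per epoch control how much data is consumed.
        steps_per_epoch = len(converter_train) // BATCH_SIZE
        train_iter = iter(train_loader)
        for epoch in range(num_epochs):
            train_one_epoch(model, train_iter, steps_per_epoch)  # hypothetical helper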
Similar to the TensorFlow example, we also let the converter generate an infinite number of batches to simplify the control logic. For distributed training, I just want to highlight the two additional parameters, compared to the single-node code, that load a partition of the data on each worker; everything else looks the same. That’s all I have for the demos.
I hope you got a sense of the simplicity brought by the Spark Dataset Converter, and I hope you will also enjoy that simplicity by trying it out yourself. To wrap up this talk, I would like to call out some of the best practices for using the converter. First, if you’re working with image data, you should decode the images in the transform spec function instead of doing the decoding in Spark. Decoding in Spark would increase the size of the persisted data, and it is also very slow.
Besides the transform spec, you can also choose to do some preprocessing in the dataset.map function or as part of the model. You can look up the pros and cons of the different methods in discussions online. Second, as shown in the demo, we encourage you to generate infinite batches of data by keeping the default value of the number of epochs, which is None; we didn’t specify it in the demo. That makes the control flow easy to understand, and it also guarantees that in distributed training every worker gets exactly the same amount of data. Lastly, we encourage you to configure the lifecycle of the bucket that stores the cached data so that you don’t need to worry about any potential data leakage. Thank you for attending the session, and I hope you find this talk helpful. Feel free to reach out to me with any questions.

Liang Zhang

Liang Zhang is a Software Engineer on the Machine Learning team at Databricks. She has been working on building tools to accelerate and simplify data conversion in training machine learning models...