Migrating ETL Workflow to Apache Spark at Scale in Pinterest

May 27, 2021 03:15 PM (PT)


Pinterest is moving all batch processing to Apache Spark, which includes a large amount of legacy ETL workflows written in Cascading/Scalding. In this talk, we will share the challenges and solutions we experienced during this migration, including the motivation for the migration, how to fill the semantic gap between different engines, the difficulty of dealing with the thrift objects widely used at Pinterest, how we improved Spark accumulators, how to tune Spark performance after migration using our innovative Spark profiler, and the performance improvements and cost savings we have achieved after the migration.

In this session watch:
Daniel Dai, Software Engineer, Pinterest
Zirui Li, Developer, Pinterest

 

Transcript

Daniel Dai: Today, we are going to talk about how we are migrating ETL workflows to Spark at scale at Pinterest. First, a little bit about ourselves. This is Daniel Dai. I am a tech lead on the data processing platform team at Pinterest. I’ve been there for about two years. Before that, I worked at Cloudera, Hortonworks and Yahoo. I’m a PMC member of Apache Hive and Apache Pig. My current focus is to increase Spark adoption at Pinterest.

Zirui Li: And hi, this is Zirui. I’m a Software Engineer on the Pinterest Spark platform team. I’ve been building Pinterest’s Spark platform and developing Spark functionality, such as our in-house PySpark platform, Spark History Server, Spark tooling and optimizations, and so on.

Daniel Dai: This is today’s agenda. First, we are going to talk about how Spark is used at Pinterest. Second, we will talk about the approach we took to convert our Cascading/Scalding applications to Spark. Third, we will talk about the technical challenges in the migration and how we solved them. Next, we will talk about the process and the tools we developed to support the migration. And last, our results so far and the plans for the future.
First, how Spark is used at Pinterest. Our platform is built on AWS. However, we don’t use EMR or other high-level services. Instead, we build our software stack on EC2 by ourselves. The reason for that is, first, we want to build a vendor-neutral solution so we can avoid vendor lock-in. Second, we want to provide timely support to our internal users through our own team. In the diagram, we can see we store everything on S3. We build our own Hadoop clusters with YARN and HDFS. Here, HDFS is very light, and we only store HBase files, intermediate data, job resource files, job history data, and Spark history logs. For permanent storage, we use S3. This not only provides the flexibility and benefits of storage and compute separation, it also saves cost. We did have some clusters in the past that stored permanent data on HDFS. However, we found it very costly to maintain HDFS storage, and last year we migrated almost all permanent storage to S3, saving 80% of the cost by doing that.
We have built a couple of clusters. The size of the clusters varies from dozens of nodes to 1000+ nodes. All the clusters can theoretically run all YARN workloads, including Spark, Hive, Cascading/Scalding, and Hadoop streaming. But in reality, some clusters are optimized to run Spark-only workloads, so no MapReduce is running on those clusters. For those clusters, we choose the R5D EC2 instance for two reasons. First, it has faster local disks, so the shuffle performance is much better than on EBS-based instances. Second, it has a higher memory-to-CPU ratio, so it is suitable for Spark, which tends to use more memory than MapReduce. We are also building a cross-cluster routing layer so we can automatically route a batch application to the best cluster based on the application type, the application tier (S, A or O), the cluster availability, etc.
Most Spark applications at Pinterest run on version 2.4. As I said earlier, we build our own software stack using the open source version of Apache Spark. We have quite a few internal patches involving the history server, accumulators, metrics, and performance improvements. We will talk about some of these in the rest of the talk. We have already laid out plans to upgrade to Spark 3.1 this year, and hopefully we can talk about it in the future. We have both production jobs and ad hoc jobs running on Spark. Production jobs are usually scheduled by Airflow, and they can be Spark SQL, PySpark, or native Spark applications. For ad hoc use cases, users can submit Spark SQL via Querybook, which is a Pinterest project we recently open-sourced. It can submit SQL queries against multiple backend engines, such as Spark SQL, Hive and Presto. Users can also submit ad hoc PySpark applications via Jupyter notebooks.
Our Spark workload currently represents 40% of resource usage in terms of CPU cost. One year ago, that number was only 12%. This change is driven by our strategy to onboard new use cases using Spark, and in the meantime, actively migrate legacy MapReduce applications to Spark. The first migration we did was the Hive to Spark SQL migration. This is because most Hive queries can run in Spark SQL with no or very little change, so it is relatively easy. We have been working on this migration since last year, and now most Hive queries have already been migrated to Spark SQL; we plan to finish this migration by Q2 this year. To support the migration, we developed a tool called the automatic migration service to do this migration with little or no user intervention. The same tool can be modified to facilitate other migrations, which we will talk about later.
Besides that, we also plan to migrate Hadoop streaming applications to the Spark RDD pipe operator, and migrate Cascading/Scalding applications to native Spark. Most of these migrations are still at an early stage.
In this talk, we will focus on the Cascading/Scalding to native Spark migration. We can see Cascading/Scalding still represents half of the resource usage at Pinterest. This is the biggest chunk remaining for us on MapReduce. The major use cases for it are ETL, which are very large jobs that tend to produce a large volume of output data. In the near future, we will consolidate our batch processing on Spark. We use Spark SQL for query processing, except for interactive queries, which will continue to use Presto. We will use Spark RDD or DataFrame for ETL, and will also support PySpark for machine learning use cases. Now, let’s switch to Zirui.

Zirui Li: Thanks Daniel for giving us the overview of Spark at Pinterest. As Daniel just mentioned, the majority of our ETL pipelines are still in Cascading/Scalding, and converting them to Spark plays a significant role in the transition from MapReduce to Spark. We’ll go through the approaches we took during the conversion and share the lessons we learned. Let’s first take a quick glance at Cascading/Scalding and their features.
Cascading jobs are usually expressed as a simple DAG where pipes are connected via pipe assemblies. There are only six different pipes, and we’ll give more details in later slides. Based on these six pipes, there are two common Cascading application patterns. Pattern one’s central operation is GROUP BY, and pattern two is based on COGROUP, which is equivalent to a join in Spark. One thing to mention is that Cascading UDFs tend to include most of the logic, either on the map side via the Each pipe, or on the reduce side via the Every pipe. The last thing about Cascading is that it is based on a Java API. Scalding, on the other hand, provides a rich set of operators on top of Cascading. Scalding’s operators are based on a Scala API and share a lot in common with Spark RDDs, which paves the way for Spark conversion.
One thing to emphasize is that we want to get rid of the Cascading/Scalding syntax entirely during the conversion, and usually we have three options. The first one is native Spark. This is the most established path, which works well for most Cascading/Scalding applications, and it is our default option for general cases. The second is Spark SQL, which brings the benefit that SQL is a time-tested language and easy to migrate to other engines in the future. However, for Spark SQL, the UDF interface is private, and the UDF API may not be reusable across different SQL engines, so we would only recommend it if there are not too many UDFs. The last option is PySpark, which takes advantage of rich Python libraries, but suffers from suboptimal performance, especially the pretty slow Python UDFs. So, this option is only suitable for certain machine learning use cases. Native Spark will be our focus in the remaining sections.
There are two available APIs for the native Spark option. The first one is the Spark DataFrame/Dataset API, which is newer and recommended by the community. However, at Pinterest, most Cascading/Scalding applications involve thrift sequence files, and encoding/decoding thrift objects to and from DataFrames is really slow, costly, and frequent in most operations. One way to solve this is to totally rewrite the logic to process thrift objects, but that’s not scalable for batch conversion. So, we would only recommend this approach if there are no sequence files involved. On the other side, the RDD API, although older and not as performant as the DataFrame API, is free from the thrift dilemma and provides more flexibility to handle thrift objects directly, let alone that it is semantically close to Scalding. All this makes it our default choice for the conversion.
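To illustrate the idea (this is our own minimal sketch, not Pinterest’s code), a thrift-serialized sequence file can be read with the RDD API and the thrift objects handled directly, with no DataFrame encoding/decoding round trip. The BytesWritable value layout, the binary protocol, and the caller-supplied thrift struct factory are all assumptions.

```scala
import org.apache.hadoop.io.BytesWritable
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.thrift.{TBase, TDeserializer}
import org.apache.thrift.protocol.TBinaryProtocol
import scala.reflect.ClassTag

// Hedged sketch: deserialize thrift records from a sequence file per partition
// and keep them as thrift objects in an RDD.
object ThriftSequenceFileSketch {
  def load[T <: TBase[_, _] : ClassTag](spark: SparkSession, path: String,
                                        newStruct: () => T): RDD[T] = {
    spark.sparkContext
      .sequenceFile(path, classOf[BytesWritable], classOf[BytesWritable])
      .mapPartitions { records =>
        // TDeserializer is not serializable, so create one per partition.
        val deser = new TDeserializer(new TBinaryProtocol.Factory())
        records.map { case (_, value) =>
          val struct = newStruct()                    // e.g. a thrift-generated class
          deser.deserialize(struct, value.copyBytes()) // fill the struct from raw bytes
          struct
        }
      }
  }
}
```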
Here is a brief outline of the Cascading/Scalding to Spark migration process. First, we rewrite applications manually into Spark, where we try to reuse most of the Cascading/Scalding library code except the parts tied to Cascading/Scalding syntax. Then, we use an automatic tool to help with result validation and performance tuning, which we’ll cover in detail in a later section. Now, let’s first look at how to translate Cascading/Scalding code to Spark.
For Cascading jobs, the DAG is simple, with only six different pipes. Most Cascading pipes have a one-to-one mapping to a Spark transformation, as the table shows here. The Each and Every pipes mainly hold UDFs, and we’ll talk about them in the next slide. For the other pipes, the mapping is more clear. Just a few things to mention: for GROUP BY conversion, sometimes secondary sort is necessary, which will be covered in the next section. What’s more, HashJoin in Cascading is similar to a broadcast join, but we don’t have native support for that in RDD, and we need to simulate it with a broadcast variable; an example RDD broadcast join implementation is sketched below. In comparison, translating Cascading UDFs is trickier, as UDFs host most of the complexity in Cascading.
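A minimal sketch of the broadcast-join simulation, assuming a pair RDD on each side; the data and key types here are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Sketch of simulating Cascading's HashJoin with an RDD map-side (broadcast) join.
object BroadcastJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("broadcast-join-sketch").getOrCreate()
    val sc = spark.sparkContext

    val large = sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c"))) // large side
    val small = sc.parallelize(Seq((1L, "x"), (3L, "y")))            // small side

    // Collect the small side to the driver and broadcast it as a map.
    val smallMap = sc.broadcast(small.collectAsMap())

    // Map-side join: look up each key of the large side in the broadcast map,
    // keeping only matching records (inner-join semantics).
    val joined = large.flatMap { case (k, v) =>
      smallMap.value.get(k).map(sv => (k, (v, sv)))
    }

    joined.collect().foreach(println)
    spark.stop()
  }
}
```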
There are mainly three aspects of UDF translation that need attention. The first is semantic difference. Cascading UDFs usually do filtering and transformation at the same time, so in Spark we need to use map together with filter accordingly when translating. The second point is that Cascading UDFs follow a single-threaded model, while Spark is multi-threaded if the executor cores are greater than one. This can potentially lead to concurrency issues. One workaround is to set the executor cores to one. The last thing is that a Cascading UDF is usually implemented as a class with initialization and cleanup blocks. However, in Spark a UDF is a function without init and cleanup hooks. One solution is to use mapPartitions to simulate them. An example is sketched below: we put the operations inside a while loop and do the init or cleanup before or after the loop.
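A minimal sketch of the mapPartitions pattern; ExpensiveClient and the lookup/transform logic are hypothetical stand-ins for per-partition state and the original UDF body.

```scala
import org.apache.spark.rdd.RDD

// Hypothetical per-partition resource with init (constructor) and cleanup (close).
class ExpensiveClient {
  def lookup(s: String): String = s.toUpperCase // stand-in for real UDF work
  def close(): Unit = ()                        // stand-in for cleanup
}

object UdfWithInitCleanupSketch {
  def transform(input: RDD[String]): RDD[String] = {
    input.mapPartitions { records =>
      // init: runs once per partition, before the loop
      val client = new ExpensiveClient
      val out = scala.collection.mutable.ArrayBuffer.empty[String]
      while (records.hasNext) {
        out += client.lookup(records.next()) // per-record UDF logic
      }
      // cleanup: runs once per partition, after the loop
      client.close()
      // Note: this sketch buffers the partition; a lazy iterator avoids that,
      // but makes placing the cleanup call trickier.
      out.iterator
    }
  }
}
```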
Translating Scalding is simpler, as most Scalding operators have a one-to-one mapping to RDD operations due to the semantic similarity, as the table shows. Most UDFs can also be used in Spark without change, which is a great relief.
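As an illustration of that one-to-one mapping (our own example, not from the talk), a simple Scalding-style word count, shown in the comment, translates roughly operator-for-operator onto the RDD API; sumByKey becomes reduceByKey.

```scala
import org.apache.spark.sql.SparkSession

// Scalding (illustrative):
//   TypedPipe.from(TextLine(in)).flatMap(_.split("\\s+")).map((_, 1L)).sumByKey
object WordCountRddSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("wordcount-sketch").getOrCreate()
    val counts = spark.sparkContext
      .textFile(args(0))        // TextLine source -> textFile
      .flatMap(_.split("\\s+")) // flatMap -> flatMap
      .map(w => (w, 1L))        // map -> map
      .reduceByKey(_ + _)       // sumByKey -> reduceByKey
    counts.saveAsTextFile(args(1))
    spark.stop()
  }
}
```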

Daniel Dai: There are a couple of issues we faced during the migration. We saw a bunch of Cascading applications using secondary sort. Using secondary sort in Cascading is simple; actually, that’s one of the few areas where Cascading is simpler than Spark. In the GROUP BY operator, if you put two fields as input parameters, the second field is the sort key. Cascading will make a group share the same key, and inside the group, records are sorted by the second key. In Spark, we can simulate this with the repartitionAndSortWithinPartitions operator. However, the semantics are different. In Cascading, the UDF is invoked after GROUP BY for every group, and the UDF is given an iterator over the records of the group. In Spark, we get a single iterator which contains all the input across different groups, sorted by the first field of the key, then the second field of the key. To fill the semantic gap, we created a group-sort iterator, which takes the Spark iterator as input and detects the boundaries of the first key to give the user a two-level iterator. A sketch of the idea follows below.
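This is our own minimal sketch of the technique, not Pinterest’s actual GroupSortIterator: partition by the group key only, sort by (group key, sort key), then cut the single sorted iterator at group boundaries. The key types and the buffering group iterator are illustrative.

```scala
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Partition only by the first (group) key so that a whole group lands in one partition.
class GroupKeyPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = {
    val groupKey = key.asInstanceOf[(String, Long)]._1
    ((groupKey.hashCode % numPartitions) + numPartitions) % numPartitions
  }
}

object SecondarySortSketch {
  // Input records are keyed by (groupKey, sortKey); the default tuple ordering
  // sorts by the first field, then the second, which is exactly what we need.
  def groupedAndSorted(rdd: RDD[((String, Long), String)], partitions: Int)
      : RDD[(String, Iterator[((String, Long), String)])] = {
    rdd.repartitionAndSortWithinPartitions(new GroupKeyPartitioner(partitions))
      .mapPartitions { it =>
        val buffered = it.buffered
        // Two-level iterator: the outer level advances group by group, the inner
        // level iterates the (already sorted) records of one group. This sketch
        // buffers each group in memory for simplicity.
        new Iterator[(String, Iterator[((String, Long), String)])] {
          def hasNext: Boolean = buffered.hasNext
          def next(): (String, Iterator[((String, Long), String)]) = {
            val groupKey = buffered.head._1._1
            val group = scala.collection.mutable.ArrayBuffer.empty[((String, Long), String)]
            while (buffered.hasNext && buffered.head._1._1 == groupKey)
              group += buffered.next()
            (groupKey, group.iterator)
          }
        }
      }
  }
}
```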
Unlike Hadoop counters, Spark accumulators are not reliable. There are two sources of unreliability. First, when a stage retry happens. This is usually caused by the loss of a batch of shuffle data, so Spark will rerun the stage and attempt to recover the lost shuffle data for the selected partitions. Second, the code that increments the accumulator runs multiple times in different stages. In the sample code, the input-records accumulator is incremented in the first stage to calculate some score, and then incremented again in the second stage to save the output. To solve the issue, we developed a reliable accumulator based on the AccumulatorV2 API, which deduplicates data by stage plus partition to stop the stage-retry issue.
To address the second source of unreliability, we can insert a persist statement to prevent the same code from running twice, so the accumulator only increments in the first stage. However, inserting a persist statement requires the user to understand the logic of the code and insert persist only in the correct and necessary places, which can be hard. A shortcut we found is that we can just retrieve the accumulator value at the earliest stage, since the later stages usually contain the duplications. There are exceptions, though. In some cases, users do want to use the same accumulator in different stages. For example, a user may want to count the input of both branches of a join. In our reliable accumulator, we provide flexible ways to retrieve the value of the accumulator: get the earliest value, which works in most cases; get the final value to retrieve the final result; and retrieve the value of a particular stage, to provide maximum flexibility.
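The following is a simplified sketch of this idea built on the public AccumulatorV2 API, not Pinterest’s implementation; the deduplication and retrieval semantics are our own interpretation of the description above.

```scala
import org.apache.spark.TaskContext
import org.apache.spark.util.AccumulatorV2
import scala.collection.concurrent.TrieMap

// Sketch of a "reliable" long accumulator: each task records its count under a
// (stageId, partitionId) key; the driver-side merge keeps one value per key,
// so retried tasks within a stage do not double count.
class ReliableLongAccumulator extends AccumulatorV2[Long, Map[(Int, Int), Long]] {
  private val perPartition = TrieMap.empty[(Int, Int), Long]

  override def isZero: Boolean = perPartition.isEmpty
  override def reset(): Unit = perPartition.clear()
  override def copy(): ReliableLongAccumulator = {
    val acc = new ReliableLongAccumulator
    acc.perPartition ++= perPartition
    acc
  }

  override def add(v: Long): Unit = {
    val ctx = TaskContext.get()
    val key = if (ctx == null) (-1, -1) else (ctx.stageId(), ctx.partitionId())
    perPartition.put(key, perPartition.getOrElse(key, 0L) + v)
  }

  // Driver-side merge: last write per (stage, partition) wins, which drops
  // duplicates from stage retries.
  override def merge(other: AccumulatorV2[Long, Map[(Int, Int), Long]]): Unit =
    other.value.foreach { case (k, v) => perPartition.put(k, v) }

  override def value: Map[(Int, Int), Long] = perPartition.toMap

  // Value from the earliest stage that touched this accumulator.
  def earliestValue: Long =
    if (perPartition.isEmpty) 0L
    else {
      val earliest = perPartition.keys.map(_._1).min
      perPartition.collect { case ((s, _), v) if s == earliest => v }.sum
    }

  // Sum across all stages (the conventional, possibly duplicated total).
  def finalValue: Long = perPartition.values.sum

  // Value recorded for one particular stage.
  def valueForStage(stageId: Int): Long =
    perPartition.collect { case ((s, _), v) if s == stageId => v }.sum
}
```

A job would register it with sc.register(acc, "input records"), call acc.add(1) inside the transformations, and read acc.earliestValue (or valueForStage) on the driver.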
We also developed an accumulator tab in the Spark UI. The original Spark UI has accumulators, but they are mixed into the stage tab. It is hard to find the value of a particular accumulator, and it is especially hard to identify which accumulator value is reliable if the same accumulator appears in multiple stages. With the accumulator tab, we can see all this information on a single page, which makes it very easy to find an accumulator and reason about the reliability of the accumulator value if it is used in multiple stages. We shared this feature in Apache Spark JIRA SPARK-35197.
To help tune the performance of the migrated Spark applications, we developed a Spark-specific profiler. It works by taking snapshots of the thread dumps of Spark executors and sending them to a Kafka topic. A visualizer, developed on top of Nebula, which is also an open source project by Pinterest, ingests the Kafka data and aggregates the thread dumps to generate a flame graph. Compared to other profilers, it is very easy to use. We only need a config flag to enable it, and the flame graph is built in the visualizer in real time without waiting for the application to finish. The other unique ability of this profiler is that we can split the entire profile into different stages and tasks, so we can generate a flame graph for the stage or task which has the biggest performance issue. We also make use of Spark’s threading model, so we only focus on the useful threads and ignore noise.
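A rough sketch of the core idea only (not Pinterest’s profiler): a daemon thread inside each executor periodically snapshots thread stacks and ships them to a Kafka topic for downstream aggregation. The topic name, interval, and plain-text encoding are assumptions.

```scala
import java.util.Properties
import java.util.concurrent.{Executors, TimeUnit}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import scala.collection.JavaConverters._

object ExecutorStackSamplerSketch {
  def start(brokers: String, topic: String, executorId: String,
            intervalMs: Long = 1000L): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        // Snapshot all thread stacks; a real profiler would filter to the task
        // threads Spark uses and tag each sample with stage/task ids.
        val dump = Thread.getAllStackTraces.asScala.map { case (t, frames) =>
          t.getName + "\n" + frames.mkString("\n")
        }.mkString("\n\n")
        producer.send(new ProducerRecord[String, String](topic, executorId, dump))
      }
    }, 0L, intervalMs, TimeUnit.MILLISECONDS)
  }
}
```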
As I mentioned earlier, our data is stored on S3. There are several issues with S3 that we have been fighting in the past: eventual consistency, slow metadata operations, and 503 rate-limit errors. The good news is that S3 now provides strong consistency, so we have one less thing to worry about. To solve the other two issues, we adopted the Netflix s3committer with quite a few fixes to reduce the number of metadata operations and provide tuning knobs to reduce 503s, which we already talked about last year. However, the current s3committer doesn’t support Spark RDD. That’s because Spark RDD is using the old MapReduce API, and the s3committer only provides committer classes in the new API. We had to write a wrapper class so we can use it in RDD. Now let’s hand over to Zirui.

Zirui Li: Thanks Daniel for walking through our major technical challenges. Now let’s take a closer look at the general migration process.
At Pinterest, we designed an auto-migration service, or AMS, which is a tool to automate the major migration steps and save a great amount of engineering resources. This is a general flow chart that describes how AMS works. After you have prepared your converted Spark code based on the previous sections, AMS requires initial parameters, such as the job location, main class, input/output, other necessary Spark configs, and so on. Then AMS automatically kicks off a run of your converted Spark job, automatically performs data validation and performance tuning, and finally migrates to Spark if everything goes smoothly. Let’s take one step closer to see how the major components work.
Let’s start with the data validation part. Once the Spark run finishes successfully, data validation is performed automatically. There are two criteria we check. First, we compare the total row counts between the original Cascading/Scalding output and the Spark output. If the results match, then we calculate and compare the checksum of the output. We first create a table on top of the output, then we run a Spark SQL query against the table with an internal UDF to calculate the output checksum based on a specific partition, or just the entire table. A sketch of this idea appears below. This approach works fine for normal cases. However, it would fail in two scenarios. One is when we have double or float values in the results; the other is when the output includes arrays and the array element orders are different. In some cases, we have workarounds, like defining a threshold to allow a double/float gap, or flattening arrays before the checksum calculation. However, in other cases, manual investigation is inevitable.
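A minimal sketch of the validation idea: compare row counts, then compare an order-insensitive checksum of the two outputs. The talk mentions an internal checksum UDF; here we approximate it with Spark’s built-in hash function, and reading parquet output is also an assumption.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object OutputValidationSketch {
  def validate(spark: SparkSession, legacyPath: String, sparkPath: String): Boolean = {
    val legacy = spark.read.parquet(legacyPath)     // assumed output format for the sketch
    val converted = spark.read.parquet(sparkPath)

    // Criterion 1: total row counts must match.
    if (legacy.count() != converted.count()) return false

    // Criterion 2: per-row hash summed over the table (order-insensitive,
    // a stand-in for the internal checksum UDF).
    def checksum(df: DataFrame): Long = {
      val cols = df.columns.map(col)
      df.select(hash(cols: _*).as("h"))
        .agg(coalesce(sum("h"), lit(0L)))
        .head()
        .getLong(0)
    }
    checksum(legacy) == checksum(converted)
  }
}
```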
When the validation fails, there are several sources of uncertainty that might lead to the gap, based on our experience. The first thing is to be careful with input data that depends on timestamps. If so, please make sure the timestamps are exactly the same as in the Cascading/Scalding job. Also pay attention to random number generators in the code. And please double-check if there are any unstable top results under tie conditions. This can happen in ordering-related operations like limit. Last but not least, rounding differences might also result in differences, for example in filter condition tests. These pitfalls are a good starting point to figure out potential bugs in the Spark code or gaps in the input data.
Once the validation is passed, AMS enters the automatic performance tuning stage. This is high-level tuning, and it can be roughly generalized into three steps. First, we retrieve the runtime memory and vcore usage of the Spark job from our internal metrics storage. Then, we compare the runtime usage with the initial parameters, and if we declared more than needed, we turn down the resources based on the actual usage and run again. Note that we leave some buffer room to avoid failures caused by tight resources. We treat the job as having good performance and passing the tuning stage if three predefined conditions are all satisfied: the Spark job’s runtime should be less than the original Cascading/Scalding job’s; the vcore-seconds savings should be more than 20%; and the extra memory usage should be less than 100%. A sketch of these checks follows below. We have a maximum of five retries before entering the failure stage.
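A hypothetical sketch of the three pass/fail conditions as code; the metric names and struct are illustrative only, not AMS internals.

```scala
// Aggregated metrics for one run of a job (illustrative units).
case class RunMetrics(runtimeSec: Long, vcoreSeconds: Long, memoryMbSeconds: Long)

object TuningGateSketch {
  def passes(legacy: RunMetrics, spark: RunMetrics): Boolean = {
    val fasterRuntime = spark.runtimeSec < legacy.runtimeSec
    val vcoreSavings  = 1.0 - spark.vcoreSeconds.toDouble / legacy.vcoreSeconds
    val extraMemory   = spark.memoryMbSeconds.toDouble / legacy.memoryMbSeconds - 1.0
    // runtime improves, vcore-seconds savings > 20%, extra memory < 100%
    fasterRuntime && vcoreSavings > 0.20 && extraMemory < 1.0
  }
}
```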
Besides the initial automatic performance tuning, users also have the option to perform other tunings manually. Here, we only cover some common optimizations, with an illustrative config sketch below. Let’s start with the executor number, cores, and memory. There is always a trade-off between these parameters. Usually, more executors tend to bring better performance, but the memory cost increases. You can instead use more cores per executor, which saves memory, but the cost in CPU goes up. So, the decision is based on the different needs for cores or memory. Also notice, there may be a one-executor-core requirement for applications where concurrency issues exist in UDFs. Another general rule of thumb is to use dynamic allocation whenever possible, which usually helps save cost. And parallelism is another major factor deciding your job’s execution velocity, which you can adjust by tuning these parameters.
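Here is an illustrative set of knobs for the manual tuning discussed above; the specific values are placeholders, not recommendations from the talk.

```scala
import org.apache.spark.sql.SparkSession

object TunedSessionSketch {
  def build(): SparkSession = SparkSession.builder
    .appName("migrated-etl-job")
    // Executor sizing: trade executor count, cores, and memory against each other.
    .config("spark.executor.instances", "50")   // treated as the initial count with dynamic allocation
    .config("spark.executor.cores", "1")        // 1 core if the ported UDF is not thread-safe
    .config("spark.executor.memory", "8g")
    // Dynamic allocation usually helps save cost.
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.shuffle.service.enabled", "true")
    // Parallelism controls for RDD shuffles and SQL shuffles.
    .config("spark.default.parallelism", "2000")
    .config("spark.sql.shuffle.partitions", "2000")
    .getOrCreate()
}
```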
When both the data validation and the performance tuning stages are passed, AMS is able to automate the job migration as well. Once the user’s confirmation to migrate is received, AMS automatically picks the Spark job over the original Cascading/Scalding job at runtime, replacing the jars and the parameters based on the previous stages. In this way, the migration details are hidden from users. Note, users also have the option to roll back to the original state after the job is migrated. Meanwhile, during the whole migration process, if anything goes wrong, we have failure handlers to take over. If the failure is recognized by the handler, AMS automatically recovers to the proper stage with the corresponding fixes. For example, for an out-of-memory failure, AMS adjusts the memory accordingly and triggers a retry in the run stage. On the other hand, if the failure isn’t recognized, then AMS enters the failure stage, and manual investigation is needed. Now, let’s hand it over to Daniel.

Daniel Dai: Thanks Zirui, and here are the results we have gotten so far from the migration. On average, we reduced application runtime by 40% and got 47% cost savings in terms of CPU. However, we had to increase memory usage by 33%. But since memory is relatively cheap, this is still a significant improvement overall. We will continue the Cascading/Scalding migration in the future. We will work with individual teams to manually migrate the Cascading/Scalding applications or frameworks that are still evolving to Spark, with the help of the automated result validation and performance tuning process. For those applications that are not evolving, we are also thinking about building a Spark backend for Cascading. Even though we want to get rid of the Cascading/Scalding syntax, for the applications that are not evolving, that might not be a problem, and we can quickly consolidate onto the Spark execution engine with the Spark backend.
That’s all for our talk. Thanks.

Daniel Dai

Daniel Dai is currently working on data processing platform in Pinterest. He is PMC member for Apache Hive and Pig. He has a PhD in Computer Science with specialization in computer security, data mini...

Zirui Li

Zirui Li is a software engineer on Pinterest’s Spark team. He has been building Pinterest’s Spark platform and developing Spark functionalities. He’s mainly focusing on Pinterest’s in-house Py...