Using Delta Lake to Transform a Legacy Apache Spark to Support Complex Update/Delete SQL Operation

The convergence of big data technology toward the traditional database domain has become an industry trend. Open source big data processing engines such as Apache Spark, Apache Hadoop, and Apache Flink already support SQL interfaces, and SQL has become the dominant way to use them. Companies use these open source projects to build their own ETL frameworks and OLAP technology. OLTP, however, remains a strength of traditional databases, largely because of their support for ACID transactions.

Traditional commercial databases with ACID capabilities implement the complete set of CRUD operations. In the big data domain, due to the lack of ACID support, only C(reate) and R(ead) operations are commonly implemented, while U(pdate) and D(elete) operations are rarely supported. Part of eBay's data warehouse infrastructure is built on the commercial product Teradata. In recent years, as the company's technology stack has migrated to open source, the data warehouse infrastructure has largely moved to Apache Hadoop and Apache Spark. But to migrate completely off Teradata, we must build a SQL processing engine with the same capabilities. About 5% of analytical SQL queries on Teradata use Update/Delete operations, and Apache Spark currently lacks this capability.

This session introduces how the eBay Carmel team used Delta Lake to transform a legacy Apache Spark deployment to fully support Teradata's Update/Delete syntax. Besides the standard SQL Update/Delete provided by Apache Spark 3.0 (not yet released at the time of this talk), we have also implemented Teradata's extended syntax on Apache Spark 2.3, which can perform more complex Update/Delete SQL operations, such as updates with joins. In this session, I will explain how we did it and walk through the technical details.

Speaker: Lantao Jin

Transcript

– Hello everyone, I'm very happy to be here as a speaker to give you this session. It is about using Delta Lake to transform a legacy Spark SQL deployment to support complex CRUD operations. First, let me introduce myself. My name is Lantao Jin. I'm a software engineer on eBay's infrastructure data platform. I have over eight years of big data infrastructure development experience, and I focus on Spark internal optimization and efficiency platform building. You can contact me through the three links below.

Let me start with the agenda. First, I will introduce the background and our requirements. Then I will show you how we implemented cross-table update, delete, and insertion, followed by our optimization work, and finally our management work. The last three topics I will mix together along the project timeline.

Here is the background. As some of you may already know, eBay is migrating its data warehouse from a commercial data warehouse to open source. Different data warehouses have different SQL syntax, so our SQL syntax needs to be compatible with the commercial product. For example, we migrated the ad-hoc workload from the MPP engine to Spark SQL, but the two engines use different syntax, and we need the update/delete syntax. Beyond compatibility, CRUD is a fundamental requirement in data processing. Before this project, since we did not have update and delete, we developed a FileFormat, very similar to Hive ACID, to support it. Our customers also routinely use a LEFT JOIN against incremental data to perform updates. That is why we need fully featured SQL CRUD. Besides, "databaselization" is a trend in analytic systems: Google BigQuery, for example, is a big data analytics service, but its interface looks very much like a database. With CRUD support, we can open up new approaches in many scenarios.

To achieve this, we have four requirements. The first is that we need to fully support the commercial data warehouse's SQL syntax. The commercial data warehouse provides very complex update/delete SQL syntax: as you can see in the picture on the right, you can update a single table, or join with another table to update the target table. Second, we need to match it in performance, which is critically important. Third, our work was based on Spark 2.3.0 as of last year, which we call "legacy Spark". We had developed many features and optimizations on that Spark version, so this feature also needed to be built on it and integrated with those features. Fourth, we needed to deliver the feature to customers in a short time, because our customers' migration was blocked on it.

Our project started last November, based on Spark 2.3.0 with Delta Lake 0.4.0, which were older versions, and we delivered the feature to our customers this March. Our customers migrated their workloads, scripts, and SQL to the new update/delete capability and signed off this May. Then, in September, we forward-ported our optimizations and features to Spark 3.0.0 and Delta Lake 0.7.0.

Let me share some data about this feature and its usage, as of the end of September. On performance, overall we are five to ten times faster than the open source version in our scenarios, and over ten business units are using Delta tables now.
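To make the cross-table update requirement above concrete, here is a minimal sketch of the kind of Teradata-style statement the extended engine accepts, issued through Spark SQL. The database, table, and column names are hypothetical, and the statement only parses with the custom extensions described in this talk, not with stock Apache Spark.

```scala
import org.apache.spark.sql.SparkSession

object CrossTableUpdateExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("cross-table-update-sketch")
      .getOrCreate()

    // Teradata-style cross-table UPDATE: update the target table t1 by
    // joining it with a source table t2. This form is only recognized by
    // the extended parser described in this session; names are illustrative.
    spark.sql(
      """
        |UPDATE t1
        |FROM target_db.t1 AS t1, source_db.t2 AS t2
        |SET col_a = t2.col_a,
        |    col_b = t1.col_b + 1
        |WHERE t1.id = t2.id
        |  AND t2.dt = '2020-09-30'
      """.stripMargin)

    spark.stop()
  }
}
```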
To continue with the usage numbers: more than two thousand production tables have been converted to Delta tables, and we run over three thousand update/delete statements per day.

Before I introduce our implementation, let me go back to 2019. Why did we choose Delta Lake? Before this, we had developed a new file format for Spark. Its implementation was very similar to Hive ACID, but its usability was not good: we only provided an API, not SQL. At that time, Databricks open sourced Delta Lake. Delta Lake brings full ACID transactions to Spark and provides Scala/Java APIs for update, delete, and merge. It was easy for us to evaluate the protocol, and it needed only minimal changes to be compatible with our Spark. We also compared similar products such as Hudi and Iceberg. We found that their basic functionality was similar: at that time none of them supported SQL or very complex joins, but they all provided ACID and basic APIs. It was hard to say which one performed much better than the others; it depends on the scenario. In the end, we chose Delta Lake.

Our project had four stages. In the first stage, from last November to this March, we finished the fundamental functions: we refactored Delta Lake to support legacy Spark 2.3, finished the cross-table update and delete syntax, added vacuuming to resolve the small-file problem, and provided SQL-based time travel. In stage two, the main work was bug fixing and performance; we made many performance improvements. In stage three, we migrated our work to Spark 3.0 and Delta Lake 0.7. We found it still consumed a lot of memory, so we did a lot of work to reduce memory usage, and our customers also wanted subquery support in WHERE. In stage four, which is where we are now, we are investigating more challenging topics such as Runtime Filter and Z-ordering.

Going back to stage one, the challenge was that at that time Delta Lake 0.4 did not support SQL, and it required Apache Spark 2.4 or above, while our version was 2.3. Even Spark 3's update/delete syntax only supports single-table updates, but we needed very complex update/delete. We also have many internal features and optimizations, and we needed to integrate this work with them. So in this stage we implemented SQL support on Delta Lake 0.4, implemented cross-table update, delete, and insertion, provided SQL-based time travel, and finished automatic vacuuming. The major work in stage one: to support Spark 2.3, we picked up some patches from Spark 3 and backported the update/delete code to 2.3 on top of Data Source V1, we downgraded part of the codegen interfaces in Delta Lake 0.4, and we rewrote the resolution code with Data Source V1.

So, what is a cross-table update/delete? Here are three cases from three different databases; they all support complex cross-table update/delete. For example, in Teradata you can write UPDATE T1 FROM T1 and T2, set T1's column to T2's column and another of T1's columns to some expression, with conditions such as T1's column one equals T2's column one plus other predicates. On the right side you can see the full syntax for an update with a join. To achieve this, we did not use Delta Lake's Java or Scala API; we used SparkSessionExtensions and implemented it in Catalyst, so it benefits from Catalyst.
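Below is a minimal sketch of how custom DML logic can be wired into Spark through SparkSessionExtensions, the mechanism mentioned above. The class and rule names (CrossTableDmlExtensions, ResolveUpdateWithJoin) are placeholders, not eBay's actual code, and the resolution rule here is a no-op stub that only shows where the real update-with-join resolution would hook in.

```scala
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Placeholder analyzer rule: in the real implementation this is where a
// custom update-with-join node would be resolved; here it passes the plan
// through unchanged.
case class ResolveUpdateWithJoin(spark: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Extension entry point registered via the spark.sql.extensions config.
class CrossTableDmlExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // Inject the resolution rule into Catalyst's analyzer.
    extensions.injectResolutionRule(session => ResolveUpdateWithJoin(session))
    // A custom parser for the extended UPDATE/DELETE grammar would also be
    // injected here via extensions.injectParser(...); omitted for brevity.
  }
}

object ExtensionUsage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("session-extensions-sketch")
      .config("spark.sql.extensions", classOf[CrossTableDmlExtensions].getName)
      .getOrCreate()
    spark.stop()
  }
}
```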
We also injected resolution rules and parser rules through SparkSessionExtensions, and we store the necessary metadata in the Hive Metastore. We rewrote this using Data Source V1.

Here is a basic walkthrough of the update internals; this is not a deep-dive session, so I will not go into all the details. As you can see, we changed SqlBase.g4 and added our FROM clause. The statement is then parsed by Spark and, together with the source it generates, it ends up as an UpdateWithJoinTable node. On the source side, the plan may contain multiple joins. Finally, the UpdateWithJoinTable node is transformed into an UpdateWithJoinCommand. This part is very similar to the MERGE INTO process; the difference is that we also support bucket tables.

The insertion internals are similar. Insertion changed a lot in Delta 0.7, but even Delta 0.7 only supports static partition insertion, while we have implemented both static and dynamic partition insertion. Here we also support bucket tables, and with bucket tables the inserted data can be distributed by bucket. In this scenario, as you can see, we add an InsertIntoDataSourceExec node. By using this node to do the insertion, we can easily combine the insert with adaptive query execution. This slide shows how insertion works with static partitions; we also provide dynamic partitions.

Another thing we finished in stage one is SQL-based time travel. We provide two ways to use it. We can use a Rollback command to roll a table back to a point in its history: for example, if we set the version to two, the table goes back to snapshot two. We can also access a table at a specific version with the AT keyword: for example, we can update a table from another table at a given version, accessing its history, and we can also use AT in the FROM clause.

We also finished auto vacuuming. First, some background on our architecture. In our stack we have two clusters: the Apollo cluster stores the data, and the Hermes cluster runs the jobs. Our Spark runs on YARN, each business unit has its own YARN queue, and each queue has one or more STS (Spark Thrift Server) instances. There is also one queue called "reserved" which is not used by customers, and it hosts a single STS server. Each STS server uses a listener to store its Delta metadata to third-party storage asynchronously, listening to six kinds of events. The STS server in the reserved queue double-checks the table metadata in case events are lost; it also triggers automatic vacuuming and serves a UI.

To summarize this section, our main contributions are that we support cross-table update/delete, including update/delete with multi-table joins, and we can infer the join condition. We also provide insertion with static and dynamic partitions, auto vacuuming, and the SQL-based time travel function.

In stage two, after we went online, we found there were still some performance gaps between Delta Lake and the commercial product. We also found that on very big queries, Delta tables could easily cause out-of-memory errors in the Spark driver. In addition, we needed to manage capacity and resolve the small-file problem in Delta. So in this stage we did a lot of optimization work. The first is the bucketing join, the second is resolving the small-file problem automatically, and the third is rewriting the outer join to reduce shuffle.
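As an aside on the SQL-based time travel described above: eBay's ROLLBACK and AT syntax is a custom extension, but open-source Delta Lake exposes the same version history through the DataFrame reader. A minimal sketch, assuming a hypothetical table path:

```scala
import org.apache.spark.sql.SparkSession

object TimeTravelSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("delta-time-travel-sketch")
      .getOrCreate()

    // Hypothetical Delta table location.
    val tablePath = "/data/delta/t1"

    // Read the table as of snapshot version 2 using the standard
    // Delta Lake reader option; the SQL-level ROLLBACK / AT syntax in the
    // talk is layered on the same version history.
    val asOfVersion2 = spark.read
      .format("delta")
      .option("versionAsOf", "2")
      .load(tablePath)

    asOfVersion2.show()
    spark.stop()
  }
}
```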
Continuing with the stage-two optimizations: we also implemented many filter pushdowns to reduce file scans and memory consumption, and we provide a Delta UI to manage vacuuming.

Here is the bucketing join case. We store the bucketSpec information in the Delta table metadata, and through requiredChildDistribution the shuffle can be removed. For example, there is a big table of about five terabytes joined with smaller tables. Why is there a join at all? Because Delta's update-with-join, like MERGE INTO, is implemented with two joins, an inner join and an outer join, which together perform the update; that is the internal structure of the command. If both sides of the join are big tables, it becomes a sort-merge join; before our patch this easily caused out-of-memory errors, and after our patch it finishes in three minutes.

We also resolved the small-file problem. The current community version, 0.7, has a "repartition if needed" step controlled by a configuration: if you enable the configuration, Delta adds a repartition to resolve small files, but if you do not set it, many small files are generated. That was not good for us, because our users do not use the API and do not want to set configurations. Our solution is that before writing the files out, we transform the plan and add an InsertIntoDataSource node. As shown in this picture, the InsertIntoDataSource node in the plan is transformed by the strategy into InsertIntoDataSourceExec, and right before execution a repartition is added only if it is really necessary. For a bucket table, we do not need to repartition again, because the data should already be distributed by the bucket column. For a partitioned table, the data is repartitioned by the partition column. For a table that is neither partitioned nor bucketed, the data is distributed round-robin. So a repartition is not added on every write.

To reduce the shuffle data, we also rewrote the heavy outer join. In the MERGE INTO command, and in our update-with-join command, there is an outer join. Even when there are predicates that only touch the right side, all the rows still have to flow through the join before they can be filtered, so there are many rows in this join. We moved the right-side-only predicates out of the join condition into a filter on the right side: we apply those filters first, perform the join on the reduced input, and union the filtered-out rows back afterwards. In our testing and practice, after this patch our sort-merge join could be five to ten times faster, depending on how much data is skipped from the shuffle. Here is an example: on the left is the sort-merge join in MERGE INTO with nearly eighty billion rows; after applying our patch, only about five billion rows go through the join and the remaining roughly sixty billion rows are unioned back, so the performance is much better.

We also provide a Delta UI in our reserved queue. From this UI we can get a lot of metadata information about vacuuming.

Our contributions in stage two are that we provide bucket tables and the bucketing join, we resolve small files automatically, we rewrote the heavy outer join to reduce the shuffle data, and we provide many filter pushdowns to reduce the table scan. You can find the details in the three items shown here.

In stage three, we planned to upgrade to Spark 3.0 this year, and our customers needed to use subqueries in the WHERE condition to update Delta tables.
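Here is a simplified sketch of the "repartition only if really necessary" decision described above. The real implementation hangs off the physical InsertIntoDataSourceExec node; this version shows the same decision logic on a plain DataFrame, with illustrative parameters rather than eBay's actual code.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object RepartitionBeforeWrite {
  // Decide how to redistribute data before writing it out, to avoid
  // producing many small files.
  def ensureRepartition(
      df: DataFrame,
      bucketColumn: Option[String],
      partitionColumn: Option[String],
      defaultParallelism: Int): DataFrame = {
    (bucketColumn, partitionColumn) match {
      // Bucketed table: data is already distributed by the bucket column,
      // so no extra shuffle is added.
      case (Some(_), _) => df
      // Partitioned table: cluster rows by the partition column so each
      // partition directory gets a small number of output files.
      case (None, Some(p)) => df.repartition(col(p))
      // Neither partitioned nor bucketed: fall back to a round-robin
      // repartition to bound the number of output files.
      case (None, None) => df.repartition(defaultParallelism)
    }
  }
}
```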
Beyond subqueries, we also have many features such as a file index, materialized views, and range partitions, and these optimizations did not yet work for Delta tables, so we needed to finish that as well. We also had to consider availability and robustness. In stage three, we finished migrating our changes to Spark 3.0 with Delta 0.7, added support for subqueries in WHERE, and did a lot of optimization work to reduce memory consumption in the driver.

Let me show how we support subqueries in WHERE. Here are three DELETE or UPDATE SQL statements with very complex subqueries in the WHERE condition. Currently we support IN, EXISTS, NOT IN combined with IS NOT NULL, NOT EXISTS, correlated subqueries, nested subqueries, and multiple subqueries combined conjunctively. We do not yet support NOT IN without IS NOT NULL, scalar subqueries, or multiple subqueries combined disjunctively. We support subqueries by rewriting the update node in the plan. Here is an example of an IN subquery in the WHERE clause: in the original plan, the IN-subquery expression sits in the join condition, and we rewrite it into an inner join, so we can use the inner join to update the table. Another case is NOT EXISTS: in the original plan, the NOT EXISTS subquery expression is in the join condition, and we rewrite it so that on the source side we use a left anti join between the target table and the source table, and then inner join the result with the target table to perform the update.

So in stage three, our contributions are that we migrated all of our changes and improvements to the latest version, we support subqueries in WHERE, and we did a lot of work to reduce driver memory consumption, for example by skipping schema inference when merging with some big tables and reading the schema from the catalog instead, among other things.

Here is a chart showing the performance. The baseline is the commercial data warehouse, and the chart shows duration, so lower is better. After we finished stage one, the performance was poor. After we optimized in stage two, the performance was better and the duration came down. By the end of stage three, the performance is nearly the same as the commercial data warehouse.

Next is our future work. We have features such as range partitioning for Delta, and we are working on a file index for Delta. We have already done the Runtime Filter join optimization, and we hope it can also benefit Delta. We are also investigating Kudu in the hope that it could do faster updating, we are working on Z-ordering to speed up scans, and we are also interested in native engines. Okay, that's my session. Thank you.
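As a final illustration of the subquery rewrites described in this session, the same shapes can be expressed with plain DataFrame joins: an IN subquery corresponds to a semi-join and a NOT EXISTS subquery to an anti-join. The table and column names below are hypothetical; the real work rewrites the logical plan of the UPDATE/DELETE inside Catalyst.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object SubqueryRewriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("subquery-rewrite-sketch")
      .getOrCreate()

    // Hypothetical target and source tables.
    val target: DataFrame = spark.table("target_db.t1")
    val source: DataFrame = spark.table("source_db.t2")

    // WHERE id IN (SELECT id FROM t2 ...)  ->  keep only target rows that
    // have a match in the subquery result (semi-join shape).
    val rowsToUpdate = target.join(source, Seq("id"), "left_semi")

    // WHERE NOT EXISTS (SELECT 1 FROM t2 WHERE t2.id = t1.id)  ->  keep only
    // target rows that have no match in the source (anti-join shape).
    val rowsToDelete = target.join(source, Seq("id"), "left_anti")

    rowsToUpdate.show()
    rowsToDelete.show()
    spark.stop()
  }
}
```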


 
About Lantao Jin

eBay

Lantao Jin is a software engineer in eBay's Infrastructure Data Engineering (INDE) group, focusing on Spark optimization and efficient platform building. He is a contributor to Apache Spark and Apache Hadoop and is familiar with a variety of distributed systems. Prior to eBay, he worked for Meituan-Dianping and Alibaba on data platform and data warehouse infrastructure efforts.