This talk explains how Spark 3.0 can improve the performance of SQL applications. Spark 3.0 provides many performance features, such as dynamic partition pruning and enhanced pushdown. Each of them can improve the performance of a different type of SQL application. Since the number of features is large, it is not easy for application developers to understand them at a glance. This talk gives a brief explanation of each feature with an example program, and explains how it works and how it improves performance.
Here are takeaways of this talk:
– Good morning, afternoon, or evening, depending on where you are watching. Thank you for joining this session while there are many sessions in parallel. My name is Kazuaki Ishizaki. Today, I would like to give a brief overview of how Apache Spark 3.0 improves the performance of your SQL applications.
I’ll post this presentation on my SlideShare after this talk. So let me introduce myself. I’m a researcher at IBM Research - Tokyo.
My expertise is compiler optimization. I became a committer on Apache Spark two years ago.
I have been working on the SQL package, in particular on whole-stage code generation and the interaction with the Java Virtual Machine. I have been working on the IBM Java Virtual Machine for over 20 years, since 1996. In particular, I was responsible for research and development of the Java just-in-time compiler.
So, Spark 3.0 is what we have been waiting for a long time, since Spark 2.4 was released.
Here are the major changes in Spark 3.0 for SQL performance improvement, which I personally selected. Spark 3.0 improves the interaction with developers, introduces new dynamic optimizations that need no manual parameter tuning, applies Catalyst optimizations to more cases, and updates the infrastructure, Java and Scala. Let’s look at the recent history of Apache Spark releases.
Let us go back to late 2018. The community released Spark 2.4 in November 2018.
At the last Spark Summit in San Francisco, at the keynote, there was an announcement: Spark 3.0 was expected later this year, which meant 2019.
Unfortunately, it did not really happen.
Instead, in late 2019, the community made a preview release of Spark 3.0. That was one year after Spark 2.4 was released.
Congratulations, Spark 3.0 was released early this June.
So the community worked extensively on releasing Spark 3.0. There are about 3,500 issues that the community reported and fixed, including new features, improvements, and bug fixes.
With 3,500 issues, it is hard to understand what’s new for SQL performance improvement. So, in this Spark Summit, there are multiple sessions that explain, introduce, and deep dive into the new features of Spark 3.0.
So, this session guides you to understand what’s new, focusing on SQL performance. Here is a list of seven major changes from my perspective.
So, please vote on which of these items you are interested in, within the next minute.
Obviously, there are many other changes. If you are interested in them, please join the other sessions to learn about them.
These seven changes can be categorized into the four areas that I mentioned earlier in this session. The first two, the new explain format and all types of join hints, are related to the interaction with developers for performance understanding and improvement. Items three and four are dynamic optimizations without manual parameter tuning. Items five and six are Catalyst improvements that apply optimizations to a broader range of SQL programs. The last item is the update of the Scala and Java versions. This is because Spark 2.4 supports only their older versions.
So, first, let me talk about the new explain format.
So, what is important to improve your SQL application performance? It is very important to understand how your query is optimized.
To understand a query plan, it is obviously necessary to be able to read it easily.
So here is a simple query. Let us see how its query plan is shown on Spark 2.4 and 3.0.
On Spark 2.4, when the explain command is executed, the displayed query plan is too long. Each line is too busy because it has many attributes. Unfortunately, it is not easy to understand at a glance.
So Spark 3.0 introduces a new option, formatted, to show a query plan in two parts. The first part is a set of operators. The second part is a set of attributes. The first part makes it easy to understand at a glance which operations will be executed.
If you want to see detailed information, such as the input and output of each operation, you can see the attribute part for each operation in the second part. You can connect the first part and the second part by using the numbers, for example, number one and number two. So let us go to the second change.
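As a minimal sketch of how the two-part plan can be requested: Spark 3.0 accepts `EXPLAIN FORMATTED` in SQL and `explain(mode="formatted")` on a DataFrame. The helper name `explain_formatted_sql`, the table `temp`, and its columns are hypothetical and are used only for illustration; the query is not the one on the slide.

```python
def explain_formatted_sql(query: str) -> str:
    """Prefix a SQL query with EXPLAIN FORMATTED (hypothetical helper)."""
    # Drop surrounding whitespace and a trailing semicolon before wrapping.
    return "EXPLAIN FORMATTED " + query.strip().rstrip(";")


# Hypothetical query against a hypothetical table `temp`.
stmt = explain_formatted_sql(
    "SELECT key, max(val) FROM temp WHERE key > 0 GROUP BY key;"
)
print(stmt)

# With a live SparkSession named `spark` (an assumption, not created here):
#   spark.sql(stmt).show(truncate=False)
#   spark.sql("SELECT ...").explain(mode="formatted")
```

The statement itself is just a string, so it can be built and inspected without a running cluster.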
This is an extension of join hints.
On Spark 2.4, we can specify only the broadcast join as a hint when Catalyst selects a join type that we do not expect.
From Spark 3.0, we can specify any join type as a hint when Catalyst selects a join type that we do not expect.
Here are examples of how to specify a join hint in your program, in SQL or with the DataFrame API.
So it is important to be able to read the query plan easily, to know which join type is selected by Catalyst. If the selection is not suitable, you can suggest a better join type by using this feature.
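The hint syntax can be sketched as follows. The four hint names are the ones Spark 3.0 accepts in SQL; the helper `join_with_hint`, the dictionary keys, and the table names `a` and `b` are hypothetical and only illustrate where the hint goes.

```python
# Join strategy hints accepted by Spark 3.0 SQL.
JOIN_HINTS = {
    "broadcast": "BROADCAST",
    "sort_merge": "MERGE",
    "shuffle_hash": "SHUFFLE_HASH",
    "cartesian": "SHUFFLE_REPLICATE_NL",
}


def join_with_hint(strategy, hinted_table, left, right, key):
    """Build a join query carrying a join strategy hint (hypothetical helper)."""
    hint = JOIN_HINTS[strategy]
    return (
        f"SELECT /*+ {hint}({hinted_table}) */ * "
        f"FROM {left} JOIN {right} ON {left}.{key} = {right}.{key}"
    )


print(join_with_hint("shuffle_hash", "b", "a", "b", "id"))

# DataFrame API equivalent, assuming DataFrames `a` and `b` already exist:
#   a.join(b.hint("shuffle_hash"), "id")
```

A hint is only a suggestion, so it is worth checking the resulting plan to confirm which join was actually selected.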
Let’s go to the next category.
Adaptive query optimization. This can automatically tune three types of parameters by using runtime statistics. The first is the number of reducers. The next is the join strategy. The last is the number of partitions for a skewed join. They are tuned automatically, without tuning properties by hand, run by run. According to the data I have seen, this optimization can achieve good performance improvement for TPC-DS queries.
This is automatic selection of the number of reducers.
This is quite a long-standing item. Spark 2.4 determines the number of reducers using the property spark.sql.shuffle.partitions.
If there are five kinds of keys to be grouped, five reducers will be launched for the five partitions, regardless of partition size.
Therefore, the assigned partitions may cause an unbalanced workload across reducers. For example, Reducer 0 will finish very quickly because partition 0 is very small. On the other hand, Reducer 3 takes much more time. The total execution time is dominated by Reducer 3. Obviously, this is inefficient, because Reducer 0 and Reducer 4 have a lot of idle time while the other reducers are busy.
Spark 3.0 selects the number of reducers to meet the average partition size. In this case, Spark 3.0 will launch three reducers. One reducer will have three partitions. Now all reducers take a similar time. This is efficient compared to the previous case, because the same work is done with three reducers instead of five.
To enable this feature, we need to set the two properties at the bottom of this slide.
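The idea of merging small shuffle partitions can be sketched in plain Python. This is a simplified greedy model, not Spark's implementation; the partition sizes and the target size are hypothetical. The two property names are real Spark 3.0 settings, but that they are the ones shown on the slide is an assumption.

```python
# Real Spark 3.0 properties; assumed to be the two shown on the slide.
AQE_COALESCE_CONF = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
}


def coalesce_partitions(sizes, target_size):
    """Greedily merge adjacent partitions while the merged size stays
    within target_size. Returns one list of partition indices per reducer."""
    groups, current, current_size = [], [], 0
    for index, size in enumerate(sizes):
        # Close the current group if adding this partition would exceed the target.
        if current and current_size + size > target_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Hypothetical sizes (MB) of five shuffle partitions.
sizes_mb = [5, 30, 35, 70, 60]
groups = coalesce_partitions(sizes_mb, target_size=70)
print(groups)                                          # [[0, 1, 2], [3], [4]]
print([sum(sizes_mb[i] for i in g) for g in groups])   # [70, 70, 60]
```

With these numbers, five partitions are served by three reducers of similar size, and one reducer handles three partitions.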
The next one is to select the join strategy. Spark 2.4 decides the join strategy based on statically available information, such as the table sizes, 100 gigabytes and 80 gigabytes.
At query optimization time, the size of the filtered result of table 2 is unknown. Therefore, sort merge join is selected in this case.
Spark 3.0 automatically changes the join strategy based on runtime information, such as the size of the filtered result.
In this case, the filtered result is very small.
Spark changes the join strategy from sort merge join to broadcast join. As a result, the total execution time is reduced, because two sort operations are eliminated. To enable this feature, we need to set the property at the bottom.
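The decision can be modeled roughly as a size check against the broadcast threshold. This is a sketch under assumptions: the function `choose_join` is hypothetical, the 8 MB filtered size is made up, and the real planner considers more than this single comparison. The 10 MB value is the default of spark.sql.autoBroadcastJoinThreshold.

```python
MB = 1024 * 1024
GB = 1024 * MB

# Default value of spark.sql.autoBroadcastJoinThreshold (10 MB).
BROADCAST_THRESHOLD_BYTES = 10 * MB


def choose_join(left_bytes, right_bytes, threshold=BROADCAST_THRESHOLD_BYTES):
    """Pick a join strategy from the known sizes of both sides (simplified)."""
    if min(left_bytes, right_bytes) <= threshold:
        return "broadcast hash join"
    return "sort merge join"


# Planning time: only the static table sizes are known.
static_choice = choose_join(100 * GB, 80 * GB)

# Run time: the filtered result turns out to be small (hypothetical 8 MB).
runtime_choice = choose_join(100 * GB, 8 * MB)

print(static_choice)   # sort merge join
print(runtime_choice)  # broadcast hash join
```

The same function gives a different answer only because the runtime size is available, which is the point of the adaptive re-planning.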
The last one is to optimize skewed joins. On Spark 2.4, partitions are naively created at join time. Therefore, skew sometimes occurs, as in partition 2. As a result, the total time is dominated by processing partition 2, which is the largest partition in this join.
On Spark 3.0, a large partition is split into multiple partitions to make each one smaller. Now no single partition dominates the total time so extremely. As a result, the total time is reduced, and the purple bar is shorter compared to the previous case. Spark also automatically duplicates the partitions that correspond to the split partitions. Without manual tuning run by run, this decision, the splitting, and the duplication are performed when we set the two properties at the bottom.
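A rough model of the skew decision and the split is sketched below. It is a simplification: a partition is treated as skewed when it is larger than a factor times the median size, and Spark's additional byte-size threshold is ignored. The function name, the sizes, the factor, and the target size are all hypothetical. The property names in the comment are real Spark 3.0 settings, assumed to be the two on the slide.

```python
import math
from statistics import median

# Real Spark 3.0 properties, assumed to be the two on the slide:
#   spark.sql.adaptive.enabled=true
#   spark.sql.adaptive.skewJoin.enabled=true


def plan_skew_splits(sizes, factor, target_size):
    """Return {partition index: number of pieces}. A partition larger than
    factor * median is split into pieces of at most target_size."""
    typical = median(sizes)
    plan = {}
    for index, size in enumerate(sizes):
        if size > factor * typical:
            plan[index] = math.ceil(size / target_size)
        else:
            plan[index] = 1
    return plan


# Hypothetical partition sizes (MB); partition 2 is skewed.
sizes_mb = [40, 50, 400, 60]
plan = plan_skew_splits(sizes_mb, factor=5, target_size=100)
print(plan)  # {0: 1, 1: 1, 2: 4, 3: 1}

# The matching partition on the other join side is duplicated once per piece.
print(plan[2])  # 4
```

Here the median is 55 MB, so only the 400 MB partition exceeds five times the median and is split into four pieces.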
So let’s go to another dynamic feature.
Dynamic partition pruning can reduce I/O accesses by avoiding reading unnecessary partitions, using predicate pushdown. This pruning is applied automatically.
According to a presentation at the last Spark + AI Summit in Europe, this can achieve a great performance improvement in TPC-DS queries.
This is an overview of a naive broadcast hash join on Spark 2.4.
In this case, all of the data of both tables is read.
The result of the filter on table small is broadcast. Then, the broadcast join occurs.
Since the other table, large, on the left-hand side is huge, a lot of I/O occurs, and a lot of time is wasted reading data that is not used at join time.
A dynamic partition filter allows table large to perform predicate pushdown to avoid reading the unnecessary data.
The dynamic filter information is sent together with the broadcast data. It is based on the filter operation on the right-hand side. Then, when table large is read, pushdown is applied based on the dynamic filter information. Finally, only the necessary partitions are read from table large.
This can basically avoid reading a lot of unnecessary data, so we can improve the performance of this query.
This is an example of dynamic partition pruning. As you can see in the physical plan, table large uses the dynamic pruning information in the partition filter, written in blue. This information is used for pushdown when Parquet is read. As a result, the amount of data to be read from table large is reduced.
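The flow can be imitated in plain Python with dictionaries standing in for tables. This is only a conceptual sketch: the function `prune_and_join`, the column names `part` and `id`, and the data are hypothetical. In Spark 3.0 the feature is controlled by spark.sql.optimizer.dynamicPartitionPruning.enabled, which is on by default.

```python
def prune_and_join(large_partitions, small_rows, small_filter, key):
    """Filter the small table, derive a dynamic filter from its join keys,
    read only the matching partitions of the large table, then join."""
    # 1. Apply the filter on the small side.
    filtered_small = [row for row in small_rows if small_filter(row)]
    # 2. The dynamic filter is the set of join keys that survived.
    dynamic_filter = {row[key] for row in filtered_small}
    # 3. Read only the partitions of the large table that can match.
    read = {p: rows for p, rows in large_partitions.items() if p in dynamic_filter}
    # 4. Join the pruned large side with the filtered small side.
    joined = [
        (large_row, small_row)
        for small_row in filtered_small
        for large_row in read.get(small_row[key], [])
    ]
    return sorted(read), joined


# Hypothetical large table, partitioned by `part`.
large = {1: ["a", "b"], 2: ["c"], 3: ["d"]}
# Hypothetical small table.
small = [{"part": 1, "id": 5}, {"part": 2, "id": 50}, {"part": 3, "id": 7}]

partitions_read, joined = prune_and_join(large, small, lambda r: r["id"] < 10, "part")
print(partitions_read)  # [1, 3]
print(len(joined))      # 3
```

Partition 2 of the large table is never read, because no row of the small table with that key passes the filter.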
Let us go to the next category, Catalyst improvements regarding nested columns.
The first improvement regarding nested columns is column pruning. Column pruning can read only the necessary columns from the Parquet columnar format. On Spark 2.4, column pruning works for some operations such as Limit. As you can see here, only the nested first column in column 2, shown in blue, is read. We can successfully avoid reading unnecessary data such as column 1 and the second column in column 2.
However, the implementation of column pruning for nested columns was ad hoc in Spark 2.4. For example, it does not work with other operators such as Repartition. As you can see at the bottom, both columns are read and repartitioned. Pruning is performed only after the repartition. When we compare the two cases, the bottom one is inefficient, because it reads two columns, while the upper one reads only the one column it needs.
So Spark 3.0 fixes this issue. Spark 3.0 can apply column pruning to nested columns with any operation, which improves performance for any operation.
Here are example plans for column pruning. As you can see, for both queries, with Limit and with Repartition, ReadSchema for Parquet has only the first column of column 2. Therefore, Parquet will read only one column.
We can reduce the amount of I/O by avoiding reading unnecessary data.
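What the pruned ReadSchema amounts to can be sketched with a nested dictionary standing in for the table schema. The function `prune_schema`, the field names `_1` and `_2`, and the types are hypothetical; the sketch only shows the effect of pruning, not how Catalyst performs it.

```python
def prune_schema(schema, paths):
    """Keep only the fields named by dotted paths such as 'col2._1'."""
    pruned = {}
    for path in paths:
        source, target = schema, pruned
        parts = path.split(".")
        for position, name in enumerate(parts):
            if position == len(parts) - 1:
                # Last path element: copy the requested field.
                target[name] = source[name]
            else:
                # Intermediate struct: descend, creating it on demand.
                target = target.setdefault(name, {})
                source = source[name]
    return pruned


# Hypothetical schema: col1 is flat, col2 is a struct with two fields.
schema = {"col1": "int", "col2": {"_1": "int", "_2": "string"}}

print(prune_schema(schema, ["col2._1"]))  # {'col2': {'_1': 'int'}}
```

Only the first nested field of col2 remains, so col1 and the second nested field would not need to be read.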
Another improvement regarding nested columns is predicate pushdown to formats such as Parquet and ORC.
On Spark 2.4, pushdown works well for non-nested columns such as column 1, written in orange. However, it does not work for nested columns such as column 2. As you can see in the query plan, the pushed filters do not have the condition on column 2, that its first column equals 100. As a result, all partitions are read from the Parquet columnar format in Spark 2.4.
Spark 3.0 fixes this issue. Pushdown to Parquet and ORC works well for nested columns. As you can see, the expression in blue is passed through to Parquet, and then Parquet can apply predicate pushdown. So Parquet does not send all of the data to Spark, but sends only the necessary data. Therefore, we can improve the performance compared to the previous case without pushdown. Generally, nested columns are used in complex and large schemas. This will be useful for handling complex and large data in your use case, and you can improve the performance in such cases.
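Why a pushed-down predicate saves I/O can be sketched with per-chunk minimum and maximum statistics, which is one way a columnar reader can skip data. The function `row_groups_to_read`, the field name `col2._1`, and the statistics are hypothetical, and the sketch is a conceptual model rather than the Parquet reader itself.

```python
def row_groups_to_read(row_groups, column, value):
    """Return ids of chunks whose [min, max] range for `column`
    can contain `value`; all other chunks can be skipped."""
    selected = []
    for group in row_groups:
        low, high = group["stats"][column]
        if low <= value <= high:
            selected.append(group["id"])
    return selected


# Hypothetical min/max statistics for the nested field col2._1.
row_groups = [
    {"id": 0, "stats": {"col2._1": (1, 50)}},
    {"id": 1, "stats": {"col2._1": (60, 120)}},
    {"id": 2, "stats": {"col2._1": (130, 200)}},
]

# Predicate from the talk: the first field of column 2 equals 100.
print(row_groups_to_read(row_groups, "col2._1", 100))  # [1]
```

Without the predicate reaching the reader, all three chunks would be sent to Spark and filtered there.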
The next one is an improvement for complex aggregation.
On Spark 2.4, a complex aggregation query is slow. This is because the query is not translated into native code. For example, it happens in a TPC-DS query. Why does it happen?
Here is a quick overview of how a query is translated into native code. A given query is translated into Java code by Spark, with query optimization in Catalyst. Then the HotSpot compiler in OpenJDK translates the Java code into native code.
However, if the Java code is too large, the HotSpot compiler gives up generating native code. Thus the performance is not good.
Spark 3.0 fixes this issue by making the aggregation Java code small. Spark 3.0 splits a large Java method into small methods.
Then the HotSpot compiler can generate native code for the multiple small methods. As a result, performance is improved compared to Spark 2.4.
So here is an example program that applies the average function to 50 columns.
Now, the generated code size is small, as you can see in blue.
All of those methods are up to 8,000 bytes of Java bytecode per method.
All of the methods will be translated into native code by the HotSpot compiler.
When we disable this feature, the size is more than 8,000, as you can see in the red part. The native code will not be generated by the HotSpot compiler.
Therefore, the performance is not good compared to the above case.
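The effect of splitting can be sketched by packing code fragments into methods that stay under the 8,000-byte limit. This is an illustration only: Spark's real splitting heuristic is different, and the 300 bytes per aggregate is a made-up figure. The limit itself matches HotSpot's default behavior of not compiling huge methods, and the property spark.sql.codegen.aggregate.splitAggregateFunc.enabled is the Spark 3.0 switch for this feature.

```python
# HotSpot does not JIT-compile methods above this bytecode size by default.
HUGE_METHOD_LIMIT = 8000


def split_into_methods(fragment_sizes, limit=HUGE_METHOD_LIMIT):
    """Pack consecutive code fragments into methods whose total size
    stays within the limit. Returns a list of methods (lists of sizes)."""
    methods, current, current_size = [], [], 0
    for size in fragment_sizes:
        if size > limit:
            raise ValueError("a single fragment exceeds the limit")
        # Start a new method if this fragment would push us over the limit.
        if current and current_size + size > limit:
            methods.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        methods.append(current)
    return methods


# 50 average computations, each assumed to cost 300 bytes of bytecode.
fragments = [300] * 50
print(sum(fragments))  # 15000, too large for one compiled method

methods = split_into_methods(fragments)
print([sum(m) for m in methods])  # [7800, 7200]
```

A single 15,000-byte method would be left to the interpreter, while the two smaller methods are both eligible for native code generation.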
Finally, let me talk about updating Scala and Java.
On Spark 2.4, Java 8 and Scala 2.11 were officially supported.
From Spark 3.0, Java 11 and Scala 2.12 are officially supported. Java 11 has many improvements in native code generation and garbage collection. Therefore, in general, the performance will be improved. And from the maintenance point of view, OpenJDK 11 is a long-term support version. It will be supported until 2026.
On the Scala side, Scala 2.12 exploits new features of Java 8.
This is good compared to Scala 2.11.
So here is the takeaway. Spark 3.0 introduces many features developed over one and a half years. From my perspective, these seven features help improve the performance of your SQL applications. If you want to revisit this presentation, please visit this SlideShare URL tomorrow. I’ll post my presentation after this talk.
So I have listed some resources that are good to read, blog entries and presentations.
Dr. Kazuaki Ishizaki is a senior technical staff member at IBM Research - Tokyo. He has over 20 years of experience conducting research and development of dynamic compilers for Java and other languages. He is an expert in compiler optimizations, runtime systems, and parallel processing. He has been working on the IBM Java just-in-time compiler and virtual machine from JDK 1.0 to Java 8. His research has focused on how system software can enable programmers to automatically exploit hardware accelerators in high-level languages and frameworks. He is an Apache Spark committer, working on the SQL component. He is an ACM Distinguished Member.