Spark SQL works very well with structured, row-based data. Vectorized readers and writers for Parquet/ORC can make I/O much faster. It also uses WholeStageCodeGen to improve performance through Java JIT-compiled code. However, the Java JIT usually does not make good use of the latest SIMD instructions for complicated queries. Apache Arrow provides a columnar in-memory layout and SIMD-optimized kernels, as well as an LLVM-based SQL engine, Gandiva. These native libraries can accelerate Spark SQL by reducing CPU usage for both I/O and execution.
In this session, we would like to take a deep dive into how we built a native SQL engine for Spark by leveraging Arrow Gandiva and its compute kernels. We will introduce the general design of commonly used operators such as aggregation, sorting and joining, and discuss how we can optimize these operators with SIMD-based instructions. We will also introduce how to implement WholeStageCodeGen with native libraries. Finally, we will use micro-benchmarks and TPC-H workloads to explain how vectorized execution can benefit these workloads.
Speakers: Yuan Zhou and Chendi Xue
– Hello everyone. My name is Yuan. I am a software engineer from the Intel Machine Learning Performance team. Today we will introduce our new project, Native SQL Engine for Spark. I will give a short introduction to the background and the general architecture, and my colleague Chendi will then introduce more details of this interesting project. Okay, so this is the agenda for today. First, we will give a general introduction to our native SQL engine: our motivation for this project and the general architecture. Then we will dig into the details. There are three big components in this project. The first one is the columnar data source. The second is columnar shuffle, and the last and biggest one is columnar compute. So, Spark SQL works great in today’s world, but we still see several disadvantages. The first one is that it uses an internal row-based format, which is very difficult to optimize with SIMD instructions like AVX2 or AVX-512. The second one is that, since Spark runs on the JVM, the GC overhead is very difficult to handle. You have to tune it very carefully if you are working with low memory. The third one is the JIT code, which relies heavily on the JVM. It’s difficult for engineers to tune because it lives inside the JVM. The last one is that Spark doesn’t have a good framework for reusing native libraries. For example, there are many good native libraries for sorting and for hash maps, but it’s difficult to use these libraries inside Spark. So, to solve these issues, we are proposing the native SQL engine. First, we introduced the columnar Arrow format. The picture on the right is actually a well-known picture from the Arrow community. As you can see in the picture, Spark traditionally uses an internal row-based format, which means values of different types are mixed together in one row in memory.
This makes it unfriendly for CPUs. With the Arrow memory format, values of one type are placed together in one column, which is much friendlier for CPUs. We will cover this in later slides. The second advantage is that we do our compute in native code, so there’s not much work inside the JVM and GC overhead is not an issue here. The third advantage is that we introduced a generic code generation framework, which can take C++ code as well as LLVM IR code. This makes it much easier to check the generated code. The last one is that we also introduced a lightweight JNI call framework, which allows users to use different native libraries very easily. Okay. So this is the general picture of our native SQL engine. As you can see in the picture, there are three different layers. In the application layer, we didn’t make any changes, so all existing applications, like Spark SQL or Java and Scala apps, can run on the native SQL engine without any modification. In the middle layer, we introduced query plan optimization, so this layer can generate an optimized plan for our native SQL engine. The bottom layer is the core part of our native SQL engine. There are mainly three components here. The first one is the native Arrow data source, which can read data from different formats. For example, it can read Parquet files with native code, generate Arrow-based columnar data, and feed it to our native SQL engine. The second is our native compute operators. In this part, we have implemented many operators as well as expressions. The core of this compute is done with Arrow Gandiva and with precompiled C++ kernels. The third one is native shuffle. We implemented a new shuffle based on Arrow RecordBatch. These three big components are the core parts of our native SQL engine.
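The sketch below makes the row-versus-column contrast concrete. It is a minimal Python illustration, not the project's code. It pivots row tuples into one typed buffer per column, loosely following Arrow's idea of a values buffer plus an offsets buffer for strings. The column names and the helpers `to_columnar` and `string_at` are hypothetical.

```python
from array import array

# Row-based layout: each record keeps values of different types side by side.
rows = [
    (1, 2.5, "a"),
    (2, 3.5, "bb"),
    (3, 4.5, "ccc"),
]


def to_columnar(rows):
    """Pivot (int, float, str) tuples into one typed buffer per column."""
    ids = array("q", (r[0] for r in rows))     # contiguous 64-bit integers
    prices = array("d", (r[1] for r in rows))  # contiguous doubles
    # Strings: one values buffer plus an offsets buffer, as in Arrow's layout.
    data = bytearray()
    offsets = array("i", [0])
    for r in rows:
        data += r[2].encode("utf-8")
        offsets.append(len(data))
    return {"id": ids, "price": prices, "name": (offsets, bytes(data))}


def string_at(column, i):
    """Read the i-th string back out of an (offsets, data) column."""
    offsets, data = column
    return data[offsets[i]:offsets[i + 1]].decode("utf-8")


columns = to_columnar(rows)
# A column-wise aggregate scans one contiguous, single-typed buffer.
total_price = sum(columns["price"])
```

Scanning `columns["price"]` touches only doubles stored back to back. This is the property that lets native code apply SIMD instructions to a whole column.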
There are also some smaller parts that are still very important, like memory management. We will introduce these components in detail in the following slides. Okay. I want to mention several key points here. The first one is the data format. As you can see in the picture, Spark originally used an internal row-based format, which is unfriendly for CPU caches. Arrow uses a format called columnar batches: fixed-size batches in a columnar layout. In our native SQL engine, we use an optimized version of this. During benchmarking, we found that the batch size is critical on different platforms. If you have a high-end CPU, you may want to increase the batch size so that performance is higher. If you have a low-end CPU, you may want to use a smaller batch size so that it works better on your CPU. So in the native SQL engine, we developed a tunable batch size for all of our operators, and it can automatically split or coalesce batches between operators. Okay. The second idea is Spark plan rules. This is a well-known picture in the Spark SQL world. The upper picture is the execution flow for traditional Spark SQL, and the bottom picture on the page is the new workflow for our native SQL engine. As you can see, the difference is only in the query execution part. In the native SQL engine, we didn’t touch anything before physical plan generation. After the physical plan is generated, we make some slight modifications to the plan. For example, we insert our columnar operators, and we try to do Columnar Whole Stage Codegen for the different operators. We are also working on adaptive query execution here. Okay. This is the third point: the Columnar Whole Stage Codegen concept.
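The split-and-coalesce behaviour between operators can be sketched in a few lines. This is a minimal Python illustration, not the engine's implementation. It assumes a batch is a dict mapping column names to equal-length lists. `rebatch` and `target_rows` are hypothetical names, not real configuration keys.

```python
from typing import Dict, Iterable, Iterator, List

Batch = Dict[str, List]  # column name -> column values (hypothetical representation)


def num_rows(batch: Batch) -> int:
    return len(next(iter(batch.values()))) if batch else 0


def rebatch(batches: Iterable[Batch], target_rows: int) -> Iterator[Batch]:
    """Split large batches and coalesce small ones so that every emitted batch
    has exactly target_rows rows, except possibly the last one."""
    if target_rows <= 0:
        raise ValueError("target_rows must be positive")
    pending: Batch = {}
    for batch in batches:
        # Append the incoming columns to whatever is still pending.
        for name, values in batch.items():
            pending.setdefault(name, []).extend(values)
        # Emit full batches as long as enough rows have accumulated.
        while num_rows(pending) >= target_rows:
            yield {name: values[:target_rows] for name, values in pending.items()}
            pending = {name: values[target_rows:] for name, values in pending.items()}
    if num_rows(pending) > 0:
        yield pending
```

With a target of three rows, the input batches `[1..5]`, `[6]` and `[7, 8]` come out as `[1, 2, 3]`, `[4, 5, 6]` and `[7, 8]`. The oversized batch is split and the small ones are coalesced.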
Initially, when we implemented these features, we first implemented individual operators, working operator by operator as in the left picture. Then we realized the performance was not that good, because there is a lot of materialization overhead between operators. So we added the Columnar Whole Stage Codegen feature, shown in the right picture. As you can see, we can group joins and aggregates into one native operator, so the materialization overhead is removed and the performance is much better than in the left picture. Okay, so let’s go into the details. The first component I would like to introduce is the data source. The left picture is for today’s Spark. Although Spark has a vectorized Parquet reader, it still computes based on its internal row format, so Spark eventually needs column-to-row conversion between operators. In our native SQL engine, we are using an Arrow RecordBatch-based data source. It can read different file formats from different data sources, and it’s very, very fast. For our data source, the application layer is unchanged, so existing workloads on PySpark, Thrift Server or Spark SQL can run with this data source without any modifications. Underneath, we are using the Arrow Dataset API, which is very efficient because it provides zero-copy data access. At the lowest level, we use C++ code to read files from HDFS, your local file system, or even S3A. Okay. So these are the features of the columnar data source. We support different file systems like S3A, HDFS and the local file system. We also support filter pushdown using statistics or metadata filters. Partitioned files are supported as of today, so the dynamic partition pruning feature is supported as well. Okay. So this is a general introduction to Columnar Shuffle.
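A hand-written Python analogy shows the difference between the two pictures. In the engine, the fused version is generated native code over columnar batches. Here `filter_op`, `project_op`, `aggregate_op` and `fused` are hypothetical functions that only show where intermediate results get materialized.

```python
from typing import List


def filter_op(xs: List[int]) -> List[int]:
    return [x for x in xs if x % 2 == 0]  # materializes an intermediate list


def project_op(xs: List[int]) -> List[int]:
    return [x * 10 for x in xs]  # materializes another intermediate list


def aggregate_op(xs: List[int]) -> int:
    return sum(xs)


def operator_by_operator(xs: List[int]) -> int:
    # Each operator hands a fully materialized result to the next one.
    return aggregate_op(project_op(filter_op(xs)))


def fused(xs: List[int]) -> int:
    # "Whole stage" version: one loop, no intermediate buffers.
    total = 0
    for x in xs:
        if x % 2 == 0:
            total += x * 10
    return total
```

Both functions return the same result. The fused version never builds the two intermediate lists, which is the materialization overhead the right picture removes.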
First, since we are based on Arrow RecordBatch, we split the Arrow record batches with native, LLVM-optimized code. Also, since we are on the native side, we can do very efficient data compression based on the data type. For example, for integers we choose one compression algorithm, and for strings we can choose another one, so we achieve better efficiency. On the reader side, we automatically defragment the data read from the shuffle files, so that we get a better batch size to feed into the following operators. The adaptive query execution feature is also supported with our columnar shuffle. Okay. With that, I’ll hand over to Chendi. She will give a detailed introduction to the operators.
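The split step of a columnar shuffle can be sketched as follows. This is an illustrative Python sketch that assumes a batch is a dict of equal-length column lists. The partitioner is a hypothetical stand-in (a plain modulo on an integer key), not Spark's Murmur3-based hash partitioning. Per-type compression is omitted.

```python
from typing import Dict, List


def partition_ids(keys: List[int], num_partitions: int) -> List[int]:
    # Hypothetical partitioner: Python's % is non-negative for a positive divisor.
    # Spark itself hashes with Murmur3 before taking a positive modulo.
    return [k % num_partitions for k in keys]


def split_batch(batch: Dict[str, List], key: str,
                num_partitions: int) -> List[Dict[str, List]]:
    """Split one columnar batch into one columnar batch per partition."""
    pids = partition_ids(batch[key], num_partitions)
    out = [{name: [] for name in batch} for _ in range(num_partitions)]
    for name, values in batch.items():
        # Column at a time: each column is scattered once, preserving row order.
        for pid, value in zip(pids, values):
            out[pid][name].append(value)
    return out
```

Because the partition ids are computed once and reused for every column, each column stays contiguous within its partition. That is what lets a native implementation vectorize the scatter and compress each column with a codec suited to its type.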
– Thanks, Yuan. On this page, we show all the operators and expressions currently supported in our project. With these operators and expressions, we cover all TPC-H and TPC-DS queries. One thing I need to mention here is that we convert Spark expressions to Arrow Gandiva expressions in our columnar Spark plan, since Arrow Gandiva expressions can be passed to the native side for native processing. During the conversion, we can also detect expressions that we do not support yet, so we can automatically fall back to make all queries run. Let’s move on to columnar project and filter, with an example. The projection is something like this: we add fields A, B and C together, and then compare the result with field D. If the result is greater than D, we return this row; if it is less than D, we discard the row. An expression like this is passed through Arrow Gandiva to the native side, where it is compiled into LLVM IR. Since the input data uses the columnar batch format, the LLVM IR function call processes a vector of data at once, leveraging AVX instruction sets. Next, let’s talk about hash maps. There are two types of hash map in our native SQL engine project. One is much like Spark’s: it uses a Spark-compatible 32-bit hash algorithm and handles hash collisions the same way as the original Spark implementation. This kind of hash map is used in hash partitioning and broadcast hash join. We also implemented a more advanced hash map that uses a 64-bit hash with a spaced layout. This hash map makes better use of AVX when generating hash keys and also does much better when probing. Using a 64-bit hash also significantly reduces the chance of hash collisions during probing, so performance is better. This type of hash map is used in hash aggregation and shuffled hash join.
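The filter `A + B + C > D`, evaluated a column at a time, can be sketched as follows. In the engine, Gandiva compiles the expression to LLVM IR; this Python sketch only mirrors the column-at-a-time structure. Returning a selection vector of passing row indices is an assumption made for the illustration. `evaluate_filter` and `take` are hypothetical names.

```python
from typing import List


def evaluate_filter(a: List[int], b: List[int], c: List[int], d: List[int]) -> List[int]:
    """Evaluate `a + b + c > d` column-wise and return the indices of passing rows."""
    # Each arithmetic step runs over a whole column, the shape SIMD code exploits.
    ab = [x + y for x, y in zip(a, b)]
    abc = [x + y for x, y in zip(ab, c)]
    # Compare against d and keep only the row indices that pass.
    return [i for i, (s, limit) in enumerate(zip(abc, d)) if s > limit]


def take(column: List, selection: List[int]) -> List:
    """Materialize only the selected rows of a column."""
    return [column[i] for i in selection]
```

For `A = [1, 2, 3]`, `B = C = [1, 1, 1]` and `D = [5, 3, 4]`, the sums are `[3, 4, 5]`, so rows 1 and 2 pass. Other columns can then be gathered for just those rows.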
Now let’s take a look at the BroadcastHashJoin exchange data. It consists of two parts: a hash map and the original Arrow RecordBatch. In our design, we didn’t copy the original data into the hash map, because this gives us two benefits. First, it eliminates the redundant memory copy from the columnar format into the hash map. Second, because we broadcast the data in columnar format, it is more compact and cache efficient than a row-based hash map. Another benefit is that when the broadcast exchange is used by a semi join, anti join, or existence join without any condition check, we can send only the hash map instead of the hash map plus Arrow RecordBatch combination. Now let’s talk about Sort and SortMergeJoin. In most TPC-H and TPC-DS cases, sorts are used for SortMergeJoin, so our sort implementation is also optimized for the SortMergeJoin case. There are three types of sort implementation. The first type is for data with only one key field and no payload. This type of sort is mostly seen before a semi join, anti join, or existence join. For this type of data, we use an in-place radix sort, and the sort is AVX optimized. The second type is for data with one key field and multiple payload fields. This type of sort is mostly seen in inner joins and outer joins. For this type of data, we also use radix sort, but instead of sorting the data in place, we put indices in the sort memory and then use the indices for later materialization. The third type of sort is for data with multiple key fields and multiple payload fields. For this type, we use a code-generated quicksort and again put the indices in the sort memory. The sort output is then the input for SortMergeJoin.
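The second sort type, a radix sort that permutes indices rather than rows, can be sketched as an LSD (least-significant-digit) radix sort. This is a plain Python illustration, not the AVX-optimized C++ kernel. It assumes non-negative integer keys below 2**32 and 8-bit digits. `radix_argsort` is a hypothetical name.

```python
from typing import List


def radix_argsort(keys: List[int], key_bits: int = 32) -> List[int]:
    """Return row indices that visit `keys` in ascending order (LSD radix sort).

    Assumes non-negative integer keys below 2**key_bits. The keys themselves are
    never moved; only the index array is permuted, so payload columns can be
    gathered later with these indices.
    """
    indices = list(range(len(keys)))
    for shift in range(0, key_bits, 8):
        # Histogram of the current 8-bit digit (shifted by one slot).
        counts = [0] * 257
        for i in indices:
            counts[((keys[i] >> shift) & 0xFF) + 1] += 1
        # Prefix sums: counts[d] becomes the first output slot for digit d.
        for d in range(256):
            counts[d + 1] += counts[d]
        out = [0] * len(indices)
        for i in indices:  # stable scatter keeps the order from earlier passes
            digit = (keys[i] >> shift) & 0xFF
            out[counts[digit]] = i
            counts[digit] += 1
        indices = out
    return indices
```

Payload columns can then be gathered with the returned indices, for example `[payload[i] for i in order]`. Each pass is stable, so rows with equal keys keep their original relative order.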
Instead of materializing the sorted data, we only pass the indices to SortMergeJoin. Then we use the indices to access the original data and do the projection, comparison and condition checks on that data. Finally, we materialize only the rows whose indices were selected. Okay. In the previous slides, we talked about the individual operators implemented in our project. On this page, we will talk about WholeStageCodeGen. In Spark, after the SQL plan is analyzed, there are multiple stages, with multiple operators in each stage. Spark's WholeStageCodeGen optimization creates a big while loop and puts the code blocks from the different operators into this loop. Our idea of WholeStageCodeGen is quite similar to what Spark does. The differences are the following two points. First, we build our WholeStageCodeGen in C++ and compile it with GCC. By doing that, our code can easily be compiled with vectorized instructions. Second, as you can see, we also do WholeStageCodeGen for shuffled hash join. The reason is that our hash map manages native memory, so allocation and release are much easier than in the JVM, since garbage collection overhead is eliminated. Also, since we are using the Arrow data format, memory is more compact and cache efficient. So we have introduced all the components: SQL compute, shuffle and the data source. These components form the base of our project. For example, project and filter use the existing Arrow Gandiva. WholeStageCodeGen, multiple-key sorts and shuffled hash join with conditions use C++ code-generated kernels, and the remaining operators use precompiled template kernels. To integrate our project into Spark, we also implemented two layers on top. The top layer is written in Scala and Java to convert Spark expressions to Gandiva expressions, so we can pass them to the native side.
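A sort-merge join that consumes index arrays instead of materialized sorted data can be sketched like this. It is a simplified Python illustration of an inner equi-join only. `merge_join_indices` is a hypothetical name. Any extra join condition would be checked on the returned index pairs before the payload columns are gathered.

```python
from typing import List, Tuple


def merge_join_indices(left_keys: List[int], left_order: List[int],
                       right_keys: List[int], right_order: List[int]) -> List[Tuple[int, int]]:
    """Inner sort-merge join driven by sort indices instead of sorted copies.

    left_order/right_order visit each side in ascending key order (for example,
    the output of an argsort). Returns matching (left_row, right_row) pairs.
    """
    pairs = []
    i = j = 0
    while i < len(left_order) and j < len(right_order):
        lk = left_keys[left_order[i]]
        rk = right_keys[right_order[j]]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on the right side.
            j_end = j
            while j_end < len(right_order) and right_keys[right_order[j_end]] == lk:
                j_end += 1
            # Pair every left row with this key against that run.
            while i < len(left_order) and left_keys[left_order[i]] == lk:
                for jj in range(j, j_end):
                    pairs.append((left_order[i], right_order[jj]))
                i += 1
            j = j_end
    return pairs
```

Only the index arrays are walked during the merge. Payload values are touched only for the final matching pairs.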
The top layer also lets us use columnar batches as the intermediate data between RDDs. The middle layer is a JNI adapter, which passes Gandiva expressions to the different kernels. This layer can easily be extended to hook in more libraries. Okay, now let’s talk about memory management. There are two views in this picture. The upper view is a workflow view and the lower view is a memory pool view. In the workflow view, there are two types of data that use memory: intermediate data and internal data. In Spark, the intermediate data is UnsafeRow; in our case, it is columnar batches. For example, the first operator, the data source operator, reads data from the underlying file system, allocates an Arrow RecordBatch, and passes it to the next operator, the columnar project. The project operator then evaluates the input Arrow data and allocates a new batch to pass to the next operator. So intermediate data is allocated and released between operators. For some operators like sort, we don’t want to release the data immediately after evaluation. We want to keep it as long as possible, until a spill is triggered. In that case, we retain the data as internal data and release it when the task completes, using a task completion hook. Besides input columnar batches, hash maps are also treated as internal data. Whether it is internal or intermediate data, it all comes from three different memory pools: the JVM pool, the direct memory pool, and the native memory pool. These memory pools are managed by the Arrow allocation manager, which reports to the Spark task memory manager: it asks Spark to grant each allocation and is also notified of any spill requests. Okay. And this is a snapshot of a demo example, TPC-H Q4.
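Returning to the memory-management scheme described above, the interplay of grants and spills can be sketched with two toy classes. `TaskMemoryBudget` and `NativeBufferPool` are hypothetical Python stand-ins for Spark's task memory manager and an Arrow-style memory pool, not their real APIs. Byte counts are plain integers and spilling is only simulated.

```python
from typing import Callable, List


class TaskMemoryBudget:
    """Hypothetical per-task budget, standing in for Spark's task memory manager."""

    def __init__(self, budget_bytes: int):
        self.budget = budget_bytes
        self.used = 0
        self.spill_callbacks: List[Callable[[int], int]] = []

    def on_spill(self, callback: Callable[[int], int]) -> None:
        # A callback receives the bytes still needed and returns the bytes it freed.
        self.spill_callbacks.append(callback)

    def acquire(self, size: int) -> bool:
        """Grant `size` bytes, asking registered consumers to spill if over budget."""
        shortfall = self.used + size - self.budget
        for callback in self.spill_callbacks:
            if shortfall <= 0:
                break
            freed = callback(shortfall)
            self.used -= freed
            shortfall -= freed
        if self.used + size > self.budget:
            return False
        self.used += size
        return True

    def release(self, size: int) -> None:
        self.used -= size


class NativeBufferPool:
    """Hypothetical native pool: every allocation needs a grant first, and
    retained ("internal") buffers are kept until a spill is requested."""

    def __init__(self, budget: TaskMemoryBudget):
        self.budget = budget
        self.retained: List[int] = []  # sizes of buffers still held in memory
        self.spilled_bytes = 0
        budget.on_spill(self.spill)

    def allocate(self, size: int) -> None:
        if not self.budget.acquire(size):
            raise MemoryError(f"cannot reserve {size} bytes")
        self.retained.append(size)

    def spill(self, needed: int) -> int:
        freed = 0
        while self.retained and freed < needed:
            freed += self.retained.pop()  # simulate writing a buffer to disk
        self.spilled_bytes += freed
        return freed

    def release_all(self) -> None:
        """Task-completion hook: free everything that is still retained."""
        self.budget.release(sum(self.retained))
        self.retained.clear()
```

With a 100-byte budget, allocating 40, 40 and then 50 bytes forces one 40-byte buffer to spill. The pool then holds 90 bytes until `release_all` runs at task completion.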
As you can see, all operators on this page are converted to columnar operators. In our initial evaluation, we observed about a 30% performance improvement on a 1.5 TB TPC-H dataset compared with the original Spark, using the same configuration. Okay. That’s all about our project. If you are interested in learning more, please visit our repo on GitHub, which is listed in these slides. Our project is currently under intense development. All TPC-H and TPC-DS queries are runnable, and they show big improvements.
Yuan Zhou is a senior software development engineer at Intel, where he primarily focuses on big data storage software. He has been working on databases, virtualization, and cloud computing for most of his 10+ year career at Intel.
Chendi Xue is a software engineer on the Intel data analytics team. She has six years’ experience in big data and cloud system optimization, focusing on performance analysis and optimization of the computation, storage, and network software stacks. She has participated in development work including Spark SQL, Spark shuffle optimization, and cache implementation. Before that, she worked on Linux Device-Mapper optimization and iSCSI optimization during her master's degree studies.