Deep Dive into the New Features of Apache Spark 3.1

May 27, 2021 04:25 PM (PT)


Continuing with the objectives to make Spark faster, easier, and smarter, Apache Spark 3.1 extends its scope with more than 1,500 resolved JIRAs. We will talk about the exciting new developments in Apache Spark 3.1 as well as some other major initiatives that are coming in the future. In this talk, we want to share with the community many of the more important changes, with examples and demos.

The following features are covered: SQL features for ANSI SQL compliance, new streaming features, Python usability improvements, and performance enhancements and new tuning tricks in the query compiler.

In this session watch:
Wenchen Fan, Software Engineer, Databricks
Xiao Li, Engineering Manager, Databricks

 

Transcript

Wenchen Fan: Hello, everyone. This is Wenchen and Xiao from Databricks. Today, we are doing a deep dive into the new features of Apache Spark 3.1. A bit more about Databricks: we are a data and AI company, and we provide the first lakehouse, which offers a simple platform to unify all of your data, analytics, and AI workloads. We are working with more than 5,000 customers across the globe, and we have a culture of innovation and dedication to open source. We have initiated several open-source projects in the data space that are quite successful.
I’m Wenchen, and both Xiao and I work at Databricks. We focus on the development of Spark at Databricks, and both of us are Spark committers and PMC members. Spark 3.1 has a lot of features in many areas. In general, every Spark release focuses on three things: usability, stability, and performance. Today, we will cover some of the new features. Please read the release notes if you want to know all the details.
The first topic I’d like to start with is ANSI SQL compliance. Today, ANSI SQL compliance is a major effort of the Spark project. It includes an ANSI mode, which enables stricter behavior such as failing the query if the input is invalid. It also includes adding more functions, SQL syntax, and features from the ANSI SQL standard. We hope this can make Spark easier to use for common SQL workloads and use cases. This is also a long-term effort that the Spark community will keep investing in.
In ANSI SQL, invalid input is treated as an error, and the query fails when hitting invalid input. To ensure data quality, in the last release, Spark 3.0, we added runtime overflow checks for math operations and functions. In Spark 3.1, we follow the ANSI SQL spec more closely under ANSI mode. For example, if a math operation overflows, it signals that your data may have some issues. Without ANSI mode, Spark returns null, which may hide the data issues and produce wrong results for the rest of your data pipeline.
With ANSI mode, Spark throws an exception to capture the data issues. Similarly, many other runtime checks have been added in different places. For example, if you cast a string to an int, the query fails under ANSI mode if the string is not in a valid number format. If you parse a string to a date with a pattern string, this will also fail if the string is not in a valid date format. With ANSI mode, Spark can capture data issues earlier to ensure the data quality of your pipeline.
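Here is a minimal sketch of the behavior described above, assuming a local SparkSession: the invalid cast silently returns null by default, and fails fast once ANSI mode is enabled.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Default behavior: an invalid cast silently returns NULL, hiding the data issue.
spark.conf.set("spark.sql.ansi.enabled", "false")
spark.sql("SELECT CAST('abc' AS INT) AS v").show()

# ANSI mode: the same cast fails at runtime, surfacing the bad input early.
spark.conf.set("spark.sql.ansi.enabled", "true")
try:
    spark.sql("SELECT CAST('abc' AS INT) AS v").show()
except Exception as e:
    print("Query failed under ANSI mode:", type(e).__name__)
```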
In addition to the runtime checks, the query compiling phase also becomes stricter under ANSI mode. For example, without ANSI mode, Spark can cast an int to a timestamp, and we have no idea what the int means. Is it the number of seconds from the epoch? With ANSI mode, Spark now follows the ANSI spec to define which kinds of CAST are allowed, and confusing casts like int to timestamp are forbidden. This is a screenshot of the ANSI CAST documentation. You can study it on the official Spark website.
This is an example. If you put an invalid CAST in your query, then for a better experience, Spark provides an error message suggesting that you replace your CAST with one of the new functions that have clear semantics. For example, if your intention is to treat the int as the number of seconds from the epoch, you can use the new function TIMESTAMP_SECONDS, which has a clear semantic that this is seconds, not milli- or microseconds.
In the upcoming Spark 3.2 release, we plan to finish the ANSI mode and complete all the pieces to make it production ready. It includes a new implicit CAST behavior under ANSI mode. In the last slide, I talked about the explicit CAST that is specified by users. Spark also has implicit CAST, which adds the CAST automatically for your queries. For example, if you add an int with a double, the types do not match, and Spark will cast the int to double instead of failing the query. This makes your life easier when writing queries, but it also brings confusion if the CAST is not added properly.
In Spark 3.2, we will follow the ANSI SQL spec to define the implicit cast rules to make them stricter and more reasonable. We will also revisit all the functions and operators to add more runtime data checks to ensure data quality. Sometimes people know that the data is dirty and just want to get a null result if the input is invalid, so we will also provide some new functions which will not fail at runtime with invalid input. For example, we will add a new function, TRY_DIVIDE, which returns null if a divide-by-zero happens.
Next is the CREATE TABLE SQL syntax. A while ago, I did a survey among my friends, and many of them didn’t know that Spark has two different CREATE TABLE syntaxes. The native one uses the USING clause to specify the data source and other clauses to specify options, partitioning, and so on. Spark also supports the Hive CREATE TABLE syntax, which was added to attract Hive users in the early days. The Hive syntax is very different from the native one. This is pretty bad for Spark users, as they have to learn two different syntaxes for CREATE TABLE. What’s worse, if you come from a traditional database, don’t know the Spark or Hive syntax, and just write CREATE TABLE, it creates a Hive text format table, which is super slow.
In Spark 3.1, we unified these two CREATE TABLE syntaxes. Now there’s only one syntax, which is compatible with both the Hive and the native one. We also added a config to create native data source tables by default if you simply write CREATE TABLE without specifying the data source. Note that this config needs to be set manually; otherwise, Spark will still create Hive tables for compatibility reasons.
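A rough sketch of the unified syntax and the legacy config mentioned above (the table names are made up, and the config name is taken from the Spark 3.1 documentation; treat this as an illustration):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Native syntax: the USING clause picks the data source explicitly.
spark.sql("CREATE TABLE events (id INT, ts TIMESTAMP) USING parquet")

# Without USING, Spark 3.1 still creates a Hive table by default for compatibility,
# unless this legacy config is turned off:
spark.conf.set("spark.sql.legacy.createHiveTableByDefault", "false")
spark.sql("CREATE TABLE events2 (id INT, ts TIMESTAMP)")  # now a native data source table
```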
In this release, we also added proper support for the CHAR/VARCHAR types, instead of simply treating them as string types. CHAR/VARCHAR is commonly used in other databases because it gives an upper bound on the data size, to save storage space and avoid unexpectedly large input. Now, Spark does the length check properly during table insertion. You can see from this example that a runtime error happens if the input string exceeds the length limitation of the CHAR/VARCHAR type.
For example, this is CHAR(5), so the input cannot exceed five characters, and the input here is A, B, C, D, E, F, which exceeds the length, so it fails. For the CHAR type, Spark also follows the ANSI SQL spec and pads the value to the length of the CHAR type. Look at this example: column one is a CHAR type with length five, and the length of its value is always five, even if the inserted value is not five characters long. For example, we insert A, B, C into column one, and its length is three. But if you scan the table later, the length is five, because we pad the value during table insertion.
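A small sketch of both behaviors, assuming an active SparkSession and a throwaway table name:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.sql("CREATE TABLE char_demo (c1 CHAR(5)) USING parquet")

# Too-long input is rejected at insertion time.
try:
    spark.sql("INSERT INTO char_demo VALUES ('abcdef')")
except Exception as e:
    print("Insertion rejected:", type(e).__name__)

# Shorter input is accepted and padded to the declared length.
spark.sql("INSERT INTO char_demo VALUES ('abc')")
spark.sql("SELECT c1, length(c1) AS len FROM char_demo").show()  # len is 5 after padding
```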
This padding behavior is actually not very useful and sometimes confusing. It was useful in the very early days of databases, but nowadays most of the time you can just skip the CHAR type. In this release, there are many other ANSI features, but I don’t have time to go through them in detail. For example, in this release, we unified the SQL temp view and permanent view behaviors. Now, we always re-parse and re-analyze the view SQL string when reading the view, for both temp and permanent views. Doing this, we can capture table data changes for the views. Previously, for temp views, we just captured the analyzed plan, so even if your table changed, like adding more files or removing some files, it would not change the view result.
Now there’s consistency between temp and permanent views: we always capture the table data changes. We also support a column list in the INSERT statement. Now you can put the column names after the table in your INSERT INTO command to reorder the columns of your input query, as shown below. We also support ANSI nested bracketed comments in SQL.
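A quick sketch of the INSERT column list (the table and column names are made up):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.sql("CREATE TABLE t_target (a INT, b STRING) USING parquet")

# The column list after the table name lets the input query use a different column order.
spark.sql("INSERT INTO t_target (b, a) SELECT 'hello', 1")
spark.sql("SELECT * FROM t_target").show()  # a=1, b='hello'
```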
In the upcoming Spark 3.2 release, there is a lot of other ongoing work for ANSI SQL compliance. For example, we will add the ANSI year-month and day-time INTERVAL data types to replace the current INTERVAL type, which is very limited. The current interval type is not comparable, because you cannot compare a month with a number of days: a month can be less than, equal to, or more than, say, 30 days, so it doesn’t make sense to compare one month and 30 days.
In the new types, we put months and days into different types, so now we can compare values of each type individually. We will also add the ANSI timestamp without time zone type to simplify the handling of timestamps. We will also add a new de-correlation framework for correlated subqueries, to support outer references in more places. We will also add a new feature, lateral join, which allows you to reference columns from the join’s left side in the right-side subquery. We will also add SQL error codes, which are more searchable, cross-language, and JDBC compatible.
Next, I’d like to talk about node decommissioning. This is an important feature in Spark 3.1 to improve stability. The Spark engine is fault tolerant; however, recovering from failures has overhead.
Node decommissioning reduces that failure-recovery overhead: it can gracefully handle scheduled executor shutdown. Executor shutdown is fine for Spark, but it has costs. First, all the running tasks on the executor fail and need to be rescheduled. Second, the cached blocks and shuffle files on the executor are lost and need to be recomputed later. In the real world, there are cases where executor shutdown is known ahead of time. For example, Spark has an auto-scaling feature to tune the size of the cluster depending on the demand.
In that case, Spark sometimes decides to shut down one or more idle executors. If we deploy Spark on a cloud like AWS, there are EC2 spot instances, which notify the node that it is going to be reclaimed to make way for other accounts; these are very cheap instances. Similarly, if you deploy Spark on GCP, there are GCE preemptible instances with the same characteristics: very cheap, but the node can go away at any time. If we deploy Spark on-premises using YARN or Kubernetes, they can also kill containers, with notification, to make way for higher-priority tasks.
Spark will know that the container or the executor will be killed within a window, like one minute, and can do something to make sure the shutdown does not have a very large overhead.
Let’s see how the node decommissioning feature helps in this case. Here is a simplified workflow of how it works. First, there is a shutdown trigger, which can be something like the Spark auto-scaling manager or the cloud provider. The shutdown trigger sends a signal to the executor to notify it of the shutdown.
Next, the executor does two things at the same time. First, it sends a message to the driver to notify it that this executor is being decommissioned. Second, it starts migrating its data blocks to other live executors. In this case, let’s say we have two other executors left in the cluster, so executor one will start migrating data to them.
Next, the driver gets the message and stops scheduling tasks on executor one. The data migration may not complete before executor one is shut down, but that’s fine. This is just best effort: as long as some data blocks have been migrated, you save the computation for those blocks.
As a summary, what node decommissioning does is: first, it migrates the data blocks to other nodes before shutdown, to avoid recomputing those blocks later. Second, it stops scheduling tasks on the decommissioning node, as they likely cannot complete and would waste resources. Third, Spark launches speculative tasks for the tasks running on the decommissioning node, since they may not complete, to finish the job earlier. Eventually, this feature makes Spark more stable by reducing the overhead of executor shutdown.
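For reference, here is a sketch of the configs that enable this behavior in Spark 3.1 (the names are as documented; whether they apply depends on your cluster manager, so treat this as an illustration):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("decommission-demo")
    .config("spark.decommission.enabled", "true")                        # react to decommission signals
    .config("spark.storage.decommission.enabled", "true")                # migrate blocks before shutdown
    .config("spark.storage.decommission.rddBlocks.enabled", "true")      # migrate cached RDD blocks
    .config("spark.storage.decommission.shuffleBlocks.enabled", "true")  # migrate shuffle files
    .getOrCreate()
)
```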
So far, we have talked about the usability and stability improvements in Spark 3.1. Next, let’s talk about performance improvements in Spark SQL, which is a major focus in every Spark release.
The first improvement I’d like to talk about is shuffle hash join. Let’s look at how shuffle hash join is done in Spark. For the two tables being joined, we pick the smaller one as the build side. First, we shuffle both tables to partition them by the join keys, so that the same keys appear in the same partition. Next, for each partition, we load all the data of the build side to build a hash table. Then we iterate over all the data from the probe side and look up the hash table to find matches.
If there’s a match, we join these two rows to produce a result. We can easily see that shuffle hash join is not robust: we don’t know how small each build-side partition is, and building the hash table may OOM. For example, if the build-side table is skewed, there may be a very large partition, and then building the hash table for it can OOM on the executor and crash the application. As a result, Spark always prefers sort-merge join over shuffle hash join to avoid OOM.
However, users may still want to turn on shuffle hash join to avoid sorting the join tables, if they know that their tables are not skewed and will fit in memory, so they will not OOM. However, because Spark never uses shuffle hash join by default, over the years we only added new features to sort-merge join, not shuffle hash join. In this release, we did the work to make sure that shuffle hash join is on par with other join algorithms like sort-merge join.
The work includes: we added code-gen for shuffle hash join, we support more join types in shuffle hash join, we handle the unique-key case for shuffle hash join, and so on. In this release, you can turn on shuffle hash join, not entirely safely, but if you know OOM will not happen, it can give you some performance boost. In the future, we will keep improving shuffle hash join to make it more robust. First, we will rely on the AQE framework, so that we know the size of each partition, and if they are small enough, we can safely turn on shuffle hash join. This avoids making the decision manually and getting it wrong.
Second, we will also support spilling in shuffle hash join, so that even if one side is very large and cannot fit the hash table in memory, the join can still run, just slower, but without OOM.
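If you want to experiment with shuffle hash join today, here is a rough sketch of the two usual knobs (the tables t1 and t2 are hypothetical; only do this when you know the build side is small and not skewed):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Planner-level preference: stop favoring sort-merge join.
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")

# Or per query, with a join hint on the build side:
spark.sql("""
    SELECT /*+ SHUFFLE_HASH(t2) */ t1.key, t2.value
    FROM t1 JOIN t2 ON t1.key = t2.key
""").explain()  # expect a ShuffledHashJoin node in the physical plan
```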
The third thing I’d like to talk about is partition pruning. This is very critical for file scan performance, because it reduces the files to scan. In general, before Spark launches the tasks to read the files, it first sends a request to the underlying catalog, like the Hive metastore, with the partition predicates, to get the list of files to scan.
Obviously, if Spark sends more partition predicates, the catalog is more likely to prune more files, so you scan fewer files and improve the performance a lot.
In this release, we did a lot of work to send more partition predicates. For example, now we support Contains, StartsWith, and EndsWith in partition pruning. We also support the date type in partition pruning, whereas before it only supported very simple types like numeric and string types.
We also support not-equals and NOT IN predicates in partition pruning. This is not a full list, and we will keep expanding it to push down more predicates to the catalog and get fewer files to scan.
A similar topic is predicate pushdown. In the last slide, I mentioned that Spark can use partition predicates to reduce the number of files to scan and speed up the scan performance. Certain file formats, like Parquet, keep statistics such as the minimum and maximum values of some columns in a file, so Spark can use data predicates to reduce the data chunks read from the file. It’s also important to push down more predicates into the file scan node. However, predicates cannot always be pushed down.
For example, if you have a join node, the join condition will likely mix columns from both sides. For example, if you join t1 with t2, the condition may be t1.key = 1 AND t2.key = 2. Because this is an AND predicate, you can safely push down these two predicates to both join sides: for t1, the key must equal 1, and for t2, the key must equal 2.
However, if you use an OR predicate to combine these two small predicates on the t1 and t2 keys, then we cannot push down, because with OR you cannot derive a strict requirement for either join side on its own. Similarly, if there is a more complicated predicate like the third one, as long as the top-level connective is OR, you cannot push it down, because the columns are mixed and you cannot push this predicate into both sides.
The same problem happens for normal predicates when they mix data columns and partition columns, because, as I mentioned, the partition predicates are handled by the catalog to reduce the files to scan, and the data predicates are handled by the file format to reduce the file chunks read from the files.
They are handled in different places, and we must separate the data and partition columns in the predicates to be able to push down. However, if you look at the last predicate more closely, I mean the third example, there is something we can do. For every row produced by t1, it either meets the requirement that key equals 1, or it meets the requirement that key equals 3. That’s something we can use to make Spark smarter and push down some predicates into one join side or other places.
It turns out there is a theory for this, called conjunctive normal form. For a general predicate like (a1 AND a2) OR (b1 AND b2), we can rewrite it into a more verbose form that combines four predicates with AND: (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2). Now it’s very nice to see AND, because we can split the predicates and push down each part separately. In this release, we use this theory to push down more predicates and reduce the disk IO in the file scan.
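A toy sketch of that rewrite in plain Python, just distributing OR over the AND-ed conjuncts on each side (an illustration of the idea, not Spark’s internal implementation):

```python
from itertools import product

def cnf_of_or(left_conjuncts, right_conjuncts):
    """Rewrite (a1 AND a2 ...) OR (b1 AND b2 ...) into OR-pairs joined by AND."""
    return [f"({a} OR {b})" for a, b in product(left_conjuncts, right_conjuncts)]

clauses = cnf_of_or(["t1.key = 1", "t2.key = 2"], ["t1.key = 3", "t2.key = 4"])
print(" AND ".join(clauses))
# (t1.key = 1 OR t1.key = 3) AND (t1.key = 1 OR t2.key = 4) AND ...
# Conjuncts that reference only one table, like (t1.key = 1 OR t1.key = 3),
# can now be pushed down to that join side.
```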
At the end, I’d like to talk about one ongoing feature for Spark 3.2. Spark is famous for running long queries, being the successor of Hadoop. As a result, over the years we haven’t paid much attention to query compilation latency, as the query compilation time can almost be ignored compared to the query execution time.
However, we don’t want to stop here. We want to attack more use cases, such as short queries, to make Spark a truly general-purpose SQL engine. We are doing a major improvement of the Catalyst framework to speed up most of the analyzer and optimizer rules, to cut the query compilation time in half. After that, we may see other bottlenecks beyond query compilation; for example, maybe we will improve the task scheduling time to make Spark dispatch its tasks faster and fit better with short queries. Please stay tuned.
Next, my teammate Xiao will take over and introduce the rest of the new features in Spark 3.1.

Xiao Li: Thank you, Wenchen. I’m Xiao Li. Next, I will introduce the new Spark 3.1 features in streaming and Python, the enhancements on usability, and the updates to documentation and the environment.
At the same time, we will also present the features planned for the upcoming releases.
Structured Streaming is one of the most important components of Apache Spark. Here I will summarize the major streaming improvements, including the new streaming table APIs, support for more stream-stream join types, and multiple monitoring enhancements.
At the end, I will also talk about the ongoing work related to the state store. Structured Streaming makes stream processing much simpler. On Databricks, the usage of streaming is growing very fast. Every day, the number of records processed by Structured Streaming is more than 120 trillion.
You can easily transform batch processing into stream processing. Stream sources and sinks can be viewed as unbounded tables. The stream of events can be seen as rows that are appended to an unbounded table. These concepts have been widely discussed in the streaming community. In the Spark 3.1 release, we added two APIs to DataStreamReader and DataStreamWriter. When you want to write out a streaming table, you can simply call writeStream.toTable without specifying the location.
When you want to load a streaming source, you can simply call spark.readStream.table to read it as a streaming job. You just need to specify the table name instead of a location.
Of course, you can also read it as a regular table using spark.read.table. Because this is a Delta Lake table, it will output the current snapshot. Delta Lake provides ACID transactions and simplifies your streaming applications.
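A minimal sketch of these table APIs, assuming the source table already exists (the table, column, and path names are made up):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

events = spark.readStream.table("raw_events")            # read a table as a streaming source

query = (
    events.select("user_id", "action", "ts")
    .writeStream
    .option("checkpointLocation", "/tmp/checkpoints/clean_events")
    .toTable("clean_events")                              # write the stream out to a table by name
)

spark.read.table("clean_events").show()                   # the same table can also be read in batch
```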
Starting from Spark 2.3, we added support for stream-stream joins. You can join two streaming DataFrames with the inner, left outer, and right outer join types. This release added two more join types: full outer and left semi. Outer and semi joins have the same guarantees as inner joins regarding watermark delays and whether data will be dropped or not.
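Here is a hedged sketch of a left semi stream-stream join with watermarks, in the style of the Structured Streaming guide (the tables, columns, and time bounds are illustrative):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()

impressions = spark.readStream.table("impressions").withWatermark("impression_time", "2 hours")
clicks = spark.readStream.table("clicks").withWatermark("click_time", "3 hours")

joined = impressions.join(
    clicks,
    expr("""
        click_ad_id = ad_id AND
        click_time >= impression_time AND
        click_time <= impression_time + interval 1 hour
    """),
    "left_semi",   # new in Spark 3.1; "full_outer" is also supported now
)
```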
In the Spark 3.0 release, you already could monitor your streaming applications in the live UI. In this release, you can also review your streaming application metrics after the cluster is shut down. Also, we added more streaming metrics in both the live UI and the history server.
In Structured Streaming, we process data from streams in batches. However, for some queries, we need to keep state information across batches. This information must be resilient to any kind of failure, and it needs to be efficiently accessible and efficiently persisted to external fault-tolerant storage, like a distributed file system.
Typical stateful streaming operations include stateful aggregation, drop duplicates, and stream-stream join. In the Spark 3.1 release, we added four state store metrics to the live UI and the Spark history server: the aggregated number of total state rows, the number of updated state rows, the number of state rows dropped by the watermark, and the aggregated state memory used in bytes.
These metrics are used for monitoring the capacity of the state store. In Spark 3.1, the default state store is an HDFS-backed key-value store. When the stateful queries are processing data in Spark workers, the key-value state data is stored in memory. The state updates are committed as delta files to the state store checkpoint location from each worker.
These delta files are periodically compacted into snapshot files to improve recovery efficiency. However, this solution has two major drawbacks. First, the maximum quantity of managed state is limited by the heap size of the executors, because the state is maintained in memory. Second, it is slow: when users enable advanced features like watermarking and event-time timeouts, it becomes pretty slow, because both require a full scan over all the state data.
In the upcoming Spark 3.2, we will open-source a solution that has been used in Databricks, which uses RocksDB for the state store. It can manage state of any size and makes all the advanced features efficient and robust.
In this solution, for every partition of every stateful SQL operator, there is a state store that holds that partition’s state data for every batch. The task that processes a stateful operator’s partition uses the state store API to update or remove keys, and then commits a new version of the key-value data in the store. The version is the same as the batch ID, and it is used to recover the correct version of the state data in case a task or batch needs to be re-executed.
Our goal here is to build a state store that keeps the key-value data in RocksDB. RocksDB runs in the executor process. The RocksDB files on the local disk are synced to a stable file system on every version commit. On recovery, the necessary files are copied from the remote file system to the local disk to restore the correct version of the data in RocksDB.
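Once this lands in Spark 3.2, switching providers is expected to be a one-line config change; the provider class name below is the one proposed for the open-source release, so verify it against your actual version:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Point stateful streaming queries at the RocksDB-backed state store provider.
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
)
```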
Next, let us briefly summarize what we did in Project Zen. Project Zen was started last summer. Its goal is to improve Python usability, make PySpark more Pythonic, and improve interoperability with other Python libraries.
The Python programming language itself has become one of the most commonly used languages in data science. At Databricks, we see the same trend: among our notebook users, 47% of commands are written in Python. It has become the top language. Improving the usability of PySpark is critical to the success of Spark. Today, I want to emphasize two major PySpark features in Spark 3.1, and then I will also introduce the ongoing work.
We are porting Koalas into PySpark in the upcoming release. After that, PySpark will have the pandas APIs and also provide native visualization and plotting features.
Python remains a dynamically typed language. Although types are just annotations in Python, they can already help us in many different ways. The benefits have been widely discussed in the Python community. In this release, we added type hints to PySpark. Thanks to this work, PySpark is becoming simpler to use. Most IDEs and notebooks can enable autocompletion for PySpark. For example, when you type spark in a Databricks notebook, you can simply press shift and tab, and you will see the docstring of the Spark session, which includes an example.
When you type spark.sparkContext. and press tab, it lists all the valid variables and functions. With these autocompletion features, you do not need to read the API reference documentation. This greatly improves your productivity.
Also, the added type hints help us improve the API reference documentation, which can automatically show the input and output types based on the hints. More importantly, they improve code quality. Type annotations enable an IDE or a static checker to detect errors without running your Python code. In Spark, we already run mypy in our CI system to catch bugs.
Because Python is not a statically typed language, it is easy to write code in Python and make undetected mistakes. After we added the type hints, your IDE can check your code and find the common bugs. Here, this slide shows that you are passing invalid input parameters in your code, and this is highlighted in your IDE.
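As a small, hypothetical example of the kind of mistake a type checker can now flag (the specific call is made up; the exact checker message will differ):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(10)

# fraction should be a float; with the new type hints, mypy or an IDE flags this
# line without running the code. At runtime it fails too:
try:
    df.sample(fraction="0.5")
except Exception as e:
    print("Invalid argument caught:", type(e).__name__)
```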
For new PySpark users, many of them do not know the simplest way to manage Python dependencies. It has become a common question. In this release, we made it simpler and documented the solution.
The basic idea is to automatically detect the dependencies, ship them to the cluster as an archive, and unpack them in the working directory of the executors. It supports all three popular environments: Conda, virtualenv, and PEX. For details, you can read the blog post "How to Manage Python Dependencies in PySpark."
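As a hedged sketch of the Conda flavor of this approach: you would first pack the environment into an archive (the file name below is made up) and then point Spark at it; the details are in the blog post.

```python
import os
from pyspark.sql import SparkSession

# The archive (e.g. built with `conda pack -o pyspark_conda_env.tar.gz`) is shipped to
# the cluster and unpacked as "environment" in each executor's working directory.
os.environ["PYSPARK_PYTHON"] = "./environment/bin/python"

spark = (
    SparkSession.builder
    .config("spark.archives", "pyspark_conda_env.tar.gz#environment")
    .getOrCreate()
)
```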
Now, let us discuss the ongoing efforts in the community. If you followed our former summit keynotes, you might know the Koalas project very well. This project was announced two years ago. Because pandas does not scale out to big data, Koalas fills the gap by providing pandas-equivalent APIs that work on Apache Spark.
We want to unify the two ecosystems and provide a seamless transition between small and large data. One of our Koalas customers told us that by changing only 1% of their pandas code, they were able to reduce the execution time by more than 10 times, from a few hours to just a few minutes.
In the last two years, the growth of Koalas has been very fast. Koalas is pre-installed on Databricks, and every month, Koalas is imported more than two million times. In open source, based on PyPI statistics, Koalas has around three million downloads per month. In the last quarter, Koalas was used in 84 different countries. It looks like Koalas is not yet widely used in Africa, so we need to extend the user base there.
After a recent vote, the Spark community decided to bring Koalas into Spark. In the upcoming release, all Spark users can use both the Spark APIs and the pandas APIs to process their data. This will simplify your migration from pandas to Spark. The pandas APIs also benefit PySpark users, because they support many tasks that are difficult to do with PySpark, for example, plotting data directly from a PySpark DataFrame. To convert your pandas code to Koalas, you just need to change your import statement and the alias name.
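The import change looks roughly like this (each variant assumes the corresponding package is installed; pyspark.pandas only exists once Koalas is ported, in Spark 3.2 and later):

```python
# pandas: works on a single machine.
import pandas as pd
pdf = pd.DataFrame({"x": [1, 2, 3]})

# Koalas: the standalone package, running on Spark.
import databricks.koalas as ks
kdf = ks.DataFrame({"x": [1, 2, 3]})

# pandas API on Spark: after Koalas is ported into PySpark.
import pyspark.pandas as ps
psdf = ps.DataFrame({"x": [1, 2, 3]})
```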
It is simple and straightforward. After the change, you can use the pandas APIs to process and visualize your data. After porting the Koalas code to PySpark, we changed the package name from databricks.koalas to pyspark.pandas, and the alias name to ps. If you are an existing Koalas user, you need to update the import statements in your existing code to use the new version of Koalas in Spark, since new Koalas features will only be added to PySpark in the future.
The last line of the example on the slide shows how we visualize the data using the plotting API. Most Python data science libraries support plotting features, but this has been missing in PySpark. After porting Koalas, PySpark users can easily plot and understand their data visually without converting it to a third-party data structure.
This is another example using the pandas API on Spark: we can visualize a PySpark DataFrame with a histogram. We support two plotting backends; by default it is plotly, but you can also switch to matplotlib.
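A hedged sketch of what that looks like with the pandas API on Spark (requires Spark 3.2+ and a plotting backend such as plotly or matplotlib installed; the data is made up):

```python
import pyspark.pandas as ps

psdf = ps.DataFrame({"value": [1, 1, 2, 3, 3, 3, 4, 5]})
psdf["value"].plot.hist(bins=5)                    # renders with the default backend

ps.set_option("plotting.backend", "matplotlib")    # switch backends if preferred
psdf["value"].plot.hist(bins=5)
```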
The pandas API coverage, including the plotting capabilities, has reached 90%. In the future, we can add more Spark-specific plotting APIs based on your requests and the community’s requests. We welcome your contributions to the pandas API support, the visualization support, and Project Zen. We will continue the investment in this direction to make PySpark simple to use for the Python community.
The Spark 3.1 release also includes many usability improvements. We’re trying to make Spark simple for both beginners and experienced users. Here I will only highlight four features.
Conversions between timestamps and Unix time, and conversions between dates and Unix dates, are pretty useful. Spark 3.1 includes a few utility functions for these conversions. Here are a few examples. You can call timestamp_seconds to build a timestamp from the number of seconds since the UTC epoch. You can also call unix_seconds to convert a timestamp column to the number of seconds since the UTC epoch.
Before the Spark 3.1 release, we already had a few related utility functions, unix_timestamp and from_unixtime. Their names are close, but their input and output data types are different: the existing functions are used for conversions between long values and timestamp strings.
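A quick sketch of the new functions (timestamp_seconds is available in pyspark.sql.functions in 3.1; unix_seconds is called through SQL here since it is a SQL function; the value is arbitrary):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, lit, timestamp_seconds

spark = SparkSession.builder.getOrCreate()

df = spark.range(1).select(timestamp_seconds(lit(1_600_000_000)).alias("ts"))  # seconds -> timestamp
df.select(expr("unix_seconds(ts)").alias("secs")).show()                        # timestamp -> seconds
```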
Also, we introduced two utility functions for getting and setting the current local timezone. Before this release, you could do the same thing by using the SQL configuration spark.sql.session.timeZone. Date-time handling is pretty complicated, so I would suggest everyone read the blog post "A Comprehensive Look at Dates and Timestamps in Apache Spark 3.0."
By the way, in the next release, we will also add a new data type, timestamp without time zone, which is part of ANSI SQL compliance. EXPLAIN is a critical command for performance tuning: it outputs the execution plan for your queries. In Spark 3.0, we introduced a new format for the EXPLAIN command, that is, EXPLAIN FORMATTED. Starting from this release, the Spark UI uses the new format by default, and the EXPLAIN command can also show the runtime plan optimizations applied by adaptive query execution.
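For reference, a minimal sketch of the formatted explain output on a toy query (the temp view is made up):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.range(100).createOrReplaceTempView("t")

spark.sql("EXPLAIN FORMATTED SELECT id, count(*) FROM t GROUP BY id").show(truncate=False)
spark.table("t").groupBy("id").count().explain(mode="formatted")  # same, via the DataFrame API
```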
In the next release, Apache Spark will turn on AQE, Adaptive Query Execution, by default. In Spark 3.0, more join hints were added. These hints should be used with extreme caution, because they are difficult to manage over time and are sensitive to your workloads. If your workload patterns are not stable, a hint could even make your query much slower, or even fail with out of memory.
When we serve our customers, we find some customers trying to broadcast a huge table; they did not realize the table had grown much bigger than the original one. To quickly identify whether your hints are properly used, you can rerun the queries while ignoring all the hints. That becomes simple in Spark 3.1: you don’t need to change your queries. You just need to set the session-specific SQL configuration spark.sql.optimizer.disableHints to true.
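A small sketch, with hypothetical tables t1 and t2 (the config name is the one documented for Spark 3.1):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark.conf.set("spark.sql.optimizer.disableHints", "true")
spark.sql("SELECT /*+ BROADCAST(t2) */ * FROM t1 JOIN t2 ON t1.key = t2.key").explain()
# With the config enabled, the BROADCAST hint above is ignored during planning.
spark.conf.set("spark.sql.optimizer.disableHints", "false")
```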
After the value change, the query plan will not be affected by the hints. Next, let us talk about error messages. Starting from last year, the community started working on error message improvements. There are three major projects. First, group the exception messages into dedicated files. After that, it is easier for developers to audit new messages and improve the message quality.
Second, we built an error message guideline in the community. Error messages should be standardized and actionable, and they should answer what, why, and how. We welcome your contributions to improving our error messages.
In the upcoming release, we are also proposing to add language-agnostic and locale-agnostic identifiers for the error messages. After the change, each error message has its own identifier, and the SQLSTATE is a portable error code that is part of the ANSI SQL standard. Here is an example: we are unable to resolve a column from a query because it is missing from the input relation. In the new proposal, the message has an error ID, SPK-10000, and a SQLSTATE, 42704, which comes from the ANSI SQL standard.
With this message identifier, your application can easily capture the root cause from the error and then either report it to the end user directly or automatically correct your queries.
In Spark 3.1, we also made major updates to the documentation and the environment. I want to introduce our new documentation site for Python users and the search function in our main documentation site.
In the past, the main content of the Spark documentation was mostly targeting Scala and SQL users. However, some of the example code and descriptions are confusing to Python users. Starting from this release, we introduced a new documentation site for Python users. On this website, the API reference has been redesigned and you can search the site by keywords, based on our experience developing the Koalas documentation. We also introduced a live notebook.
Python users can directly play with the live notebook to try the features in the latest version of Apache Spark. In addition, we also added a search function to our main Spark documentation site, so finding the right page is becoming much faster. Also, we keep adding new content to our SQL reference pages; for example, we added type conversion to the ANSI compliance page.
For Python users, from PyPI you can download PySpark with different Hadoop versions. The default in Spark 3.1 is Hadoop 3.2, and you can also choose the pre-built Hadoop 2.7 version, which was the default in 3.0, or a user-provided Hadoop version.
In Spark 3.1, we dropped the official support for Python 2.7, 3.4, and 3.5, and the hive-1.2 distribution is removed from the official release. The new release is based on Hive 2.3; if needed, you can still build the hive-1.2 variant yourself. In Spark 3.2, support for Apache Mesos as a resource manager is deprecated and will be removed in a future version.
And that covers the features of Spark 3.1. Due to the lack of time, there are still many other nice features not covered by this talk. Please download Spark 3.1 and try out these new features.
Now, the community is working on the new features for the next release, Spark 3.2. There are many exciting features, for example, lateral join support, the new de-correlation framework, two new data types (timestamp without time zone and the ANSI interval types), latency reductions in the query compiler and scheduler, and more optimization features to improve performance. We will turn on adaptive query execution by default in the Spark 3.2 release.
Many thanks to the Spark contributors for all the work. Thank you. Thank you, everyone. If you have any questions, please feel free to post them, and Wenchen and I will answer them. Thank you.

Wenchen Fan

Wenchen Fan is a software engineer at Databricks, working on Spark Core and Spark SQL. He mainly focuses on the Apache Spark open source community, leading the discussion and reviews of many features/...

Xiao Li

Xiao Li is an engineering manager, Apache Spark Committer and PMC member at Databricks. His main interests are on Spark SQL, data replication and data integration. Previously, he was an IBM master inv...