by Pranav Anand, Tathagata Das and Denny Lee
We recently announced the release of Delta Lake 0.8.0, which introduces schema evolution and performance improvements in merge and operational metrics in table history. The key features in this release are:
- MERGE and UPDATE operations now resolve nested struct columns by name; this behavior is controlled by the Spark configuration `spark.databricks.delta.resolveMergeUpdateStructsByName.enabled`.
- Scala implicits simplify the read and write APIs: import `io.delta.implicits._` to use the `delta` method with Spark read and write APIs such as `spark.read.delta("/my/table/path")`. See the documentation for details.

In addition, you can now read a Delta table without using Spark via the Delta Standalone Reader and the Delta Rust API. See "Use Delta Standalone Reader and the Delta Rust API to query your Delta Lake without Apache Spark™" to learn more.
As noted in previous releases, Delta Lake can already enforce and evolve a table's schema, including within a merge operation. With Delta Lake 0.8.0, you can also automatically evolve nested columns within your Delta table with UPDATE and MERGE operations.
Let’s showcase this by using a simple coffee espresso example. We will create our first Delta table using the following code snippet.
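A minimal PySpark sketch of this step is shown below. It assumes a SparkSession (`spark`) configured with the Delta Lake SQL extension and catalog. The sample values, the `create_espresso_table` helper, and registering the table through `saveAsTable` are illustrative assumptions, not the exact code from the notebook.

```python
import json

# Hypothetical sample rows; the values used in the original notebook may differ.
ESPRESSO_ROWS = [
    {"espresso_id": 100, "coffee_profile": {"temp": 94.0, "weight": 18.0},
     "milk-based_espresso": False},
    {"espresso_id": 101, "coffee_profile": {"temp": 93.0, "weight": 17.5},
     "milk-based_espresso": True},
    {"espresso_id": 102, "coffee_profile": {"temp": 92.5, "weight": 18.5},
     "milk-based_espresso": True},
]


def create_espresso_table(spark, table_name="espresso"):
    """Create a Delta table from the sample rows and return the DataFrame."""
    # spark.read.json infers coffee_profile as a nested struct column.
    json_rdd = spark.sparkContext.parallelize(
        [json.dumps(row) for row in ESPRESSO_ROWS]
    )
    espresso = spark.read.json(json_rdd)
    espresso.write.format("delta").mode("overwrite").saveAsTable(table_name)
    return espresso
```

Calling `create_espresso_table(spark)` would leave a three-row table whose `coffee_profile` column is a struct with `temp` and `weight` fields.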
The following is a view of the espresso table:
The following code snippet creates the espresso_updates DataFrame:
Observe that the espresso_updates DataFrame has a different coffee_profile column, which includes a new flavor_notes column.
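One way to build such a DataFrame is sketched below with hypothetical values: `espresso_id = 100` already exists in the target table, `espresso_id = 103` is new, and both rows carry the extra nested `flavor_notes` field. The helper name and the temporary view registration are assumptions made for illustration.

```python
import json

# Hypothetical update rows; coffee_profile gains a nested flavor_notes field
# that the espresso table does not have yet.
ESPRESSO_UPDATE_ROWS = [
    {"espresso_id": 100,
     "coffee_profile": {"temp": 94.0, "weight": 18.0,
                        "flavor_notes": ["chocolate", "hazelnut"]},
     "milk-based_espresso": False},
    {"espresso_id": 103,
     "coffee_profile": {"temp": 93.5, "weight": 17.0,
                        "flavor_notes": ["citrus"]},
     "milk-based_espresso": True},
]


def create_espresso_updates(spark, view_name="espresso_updates"):
    """Build the updates DataFrame and expose it to Spark SQL as a temp view."""
    json_rdd = spark.sparkContext.parallelize(
        [json.dumps(row) for row in ESPRESSO_UPDATE_ROWS]
    )
    updates = spark.read.json(json_rdd)
    updates.createOrReplaceTempView(view_name)
    return updates
```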
To run a MERGE operation between these two tables, run the following Spark SQL code snippet:
By default, this snippet fails with an error because the coffee_profile columns in espresso and espresso_updates have different schemas.
To work around this issue, enable autoMerge using the following code snippet; the MERGE operation will then automatically evolve the schema of the espresso Delta table, including its nested columns, to accommodate espresso_updates.
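The two steps can be sketched together as follows, assuming a Delta table named `espresso` and a temporary view named `espresso_updates`. The configuration key is Delta Lake's `spark.databricks.delta.schema.autoMerge.enabled`; the exact MERGE statement in the original notebook may differ, and the helper name is hypothetical. Schema evolution applies to the `UPDATE SET *` and `INSERT *` clauses.

```python
MERGE_SQL = """
MERGE INTO espresso AS t
USING espresso_updates AS u
  ON u.espresso_id = t.espresso_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""


def merge_with_schema_evolution(spark):
    """Enable automatic schema evolution, then run the MERGE."""
    # Lets MERGE add columns (including nested ones) found only in the source.
    spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
    return spark.sql(MERGE_SQL)
```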
In a single atomic operation, MERGE performs the following:
- UPDATE: `espresso_id = 100` has been updated with the new `flavor_notes` from the `espresso_updates` DataFrame.
- `espresso_id = (101, 102)`: no changes have been made to the data, as appropriate.
- INSERT: `espresso_id = 103` is a new row that has been inserted from the `espresso_updates` DataFrame.

You can import `io.delta.implicits._` to use the `delta` method with Spark read and write APIs such as `spark.read.delta("/my/table/path")`. See the documentation for details.
You can now add CHECK constraints to your tables; a constraint is both validated against the existing data and enforced on future data modifications. For example, to ensure that `espresso_id >= 100`, run this SQL statement:
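A small sketch of issuing that statement from PySpark, assuming the table is registered as `espresso`. The helper and the constraint name `idCheck` are hypothetical; the `ALTER TABLE ... ADD CONSTRAINT ... CHECK (...)` syntax is the one Delta Lake documents for constraints.

```python
def add_check_constraint(spark, table, name, condition):
    """Add a CHECK constraint to a Delta table and return the SQL that was run.

    Arguments are interpolated directly, so pass only trusted identifiers.
    """
    statement = f"ALTER TABLE {table} ADD CONSTRAINT {name} CHECK ({condition})"
    # Delta validates every existing row first; the statement fails if any
    # row violates the condition.
    spark.sql(statement)
    return statement
```

For the example above, the call would be `add_check_constraint(spark, "espresso", "idCheck", "espresso_id >= 100")`.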
The following constraint will fail as the `milk-based_espresso` column has both True and False values.
The addition or dropping of CHECK constraints also appears in the transaction log of your Delta table (via DESCRIBE HISTORY espresso), with the operationParameters column describing the constraint.
When using Delta as a streaming source, you can use the options `startingTimestamp` or `startingVersion` to start processing the table from a given version onwards. You can also set `startingVersion` to `latest` to skip existing data in the table and stream only the new incoming data. See the documentation for details.
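As a rough illustration, the helper below (a hypothetical wrapper, not part of Delta Lake) applies one of these options to a streaming read. It rejects calls that set both, since the Delta Lake documentation states that only one of the two options can be used at a time.

```python
def read_delta_stream(spark, path, starting_version=None, starting_timestamp=None):
    """Return a streaming DataFrame over a Delta table, optionally from a start point."""
    if starting_version is not None and starting_timestamp is not None:
        raise ValueError("set only one of starting_version or starting_timestamp")
    reader = spark.readStream.format("delta")
    if starting_version is not None:
        # A commit version number, or "latest" to skip data already in the table.
        reader = reader.option("startingVersion", str(starting_version))
    if starting_timestamp is not None:
        # A timestamp or date string, for example "2021-02-01".
        reader = reader.option("startingTimestamp", starting_timestamp)
    return reader.load(path)
```

For example, `read_delta_stream(spark, "/my/table/path", starting_version="latest")` would stream only data that arrives after the query starts.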
Within the notebook, we will generate an artificial stream:
And then generate a new Delta table using this code snippet:
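These two steps might look like the sketch below, which uses Spark's built-in `rate` source to produce `(timestamp, value)` rows and writes them to a Delta path. The function name, paths, and the default rate are assumptions chosen for illustration; the notebook's actual values may differ.

```python
def start_iterator_stream(spark, table_path, checkpoint_path, rows_per_second=500):
    """Start an artificial stream that continuously appends to a Delta table."""
    # The "rate" source emits rows with a timestamp and an increasing value.
    stream_data = (
        spark.readStream.format("rate")
        .option("rowsPerSecond", rows_per_second)
        .load()
    )
    # Each micro-batch is committed as one transaction in the Delta log.
    return (
        stream_data.writeStream.format("delta")
        .option("checkpointLocation", checkpoint_path)
        .start(table_path)
    )
```

The returned streaming query can be stopped with `query.stop()` after it has run for the desired duration.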
The code in the notebook will run the stream for approximately 20 seconds to create the following iterator table with the below transaction log history. In this case, this table has 10 transactions.
The iterator table has 10 transactions over a duration of approximately 20 seconds. To view this data over a duration, we will run the next SQL statement, which calculates the timestamp of each insert into the iterator table rounded to the second (`ts`). Note that the value of `ts = 0` is the minimum timestamp, and we want to bucket by duration (`ts`) via a GROUP BY by running the following:
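A query along these lines could be written as shown below. This is a sketch that assumes the table is queryable as `iterator` and has the `timestamp` column produced by the rate source; it is not necessarily the statement used in the notebook. The `bucket_counts` function is a hypothetical plain-Python mirror of the same logic, included only to make the bucketing explicit.

```python
from collections import Counter

# Seconds elapsed since the earliest row, with a row count per second.
TS_BUCKET_SQL = """
SELECT ts, COUNT(1) AS cnt
FROM (
  SELECT unix_timestamp(timestamp)
         - (SELECT MIN(unix_timestamp(timestamp)) FROM iterator) AS ts
  FROM iterator
) AS buckets
GROUP BY ts
ORDER BY ts
"""


def bucket_counts(epoch_seconds):
    """Truncate each timestamp to whole seconds, subtract the minimum, and count."""
    seconds = [int(s) for s in epoch_seconds]
    start = min(seconds)
    return sorted(Counter(s - start for s in seconds).items())
```

With a configured session, `spark.sql(TS_BUCKET_SQL)` would return one row per time bucket.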
The preceding statement produces this bar graph with time buckets (`ts`) by row count (`cnt`).
Notice for the 20-second stream write performed with ten distinct transactions, there are 19 distinct time-buckets.
Using `.option("startingVersion", "6")`, we can specify the version of the table from which our readStream starts (inclusive).
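A sketch of such a pipeline follows, reading the iterator table from version 6 and writing the result to a second Delta table. The function name and the path parameters are hypothetical.

```python
def start_reiterator_stream(spark, source_path, target_path, checkpoint_path):
    """Stream a Delta table's changes from commit version 6 into another Delta table."""
    # Commits 0 through 5 of the source table are skipped; version 6 is included.
    reiterator = (
        spark.readStream.format("delta")
        .option("startingVersion", "6")
        .load(source_path)
    )
    return (
        reiterator.writeStream.format("delta")
        .option("checkpointLocation", checkpoint_path)
        .start(target_path)
    )
```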
The following graph is generated by re-running the previous SQL query against the new reiterator table.
Notice for the reiterator table, there are 10 distinct time-buckets, as we’re starting from a later transaction version of the table.
Try out Delta Lake with the preceding code snippets on your Apache Spark 3.1 (or greater) instance (on Databricks, try this with DBR 8.0+). Delta Lake makes your data lakes more reliable, whether you create a new one or migrate an existing data lake. To learn more, refer to https://delta.io/, and join the Delta Lake community via the Slack and Google Group. You can track all the upcoming releases and planned features in GitHub milestones and try out Managed Delta Lake on Databricks with a free account.
We want to thank the following contributors for updates, doc changes, and contributions in Delta Lake 0.8.0: Adam Binford, Alan Jin, Alex Liu, Ali Afroozeh, Andrew Fogarty, Burak Yavuz, David Lewis, Gengliang Wang, HyukjinKwon, Jacek Laskowski, Jose Torres, Kian Ghodoussi, Linhong Liu, Liwen Sun, Mahmoud Mahdi, Maryann Xue, Michael Armbrust, Mike Dias, Pranav Anand, Rahul Mahadev, Scott Sandre, Shixiong Zhu, Stephanie Bodoff, Tathagata Das, Wenchen Fan, Wesley Hoffman, Xiao Li, Yijia Cui, Yuanjian Li, Zach Schuermann, contrun, ekoifman, and Yi Wu.