Automatically Evolve Your Nested Column Schema, Stream From a Delta Table Version, and Check Your Constraints
We recently announced the release of Delta Lake 0.8.0, which introduces schema evolution and performance improvements in merge and operational metrics in table history. The key features in this release are:
- Unlimited MATCHED and NOT MATCHED clauses for merge operations in Scala, Java, and Python. Merge operations now support any number of whenMatched and whenNotMatched clauses. In addition, merge queries that unconditionally delete matched rows no longer throw errors on multiple matches. This will be supported using SQL with Spark 3.1. See the documentation for details.
- MERGE operation now supports schema evolution of nested columns. Schema evolution of nested columns now has the same semantics as that of top-level columns. For example, new nested columns can be automatically added to a StructType column. See Automatic schema evolution in Merge for details.
- MERGE INTO and UPDATE operations now resolve nested struct columns by name. Update operations UPDATE and MERGE INTO commands now resolve nested struct columns by name, meaning that when comparing or assigning columns of type StructType, the order of the nested columns does not matter (exactly in the same way as the order of top-level columns). To revert to resolving by position, set the following Spark configuration to false:
spark.databricks.delta.resolveMergeUpdateStructsByName.enabled
. - Check constraints on Delta tables. Delta now supports CHECK constraints. When supplied, Delta automatically verifies that data added to a table satisfies the specified constraint expression. To add CHECK constraints, use the ALTER TABLE ADD CONSTRAINTS command. See the documentation for details.
- Start streaming a table from a specific version (#474). When using Delta as a streaming source, you can use the options startingTimestamp or startingVersion to start processing the table from a given version and onwards. You can also set startingVersion to latest to skip existing data in the table and stream from the new incoming data. See the documentation for details.
- Ability to perform parallel deletes with VACUUM (#395). When using `VACUUM`, you can set the session configuration spark.databricks.delta.vacuum.parallelDelete.enabled to true in order to use Spark to perform the deletion of files in parallel (based on the number of shuffle partitions). See the documentation for details.
- Use Scala implicits to simplify read and write APIs. You can import
io.delta.implicits.
to use the `delta` method with Spark read and write APIs such asspark.read.delta(“/my/table/path”)
. See the documentation for details.
In addition, we also highlight that you can now read a Delta table without using Spark via the Delta Standalone Reader and Delta Rust API. See Use Delta Standalone Reader and the Delta Rust API to query your Delta Lake without Apache Spark™ to learn more.
Get an early preview of O'Reilly's new ebook for the step-by-step guidance you need to start using Delta Lake.
Automatically evolve your nested column schema
As noted in previous releases, Delta Lake includes the ability to:
- execute merge operations,
- simplify your insert/update/delete operations in a single atomic operation,
- enforce and evolve your schema (more details can also be found in this tech talk),
- evolve your schema within a merge operation.
With Delta Lake 0.8.0, you can automatically evolve nested columns within your Delta table with UPDATE and MERGE operations.
Let’s showcase this by using a simple coffee espresso example. We will create our first Delta table using the following code snippet.
# espresso1 JSON string
json_espresso1 = [ ... ]
# create RDD
espresso1_rdd = sc.parallelize(json_espresso1)
# read JSON from RDD
espresso1 = spark.read.json(espresso1_rdd)
# Write Delta table
espresso1.write.format("delta").save(espresso_table_path)
The following is a view of the espresso table:
The following code snippet creates the espresso_updates DataFrame:
# Create DataFrame from JSON string
json_espresso2 = [...]
espresso2_rdd = sc.parallelize(json_espresso2)
espresso2 = spark.read.json(espresso2_rdd)
espresso2.createOrReplaceTempView("espresso_updates")
Observe that the espresso_updates DataFrame has a different coffee_profile column, which includes a new flavor_notes column.
# espresso Delta Table `coffee_profile` schema
|-- coffee_profile: struct (nullable = true)
| |-- temp: double (nullable = true)
| |-- weight: double (nullable = true)
# espresso_updates DataFrame `coffee_profile` schema
|-- coffee_profile: struct (nullable = true)
| |-- flavor_notes: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- temp: double (nullable = true)
| |-- weight: double (nullable = true)
To run a MERGE operation between these two tables, run the following Spark SQL code snippet:
MERGE INTO espresso AS t
USING espresso_updates u
ON u.espresso_id = t.espresso_id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
By default, this snippet will have the following error since the coffee_profile columns between espresso and espresso_updates are different.
Error in SQL statement: AnalysisException:
Cannot cast struct,temp:double,weight:double> to struct.
All nested columns must match.
AutoMerge to the rescue
To work around this issue, enable autoMerge using the below code snippet; the espresso Delta table will automatically merge the two tables with different schemas including nested columns.
-- Enable automatic schema evolution
SET spark.databricks.delta.schema.autoMerge.enabled=true;
In a single atomic operation, MERGE performs the following:
UPDATE: espresso_id = 100
has been updated with the newflavor_notes
from theespresso_changes
DataFrame.espresso_id = (101, 102)
no changes have been made to the data as appropriate.INSERT: espresso_id = 103
is a new row that has been inserted from theespresso_changes
DataFrame.
Simplify read and write APIs with Scala Implicits
You can import io.delta.implicits.
to use the delta
method with Spark read and write APIs such as spark.read.delta("/my/table/path")
. See the documentation for details.
// Traditionally, to read the Delta table using Scala, you would execute the following
spark
.read
.format("delta")
.load("/tmp/espresso/")
.show()
// With Scala implicts, the format is a little simpler
import io.delta.implicits.
spark
.read
.delta("/tmp/espresso/")
.show()
Check Constraints
You can now add CHECK constraints to your tables, which not only checks the existing data, but also enforces future data modifications. For example, to ensure that the espresso_id >= 100
, run this SQL statement:
-- Ensure that espresso_id >= 100
-- This constraint will both check and enforce future modifications of data to your table
ALTER TABLE espresso ADD CONSTRAINT idCheck CHECK (espresso_id >= 100);
-- Drop the constraint from the table if you do not need it
ALTER TABLE espresso DROP CONSTRAINT idCheck;
The following constraint will fail as the `milk-based_espresso` column has both True and False values.
- Check if the column has only True values; NOTE, this constraint will fail.
ALTER TABLE espresso ADD CONSTRAINT milkBaseCheck CHECK (`milk-based_espresso` IN (True));
-- Error output
Error in SQL statement: AnalysisException: 1 rows in profitecpro.espresso violate the new CHECK constraint (`milk-based_espresso` IN ( True ))
The addition or dropping of CHECK constraints will also appear in the transaction log (via DESCRIBE HISTORY espresso) of your Delta table with the operationalParameters articulating the constraint.
Start streaming a table from a specific version
When using Delta as a streaming source, you can use the options startingTimestamp
or startingVersion
to start processing the table from a given version and onwards. You can also set startingVersion
to latest
to skip existing data in the table and stream from the new incoming data. See the documentation for details.
Within the notebook, we will generate an artificial stream:
# Generate artificial stream
stream_data = spark.readStream.format("rate").option("rowsPerSecond", 500).load()
And then generate a new Delta table using this code snippet:
stream = stream_data \
.withColumn("second", second(col("timestamp"))) \
.writeStream \
.format("delta") \
.option("checkpointLocation", "...") \
.trigger(processingTime = "2 seconds") \
.start("/delta/iterator_table")
The code in the notebook will run the stream for approximately 20 seconds to create the following iterator table with the below transaction log history. In this case, this table has 10 transactions.
-- Review history by table path
DESCRIBE HISTORY delta.`/delta/iterator_table/`
-- OR review history by table name
DESCRIBE HISTORY iterator_table;
Review iterator output
The iterator table has 10 transactions over a duration of approximately 20 seconds. To view this data over a duration, we will run the next SQL statement that calculates the timestamp of each insert into the iterator table rounded to the second (ts
). Note that the value of ts = 0
is the minimum timestamp, and e want to bucket by duration (ts
) via a group by running the following:
SELECT ts, COUNT(1) as cnt
FROM (
SELECT value, (second - min_second) AS ts
FROM (
SELECT * FROM iterator_table CROSS JOIN (SELECT MIN(second) AS min_second FROM iterator_table) x
) y
) z
GROUP BY ts
ORDER BY ts
The preceding statement produces this bar graph with time buckets (ts
) by row count (cnt
).
Notice for the 20-second stream write performed with ten distinct transactions, there are 19 distinct time-buckets.
Start the Delta stream from a specific version
Using .option("startingVersion", "6")
, we can specify which version of the table we will want to start our readStream from (inclusive).
# Start the readStream using startingVersion
reiterator = spark.readStream.format("delta").option("startingVersion", "6").load("/delta/iterator_table/")
# Create a temporary view against the stream
reiterator.createOrReplaceTempView("reiterator")
The following graph is generated by re-running the previous SQL query against the new reiterator table.
Notice for the reiterator table, there are 10 distinct time-buckets, as we’re starting from a later transaction version of the table.
Get Started with Delta Lake 0.8.0
Try out Delta Lake with the preceding code snippets on your Apache Spark 3.1 (or greater) instance (on Databricks, try this with DBR 8.0+). Delta Lake makes your data lakes more reliable--whether you create a new one or migrate an existing data lake. To learn more, refer to https://delta.io/, and join the Delta Lake community via the Slack and Google Group. You can track all the upcoming releases and planned features in GitHub milestones and try out Managed Delta Lake on Databricks with a free account.
Credits
We want to thank the following contributors for updates, doc changes, and contributions in Delta Lake 0.8.0: Adam Binford, Alan Jin, Alex Liu, Ali Afroozeh, Andrew Fogarty, Burak Yavuz, David Lewis, Gengliang Wang, HyukjinKwon, Jacek Laskowski, Jose Torres, Kian Ghodoussi, Linhong Liu, Liwen Sun, Mahmoud Mahdi, Maryann Xue, Michael Armbrust, Mike Dias, Pranav Anand, Rahul Mahadev, Scott Sandre, Shixiong Zhu, Stephanie Bodoff, Tathagata Das, Wenchen Fan, Wesley Hoffman, Xiao Li, Yijia Cui, Yuanjian Li, Zach Schuermann, contrun, ekoifman, and Yi Wu.