Time Traveling with Delta Lake: A Retrospective of the Last Year

Try out Delta Lake 0.7.0 with Spark 3.0 today!

It has been a little more than a year since Delta Lake became an open-source project hosted by the Linux Foundation.  While a lot has changed over the last year, the central challenge for most data lakes remains stubbornly the same: their inherent unreliability.  To address this, Delta Lake brings reliability and data quality to data lakes and Apache Spark; learn more by watching Michael Armbrust’s session at Big Things Conference.

Watch Michael Armbrust present Delta Lake: Reliability and Data Quality for Data Lakes and Apache Spark in the on-demand webcast.


With Delta Lake, you can simplify and scale your data engineering pipelines and improve your data quality data flow with the Delta Architecture.

Delta Lake Primer

The following section provides an overview of Delta Lake's features.  It includes links to blogs and tech talks that dive into the technical aspects, including the Dive into Delta Lake Internals series of tech talks.

The Delta Architecture with the medallion data quality data flow


Building upon the Apache Spark Foundation

Transactions

  • ACID transactions: Data lakes typically have multiple data pipelines reading and writing data concurrently, and because they lack transactions, data engineers have to go through a tedious process to ensure data integrity. Delta Lake brings ACID transactions to your data lakes. It provides serializability, the strongest isolation level. Learn more in the Diving into Delta Lake: Unpacking the Transaction Log blog and tech talk.
  • Unified Batch and Streaming Source and Sink: A table in Delta Lake is both a batch table and a streaming source and sink. Streaming data ingest, batch historic backfill, and interactive queries all just work out of the box.  To see this in action, try out the Delta Lake Tutorial from Spark + AI Summit EU 2019.
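To make the transaction log idea more concrete, here is a toy, pure-Python sketch. It is not Delta Lake's actual implementation: the commit layout, the `snapshot` helper, and the file names are illustrative assumptions. It models the log as an ordered list of commits, each holding `add` and `remove` file actions, and shows that replaying commits up to a given version yields the set of data files visible at that version.

```python
# Toy model of an ordered commit log; NOT Delta Lake's real implementation.
# The commit layout, helper name, and file names are illustrative assumptions.

def snapshot(log, version=None):
    """Replay commits 0..version and return the set of live data files."""
    if version is None:
        version = len(log) - 1  # default to the latest commit
    files = set()
    for commit in log[: version + 1]:
        for action, path in commit:
            if action == "add":
                files.add(path)
            elif action == "remove":
                files.discard(path)
            else:
                raise ValueError(f"unknown action: {action}")
    return files

# Each inner list is one atomic commit: all of its actions apply, or none do.
log = [
    [("add", "part-0.parquet")],                                # version 0
    [("add", "part-1.parquet")],                                # version 1
    [("remove", "part-0.parquet"), ("add", "part-2.parquet")],  # version 2
]

latest = snapshot(log)               # files visible at the latest version
as_of_v1 = snapshot(log, version=1)  # files visible as of version 1
```

Because readers only ever see the result of whole commits, a reader at version 1 is unaffected by the rewrite in version 2; reading an older version is the same replay stopped earlier.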

Data Lake Enhancements

Schema Enforcement and Evolution

  • Schema Enforcement: Delta Lake provides the ability to specify your schema and enforce it. This helps ensure that the data types are correct and required columns are present, preventing bad data from causing data corruption. For more information, refer to the Diving Into Delta Lake: Schema Enforcement & Evolution blog and tech talk.
  • Schema Evolution: Business requirements continuously change, and so do the shape and form of your data. Delta Lake enables you to make changes to a table schema that can be applied automatically, without the need for cumbersome DDL. For more information, refer to the Diving Into Delta Lake: Schema Enforcement & Evolution blog and tech talk.
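As a minimal sketch of how the two behaviors meet on write, the helper below appends a DataFrame with the `mergeSchema` writer option turned on, so that new columns in the DataFrame are added to the table schema; without that option, an append whose schema does not match the table is rejected. The helper name `append_with_schema_evolution` and its parameters are hypothetical; it assumes `df` is a Spark DataFrame and `path` points at an existing Delta table.

```python
def append_with_schema_evolution(df, path):
    """Append df to the Delta table at path, adding any new columns to the table schema.

    Hypothetical helper: `df` is assumed to be a Spark DataFrame and `path`
    the location of an existing Delta table.
    """
    (
        df.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")  # opt in to schema evolution for this write
        .save(path)
    )
```

Keeping evolution opt-in per write means that an unexpected column fails loudly by default instead of silently changing the table.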

Checkpoints from the last year

In April 2019, we announced that Delta Lake would be open-sourced with the Linux Foundation; the source code for the project can be found at https://github.com/delta-io/delta.  Since then, the project has progressed quickly, with 6 releases, 65 contributors, and more than 2,500 stars so far.  Here, we want to call out some of the cool features.

Execute DML statements

With Delta Lake 0.3.0, you now have the ability to run DELETE, UPDATE, and MERGE statements using the Spark API.  Instead of running a convoluted mix of INSERTs, file-level deletions, and table removals and re-creations, you can execute DML statements within a single atomic transaction.

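import io.delta.tables._
import org.apache.spark.sql.functions._
import spark.implicits._   // enables the $"column" syntax; assumes a SparkSession named spark

val deltaTable = DeltaTable.forPath(spark, pathToEventsTable)

// Delete using a predicate written as a SQL-formatted string
deltaTable.delete("date < '2017-01-01'")

// Equivalent delete using Spark SQL functions and implicits
deltaTable.delete($"date" < "2017-01-01")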

In addition, this release included the ability to query commit history to understand what operations modified the table.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

val fullHistoryDF = deltaTable.history()    // get the full history of the table.

val lastOperationDF = deltaTable.history(1) // get the last operation.

The returned DataFrame will have the following structure.

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|      5|2019-07-29 14:07:47|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          4|          null|        false|
|      4|2019-07-29 14:07:41|  null|    null|   UPDATE|[predicate -> (id...|null|    null|     null|          3|          null|        false|
|      3|2019-07-29 14:07:29|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          2|          null|        false|
|      2|2019-07-29 14:06:56|  null|    null|   UPDATE|[predicate -> (id...|null|    null|     null|          1|          null|        false|
|      1|2019-07-29 14:04:31|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          0|          null|        false|
|      0|2019-07-29 14:01:40|  null|    null|    WRITE|[mode -> ErrorIfE...|null|    null|     null|       null|          null|         true|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+    

With Delta Lake 0.4.0, we made executing DML statements easier by supporting Python APIs, as noted in Simple, Reliable Upserts and Deletes on Delta Lake Tables using Python APIs.

Sample merge executed by a DML statement made possible by Delta Lake 0.4.0
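The Python counterparts of the delete and update calls can be sketched as follows. The helper name `clean_events` and the `eventType` values are hypothetical; the sketch assumes `delta_table` is a `delta.tables.DeltaTable`, for example one obtained with `DeltaTable.forPath(spark, pathToEventsTable)`.

```python
def clean_events(delta_table):
    """Delete old rows, then fix a misspelled event type.

    Hypothetical helper: `delta_table` is assumed to be a
    delta.tables.DeltaTable; each call below is its own commit.
    """
    # Delete using a predicate written as a SQL-formatted string
    delta_table.delete("date < '2017-01-01'")

    # Update matching rows; the values in `set` are SQL expression strings
    delta_table.update(
        condition="eventType = 'clck'",
        set={"eventType": "'click'"},
    )
```

Each call shows up as a separate entry in the table history, so the two operations can be audited independently.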

Support for other processing engines

A fundamental principle of Delta Lake is that, while it is a storage layer originally conceived to work with Apache Spark, it can work with many other processing engines.  As part of the Delta Lake 0.5.0 release, we included the ability to create manifest files so that you can query Delta Lake tables from Presto and Amazon Athena.

The blog post Query Delta Lake Tables from Presto and Athena, Improved Operations Concurrency, and Merge performance provides examples of how to create the manifest file to query Delta Lake from Presto; for more information, refer to Presto and Athena to Delta Lake Integration.  The same release also included experimental support for Snowflake and Redshift Spectrum. More recently, we’d like to call out integrations with dbt and Koalas.
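As a rough sketch of the manifest step, the helper below asks a Delta table to generate a symlink-format manifest, which is the file listing that Presto and Athena read. The helper name `generate_presto_manifest` is hypothetical; it assumes `delta_table` is a `delta.tables.DeltaTable` from Delta Lake 0.5.0 or later.

```python
def generate_presto_manifest(delta_table):
    """Write a symlink-format manifest for the table's current snapshot.

    Hypothetical helper: `delta_table` is assumed to be a
    delta.tables.DeltaTable (Delta Lake 0.5.0 or later).
    """
    delta_table.generate("symlink_format_manifest")
```

The manifest reflects the table at the time it is generated, so it needs to be regenerated after the table changes for external engines to see the new data.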

Delta Lake Connectors allow you to standardize your big data storage by making it accessible from various tools, such as Amazon Redshift and Athena, Snowflake, Presto, Hive, and Apache Spark.

With Delta Connector 0.1.0, your Apache Hive environment can now read Delta Lake tables.  With this connector, you can create a table in Apache Hive using STORED BY syntax to point it to an existing Delta table like this:

CREATE EXTERNAL TABLE deltaTable(col1 INT, col2 STRING)
STORED BY 'io.delta.hive.DeltaStorageHandler'
LOCATION '/delta/table/path'

Simplifying Operational Maintenance

As your data lakes grow in size and complexity, it becomes increasingly difficult to maintain them.  But with Delta Lake, each release has included more features to reduce the operational overhead.  For example, Delta Lake 0.5.0 includes improvements in concurrency control and support for file compaction. Delta Lake 0.6.0 made further improvements, including support for reading Delta tables from any file system, improved merge performance, and automatic repartitioning.
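File compaction can be sketched as rewriting a table into fewer, larger files while marking the write as a pure layout change. The helper name `compact` and its parameters are hypothetical; it assumes `spark` is an active SparkSession with Delta Lake 0.5.0 or later, `path` is the location of an unpartitioned Delta table, and `num_files` is the target number of files.

```python
def compact(spark, path, num_files):
    """Rewrite the Delta table at path into num_files files.

    Hypothetical helper: assumes an active SparkSession and an
    unpartitioned Delta table at `path`.
    """
    (
        spark.read.format("delta").load(path)
        .repartition(num_files)
        .write
        .option("dataChange", "false")  # only the file layout changes, not the data
        .format("delta")
        .mode("overwrite")
        .save(path)
    )
```

Setting `dataChange` to `false` signals that the commit does not alter the table's contents, which helps concurrent operations avoid conflicting with the compaction.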

As noted in Schema Evolution in Merge Operations and Operational Metrics in Delta Lake, Delta Lake 0.6.0 introduces schema evolution and performance improvements for merge operations, as well as operational metrics in the table history. By enabling automatic schema evolution in your environment,

# Enable automatic schema evolution
spark.sql("SET spark.databricks.delta.schema.autoMerge.enabled = true")

you can run a single atomic operation to update values as well as merge together the new schema with the following example statement.

from delta.tables import *
deltaTable = DeltaTable.forPath(spark, DELTA_PATH)

# Schema Evolution with a Merge Operation
deltaTable.alias("t").merge(
    new_data.alias("s"),
    "s.col1 = t.col1 AND s.col2 = t.col2"
).whenMatchedUpdateAll(  
).whenNotMatchedInsertAll(
).execute()

Improvements to operational metrics were also included in the release so that you can review them from both the API and the Spark UI.  For example, running the statement:

deltaTable.history().show()

returns output like the following (abbreviated), listing the modifications made to your table.

+-------+------+---------+--------------------+
|version|userId|operation|    operationMetrics|
+-------+------+---------+--------------------+
|      1|100802|    MERGE|[numTargetRowsCop...|
|      0|100802|    WRITE|[numFiles -> 1, n...|
+-------+------+---------+--------------------+    

For the same action, you can view this information directly within the Spark UI as visualized in the following animated GIF.

Sample schema evolution in merge operations and operational metrics Spark UI.


For more details surrounding this action, refer to Schema Evolution in Merge Operations and Operational Metrics in Delta Lake.

Enhancements coming with Spark 3.0

While the preceding section has been about our recent past, let’s get back to the future and focus on the enhancements coming with Spark 3.0.

Support for Catalog Tables

With Delta Lake 0.7.0, Delta tables can be referenced in an external catalog such as the Hive Metastore. Look out for the Delta Lake 0.7.0 release, which works with Spark 3.0, in the coming weeks.

Expectations – NOT NULL columns

Delta tables can be created by specifying columns as NOT NULL. This will prevent any rows containing null values for those columns from being written to your tables.

CREATE TABLE events (
    eventTime TIMESTAMP NOT NULL,
    eventType STRING NOT NULL,
    source STRING,
    tags MAP<STRING, STRING>
)
USING delta

More support is on the way, for example, the ability to define arbitrary SQL expressions as invariants and to define these invariants on existing tables.

DataFrameWriterV2 API

DataFrameWriterV2 is a much cleaner interface for writing a DataFrame to a table. Table creation operations such as “create” and “replace” are separate from data modification operations such as “append” and “overwrite”, which gives users a better understanding of what to expect. DataFrameWriterV2 APIs are only available in Scala with Spark 3.0.

// Create a table using the DataFrame or replace the existing table
df.writeTo("delta_table")
  .using("delta")                               // create the table in the Delta format
  .tableProperty("delta.appendOnly", "true")    // DataFrameWriterV2 sets one property per call
  .createOrReplace()

// Insert more data into the table
df2.writeTo("delta_table").append()

Get Started with Delta Lake

Try out Delta Lake with the preceding code snippets on your Apache Spark 2.4.5 (or greater) instance (on Databricks, try this with DBR 6.6+). Delta Lake makes your data lakes more reliable (whether you create a new one or migrate an existing data lake).  To learn more, refer to https://delta.io/, and join the Delta Lake community via Slack and Google Group.  You can track all the upcoming releases and planned features in GitHub milestones. You can also try out Managed Delta Lake on Databricks with a free account.
