Note: We also recommend you read Efficient Upserts into Data Lakes with Databricks Delta, which explains how to use the MERGE command to perform efficient upserts and deletes.
A common use case that we run into at Databricks is customers looking to perform change data capture (CDC) from one or many sources into a set of Databricks Delta tables. These sources may be on-premises or in the cloud, operational transactional stores, or data warehouses. The common glue that binds them all is that they have change sets generated:
- using an ETL tool like Oracle GoldenGate or Informatica PowerExchange,
- from vendor-supplied change tables (e.g., Oracle Change Data Capture), or
- from user-maintained database tables that capture change sets using insert/update/delete triggers,
and they wish to merge these change sets into Databricks Delta. Based on our experience implementing this use case across both our public and private sector customers, we present a reference architecture for performing CDC using features available today in Databricks Delta.
Background
Change Data Capture, or CDC for short, refers to the process of capturing changes to a set of data sources and merging them into a set of target tables, typically in a data warehouse. These targets are typically refreshed nightly, hourly, or, in some cases, sub-hourly (e.g., every 15 minutes). We refer to this period as the refresh period.
The set of changed records for a given table within a refresh period is referred to as a change set. Finally, we refer to the set of records within a change set that have the same primary key as a recordset. Intuitively, these are the different changes to the same record in the final table.
FLAG | ID | VALUE | CDC_TIMESTAMP |
---|---|---|---|
I | 1 | 10 | 2018-01-01 16:02:00 |
U | 1 | 11 | 2018-01-01 16:02:01 |
D | 1 | 11 | 2018-01-01 16:02:03 |
U | 2 | 20 | 2018-01-01 16:02:00 |
D | 3 | 30 | 2018-01-01 16:02:00 |
Table 1: Change set C for table T at time 2018-01-01 17:00:00
Table 1 shows a sample change set C for a table T at a given time. The change set C has four columns:
- a FLAG indicating whether the change is of type I/U/D (insert/update/delete),
- an ID column uniquely identifying the recordset,
- a VALUE column that changes when the record is updated, and
- a CDC_TIMESTAMP indicating when the record was inserted/updated/deleted.
The target table T has the same schema, except for the FLAG column.
In this change set, record ID 1 was inserted, updated, and deleted (rows 1, 2, and 3). As such, the recordset for ID = 1 has three records. Record ID 2 was only updated, and record ID 3 was deleted. It is safe to assume that records 2 and 3 were inserted at some earlier point.
CDC before Databricks Delta
Prior to Delta, a sample CDC pipeline at some of our customers looked like this: Informatica => Oracle => Spark Nightly Batch Job => Databricks.
In this scenario, Informatica pushes change sets from over 30 different data sources and consolidates them in an Oracle data warehouse. Approximately once a day, Databricks jobs retrieve these change sets from Oracle, via JDBC, and refresh tables in Databricks. While this scheme was successfully productionized, it had two major drawbacks:
- It added load to an already overloaded Oracle instance, which resulted in constraints on when and how these ETL jobs could run, and
- The refresh rates were at best nightly, due to concurrency limitations of vanilla Parquet tables (prior to Databricks Delta).
CDC with Databricks Delta
With Databricks Delta, the CDC pipeline is now streamlined and can be refreshed more frequently: Informatica => S3 => Spark Hourly Batch Job => Delta. In this scenario, Informatica writes change sets directly to S3 using Informatica's Parquet writer. Databricks jobs run at the desired sub-nightly refresh rate (e.g., every 15 min, hourly, every 3 hours, etc.) to read these change sets and update the target Databricks Delta table.
With minor changes, this pipeline has also been adapted to read CDC records from Kafka, so the pipeline there would look like Kafka => Spark => Delta. In the rest of this section, we elaborate on this process and on how Databricks Delta is used as the sink for these CDC workflows.
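For the Kafka variant, a Structured Streaming job can land the change records into the staging table continuously while the rest of the flow stays the same. The snippet below is a minimal sketch rather than the customer pipeline itself; the topic name, brokers, JSON schema, checkpoint location, and output path are illustrative assumptions.
%scala
// Sketch of the Kafka => Spark => Delta leg; names, schema, and paths below are illustrative assumptions.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val changeSchema = new StructType()
  .add("FLAG", StringType)
  .add("ID", LongType)
  .add("VALUE", LongType)
  .add("CDC_TIMESTAMP", TimestampType)
  .add("CREATE_DATE", StringType)

val changes = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // assumed broker list
  .option("subscribe", "cdc_changes")                 // assumed topic
  .load()
  .select(from_json(col("value").cast("string"), changeSchema).as("c"))
  .select("c.*")
  .withColumn("CREATE_DATE_YEAR", year(to_date(col("CREATE_DATE"), "MM/dd/yyyy")))

// Append the raw change records to the staging table's storage location; the periodic
// INSERT OVERWRITE described below still refreshes the final table from the staging data.
changes.writeStream
  .format("delta")
  .partitionBy("CREATE_DATE_YEAR")
  .option("checkpointLocation", "/delta/t_staging/_checkpoints")  // assumed path
  .outputMode("append")
  .start("/delta/t_staging")                                      // assumed path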
With one of our customers, we implemented these CDC techniques on their largest and most frequently refreshed ETL pipeline. In this customer scenario, Informatica writes a change set to S3 every 15 minutes for each of its 65 tables that has any changes. The change sets themselves are fairly small.
Using Insert Overwrite
The basic idea behind this approach is to maintain a staging table that accumulates all updates for a given recordset and a final table that contains the current up-to-date snapshot that users can query.
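For concreteness, here is a sketch of what the two tables could look like, using the schema from Table 1 and the partition column used in the implementation below; the exact DDL (column types, USING DELTA) is an assumption, not the customer's definition.
%sql
-- Sketch of the staging table: same columns as the change set in Table 1, partitioned by
-- the derived CREATE_DATE_YEAR column used below. Types and USING DELTA are assumptions.
CREATE TABLE T_STAGING (
  FLAG STRING,
  ID BIGINT,
  VALUE BIGINT,
  CDC_TIMESTAMP TIMESTAMP,
  CREATE_DATE_YEAR INT
)
USING DELTA
PARTITIONED BY (CREATE_DATE_YEAR)
%sql
-- Sketch of the final table: same schema minus the FLAG column
CREATE TABLE T_FINAL (
  ID BIGINT,
  VALUE BIGINT,
  CDC_TIMESTAMP TIMESTAMP,
  CREATE_DATE_YEAR INT
)
USING DELTA
PARTITIONED BY (CREATE_DATE_YEAR)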
Figure 1: Insert Overwrite Flow from Source to Informatica to Cloud Storage to Databricks Delta
For every refresh period, a Spark job will run two INSERT statements.
- Insert (Insert 1): Read the change sets from S3 or Kafka in this refresh period, and INSERT those changes into the staging table.
- Insert Overwrite (Insert 2): Get the current version of every record set from the staging table and overwrite those records in the final table.
Figure 2: Insert Overwrite Flow from Source to Kafka to Structured Streaming to Databricks Delta
A classification scheme familiar to CDC practitioners is the different types of handling updates, à la slowly changing dimensions (SCDs). Our staging table maps closest to an SCD Type 2 scheme, whereas our final table maps closest to an SCD Type 1 scheme.
Implementation
Let's dive deeper into the two steps, starting with the first insert.
%scala
// Paths of the change-set Parquet files that landed during this refresh period
val changeSets = Array(file1, file2, …)
spark.read.parquet(changeSets :_*).createOrReplaceTempView("incremental")
%sql
INSERT INTO T_STAGING
PARTITION(CREATE_DATE_YEAR)
-- FLAG and the derived partition column are carried along so that the staging
-- table can drive the INSERT OVERWRITE into the final table below
SELECT FLAG, ID, VALUE, CDC_TIMESTAMP,
       YEAR(TO_DATE(CREATE_DATE, 'MM/dd/yyyy')) AS CREATE_DATE_YEAR
FROM INCREMENTAL
Here, the first cell defines a temporary view over the change sets, which is fed to the INSERT INTO in the second cell. The INSERT INTO is fairly straightforward, with the exception of the PARTITION clause, so let's take a moment to unwrap that one.
Recall that in cloud data stores and HDFS, records are stored in files, and the unit of an update is a file. In the case of Databricks Delta, these are Parquet files, as presented in this post. When a record needs to be updated, Spark needs to read and rewrite the entire file. As such, it is important to localize the updates to as few files as possible. We therefore partition both the staging and the final table by a column that minimizes the number of rows touched during CDC, and provide that partition column in the PARTITION specification (Azure | AWS) so that Databricks Delta can insert the records into the correct partition of T_STAGING.
Next, we look at the second insert.
%sql
INSERT OVERWRITE TABLE T_FINAL
PARTITION(CREATE_DATE_YEAR)
SELECT ID, VALUE, CDC_TIMESTAMP, CREATE_DATE_YEAR
FROM (
  SELECT A.*,
         RANK() OVER (PARTITION BY ID ORDER BY CDC_TIMESTAMP DESC) AS RNK
  FROM T_STAGING A
  WHERE CREATE_DATE_YEAR IN (2018, 2016, 2015)
) B
WHERE B.RNK = 1 AND B.FLAG <> 'D'
Let's start with the inner query that reads from T_STAGING. Recall that the staging table may have any number of inserts, updates, and deletes for a given recordset. These changes may come from a single change set (e.g., ID = 1 in Table 1 has three changes), or they may accumulate across change sets as they are inserted into the staging table over multiple refresh periods. The inner RANK along with the outer filter B.RNK = 1 AND B.FLAG <> 'D' ensures that:
- we only pick the most recent change for a given recordset, and
- where the most recent change is a 'D', we exclude the entire recordset from being inserted into the final table, thus achieving the purpose of the delete record.
Next, notice the WHERE CREATE_DATE_YEAR IN ( … ) clause. This, along with the PARTITION(CREATE_DATE_YEAR) in the outer query, ensures that Databricks Delta overwrites only those partitions, namely 2018, 2016, and 2015, and leaves the rest untouched. While we hardcoded those partition values above for clarity, in the actual implementation they are provided as a Scala list that is dynamically generated from a query against the change sets, such as:
val partitionsToOverwrite = spark.sql(
  "SELECT DISTINCT year(to_date(create_date, 'MM/dd/yyyy')) FROM incremental"
).as[String].collect
...
spark.sql(s"""
INSERT OVERWRITE TABLE T_FINAL
...
WHERE CREATE_DATE_YEAR IN ( ${partitionsToOverwrite.mkString(",")} )
...
""")
Performance
As mentioned above, Databricks Delta enables the CDC pipeline to run concurrently with users querying consistent views of the data. Here, we show two features in Databricks Delta that can be used to optimize both readers and writers.
- Partition Pruning: In the second insert above (i.e., the writers), the query optimizer in Databricks Delta looks at the PARTITION specification and the IN list in the WHERE clause to read and rewrite only those partitions that need to be updated. In practice, this can easily cut the portion of the table touched to half or, usually, much less, which helps the second insert by localizing both the updates to T_FINAL and the SELECT over T_STAGING.
- Data Skipping / ZORDER indexes: Users querying T_FINAL can range from BI tools to ad hoc SQL queries. Here, queries may or may not have the partition column CREATE_DATE_YEAR in the WHERE clause. For example:
%sql
SELECT …
FROM T_FINAL
WHERE COL1 = val and COL2 = val
In this case, neither COL1 nor COL2 is part of the partition specification. Users can, however, create a Z-order index on those two columns:
OPTIMIZE T_FINAL ZORDER BY (COL1, COL2)
Underneath, Databricks Delta clusters Parquet files by their Z-values so that queries such as the one above touch only those files that could possibly contain COL1 = val and COL2 = val.
We note two niceties of the Z-order index that expand the set of queries where it can be used:
- In the above case, queries that filter only on COL1 (or only on COL2) can also benefit from the index since, unlike composite indexes in an RDBMS, a Z-order index does not bias towards queries that filter on a prefix of the indexed column list.
- If, unlike above, the query also has a filter on the partition column, then both partition pruning and Z-order indexing can be used to drastically reduce the number of files touched at query time.
We refer the reader to this excellent post for details on why and how data skipping and Z-order indexes work with or without partition pruning.
Concurrency
As presented in the earlier post, Databricks Delta adds transactional support to cloud storage. We rely on this support in the following way: while overwriting partitions, Databricks Delta ensures that, in addition to creating new Parquet files, it leaves the old Parquet files around for users who have queries concurrently running against this data. Queries that start after the overwrite completes will pick up the new data. Delta uses a transaction log to reliably point queries to consistent versions of the data.
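The versions recorded in that transaction log can be inspected directly; for example, to see the commits produced by each refresh of the final table:
%sql
-- Each INSERT OVERWRITE shows up as a new version (commit) in the table's transaction log
DESCRIBE HISTORY T_FINAL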
Compaction and Cleanup
Over time, both T_STAGING and T_FINAL accumulate stale and unused records: for example, any record in T_STAGING where RANK > 1, or any file in T_FINAL that was marked stale by an overwrite to that file. While this does not affect query correctness, it does degrade both CDC and query performance over time. Thankfully, maintenance tasks such as these are simplified in Databricks Delta. Purging old files in T_FINAL, for example, is as simple as:
%sql
VACUUM T_FINAL
Without retention parameters (see the VACUUM docs: Azure | AWS), this purges all stale files that are no longer referenced in the transaction log and are older than seven days, which is plenty of time to ensure that no concurrent readers are still accessing those files.
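If a different retention window is needed, it can be given explicitly; for example, to retain ten days of stale files instead of the default seven (the value here is purely illustrative, and the window should always exceed the duration of the longest-running concurrent query):
%sql
-- Keep the files required by the last 240 hours (10 days) of table versions (illustrative value)
VACUUM T_FINAL RETAIN 240 HOURS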
Housekeeping on T_STAGING, on the other hand, involves removing all records where RANK > 1. The simplest way to do this is to copy T_FINAL into T_STAGING:
%sql
INSERT OVERWRITE T_STAGING SELECT * FROM T_FINAL
Both the above command and the previously shown OPTIMIZE command can be organized into a notebook for maintenance tasks and scheduled to run as a Databricks job.
As part of the upcoming Databricks Runtime 5.0 release, MERGE INTO may become another great approach for this use case, thanks to Databricks Delta performance improvements and support for delete (D) records.
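To give a flavor of that direction, here is a sketch of how the second step could be expressed with MERGE INTO, assuming the changes have already been reduced to the latest record per ID (e.g., with the same RANK-based query shown above, exposed here as an assumed view named LATEST_CHANGES); it is an illustration, not a drop-in replacement tested against this pipeline.
%sql
-- Sketch only: LATEST_CHANGES is assumed to hold exactly one row per ID (its most recent change),
-- including the derived CREATE_DATE_YEAR column
MERGE INTO T_FINAL AS t
USING LATEST_CHANGES AS s
ON t.ID = s.ID
WHEN MATCHED AND s.FLAG = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET VALUE = s.VALUE, CDC_TIMESTAMP = s.CDC_TIMESTAMP
WHEN NOT MATCHED AND s.FLAG <> 'D' THEN
  INSERT (ID, VALUE, CDC_TIMESTAMP, CREATE_DATE_YEAR)
  VALUES (s.ID, s.VALUE, s.CDC_TIMESTAMP, s.CREATE_DATE_YEAR)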
Productionizing Pipelines
Databricks as a platform not only helps develop and build ETL pipelines, but also accelerates the time to productionize them. Here, we describe two features and an enabling technology in Apache Spark that helped us productionize CDC pipelines.
Configuration Driven Programming
A common design pattern in building large-scale applications is to drive software behavior using configuration (e.g., YAML or JSON-based config files). Spark's support for SQL plus general-purpose programming languages like Scala and Python is well suited for this design pattern since config can be stored in tables and dynamic SQL constructed to use it. Let's see how this would work in the CDC context.
First, recall that our CDC pipeline has 65 tables. We keep a CONFIG table, where each row corresponds to one of the 65 tables and the fields help us build the CDC SQL statements for that table.
TABLE | PARTITION_COLUMN_EXPRESSION | PARTITION_COLUMN_ALIAS | RANK_EXPRESSION | IS_INSERT_ONLY |
---|---|---|---|---|
T1 | year(to_date(create_date, "MM/dd/yyyy")) | create_date_year | PARTITION BY ID ORDER BY CDC_TIMESTAMP DESC | N |
T2 | year(to_date(transaction_date, "MM/dd/yyyy")) | transaction_date_year | PARTITION BY ID1, ID2 ORDER BY CDC_TIMESTAMP DESC | N |
T3 | null | null | null | Y |
T4 | ... | ... | ... | ... |
Table 2 - Configuration table for driving CDC pipeline for a set of tables
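A minimal sketch of this table's definition is shown below; the column names mirror Table 2 and the TABLE_NAME column used in the code that follows, while the types and the choice of storing it as a Delta table are assumptions.
%sql
-- Sketch of the CONFIG table backing Table 2 (types and storage format are assumptions)
CREATE TABLE CONFIG (
  TABLE_NAME STRING,
  PARTITION_COLUMN_EXPRESSION STRING,
  PARTITION_COLUMN_ALIAS STRING,
  RANK_EXPRESSION STRING,
  IS_INSERT_ONLY BOOLEAN
)
USING DELTA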
To get the config information for a particular table and perform the CDC logic for that table, use the following code.
val hiveDb = "mydb"
val CONFIG_TABLE = "CONFIG"
// Table is a notebook input widget
val table=s"""${dbutils.widgets.get("wTable")}"""
val (partitionColumnExpression, partitionColumnAlias, rankExpression, isInsertOnly) = spark.sql(s"""
SELECT PARTITION_COLUMN_EXPRESSION, PARTITION_COLUMN_ALIAS, RANK_EXPRESSION, IS_INSERT_ONLY
FROM ${hiveDb}.${CONFIG_TABLE}
WHERE TABLE_NAME=LOWER('$table')
""").as[(String, String, String, Boolean)].head
...
/*
* Insert 1 above would look like following. Here, the table
* variable is set to T1 or T2 from the config table
*/
spark.sql(s"""
INSERT INTO ${table}_STAGING
PARTITION(${partitionColumnAlias})
SELECT ${projectListFromIncremental}
FROM INCREMENTAL
""")
...
// Insert 2 could look like
val partitionsToOverwrite = spark.sql(s"""SELECT DISTINCT ${partitionColumnExpression} FROM INCREMENTAL""").as[String].collect
spark.sql(s"""
INSERT OVERWRITE TABLE ${table}_FINAL
PARTITION(${partitionColumnAlias})
SELECT ${projectListFromIncremental}
FROM (SELECT A.*, RANK() OVER (${rankExpression}) AS RNK
      FROM ${table}_STAGING A
      WHERE ${partitionColumnAlias} IN (${partitionsToOverwrite.mkString(",")})
     ) B
WHERE B.RNK = 1 AND B.FLAG <> 'D'
""")
Notebook Workflows and Jobs
Say the above was implemented in a notebook called ProcessIncremental. We can then use notebook workflows and have a Controller notebook that goes through each of the 65 tables, finds the outstanding change sets for each, and calls ProcessIncremental on them.
val startDate = "20180101"
val tables = spark.sql(s"""
SELECT TABLE_NAME
FROM $hiveDb.$CONFIG_TABLE
""").as[String].collect.map(_.toLowerCase)
tables.foreach { tbl =>
  val processTheseChangeSets = dbutils.notebook.run("GetNextChangeSets", 0, Map(
    "wHiveDb" -> hiveDb,
    "wTable" -> tbl,
    "wStartDate" -> startDate
  ))
  if (!processTheseChangeSets.isEmpty) {
    val stats = dbutils.notebook.run("ProcessIncremental", 0, Map(
      "wHiveDb" -> hiveDb,
      "wIncrFiles" -> processTheseChangeSets,
      "wTable" -> tbl
    ))
  }
}
The Controller notebook can easily be scheduled as a job in Databricks to run the CDC pipeline at the desired frequency. Finally, while the above loop is serial, it can easily be changed into a parallel loop using, say, the .par idiom for turning a serial collection into a parallel one, or using Scala Futures.
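As a minimal sketch of the .par variant (the loop body is unchanged from the serial version above, and iterations are independent since each one touches only its own table):
%scala
// Sketch: run the same per-table loop over a parallel collection.
// Scala's default ForkJoin pool determines how many tables are processed concurrently.
tables.par.foreach { tbl =>
  // identical body to the serial loop above: GetNextChangeSets, then ProcessIncremental
}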
Conclusion
In this blog, we presented a reference architecture for merging change sets into Databricks Delta, whether those change sets are captured by a CDC tool (e.g., Oracle GoldenGate or Informatica PowerExchange), provided by vendor-supplied change tables (e.g., Oracle Change Data Capture), or maintained by the user via insert/update/delete triggers. We dove into the Spark SQL used to reflect these records in Databricks Delta, two performance considerations (partitioning and Z-order indexing), and ancillary considerations such as compaction and cleanup to ensure that the tables queried by end users are optimized for reads. We then saw how Databricks helps accelerate both the development of these ETL pipelines, through support for configuration-driven programming, and their productionization, using Notebook Workflows and Jobs.
Visit the Delta Lake online hub to learn more, download the latest code and join the Delta Lake community.