
Simplifying Change Data Capture with Databricks Delta

by Ameet Kini and Denny Lee

<p>Get an early preview of <a href="https://www.databricks.com/resources/ebook/delta-lake-running-oreilly?itm_data=changedatacapture-blog-oreillyupandrunning">O'Reilly's new ebook</a> for the step-by-step guidance you need to start using Delta Lake.</p><hr><p><em>Note: We also recommend you read </em><a href="https://www.databricks.com/blog/2019/03/19/efficient-upserts-into-data-lakes-databricks-delta.html" rel="noopener noreferrer" target="_blank"><em>Efficient Upserts into Data Lakes with Databricks Delta</em></a><em>, which explains the use of the MERGE command to do efficient upserts and deletes.</em></p><p>A common use case that we run into at Databricks is customers looking to perform change data capture (CDC) from one or many sources into a set of Databricks Delta tables. These sources may be on-premises or in the cloud, operational transactional stores, or data warehouses. The common glue that binds them all is that they have <em>change sets</em> generated:</p><ul><li>using an ETL tool like Oracle GoldenGate or Informatica PowerExchange,</li><li>from vendor-supplied change tables (e.g., <a href="https://docs.oracle.com/cd/B28359_01/server.111/b28313/cdc.htm#CHDGABFJ">Oracle Change Data Capture</a>), or</li><li>from user-maintained database tables that capture change sets using insert/update/delete triggers,</li></ul><p>and they wish to merge these change sets into Databricks Delta. 
Based on our experience implementing this use case across both our public and private sector customers, we present a reference architecture for performing CDC using features available today in Databricks Delta.</p><p><a href="https://www.databricks.com/wp-content/uploads/2018/10/CDC-in-Databricks-Delta-Architecture.png" data-lightbox=" "><img class="aligncenter wp-image-34651" style="width:750px;" src="https://www.databricks.com/wp-content/uploads/2018/10/CDC-in-Databricks-Delta-Architecture.png" alt height="411"></a></p><h2>Background</h2><p>Change Data Capture, or CDC, in short, refers to the process of capturing changes to a set of data sources and merging them in a set of target tables, typically in a <a href="https://www.databricks.com/glossary/data-warehouse">data warehouse</a>. &nbsp;These are typically refreshed nightly, hourly, or, in some cases, sub-hourly (e.g., every 15 minutes). We refer to this period as the <em>refresh period</em>.</p><p>The set of changed records for a given table within a refresh period is referred to as a change set. Finally, we refer to the set of records within a change set that has the same primary key as a recordset. Intuitively these refer to different changes for the same record in the final table.</p><table class="table"><thead><tr><th>FLAG</th><th>ID</th><th>VALUE</th><th>CDC_TIMESTAMP</th></tr></thead><tbody><tr><td>I</td><td>1</td><td>10</td><td>2018-01-01 16:02:00</td></tr><tr><td>U</td><td>1</td><td>11</td><td>2018-01-01 16:02:01</td></tr><tr><td>D</td><td>1</td><td>11</td><td>2018-01-01 16:02:03</td></tr><tr><td>U</td><td>2</td><td>20</td><td>2018-01-01 16:02:00</td></tr><tr><td>D</td><td>3</td><td>30</td><td>2018-01-01 16:02:00</td></tr></tbody></table><p><em>Table 1: Change set C for table T at time 2018-01-01 17:00:00</em></p><p>Table 1 shows a sample change set C for a table T at a given time. 
The change set C has four columns:</p><ul><li>a FLAG indicating whether the change is of type I/U/D (insert/update/delete),</li><li>an ID column uniquely identifying the recordset,</li><li>a VALUE column that changes when the record is updated, and</li><li>a CDC_TIMESTAMP indicating when the record was inserted/updated/deleted.</li></ul><p>The target table T has the same schema, except for the FLAG column.</p><p>In this change set, record ID 1 was inserted, updated, and deleted (rows 1, 2, and 3), so the record set for ID=1 has three records. Record ID 2 was only updated, and record ID 3 was deleted. It is safe to assume that records 2 and 3 were inserted at some point earlier.</p><h2>CDC before Databricks Delta</h2><p>Prior to Delta, a sample CDC pipeline used by some of our customers was: Informatica =&gt; Oracle =&gt; Spark Nightly Batch Job =&gt; Databricks.</p><p><a href="https://www.databricks.com/wp-content/uploads/2018/10/Traditional-CDC.png" data-lightbox=" "><img class="aligncenter wp-image-34652" style="width:751px;" src="https://www.databricks.com/wp-content/uploads/2018/10/Traditional-CDC.png" alt height="338"></a></p><p>In this scenario, Informatica pushes change sets from over 30 different data sources and consolidates them in an Oracle data warehouse. Approximately once a day, Databricks jobs retrieve these change sets from Oracle, via JDBC, and refresh tables in Databricks. While this scheme was successfully productionized, it had two major drawbacks:</p><ol><li>It added load to an already overloaded Oracle instance, which resulted in constraints on when and how these ETL jobs could run, and</li><li>The refresh rates were at best nightly, due to concurrency limitations of vanilla Parquet tables (prior to Databricks Delta).</li></ol><h2>CDC with Databricks Delta</h2><p>With Databricks Delta, the CDC pipeline is now streamlined and can be refreshed more frequently: Informatica =&gt; S3 =&gt; Spark Hourly Batch Job =&gt; Delta. 
In this scenario, Informatica writes change sets directly to S3 using Informatica's Parquet writer. Databricks jobs run at the desired sub-nightly refresh rate (e.g., every 15 min, hourly, every 3 hours, etc.) to read these change sets and update the target Databricks Delta table.</p><p><a href="https://www.databricks.com/wp-content/uploads/2018/10/CDC-with-Databricks-Delta.png" data-lightbox=" "><img class="aligncenter size-full wp-image-34821" style="width:1615px;" src="https://www.databricks.com/wp-content/uploads/2018/10/CDC-with-Databricks-Delta.png" alt height="602"></a></p><p>With minor changes, this pipeline has also been adapted to read CDC records from Kafka, so the pipeline there would look like Kafka =&gt; Spark =&gt; Delta. In the rest of this section, we elaborate on this process, and how we use Databricks Delta as a sink for their CDC workflows.</p><p>With one of our customers, we implemented these CDC techniques on their largest and most frequently refreshed <a href="https://www.databricks.com/glossary/extract-transform-load">ETL pipeline</a>. 
In this customer scenario, Informatica writes a change set to S3 for each of its 65 tables that have any changes <em>every 15 minutes</em>. The change sets themselves are fairly small.</p><h3>Using Insert Overwrite</h3><p>The basic idea behind this approach is to maintain a <em>staging</em> table that accumulates all updates for a given recordset and a <em>final</em> table that contains the current up-to-date snapshot that users can query.</p><p><iframe allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowfullscreen frameborder="0" height="315" src="https://www.youtube.com/embed/NWbWVRwzhdM" title="YouTube video player" width="560"></iframe></p><p><em>Figure 1: Insert Overwrite Flow from Source to Informatica to Cloud Storage to Databricks Delta</em></p><p>For every refresh period, a Spark job will run two INSERT statements.</p><ul><li><strong>Insert</strong> (Insert 1): Read the change sets from S3 or Kafka in this refresh period, and INSERT those changes into the <em>staging</em> table.</li><li><strong>Insert Overwrite</strong> (Insert 2): Get the current version of every record set from the staging table and overwrite those records in the final table.</li></ul><p><iframe allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowfullscreen frameborder="0" height="315" src="https://www.youtube.com/embed/RvsMuSR7Z_c" title="YouTube video player" width="560"></iframe></p><p><em>Figure 2: Insert Overwrite Flow from Source to Kafka to Structured Streaming to Databricks Delta</em></p><p>A familiar classification scheme to CDC practitioners is the different <em>Types</em> of handling updates à la slowly changing dimensions (SCDs). 
Our staging table maps closest to an SCD Type 2 scheme, whereas our final table maps closest to an SCD Type 1 scheme.</p><h4>Implementation</h4><p>Let's dive deeper into the two steps, starting with the first insert.</p><pre style="font-size:10pt;">%scala
val changeSets = Array(file1, file2, …)
spark.read.parquet(changeSets :_*).createOrReplaceTempView("incremental")

%sql
INSERT INTO T_STAGING
PARTITION(CREATE_DATE_YEAR)
SELECT ID, VALUE, CDC_TIMESTAMP
FROM INCREMENTAL
</pre><p>Here, the first cell defines a temporary view over the change sets, which is fed to the <code>INSERT INTO</code> in the second cell. The <code>INSERT INTO</code> is fairly straightforward with the exception of the <code>PARTITION</code> clause, so let's take a moment to unwrap that one.</p><p>Recall that in cloud data stores and HDFS, records are stored in files, and the unit of an update is a file. In the case of Databricks Delta, these are Parquet files, as presented in this <a href="https://www.databricks.com/blog/2017/10/25/databricks-delta-a-unified-management-system-for-real-time-big-data.html">post</a>. When a record needs to be updated, Spark needs to read and rewrite the entire file. As such, it's important to localize the updates to as few files as possible. 
To that end, we partition both the staging and the final table by a column that minimizes the number of rows touched during CDC, and provide the partition column in the PARTITION specification (Azure | AWS) so that Databricks Delta can insert the records in the correct partition of <code>T_STAGING</code>.</p><p>Next, we look at the second insert.</p><pre style="font-size:10pt;">%sql
INSERT OVERWRITE TABLE T_FINAL
PARTITION(CREATE_DATE_YEAR)
SELECT ID, VALUE, CDC_TIMESTAMP
FROM (
  SELECT A.*,
         RANK() OVER (PARTITION BY ID ORDER BY CDC_TIMESTAMP DESC) AS RNK
  FROM T_STAGING A
  WHERE CREATE_DATE_YEAR IN (2018, 2016, 2015)
) B
WHERE B.RNK = 1 AND B.FLAG &lt;&gt; 'D'
</pre><p>Let's start with the inner query that reads from <code>T_STAGING</code>. Recall that the staging table may have any number of inserts, updates, and deletes for a given record set. These changes may come from a given change set (e.g., <code>ID = 1</code> in Table 1 has 3 changes), or they may come across change sets, as they are inserted into the staging table across multiple refresh periods. The inner <code>RANK</code> along with the outer filter <code>B.RNK = 1 AND B.FLAG &lt;&gt; 'D'</code> ensures that:</p><ol><li>we only pick the most recent change for a given recordset, and</li><li>where the most recent change is a <code>'D'</code>, we exclude the entire record set from being inserted in the final table, thus achieving the purpose of the delete record.</li></ol><p>Next, notice the <code>WHERE CREATE_DATE_YEAR IN ( … )</code> clause. This, along with the <code>PARTITION(CREATE_DATE_YEAR)</code> in the outer query, ensures that Databricks Delta will overwrite only these partitions, namely 2018, 2016, and 2015, and the rest are left untouched. 
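</p><p>To make the second insert's semantics concrete, here is a small standalone Python sketch (illustrative only, not the production Spark code; the function name is ours) that collapses the Table 1 change set the same way the <code>RANK</code> window and the <code>B.RNK = 1 AND B.FLAG &lt;&gt; 'D'</code> filter do:</p>

```python
# Standalone simulation of the second insert's logic: within each record
# set (rows sharing an ID), keep only the most recent change, and drop
# the record entirely if that most recent change is a delete.
def latest_snapshot(change_rows):
    """change_rows: list of (flag, id, value, cdc_timestamp) tuples."""
    latest = {}
    for flag, rec_id, value, ts in change_rows:
        # Mirrors RANK() OVER (PARTITION BY ID ORDER BY CDC_TIMESTAMP DESC) = 1
        if rec_id not in latest or ts > latest[rec_id][2]:
            latest[rec_id] = (flag, value, ts)
    # Mirrors WHERE B.RNK = 1 AND B.FLAG <> 'D'
    return {rec_id: value
            for rec_id, (flag, value, ts) in latest.items()
            if flag != "D"}

# The change set from Table 1
change_set = [
    ("I", 1, 10, "2018-01-01 16:02:00"),
    ("U", 1, 11, "2018-01-01 16:02:01"),
    ("D", 1, 11, "2018-01-01 16:02:03"),
    ("U", 2, 20, "2018-01-01 16:02:00"),
    ("D", 3, 30, "2018-01-01 16:02:00"),
]

print(latest_snapshot(change_set))  # {2: 20}
```

<p>Only record ID 2 survives, with value 20: ID 1's record set ends in a delete, and ID 3's only change is a delete.</p><p>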
It is worth mentioning that while we provided hardcoded values for those partitions above for lucidity, in the actual implementation, those partitions are provided as a Scala list that is dynamically generated from a query to the change sets, such as</p><pre style="font-size:10pt;">val partitionsToOverwrite = spark.sql(
  "select year(to_date(create_date, 'MM/dd/yyyy')) from incremental")
...
spark.sql(s"""
  INSERT OVERWRITE T_FINAL
  ...
  WHERE CREATE_DATE_YEAR IN ( ${partitionsToOverwrite.mkString(",")} )
  ...
""")
</pre><h4>Performance</h4><p>As mentioned above, Databricks Delta enables the CDC pipeline to run concurrently with users querying consistent views of the data. Here, we show two features in Databricks Delta that can be used to optimize both readers and writers.</p><ul><li><strong>Partition Pruning:</strong> In the second insert above (i.e., the writers), the query optimizer in Databricks Delta looks at the <code>PARTITION</code> specification and the <code>IN</code> list in the <code>WHERE</code> clause to read and rewrite only those partitions that need to be updated. In practice, this can easily cut the portion of the table touched to half or, usually, much less, which helps the second insert both by localizing updates to <code>T_FINAL</code> and by speeding up the <code>SELECT</code> query on <code>T_STAGING</code>.</li><li><strong>Data Skipping / ZORDER indexes</strong>: Users querying <code>T_FINAL</code> can range from BI tools to ad-hoc SQL queries. Here, queries may or may not have the partition column <code>CREATE_DATE_YEAR</code> in the <code>WHERE</code> clause. For example,</li></ul><pre style="font-size:10pt;">%sql
SELECT … FROM T_FINAL WHERE COL1 = val AND COL2 = val
</pre><p>In this case, neither <code>COL1</code> nor <code>COL2</code> is part of the partition specification. 
Users can, however, create a Z-order index on those two columns:</p><pre style="font-size:10pt;">OPTIMIZE T_FINAL ZORDER BY (COL1, COL2)
</pre><p>Underneath, Databricks Delta clusters Parquet files by their Z-values so that queries like the one above touch only those files that could possibly contain <code>COL1 = val</code> and <code>COL2 = val</code>.</p><p>We note two niceties of the Z-order index that expand the list of queries where it can be used:</p><ul><li>In the above case, queries that only filter on <code>COL1</code> (or only on <code>COL2</code>) can also benefit from the index since, unlike composite indexes in an RDBMS, a Z-order index does not bias towards queries that have filters on prefixes of the indexed column list.</li><li>If, unlike above, the query also has a filter on the partition column, then both partition pruning and Z-order indexing can be used to drastically reduce the number of files touched at query time.</li></ul><p>We refer the reader to this excellent <a href="https://www.databricks.com/blog/2018/07/31/processing-petabytes-of-data-in-seconds-with-databricks-delta.html">post</a> for details on why and how data skipping and Z-order indexes work with or without partition pruning.</p><h4>Concurrency</h4><p>As presented in the <a href="https://www.databricks.com/blog/2017/10/25/databricks-delta-a-unified-management-system-for-real-time-big-data.html">earlier</a> post, Databricks Delta adds transactional support to cloud storage, and we rely on it here in the following way: while overwriting partitions, Databricks Delta ensures that, in addition to creating new Parquet files, it leaves the old Parquet files around for users who have queries concurrently running on this data. Queries that start after the overwrite completes will pick up the new data. 
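</p><p>This copy-on-write behavior can be illustrated with a toy, purely hypothetical version-log sketch in Python (Delta's actual transaction log is, of course, far more involved): an overwrite appends a new version, while a reader keeps seeing the version that was current when it started.</p>

```python
# Toy illustration of snapshot isolation under overwrites: each overwrite
# appends a new version of the table instead of mutating it in place, and
# a reader is pinned to the version that was current when it opened.
class ToyVersionedTable:
    def __init__(self, rows):
        self.versions = [list(rows)]          # version 0

    def overwrite(self, rows):
        self.versions.append(list(rows))      # old version is kept around

    def open_reader(self):
        version = len(self.versions) - 1      # pin the current version
        return lambda: self.versions[version]

table = ToyVersionedTable([("id1", 10)])
reader_before = table.open_reader()   # query running before the overwrite
table.overwrite([("id1", 11)])        # CDC job rewrites the partition
reader_after = table.open_reader()    # query started after the overwrite

print(reader_before())  # [('id1', 10)] -- still sees the old snapshot
print(reader_after())   # [('id1', 11)] -- picks up the new data
```

<p>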
Delta uses a transaction log to reliably point queries to consistent data versions.</p><h4>Compaction and Cleanup</h4><p>Over time, both <code>T_STAGING</code> and <code>T_FINAL</code> accumulate stale and unused records: for example, any record in <code>T_STAGING</code> where <code>RANK &gt; 1</code>, or any file in <code>T_FINAL</code> that was marked stale by an overwrite to that file. While this does not affect query correctness, it does degrade both CDC and query performance over time. Thankfully, maintenance tasks such as these are simplified in Databricks Delta. Purging old files in <code>T_FINAL</code>, for example, is as simple as</p><pre style="font-size:10pt;">%sql
VACUUM T_FINAL
</pre><p>Without retention parameters (see VACUUM docs: Azure | <a href="https://docs.databricks.com/spark/latest/spark-sql/language-manual/delta-vacuum.html#vacuum" rel="noopener noreferrer" target="_blank">AWS</a>), this purges all stale files that are no longer in the transaction log and are older than 7 days, which is plenty of time to ensure that there are no concurrent readers accessing those files.</p><p>Housecleaning on <code>T_STAGING</code>, on the other hand, involves removing all records where <code>RANK &gt; 1</code>. The simplest way to do this is to copy <code>T_FINAL</code> into <code>T_STAGING</code>:</p><pre style="font-size:10pt;">%sql
INSERT OVERWRITE T_STAGING SELECT * FROM T_FINAL
</pre><p>Both the above command and the previously shown <code>OPTIMIZE</code> command can be organized into a notebook for maintenance tasks and scheduled to run as a Databricks job.</p><blockquote><p>As part of the upcoming Databricks Runtime 5.0 release, using <code>MERGE INTO</code> may be another great approach due to Databricks Delta performance improvements and support for delete (D) records.</p></blockquote><h2>Productionizing Pipelines</h2><p>Databricks as a platform helps not only to develop and build ETL pipelines but also to shorten the time it takes to productionize them. 
Here, we describe two features and an enabling technology in Apache Spark that helped us productionize CDC pipelines.</p><h3>Configuration Driven Programming</h3><p>A common design pattern in building large-scale applications is to drive software behavior using configuration (e.g., YAML or JSON-based config files). Spark's support for SQL plus general-purpose programming languages like Scala and Python is well suited for this design pattern, since config can be stored in tables and dynamic SQL constructed to use it. Let's see how this would work in the CDC context.</p><p>First, recall that our CDC pipeline has 65 tables. We keep a <code>CONFIG</code> table, where each row is one of the 65 tables, and the fields help us build the CDC SQL statements.</p><table class="table"><thead><tr><th>TABLE</th><th>PARTITION_COLUMN_EXPRESSION</th><th>PARTITION_COLUMN_ALIAS</th><th>RANK_EXPRESSION</th><th>IS_INSERT_ONLY</th></tr></thead><tbody><tr><td>T1</td><td>year(to_date(create_date, "MM/dd/yyyy"))</td><td>create_date_year</td><td>PARTITION BY ID ORDER BY CDC_TIMESTAMP DESC</td><td>N</td></tr><tr><td>T2</td><td>year(to_date(transaction_date, "MM/dd/yyyy"))</td><td>transaction_date_year</td><td>PARTITION BY ID1, ID2 ORDER BY CDC_TIMESTAMP DESC</td><td>N</td></tr><tr><td>T3</td><td>null</td><td>null</td><td>null</td><td>Y</td></tr><tr><td>T4</td><td>...</td><td>...</td><td>...</td><td>...</td></tr></tbody></table><p><em>Table 2 - Configuration table for driving CDC pipeline for a set of tables</em></p><p>To get the config information for a particular table and perform the CDC logic for that table, use the following code.</p><pre style="font-size:10pt;">val hiveDb = "mydb"
val CONFIG_TABLE = "CONFIG"

// Table is a notebook input widget
val table = s"""${dbutils.widgets.get("wTable")}"""
val (partitionColumnExpression, partitionColumnAlias, rankExpression, isInsertOnly) =
  spark.sql(s"""
    SELECT PARTITION_COLUMN_EXPRESSION, PARTITION_COLUMN_ALIAS,
           RANK_EXPRESSION, IS_INSERT_ONLY
    FROM ${hiveDb}.${CONFIG_TABLE}
    WHERE TABLE_NAME = LOWER('$table')
  """).as[(String, String, String, Boolean)].head
...
/*
 * Insert 1 above would look like the following. Here, the table
 * variable is set to T1 or T2 from the config table.
 */
spark.sql(s"""
  INSERT INTO ${table}_STAGING PARTITION(${partitionColumnAlias})
  SELECT ${projectListFromIncremental}
  FROM INCREMENTAL
""")
...
// Insert 2 could look like
val partitionsToOverwrite = spark.sql(
    s"SELECT DISTINCT ${partitionColumnExpression} FROM INCREMENTAL"
  ).as[String].collect
spark.sql(s"""
  INSERT OVERWRITE TABLE ${table}_FINAL PARTITION(${partitionColumnAlias})
  SELECT ${projectListFromIncremental}
  FROM (
    SELECT A.*, RANK() OVER (${rankExpression}) AS RNK
    FROM ${table}_STAGING A
    WHERE ${partitionColumnAlias} IN (${partitionsToOverwrite.mkString(",")})
  ) B
  WHERE B.RNK = 1 AND B.FLAG &lt;&gt; 'D'
""")
</pre><h3>Notebook Workflows and Jobs</h3><p>Say the above was implemented in a notebook called <code>ProcessIncremental</code>. We can then use <a href="https://docs.databricks.com/notebooks/notebook-workflows.html#id1">notebook workflows</a> and have a Controller notebook that goes through each of the 65 tables, finds outstanding change sets for them, and calls <code>ProcessIncremental</code> on them.</p><pre style="font-size:10pt;">val startDate = "20180101"
val tables = spark.sql(s"""
    SELECT TABLE_NAME FROM $hiveDb.$CONFIG_TABLE
  """).as[String].collect.map(_.toLowerCase)

tables.foreach { tbl =&gt;
  val processTheseChangeSets = dbutils.notebook.run(
    "GetNextChangeSets", 0,
    Map("wHiveDb" -&gt; hiveDb,
        "wTable" -&gt; tbl,
        "wStartDate" -&gt; startDate))
  if (!processTheseChangeSets.isEmpty) {
    val stats = dbutils.notebook.run(
      "ProcessIncremental", 0,
      Map("wHiveDb" -&gt; hiveDb,
          "wIncrFiles" -&gt; processTheseChangeSets,
          "wTable" -&gt; tbl))
  }
}
</pre><p>The Controller notebook can be easily scheduled as a <a href="https://docs.databricks.com/jobs.html">job</a> in Databricks to run the CDC pipeline at the desired frequency. 
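</p><p>As a standalone illustration of the configuration-driven pattern described above, the following Python sketch shows how a config row like those in Table 2 can drive the generated Insert 2 statement (the <code>build_insert_overwrite</code> helper and config keys are hypothetical, modeled on the blog's examples; it only builds a SQL string and does not talk to Spark):</p>

```python
# Build the second insert's SQL from one CONFIG row. The config keys and
# the build_insert_overwrite helper are illustrative, not a Databricks API.
config = {
    "T1": {
        "partition_column_alias": "create_date_year",
        "rank_expression": "PARTITION BY ID ORDER BY CDC_TIMESTAMP DESC",
    },
}

def build_insert_overwrite(table, cfg, partitions):
    alias = cfg["partition_column_alias"]
    return (
        f"INSERT OVERWRITE TABLE {table}_FINAL PARTITION({alias}) "
        f"SELECT ID, VALUE, CDC_TIMESTAMP FROM ("
        f"SELECT A.*, RANK() OVER ({cfg['rank_expression']}) AS RNK "
        f"FROM {table}_STAGING A "
        f"WHERE {alias} IN ({', '.join(map(str, partitions))})"
        f") B WHERE B.RNK = 1 AND B.FLAG <> 'D'"
    )

sql = build_insert_overwrite("T1", config["T1"], [2018, 2016, 2015])
print(sql)
```

<p>The same helper serves every table in the CONFIG table; only the config row changes.</p><p>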
Finally, while the above loop is serial, it can be easily changed to a parallel loop using, say, the <code>.par</code> idiom for turning a serial collection into a <a href="https://docs.scala-lang.org/overviews/parallel-collections/overview.html">parallel collection</a>, or using Scala Futures.</p><h2>Conclusion</h2><p>In this blog, we presented a reference architecture for merging into Databricks Delta change sets captured by a CDC tool (e.g., Oracle GoldenGate or Informatica PowerExchange), by vendor-maintained change tables (e.g., Oracle Change Data Capture), or by user-maintained change tables populated with insert/update/delete triggers. We dove into the Spark SQL used to reflect these records in Databricks Delta, two performance considerations (partitioning and Z-order indexing), and ancillary considerations such as compaction and cleanup to ensure that the tables queried by end users are optimized for reads. We then saw how Databricks accelerates both the development of these ETL pipelines, by supporting configuration-driven programming, and their productionization, using Notebook Workflows and Jobs.</p><div style="border-radius:10px;border:medium solid #0CF;margin-left:auto;margin-right:auto;max-width:700px;padding:25px;text-align:left;"><strong>Interested in the open source Delta Lake?</strong><br><a href="https://delta.io?utm_source=delta-blog" rel="noopener noreferrer" target="_blank">Visit the Delta Lake online hub</a> to learn more, download the latest code and join the Delta Lake community.</div>
