This is a collaborative post between the data teams at Badal, Google and Databricks. We thank Eugene Miretsky, Partner, and Steven Deutscher-Kobayashi, Senior Data Engineer, of Badal, and Etai Margolin, Product Manager, Google, for their contributions.
Operational databases capture business transactions that are critical to understanding the current state of the business. Having real-time insights into how your business is performing enables your data teams to quickly make business decisions in response to market conditions.
Databricks provides a managed cloud platform to analyze data collected from source systems, including operational databases, in real time. With the Databricks Lakehouse Platform, you can store all of your data in a secure and open lakehouse architecture that combines the best of data warehouses and data lakes to unify all of your analytics and AI workloads. Today, we’re excited to share our partner Badal.io’s release of their Google Datastream Delta Lake connector, which enables Change Data Capture (CDC) for MySQL and Oracle relational databases. CDC is a software-based process that identifies and tracks changes to data in a source data management system, such as a relational database management system (RDBMS). CDC provides a real-time view of the data by processing changes continuously as new database events occur.
Log-based CDC is an alternative approach to traditional batch data ingestion. It reads the database’s native transaction log (sometimes called redo or binary log) and provides real-time or near-real-time replication of data by streaming the changes continuously to the destination as events occur.
CDC presents the following benefits:
Google Cloud Datastream is an easy-to-use CDC and replication service that allows you to synchronize data across heterogeneous databases, storage systems and applications reliably and with minimal latency.
The benefits of Datastream include:
Badal.io and Databricks collaborated on writing a Datastream connector for Delta Lake.
Datastream writes change log records to files in Google Cloud Storage (GCS) in either Avro or JSON format. The datastream-delta connector uses Spark Structured Streaming to read these files as they arrive and stream the changes into a Delta Lake table.
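Conceptually, the ingestion works like a standard Structured Streaming file-source pipeline. The sketch below is illustrative only, not the connector's actual code; the GCS bucket and Delta table paths are hypothetical, and it assumes Datastream is configured to write JSON output:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical locations; the connector derives these from its configuration.
source_path = "gs://my-datastream-bucket/datastream-output/"
staging_path = "gs://my-lake/delta/inventory_voters_log"

# Infer the event schema once from files already on GCS, then stream new
# files as they arrive, bounding the size of each micro-batch.
event_schema = spark.read.json(source_path).schema
changes = (
    spark.readStream
    .format("json")
    .schema(event_schema)
    .option("maxFilesPerTrigger", 100)
    .load(source_path)
)

# Append the raw change records to the staging Delta table.
(
    changes.writeStream
    .format("delta")
    .option("checkpointLocation", staging_path + "/_checkpoint")
    .outputMode("append")
    .start(staging_path)
)
```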
The connector creates two Delta Lake tables per source table: a staging table that records every change event received from Datastream (the first table below), and a target table that holds an up-to-date replica of the source table after those changes are merged (the second table below).
| read_timestamp | source_timestamp | object | source_metadata | payload |
|---|---|---|---|---|
| 2021-05-16T00:40:05.000+0000 | 2021-05-16T00:40:05.000+0000 | demo_inventory.voters | {"table":"inventory.voters","database":"demo","primary_keys":["id"],"log_file":"mysql-bin.000002","log_position":27105167,"change_type":"INSERT","is_deleted":false} | {"id":"743621506","name":"Mr. Joshua Jackson","address":"567 Jessica Plains Apt. 106\nWhitestad, HI 51614","gender":"t"} |
| 2021-05-16T00:40:06.000+0000 | 2021-05-16T00:40:06.000+0000 | demo_inventory.voters | {"table":"inventory.voters","database":"demo","primary_keys":["id"],"log_file":"mysql-bin.000002","log_position":27105800,"change_type":"UPDATE","is_deleted":false} | {"id":"299594688","name":"Ronald Stokes","address":"940 Jennifer Burg Suite 133\nRyanfurt, AR 92355","gender":"m"} |
| 2021-05-16T00:40:07.000+0000 | 2021-05-16T00:40:07.000+0000 | demo_inventory.voters | {"table":"inventory.voters","database":"demo","primary_keys":["id"],"log_file":"mysql-bin.000002","log_position":27106451,"change_type":"DELETE","is_deleted":false} | {"id":"830510405","name":"Thomas Olson","address":"2545 Cruz Branch Suite 552\nWest Edgarton, KY 91433","gender":"n"} |
The target table for the same source contains one row per primary key, reflecting the latest state after the changes above are merged:

| id | name | address | gender | datastream_metadata_source_timestamp | datastream_metadata_source_metadata_log_file | datastream_metadata_source_metadata_log_position |
|---|---|---|---|---|---|---|
| 207846446 | Michael Thompson | 508 Potter Mountain | m | 2021-05-16T00:21:02.000+0000 | mysql-bin.000002 | 26319210 |
| 289483866 | Lauren Jennings | 03347 Brown Islands | t | 2021-05-16T02:55:40.000+0000 | mysql-bin.000002 | 31366461 |
| 308466169 | Patricia Riley | 991 Frederick Dam | t | 2021-05-16T00:59:59.000+0000 | mysql-bin.000002 | 27931699 |
| 348656975 | Dr. Riley Moody | 89422 Devin Ridge | t | 2021-05-16T00:08:32.000+0000 | mysql-bin.000002 | 25820266 |
| 385058605 | Elizabeth Gill | 728 Dorothy Locks | f | 2021-05-16T00:18:47.000+0000 | mysql-bin.000002 | 26226299 |
The connector breaks the data ingestion into a multi-step process:
Datastream sends each event with all metadata required to operate on it: table schema, primary keys, sort keys, database, table info, etc.
As a result, users don’t need to provide additional configuration for each table they want to ingest. Instead, tables are auto-discovered, and all relevant information is extracted from the events in each batch. This includes the source database and table names, the table schema, and the primary keys used to merge changes.
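As a rough illustration of that auto-discovery (not the connector's actual implementation; the field names simply follow the example source_metadata shown above), the table identity and primary keys can be read directly from the events in a batch:

```python
from pyspark.sql import DataFrame, functions as F

def discover_table_info(batch_df: DataFrame):
    """Pull the source table identity and primary keys out of a batch of
    change events (illustrative; field names follow the example above)."""
    meta = batch_df.select(
        F.col("source_metadata.database").alias("database"),
        F.col("source_metadata.table").alias("table"),
        F.col("source_metadata.primary_keys").alias("primary_keys"),
    ).first()
    return meta["database"], meta["table"], list(meta["primary_keys"])
```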
This section describes how the MERGE operation works at a high level. The code is executed by the library and does not need to be implemented by the user. The MERGE into the target table must be designed with care to make sure that all records are updated correctly, in particular:
First, for each micro-batch, we execute an operation such as the following:
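A plausible sketch of such an operation (it assumes the staging schema shown earlier and is not the library's exact code) collapses the micro-batch to the most recent change per primary key, so that at most one change is applied to each row:

```python
from pyspark.sql import DataFrame, functions as F
from pyspark.sql.window import Window

def latest_change_per_key(batch_df: DataFrame) -> DataFrame:
    """Keep only the newest change for each primary key in the micro-batch,
    ordered by source timestamp and log position (illustrative sketch)."""
    w = Window.partitionBy(F.col("payload.id")).orderBy(
        F.col("source_timestamp").desc(),
        F.col("source_metadata.log_position").desc(),
    )
    return (
        batch_df
        .withColumn("rn", F.row_number().over(w))
        .where(F.col("rn") == 1)
        .drop("rn")
    )
```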
Then a merge operation comparable to the following SQL is executed:
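For illustration, a merge along these lines could be issued through `spark.sql()`, using the example table and column names from above; the exact statement the connector generates may differ:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder input: in the real flow this would be the deduplicated
# micro-batch from the previous step, not a full read of the staging table.
latest_changes = spark.read.format("delta").load("gs://my-lake/delta/inventory_voters_log")
latest_changes.createOrReplaceTempView("staged_changes")

spark.sql("""
    MERGE INTO inventory_voters AS target
    USING (
        SELECT
            payload.id      AS id,
            payload.name    AS name,
            payload.address AS address,
            payload.gender  AS gender,
            source_metadata.change_type AS change_type
        FROM staged_changes
    ) AS source
    ON target.id = source.id
    WHEN MATCHED AND source.change_type = 'DELETE' THEN DELETE
    WHEN MATCHED THEN UPDATE SET
        target.name    = source.name,
        target.address = source.address,
        target.gender  = source.gender
    WHEN NOT MATCHED AND source.change_type != 'DELETE' THEN
        INSERT (id, name, address, gender)
        VALUES (source.id, source.name, source.address, source.gender)
""")
```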
Streaming workloads can result in sub-optimally sized Parquet files being written. Typically, if the data volume is not large enough, a trade-off must be made between writing smaller files and increasing streaming latency to accumulate more data per write. Small files can degrade read and merge performance, because the job has to scan many files.
Further, MERGE operations tend to leave behind a lot of unused data, since updated records are written to new files while the superseded versions remain on storage. The unused records don’t affect query correctness, but they degrade both CDC and user query performance over time.
To alleviate the problem, users are encouraged to do one of the following:
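For example, a periodic maintenance job can compact the small files and remove files that are no longer referenced (a hedged sketch using the example table name; OPTIMIZE is available on Databricks, and VACUUM is a standard Delta Lake command):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Compact small files written by the streaming job into larger ones.
spark.sql("OPTIMIZE inventory_voters")

# Remove data files that are no longer referenced by the table and are
# older than the retention window (7 days here).
spark.sql("VACUUM inventory_voters RETAIN 168 HOURS")
```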
Delta Lake is an open-source project to build reliable data lakes that you can easily govern and scale out to billions of files. Delta Lake uses open-source Apache Parquet as the columnar file format for data that can be stored in cloud object storage, including Google Cloud Storage (GCS), Azure Blob Storage, Azure Data Lake Storage (ADLS), AWS Simple Storage Service (S3) and the Hadoop Distributed File System (HDFS). Thousands of organizations use Delta Lake as the foundation for their enterprise data and analytics platforms. Reliability, scalability and governance for data lakes are achieved through the following features of Delta Lake:
Delta Lake is fully compatible with Apache Spark APIs so you can use it with existing data pipelines with minimal change. Databricks provides a managed cloud service to build your data lake and run your analytics workloads with several additional performance features for Delta Lake:
To get started, visit the Google Datastream Delta Lake connector GitHub project. If you don’t already have a Databricks account, then try Databricks for free.