Containerized Stream Engine to Build Modern Delta Lake

As the days go by, everything is changing: your business, your analytics platform, and your data. Deriving real-time insights from this huge volume of data is key to survival. This robust solution can help you operate at the speed of change.

The most compelling operational analytics demand real-time rather than historical data, which in turn requires ML/AI algorithms to accept real-time workloads and make ever more accurate operational predictions.

Data from various sources is pushed to Kafka using an open source CDC tool (Debezium). Spark Structured Streaming reads the near-real-time data and sinks it into Delta Lake OSS. The entire data pipeline is orchestrated with Kubernetes to run as a scalable pipeline.

Speakers: Sandeep Reddy and Karthikeyan Siva Baskaran


– Welcome, everyone. In this talk we are going to share our experience of how Databricks and containerization have helped us build modern data pipelines. Before getting started, a few words about who we are: I am Sandeep, and Karthikeyan has joined me. We are both currently working as senior data engineers at Tiger Analytics. Tiger Analytics is an advanced analytics and AI consulting company. This talk on the containerized stream engine is part of the Tiger data foundation products, which are an in-house Tiger IP asset.

So here is how we would like to spend the next 25 minutes with you: the objective behind building this product; the design considerations and decisions that we took while building it; and how the infrastructure has been provisioned to make it reliable on any platform. We will also talk about the detailed solution and the CDC logic flow, how we monitor and debug the analytics workloads, and some points to note and best practices. If you have any questions, feel free to post them in the chat window.

So how many of you are working with the most recent data from the business and taking real-time decisions as part of your analytics workloads? It is not an easy thing to achieve, right? But nowadays the business needs the most recent data to make current decisions, rather than working with static data. Every business depends on multiple sources of data to make decisions, and all this data is stored in different layers. So our objective is to build a single source of truth for enterprise-level data, so that all analytics workloads can act on a centralized data hub. With that objective in place, let’s see the design decisions that we took to build modern data pipelines.

These are some of the common ways to capture data from the business. The first is direct JDBC. In this method, we connect directly to the production database using Sqoop or any other database utility.
This increases the load on the source database, which will impact the production application. Also, with this method we cannot capture some DML operations such as deletes, or schema changes, and we only get snapshot data.

The second method is dual writes. In this method, the application writes the data into the production database and also to a pub/sub system like Kafka, and this data then has to be streamed into the data lake to run any analytics workloads. But with dual writes, two copies of the data need to be written to two different systems, so there may be performance or latency issues in production applications.

The final method is change data capture. In this method, instead of querying the database, CDC tools read the database logs and parse the transactions that happened in the database. The advantage is that there is no impact on the production DB or production applications. We can also capture schema changes and all DML operations. So we have leveraged the change data capture method for data acquisition.

So what is the right storage system for maintaining your data lake? To understand that, let’s look at the problems with a traditional data lake store. On schema enforcement and evolution: when there is a schema change in an upstream application, there is no straightforward way to change the target schema in traditional data lakes, and this breaks the flow. On versioning: in case of a failure, there is no way to roll back to a previous version. And on data corruption: assume you are writing the data in overwrite mode. Spark will first truncate the data in the destination and only then try to write the new data. If there is a failure while writing, the truncated data is already lost. As a data lake does not support atomic transactions, we cannot enclose operations between a begin and a commit as in an RDBMS.
And traditional data lake systems are not ACID compliant. Consider isolation, for example: when two users simultaneously perform write operations on the same table, no error is thrown, because there is no clear isolation between multiple writes. This also leads to data corruption.

To overcome all these traditional data lake problems, we have Delta Lake to the rescue. In terms of storage files, Delta Lake is no different from a data lake: it also stores the data in Parquet file format. But it adds a metadata layer to track all the transactional capabilities, such as atomicity, data versioning, and rollback to previous versions in case of failure. Delta uses optimistic concurrency control to isolate different user actions performed on the same data at the same time, which provides transaction-level isolation between different writes. Whenever there is an update or overwrite, this metadata layer tracks which files are in the add state and which files are in the remove state. This is how it becomes ACID compliant.

For schema enforcement and evolution, we have the merge schema and overwrite schema options in Delta Lake. For time travel, the metadata layer stores the latest state of each and every transaction as well as the older versions. So by specifying a version or timestamp, we can go back to an older version of the data. Data quality is another key aspect that we are seeing in the business currently, and Delta Lake provides the flexibility to create check constraints at the column level.

So these are some of the design decisions made for data acquisition and storage. Now, how do we deploy the data processing engine between the data acquisition and storage systems? As part of infrastructure provisioning, we want to talk about how we deploy the data processing engine so that it is available on any platform.
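
The add/remove bookkeeping and time travel described above can be illustrated with a small, self-contained sketch. This is a toy model only, not Delta Lake's actual log format or API: the `Commit` class, the `files_at` helper, and the file names are all hypothetical. It shows how replaying add and remove actions up to a chosen version yields the set of files that make up the table at that version.

```python
from dataclasses import dataclass, field


@dataclass
class Commit:
    """One entry in a simplified transaction log (hypothetical model)."""
    version: int
    add: list = field(default_factory=list)     # files added by this commit
    remove: list = field(default_factory=list)  # files logically removed


def files_at(log, version):
    """Replay commits up to `version` and return the set of live files."""
    live = set()
    for commit in sorted(log, key=lambda c: c.version):
        if commit.version > version:
            break
        live -= set(commit.remove)
        live |= set(commit.add)
    return live


log = [
    # Initial write: two data files.
    Commit(0, add=["part-0.parquet", "part-1.parquet"]),
    # An update rewrites part-0 into part-2; part-0 is only marked removed.
    Commit(1, add=["part-2.parquet"], remove=["part-0.parquet"]),
    # An overwrite replaces everything with part-3.
    Commit(2, add=["part-3.parquet"], remove=["part-1.parquet", "part-2.parquet"]),
]

print(sorted(files_at(log, 2)))  # current table state
print(sorted(files_at(log, 1)))  # "time travel" to version 1
```

Because removed files are only marked as removed in the log, an older version stays readable for as long as the underlying files are kept.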
We want to make this product cloud agnostic, as we have many customers running analytics workloads on various platforms such as AWS, Azure, and on premises. So we have leveraged Terraform to provision the infrastructure on these platforms. As Terraform works on the basis of infrastructure as code, we also perform policy checks on the infrastructure provisioning code so that it meets all organizational compliance requirements. Some example policy checks are: no VM should open ports publicly; only specific instance types may be used for VM provisioning; and cluster auto scaling should not scale beyond 10 nodes. We create a policy check config with all these checks in place and run it against the Terraform template.

So how are we deploying this product? Based on the selected environment, we choose the respective Terraform template to deploy a Kubernetes cluster with the necessary pods, services, and volumes. We have launched replica sets as part of the pod deployment to avoid a single point of failure, and we have leveraged various services for internal communication between pods in Kubernetes. We also attach persistent volumes to pods to retain the state of the application even after a particular pod restarts.

It is evident that microservices architecture has raised the demand for containerization. With containerization in place, we make sure that our product can be launched on any platform with minimal effort. With that, I will hand over to Karthik to cover the solution deep dive.
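
The three example policy checks mentioned above can be sketched in a few lines. The talk does not say which policy tool or plan format was used, so everything here is an assumption for illustration: the `plan` structure, the `check_policies` function, and the instance type allow-list are hypothetical and do not correspond to Terraform's real plan schema.

```python
ALLOWED_INSTANCE_TYPES = {"m5.xlarge", "m5.2xlarge"}  # hypothetical allow-list
MAX_AUTOSCALE_NODES = 10  # limit mentioned in the talk


def check_policies(resources):
    """Return a list of human-readable violations for the given resources."""
    violations = []
    for res in resources:
        name = res["name"]
        if res["kind"] == "vm":
            # Policy 1: no VM may expose a port to the whole internet.
            for rule in res.get("ingress", []):
                if rule["cidr"] == "0.0.0.0/0":
                    violations.append(f"{name}: port {rule['port']} is open to the public")
            # Policy 2: only approved instance types may be provisioned.
            if res["instance_type"] not in ALLOWED_INSTANCE_TYPES:
                violations.append(f"{name}: instance type {res['instance_type']} is not allowed")
        elif res["kind"] == "cluster":
            # Policy 3: auto scaling must not exceed the node limit.
            if res["max_nodes"] > MAX_AUTOSCALE_NODES:
                violations.append(f"{name}: auto scaling max {res['max_nodes']} exceeds {MAX_AUTOSCALE_NODES}")
    return violations


# Simplified, made-up description of what a template would provision.
plan = [
    {"kind": "vm", "name": "kafka-vm", "instance_type": "m5.xlarge",
     "ingress": [{"port": 9092, "cidr": "10.0.0.0/16"}]},
    {"kind": "vm", "name": "debug-vm", "instance_type": "t2.micro",
     "ingress": [{"port": 22, "cidr": "0.0.0.0/0"}]},
    {"kind": "cluster", "name": "stream-k8s", "max_nodes": 12},
]

for violation in check_policies(plan):
    print(violation)
```

Running checks like these before provisioning means a non-compliant template is rejected without creating any infrastructure.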

– Thanks, Sandeep. Hi, everyone. Now let’s deep dive into the solution architecture. This is the overall architecture. We use the open source CDC tool Debezium to capture the change data from the source system. As of today, Debezium supports the five databases listed here. Kafka Connect acts as a data producer for the Kafka broker nodes. Kafka Connect uses the Debezium connector to parse the logs from the source system and write them to the Kafka broker nodes. Once it reads the data from the source system, it first pushes the schema to the schema registry, gets back a unique ID for each schema, and pushes this ID into Kafka along with the actual data. This way, the payload size in Kafka is drastically reduced.

As the business grows, there will obviously be a need to add new features, which in turn changes the table schema. The schema registry is used to store the schema for the received data. By doing this, we can switch between different versions of the schema. This also helps to prevent breakage of downstream applications when there is a schema evolution.

Spark Structured Streaming, which runs in Kubernetes, acts as a Kafka consumer and processes the change data and schema changes into Delta Lake. This infrastructure and application platform is brought up using Terraform. The complete change capture, streaming queue, and processing layer infrastructure runs in Kubernetes. When the load increases, Spark can scale out to multiple containers to process the data efficiently. Finally, the data is stored in Delta Lake, which can be written to any distributed file system such as HDFS, Blob, or S3. It is also possible to write to the local file system for demo purposes, but not for real project scenarios. In this demo, we are using the open source CDC tool Debezium, which supports only these five databases.
If there is a need to read change data from a source system that is not supported by Debezium, the only change is to replace Debezium with a paid CDC tool. Other than that, no changes are required, and the complete architecture can be reused.

This is how the Kafka connector properties look when connecting to Microsoft SQL Server using the Debezium connector. The number of tasks must always be set to one to preserve the order of change messages. If we increase the number of tasks to achieve parallelism, a delete may be processed first and the insert for the same primary key afterwards, which throws errors while processing the data in the Kafka consumer; in our scenario, that is Spark Structured Streaming. The other properties are the connection properties needed to connect to SQL Server and the Kafka broker nodes. The last important properties are the key and value converters and the schema registry URL. The Avro converter is given for both the key and value converters. Using the Avro format for the data in the Kafka broker helps to reduce the data volume drastically. Each schema is registered in the schema registry, whose URL is specified in the connector properties. These are all the connector properties. One point to note here: one topic is created in Kafka for each table to track all the DMLs, but only one common topic is created to track the DDLs for all the tables in a database.

This is what a sample message from Debezium looks like. There is a lot of information in the actual data sent to Kafka, but for this demo I took just the payload information for insert, update, and delete. On the right side, we insert one record into a sample employee table in the source system, SQL Server. When Debezium captures this change, it sends the before and after images of the data. In the case of an insert, only the after image is available, but for updates it captures both the before and after images.
For deletes, we see only the before image. Once the data is deleted, no details are available in the source system, SQL Server, so the after image for deletes is always null. It is important to understand this because our complete code logic flow depends on this information.

Now let’s see the high-level logic flow in Spark streaming while processing a micro-batch. In data pre-processing, we split the data into two parts: one has the inserts and updates, and the other has only the deletes. The reason is that the payload differs between these scenarios. For inserts and updates, we need only the after image, as we are maintaining SCD Type 1; we don’t need the before image for updates. But for deletes, we need the before image to know what data got deleted. Using its primary key, we delete the data in the target system. If we took the after image for a delete, the data would be null, so it would not be possible to know which record needs to be deleted in Delta Lake. This is why the data split is required before processing the actual data.

During the initial load, we create a Delta table and load it, excluding the deletes. As it is a new table, there is nothing to delete, so deletes are excluded during the initial load. Once the initial load is completed, from the next micro-batch onwards we receive incremental data. The next step is data pre-processing once again for the incremental data, followed by processing the DDL and DML changes. DML changes are handled using one single command, SQL MERGE. The merge command handles inserts, updates, and deletes in a single command based on the operation column provided by Debezium. The operation column indicates whether the record is an insert, update, or delete through the c, u, and d flags.

In the first micro-batch, Spark consumes the initial load data from Kafka and puts the inserts and updates in one data frame and the deletes in another.
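
The split step can be sketched in plain Python, without Spark, to make the before/after logic concrete. The `before`, `after`, `op`, and `ts_ms` fields follow the Debezium change event payload; the `split_events` function and the sample employee rows are hypothetical and only illustrate the idea, they are not the speakers' code.

```python
def split_events(events):
    """Split change events into upserts (after image) and deletes (before image)."""
    upserts, deletes = [], []
    for event in events:
        if event["op"] == "d":
            # Deletes carry a null after image, so the key comes from `before`.
            deletes.append({**event["before"], "op": "d", "ts_ms": event["ts_ms"]})
        else:
            # Inserts and updates only need the after image (SCD Type 1).
            upserts.append({**event["after"], "op": event["op"], "ts_ms": event["ts_ms"]})
    return upserts, deletes


# Made-up payloads for a sample employee table.
events = [
    {"op": "c", "ts_ms": 1, "before": None,
     "after": {"id": 10, "name": "Asha"}},
    {"op": "u", "ts_ms": 2, "before": {"id": 10, "name": "Asha"},
     "after": {"id": 10, "name": "Asha K"}},
    {"op": "d", "ts_ms": 3, "before": {"id": 11, "name": "Ben"},
     "after": None},
]

upserts, deletes = split_events(events)
print(upserts)
print(deletes)
```

After the split, both parts share the same flat row shape, which is what allows them to be combined again for a single merge.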
Once the data is split, we take only the most recent record per key for that micro-batch by using windowing and rank functions. This is a sample code snippet for doing this in Spark and Scala. As a result, the final data looks like this. As this is the initial load for the table, deletes need to be excluded while inserting into the target. Finally, the data in the target Delta table looks like this once loaded.

Now the initial load is done. In the next micro-batch, we receive incremental data. Let’s say this is the incremental data consumed from Kafka. This data may contain inserts, updates, and deletes, and there may be multiple entries for the same key. In the sample data, ID 11 has two entries. The same data pre-processing step needs to be repeated for all subsequent micro-batches. The only difference from the initial load is that instead of excluding deletes, we use the merge command. After pre-processing, the data looks like this, with only the most recent record for each primary key. The consolidated incremental staged data is on the left side.

To handle schema evolution, the schema auto-merge property needs to be enabled. This property is only available from Delta Lake 0.6 and higher versions. Using the merge command, we update when the primary key from the incremental data matches the Delta table and the operation flag is update, and we delete when the primary key matches and the operation flag is delete. Lastly, we insert when the primary key from the incremental load is not matched with the Delta table and the operation flag is not delete. The table highlighted in orange is the Delta table before applying the merge. After applying the merge, the Delta table looks like this, highlighted in green on the right side.

We submit the Spark application by talking directly to the Kubernetes API server on the master node, which then schedules a pod for the Spark driver. Once the Spark driver is up, it communicates directly with Kubernetes to request Spark executor pods and starts reading the data from the Kafka topics.
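
The two pre-processing and merge steps described in the talk, keeping only the most recent record per key and then applying update, delete, and insert rules by operation flag, can be sketched in plain Python. This is an illustration of the semantics only and does not use Spark or the Delta Lake merge API; `latest_per_key`, `merge`, and the sample rows are hypothetical. One simplification is labeled in the code: any matched row that is not a delete is treated as an update.

```python
def latest_per_key(rows, key="id"):
    """Keep only the most recent row per key, ordered by the ts_ms column."""
    latest = {}
    for row in rows:
        current = latest.get(row[key])
        if current is None or row["ts_ms"] >= current["ts_ms"]:
            latest[row[key]] = row
    return list(latest.values())


def merge(target, changes, key="id"):
    """Apply SCD Type 1 merge rules to `target` (a dict of key -> row)."""
    result = dict(target)
    for row in changes:
        k = row[key]
        data = {c: v for c, v in row.items() if c not in ("op", "ts_ms")}
        if k in result:
            if row["op"] == "d":
                del result[k]      # matched and flagged delete
            else:
                result[k] = data   # matched and not delete: overwrite (simplification)
        elif row["op"] != "d":
            result[k] = data       # not matched and not delete: insert
    return result


target = {
    10: {"id": 10, "name": "Asha"},
    11: {"id": 11, "name": "Ben"},
    12: {"id": 12, "name": "Chen"},
}
changes = [
    {"id": 11, "name": "Benny", "op": "u", "ts_ms": 100},
    {"id": 11, "name": "Benjamin", "op": "u", "ts_ms": 200},  # later entry for ID 11
    {"id": 12, "name": "Chen", "op": "d", "ts_ms": 150},
    {"id": 13, "name": "Dana", "op": "c", "ts_ms": 160},
]

merged = merge(target, latest_per_key(changes))
print(merged)
```

Deduplicating before the merge matters because a merge generally expects at most one source row per target key; with two entries for ID 11 in the same micro-batch, only the later one should be applied.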
We maintain the Spark streaming checkpoint in a file system according to the platform being used. For example, on AWS we used EFS, and on Azure we used Azure file share. This ensures that the checkpoint metadata is not lost even when the Spark driver restarts.

Some of the key benefits of running Spark on K8s are these. The dev team has more control over building the containers with the necessary dependencies. It is easy to migrate the job to any environment, whether cloud providers or on premises. When Spark jobs are idle, the resources can be used by other applications, which is not the case with other resource managers like YARN or Mesos.

Next, let’s look at application monitoring. Centralized monitoring for both application and infrastructure has been built using Elasticsearch, Fluentd, Metricbeat, and Kibana. Elasticsearch is a document-oriented NoSQL database that stores the logs as JSON documents in an inverted index. Fluentd and Metricbeat act as agents to ship the logs into Elasticsearch. Fluentd is used to collect the logs from containers running in the Kubernetes cluster, whereas Metricbeat is used to collect node and pod metrics such as CPU, RAM, and disk usage. Kibana is used to visualize the various metrics captured in Elasticsearch.

This is how the monitoring dashboard looks. Currently we have a two-node Kubernetes cluster, and 34 pod containers are running. It displays CPU and RAM usage across nodes and pods. This is our sample dev cluster. You can also drill down to check which pod is consuming how much CPU and RAM.

Now let’s see some of the notable points. With respect to Debezium, a primary key is mandatory in order to track the change data; without a primary key, it is not possible to track the change data. By default, Kafka Connect creates one topic per table with only one partition. Because of this, the Spark job will not be parallelized.
To achieve parallelism, either configure it in the Kafka Connect properties or create a batch script that creates the Kafka topic for each table with a greater number of partitions. One topic is created for each table in the database to track all the DMLs, and only one common topic per database to track all the DDLs.

With respect to Spark, if the typical small-file problem is handled, there is no need to rewrite a lot of files when a merge happens. In return, this improves micro-batch performance drastically. There are a few ways to achieve this. OPTIMIZE in Databricks Runtime compacts the data in Delta Lake, but this option is only available in Databricks Runtime as of today. In Delta Lake OSS, write the compaction logic with the dataChange property set to false. This is one of the salient features of Delta Lake. Writing this way preserves the previous versions and overwrites the entire table with the data evenly distributed. If this Delta table is used as a streaming source by any downstream application, compacting the data with the dataChange property set to false will not interrupt the streaming application. Otherwise, compaction makes a new commit, and the downstream application thinks it has received new data to process, which is not actually true. So use this property with care. Another option is to enable adaptive execution mode, which automatically controls the number of partitions while writing into Delta Lake.

Although checkpointing consolidates the Delta log files, Spark will not read only the checkpoint in the metadata log in order to time travel to a specific version. If we remove these stale commits using vacuum, it is not possible to time travel to those specific commits.

Thanks, everyone, for joining us. Your feedback is important to us. Don’t forget to rate and review our session. Thank you all.

About Sandeep Reddy

Tiger Analytics

Sandeep Reddy Bheemi Reddy is a Senior Data Engineer at Tiger Analytics with experience in building advanced analytics solutions to make business-driven decisions. He has extensive experience in designing and implementing modern data pipelines on cloud platforms (Azure and AWS). At Tiger Analytics, his focus is on the evolution and delivery of technology to enable large-scale data processing in support of machine learning and AI analysis on the Enterprise Data Analytics Platform.

About Karthikeyan Siva Baskaran

Tiger Analytics

Karthikeyan Siva Baskaran is a Senior Data Engineer at Tiger Analytics with experience in building advanced analytics solutions to make business-driven decisions. He has extensive experience in designing and implementing modern data pipelines on cloud platforms. At Tiger Analytics, his focus is on the evolution and delivery of technology to enable large-scale data processing in support of machine learning and AI analysis on the Enterprise Data Analytics Platform.