A High Performance Mutable Engagement Activity Delta Lake

May 26, 2021 05:00 PM (PT)


In Salesforce, our customers are using High Velocity Sales to intelligently convert leads and create new opportunities. To support it, we built the engagement activity platform to automatically capture and store user engagement activities using Delta Lake. It is one of the key components supporting Einstein Analytics for creating powerful reports and dashboards, and Sales Cloud Einstein for training machine learning models.

Converting leads and creating new opportunities requires our engagement activity Delta Lake to handle data mutations at scale. In this presentation, we will share the challenges and learnings from building a high performance mutable data lake using Delta Lake, which will include:

  • Independent Stream Process to Support Engagement Data Lifecycle
  • Downstream Incremental Read
  • High Throughput Transactions in Engagement ID Mutation
    • Detect Cascading ID Mutation with Graph
    • Data Skipping and Z-Order with I/O Pruning
  • High Data Consistency and Integrity
    • Exactly-Once Write Across Tables
    • Global Synchronization and Ordering
In this session watch:
Heng Zhang, Developer, Salesforce
Zhidong Ke, Software Engineer, Salesforce

 

Transcript

Zhidong Ke: Hello everyone. This is Zhidong, with Heng here. Today we are going to talk about our journey of building a high performance mutable engagement activity Delta Lake in Salesforce to support Einstein Analytics. Here is the agenda. First, we will cover what the engagement activity Delta Lake in Salesforce is and why we are building it. Then we will talk about the pipeline requirements and the design, like how we support our customers' incremental reads, and how our jobs achieve exactly-once writes across multiple tables in one transaction.
After this we will dig into the challenges of handling mutations, including performance tuning and benchmarking results. Lastly, we will keep some time at the end for Q and A. First of all, at Salesforce our customers are using High Velocity Sales to intelligently convert leads and create new opportunities. We built the engagement activity platform to automatically capture and store user engagement activities. The engagement activity platform is a key component supporting Einstein Analytics in creating powerful reports and dashboards, and Sales Cloud Einstein in training machine learning models.
This large amount of data can only be handled at scale using the engagement activity Delta Lake, which is built on top of Delta Lake. Here is one key use case. For example, our sales reps can use engagement metrics, including engagement counts and rates, to easily identify which cadences and templates are more effective for running a campaign. In the engagement dashboard we can easily see how many emails were sent, opened, replied to, or discovered, and how many phone calls were a meaningful contact, not interested, or left voicemail, for any date and any sales rep. We can leverage the engagement dashboard to drive intelligence into sales productivity.
With all these use cases from our product management, we converted them into technical requirements. First of all, we need independent stream processes to support the engagement data lifecycle. And we have multiple downstream consumers: some of them need incremental reads, some of them require batch reads. For example, one of our consumers pulls our data periodically to generate snapshots and build the engagement dashboard. Last but not least, unlike the immutable data lake we built before, the engagement data is mutable, with very high throughput and transactional requirements, which is one of our biggest challenges.
And one more thing: we have to make sure our data stays consistent and has very high integrity. All of this brings huge challenges for us. We will start with the high level architecture. As you can see in this diagram, it includes three major components. In the center is our data lake table. On the left are our ingestion jobs, which consume engagement data through our internal Kafka queue and append it into our engagement data table. On the right side is our mutation component, which consumes mutation requests, also from the Kafka queue, from upstream and applies the mutations to our engagement data table. On top is the pipeline that handles retention requests, which enforces the data retention policy for our customers. With this design, each stream process is independent and has its own lifecycle.
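As a rough illustration of the ingestion side, here is a minimal Structured Streaming sketch that appends Kafka engagement events into a Delta table. The topic name, schema, broker address, and paths are hypothetical placeholders for illustration, not the actual Salesforce pipeline.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object EngagementIngestionJob {
  // Hypothetical engagement event schema; the real platform's schema is richer.
  val engagementSchema: StructType = new StructType()
    .add("orgId", StringType)
    .add("engagementId", StringType)
    .add("engagementDate", DateType)
    .add("engagement", StringType) // JSON blob with the activity fields

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("engagement-ingestion").getOrCreate()
    import spark.implicits._

    // Consume engagement activities from the internal Kafka queue.
    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")   // hypothetical brokers
      .option("subscribe", "engagement-activities")      // hypothetical topic
      .option("startingOffsets", "latest")
      .load()

    // Parse the Kafka value into typed engagement columns.
    val events = raw
      .select(from_json($"value".cast("string"), engagementSchema).as("e"))
      .select("e.*")

    // Append-only write into the engagement data Delta table.
    events.writeStream
      .format("delta")
      .outputMode("append")
      .option("checkpointLocation", "/checkpoints/engagement-ingestion") // hypothetical path
      .partitionBy("orgId")
      .start("/delta/engagement_data")                                   // hypothetical table path
      .awaitTermination()
  }
}
```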
When it comes to our first requirement, batch and incremental reads, the Delta API is quite powerful. It comes with native support for batch reads, such as time range queries from time T1 to T2, or organization-level queries that return all data for a specific org. It also supports incremental reads that return all data since any checkpoint, but there is one prerequisite for this feature.
It requires the table to be append-only. But remember, we just mentioned our table is mutable, so we had to come up with our own design to handle incremental reads. As you can see in figure two, we create a separate table called the notification table, which is partitioned by organization ID and ingestion time. When a new engagement is written into our data table, or when existing engagement data is updated or deleted, we create a notification and insert it into the notification table. That is how we tell our downstream consumers that new data has arrived. We found that many jobs fit into this design pattern, like our mutation job, data retention job, and GDPR jobs, so we extracted this out and made it more generic.
As you can see in the schema shown here, we use OrgID as the partition key, which matches the per-org pattern in which users query engagement data. Then we have the table name, which indicates which table the data was written to, and the last modified time. We also keep some counters, like update/insert/delete counts, for auditing purposes, and the AppName, which indicates which app wrote this notification.
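To make the notification pattern concrete, here is a minimal sketch of a notification record and the per-batch append. The field names mirror the schema described above, but the exact types, case class, and table path are assumptions for illustration.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{SaveMode, SparkSession}

// Mirrors the schema described above: partitioned by orgId, with counters for auditing.
case class Notification(
  orgId: String,          // partition key, matches the per-org query pattern
  tableName: String,      // which data table this batch was written to
  lastModified: Timestamp,
  insertCount: Long,
  updateCount: Long,
  deleteCount: Long,
  appName: String         // which app/job produced this notification
)

object NotificationWriter {
  // Called by ingestion, mutation, retention, or GDPR jobs after they touch the data table.
  def notify(spark: SparkSession, batch: Seq[Notification]): Unit = {
    import spark.implicits._
    batch.toDF()
      .write
      .format("delta")
      .mode(SaveMode.Append)
      .save("/delta/engagement_notifications") // hypothetical notification table path
  }
}
```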

Heng Zhang: We also want to achieve high throughput transactions in engagement ID mutation, which is one of our critical use cases. What is engagement ID mutation? Each engagement activity has an owner, and an engagement ID mutation is triggered when the role of the owner changes. Take conversion, for example: a lead L could become a contact C with a new ID, and all engagements that belong to L will get a new engagement ID. Our engagement data table has three columns: the OrgID and the engagement date, which we use for partitioning and clustering, and engagement, a struct with the engagement fields describing the activity. Moving to the mutation request table, the OrgID and engagement date columns are used to join to the partitions in the data table. The mutation operation can be either convert, merge, or delete. The key is the engagement ID, which is used to locate the particular engagement record in the data table, and the insert record is the new engagement activity record, if it applies. We have done optimizations at both the data layer and the application layer.
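Before getting into those optimizations: conceptually, applying a batch of mutation requests to the data table maps to a Delta MERGE keyed on the engagement ID within the matching org and date partitions. The snippet below is a simplified sketch under that assumption; the column names, table path, and convert-only handling are illustrative rather than the exact production logic.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}

object MutationApplier {
  // mutations columns (assumed): orgId, engagementDate, operation,
  // key (old engagement id), newEngagementId. Only the "convert" path is sketched.
  def applyConverts(spark: SparkSession, mutations: DataFrame): Unit = {
    val data = DeltaTable.forPath(spark, "/delta/engagement_data") // hypothetical path

    data.as("t")
      .merge(
        mutations.filter("operation = 'convert'").as("m"),
        // orgId and engagementDate restrict the join to the relevant partitions/files,
        // so partition pruning and data skipping kick in before the id comparison.
        "t.orgId = m.orgId AND t.engagementDate = m.engagementDate AND t.engagementId = m.key"
      )
      .whenMatched()
      .updateExpr(Map("engagementId" -> "m.newEngagementId"))
      .execute()
  }
}
```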
In the data layer, we partitioned our table by OrgID and Z-Ordered it by engagement date, as sketched below. With this partition strategy, data is written to a per-org partition directory and clustered by the Z-Order column, engagement date. There are multiple benefits to this partition strategy. First, we can manage the granularity of a partition: small files per org and date can be merged into bigger ones, which reduces the number of small files and keeps the engagement table data evenly distributed across reasonably sized files. Furthermore, we can tune the file granularity with the maxFileSize config. We also optimize queries through I/O pruning with data skipping and Z-Order.
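A sketch of that layout in Spark SQL is shown below (spark-shell style). The table name and struct fields are hypothetical, and the OPTIMIZE ... ZORDER BY syntax assumes a Delta Lake version that supports it (Databricks Runtime or open-source Delta 2.0+).

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("engagement-layout").getOrCreate()

// Partition by org, then cluster each partition's files by engagement date.
spark.sql("""
  CREATE TABLE IF NOT EXISTS engagement_data (
    orgId          STRING,
    engagementDate DATE,
    engagement     STRUCT<id: STRING, activityType: STRING, payload: STRING>
  )
  USING DELTA
  PARTITIONED BY (orgId)
""")

// Target compaction size for OPTIMIZE (config name as in recent Delta releases; value illustrative).
spark.conf.set("spark.databricks.delta.optimize.maxFileSize", 1024L * 1024 * 1024)

// Compact and Z-Order within each org partition so the per-file min/max engagementDate
// ranges are narrow and non-overlapping, which enables data skipping.
spark.sql("OPTIMIZE engagement_data ZORDER BY (engagementDate)")
```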
What is data skipping? Delta Lake automatically maintains the minimum and maximum values for up to 32 fields and stores them as part of the metadata. By leveraging these min/max ranges, Delta Lake is able to skip files that are out of the range of the queried field values. In order to make data skipping effective, the data should be clustered by the Z-Order columns so that the min/max ranges are narrow and ideally non-overlapping. Here are five example files from our data table. Take a look at the size column: each file is around one gigabyte, which indicates that the data is evenly distributed. Looking at the minimum and maximum engagement date columns, the date range of each file has no overlap and is well aligned. Zhidong?

Zhidong Ke: There is one more optimization we explored in the application layer to reduce the number of updates. As we just mentioned, mutations can be cascaded, so we have to maintain the ordering of mutation requests. In this example, an ID can change from one to two, to three, to four, and so on, but eventually we only need to update the ID from one to four. In this case we receive four requests from upstream, so we designed an algorithm to pre-process those requests and reduce the number of requests. Here is the diagram. As you can see, first we read all requests from the mutation table, group them by organization, and sort them by mutation execution time. For each group, we use the Spark graph API to build a directed graph, where each node is an ID and each edge is a request. Then we find all the connected components based on the graph direction and find each node's corresponding final state, as sketched below.
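Here is a minimal GraphX sketch of that reduction. It assumes the IDs are already numeric (or hashed to Longs) and that rename chains are acyclic, and it only shows collapsing chains into an (oldId, finalId) mapping for a single group, not the full per-org grouping and ordering logic.

```scala
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object MutationChainReducer {
  // requests: (fromId, toId) rename requests within one org, already ordered upstream.
  // Assumes no cycles, i.e. an id never maps back to an earlier id in the chain.
  def reduceChains(spark: SparkSession, requests: Seq[(Long, Long)]): Map[Long, Long] = {
    val sc = spark.sparkContext

    // Each request becomes a directed edge fromId -> toId.
    val edges: RDD[Edge[Int]] = sc.parallelize(requests.map { case (f, t) => Edge(f, t, 1) })
    val vertices: RDD[(VertexId, Long)] =
      sc.parallelize(requests.flatMap { case (f, t) => Seq(f, t) }.distinct.map(id => (id, id)))

    val graph = Graph(vertices, edges)

    // Terminal ids have no outgoing edge; they are the "final state" of each chain.
    val hasOutgoing = requests.map(_._1).toSet
    val terminals = vertices.keys.filter(id => !hasOutgoing.contains(id)).collect().toSet

    // Group vertices by connected component, then map every non-terminal id
    // in a component to that component's terminal id.
    val ccByVertex = graph.connectedComponents().vertices // (vertexId, componentId)
    val componentToTerminal = ccByVertex
      .filter { case (id, _) => terminals.contains(id) }
      .map { case (id, cc) => (cc, id) }
      .collectAsMap()

    ccByVertex
      .filter { case (id, _) => !terminals.contains(id) }
      .map { case (id, cc) => (id, componentToTerminal(cc)) }
      .collect()
      .toMap
  }
}
```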
Finally, we convert these sub-graphs into a list that can be passed into our Spark Delta query to update the data table. With this optimization, we eliminate a huge number of unnecessary operations and significantly improve performance. But we have one more challenge: how to ensure high data consistency and integrity. Let's go through one example that we faced in our practice.
For example, our jobs, as we mentioned, write into multiple tables in one transaction. The ingestion job first writes into the data table, then writes into the notification table. What if the first write succeeds and the second write fails? Our customer would completely lose the notification, and effectively lose that data, right? This is a very common problem across all of our pipeline designs, and it needs to be resolved. We designed the pipeline so that we can do what we call exactly-once writes across multiple tables. That requires that if any table write in the transaction fails, the transaction should not commit, and if any write fails, our job will reprocess only that write. To achieve this requirement, we need to save some metadata, similar to the Delta log, which includes the state of each write and the last succeeded checkpointed offsets, since our source is Kafka. We created something called the checkpoint store, which stores the start and end offsets, the Kafka metadata, and the last job state for a given checkpoint.
The first task of our Spark job is to read this store and get the last checkpoint's metadata and the last job state. Let's go through the happy path first. As you can see in this table, we have two records for the same ingestion job. There are two processes here: the data ingestion process and the data notification process, and both batch IDs are 10. Our Spark job first fetches the batch ID from the Spark API and compares it with this batch ID. If they match, it means our job completely succeeded in the last run. It will then read the offsets from our checkpoint store, pull the new data from Kafka, and process the full sequence. After all this, it checkpoints back to the batch metadata store.
Now let's go through the unhappy path. In this case, you can see the data notification process's batch ID is nine, which means it did not succeed in the last batch. Our Spark job will retry that specific process by pulling the previous data from Kafka and re-running only the data notification process. If it succeeds, it checkpoints back with batch ID ten to catch up from the last failure.
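To make the happy and unhappy paths concrete, here is a heavily simplified sketch of the per-process batch bookkeeping that could run inside foreachBatch. The checkpoint-store interface, process names, and table paths are hypothetical stand-ins for the real component.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical checkpoint store: last committed batch id per logical process.
trait CheckpointStore {
  def lastBatchId(process: String): Long
  def commit(process: String, batchId: Long): Unit
}

object ExactlyOnceWriter {
  /** Runs one write step only if it has not already committed for this batch,
    * so a retried micro-batch re-executes just the step(s) that failed. */
  def runStep(store: CheckpointStore, process: String, batchId: Long)(write: => Unit): Unit = {
    if (store.lastBatchId(process) >= batchId) {
      // Happy path on retry: this step already committed batchId, skip it.
      return
    }
    write                          // e.g. append to the data or notification table
    store.commit(process, batchId) // record success before moving to the next step
  }

  // Used inside foreachBatch { (df, batchId) => ... } of the ingestion stream.
  def processBatch(store: CheckpointStore, df: DataFrame, batchId: Long): Unit = {
    runStep(store, "data-ingestion", batchId) {
      df.write.format("delta").mode("append").save("/delta/engagement_data")
    }
    runStep(store, "data-notification", batchId) {
      // build and append notification rows for this batch (omitted)
    }
  }
}
```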
With this design, we are able to perform exactly-once writes, and if any process fails, we can retry only that specific process in the next scheduled job. No duplicate writes, no data loss.

Heng Zhang: In order to maintain a high degree of consistency and integrity, we came up with another design called global synchronization and ordering, because our pipeline has multiple independent stream processes which concurrently write to the data table. It falls into the race condition model, where the shared resource is our engagement table and the threads are the independent stream processes. We need global synchronization to avoid conflicting commands, and we also want to ensure the engagement lifecycle ordering in our data processing, and we want to apply both at the micro-batch level. We created a component called the job coordinator, which uses a ZooKeeper distributed lock to ensure global synchronization, and we implemented compare-and-swap to ensure the ordering.
In order for the job coordinator to work, it takes three parameters. One is the lock domain, which indicates which resource to protect; in this case it is our data table. It takes a name as the second parameter to identify the job itself, and the third parameter is the predecessor, which indicates which job should run first.
With these three parameters, this is how the job coordinator works. First, the streaming job starts and the job coordinator is initialized with ZooKeeper. Then the job pulls data from Kafka periodically and starts a micro-batch process when messages arrive in Kafka. Within a micro-batch, the job coordinator first tries to obtain the distributed lock for the given resource name, set in the lock.name config. If it cannot obtain the lock with the given name, it gives up, and the next pull will start from the last checkpoint. Once it obtains the lock, it reads the predecessor field and compares it with the expected one, set in the job coordinator predecessor config. If the predecessor is not the expected one, it gives up its turn, releases the lock, and the next pull will start from the last checkpoint. If the predecessor is the expected one, it registers itself using the name set in the job coordinator name config. So after a micro-batch successfully obtains the lock and validates the predecessor, it starts processing and saves the checkpoint when the process is complete.
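Here is a rough sketch of the lock-plus-predecessor check using Apache Curator on top of ZooKeeper. The znode layout, config values, and the way the predecessor field is read and updated are assumptions for illustration, not the actual job coordinator implementation.

```scala
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.framework.recipes.locks.InterProcessMutex
import org.apache.curator.retry.ExponentialBackoffRetry

class JobCoordinator(zkConnect: String, lockDomain: String, name: String, predecessor: String) {
  private val client: CuratorFramework =
    CuratorFrameworkFactory.newClient(zkConnect, new ExponentialBackoffRetry(1000, 3))
  client.start()

  private val lockPath  = s"/locks/$lockDomain"       // protects the engagement data table
  private val statePath = s"/coordinator/$lockDomain" // holds the name of the last job that ran
  private val mutex = new InterProcessMutex(client, lockPath)

  /** Returns true if this micro-batch may run: lock acquired AND predecessor matches.
    * Assumes the coordinator znodes were created during setup. */
  def tryStartBatch(): Boolean = {
    if (!mutex.acquire(10, TimeUnit.SECONDS)) return false // give up, retry on next pull
    val last = new String(client.getData.forPath(statePath), StandardCharsets.UTF_8)
    if (last != predecessor) {      // wrong ordering: release and give up this turn
      mutex.release()
      false
    } else {
      client.setData().forPath(statePath, name.getBytes(StandardCharsets.UTF_8)) // register self
      true
    }
  }

  /** Called after the micro-batch finishes and its checkpoint is saved. */
  def endBatch(): Unit = mutex.release()
}
```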
With all of this design in place, we did a performance benchmark of our ID mutation job. We used a cluster of 32 nodes with the node type i3.8xlarge, and we set the maximum file size to 128 megabytes. With that, we were able to handle 28 million update and delete requests within eight minutes. And finally, we list all the blog posts and video recordings covering our pipeline design in detail for your future reference. Thanks for watching our session, and have a good day.

Heng Zhang

I am a software engineer who is interested in and specializes in microservices, distributed systems, and big data.

Zhidong Ke

I am passionate about designing distributed systems, real-time/batch data processing, and building applications.