Leveraging Apache Spark and Delta Lake for Efficient Data Encryption at Scale


The increase in consumer data privacy laws brings continuing challenges to data teams all over the world which collect, store, and use data protected by these laws. The data engineering team at Mars Petcare is no exception, and in order to improve efficiency and accuracy in responding to these challenges they have built Gecko: an efficient, auditable, and simple CCPA compliance ecosystem designed for Spark and Delta Lake.

Gecko has allowed us to simultaneously achieve the following benefits within our data platform:
– Automatically handle consumer deletion requests in a compliant manner.
– Increase the overall security of PII data in the Petcare Data Platform (PDP) Data Lake.
– Maintain Non-PII data structure, in order to continue to provide analytical value and overall data integrity.
– Make PII data accessible when required.

These benefits have been achieved by a conceptually simple solution: using row (client) level encryption for all PII tables in our system, whilst storing the encryption keys in a single, highly secure location in our lake. By leveraging the power of Spark and Delta Lake, the Gecko ecosystem can carry out a full encryption of all personal data, automatically handle consumer data requests, and decrypt personal data when required for other engineering or analytical projects.

The process has the added benefit of generating a huge labelled training dataset containing all PII in the PDP, for future use in the design of a machine learning model for automatic PII detection. A tool such as this would then enable us to remove the risk of human error when labelling PII on ingestion, as well as enabling PII removal from free text fields.

This presentation will share:
– How the solution can achieve automated privacy rights requests and enhanced platform security.
– How Spark & Delta Lake have been leveraged in these applications.
– Why these technologies have been essential in achieving the necessary requirements.

Speakers: Jason Hale and Daniel Harrington


– Hey, good afternoon everyone, or good morning, depending on where you are in the world, and welcome. Today, Jason and I are here representing Mars Petcare, and we’ll be discussing how we leveraged Apache Spark and Delta Lake for efficient data encryption at scale. I’m just gonna take you through the agenda today. So first I’m gonna start off with a bit of background about the authors, Mars as a company, and the Petcare Data Platform, which is the foundation on which our team here is built. Then I’m gonna take you through a bit about CCPA, ’cause this is the main motivation for the project, and how this has enhanced consumer privacy rights and consumer data protection. Then finally, the main bulk of this presentation is gonna be around Gecko, and this will be a deep dive into our bespoke automated CCPA compliance tool. Within this section, we’ll be discussing the encryption and how we’ve utilized Spark and Delta Lake in order to make this possible on a large scale. So, background first. There are two authors for this particular project. My name is Daniel Harrington. I graduated last year from the University of Bath with a Masters’ in Integrated Mechanical and Electrical Engineering. I’ve since transferred to become a data engineer here at Mars Petcare, and I’ve been working here for around 10 months now. In my time here, I’ve worked solely on Gecko, which is the bespoke encryption layer that we’ve designed here in the engineering team.

– And hi, I’m Jason. I graduated two years ago with a Masters’ in Physics from the University of Exeter, and I’ve been a data engineer here at Mars Petcare now for about 16 months. In my time here I’ve kind of been split between two different projects. For the first eight months, I worked on the design and implementation of our ELT framework, and in the last eight months I’ve been working on Gecko. So just to begin with a bit of an overview of Mars Petcare as a company. Mars is a family of a broad range of companies that are constantly evolving, ranging from confectionery products and food to Petcare as well. So moving into Mars Petcare, Mars Petcare began with the pet nutrition brand called Chappie, and has since expanded to a broad range of businesses spanning multiple different sectors within Petcare. Some of the areas that these businesses work in include pet nutrition, for example Whiskas, IAMS and Chappie, as well as pet health or pet hospitals, for example Banfield and VCA, and also tech products, for example Whistle, which is kind of like a Fitbit for dogs. So now moving away from Mars and looking more into the Petcare Data Platform and the team that we work in, the Petcare Data Platform is the team that basically manages a platform of anonymized data from lots of these different businesses within Mars Petcare. The kind of foundation of this platform is Kyte, which is our ingestion framework and our ELT pipeline. This is the pipeline which connects the multiple different business units within Mars Petcare and standardizes that data into a final output. You can imagine that a lot of these different business units have a lot of different systems that we’re pulling data from, whether it’s a Postgres database, SQL Server or Azure Blob, for example, and we bring this all in and standardize it into a single output.
We’ve then built engines and designed processes on top of this output, in order to enhance the business integrity of the Petcare Data Platform. One of these is Gecko, which is our CCPA compliance ecosystem designed for Spark and Delta Lake. So let’s have a brief look at the architecture diagram of some of the things that the Petcare Data Platform does. If we look here on the left, we’ve got Kyte, which is our ingestion framework, and this is really where we bring in everything from the multiple different sources into an output ready for use by the other engineers and the analysts within our team. We then have transformations on top of this output, Gecko being one of them, which we’re gonna be talking about for the rest of this presentation. But we also have a framework for dynamic transformation which is driven by JSON configs. Then on top of these transformations and this transformed data, we have what we call Engines, which are essentially Data Lakehouses or data models built on top of these transformed data sets, in order to add business value for analysts and other teams as well. We then also have a bit of marketing and activation work, as well as Web Services, which we use within the engineering team to help with things like monitoring and triggering runs.

– Okay, so I’m now gonna give you a bit of background around CCPA and how this affected the design of our project. So CCPA, for those that don’t know, is the California Consumer Privacy Act, and this has been effective from January 2020. It’s the first law of its kind in the US, and it gives users more rights surrounding the data that they share with businesses and how this data is stored, used and shared. There are three key rights that come with CCPA. The first of these is the Right to Opt Out. This gives users the right to opt out of having their data sold to third parties. The second of these is the Right to Request Disclosure, so a business has to disclose all the data that they hold on a consumer. And thirdly, and most importantly for this project, is the Right to Request Deletion, or the Right to be Forgotten. This gives users the right to have all of their personal data removed from the systems of a business. So whilst this sounds simple in concept, the old process that we used to become CCPA compliant was actually quite complex, because we had to find every instance of an ID in our Data Lake and physically delete the row. In a Data Lake as large and complex as ours, this process was very difficult to manage and required a lot of manual work for us to become CCPA compliant. In order to improve on this process, we came up with a three-part mission which formed the basis of Gecko. The first of these parts was to handle CCPA Right to be Forgotten requests efficiently, safely and effectively. As there could be many dependencies between different data sets in our Lake, and people may occur in any number of different locations, we wanted to ensure that we designed a consistent, reproducible and trackable method, so that requests could be fulfilled with confidence and in a quick and efficient manner.
We also wanted to make this process as simple as possible for audit purposes, so we can clearly show what steps have been carried out in order to fulfill a Right to be Forgotten request. The second part of our mission was to increase the overall security of PII data in the Petcare Data Platform Vault, as this is a very important aspect of what we do here in the engineering team. And thirdly, we wanted to maintain the non-PII data structure in order to continue to provide analytical value and data integrity. PII forms the basis of some really interesting and valuable projects within our team, so it was a key aim to ensure that the PII that we are allowed to use is still available for use by our analysts. So how do we achieve these goals? Well, I’m now gonna talk to you a bit about the Gecko ecosystem, and I’m gonna explain some of the core concepts behind this. At its heart, the Gecko ecosystem is our bespoke automated CCPA compliance tool. The key concept behind it is that we utilize row-level encryption for PII data and store the encryption keys in a single Delta Lake table in our Lake. Gecko is made up of two core functions, these being the Gecko Crawl and Gecko Delete processes. Gecko Crawl is what handles the encryption and decryption of PII data, and it also generates a Master Table which contains all PII within the Petcare Data Platform. The Crawl process was also the namesake for our project. A quick Google search of animals that crawl will quickly return gecko as the first result. So whilst not a lot of time went into naming this, we’re quite proud of the name and we hope you are too. The second of these aspects is Gecko Delete, and this is the aspect that handles CCPA compliance through redaction of encryption keys. The main focus of this talk is going to be on Gecko Crawl, as this was a computationally intensive process, since we had to process all of our data. This is what required the most optimization in Spark and utilization of Delta tables.

– So now we’re going to dive into a little bit more of the technical details of how Gecko Crawl and Gecko Delete work. To begin with, I’m gonna introduce you to our architecture diagram for Gecko Crawl. Before I go into the details, I just wanna point out that this diagram is split into three main sections. The first, which you can see with this arrow here on the left, represents where we deploy our config. The second section is this bit in the middle with the encrypt banner over it, and this represents where we encrypt data. And then the final section is this section on the far right, with the yellow strip, and this represents the Master Table. So to go through this diagram, the first thing we do is deploy our configs at runtime. These configs contain information on which data needs encrypting, and generate an encryption key for each row, for each table that is being run. The reason we do this at runtime is to ensure that we have encryption keys generated for every row, since data may be ingested into the Lake between when Gecko was last run and when it’s currently running. After we’ve generated these encryption keys, we thread notebooks at the source level and then at the table level, and this is to ensure ultimate parallelism. Once we’ve threaded at the table level, we then encrypt the data, and we encrypt this data at the three write locations within our ELT process. So if I point you now to these three little directory symbols at the bottom, raw, schema-verified and output, it’d be worth remembering these as the three write locations within our ELT process, as we’ll touch on these again later in the presentation. Then finally, once encryption is complete at each of these three locations, we generate something called a Master PII Table, which contains all personal information encrypted within the Petcare Data Platform.
So the first thing that we do then is generate the encryption keys. The way we do this is we loop through each data source and each table specified in the run, and first generate something called a salt-key for each row within the table. These salt-keys are simply random 16-byte binary strings, and this salt will provide the basis for forming the encryption key later. What we actually do is produce one salt-key per Source_Id, and Source_Id is what we refer to as the value of the Id_Column that is the basis of the encryption for a given table. Here it’s important to know that if the table has a Client Id, we’ll prioritize the Client Id as the Source_Id rather than the primary key of the table. This is because if we receive a CCPA request, typically that will be in the form of a Client Id, so if a Client Id exists in the table, we want an encryption key generated for it. Here we can see an example of this table and what it looks like. If we look at the different columns, we’ve got the data source, which will be the data source that’s specified in the run. We’ve then got the ID column, which will typically be the Client Id, if not the primary key. Then we’ve got the Source_Id, which is the value of that ID itself. And then we’ve also got the salt as well, which is the random 16-byte binary string associated with that ID. So you may be wondering how this salt is actually used to encrypt data. Well, the salt-key combined with a password is what generates the encryption key. These passwords are stored in Azure Key Vault, and are actually stored completely separately from where we store the salt-keys. The table that you see here is actually stored as a Delta Lake table within our Lake. And the reason we separate these out into two is just to add an extra layer of security.
So you can imagine that in order to actually obtain the encryption key and decrypt data, you’d have to know how the process of combining the salt and the password works, and further you’d need to have access to two different systems: the Delta Lake table and the passwords in Key Vault. So it really just provides an extra layer of security. So how do we actually generate this table? Firstly, we generate an incremental data frame for each table that is executed as part of the run, and that is done through a Pythonic for loop. Then once we’ve got those incremental data frames for each table, we union them together and aggregate to generate the final data frame, to ensure that we’ve got a single salt per Source_Id. We optimize this process by not actually calling any actions within the for loop. This way, because of lazy evaluation, Spark can iterate through that for loop pretty quickly and build up the execution plans required without calling any actions. Then when an action is called on write, when we actually go to deploy this config, that is when Spark can efficiently distribute this data across the cluster and process this deployment as fast as possible. We also have the added benefit here of knowing exactly which tables an ID exists in. This makes use of PySpark’s collect_list function, and this is that final column that you see here in the table, tables exists. Since we’re iterating through every table, we actually have the table name as a value within the incremental data frame. So when we run that aggregation or window function, we can run the collect_list function to bring those tables together, and we know exactly which tables the specific ID existed in. This can be particularly useful, again for CCPA reasons, in case we need to know where an ID exists. As well as deferring actions in this way, we also utilize Delta Lake features too.
So for every deployment of this table, we run an optimize and a Z-order on Source_Id, as well as partitioning the table by data source. This way, when we then join back to this table to get the encryption keys when we’re running the encryptions, we can ensure that the queries are executed much more efficiently. So after fine-tuning this process, thanks to Spark and Delta Lake, we managed to get this deployment down to roughly 10 minutes per source.
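
The ID_SALT generation and key derivation described above can be sketched in plain Python. This is only an illustrative sketch under stated assumptions, not the Gecko implementation: the talk does not say how the salt and password are combined, so PBKDF2-HMAC-SHA256 is assumed here, and the dictionary aggregation stands in for the Spark union, group-by and collect_list step. The function names, sample rows and password string are all hypothetical.

```python
import base64
import hashlib
import os
from collections import defaultdict


def build_id_salt(rows):
    """Return one salt record per (data_source, id_column, source_id).

    `rows` is an iterable of (data_source, table_name, id_column, source_id)
    tuples, a plain-Python stand-in for the unioned incremental data frames.
    """
    tables = defaultdict(set)
    for data_source, table_name, id_column, source_id in rows:
        tables[(data_source, id_column, source_id)].add(table_name)
    # One random 16-byte salt per ID, plus the list of tables the ID exists in.
    return {
        key: {"salt": os.urandom(16), "tables_exists": sorted(names)}
        for key, names in tables.items()
    }


def derive_key(password, salt, iterations=100_000):
    """Combine a password and a salt into a 32-byte key, url-safe base64 encoded.

    PBKDF2-HMAC-SHA256 is an assumption; the talk does not name the function.
    The output has the shape Fernet expects (32 bytes, url-safe base64).
    """
    raw = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, iterations, dklen=32
    )
    return base64.urlsafe_b64encode(raw)


# Hypothetical rows: the same client appears in two tables of one source.
rows = [
    ("source_a", "appointments", "client_id", "C-1001"),
    ("source_a", "invoices", "client_id", "C-1001"),
    ("source_a", "invoices", "client_id", "C-2002"),
]
id_salt = build_id_salt(rows)
record = id_salt[("source_a", "client_id", "C-1001")]
# In the talk the password lives in Azure Key Vault; a literal is used here.
key = derive_key("password-from-key-vault", record["salt"])
```

Because the key is recomputed from the salt and the password each time, neither the Delta Lake table nor the Key Vault on its own is enough to decrypt a row, which is the two-system separation the speakers describe.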

– So as previously mentioned, the data encryption aspect of the Gecko Crawl process is very computationally heavy, and we’ve had to implement some different tools which enable this process to be done more efficiently and in a reasonable time frame. The first of these is to utilize multi-threading of notebooks at the table level. This allowed us to encrypt data in parallel and maximize our usage of Spark. We also have three locations which we’re required to encrypt, due to the three write locations in the ELT process. This is what Jason mentioned earlier with the raw, schema-verified and output locations. So this again increases the complexity that we’re dealing with in the Crawl process. In terms of how the actual encryption works, when we’re encrypting a table, we read it in and then join it to the ID_SALT table, and this allows us to derive the salt-key by using the given Id_Column that we have in the ID_SALT config. From here, we then apply a Fernet encryption UDF, and this is applied across all PII columns. Fernet is actually built into the Python cryptography package, and it’s a symmetric encryption mechanism. We’ve put this into a PySpark UDF, which can be applied row by row to ensure that each row is encrypted as it should be. Once we’ve encrypted the data, it is then validated, and we overwrite the existing ingested data. We also have the ability to decrypt and obtain the original PII value when required and permitted, and this is actually built into the validation step. The first part of the validation is to perform counts to ensure that no data has been gained or lost when performing the encryption. The second part is that we decrypt the data and write it out, and this ensures that all the data that we had originally can be regained and nothing is corrupted or lost. And here we have an example of some encrypted data.
So on the left and right you can see some non-PII fields which have just been left as is, but the Note column here is a PII column. Via the encryption, we have now been left with the Fernet token in place of the original data that was there. So if anyone who shouldn’t gained access to this table, this data would be completely meaningless. The only way that they could regain the value is if they had access to the passwords, the ID_SALT table, and also knowledge of how the process works intrinsically. Going back to the three write locations that we have, we’re actually dealing with two different types of files as well, which again adds another layer of complexity. Within the raw and schema-verified locations, we’re dealing with Parquet files. Here we’re actually dealing with individual files for each date ingested, and we’re required to encrypt each file path one by one, and this is up to 600 in some cases. This meant that the encryption process was really hard to optimize, as we were dealing with a huge number of individual paths. Conversely, in the output location we’re dealing with the Delta file format, and this massively simplified the process, as we’re only dealing with a single Delta table for all dates and a single path to encrypt. And of course the encryption process was easily optimized by Spark due to things like partitioning. So I’m gonna take you through a few issues that we came across during the optimization of the Parquet aspects. One initial shortfall was that we were actually looping through file by file, and as I said, this could be up to around 600 files. Another issue was that data wasn’t partitioned across the cluster, leading to extremely low utilization. What we can see here on the right is a Ganglia screenshot of our cluster, and we’re operating at very low utilization, around 4.7% on average. The issue of partitioning of data was made worse by skewed data sets.
That meant the data wasn’t partitioned well across the cluster, and this typically occurred within data sets that had free text fields. The consequence of having such low cluster utilization was that not only were runs extremely slow, actually hitting the 48-hour Databricks notebook limit, but we were also wasting lots of cluster resource and hence wasting lots of money. So the solution that we came up with was to implement parallelism with threading and make use of Apache Spark features. The first thing that we did was increase the number of partitions after the shuffle. This removed the skew effect and ensured that Spark was operating at maximum parallelism, because we had an even distribution of data across the cluster. We then also implemented Python’s concurrent.futures for multi-threading, and this allowed us to execute encryption logic for multiple Parquet files across multiple workers in parallel. The consequence of this was that we weren’t waiting for a task to finish before moving on to the next file, which massively increased the speed. As you can see here from the Ganglia screenshot on the right, we actually operated at a much higher cluster utilization for the same process, with a baseline of around 60% utilization going up even as high as 90%. This was obviously very beneficial, as increasing the cluster utilization in turn reduced run time from days to hours. We also did experiment with the number of Python workers that we were using, but we actually saw a high utilization at a lower number of workers. This really showed that there was a balance required in order to find the most optimal configuration. As we had reduced the run times by such a drastic percentage, we didn’t spend too much time on hyperparameter tuning.
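
The threading pattern described above, where many Parquet paths are submitted at once instead of being looped over one by one, can be sketched with Python's standard `concurrent.futures` module. This is a minimal sketch, not the team's notebook code: `encrypt_file`, the paths and the worker count are hypothetical, and the function body is a placeholder for the real per-file work (read, join to ID_SALT, encrypt, validate, overwrite).

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def encrypt_file(path):
    """Placeholder for the per-file encryption job.

    In a real run this would trigger Spark work for one Parquet path.
    Here it just returns a stand-in "row count" so the sketch is runnable.
    """
    return len(path)


def encrypt_all(paths, max_workers=8):
    """Submit every path to a thread pool and collect results and failures."""
    results, failures = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(encrypt_file, path): path for path in paths}
        # Handle each file as soon as it finishes rather than in submit order.
        for future in as_completed(futures):
            path = futures[future]
            try:
                results[path] = future.result()
            except Exception as exc:  # keep going if a single file fails
                failures[path] = exc
    return results, failures


# Hypothetical paths: one Parquet file per ingestion date.
paths = [f"raw/source_a/invoices/2020-01-{day:02d}.parquet" for day in range(1, 11)]
results, failures = encrypt_all(paths, max_workers=4)
```

Threads suit this pattern because each driver thread spends most of its time waiting on the cluster, so several Spark jobs can be in flight together. As the speakers note, more workers is not automatically better, so `max_workers` is something to tune against observed cluster utilization.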

– So the third and final piece that I wanna talk to you about within Gecko Crawl is the Master Table. This is where we collect all of the PII within the Petcare Data Platform into a single Delta Lake table within our Lake. This table contains fields for each PII attribute, so Name, Phone Number, Email Addresses and Notes, and each of these fields contains an array of encrypted data for that PII attribute for that specific Source_Id. If we have a look here at an example of the table, we haven’t managed to fit all of these fields on the slide, but you can see we’ve got columns for data source, table name, the Id_Column and the Source_Id. These are columns which can be matched to that ID_SALT table from before. Then on the right-hand side you can see a Name Master and an Address Master, and we have those other columns as well. What this is, is an array of every name, for example, that is associated with this ID. So you can imagine if there are fields for, say, First Name, Last Name, perhaps Next of Kin First Name and Next of Kin Last Name, they will all be collected together into a single row. The reason why we did this is that, given that we were looking through all of the PII tables anyway, we thought it would add some business value in the future to actually collect all of the PII into one place. In the future, we’re hoping to create some sort of custom-built in-house NLP model, which we can potentially use for PII detection.

– So I’m now going to talk you through the Gecko Delete process and show how this process has massively simplified our CCPA compliance. The Gecko Delete process offers superior simplicity, consistency and trackability, and this is extremely important for management of the process and also for audit scrutiny. The process is as follows. A request is ingested into our configs, and this then triggers the delete pipeline. From here, the request ID is used to filter our ID_SALT config, and the relevant salt is redacted. And that is how simple it is. The client’s PII is now irretrievable, and the non-PII structure is maintained. I’m now going to take you through a visual example of how this actually works. On the left-hand side, we have an example request coming in. This is coming from a Mars Petcare business unit, in this case Banfield. So we get this ID come through, and it is ingested into OneTrust, which is the Mars-wide CCPA piece of software. Once we have this in OneTrust, we can then identify the client within our Vaults. From here, we identify the client within our ID_SALT config, and their salt is redacted. Without the salt, the encryption key can no longer be generated. As Jason said earlier, it’s the salt and the password which make up the encryption key, so without that salt, that user’s PII data is no longer retrievable. By only removing a single record, we’ve actually ensured that the client’s PII can never be retrieved, and also that all of the non-PII data stays exactly as it was in the Lake. So, as you can imagine, that row will stay exactly as it was in the Lake, but the PII is no longer retrievable. And this shows how it really is a consistent, reproducible and trackable method.
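
The effect of redacting a salt can be sketched as follows. This is an illustrative sketch only: the in-memory dictionary stands in for the ID_SALT Delta table, the password literal stands in for a secret held in Azure Key Vault, PBKDF2 is an assumed way of combining salt and password, and blanking the salt while keeping the ID record is also an assumption (the talk describes the salt as redacted but does not show the table afterwards). All names are hypothetical.

```python
import base64
import hashlib
import os

PASSWORD = "password-from-key-vault"  # hypothetical stand-in for a Key Vault secret

# Stand-in for the ID_SALT table: one salt per client ID.
id_salt = {"C-1001": os.urandom(16), "C-2002": os.urandom(16)}


def derive_key(source_id):
    """Return the encryption key for an ID, or None if its salt is gone."""
    salt = id_salt.get(source_id)
    if salt is None:
        return None
    raw = hashlib.pbkdf2_hmac("sha256", PASSWORD.encode("utf-8"), salt, 100_000, dklen=32)
    return base64.urlsafe_b64encode(raw)


def redact(source_id):
    """Blank the salt for one ID; return True if there was a salt to redact."""
    had_salt = id_salt.get(source_id) is not None
    if source_id in id_salt:
        id_salt[source_id] = None  # the ID record stays, the salt does not
    return had_salt


key_before = derive_key("C-1001")
was_redacted = redact("C-1001")
key_after = derive_key("C-1001")
other_key = derive_key("C-2002")
```

After the redaction, `key_after` is `None`: the original random salt no longer exists anywhere, so the key for that client cannot be rebuilt even with the password, while every other client's key and every encrypted row in the lake are left untouched.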

– So now I’d just like to summarize the key benefits that we have through Gecko, as well as some of the future work. We have five main key benefits that we’ve identified, the first being Data Security. As we’ve mentioned previously, every instance of an individual’s PII is encrypted, which means that personal data can never just be viewed on the fly like it used to be. Secondly, Speed. When we receive a CCPA request, it’s now just a single filter and redact in order to be CCPA compliant, rather than having to remove that ID in every instance where it existed. Thirdly, Auditability. We have a bespoke logging system in place, so we can keep track of where PII is being used, where it’s been redacted from and where it hasn’t, and this just makes things really easy and simple. Fourthly, Automation. This process can now be easily monitored as part of the BAU process, again rather than having to spend a large amount of time trying to find every instance of that ID. And finally, Data Integrity. By not deleting the rows, the data structure and integrity are maintained. So finally, just to wrap up, some points around the work that we’ve got left with Gecko. Firstly, we’d like to build a custom NLP model to use for all types of PII detection on ingestion. We’d also like to create an API layer over key access in order to enhance speed and security. And finally, we’d like to integrate the Gecko module within the ingestion process itself. So rather than having two end-to-end processes, one for the ELT pipeline and one for Gecko, we’d just use Gecko as a module that is called within the ELT platform itself, again to enhance speed. Thank you for listening, and we look forward to hearing any questions you may have.

About Jason Hale

Jason is a technology and sustainability enthusiast with a first class Masters in physics from the University of Exeter. His career began on an analyst grad scheme within the Energy sector, however he very quickly realised he was on the wrong career path when he was working with Excel spreadsheets all day whilst teaching himself Python and SQL in his spare time. Quitting after 5 months, he then went on to join Mars Petcare as a Data Engineer. In the 16 months that he has been working with Mars, he has been on various projects including ELT and API framework design and development, as well as the design and implementation of Gecko: the long term strategy for CCPA compliance.

About Daniel Harrington

Daniel graduated from the University of Bath in 2019, with a Masters' Degree in Integrated Mechanical and Electrical engineering. Since joining Mars Petcare as a data engineer in January 2020 he has been involved in designing and building out Gecko: A bespoke CCPA compliance tool to be used within the Petcare Data Platform.