Information Solution Architect with over 20 years of experience. Extensive background in Data Management, Big Data, Information Systems, Data Governance as well as process and project management. Implementation of numerous solutions across a host of different architectures including IBM, Oracle, open source and data warehouse appliances. Experience in database design, DBA, data integration, security, Big Data, business analytics and advanced analytics. Implementation of open source software encompassing Hadoop (and peripheral components), Spark, R, Python, RDBMS and NoSQL technologies. Brings a breadth of industry experience to each engagement, with specific background in government, power, financial, manufacturing, technology, healthcare and insurance. Long track record of success and delivery within time and budget. Managed up to 12 team members in various positions. Agnostic perspective to each assignment, providing the best overall solution to the challenge at hand.
There's a need to develop a recovery process for Delta tables in a DR scenario. Cloud multi-region sync is asynchronous. This type of replication does not guarantee the chronological order of files at the target (DR) region. In some cases, we can expect large files to arrive later than small files. With Delta Lake, this can create an incomplete version at the DR site at the break point. The assumption is that the primary (Prod) site is not reachable and therefore there's a need to identify and fix the incomplete version of the Delta Lake table. Similar scenarios happen with RDBMS replication: databases rely on their logs to restore to a stable version and run the recovery or reload process. This document will address this need and look for a solution that can be shared with customers.
- Hi, and welcome to Simplifying Disaster Recovery with Delta Lake. We're gonna talk to you today about how Delta works, what can happen with respect to Delta and cloud replication, and we're going to run you through some simulations and solutions for issues that we discovered with Delta Lake in disaster recovery scenarios, particularly with respect to cloud replication. So my name is Zeashan Pappa. I'm a Senior Solution Architect with Privacera; we make Apache Ranger in the cloud. I've been working with Apache Spark since version 1.4. I used to be an Enterprise Architect with Avanade, and before that with Teradata. I've consulted for a number of firms across Financial, Insurance, Tech and Pharma. - And hi all, my name is Itai Weiss. I'm a Senior Solution Architect with Databricks. We make your dreams come true. I have been working with Spark since version 1.6, and I've been consulting on big data and data warehouse projects across many verticals such as Financial, Insurance, Tech, Pharma, and others.
- So one of the things I'd like to talk with you about real quick is a bit about Privacera. The way that enterprises have traditionally dealt with data access control was in a siloed, database-by-database, platform-by-platform, user-by-user approach. That worked in a pre-hybrid-cloud world, if not ideally, but in today's world it's completely untenable. With so many heterogeneous data and analytics services in use in the enterprise today, both on premises and in the cloud, so much data flowing into these environments, and so many different users to manage, that siloed approach is too labor-intensive, too time-intensive; it doesn't scale. It also makes it very difficult to identify and tag your sensitive data at scale, and then to monitor and record user access behavior across those silos. This leads to inconsistent policies for the same users and roles depending on the data service, and as each service has its own access control capabilities, some better than others, enforcement is largely manual across each of these services. So on the left, you see a pre-Privacera world, and on the right you see scalable compliance in your hybrid cloud environment powered by Apache Ranger.
So Privacera's primary use cases, at least in terms of what we're seeing consistently from customers: we're at multiple Fortune 500 and multiple Fortune 100 companies, and while all of our customers are individually unique, we've identified these three as common initiatives that companies bring to us when looking for a solution. First, we hear that companies are trying to migrate from on-premise environments like Cloudera, Hortonworks, or MapR. We also deal with a lot of companies that are trying to achieve some kind of regulatory compliance in the cloud. And then we also deal quite a bit with data democratization use cases, which is: I want to give my users access to the data they need in order to make the best decisions and have the best insights, while at the same time making sure that I'm achieving compliance and applying consistent data governance. With that aside, I'd like to talk to you a little bit about the problem statement today, how Delta works, and what exactly is going to happen or could happen in cloud replication. Then Itai and I will simulate the problem, talk through some problem resolution options, demo some solutions for you, and discuss future solutions.
So this is the crux of the problem statement. Although the Delta file system itself caters towards disaster recovery use cases, because of the nature of it, when we combine it with cloud replication there's the possibility of our Delta table being corrupted at our DR point. Which of course could hinder, delay, or outright prevent you from meeting your RPO for your mission-critical workloads. So there's multiple things to consider here.
When it comes to options, right? The first and foremost is you can certainly wait for the problem to resolve itself. Most of the time, cloud storage failures are intermittent, and most clouds have extremely high durability, usually nine to eleven nines.
And you're looking at this being, in most cases, intermittent. But if you have mission-critical data, you really don't have much of an option. And if it's time sensitive as well, you have to recover to a previous working version in your replicated instance so you can continue. There's a couple of different strategies there as far as how we would resolve the problem. The first is: can you actually query the table? If you can query the table, chances are it's not corrupted. But if you cannot query the table, then you would need to find out which processes ran since the last recovered version. And then, whether it's a batch or a stream process, you'd have to re-run the processes in order to basically re-compute your data and get back to your desired state.
So in order to understand why this is important, something that we feel is a very key precursor is to understand, at a very high level at least, how Delta itself actually works.
So when you create a Delta table it effectively looks like this on disk: your root directory, say for example my_table, and underneath that you'll see a little folder called _delta_log. That is what is called the transaction log directory, and it basically identifies it as a Delta table. Underneath that directory, you'll see files numbered from zero upward; that integer number there is actually a long, and it can go up to the long max value for all the table versions. So this is basically metadata that describes and identifies all of the versions of the table.
Separately, you'll see any partition directories and the underlying parquet files that are underneath them. So for those of you that aren't aware of this, Delta actually utilizes open source parquet. So a file like, for example, file-1.parquet is readable by itself with any standard parquet reader. However, in the context of your actual table, when you combine it with the metadata that's generated in the Delta log, Delta overlays what is effectively an access or read pattern for the data that tells the Spark reader which files to read, what version, which files to skip, and so on and so forth. So as you can surmise, that Delta log directory, and those small files that exist in there to describe all that metadata, are very key to how Spark is going to read and write to your table.
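To make that naming concrete, here's a tiny Python sketch of the zero-padded commit file names you'll find under _delta_log (the helper name is ours, not a Delta API):

```python
# Delta commit files in _delta_log are the version number zero-padded
# to 20 digits, starting at 0 and growing toward the long max value.

def delta_log_filename(version: int) -> str:
    """Return the commit file name for a given table version."""
    return f"{version:020d}.json"

# The layout for a small table then looks roughly like:
#   my_table/
#     _delta_log/
#       00000000000000000000.json   <- version 0
#       00000000000000000001.json   <- version 1
#     part-00000-....parquet        <- data files (possibly under partitions)
print(delta_log_filename(0))
print(delta_log_filename(1))
```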
So a little bit about Delta JSON files. I think a reasonable way of looking at this is to just talk about add, remove, and set transactions. These JSON files have a variety of different actions, which are effectively things that can happen, or things that are described as having happened, within those files. The most common ones are add file, which would happen in an append operation for example, where a new file was added to your file system. So say you added a new file or an updated parquet file, you would see an add file event in one of those version files. Same thing with remove file: say a file was deleted or compacted, you would see that record in the version that corresponds to that action. And in the case of set transaction, this is a recording of the idempotent transaction ID for streaming use cases. So it would literally contain the ID of the stream that lives in your checkpoint metadata, right? These are things like epoch values, number of batches, stream identifier, and so on. All that data is actually stored within the Delta log directory, and you can absolutely go and inspect it yourself if you're ever curious as to how that works.
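If you do go inspect a commit file, each one is newline-delimited JSON, one action per line. Here's a trimmed-down sample of the three action shapes just described (real commits carry more fields, such as stats and timestamps) and a small parser; the helper and sample values are ours:

```python
import json

# Minimal stand-ins for the add / remove / txn actions in a commit file.
sample_commit = "\n".join([
    json.dumps({"add": {"path": "part-00001.parquet", "dataChange": True}}),
    json.dumps({"remove": {"path": "part-00000.parquet", "dataChange": True}}),
    json.dumps({"txn": {"appId": "my-stream-id", "version": 42}}),
])

def summarize_actions(commit_text):
    """Count the action types present in one commit file."""
    counts = {}
    for line in commit_text.splitlines():
        action = next(iter(json.loads(line)))  # first (only) key of the line
        counts[action] = counts.get(action, 0) + 1
    return counts

print(summarize_actions(sample_commit))  # {'add': 1, 'remove': 1, 'txn': 1}
```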
So just to be 100% clear, if I was to do, say, a df.write.mode("append").save(), right? I would expect a parquet file to be created underneath, and I would expect that a corresponding Delta JSON file would be added with an action in there that signifies that I added a parquet file named part-whatever.parquet. If I was to do a deltaTable.delete using the Delta APIs, I would effectively create a new parquet data file without the deleted data, and then I would create a Delta JSON file with a remove reference to the old file and an add reference to the new file.
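A toy model (ours, not the real Delta implementation) of which actions those two operations put into the log:

```python
# Append vs. delete, reduced to the log actions they produce.

def append_actions(new_files):
    """An append only adds the freshly written parquet files."""
    return [{"add": p} for p in new_files]

def delete_actions(old_files, rewritten_files):
    """A delete rewrites the surviving rows into new files: the old files
    are marked removed (not physically deleted) and the new ones added."""
    return ([{"remove": p} for p in old_files] +
            [{"add": p} for p in rewritten_files])

print(append_actions(["part-2.parquet"]))
print(delete_actions(["part-1.parquet"], ["part-3.parquet"]))
```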
So the reason for that is that Delta implements atomicity, and that atomicity is there to support ACID transactions and to support multiple readers and multiple writers. In order to do that, though, changes to the table are stored as ordered atomic units called commits. And in order for Delta to be able to support multiple readers, we can't physically delete parquet files when they're removed; we have to keep them around for some period of time, so that any readers that are currently reading that data can finish their read operations.
So let's talk a little bit about how these commits work. Let's say, for example, I have one commit 0000.json, another commit 0001.json, and then another commit underneath that. In the first JSON file, I basically created two parquet files, and that happens as part of the first version. In the second version's JSON file, I've removed one and two, and then I've added three. So what ends up happening here is that if I read this table and look at this directory, and the Spark reader reads the sequence of these metadata files, it will effectively skip over one and two and know that it only has to read 3.parquet.
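That replay can be sketched in plain Python; `live_files` is a hypothetical helper, not Delta's actual reader, but it folds the commit sequence from the example above the same way:

```python
# Fold add/remove actions in version order to decide which files to read.

def live_files(commits):
    files = set()
    for actions in commits:          # commits are applied in version order
        for action in actions:
            if "add" in action:
                files.add(action["add"])
            elif "remove" in action:
                files.discard(action["remove"])
    return files

commits = [
    [{"add": "1.parquet"}, {"add": "2.parquet"}],                    # 0000.json
    [{"remove": "1.parquet"}, {"remove": "2.parquet"},
     {"add": "3.parquet"}],                                          # 0001.json
]
print(live_files(commits))  # {'3.parquet'}
```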
So all right, I think we hopefully have by now a pretty good understanding of how Delta is working. And I think the next place to go is to understand: okay, I've got all these small files, I've got all these big files, what can happen in cloud replication?
So a good level set is to understand what is cloud replication? So in cloud replication, you have data stored in multiple data centers, or cloud provider regions. This is something that's offered as a feature of underlying cloud storage, such as your AWS S3 or Azure Blob. And the intent here is to ensure consistency between primary and secondary regions.
So how does cloud replication help in DR? Well, most obviously, it helps in meeting your compliance or high availability goals. And, as part of your other application-level DR components, it helps you meet your DR objectives, in terms of being able to hold to your business continuity plans, however they might be laid out. Most cloud providers allow for various types of replication options. Multiple copies can be stored in proximity or very far away from each other, with things like minimum distance guarantees. So there's lots of ways in which you can utilize cloud replication to help you with DR.
Now, at a very high level, there are two big differences in the way most cloud replication products work: inter- versus intra-region replication. Inter-region replication is where you have one region that syncs to another region. So this could be: I've got a region, say East US 2 in Azure, and I'm syncing my data down to Central US in Azure. These are two physically different geographical regions. And this is really good for disaster recovery use cases because, a lot of times, we're planning and estimating for these disasters to happen along regional boundaries, and it's a safe bet that region two would still be available in the event of some kind of failure at region one. The intra-region use cases, right? These are more fit for high availability use cases. This is where you would have availability zones within a single region, and your data or your applications would be synchronized and replicated across those availability zones. What we're mostly going to talk about today is inter-region replication.
And we're gonna try to understand very briefly what the anatomy would look like. So let's just walk through this example. A user, Databricks, or Spark would write data to a cloud storage device, whether that's an ADLS account, a blob storage account, and so on, or an S3 bucket. Those writes happen in parallel from each worker node to each partition, and Spark is also responsible for writing the Delta transaction log. So that data lands at the first region; it always goes to one region first. And then data is replicated across sites, depending on your cloud provider settings. This is generally an asynchronous operation, so data is replicated asynchronously to the second region. And by virtue of being asynchronous, it's replicated in a non-deterministic fashion, which means there are no strict ordering guarantees, which means that larger files can take longer to arrive. I think that's the deciding point here, right? With the Delta file system, we have a lot of large files, which are your parquet data files, and a lot of small files, which are your table metadata.
So how does Delta help disaster recovery? I know I mentioned this earlier, or briefly covered it: the Delta log, at the end of the day, is 90% of the way there, right? We're capturing the last write and delete transactions that are committed to disk as individual versions. So every time you write to the table or make any changes to the table, we have a record that specifically lists the data files that are related to that transaction. This is great for DR because it helps us understand things in an ordered and predictable way. We can clearly understand, based on the log files, what files might be missing in a disaster recovery scenario, and it gives us at least the ability to safely and predictably roll back to a stable version. And the bottom line is that Delta is the best big data format for reliable disaster recovery on the cloud. So Itai, can you walk us through what could happen in a DR event with Delta?
- So thanks Zeashan, you did a really good job explaining how Delta works and how cloud replication works. And yes, I agree with you: in some cases there may be a potential failure of the synchronization, or actually of the asynchronous order in which the data files and log files arrive at the secondary region. And if we simulate it, on the left side again we'll have a user or a job writing data using Spark.
The first thing would be to write the data files. Once this is complete, if there is no collision, a commit will happen, which will write a log file. Both of those will be committed in region one and will then start their transfer towards region two. However, the data files are usually much, much bigger than the log file, so the log file will be received in region two first. And if something happens, the data files will never arrive. In this case, we will have an out-of-sequence replication. In region one we'll have both the data files, part-00000-whatever.parquet, and the JSON file explaining that this parquet file is related to this version and contains this data.
In the second region, we will have the same JSON file indicating the version has been committed, but the data files will not make it, and we'll end up with either a partial file or no file at all. This can cause a problem.
And for example, this is one of the problems: when the file is not found, we can see that Spark is unable to read the Delta files of the table and generates an error saying the data file does not exist. So the question is, how do we use Delta to fix this?
We're going to simulate a problem in a notebook, and then we're going to run two notebooks with two separate solutions. When we're back, we'll summarize these and then we'll go over and look at what are the future solutions.
So here we break for the first video recording of the session. - Thanks Itai. So before I dive into the rest of the notebook, one of the things that I want to talk about again, just to reiterate and drive these points home, is what problem we're trying to replicate here. In the case of disaster recovery with Delta Lake, if we have one region, like we have on the left, and another region, like we have on the right, and we have some asynchronous replication process, as part of the cloud provider's offering, that is syncing this data, what can happen is that this manifest file that you see here on the left could arrive much faster than your data, because it's much smaller than your data, and your data may be very large, right? Certainly greater than a megabyte for the data, and certainly less than a megabyte for the actual manifest. So if, in the time that this manifest file is replicated over, the data file doesn't make it, in the event of a storage failure, what you now have is a corrupted table. So what we're gonna do is go ahead and recreate that, and run some data here. We're gonna use the bike sharing dataset from the Databricks datasets for this example. We'll start by reading it into a Spark DataFrame from a CSV file, we'll filter out data from 2011 and write that into a table called bikes, and then we'll also filter out data from 2012 and write it into the same table. So we'll have two adds, and two versions in our table. So here's that data and what it looks like right now.
And now we'll take two filters of that data out for these time ranges and write them out into our table, and because these are two different save commands, they'll result in two different commits and two different versions in that Delta table.
Now we'll create some meta store entries, so we can easily query these using SQL.
And just a quick test, everything should be okay.
Looks good. If we were to use a describe history command on this table, what we'll see is our two append transactions.
And you can see here the operation parameters mode, and if you dive even a bit further, you can see the number of output rows for each of those appends and the number of files that were added to this Delta table. So what we'll do now is create a secondary disaster recovery table that's basically a copy of this. This is effectively analogous to what's happening with your inter-region replication, right? You've got your region one, which is the bikes table that we've just created above and queried, and now there's gonna be some asynchronous process that happens behind the scenes to synchronize that data at a file level to matching accounts within region two. So we're gonna simulate that with just a copy command.
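That file-level "replication" can be mimicked with a plain directory copy; here's a minimal stand-in using temp directories and made-up file names rather than real cloud storage:

```python
import os
import shutil
import tempfile

# Build a fake Delta table directory: a _delta_log plus one data file.
src = tempfile.mkdtemp(prefix="bikes_")
os.makedirs(os.path.join(src, "_delta_log"))
for name in ["_delta_log/00000000000000000000.json",
             "_delta_log/00000000000000000001.json",
             "part-00000.parquet"]:
    with open(os.path.join(src, name), "w") as f:
        f.write("{}")  # placeholder content

# The "replication": copy the whole tree to a DR path.
dr = os.path.join(tempfile.mkdtemp(prefix="dr_"), "bikes_dr")
shutil.copytree(src, dr)

copied = sorted(os.listdir(os.path.join(dr, "_delta_log")))
print(copied)
```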
And then create any necessary metadata entries for that. And we'll see the same two transactions as we saw on the previous table, which is exactly what we would expect. And if you were to go look in the Delta log for either of these tables, what you'll see is version zero and version one.
We can read each JSON transaction into a DataFrame to see what's actually logged. So this is the actual raw data that's inside each of those commits, right? This includes the number of records, data about the files that were written to, and the name and location of the parquet files as well.
So if we wanted to see what the latest version was, the latest transaction, we can run this little code snippet here, which is very simply looking in the Delta log folder and grabbing the latest version where add is not null. So basically the latest add version of the table.
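Stripped of Spark, the gist of that snippet is just: list the log folder, keep the numeric .json commit files, take the highest version. A sketch assuming the standard zero-padded commit names (`latest_version` is our helper, not from the notebook):

```python
# Find the highest committed version from _delta_log file names.

def latest_version(log_filenames):
    versions = [int(name[:-5]) for name in log_filenames
                if name.endswith(".json") and name[:-5].isdigit()]
    return max(versions) if versions else None

names = ["00000000000000000000.json", "00000000000000000000.crc",
         "00000000000000000001.json", "00000000000000000001.crc"]
print(latest_version(names))  # 1
```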
That's it right there. So version 01.json, which is what we would expect. And then we can see here the parquet files that make up that version.
So that's the actual physical file on disk, that refers to that latest version.
And there's two that make up the entire table right now. So if we were to query that, you'll see there's two files. The latest version is 968, and then the 2E26 is the version prior.
So Itai, let's take this and move over to your solution, and see if you can explain to us how, if we were to actually corrupt a transaction, we could correct it. - This was a really good notebook. Now we have data already, and we have two tables, a regular table and the DR table, so let's use them to simulate.
Looking at the solution: first of all, we'll have to check the last version, right? Is it corrupted? There is a good chance, if we didn't have any write recently, that everything worked fine, whether it was the S3 replication or the ADLS or Blob replication; everything will work and we don't have to worry about it. But if we do find a problem, then in the first solution we will rename the transaction log entries. In a way this is an undo of the last transaction: we will tell Delta to ignore it, by the fact that Delta will not recognize the renamed log files as part of the history. We'll then clear the cache to make sure that we don't have any leftovers in Spark memory, and then we will re-run the last job. So we will go back and recreate the same last version under the same version number. If we have a version one that is corrupted, and we renamed it so it doesn't exist anymore, we're now recreating version one again with the right data. And then the table will be ready for us to use, so we can run the new jobs, let the users use it, or basically continue business as usual. Let's look at it in practice, and let's go over the notebook and understand how this renaming works. Let's look at the first notebook. This is the first solution: we're going to undo the corrupted version. First thing, let's look at the tables that Zeashan has prepared for us. The first table is our original bikes table in the primary region; everything was fine, and we have our versions with their timestamps. And let's look at the DR table; here it is in the same region, but it would be on the secondary region in DR, and we have exactly the same versions written by Zeashan's simulation. Let's see if the data works on our DR. So we query it quickly, and we can see that we have two different records as expected; remember, 2011 was created as version zero and 2012 as version one.
Let's look at the files and make sure that we have the same parquet files, one for each version, that you've seen before.
And we can see the two parquet files, the data files, and the log files.
We will simulate the corruption of the latest version's parquet data file using code: we'll identify the last file, and then we'll delete it directly. All right.
And now let's try to query the data and see what happens to our simulation. As expected, we got an error reading the same parquet file that we deleted. So we successfully simulated the DR-corrupted data file, and now let's find out how we can fix it. We create some Python functions that will identify the transaction file names; there are two, one of them the JSON and one of them the CRC. And then we'll create a function that will allow us to go back to a specific version. You will be able to find these notebooks, as well as the session, on the Summit archives website. And for now, we're just going to rename them: we'll rename the CRC file with a backup suffix, and the same for the JSON. We'll clear the cache just to make sure that nothing that referred to them will go back into the Spark cache or Databricks cache and try to find files that don't exist anymore. And now let's describe the DR table history.
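Here's a pure-Python sketch of that renaming trick, using a temp directory as a stand-in for cloud storage; the `.bak` suffix and the `undo_version` helper are our choices, not from the notebook:

```python
import os
import tempfile

# Stand-in _delta_log with two committed versions (empty placeholder files).
log_dir = tempfile.mkdtemp(prefix="_delta_log_")
for name in ["00000000000000000000.json", "00000000000000000000.crc",
             "00000000000000000001.json", "00000000000000000001.crc"]:
    open(os.path.join(log_dir, name), "w").close()

def undo_version(log_dir, version):
    """Rename a version's log files so Delta no longer sees that commit."""
    for ext in (".json", ".crc"):
        path = os.path.join(log_dir, f"{version:020d}{ext}")
        if os.path.exists(path):
            os.rename(path, path + ".bak")

undo_version(log_dir, 1)
remaining = sorted(f for f in os.listdir(log_dir) if not f.endswith(".bak"))
print(remaining)  # only version 0's files are still visible to Delta
```

The renamed files stay around, which is exactly what makes this approach auditable later.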
We can see that we have only version zero here. So we successfully rolled back the corrupted version one. Let's try to query version zero: using the time travel mechanism, we can query bikes_dr at version zero, and we can see that we got only the data from version zero, which is 2011. Version zero works and we can query the data. Let's re-run version one now; the same version one will populate the data for 2012 again.
And now when we query the data, we see that we get back the 2012 data. So now this table is complete, the same as the table that we have in the primary region, with the same amount of records for both years. Let's take a look at the _delta_log directory. And you can see here that we have version zero CRC and JSON, and version one CRC and JSON. But we also have the renamed version one CRC and JSON, which is our backup. So now it's very easy for us to see that version one was recreated, that we did a roll back here, and we still have the old data if we ever need it for the rollback or any other audit purposes. Let's go back to the slides and look at the second solution. So this was solution number one. In solution number two, we'll check the last version; however, instead of renaming the version, we will overwrite the data files, in a way nullify them.
So think about it as: I really don't want to change my Delta log, but I know that I have an option to overwrite my parquet files. I'll empty them, so I'll have an empty version, then I will clean up the cache and re-run the last job. I would still have the corrupted version: if version one was corrupted, version one will stay there, but it will become empty, and I would create version two as a new version that is similar to version one in your primary region. But once we're done with this, the table is ready and I can continue business as usual and open up the table for users.
And let's see it in action. Let's take a look at the second notebook. This is the second solution: overwrite the corrupted version's data files. First let's disable the cache, because we're going to play with data files. And then let's look at the history for both the bikes table and bikes_dr, just to make sure that we start afresh. We have two versions for the bikes table in the primary region, and let's look at the DR table. As we see, we have the same two versions on the bikes_dr table; it's fine. Let's query the data and make sure that it is operational. And just like before, we can see that we have two records, one for 2011 from version zero, and one for 2012 from version one.
We look at the data files, just making sure again there are two parquet files. To corrupt it we'll use a different method, but in this case as well we'll delete the latest data file. Great; just like before, we're supposed to get an error right here, and it says that we cannot read the file we just deleted. We've simulated again a missing parquet data file in the DR. And now let's find out how we can fix this. So the first thing we're going to do is locate the DR table and find those JSON files, and from there we'll find what is the latest version that we have. And you can see that version one is the latest version. We knew that, so that's not a surprise. But now let's find out which parquet files are associated with it.
You can see that version one has only this parquet file, which is the same one that was removed before. We also load the latest stable version, which really is the version that we're going to use to overwrite version one's files. Now let's make sure, first of all, that this version zero really works, using again Delta time travel; it brings the data, and as we see, we get only the year 2011. The table works with version zero, so everything is fine. Next, we'll bring the schema from version zero and store it in a variable. We'll then create an empty DataFrame, create a new parquet file in a temp location, which is an empty parquet file with the same schema, and then we'll copy it over, in a loop, for all the files that are part of the latest version.
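Here's a toy model of that overwrite trick. The real notebook uses Spark and actual parquet files; here a small JSON file stands in for a parquet file, so the idea, overwrite the corrupted file in place with an empty file that keeps the schema, is runnable anywhere. All names and the `nullify` helper are ours:

```python
import json
import os
import tempfile

# Stand-in "parquet" files: JSON holding a schema and rows.
data_dir = tempfile.mkdtemp(prefix="bikes_dr_")
good = os.path.join(data_dir, "part-00000.parquet")
corrupt = os.path.join(data_dir, "part-00001.parquet")
with open(good, "w") as f:
    json.dump({"schema": ["year", "cnt"], "rows": [[2011, 10]]}, f)
open(corrupt, "w").close()  # the "corrupted" (here: truncated) file

def nullify(path, schema):
    """Overwrite a data file in place with an empty file of the same schema."""
    with open(path, "w") as f:
        json.dump({"schema": schema, "rows": []}, f)

with open(good) as f:
    schema = json.load(f)["schema"]   # schema taken from a known-good version

nullify(corrupt, schema)
with open(corrupt) as f:
    fixed = json.load(f)
print(fixed)  # same schema, zero rows
```

The key design point is that the file name and schema are preserved, so the Delta log entry that references the file still resolves; only the rows are gone.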
What we can see here is that we overwrote the same parquet file we deleted before with an empty parquet file. So now, first of all, let's make sure that we fixed the table and we can query it. As you can see, yes, we can query the data; we can see only 2011, which is really only version zero. So version one is actually empty. Let's append version one's data again; this is the 2012 data. And quickly, let's check what happened in the table history. We can see here that we have version zero, version one, and version two.
If we look at the data quickly, we now have the full set of data, both 2011 and 2012. The one difference in this case is that version two now actually holds the data that our production region holds in version one. Version one is empty; we can find it, we can query the parquet files, and we can see them directly, but it's not as easy to track as the renamed version. So we have two different solutions: either undo the last version and take a bit of a risk playing with the Delta log directory, or identify the parquet files and empty them out, in a way, by overwriting them with an empty parquet file with the same schema, and then continue to work as usual.
Let's go back to the slides and look at caveats and future solutions. So now we know how we can use Delta to fix the problems a disaster recovery might create.
Let's understand the caveats of these solutions. First of all, you can only remove the latest version. Don't try to remove older versions: you do not want to corrupt the Delta log entirely, or you will not be able to use the table anymore. So make sure to do it on the DR side, and do it only for the latest version. Also make sure you do not cross the checkpoint: Delta collects all the JSONs of the last 10 transactions into a checkpoint. Do not try to overcome this problem by overwriting a checkpoint; just deal with the JSON files.
Last but not least, if your last action is a delete or update, the previous version's files might be marked as deleted. As Zeashan showed us, when you delete, you're actually creating a new copy of the data and marking the old version's files as removed; the same happens with update. In this case, you will not be able to simply undo the latest corrupted version or overwrite the parquet files. You will have to re-insert the data from the last stable time travel version, and execute your update or delete again. And when you do all of this, keep in mind that every disaster recovery has a failback. When you go back to your primary region, you might have to resolve the same data corruption. So keep it as simple as possible and make sure that you know how to fail back again. As for future solutions, we have two or three solutions that can come in handy.
One option is not to rely on your cloud replication: create a separate process that will copy the data files, and once all the data files arrive, only then copy your Delta log file. This will create a larger gap between your region one and region two, but it will make sure that every version that lands on your secondary region is complete, and you'll even know exactly when, and which, versions would have to be re-run. As for our second solution, in Delta we're preparing clone replication, and as part of clone replication we created deep clone. Deep clone does a similar thing and will replicate your data files for you, so you won't have to take care of it. And only once the data files are replicated will the JSON version log files be replicated as well, making sure that whenever you need to touch your data on your secondary region, it is complete up to the last version that you executed.
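Deep clone is exposed through SQL (`CREATE TABLE ... DEEP CLONE ...` in Databricks). A tiny helper that sketches the statement you would pass to `spark.sql(...)`; the table names are hypothetical, and you should check your runtime's docs for exact clone support:

```python
# Build a DEEP CLONE statement: the clone copies both data files and the
# transaction log, so the target is always a complete, consistent version.

def deep_clone_sql(source, target, location=None):
    sql = f"CREATE OR REPLACE TABLE {target} DEEP CLONE {source}"
    if location:
        sql += f" LOCATION '{location}'"
    return sql

print(deep_clone_sql("bikes", "bikes_dr"))
# On Databricks you would then run: spark.sql(deep_clone_sql("bikes", "bikes_dr"))
```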
We'd be happy to hear your questions. Thank you very much for attending this session. Please don't forget to rate us.