Diving Into Delta Lake

Unpacking the Transaction Log

Denny Lee. Developer Advocate at Databricks
Denny Lee is a Developer Advocate at Databricks. He is a hands-on distributed systems and data sciences engineer with extensive experience developing internet-scale infrastructure, data platforms, and predictive analytics systems for both on-premises and cloud environments. He also has a Master of Biomedical Informatics from Oregon Health & Science University and has architected and implemented powerful data solutions for enterprise healthcare customers.
Burak Yavuz. Software Engineer at Databricks
Burak Yavuz is a Software Engineer at Databricks. He has been contributing to Spark since Spark 1.1, and is the maintainer of Spark Packages. Burak received his BS in Mechanical Engineering at Bogazici University, Istanbul, and his MS in Management Science & Engineering at Stanford.

Series Details

This session is part of the Diving Deep into Delta Lake series with Denny Lee and the Delta Lake team.

Session Abstract

The transaction log is key to understanding Delta Lake because it is the common thread that runs through many of its most important features, including ACID transactions, scalable metadata handling, time travel, and more. In this session, we’ll explore what the Delta Lake transaction log is, how it works at the file level, and how it offers an elegant solution to the problem of multiple concurrent reads and writes.

In this tech talk you will learn about:

  • What is the Delta Lake transaction log?
  • What is the transaction log used for?
  • How does the transaction log work?
  • Reviewing the Delta Lake transaction log at the file level
  • Dealing with multiple concurrent reads and writes
  • How the Delta Lake transaction log solves other use cases, including Time Travel, Data Lineage, and Debugging

What you need:
Sign up for Community Edition here and access the workshop presentation materials and sample notebooks.

 

Video Transcript

Today we’d love to talk to you about Delta Lake, and one of, I would say, the key components of Delta Lake is its transaction log. We’re going to go into the details of what’s in the transaction log and how it works. So before we start, I just want to introduce ourselves. Who are we? Kind of a philosophical question. But I am a software engineer at Databricks. I’m a Spark committer, and I’ve been working on Delta Lake for about three years now. We open sourced it last year, in April, so it’s been almost a year since we open sourced it. And here we are. Denny. – Thank you very much. I’m a developer advocate at Databricks. I’ve been working with Apache Spark since 0.5. There you go. A bunch of other stuff, but basically I’m your resident data engineering geek that talks way too much. So that pretty much covers me.

– All right.

So today the way we want to introduce Delta’s transaction log to you is first we’ll go into the details of the transaction log.

Outline

What’s in a commit? How does optimistic concurrency control work? And then we’re going to talk about a cool feature that having such a multi-version transaction log provides you, which is time travel. And we’ll try to show it all in action through a demo. But before we start, I believe everyone knows what Delta Lake is, but to explain it in a very short way before we go into the details: Delta Lake is a file protocol built on this transaction log that gives you transactionality, as well as schema enforcement, as well as automated file management, on top of the files that you store in your data lake. So the transaction log is the key piece here. And to show you how it works, when you create a Delta table, this is what it looks like on disk.

Delta On Disk

So let’s say that you have a table at a directory called my_table.

You have the transaction log directory, which is called _delta_log, within the root of the table. That’s how we identify that a table is a Delta table: through the existence of this directory. Within this directory you’ll see multiple versions of JSON files. These are numbered from zero up to Long.MAX_VALUE, and they contain all the changes that you make to your Delta table. Then, optionally, there’s the concept of partitioning, where you coarse-grained index your data through some common values of your data set. For example, here with date, we could have partitioned directories where your files may live, and then you’ll also see your data files within these partition directories, or at the root of your table if your table is not partitioned. And Delta underneath the hood uses open-source Parquet; there’s nothing special about these Parquet files. The key piece here is that Delta’s transaction log knows exactly which files to read.
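
To make that layout concrete, here is a minimal sketch, assuming a hypothetical table at /tmp/my_table on a local filesystem:

```python
# A minimal sketch of the on-disk layout described above, assuming a
# hypothetical Delta table located at /tmp/my_table on a local filesystem.
import os

table_path = "/tmp/my_table"
log_path = os.path.join(table_path, "_delta_log")

# The transaction log: zero-padded, monotonically increasing JSON commit files.
for name in sorted(os.listdir(log_path)):
    print(name)   # e.g. 00000000000000000000.json, 00000000000000000001.json, ...

# Data files are ordinary Parquet files at the table root, or under partition
# directories such as date=2020-01-01/ if the table is partitioned.
for name in sorted(os.listdir(table_path)):
    print(name)   # e.g. _delta_log, date=2020-01-01, part-00000-....snappy.parquet
```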

Table = Result of a Set of Actions

So what exists in these json files?

We have several kinds of actions within these JSON files. The first is the metadata action. The metadata includes information such as the name of the table, the schema of the table, the partitioning of the table, and any table properties that you may have set on this table.

We have add file actions, which mean, okay, this is a file that has been committed to this table. This action may also contain statistics about the file.

The add file action will contain the size of the file, and also when it was last modified, and so on. Then we have the remove file actions. One of the key pieces of Delta is that it provides ACID transactions, and one of the key things with ACID transactions is that you need isolation. What that means is that if you’re running a query and some other operation is mutating your table, the query should run on an isolated snapshot of your data. And for that reason, when we’re deleting records, or when we’re deleting files from your table, we can’t actually physically delete them immediately. We just logically delete them with this remove file action. Then we have the set transaction action. This is used for Structured Streaming. Whenever you’re committing files to your table through Structured Streaming, we add new set transaction actions, and the set transaction action just contains the ID of the stream. The ID of your stream lives in the checkpoint location of your stream. It also contains the epoch ID, or batch ID, of how many batches that specific stream has committed. So by just using the set transaction action, we can say, “Oh, this stream has committed up to batch 35.” If it’s trying to recommit batch 34, we can just say, “Oh, we already have that data. You don’t need to commit that.”

Then we also have the protocol version for Delta tables. This provides guarantees that, if you’re using an old Delta Lake client to read from your table, your old client or your newer client can actually deal with all the metadata and information within the transaction log of your table. If you’ve ever used Kafka, Kafka also has client versions that act as protocol versions. It’s the exact same with Delta. This is just to provide backwards compatibility, as well as making sure that if we ever break forward compatibility, you know about it. Then we also have some metadata about each commit, and this is contained in the commit info action.

And basically, once you replay the transaction log, adding and subtracting all these actions in a linear manner, the end result is the state of your Delta table. You end up with the current metadata, the current list of files, the current list of transactions, and the version of your table.
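
As a rough illustration of the kinds of actions described above, you could model a commit file in Python like this; the field names mirror the general shape of the Delta protocol but are illustrative rather than an exact copy:

```python
import json

# Hypothetical actions roughly mirroring what a single commit file might
# contain; the fields shown are illustrative, not the exact Delta protocol.
actions = [
    {"commitInfo": {"operation": "WRITE", "timestamp": 1587968586000}},
    {"metaData": {"name": "my_table",
                  "schemaString": "...",            # table schema as JSON
                  "partitionColumns": ["date"]}},
    {"add": {"path": "date=2020-01-01/part-00000.snappy.parquet",
             "size": 1024, "modificationTime": 1587968586000,
             "dataChange": True, "stats": "..."}},  # optional column statistics
    {"remove": {"path": "part-00007.snappy.parquet",
                "deletionTimestamp": 1587968586000,
                "dataChange": True}},               # logical delete only
    {"txn": {"appId": "stream-checkpoint-id", "version": 35}},  # set transaction
    {"protocol": {"minReaderVersion": 1, "minWriterVersion": 2}},
]

# A commit file is newline-delimited JSON: one action per line.
commit_json = "\n".join(json.dumps(a) for a in actions)
print(commit_json)
```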

Implementing Atomicity

So we talked about how Delta provides ACID transactions.

The first letter is atomicity. So, implementing atomicity: how does Delta provide atomicity? One thing to understand is that the transaction log for a Delta table is the single source of truth for your Delta table. Any reader that’s reading your Delta table will take a look at the transaction log first. Therefore, changes to the table are stored as ordered atomic units called commits. So here we have 0.json, and then once you make new changes to your table, they will appear as 1.json. When you’re querying your Delta table, this 1.json contains all the changes since version zero of your table. Since this file appears suddenly and atomically, you will be able to read it to see what has changed in your Delta table, even though there might be new files appearing in the background or files being deleted in the background.

So we can see here that, in the first version, let’s say two files were added: 1.parquet and 2.parquet. And then in the second version we say that we removed 1.parquet and removed 2.parquet, and we did some compaction process to add back 3.parquet. So how would this work? As we replay this list of actions linearly, in order, the removes will cancel out the adds, and we will end up with the final state of the table being that there’s a single file in this table called 3.parquet.
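
A tiny sketch of that replay idea (a hypothetical helper, not Delta’s actual implementation):

```python
# Replay a linear list of commits to compute the current set of live files.
# This is a simplified sketch of the idea, not Delta's actual code.
def replay(commits):
    live_files = set()
    for actions in commits:               # commits in version order: 0, 1, 2, ...
        for action in actions:
            if "add" in action:
                live_files.add(action["add"]["path"])
            elif "remove" in action:      # logical delete: removes cancel adds
                live_files.discard(action["remove"]["path"])
    return live_files

commits = [
    [{"add": {"path": "1.parquet"}}, {"add": {"path": "2.parquet"}}],    # 0.json
    [{"remove": {"path": "1.parquet"}}, {"remove": {"path": "2.parquet"}},
     {"add": {"path": "3.parquet"}}],                                    # 1.json (compaction)
]
print(replay(commits))   # {'3.parquet'}
```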

Ensuring Serializability

Another key piece here, a guarantee that Delta requires, is mutual exclusion. We need to agree on the order of changes, even when there are multiple writers, and this provides a guarantee in databases called serializability: the fact that even though things are happening concurrently, you could replay them as if they happened in a serial, ordered manner. So we have 0.json and 1.json. You could have two writers both trying to write 2.json. You have user one, who read version zero, and user two, who read version one. As they’re both trying to commit 2.json, user two wins, and user one takes a look and sees that, “Oh, 2.json is already there.” Due to this requirement of mutual exclusion, it will have to say, “Oh, this commit failed. Let me try to commit 3.json instead.”

Solving Conflicts Optimistically


So how does that work?

Within databases there are two modes of concurrency: you can have pessimistic concurrency or optimistic concurrency. With pessimistic concurrency, you assume that there are always going to be transaction conflicts, so in order to provide the best user experience, you lock your entire table as you’re doing your transactions. Within Delta we chose optimistic concurrency, because with big data workloads you tend to append more data to your table, rather than having multiple writers consistently trying to update the same records. Updating the same records is typically rare, at least concurrently. So the way that optimistic concurrency works is that we first record the start version, then we record the reads and writes of that transaction, and then we attempt to commit. If someone else wins, you have to check all the changes that have happened since your read version and see if anything changed with regards to what you read. So in the case where two appends come in at the exact same time, since you didn’t read anything, what would happen is you would have two writers, and they would write out the data files first. Then one writer will make the commit and say, “Okay, I’ve got all these adds.” Then you have your second writer, which already wrote out all the data files, but it sees, “Oh, there’s already a commit at the version I wanted to commit at.” So commit version two has already made it in. I need to check what changed in commit version two. So it reads what happened in commit version two, sees that, “Oh, all of those were appends. I’m also a pure append. I don’t care.” So it just says, “Let me just commit version three.” It doesn’t have to rewrite any files, it doesn’t have to redo the entire computation again, it just automatically commits version three.
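
Here’s a toy, in-memory sketch of that retry loop; the ToyLog class and its methods are purely illustrative stand-ins for the log store’s mutual-exclusion primitive, and Delta’s real conflict detection is far richer than this:

```python
# A toy, in-memory sketch of optimistic concurrency control with retries.
# This illustrates the idea only; it is not Delta's actual implementation.
class ToyLog:
    def __init__(self):
        self.commits = {}                       # version -> list of actions

    def put_if_absent(self, version, actions):
        # Stand-in for the log store's mutual-exclusion primitive.
        if version in self.commits:
            return False
        self.commits[version] = actions
        return True

    def latest_version(self):
        return max(self.commits, default=-1)


def commit_optimistically(log, my_actions, read_version, is_pure_append):
    attempt = read_version + 1
    while True:
        if log.put_if_absent(attempt, my_actions):
            return attempt                      # we won the race
        # Someone else won this version: check what changed since our attempt.
        for v in range(attempt, log.latest_version() + 1):
            others = log.commits[v]
            if not (is_pure_append and all("add" in a for a in others)):
                # e.g. two jobs deleting the same files: a real conflict.
                raise RuntimeError("ConcurrentModificationException")
        attempt = log.latest_version() + 1      # blind append: just retry


log = ToyLog()
log.put_if_absent(0, [{"add": {"path": "1.parquet"}}])
# Another writer commits version 1 first; we (also a pure append) retry at 2.
log.put_if_absent(1, [{"add": {"path": "2.parquet"}}])
print(commit_optimistically(log, [{"add": {"path": "3.parquet"}}],
                            read_version=0, is_pure_append=True))   # -> 2
```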

In other cases, say it’s something like a compaction process: if you have two compaction processes running at the exact same time, and then you notice, “Oh, some other process deleted what I was supposed to delete,” then it will throw an exception.

So here, for example, we see user one and user two both reading the schema. They’re both going to be appends. User one wins commit version one; user two can automatically commit version two.

Handling Massive Metadata

So, with regards to how Delta scales: since we’re dealing with big data, we have a ton of files, and we can have tons of partitions as well.

How do we scale the metadata, given that it’s just stored as JSON files? What we do is we use Spark for scaling. So let’s say you have all these actions committed through multiple versions of your data. We can read all these JSON files as a Spark job, and then every now and again, every 10 commits, we automatically create a checkpoint that contains the entire state of your table within one Parquet file. Then Spark can read this Parquet file, or Spark can read these JSON files, play the actions in linear order, and see what the latest state of the table is. This is much better than hitting your Hive metastore to continuously get metadata, because eventually the Hive metastore will have thousands of tables and tens of thousands of partitions, and with everyone concurrently hitting this one single database to answer all these queries, you’ll hit bottlenecks. Whereas with Spark, each of your processes is an individual process, and each of your tables has its metadata separately, so you have much better scaling guarantees when you use Spark on the transaction log.
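
Since the checkpoint is just a Parquet file and the commits are just newline-delimited JSON, you can peek at them with ordinary Spark reads. A minimal sketch, assuming a hypothetical table path and ignoring the replay logic that Delta handles for you:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical table location; _delta_log holds the commits and checkpoints.
log_path = "/tmp/my_table/_delta_log"

# Every 10th commit Delta writes a checkpoint: the full table state as Parquet.
checkpoint = spark.read.parquet(log_path + "/00000000000000000010.checkpoint.parquet")

# The commits after the checkpoint are newline-delimited JSON action files.
deltas = spark.read.json(log_path + "/00000000000000000011.json")

checkpoint.printSchema()    # nested columns such as add, remove, metaData, txn
deltas.show(truncate=False)
```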

Computing Delta’s State

So how does computing the state work? First of all, let’s say that we’ve made three commits: zero, one, and two. Spark would read these commits, play them in order, and then it’ll cache the version-two state of the table in memory. So whenever you’re querying your Delta table, we can use this in-memory representation of your table, which is only megabytes of data, it’s very tiny, to answer all your queries. But what happens when you get new data? So you were on version two. What you do is you start listing your table, and then you see all these new versions pop up. So let’s say that five commits have been made since you last queried your table, so the latest version of the table is version seven. What we do is we call a method called listFrom, which is in Delta’s LogStore implementation. It lists all the changes since version two.

Updating Delta’s State

Then we feed all these new JSON files, as well as the cached version, into Spark, play them in order, and then we can cache version seven on top of it.

Then, when you’re at seven, let’s say five more commits made it in. As you can see, this time we have a checkpoint file that was written at the 10th commit. This checkpoint file contains the entire state of the table as of version 10. So when we list from version seven, what we’re going to see is, “Oh, there’s a checkpoint file,” and then there are two more versions after that. So what we’re going to do is actually forget about the cached version: we’re going to read checkpoint 10, then add versions 11 and 12 on top of it, and cache version 12. This makes it a lot easier and a lot faster to update Delta’s state in such cases.

So that was kind of a brief overview of how Delta’s transaction log works. And as you can see, we’ve had all these versions of the table. One of the cool things that this multi-version table provides is time travel. So let’s talk about how that works.

Time Travelling by version

So let’s say that we’re going to time travel by version. With Delta you can time travel by versions of your table, as well as by timestamps. So you can run a query like SELECT * FROM my_table VERSION AS OF 1071, or you can use the @ syntax with v1071 within Databricks Runtime to query your table. Or you can use the DataFrame API, such as versionAsOf, or you can provide the @ syntax within your path to specify which version you want to load. And once you specify this version, it’s very simple: we just say deltaLog.getSnapshotAt(1071), and the computation that you just saw happens very naturally. We can list all the files, see what’s the latest checkpoint, see that version 1070 has a checkpoint, play the one commit right after it, and come out at version 1071.
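
In code, version-based time travel looks roughly like this; the table name and path are placeholders, and spark is assumed to be an existing SparkSession with Delta Lake configured:

```python
# assumes an existing SparkSession `spark` with Delta Lake configured
# SQL form:      SELECT * FROM my_table VERSION AS OF 1071
# @ syntax form: SELECT * FROM delta.`/path/to/my_table@v1071`

# DataFrame API equivalent:
df = (spark.read
      .format("delta")
      .option("versionAsOf", 1071)
      .load("/path/to/my_table"))

# Or the @ syntax embedded in the path (supported on Databricks):
df_at = spark.read.format("delta").load("/path/to/my_table@v1071")
```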

What about when you’re time traveling by timestamp?

Time Travelling by timestamp

In that case, you can’t just call getSnapshotAt(1071). You need to figure out which version that timestamp corresponds to within your table. To explain this concept, I’m gonna use some data from the Middle Ages. Let’s say that you want to understand which version of your table was current at 1492-10-28, which is when the Americas were discovered by Christopher Columbus. So you might write timestampAsOf 1492-10-28 and try to load it. So what happens underneath the hood?

We take a look at all your commits, and all your commit timestamps come from the storage system’s modification timestamps. What does this mean? If you use HDFS and you write a file to HDFS, it assigns a modification timestamp to your commit file. Same thing with Amazon S3, same thing with Azure Data Lake. So we use these timestamps as the source of truth for when these commits made it into your Delta table.

But the problem is these are distributed systems, and distributed systems can definitely have clock skew. So in this example, I’ve used Turkish history to demonstrate these timestamps. 375 was when the Turks migrated from Central Asia towards Europe and Anatolia, towards the West, instead of staying in Central Asia, and that was the beginning of the Middle Ages.

1453 is when Istanbul was conquered by the Turks, the Ottomans. In the West they generally treat this as the fall of Constantinople, but we treat it as the conquest of Istanbul. 1923 is when the Turkish Republic came to be, and 1920 was when the parliament of the Turkish Republic was first opened. So as you can see, there’s some inconsistency between when the parliament opened and when the Republic came to be. So how do we deal with this?

Since timestamps can be out of order, we list all these commits and we adjust the timestamps by adding one millisecond to any timestamp that’s out of order. So as you can see, version 1073 will now have a timestamp one millisecond after the 1923 version.

Then, when we’re trying to find when 1492 was within this list of commits, we look at the latest version that has the closest timestamp that doesn’t exceed the given timestamp. So as you can see, 1453 happened right before 1492, and we can therefore choose version 1071. Now we understand which version we have to load, and we can call DeltaLog.getSnapshotAt(1071) to replay to that version.
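
A toy sketch of that timestamp resolution, using the talk’s year-based example values; this is the idea, not Delta’s actual code:

```python
# (version, commit timestamp) pairs, possibly out of order because of clock
# skew across the distributed storage system (toy year-based values).
commits = [(1070, 375), (1071, 1453), (1072, 1923), (1073, 1920)]

# Adjust: any timestamp that is not strictly increasing becomes previous + 1.
adjusted = []
for version, ts in commits:
    if adjusted and ts <= adjusted[-1][1]:
        ts = adjusted[-1][1] + 1
    adjusted.append((version, ts))
# -> version 1073 now sits one "millisecond" after version 1072's timestamp.

def version_at(timestamp):
    # Latest version whose (adjusted) timestamp does not exceed the request.
    return max(v for v, ts in adjusted if ts <= timestamp)

print(version_at(1492))   # 1071, which is what getSnapshotAt() would then load
```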

Time Travel Limitations

So, some limitations with time travel.

First of all, it requires the transaction log files to exist. With Delta, we keep by default 30 days’ worth of transaction logs and changes for auditing purposes, and this can be modified by the log retention duration configuration within Delta.

It also requires the data files to exist, because as you do compactions you logically remove old files, but once you call VACUUM on your table to delete these logically deleted files, you lose the chance to go back in time and read those files. To prevent that, you can set the deleted file retention duration equal to your log retention duration, so that the entire history you can time travel to remains readable.
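
For reference, a hedged sketch of setting those two retention settings as table properties; the interval values here are only illustrative and my_table is a placeholder, so check the Delta Lake docs for the semantics on your version:

```python
# assumes an existing SparkSession `spark` with Delta Lake configured
spark.sql("""
  ALTER TABLE my_table SET TBLPROPERTIES (
    'delta.logRetentionDuration' = 'interval 30 days',
    'delta.deletedFileRetentionDuration' = 'interval 30 days'
  )
""")
```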

The thing here is that time traveling on the order of months or years is infeasible, and the reason for that is your storage costs will just explode. So if you want to be able to go back in time to see what a record was a year ago, or two years ago, or even ten years ago, then you have to change your architecture, for example storing your data in SCD Type 2 format, which retains the entire history of all the changes to each of your records, but when you query the data, it always shows you the latest value thanks to a Boolean flag which is part of your table schema. So do not use time travel for years and years’ worth of history. The way we built time travel initially was to prevent mistakes from happening, because we’ve observed a lot of first-time users misusing Delta and accidentally deleting all their data. With time travel you can very easily restore all your deleted data, because it wasn’t actually physically deleted.

Another reason is that computing Delta’s state won’t scale if you’re trying to retain years’ worth of history while you’re deleting data. The reason for that is we delete checkpoint files a little more aggressively, so you might end up with cases where you have to load one checkpoint file and a hundred thousand JSON files. So if you do need that much history, there are architectural means of better storing it.

So that was me talking a lot. I’d like to hand it off to Denny to show you all of this in action. So I’m gonna stop sharing my screen now.

– Perfect. Thanks a lot Burak. I appreciate it. If you have any questions, please plop them in the Q and A, but I will not answer any questions on Turkish history. That was awesome. (laughs) All right. So diving right into it. I’m gonna go ahead and present my screen. I will show you Chrome and all right, so give me a second, sorry. As I try to… there you go. Okay. So this notebook is a…

Diving into Delta Lake: Unpacking the Transaction Log

Just to give you some context: this notebook is actually available already for everybody here, but we will share out this specific version of the notebook shortly, okay. Now, this is a version of the Delta Lake tutorial from the Spark + AI Summit 2019 in Europe, but we’ve done a modified version specifically for this talk, okay. And so, as it says, Diving into Delta Lake: Unpacking the Transaction Log. So I’ve already started some of the runs. Basically what I’ve done is I’ve downloaded some data, specifically some loan risk data that we had already generated. Okay, so this is just downloading the file; it’s a relatively small file. And as I’ve noted, in case you want to run this on Databricks Community Edition, you absolutely can; we’ve already configured it as such, so that makes it easier. And the first thing we’re doing is we’re creating a Parquet version of the table. In other words, we’re not gonna just go to Delta right away. We just want to start with Parquet to give you some context behind what Burak was talking about, to give you some context on how it operates when you just go with Parquet alone. Okay. So I created this Parquet table, and I defined the view loans_parquet here. So hopefully you’ll be able to see it; I’m actually gonna zoom one more time so you can see it better, okay. So loans_parquet, and you’ll see the schema of this table. This table has four columns: loan ID, funded amount, paid amount, and the state. All right. So how many records do we have? We just run a count on that loans_parquet, and what you’ll see is 14,705. All right, so that’s our first number for today. And as you can tell, I just ran it. So now I’m gonna go ahead and kick off that structured streaming that Burak had referred to earlier, to give the context of being able to run structured streaming and batch at the same time, because Delta Lake allows you to do that, because it has ACID transactions to protect everything. So let’s go ahead and create this particular function and let’s go ahead and run it. Okay. So what’s going on right now is I’m gonna go ahead and simply run the stream. It’s a writeStream, as you can see here. Okay. And give it a couple seconds for it to basically kick in. And as you can tell, the input and processing rate are starting to kick off, and here’s the batch duration, all right. So if I was to go ahead and do a count on the same path, now note the fact that I’ve indicated here parquet_path, format Parquet, what I’m doing basically is I’m not doing a count on the stream, I’m doing a count on the batch. So in fact, I can go ahead and do that, which is pretty nice. Okay. So it’s right now telling me 160, but there’s a couple of problems here, and I’m actually gonna address two of them. First thing is: what happens when I try to add a second stream, okay? So the exact same function, we’re just generating different sets of data, and we’re gonna write to the same location, parquet_path. Okay. So when I do that, basically I’m gonna kick off the stream, and what you notice is that basically, over time, for this stream there’s no input or processing rate; it’s actually not writing any data. The batch duration is gonna go up, but there’s actually nothing being written here. Okay. This other stream, the first stream, is actually still writing, okay, at 41.5 records per second, and the input rate is five records per second, okay. But nothing’s happening here, okay.
So it’s good that I’ve got data going in, but I can’t put a second stream in. So in other words, if I’m building these data pipelines where I’m actually trying to scale and process multiple threads from multiple sources, or a partitioned source, and push them down to my table, I can’t do that, okay. So the good news is that there is data going in, but I can only do one stream. So in essence, I’ve locked my table. All right, that’s problem number one. Problem number two is: wait, didn’t I just say there were 14,705 rows, right? So right now, the last query returned 410. What happened to all my data? Well, if I look at the schema here, what you’ll see right away is that I do have loan ID, funded amount, paid amount, and the state, the four columns that I originally started off with, but I actually have two additional columns: timestamp and value. So this is cool in terms of, I have the timestamp of when this data went in, but it’s not cool because I just basically corrupted my table for all intents and purposes. I don’t have the actual count that I wanted. So this isn’t good, all right. So I’m gonna stop all the streams right now, but basically what it comes down to is: because I’m using a Parquet table, I can’t actually run multiple streams against my table at the same time, okay. Nor, once there’s a schema change, can I avoid it basically, in essence, corrupting the data that I have. So both are pretty not-fun options, and common enough when you’re working within any data engineering pipelines, okay. So let’s go ahead and do this now with Delta Lake. So I’m gonna rerun this particular portion; again, this is just recreating the views, except I’m cleaning it up and I’m gonna create a loans_delta, okay. And so when I run this again, you’ll see the same 14,705. So that’s a good starting point. Okay. And again, when we look at the data, there are the four columns that we’re working with. Okay, so perfect. All right. So we’re back in action. We’re using a Delta table instead of a Parquet table, okay. But then, can we do the things that Burak had just mentioned? All right. Well, let’s go ahead and start talking a little bit about schema enforcement. By the way, as I talk about schema enforcement, I’m only gonna talk about it a little bit; in fact, Andreas and myself will have a session next week where we dive into schema enforcement and schema evolution. Okay, so we’re just gonna cover a little bit here just to give you some context, but we’re gonna dive in much deeper in the next session. So just a little call-out for ourselves, okay. So I’ve created the view. Now note that I’m actually creating a view called loans_delta_stream, okay, off this delta_path location, okay. So it’s one file system location on which I’m creating a view that’s for streaming purposes, okay. So I’m now gonna do a count against that, and it’s basically trying to process the data, okay. So it’s just simply trying to do a count off that file path, the delta_path location here. So now I’m gonna go ahead and try to run that same code again. Remember, I tried to put in two additional columns. The reason is because, by definition, when you run this stream it’ll actually add those timestamp and value columns automatically. Aha, but there is now a schema mismatch. So this is one of the cool things about Delta Lake right away: there is schema enforcement. It’ll protect the table right from the get-go.
So it doesn’t allow this function to write to that same table, because there’s a schema mismatch. In fact, if you open up the error message, it tells you right from the get-go that if I want to enable this merge schema, again, we’ll talk more about this in the next session, but I just have to turn on this option where mergeSchema is true. And here’s the reason for the difference: basically, the original table schema has these four columns, but the data schema that we’re trying to write has six columns. So you can go ahead and basically, if you were to change it or turn on mergeSchema, then you’re good to go. Alright. But let’s go rewrite the function. So all we did is basically add this particular statement here, so we’re saying let’s only write the four columns that you see here. All right, let’s go run this now. Okay. And now let’s go ahead and write this again. So we’re gonna do the exact same thing as we did before: we’re gonna go ahead and run this particular one to basically write to that Delta table, just the same process as we had before. So if I open this up, I’ll see the input and processing rate kick in, and per the batch duration, if I give it a couple seconds, when I scroll back up you’re gonna see the numbers change. In fact, it started at 14,705 and now it’s already at 14,805. So as you can tell, the first stream is currently writing into this particular table now. So that’s great. Oh, and now it’s at 15,805. Perfect. So, because I have ACID transactions, the transactions that Burak had been talking about, to protect the table, I’m actually able to run a second stream, and I could run a third one, a fourth one if I wanted to as well. But for the purpose of the demo, I’m just gonna run two today. But again, same idea: as you can tell right away, as opposed to with Parquet, you see the input versus processing rate kick in for the second stream. All right, right here: 50 records per second for the input rate and 186.7 records per second for the processing rate. And this is for the second stream, and the first stream, same concept. So both streams are writing to this table at the exact same time. All right. Why? ACID transactions are protecting it. If I scroll back up, you’ll notice that the counts are quickly updating themselves. So now, what you see here is I’ve got two streams that are writing to the same file system location, and I’ve got one stream that’s actually reading from that same file system location. That’s why we’re at 2505. If I scroll down and run a batch, I’m gonna run a batch at the exact same time as well. So now I have a batch count all going at the same time. So concurrently: two write streams, one read stream, and one read batch. So it gives you that flexibility to run multiple things at the same time against your data. All right. So we’re gonna stop all the streams, and we’re gonna shift over to exactly what Burak had referred to before about the Delta Lake table history. Alright, so before we do that, what you’ll notice here is that, remember, the data was sitting in this location, loans_delta. What happens is that there’s actually a _delta_log folder as well. So if I run that, what you’ll see is a bunch of files here. So here’s the zero file, the JSON. All right, that’s the one that actually initially created the table.
And for each JSON that’s been added, this is the transaction log that you’re calling out. And just as Burak had noted before, for every 10th one we go ahead and actually write out a Parquet checkpoint to speed things up, okay. And right now, there we go, here’s the second one, because this is the 20th commit. And so right now we’re on version 21 or 22 of the history for our Delta table.
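
If you want to poke at the same things yourself, here’s a rough sketch using the demo’s delta_path; dbutils and display are Databricks notebook utilities, while the DeltaTable history API comes from the open-source delta package:

```python
# assumes a Databricks notebook with `spark`, `dbutils` and `delta_path` defined
from delta.tables import DeltaTable

# List the transaction log directory: commit JSON files plus Parquet checkpoints.
display(dbutils.fs.ls(delta_path + "/_delta_log"))

# Or view the same history in a much friendlier form:
DeltaTable.forPath(spark, delta_path).history().show(truncate=False)
```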

So you don’t have to look at it like this. You can actually look at it in a much nicer fashion. So we’re gonna go ahead and simply run the Delta table history. And this is, by the way, to provide some context, actually all open source code. We happen to be running this on Delta on Databricks, but you can actually run this with the OSS code yourself. And so here are the 22 different versions of your table. Okay. All right. So from the first initial write to all the streaming updates that went into your table, all 22 versions, just like the file system. And, just as Burak had noted previously, if I was to look at the file, inside each JSON is all this commit information, all this metadata, right: the timestamp, the user ID, who did it, what the actual operation was, the parameters, the append, the query ID, isolation levels, the transaction, yada yada yada. All of this information per each file, right, is all here. So it gives you each batch, each transaction; you understand exactly what’s being impacted, what’s being appended, what’s being modified, what’s being changed. So that’s what’s inside these JSON files. Alright. And then we’re gonna cover this a little bit more in some of the future sessions, when we do especially the DML operations, where we’re doing inserts and updates, but just to give you some quick context, just as Burak had noted, I can now, with time travel, go ahead and say, “Okay, give me the version.” You can do timestamp too, but for the purpose of this demo, I’m just gonna do version. And so with v01, the DataFrame here, I’m simply saying, okay, format Delta, give me the table from that delta_path at version one, versus version 11, versus the current version, okay, whatever that version is; in this case, since we did this live, it’s version 22. And then, as you see here, here are the counts. So initially the table was 14,705. Alright. By version 11, there are 18,505 rows inside it. And for the current version, there are currently 24,000. So that’s really cool. But just as Burak had noted, how can all the data still be there? Like, how do I know what happened at version one versus version 11 versus the current version? Well, it’s because there are a lot of files. So I’m actually just gonna do a quick query, and you’re gonna see a lot of files here. So here are all these little files, all these little small Parquet files that have been added right inside that directory. Okay. So it starts off with the, I’m sorry, the Delta log, like I had mentioned before, but here are all those files. Okay. They’re all pretty small, as you can tell, and actually, I’m gonna scroll all the way to the bottom, and here’s the initial file that we had created, right, for the initial table creation. But otherwise, other than that, lots of small little files. So as you can probably guess, this is gonna be slow, right? If you keep on adding more and more files into this, trying to read that data becomes a very slow process. It’s a management headache, all that fun stuff. So how do we avoid that problem? Well, we’re gonna talk about compaction of files and also vacuuming. All right. So first things first, let’s go ahead and compact the files. All right. So in this case, what I’m doing is I’m gonna read from that delta_path location, okay, all the files that you just saw there, all the small files, and I’m saying explicitly that I want to create four files.
And the way we do that is to repartition to the number of files I want: four. I’m doing this just because, in this particular case, a good rule of thumb or a good starting point is that I have four worker nodes, so I’m just gonna go with four. All right. And then I have to specify that dataChange is false; by default, dataChange is true, okay. So when it’s false, it says, “Okay, I’m not actually changing the data; assume there are no changes to the data.” So readers can still cleanly read while I run the operation, and it won’t change the data at all, basically. All right. So I’m gonna keep it as format Delta, I’m going to overwrite it, and then basically save it to the delta_path. So I’m gonna go run this now. So what happens here is I’m quickly going to create four new files, right, in literally 4.32 seconds. So that’s pretty sweet, except if I was to go ahead and look at this all over again…
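
In code, the compaction step just described looks roughly like this; delta_path is the demo’s table location and spark is the notebook’s SparkSession:

```python
# assumes `spark` and `delta_path` from the notebook
# Compact the table's many small files into 4 larger ones without "changing" data.
(spark.read
   .format("delta")
   .load(delta_path)
   .repartition(4)                       # one output file per partition here
   .write
   .option("dataChange", "false")        # tells Delta this is only a re-layout
   .format("delta")
   .mode("overwrite")
   .save(delta_path))
```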

I still see a lot of files. All right, why do I still see a lot of files? In fact, if I actually just do a count, oops, sorry about that, there we go, there are 329. So why are there so many files? Well, the reason why there are so many files is because, for snapshot isolation purposes, if there is somebody trying to query the data at the exact same time as I was running that operation, they can still read the old files that were there before they go ahead and try to read the new files.

So that makes things a lot better from a concurrency perspective, where basically you can have multiple people querying at the same time; they can still go ahead and look at the older files while you’re performing your operations. All right. But it’s okay, let me go ahead and just simply clean up that data. Now, do know that in production this is an anti-pattern. Do not do this in production. By default, when I run a vacuum without specifying retention hours equals zero, what I’m saying is, “Go ahead and remove any older files that are seven days or older.” Okay, you can change that number obviously, but the idea is that by default it’s seven days or older. Here I’m gonna go in and say, “Nah, don’t even bother doing a retention duration check, just simply set it to zero.” Okay, so in other words, what I’m saying is that if it’s older than now, delete it all, okay. So the data’s still there in terms of the most recent query, but if I was to try to re-query the older data, it will actually result in a failure, because while the transaction log has all of this information, the actual data no longer resides there. And how do I show that? Well, let me go back into the file system and, boom, there you go. Now you have only the four files, the four files that were generated as part of the compaction. You still have the Delta log. So if I was to go ahead and just run this, okay…
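
And the vacuum step, for reference; note again that a retention of 0 is the anti-pattern being demonstrated here, not something to run in production, and that on recent Delta releases you first have to disable the retention-duration safety check:

```python
# assumes `spark` and `delta_path` from the notebook
from delta.tables import DeltaTable

# Required before retaining less than the default 7 days (168 hours):
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

# Physically delete files no longer referenced by the latest snapshot.
DeltaTable.forPath(spark, delta_path).vacuum(0)   # 0-hour retention: demo only!
```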

All the log is still there; all the transactions that you just did are still recorded. So from a GDPR or CCPA or GRC compliance perspective, you still have that information on what happened to the table, but the underlying data no longer exists, as you can tell, because I just vacuumed it out. So that should cover it for the demo. We do actually, like I said, have these demos available; we’re gonna be sending this demo out. But if you can’t wait the five hours for us to go ahead and email this out to you or update it on YouTube, it’s okay, you can go to the Spark Summit Europe demo, right, that we have in the Delta repo. Otherwise, we will be sending it out. Inside this notebook we also call out that you can join Delta Lake on GitHub; obviously that’s where we put a lot of our stuff. Also the Delta Lake Slack channel, this is where you can ask us tons of questions, and also the public mailing list. So with that, I did wanna leave time for questions, but before I do that, Burak, anything else that you wanna add? – I mean, thank you, Denny. That was great. One thing that I wanted to also stress: Delta, in order to maintain this mutual exclusion property within its transaction log, has specialized implementations of its log store, depending on which storage system you’re on. For example, Amazon S3 has eventual consistency and it doesn’t have put-if-absent guarantees. So when you’re using Delta on S3, there are special configurations that you have to set, which are all documented in the Delta Lake docs, so please read those. When you’re working on a local file system, or the Hadoop Distributed File System, or Azure Data Lake Gen2, these provide atomic renames, so there are also special configurations to set for these. So please take a look at all of them. – Cool, rock on. So for folks who would like to ask questions, could you do your best to please put those questions in the Q&A section, as opposed to in the chat, so that it’s easier for us to track them all. So let me start taking a look. Actually, hey Burak, there’s a really good question. Let’s start with this one: do you recommend Delta Lake for OLTP-like workloads? – Great question. I would definitely not recommend Delta Lake for OLTP. You can think of Delta more for OLAP use cases and big data. So think about it this way: you don’t want to insert one record at a time or update one record at a time. You want to batch things as much as possible. So to provide this batching, you can use some centralized message buffer like Kafka or Kinesis or Event Hubs to actually batch up, you know, 10,000 changes or a hundred thousand changes. Otherwise, one at a time, or a few at a time, definitely not. – Cool, rock on. Nothing I can add to that, so that’s sweet. There’s another good question about whether vacuuming should be done offline, and that there’s nothing to stop vacuuming from proceeding if someone was reading one of those older files, is that correct? – Yes. I mean, since we have this seven-day buffer, you don’t need to actually have vacuum block anything. Vacuum is something that you just do offline. You can have some other process, some other cluster, running it, you know, once a week or whatnot. – Yeah. And just to add to the point, remember how we said that it was an anti-pattern for us to set retention to zero, right?
So this is precisely the point: the idea that if you run it with retention zero, you’re going to delete everything right away, which sort of sucks. – Yeah, you would definitely delete things that you were just about to commit to the transaction log, and that would suck. It would just lose data. – Yeah. Oh, so there’s another question related to vacuum: what does vacuum look like in the transaction log? And so, yeah, go for it. – Great question. Vacuum doesn’t show up in the transaction log, because it’s not a commit to your table, so it doesn’t change anything about that table. – Right. But do note the fact that, recall, the transaction log basically hasn’t changed, okay, but the underlying data has. So in other words, when you run a query against a version that you see in the transaction log, but the file no longer exists in the system, what’ll end up happening is you’ll get an error, okay. Now, in this case, this is a desired error, because you did exactly what you intended: you removed older data. And as a quick call-out, this is very useful for debugging purposes, useful for “oops” scenarios, and even short-term things like model drift or short-term validations.

But I would definitely not do this as a longer-term approach, just because, exactly as Burak noted, and as you could see from the way the file system operated, there are lots of small files, right? Even if I was to do a compaction all the time, I’d still have a lot of files, and I really don’t want that, right. And so this is good for those specific scenarios; I would not try to run this for, like, three years’ worth of data, right. If you’re going to do this for three years of data, I would actually snapshot those tables and save them to cold storage, or something akin to that. A good example would be financials, right, like financial reporting. Okay, fine, fair enough, then I could take a snapshot of the table on the 31st, or on the first, depending on how you do it, or on the fifth if you need reconciliation time, and then store those files, that table, to cold storage so that you still have it when you wanna query it. But I would not use this as a long-term approach, okay.

So let’s see. We’ve probably got time for about two more questions, okay Burak, because I wanna close things up. Let’s see.

Oh, okay, a good question that we’ll dive into deeper next session, but I did want to go ahead and call it out now: if there is a schema change apart from adding columns, is there any other way to reprocess older data with the new schema? – That’s a good question. So if there’s a schema change, instead of adding a column, so if the columns do not exist as you’re appending to your table, those columns will just be filled in as nulls. So that’s totally fine.

If you’re trying to change the data types, we try to allow certain data type changes which Parquet allows. And that’s only for, you know, shorts, bytes, and ints, which are stored as ints within Parquet, so you can upcast from one to the other, but you cannot change from int to long, for example. In those cases, Parquet encodes these values differently, therefore you would have to either overwrite your entire table, or you can also decide to create a view on top of your table that would omit certain columns or whatnot. – Yeah, makes a ton of sense. Okay. So then the final question: when using Delta Lake with structured streaming, we sort of alluded to this before, but let’s call it out specifically, you can end up with a lot of small files, just as we’ve seen. So what do you think is the best way to handle all of this then? – Yep, as Denny showed in the demo, we have the ability to read from your Delta table and write back to your Delta table with the option dataChange set to false. This just tells Delta that this transaction is only rearranging the data in a different layout; it’s not changing any of your data. So this has a lot looser isolation guarantees when committing this transaction. So you can use this dataChange false to reorganize your data. As well, within Databricks, we try to make this process simpler with OPTIMIZE as a command, which can do other things underneath the hood. But yeah, this dataChange false, with the ACID transactions, is the key here. You can reorganize and rewrite your data in any way that you want, which could give you better data skipping guarantees, better indexing guarantees, and whatnot.

– Perfect. Well, I think that covers it for today. I realize there are a bunch of other questions that we haven’t had a chance to answer. That said, this video will be posted to the Databricks YouTube channel, and from there you’re more than welcome to continue conversing with us and asking questions; we’ll do our best to answer them there. We also will send out an email to everybody who registered via Zoom with a reminder for both the video and also the notebooks as well. So we’re gonna put all this information out there