EPISODE 3

Demystifying Delta Lake

Delta Lake is an open source storage layer that brings reliability to data lakes. Delta Lake offers ACID transactions and scalable metadata handling, and unifies streaming and batch data processing. It runs on top of your existing data lake and is fully compatible with Apache Spark APIs. For our “Demystifying Delta Lake” session, we will interview Michael Armbrust – committer and PMC member of Apache Spark™ and the original creator of Spark SQL. He currently leads the team at Databricks that designed and built Structured Streaming and Delta Lake.

Michael Armbrust
Michael Armbrust is a committer and PMC member of Apache Spark and the original creator of Spark SQL. He currently leads the team at Databricks that designed and built Structured Streaming and Databricks Delta. He received his PhD from UC Berkeley in 2013, advised by Michael Franklin, David Patterson, and Armando Fox. His thesis focused on building systems that allow developers to rapidly build scalable interactive applications, and specifically defined the notion of scale independence. His interests broadly include distributed systems, large-scale structured storage, and query optimization.

Video Transcript

The Beans, Pre-Brewing

Denny Lee: 00:00
Welcome to Data Brew by Databricks with Denny and Brooke. The series allows us to explore various topics in the data and AI community. Whether we’re talking about data engineering or data science, we will interview subject matter experts to dive deeper into these topics. And while we’re at it, we’ll be enjoying our morning brew. My name is Denny Lee, and I’m a developer advocate here at Databricks.

Brooke Wenig: 00:30
And I’m Brooke Wenig, Machine Learning Practice Lead at Databricks. In this session, we will discuss the engineering journey from building data lakes to what is now Delta Lake with Michael Armbrust, Spark PMC member and engineering lead for both Structured Streaming and Delta Lake at Databricks. So, Michael, how about we kick off the session with just a quick introduction of yourself and how you got into the field of big data?

Michael Armbrust: 00:52
Yeah, so I’ve been working on data for almost a decade now. I did database things for my PhD. I spent a little bit of time at Google. And I joined Databricks about six years ago to help write the first version of Spark SQL and then Structured Streaming, and I’m now still the tech lead for Structured Streaming and Delta Lake.

Brooke Wenig: 01:12
So what made you join Databricks? What in particular was interesting about the big data projects that Databricks is working on?

Michael Armbrust: 01:20
Yeah. So back when I joined Databricks, I was coming from Google, I was doing kind of a post doc there, which is this weird position where you’re a full-time engineer, but you’re only actually going to be there for two years. And the idea is after that, they’re going to send you back to academia. And so, while I was at Google, I was working on this project where the thesis was, can we build a composable optimizer where it’s easy for many people to extend the optimizer, rather than it being one giant monolithic program? And so we had built this really cool prototype and I was ready to take this to the next level, and I thought Apache Spark was the right place to do that. And so I was actually on vacation with Ali, our CEO, and a bunch of friends from grad school. And he convinced me that Databricks was the place to kind of see this vision through to the end. And that is what eventually became Catalyst and Spark SQL.

Brooke Wenig: 02:12
Do you mind if I ask where you all went on vacation?

Michael Armbrust: 02:15
Oh, we were in Hawaii. It was for another one of the co-founders of Databricks, Andy Konwinski’s 30th birthday.

Brooke Wenig: 02:23
So I think we need our next retreat in Hawaii, but since we're all in quarantine, we're all just going to be enjoying this from our homes. Next question for you, Michael: how did you conceive of the idea of Delta Lake? What were some of the ideas that led to the evolution of Delta Lake, and maybe what were some of the shortcomings of Spark that led to that evolution?

Michael Armbrust: 02:43
Yeah, so Delta Lake came from years and years of watching people struggle to use Spark in the cloud. It turns out Spark was designed for HDFS, where you have kind of nice primitives like transactionally renaming entire directories. And that kind of allows you to work around the lack of transactions. But when you move Spark and you run it on something like S3, where you have to struggle with eventual consistency, I just saw people fail over and over again to get their programs to run correctly. So problems like jobs crashing in the middle and leaving a bunch of state that needed to be cleaned up, people dumping garbage data with the wrong schema into a table that prevents you from reading it back, and then really just kind of scalability limitations around metadata. I think even things like listing files on S3 can be significantly slower than on HDFS.

Michael Armbrust: 03:33
So if you have a table that has hundreds of thousands of files in it, it could take… We had customers where it was taking 40 minutes just to load the table, not even doing any querying over it. And so after seeing this for many years and kind of collecting all of these different problems through our support process, we’ve been talking about building a scalable transaction log as kind of a cool abstraction.

Michael Armbrust: 04:00
And then the way Delta actually came to be was I was at a Spark Summit talking with one of our largest customers, and they had this pretty crazy idea where they wanted to be able to ingest petabytes of data per week, trillions of records per day, into a table that would be available to be queried in real time. And I knew at the time that we wouldn't be able to handle it with our current architecture, but I was pretty sure that this idea of a scalable transaction log would allow us to do it. And so from that moment, we actually kind of went straight back and started working on it. And six months later we had a real prototype that was actually running in their production environment.

Denny Lee: 04:37
So then how does your experience as a Spark PMC member help you make those contributions into Delta Lake? And for that matter, maybe let's focus a little bit on making streaming first-class.

Michael Armbrust: 04:48
Yeah. So, looking at streaming and Delta and how they're related, it kind of turned out that one of the things we saw over and over again that people were trying to do with Structured Streaming was read data in from a variety of sources and then write it out as Parquet onto S3 or ADLS. And so Delta was kind of a natural fit there. And if you look at the first version of the streaming file sink in Apache Spark, it actually looks quite a bit like Delta. It has this same idea that you write out Parquet files, and then you have a log that tells you at each version of the table which of those files are actually valid.

Michael Armbrust: 05:24
However, the streaming file sink is quite a bit simpler. It doesn't have any support for concurrency. You could have only a single writer writing to a table. It kind of had support for removing things, but we never actually fleshed that all the way out. And so Delta really kind of took that architecture and really finished it. It brought it to the next level by coming up with a full protocol for doing full ACID transactions on this log. And so I think Delta was really pretty informed by those experiences, and also by knowing all of the right places to plug in to Spark. I think we started by plugging in with Data Source V1, and then Delta became one of the main motivating use cases to finish Data Source V2. So, not just Delta, but really any data source could plug into Spark and leverage all the power of that execution engine.
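
For readers who want to see what that pattern looks like in practice, here is a minimal sketch of reading a stream and writing it out to a Delta table with Structured Streaming. The paths, schema, and source format are hypothetical, and it assumes a Delta-enabled SparkSession named spark (for example on Databricks, or with the delta-spark package configured locally).

```python
# Minimal sketch; paths and schema are hypothetical.
# Assumes a Delta-enabled SparkSession named `spark`.

# Read a stream of raw JSON files from cloud storage.
events = (
    spark.readStream
    .format("json")
    .schema("id LONG, ts TIMESTAMP, body STRING")
    .load("s3://my-bucket/raw/events/")
)

# Write it to a Delta table; the transaction log, not a directory listing, defines the table's contents.
(
    events.writeStream
    .format("delta")
    .option("checkpointLocation", "s3://my-bucket/checkpoints/events/")
    .outputMode("append")
    .start("s3://my-bucket/delta/events/")
)
```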

Brooke Wenig: 06:12
So given all the benefits that Delta provides to Spark, why isn’t Delta built into Spark?

Michael Armbrust: 06:17
That’s a good question. I think as the Spark project has grown, we kind of made this conscious decision a couple of years ago to avoid having it become the kitchen sink of everything. I think like how the Hadoop project turned out, where everything started being tossed into the Hadoop umbrella, and then they started to split out a bunch of projects from it. I think it makes it harder to maintain. I think it means you end up going a very long time without releases and things like that.

Michael Armbrust: 06:42
And so we made this conscious effort to make Spark really the core of data processing and have APIs so that other systems can plug into it. We want there to be a rich ecosystem around Spark, but we don't want to kind of inflict our will on the Spark community. So I think really having a powerful plugin architecture is just a better choice for the community than shoving everything into the main project. Delta is a relatively young project, so we actually do releases significantly more often than Spark does. And I think that's a nice thing about having this decoupling.

Denny Lee: 07:14
So then what aspects of Delta are most exciting to you? Talk a little bit about the roadmap from your perspective.

Michael Armbrust: 07:21
Yeah. So I think if you look at the early days of Delta, it was all about scalability, transactional metadata, and things like that. As we move forward, it’s really about taking the power of Delta and the power of Spark and bringing it to a new audience. So I think Delta made it easy for data engineers to run real production pipelines that were up 24/7 and had low latency. However, I think still there’s a pretty big gap from writing a SQL query to running a production ETL pipeline. And so where we’re really taking Delta is we want to kind of take this idea and make it possible for anybody to take advantage of it. And so I think you’ll be seeing some really cool things in the pipeline space, also in the ability to ingest data into Delta without having to write any code through the auto loader stuff that we’re doing in Databricks. So it’s really building around the Delta ecosystem to make it possible for everybody to take advantage of it.

Brooke Wenig: 08:15
So taking a step back: why is the project named Delta Lake? What were some of the names that you were considering? Does the word Delta have any particular significance?

Michael Armbrust: 08:23
Yeah, naming is hard. And Delta actually, the first version of it, when I opened that pull request after Spark Summit, we actually started by naming the project Tahoe. So I don't know if everyone around the world is familiar, but there's a pretty large lake in California called Lake Tahoe. And it's actually so large, it has so much water in it, that if you were to take this lake and spread it out over the entire state of California, the water would actually be over a foot deep, which I thought was just an absolutely crazy statistic.

Michael Armbrust: 08:52
And so we started with this idea of a massive Data Lake as kind of the impetus for naming it. And we wanted something that was a little more general and not so California specific. So one day at a happy hour, we were debating different options. And Jules, another person at Databricks, suggested this idea of Delta. And we really liked it because it actually captured a bunch of the kind of core ideas. It maintains the streaming and water metaphor. You have the River Delta as it goes into the ocean. And also, even in the early days, the log that records changes, we had actually always called it a Delta log. So it was actually a pretty natural name for the project.

Brooke Wenig: 09:33
That’s super interesting. I’m from California and I didn’t even know that Lake Tahoe had that much water in it.

Michael Armbrust: 09:39
I know. It’s really, it’s a big lake. Although I think we might actually be rivaling it in size now with the amount of data stored in Delta.

Brooke Wenig: 09:48
So what were some of the most challenging aspects of building out Delta? I know you had mentioned that it took six months to get a prototype. How much effort was it to be able to release version 0.1?

Michael Armbrust: 09:58
Yeah. So there were a bunch of different phases of Delta. So the first version was just a scalable transaction log with support for streaming. And that was actually pretty easy to get right. We really just took what we learned in Apache Spark and kind of built the next version of that transaction log. I think the first big challenge was rounding out its SQL support. That took almost another year after building Delta to add full support for update, delete, merge, create table, alter table, all the things that you want from a real data warehouse. And I think that's actually one of the things that sets Delta apart from its competitors: you don't have to write Scala code in order to use it. You can do almost everything you want to do using pure SQL. And then I think after that, we spent a lot of time on scalability and automatic schema migration, improving the performance of all of those operations. So it's really been quite a long journey to get to where the software is today.
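
To illustrate that SQL surface, here is a hedged sketch of the DML Delta supports, issued through PySpark. The table and column names are made up, and it assumes a SparkSession named spark with the Delta Lake SQL extensions enabled.

```python
# Hypothetical table and column names; assumes a Delta-enabled SparkSession named `spark`.

spark.sql("""
  CREATE TABLE IF NOT EXISTS events (
    id BIGINT, user_id STRING, ts TIMESTAMP, status STRING
  ) USING DELTA
""")

# In-place modifications run as ACID transactions.
spark.sql("UPDATE events SET status = 'late' WHERE ts < current_timestamp() - INTERVAL 1 DAY")
spark.sql("DELETE FROM events WHERE status = 'invalid'")

# Upsert a (hypothetical) staging table of new arrivals into the target table.
spark.sql("""
  MERGE INTO events t
  USING events_updates s
  ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")
```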

Denny Lee: 10:53
Actually, I’m curious, why automatic schema migration? What was the impetus for doing such a thing? Was that really that common in a lot of those Spark environments?

Michael Armbrust: 11:02
Yeah. So, if you look at where the data is coming from, I think that at least a lot of our customers are ingesting data from these just massive, structured, JSON, XML, usually JSON data sources. And so often the first hop is you want to have… We talk about bronze, silver, and gold, kind of the different quality classes of data. When you’re in your first class, when you’re ingesting the raw data into the bronze zone of your Delta Lake, you really just want to capture everything. And when you have these complex, deeply nested schemas, you would spend all day writing alter table statements if you wanted to just ingest that data without doing any transformation on it.

Michael Armbrust: 11:41
And so from the early days of Delta, we had this idea that since Spark knows the schema and Spark can tell the schema to Delta, why can't Delta just automatically do the alter table statements for you? And that was actually one of the first features we added back in the early prototype days. And I think today it's one of the most popular features. I think the way you can tell how popular it is, is that up until recently we didn't have support for this in the merge into operation. And it was one of the most requested features, out of all of the different support channels and Slack and all the different places where people ask for stuff. They love auto schema migration, because I think it really does make that initial ingestion much easier and allows you to capture everything and keep it forever.
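
As a concrete, hedged example of what automatic schema migration looks like from the user's side, the sketch below appends a raw JSON batch with mergeSchema enabled so new columns are added for you. Paths are hypothetical, and a Delta-enabled SparkSession named spark is assumed.

```python
# Hypothetical paths; assumes a Delta-enabled SparkSession named `spark`.

new_batch = spark.read.json("s3://my-bucket/raw/2020-06-01/")  # raw JSON that may contain new fields

(
    new_batch.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")  # let Delta add the new columns instead of hand-written ALTER TABLE
    .save("s3://my-bucket/delta/bronze/events/")
)

# In newer Delta releases, the same behavior can be enabled for MERGE INTO via a session conf:
# spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
```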

Brooke Wenig: 12:25
So I think you’ve made a very good point for why Delta is important for data engineers and data analysts, making sure that they have a central source of truth. The data is consistent. The data is up to date. How is Delta important for data scientists and machine learning engineers?

Michael Armbrust: 12:38
Yeah, so I think good data science and good machine learning always starts with clean data. And if you are not using a transactional storage system for that data, and you're storing it on something like S3, you just don't have clean data. You may not realize it, but I think it's very, very difficult to get correctness without ACID transactions on top of an eventually consistent system. So, I think Google had this really good paper a couple of years ago, where they talked about all of the pre-work that it took to do good machine learning. And it's ETL, data extraction, featurization. And I think Delta makes all of those parts dramatically simpler. So you can get to the fun part of doing data science and machine learning more quickly and more accurately.

Brooke Wenig: 13:20
So I know a lot of the customers that I’ve worked with have struggled with this. They’re trying to build machine learning models on top of some data they have on S3, they build a model today, tomorrow they change the hyperparameters. But oops, overnight, the data’s changed. How does Delta solve some of those problems?

Michael Armbrust: 13:34
So it's very natural, because of the architecture of Delta, to be able to do time travel. So under the covers, Delta uses this trick called multi-version concurrency control, a well-known trick from the database industry. And what that means is, in Delta we never modify data. Rather than changing a record, we actually make a copy of that file and leave the old copy there. And the transaction log says which of these multiple versions is valid at any time. But what that means is if you want to go back and reconstruct the table as it was yesterday, all you need to do is play the transaction log up to that point and just stop early. Stop before you get to the end of the log. And that allows you to get exactly the data that you were training on yesterday. And when you connect this ability to MLflow's ability to record the inputs and other parameters into your machine learning models, it means that you can very accurately reproduce any model that you've ever trained in the past, as long as all of that data is stored in Delta.
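
For illustration, here is a minimal time travel sketch: reading the table as of an earlier timestamp or version. The path is hypothetical, and a Delta-enabled SparkSession named spark is assumed.

```python
# Hypothetical path; assumes a Delta-enabled SparkSession named `spark`.

yesterday = (
    spark.read.format("delta")
    .option("timestampAsOf", "2020-06-01")  # or .option("versionAsOf", 42) for an exact version
    .load("s3://my-bucket/delta/features/")
)
yesterday.show()

# Equivalent SQL forms (for a registered table named `features`):
#   SELECT * FROM features TIMESTAMP AS OF '2020-06-01'
#   SELECT * FROM features VERSION AS OF 42
```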

Denny Lee: 14:27
So the thing about MVCC, though, is that you've got a lot of files, right? So I guess the first question is: is the implication that you're going to have slow performance when you're listing all of these files, especially from cloud object stores like S3?

Michael Armbrust: 14:41
Yeah. So that's actually… No, that's okay. That's actually a common misconception. I get this question pretty often. Like, "Hey, my Delta table is slow. Is it listing all of the things?" So Delta was actually designed such that you could query any Delta table making only one list RPC to the file system, which is pretty weird. So how do we actually do that? There's a couple of tricks that we have under the covers. First of all, the Delta log itself, the names of the files that actually make up the Delta log, are written such that they sort lexicographically. Systems like S3 are lexicographically partitioned in the metadata tier and then hash partitioned, so everything's just randomly spread out in the data tier. What that means is when you're listing a directory, if you order the file names correctly, you can list only part of it. You don't have to list that entire log directory. So you can say: list just the end of the directory.

Michael Armbrust: 15:38
The other trick is we have this thing called the last checkpoint file that gets you close to the end of the transaction log. So we start close to the end of the transaction log, we list just those files, and we discover where the end of the transaction log is. And then all of the other file names are stored in the transaction log itself. So one list to the log and that’s all you need to do in terms of discovering what data is in the table. The rest of it all happens in parallel across the Spark cluster when you fetch those files. And you’re only fetching the ones that are actually valid at that time.
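
To make that layout concrete, here is a rough, illustrative peek at a Delta table's _delta_log directory. The path is hypothetical and the local-filesystem listing is only for demonstration.

```python
# Illustrative only: a Delta table directory contains a _delta_log folder whose file names
# are zero-padded so they sort lexicographically.
import os

log_dir = "/data/delta/events/_delta_log"  # hypothetical local path
print(sorted(os.listdir(log_dir)))
# Typical contents:
#   00000000000000000000.json                  <- one JSON commit file per table version
#   00000000000000000010.checkpoint.parquet    <- periodic Parquet checkpoint of the full state
#   _last_checkpoint                           <- pointer that lets readers jump near the log's tail
```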

Brooke Wenig: 16:11
So if someone’s complaining that their Delta table is slow, what advice do you have to diagnose and troubleshoot some of the slow queries that they’re running on their Delta table?

Michael Armbrust: 16:20
Yeah. So step number one: run DESCRIBE DETAIL. Check to see how big your files are. Almost always the problem is tiny files. And fortunately Delta has a lot of cool tricks to handle these tiny files. So first of all, if you haven't turned on auto optimize, which is a feature we have inside of Databricks, turn that on. It almost always makes your performance better. What it does is, as you're writing into the Delta table, it will look at the sizes of all the different partitions and compact them down into nice, reasonable sizes, which will dramatically improve your read performance.

Michael Armbrust: 16:54
And then the other thing you can do after the fact is you can run the optimize command, which basically scans all of the data, finds files that are too small, and issues an ACID transaction to take those small pieces and turn them into bigger pieces.

Michael Armbrust: 17:08
The other thing you can do is you can cluster related data. And so this is what optimize z-order is for. If you tell us what columns you’re going to be issuing predicates over, we will actually take all of those columns, map them onto this multidimensional space, sort the data, and then rewrite it out, such that those columns are clustered together. And what that means is the data that you’re looking for for any given query will be in the smallest number of files possible. And we’ll use statistics about what data is in each file to eliminate files that aren’t relevant to your query. And we’ve seen cases where that’s eliminated up to 97% of the table. So it can make your query significantly faster. We’re talking hours to minutes.
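
Putting those steps together, here is a hedged sketch of the diagnosis-and-repair workflow. Table and column names are hypothetical; auto optimize and OPTIMIZE ... ZORDER BY are Databricks features as described above, so the exact property names may vary by release.

```python
# Hypothetical table/column names; assumes a Databricks (Delta-enabled) SparkSession named `spark`.

# 1. Check file counts and sizes -- tiny files are the usual culprit.
spark.sql("DESCRIBE DETAIL events").select("numFiles", "sizeInBytes").show()

# 2. Opt the table in to auto optimize on write (Databricks table properties).
spark.sql("""
  ALTER TABLE events SET TBLPROPERTIES (
    'delta.autoOptimize.optimizeWrite' = 'true',
    'delta.autoOptimize.autoCompact'   = 'true'
  )
""")

# 3. Compact existing small files, clustering by the column you filter on most.
spark.sql("OPTIMIZE events ZORDER BY (user_id)")
```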

Denny Lee: 17:51
Well, in that case, when should you call optimize? Why not just automatically optimize everything right away?

Michael Armbrust: 17:57
Well, so that's kind of what auto optimize is. And we're actually looking at, in the future, turning it on by default, because I think it's almost always a good idea unless you are hand-tuning your partitions. And even if you're hand-tuning your partitions, I've got news for you: your data size is probably changing and you're probably not updating that over time. So auto optimize is almost always a good idea, and we'll be turning that on. And we are looking at the ability to kind of automatically run optimize overnight or at other times on Delta tables. That's definitely one of the "making Delta simple" things that we're looking at doing in the near future.

Denny Lee: 18:30
I mean, related to that, could you sort of clarify the difference between when you're running optimize versus when you're running vacuum to actually remove those files?

Michael Armbrust: 18:40
Yeah. So optimize is about taking the active data in the table and making it more efficient for the next query, creating the next version of the table in a state that is faster for readers. Vacuum is a different operation. Vacuum is saying, "Hey, this MVCC thing is great. I like to be able to time travel, but I don't want to be able to time travel back infinitely, because I don't have infinite money to spend on storage costs." And so what vacuum does is it takes all of the files that are out there and the current state of the transaction log, and it actually does this big anti-join. So it discovers files that are on the storage system but are no longer referenced by the Delta log. And then it cleans them up. And this is of course a configurable operation.

Michael Armbrust: 19:23
You basically think about a retention window: how long do I want to keep data around for? By default, we keep it for seven days. That way, any streams that are running or any active transactions that are running won't be interrupted by you vacuuming, because it's very unlikely that they're going to be running for more than seven days. But based on different use cases, I have some people who vacuum with a retention of zero hours, which basically says: turn this table into a normal Parquet table, remove all other versions. So now it's no longer multi-version concurrency controlled. It's just a single version residing out there. And I also have some customers who never run vacuum. They do want to spend arbitrary amounts of money to be able to reproduce any table version that has ever existed, often for regulatory reasons.
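
A short sketch of that retention trade-off, with a hypothetical table name. Note that Delta refuses a zero-hour retention unless you explicitly disable its safety check, since it can break concurrent readers and streams.

```python
# Hypothetical table name; assumes a Delta-enabled SparkSession named `spark`.

# Default: remove unreferenced files older than the 7-day retention window.
spark.sql("VACUUM events")

# Collapse the table to its current version only (no more time travel).
# Delta requires an explicit override before allowing such a short retention window.
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
spark.sql("VACUUM events RETAIN 0 HOURS")
```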

Denny Lee: 20:10
Well, then that naturally leads to our next question. Was Delta created specifically to address GDPR or CCPA compliance?

Michael Armbrust: 20:21
Actually, yeah, no, I think the timing was very fortuitous. Right around the time that those laws were passed, Delta was kind of coming into its own. But no, we didn't actually create it just for that, though it's incredibly useful. If you think about GDPR and CCPA, they have you do these kinds of modification patterns that you didn't really see in data lakes before, where rather than modifying just a single partition, you actually need to scan an entire table to remove the entire history of somebody when they invoke their right to be forgotten. And doing so just on a file system is incredibly dangerous, because if your job crashes in the middle, you don't know what you've deleted and what you haven't deleted. You can often leave your table in a corrupt state.

Michael Armbrust: 21:05
So what people used to do (and I had customers who were basically operating right up to the very limits of this law) was, only once every 30 days, take their entire data lake and copy it, removing the users who wanted to be forgotten during that time period. And while that works, it was pretty slow and it was incredibly costly. And so what Delta does, by bringing ACID transactions, is let you do these modifications on top of the table in a safe way. And then once the modification is complete, you vacuum, and now that person is actually gone forever. So I think the ability to run delete and merge in a safe way over your entire Delta table really makes dealing with these regulatory requirements significantly easier.
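
As a hedged sketch of that right-to-be-forgotten pattern (hypothetical table and column names): delete the user's rows in one ACID transaction, then vacuum so the old file versions containing those rows are physically removed once the retention window passes.

```python
# Hypothetical table/column names; assumes a Delta-enabled SparkSession named `spark`.

# Remove a user's entire history in a single ACID transaction.
spark.sql("DELETE FROM events WHERE user_id = 'user-123'")

# The deleted rows still live in older file versions until they are vacuumed away.
spark.sql("VACUUM events RETAIN 168 HOURS")  # 7 days; choose a window that satisfies your policy
```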

Brooke Wenig: 21:48
So going back a little bit to when you were talking about how you should be optimizing your Delta table, you had said you could use z-ordering. When should you use z-ordering versus partitioning?

Michael Armbrust: 21:59
Yeah, that's a good question. I think in general, the difference between z-order and partitioning is that partitioning is enforced. We guarantee that for a given set of files, there is only a certain set of values for those partition columns. And it allows us to do very efficient operations, because we can do things at a metadata-only level. A good way to think about this is that partitioning by date is a canonically good idea. So when you partition by date, these files will only have data from that particular date. So now when you do an operation, like select star from the table where date equals something, I know these files definitely have those records. I don't have to look anywhere else for them.

Michael Armbrust: 22:39
But you can also do something like retention. You say, “I want to delete all data that is older than two years.” I don’t even have to scan through those files to see if they contain these records. I know for a fact, all data from those dates is there. And so we can do that delete operation as metadata only, without even running a Spark job. So it ends up being significantly faster. Partitioning has limitations, though, for exactly that reason that it forces data to be separate. If you partition by a column with too high a cardinality where there’s too many distinct values in that column, what that’s going to do is it’s going to force the data into lots of tiny files.

Michael Armbrust: 23:15
The example I always get here is when people partition by user ID, you end up with a different file for every user. And if you're a successful company, that's probably too many files for you. And so I think partition by date, z-order by user ID, is a good rule of thumb. And if you want to kind of think about this more generally, the rule I usually use is: if you have less than a gigabyte of data in a partition, you're probably over-partitioning. At around a gigabyte, a file is large enough that loading and processing it makes sense, and splitting it into more files doesn't really buy you anything.
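
Following that rule of thumb, a hedged sketch: partition the table by a low-cardinality date column, z-order within partitions by the high-cardinality user ID, and use the partition column for metadata-friendly retention deletes. Names are hypothetical, and OPTIMIZE ... ZORDER BY is a Databricks feature.

```python
# Hypothetical table/column names; assumes a Databricks (Delta-enabled) SparkSession named `spark`.

spark.sql("""
  CREATE TABLE IF NOT EXISTS pageviews (
    user_id STRING,
    url STRING,
    event_date DATE
  )
  USING DELTA
  PARTITIONED BY (event_date)  -- low-cardinality column: enforced separation, metadata-only operations
""")

# Cluster within each date partition by the high-cardinality column you filter on.
spark.sql("OPTIMIZE pageviews ZORDER BY (user_id)")

# Retention by the partition column can be done without scanning the data files.
spark.sql("DELETE FROM pageviews WHERE event_date < add_months(current_date(), -24)")
```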

Brooke Wenig: 23:51
That’s a super helpful rule of thumb to recommend for other folks going forward. Speaking of Delta Lake, what are some new things in the roadmap that are exciting to you? What are some new features that you want everyone to start using in the next few months or in the next year?

Michael Armbrust: 24:04
Yeah, so I think the features I'm most excited about are about getting data into the Delta Lake. So we have this new feature inside of Databricks called copy into. If you've got data coming from Fivetran or Stitch or any of our other ingestion partners, they'll use this to ingest data into the Delta Lake exactly once, so you can start querying it with confidence. And its streaming cousin is this thing called the Auto Loader, where you point it at any directory of files, whether they're JSON, CSV, whatever, stored on ADLS or S3, and we will automatically ingest it into Delta exactly once in a streaming fashion over time. And there's a bunch of cool tricks under the covers here. If there are lots and lots of files, we will subscribe to notifications so we don't have to list those directories. Again, coming back to kind of avoiding slow operations on these cloud storage systems.

Michael Armbrust: 24:56
And the feature that we're working on right now is schema inference, with support for schema drift. So if I don't even know what the schema of this JSON data is, the system can help you figure that out and can allow you to provide rules for what to do when the schema changes. Do I want to just ingest everything as a string so I can always ingest everything? Do I want to reject new columns and raise an error for the user to do something about it? It kind of all depends on your particular application. We're going to give you those tuning knobs to make it possible to get whatever data you have into the Delta Lake easily.
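
For reference, here is a hedged sketch of both ingestion paths. Auto Loader and COPY INTO are Databricks features, so syntax and options may differ across releases; paths, schema, and table names are hypothetical, and the schema is supplied explicitly since the inference support Michael mentions was still being built at the time.

```python
# Databricks-specific features; hypothetical paths, schema, and table names.
# Assumes a Databricks SparkSession named `spark`.

# Auto Loader: incrementally and exactly-once ingest new files landing in cloud storage.
raw = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema("id LONG, ts TIMESTAMP, body STRING")
    .load("s3://my-bucket/landing/")
)

(
    raw.writeStream
    .format("delta")
    .option("checkpointLocation", "s3://my-bucket/checkpoints/landing/")
    .start("s3://my-bucket/delta/bronze/landing/")
)

# COPY INTO: the idempotent batch equivalent, targeting an existing Delta table.
spark.sql("""
  COPY INTO bronze_landing
  FROM 's3://my-bucket/landing/'
  FILEFORMAT = JSON
""")
```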

Denny Lee: 25:30
Well, this has been a great session around demystifying Delta Lake with Michael Armbrust. We really appreciate you taking time out of your really busy schedule to join our Data Brew vidcast today. So, Michael, thanks very much.

Michael Armbrust: 25:42
Yeah. Thanks for having me.