Getting Started with Delta Lake

Making Apache Spark Better with Delta Lake

Michael Armbrust, Principal Software Engineer at Databricks
Michael Armbrust is a committer and PMC member of Apache Spark and the original creator of Spark SQL. He currently leads the team at Databricks that designed and built Structured Streaming and Databricks Delta. He received his PhD from UC Berkeley in 2013, and was advised by Michael Franklin, David Patterson, and Armando Fox. His thesis focused on building systems that allow developers to rapidly build scalable interactive applications, and specifically defined the notion of scale independence. His interests broadly include distributed systems, large-scale structured storage and query optimization.

Series Details

This session is part of the Getting Started with Delta Lake series with Denny Lee and the Delta Lake team.

Session Abstract

Join Michael Armbrust, head of the Delta Lake engineering team, to learn how his team built upon Apache Spark to bring ACID transactions and other data reliability technologies from the data warehouse world to cloud data lakes.

Apache Spark is the dominant processing framework for big data. Delta Lake adds reliability to Spark so your analytics and machine learning initiatives have ready access to quality, reliable data. This webinar covers the use of Delta Lake to enhance data reliability for Spark environments.

Topic areas include:

  • The role of Apache Spark in big data processing
  • Use of data lakes as an important part of the data architecture
  • Data lake reliability challenges
  • How Delta Lake helps provide reliable data for Spark processing
  • Specific improvements that Delta Lake adds
  • The ease of adopting Delta Lake for powering your data lake

What you need:
Sign up for Community Edition here and access the workshop presentation materials and sample notebooks.

Video Transcript

– [Denny] Hi, everybody. Welcome to our webinar today, Making Apache Spark Better with Delta Lake.

Before we get started with today’s presentation, we wanted to go over a few housekeeping items to ensure that you have the best possible experience. Please note that your audio connections will be muted for the webinar for everyone’s viewing comfort. If you have any concerns or questions, please pose those questions in the question panel or chat. In that panel we encourage you to use this time to ask as many questions and clarify any doubts that you may have on today’s topic. Our key presenter today, Michael Armbrust, is the original creator of Spark SQL and Structured Streaming, and one of the primary creators of Delta Lake. He’s a principal engineer at Databricks, and so without any further delay, take it away Michael.

– [Michael] Thank you, Denny. I’m super excited to be here today to talk about how you can make Apache Spark better by using Delta Lake. However, before I jump into that, I wanna start by talking about this concept of a data lake and why so many people are excited about it, and also why there are a lot of challenges when they try to set these things up as well.

The Promise of the Data Lake

So first of all, what is a data lake, and what does it mean to me? So the promise of a data lake is basically this, organizations have a lot of data. It might be kind of carefully curated customer data in your OLTP system. It might be raw click streams coming from your web server, or it might be kind of unstructured data coming from a bunch of sensors. And the promise of a data lake is you can take all of that and just dump it in the data lake. And this is actually really powerful when you compare it to a traditional database, because in a traditional database, you have to start by coming up with a schema and doing a lot of cleaning. This is often called kinda schema on write, and what a data lake allows you to do is it allows you to kind of forego that process and just start by collecting everything because sometimes you don’t know why data is valuable until much later, and if you haven’t stored it, then you’ve already lost it. And so with the data lake, it’s just a bunch of files out in a file system. It could be S3 or HDFS or Azure Blob storage, and you can just dump everything there and then come back and look at it later. And the idea is when you’re done, once you’ve collected it all, then you can actually get insights from it. You can do data science and machine learning. You can build powerful tools for your business, like recommendation engines or fraud detection algorithms. You can even do crazy things like cure cancer using genomics and DNA sequencing. However, I’ve seen this story many, many times, and typically what happens is unfortunately the data at the beginning is garbage. And so the data that you store in your data lake is garbage, and as a result, you get garbage out from these kind of more advanced processes that you try to do at the end. And why does that happen? Why is it so difficult to get quality and reliability out of these data lakes? And what does this kinda typical project look like? 
So I wanna walk you through a story that I’ve seen kind of happen over and over again at many organizations, when they sit down and try to extract insights from their data.

Evolution of a Cutting-Edge Data Lake

And it typically goes something like this, and this is kind of what is considered cutting edge today, but before Delta Lake. So a pretty common pattern is you’ve got a stream of events. They’re coming into some system like Apache Kafka, and your mission is to do two things. You need to do streaming analytics, so you can know what’s going on in real time in your business. And you also want to do AI and reporting where you can kind of look at a longer period of time and do a longitudinal analysis, and actually look at kinda the history and trends and make predictions about the future. So how are we gonna do this? So kinda step one, I sit down at my computer and I know that Spark has really good APIs for reading from Apache Kafka. You can use DataFrames, Datasets, and SQL in Spark SQL to kind of process and do aggregations and time windows and all kinds of things, and come up with your streaming analytics. And so we start with that and off the bat, it’s working pretty well, but this brings us to challenge number one, which is historical queries.

Challenge #1: Historical Queries?

Kafka is great for kinda getting real time analysis, but it can only store a day or a week’s worth of data. You don’t want to be storing years and years worth of data in Kafka. And so we’ve got to solve this problem as well. Real time is really good for what’s happening at this moment, but it’s not so good at looking for trends historically. So I’ve been reading a lot of blog posts and a pretty common pattern that happens here is there’s this thing called the Lambda architecture, which as far as I can tell is basically you just do everything twice. You have one real time thing that is kind of doing an approximation and giving you kind of exactly what’s happening at this very moment, and you have another pipeline that’s maybe a little more curated. It runs a little bit more slowly, but it’s archiving all of that data into your data lake. And so that’s step number one. So if we want to solve this historical query problem, we’re gonna also set up the Lambda architecture on top of just kind of vanilla Apache Spark, and once I’ve got all that data in the data lake, the idea is now I can run Spark SQL queries over that as well, and now you can do AI and reporting. That was a bit of extra work, some extra coordination, but fortunately Spark has a unified API for batch and streaming. And so it’s possible to do it and we get it set up, but that brings us to challenge number two.

Challenge #2: Messy Data?

Like I said before, data in the real world is often messy. Some team upstream from you changes the schema without telling you, and now you have problems. And so kinda a pattern that I see here is you need to add validations. So you need to actually write extra Spark SQL programs that are just checking to make sure that your assumptions about the data are correct. And if they’re wrong, it sends off an email so that you can correct it. Now, of course, because we’ve done the Lambda architecture, we have to do validations in two different places, but again, that’s something that we can do. We can use Spark to do it. And so now we set up validation to handle the messy data.

Challenge #3: Mistakes and Failures?

That unfortunately brings us to challenge number three, which is mistakes and failures. Those validations are great, but sometimes you forget to put one in place or there’s a bug in your code, or even harder is your code just crashes in the middle ’cause you’re running on EC2 and your Spot Instances died or whatever, and now you have to worry about, “How do I clean that up?” The real problem with using these kinds of distributed systems and kinda distributed file systems is if a job crashes in the middle, it leaves garbage results out there that need to be cleaned up. And so you’re kind of forced to do all of the reasoning about correctness yourself. The system isn’t giving you a lot of help here. And so a pretty common pattern is that people, rather than working on an entire table at a time, where if something goes wrong they need to recompute the entire table, will instead break it up into partitions. So you have a different folder. Each folder stores a day or an hour or a week, whatever kind of granularity makes sense for your use case. And you can build a lot of kinda scripting around it, so that it’s easy for me to do recomputation. So if one of those partitions gets corrupted for any reason, whether it was a mistake in my code or just a job failure, I can just delete that entire directory and reprocess that data for that one partition from scratch. And so kind of by building this partitioning and reprocessing engine, now I can handle these mistakes and failures. There was a little bit of extra code to write, but now I can kind of sleep safe and sound knowing that this is gonna work.
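
The partition-and-reprocess pattern described above can be sketched in plain Python. This is only an illustration under stated assumptions: the `date=...` folder naming, the `part-0000.txt` file name, the `raw` dictionary, and all three helper functions are hypothetical, and small text files stand in for real data files.

```python
import shutil
import tempfile
from pathlib import Path


def write_partition(table_dir, day, rows):
    """Write one day's rows into its own folder, e.g. <table>/date=2019-07-01/."""
    part_dir = Path(table_dir) / f"date={day}"
    part_dir.mkdir(parents=True, exist_ok=True)
    (part_dir / "part-0000.txt").write_text("\n".join(rows))


def reprocess_partition(table_dir, day, raw_source):
    """Delete one (possibly corrupt) partition and rebuild it from the raw data."""
    shutil.rmtree(Path(table_dir) / f"date={day}", ignore_errors=True)
    write_partition(table_dir, day, raw_source[day])


def read_table(table_dir):
    """Read every partition folder, in date order."""
    rows = []
    for part_file in sorted(Path(table_dir).glob("date=*/part-*.txt")):
        rows.extend(part_file.read_text().splitlines())
    return rows


# Hypothetical raw data, keyed by day.
raw = {"2019-07-01": ["a", "b"], "2019-07-02": ["c", "d"]}
table = tempfile.mkdtemp()
for day, rows in raw.items():
    write_partition(table, day, rows)

# Simulate a job that crashed partway through one day and left partial output.
write_partition(table, "2019-07-02", ["c"])

# Recovery is manual: drop that one folder and recompute only that day.
reprocess_partition(table, "2019-07-02", raw)
print(read_table(table))
```

Note that nothing here protects a reader who scans the table while a folder is being deleted and rewritten; all of the reasoning about correctness stays with whoever runs the script, which is the point the talk is making.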

Challenge #4: Updates?

That brings us to challenge number four though, updates. It’s very difficult to do point updates. It’s very easy to add data, but it’s very difficult to change data in this data lake and to do it correctly. And you may need to do this for GDPR reasons. You may need to do retention. You might have to do anonymization or other things, or you might just have kind of errors in the data. And so now you have to end up writing a whole other class of Spark jobs that do updates and merges. And this can be very difficult, and typically because it’s so difficult, what I see people do is rather than do individual updates, which would be very cheap, whenever they get a set of DSRs (data subject requests), say once a month, they will copy the entire table, removing anybody who’s asked to be forgotten due to GDPR. And they can do it, but it’s another Spark job to run. It’s very costly. And there’s kind of a subtlety here that makes it extra difficult, which is if you modify a table while somebody is reading it, generating a report, they’re gonna see inconsistent results and that report will be wrong. So you’ll need to be very careful to schedule this to avoid any conflicts when you’re performing those modifications. But these are all problems that people can solve. You do this at night and you run your reports during the day or something. And so now we’ve got a mechanism for doing updates. However, the problem here is this has become really complicated. And what that means is you’re wasting a lot of time and money solving systems problems rather than doing what you really want to be doing, which is extracting value from your data. And the way I look at this is these are all distractions of the data lake that prevent you from actually accomplishing your job at hand.

Data Lake Distractions

And to kind of summarize what I think these are, a big one here is no atomicity. When you run a distributed computation, if the job fails in the middle, you still have some partial results out there. It’s not all or nothing. And so atomicity means that when a job runs, it either completely finishes correctly or if anything goes wrong, it completely rolls back and nothing happens. So you no longer leave your data in a corrupt state requiring you to kind of tediously build these tools to do manual recovery. Another key problem is there’s no quality enforcement. It’s up to you in every job to manually check the quality of the data that’s coming in against all of your assumptions. There’s no help from the system, like invariants in a traditional database where you can say, “No, this column is required,” or “This must be this type of schema.” All of that stuff is kind of left up to you as the programmer to handle. And then finally there’s no control for consistency or isolation. And this means you can really only do one write operation to any data lake table at a time, and it makes it very difficult to mix streaming and batch, or to do operations while people are reading from it. And these are all things that you would expect from your data storage system. You would want to be able to do these things, and people should always be able to see a consistent snapshot automatically.

So let’s kind of take a step back now and look at what this process looks like with Delta Lake instead.

Challenges of the Data Lake

And the idea of Delta Lake is we take this relatively complicated architecture, where a lot of these correctness and other things were left up to you manually writing Spark programs. And we kind of changed it to something like this, where you’re thinking only about data flow, where you bring in all of the data from your organization and flow it through, continually improving the quality until it’s ready for consumption.

The A

And the kind of hallmarks of this architecture here are, first of all, Delta Lake brings to Apache Spark full ACID transactions. And what this means is every Spark job that runs will now complete either the entire job or nothing at all. People who are reading and writing at the same time are guaranteed to see consistent snapshots. And when something is written out, it’s definitely written out and it will not be lost. Those are kind of the hallmarks of ACID. And this allows you to focus on your actual data flow rather than thinking about all of these extra systems problems and solving kind of this known thing over and over again. Another key aspect of Delta Lake is it’s based on open standards and it’s open source. So it’s a full Apache license, no kind of silly Commons Clause or anything like that. You can take it and use it for whatever application you want completely for free. And personally, that would be really important to me if I was storing petabytes of data, right? Data has a lot of gravity. There’s a lot of inertia when you collect a lot of data and I wouldn’t want to put it in some black box where it’s very difficult for me to extract it. And this means that you can store that massive amount of data without worrying about lock-in. So not only is it open source, but it’s also based on open standards. So I’ll talk about this in more detail later in the talk, but underneath the covers, Delta is actually storing your data in Parquet. So you can read it with other engines and there’s kind of a growing community around Delta Lake building this native support in there. But worst case scenario, if you decide you want to leave Delta Lake, all you need to do is delete the transaction log and it just becomes a normal Parquet table. And then finally, Delta Lake is deeply powered by Apache Spark. 
And so what this means is if you’ve got existing Spark jobs, whether they’re streaming or batch, you can easily convert those to getting all kinds of benefits of Delta without having to rewrite those programs from scratch. And I’m gonna talk exactly about what that looks like later in the talk. But now I want to take this picture and simplify it a little to talk about some of the other hallmarks I see of the Delta Lake architecture, and where I’ve seen people be very successful. So first of all, I wanna kind of zone in on this idea of data quality levels. These are not fundamental things of Delta Lake. I think these are things that people use with a variety of systems, but I’ve seen people be very successful with this pattern, alongside the features of Delta.

The A DELTA LAKE

And so these are just kind of general classes of data quality, and the idea here is as you bring data into the data lake, rather than trying to make it perfect all at once, you’re going to incrementally improve the quality of your data until it’s ready for consumption. And I’ll talk about why I think that’s actually a really powerful pattern that can actually help you be more productive. So starting at the beginning is your bronze level data. This is a dumping ground for raw data. It’s still on fire, and I actually think that’s a good thing because the kind of core idea here is if you capture everything without doing a lot of munging or parsing on it, there’s no way that you can have bugs in your parsing and munging code. You’re keeping everything from the beginning and you can often actually keep a year’s worth of retention here. And I’ll talk in a little bit about why I think that’s actually really important, but this means you can collect everything. You don’t have to spend a bunch of time ahead of time deciding what data’s gonna be valued and what data’s not. You can kind of figure that out as you go, as you do your analysis. Moving on from bronze, we move on to kind of silver level data. This is data that is not yet ready for consumption. It’s not a report that you’re gonna give to your CEO, but I’ve already done some cleanup. I filtered out one particular event type. I’ve parsed some JSON and given it a better schema or maybe I’ve joined and augmented different data sets. I kinda got all the information I want in one place. And you might ask, if this data isn’t ready for consumption, why am I creating a table, taking the time to materialize it? And there’s actually a couple of different reasons for that. One is oftentimes these intermediate results are useful to multiple people in your organizations. 
And so by creating these silver level tables where you’ve taken your domain knowledge and cleaned the data up, you’re allowing them to benefit from that kind of automatically without having to do that work themselves. But a more interesting and kind of more subtle point here is it also can really help with debugging. When there’s a bug in my final report, being able to query those intermediate results is very powerful ’cause I can actually see what data produced those bad results and see where in the pipeline it went wrong. And this is a good reason to have multiple hops in your pipeline. And then finally, we move on to kind of the gold class of data. This is clean data. It’s ready for consumption. These are business-level aggregates that actually talk about kind of how things are running and how things are working, and this is almost ready for a report. And here you start using a variety of different engines. So like I said, Delta Lake already works very well with Spark, and there’s also a lot of interest in adding support for Presto and others, and so you can do your kind of streaming analytics and AI and reporting on it as well.

So now I want to talk about how people actually move data through the Delta Lake, through these different quality classes. And one of the patterns that I see over and over again is streaming is actually a really powerful concept here. And before I go too deep into streaming, I want to correct some misconceptions that I often hear. So one thing that people usually think when they hear streaming, they think it’s gotta be super fast. It’s gotta be really complicated because you want it to be really fast. And Spark actually does support that mode if that’s an application that you have. There’s continuous processing where you continually poll the server for new data, kinda holding onto that core, that supports millisecond latencies, but that’s actually not the only application where streaming can make sense. Streaming to me is really about incremental computation. It’s about a query that I want to run continuously as new data arrives. So rather than thinking about this as a bunch of discrete jobs and putting all of the management of those discrete jobs kind of on me or some workflow engine, streaming takes that away. You write a query once. You say, “I want to read from the bronze table, I’m gonna do these operations, I want to write to the silver table,” and you just run it continuously. And you don’t have to think about the kind of complicated bits of what data is new, what data has already been processed. How do I process that data and commit it downstream transactionally? How do I checkpoint my state, so that if the job crashes and restarts, I don’t lose my place in the stream? Structured Streaming takes care of all of these concerns for you. And so, rather than being more complicated, I think it can actually simplify your data architecture. And streaming in Apache Spark actually has this really nice kind of cost-latency tradeoff that you can tune. So at the far end, you could use continuous processing mode. 
You can kind of hold onto those cores for streaming persistently, and you can get millisecond latency. In the middle zone, you can use micro-batch. And the nice thing about micro-batch is now you can have many streams on the cluster and they’re time-multiplexing those cores. So you run a really quick job and then you give up that core and then someone else comes in and runs it. And with this, you can get seconds to minutes latency. This is kind of a sweet spot for many people, ’cause it’s very hard to tell if one of your reports is up to date within the last minute, but you do care if it’s up to date within the last hour. And then finally, there’s also this thing called trigger once mode in Structured Streaming. So if you have a job where data only arrives once a day or once a week or once a month, it doesn’t make any sense to have that cluster up and running all the time, especially if you’re running in the cloud where you can give it up and stop paying for it. And Structured Streaming actually has a feature for this use case as well. And it’s called trigger once where basically rather than run the job continuously, anytime new data arrives, you boot it up. You say trigger once. It reads any new data that has arrived, processes it, commits a downstream transaction and shuts down. And so this can give you the benefits of streaming, kind of the ease of coordination, without any of the costs that are traditionally associated with an always running cluster. Now, of course, streams are not the only way to move data through a Delta Lake. Batch jobs are very important as well. Like I mentioned before, you may have GDPR or these kinds of corrections that you need to make. You may have change data capture coming from some other system where you’ve got a set of updates coming from your operational store, and you just want to reflect that within your Delta Lake and for this, we have UPSERTS. 
And of course, we also support just standard insert, delete, and those kinds of commands as well. And so the really nice thing about Delta Lake is it supports both of these paradigms, and you can use the right tool for the right job. And so, you can kind of seamlessly mix streaming and batch without worrying about correctness or coordination.
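
As a rough illustration of the trigger-once mode described above, the following PySpark sketch reads whatever is new in one Delta table, writes it to another, and then stops. It assumes an existing `SparkSession` with the Delta Lake package loaded; the function name and the three path arguments are hypothetical, and the transformation step between read and write is left out.

```python
def run_once(spark, source_path, sink_path, checkpoint_path):
    """Process the data that arrived since the last run, commit it, then stop.

    `spark` is assumed to be a SparkSession with Delta Lake available.
    The checkpoint location is what lets the next run resume where this
    one finished instead of reprocessing everything.
    """
    query = (
        spark.readStream.format("delta").load(source_path)
        # A real job would apply its transformations here.
        .writeStream.format("delta")
        .option("checkpointLocation", checkpoint_path)
        .trigger(once=True)  # one pass over the available data, then shut down
        .start(sink_path)
    )
    query.awaitTermination()
    return query
```

A scheduler could call this once a day on a short-lived cluster, so the cluster only runs while there is work to do.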

And one kind of final pattern here that I want to talk about is this idea of recomputation. So when you have this early table that keeps all of your raw results and when you have very long retention on that, so years worth of the original data. And when you use streaming in between the different nodes of your kind of Delta Lake data graph, it’s very easy for you to do recomputation. You might want to do recomputation ’cause there was a bug in your code, or you might want to do recomputation because there’s some new thing that you’ve decided that you want to extract. And the really nice thing here because of the way that streaming works is this is very simple. So just to kind of give you a mental model for how Structured Streaming works in Apache Spark, we basically have the model that a streaming query should always return the same results as the batch query over the same amount of data. So what that means is when you start a new stream against the Delta table, it starts by taking a snapshot of that table at the moment that the stream started. And you kind of do this backfill operation where you process all of the data in that snapshot, breaking it up into nice little chunks, and checkpointing your state along the way, committing it downstream. And when you get to the end of that snapshot, we switch to tailing the transaction log and only processing new data that has arrived since the query started. And what this means is that you get the same result as though you had run the query at the end anyway, but with significantly less work than running it over and over and over again from scratch. So if you want to do recomputation under this model, all you need to do is clear out the downstream table, create a new checkpoint, and start it over. And it will automatically process from the beginning of time and catch up to where we are today.
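
The mental model above (backfill a snapshot in chunks, checkpoint along the way, then pick up only new data, and recompute by clearing the downstream table and the checkpoint) can be mimicked with a toy in plain Python. This is not how Structured Streaming is implemented: the lists standing in for tables, the `checkpoint` dictionary, and the uppercase "query" are all made up for illustration, and the toy does not commit a chunk and its checkpoint atomically the way a real system must.

```python
def run_stream(table, sink, checkpoint, chunk_size=2):
    """Process rows of `table` that the checkpoint has not covered yet.

    `table` is an append-only list standing in for the source table,
    `sink` is the downstream table, and `checkpoint["offset"]` is the index
    of the next unprocessed row.
    """
    snapshot_end = len(table)  # snapshot of the table when this run starts
    while checkpoint["offset"] < snapshot_end:
        start = checkpoint["offset"]
        end = min(start + chunk_size, snapshot_end)
        sink.extend(row.upper() for row in table[start:end])  # the "query"
        checkpoint["offset"] = end  # record progress after each chunk


bronze = ["a", "b", "c"]
silver, checkpoint = [], {"offset": 0}

run_stream(bronze, silver, checkpoint)   # backfill the initial snapshot
bronze += ["d", "e"]                     # new data arrives later
run_stream(bronze, silver, checkpoint)   # only the two new rows are processed
incremental = list(silver)

batch = [row.upper() for row in bronze]  # the same query run as one batch

# Recomputation: clear the downstream table, start a fresh checkpoint, rerun.
silver, checkpoint = [], {"offset": 0}
run_stream(bronze, silver, checkpoint)
```

In this toy, the incremental result, the batch result, and the recomputed result all come out identical, which is the property the talk describes.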

So that’s actually a pretty powerful pattern for kind of correcting mistakes and doing other things. So now that we’ve kind of gone over the kind of high level, I want to talk about some specific use cases where Delta Lake has been instrumental in both reducing costs and easing the management of using Apache Spark on top of these data lakes. So Delta Lake, I want to give a little bit of a history here.

Used by 1000s of organizations worldwide

So Delta Lake is actually two years old. We kind of had it inside of Databricks for the last two years. It was a proprietary solution and we’ve got some of our largest customers using it. So I’m gonna talk in particular about Comcast, but also Riot Games, and Jam City, and Nvidia, a bunch of big names that you know. They’ve been using it for many years. And about two months ago at the Spark Summit, we decided to open source it so everybody, even people running on prem or in these other locations could get access to the power of Delta Lake. So I want to talk about one particular use case that I thought was really cool. This is Comcast. So their problem here is they have set-top boxes around the world, and in order to understand how people are interacting with their programming, they need to kind of sessionize this information. So you watch this TV show, you change the channel, you go over here, you go back to this other TV show. And with this they can create better content by understanding how people consume it. And as you can imagine, Comcast has many subscribers, so there’s petabytes of data. And before Delta Lake, they were running this on top of Apache Spark. And the problem was the Spark job to do this sessionization was so big that the Spark scheduler would just tip over. And so, rather than run one job, what they actually had to do was they had to take this one job, partition it by user ID. So they kind of take the user ID, they hash it, they mod it by, I think, by 10. So they break it into kind of 10 different jobs, and then they run each of those jobs independently. And that means that there’s 10x the overhead in terms of coordination. You need to make sure those are all running. You need to pay for all of those instances. You need to handle failures in 10 times as many jobs, and that’s pretty complicated. 
And the really cool story about switching this to Delta was they were able to switch a bunch of these kinds of manual processes to streaming. And they were able to dramatically reduce their costs by bringing this down into one job, running on 1/10 of the hardware. So they’re now computing the same thing, but with 10x less overhead and 10x less cost. And so that’s a pretty powerful thing that Delta’s scalable metadata can really bring to Apache Spark. And I’m gonna talk later in the talk about exactly how that all works.
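
The pre-Delta workaround described above (hash the user ID, take it modulo 10, run one job per bucket) can be sketched as follows. This is an illustrative guess at the shape of that split, not Comcast's code: the choice of SHA-256, the function names, and the sample events are all assumptions.

```python
import hashlib
from collections import defaultdict

NUM_JOBS = 10  # the talk recalls the modulus as "I think, by 10"


def job_for_user(user_id, num_jobs=NUM_JOBS):
    """Map a user ID to a job number with a stable hash, then take it modulo num_jobs."""
    digest = hashlib.sha256(user_id.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_jobs


def split_events(events, num_jobs=NUM_JOBS):
    """Group (user_id, event) pairs so that each job sees every event for its users."""
    jobs = defaultdict(list)
    for user_id, event in events:
        jobs[job_for_user(user_id, num_jobs)].append((user_id, event))
    return jobs


# Hypothetical set-top box events.
events = [
    ("alice", "watch"),
    ("bob", "change_channel"),
    ("alice", "back"),
    ("carol", "watch"),
]
jobs = split_events(events)
for job_id in sorted(jobs):
    print(job_id, jobs[job_id])
```

Keeping all of a user's events in one bucket is what makes sessionization possible per job, but every bucket is a separate job to schedule, monitor, and retry, which is the coordination overhead the talk refers to.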

But before I get into that, I want to show you exactly how easy it is to get started with Delta Lake if you’re already using Apache Spark.

Get Started with Delta using Spark APIs

So getting started is trivial. It’s published on Spark Packages. All you need to do to install Delta Lake on your Spark cluster is use Spark Packages. So if you’re using PySpark, you can just do --packages and then Delta. If you’re using the Spark shell, same thing. If you’re building a kind of Java or Scala jar, and you want to depend on Delta, all you need to do is add a Maven dependency, and then changing your code is equally simple. If you’re using the DataFrame reader and writer in Spark SQL, all you need to do is change the data source from parquet or JSON or CSV or whatever you’re using today to Delta, and everything else should work the same. The only difference is now everything will be scalable and transactional, which as we kinda saw before, can be very powerful.
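
To make the "change the data source" step concrete, here is a small PySpark sketch. It assumes a `SparkSession` that was started with the Delta Lake package (the exact package coordinate is not given in the talk and is left out here); the two helper functions and their `fmt` parameter are hypothetical wrappers around the standard DataFrame reader and writer.

```python
def read_table(spark, path, fmt="delta"):
    """Load a table; compared with a Parquet read, only the format string changes."""
    return spark.read.format(fmt).load(path)


def append_to_table(df, path, fmt="delta"):
    """Append a DataFrame to a table; again only the format string changes."""
    df.write.format(fmt).mode("append").save(path)
```

An existing job that called `spark.read.format("parquet")` and `df.write.format("parquet")` would keep the rest of its code unchanged and pass `"delta"` instead.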

Data Quality

So everything I’ve talked about so far has been mostly about these kinds of system problems of correctness. If my job crashes, I don’t want it to corrupt the table. If two people write to the table at the same time, I want them to both see consistent snapshots, but data quality is actually more than that. You can write code that runs correctly, but there can be a bug in your code and you get the wrong answer. And so this is why we’re kind of expanding the notion of data quality to allow you to kind of declaratively talk about the quality constraints. So this is work that’s kind of coming in the next quarter or so, but the idea here is we allow you to, in a single place, specify the layout and constraints of your Delta Lake. So first we can see kind of some important things like where the data is stored. You can optionally turn on strict schema checking. Delta Lake has two different modes here, and I kind of often see people use both of them as they move through their data quality journey. In the earlier tables, you will use kind of schema inference where maybe you just read a bunch of JSON and just put it exactly as it is into the Delta Lake. We have nice tools here where we will automatically perform safe schema migrations. So if you’re writing data into Delta Lake, you can flip on the merge schema flag, and it will just automatically add new columns that appear in the data to the table, so that you can just capture everything without spending a bunch of time writing DDL. We, of course, also support kinda standard strict schema checking where you say, create table with the schema, reject any data that doesn’t match that schema, and you can use alter table to change the schema of a table. And often I see this used kind of down the road in kind of the gold level tables where you really want strict enforcement of what’s going in there. And then finally, you can register tables in the Hive Metastore. 
That support is coming soon, and also put human readable descriptions, so people coming to this table can see things, like this data comes from this source and it’s parsed in this way, and it’s owned by this team. This kind of extra human information that you can use to understand what data will get you the answers you want. And then finally, the feature that I’m most excited about is this notion of expectations. An expectation allows you to take your notion of data quality and actually encode it into the system. So you can say things like, for example, here, I said, I expect that this table is going to have a valid timestamp. And I can say what it means to be a valid timestamp for me and for my organization. So, I expect that the timestamp is there and I expect that it happened after 2012 because my organization started in 2012, and so if you see data from, say, 1970 due to a date parsing error, we know that’s incorrect and we want to reject it. So this is very similar, for those of you who are familiar with a traditional database. This sounds a lot like an invariant where you could say not null or other things on a table, but there’s kind of a subtle difference here. So the idea of invariants is, you can say things about tables, and if one of those invariants is violated, the transaction will be aborted, will automatically fail. And I think the problem with big data, why invariants alone are not enough is if you stop processing every single time you see something unexpected, especially in those earlier bronze tables, you’re never going to process anything. And that can really hurt your agility. And so the cool thing about expectations is we actually have a notion of tunable severity. So we do support this kind of fail stop, which you might want to use on a table that your finance department is consuming ’cause you don’t want them to ever see anything that is incorrect. 
But we also have these kinds of weaker things where you can just monitor how many records are valid and how many are failing to parse and alert at some threshold. Or even more powerful, we have this notion of data quarantining where you can say any record that doesn’t meet my expectations, don’t fail the pipeline, but also don’t let it go through. Just quarantine it over here in another table, so I can come and look at it later and decide what I need to do to kind of remediate that situation. So this allows you to continue processing, but without kind of corrupting downstream results with this invalid record. So like I said, this is a feature that we’re actively working on now. Stay tuned to GitHub for more work on it. But I think this kind of fundamentally changes the way that you think about data quality with Apache Spark and with your data lake.
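
To make the idea of tuneable severity concrete, here is a minimal plain-Python sketch. It is not the Delta Lake API (the talk describes the feature as still in development); the `Expectation` class, the `apply_expectations` function, and the severity names "fail", "quarantine", and "monitor" are all hypothetical names chosen for illustration.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Expectation:
    name: str
    predicate: Callable[[dict], bool]  # returns True when the record is valid
    on_violation: str                  # hypothetical: "fail", "quarantine", or "monitor"


def apply_expectations(records, expectations):
    """Split records into passed and quarantined, counting violations per expectation."""
    passed, quarantined = [], []
    violations = {e.name: 0 for e in expectations}
    for record in records:
        keep = True
        for e in expectations:
            if e.predicate(record):
                continue
            violations[e.name] += 1
            if e.on_violation == "fail":
                # Fail stop: abort the whole run on the first bad record.
                raise ValueError(f"expectation {e.name!r} violated by {record!r}")
            if e.on_violation == "quarantine":
                # Do not fail, but do not let the record through either.
                keep = False
            # "monitor": only count the violation and let the record through.
        (passed if keep else quarantined).append(record)
    return passed, quarantined, violations


# The timestamp must be present and later than 2012 (ISO dates compare as strings).
valid_timestamp = Expectation(
    name="valid_timestamp",
    predicate=lambda r: r.get("timestamp") is not None and r["timestamp"] > "2012-01-01",
    on_violation="quarantine",
)

records = [
    {"id": 1, "timestamp": "2019-06-01"},
    {"id": 2, "timestamp": "1970-01-01"},  # e.g. a date parsing error
    {"id": 3, "timestamp": None},
]
passed, quarantined, violations = apply_expectations(records, [valid_timestamp])
```

With the severity set to "quarantine", the pipeline keeps running: the two bad records land in `quarantined` for later inspection, and `violations` gives the count you could alert on.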

So now that I’ve been over the high level, what is Delta, why should you care about it? I want to go into the nitty gritty details of how Delta actually works. ‘Cause it sounds almost too good to be true that we can bring these full ACID transactions into a distributed system like Apache Spark and still maintain good performance.

Delta On Disk

So first of all, let’s start by looking at what a Delta table looks like when it’s actually stored out on disk. To those of you that have a data lake already, this should look really familiar. It’s just a directory stored in your file system, S3, HDFS, Azure Blob storage, ADLS. It’s just a directory with a bunch of parquet files in it. And there’s one extra bit that is very important, and that is that we also store this transaction log. And inside of the transaction log, there are different table versions. I’ll talk a little bit about those table versions in a moment, but we still store the data in partition directories. However, that’s actually mostly for debugging. There are also modes of Delta where we can work directly with storage systems in the most optimal way. So for example, on S3, they recommend if you’re going to be writing a lot of data out regularly, that rather than create date partitions, which create hotspots of temporal locality, you instead randomly hash partition, and because of the power of Delta’s metadata, we can do that as well. And then finally, there are the standard data files, which are just normal encoded parquet that can be read by any system out there.

Table = result of a set of actions

So what is actually in those table versions? How do we reason about what the current state of a table is? So each one of those table versions has a set of actions that apply to the table and change it in some way. And the current state of a table, at this moment, is the result of the sum of all of those actions. So what kind of actions am I talking about? Well, for one example, we can change the metadata. So we can say, this is the name of the table. This is the schema of the table. You can add a column to the table or something. You can set the partitioning of the table. So one action you can take is change the metadata. The other actions are add a file and remove a file. So we write out a parquet file, and then to actually make it visible in the table, it needs to also be added to the transaction log. And I’ll talk about why that kind of extra level of indirection is a really powerful trick in a moment. And another kind of detail here is when we add files into Delta, we can keep a lot of optional statistics about them. So in some versions we can actually keep the min and max value for every column, which we can use to do data skipping or quickly compute aggregate values over the table. And then finally you can also remove data from the table by removing the file. And again, this is kind of a lazy operation. This level of indirection is really powerful. When we remove a file from the table, we don’t necessarily delete that data immediately, allowing us to do other cool things like time travel. And so the result here of taking all these things is you end up with the current metadata, a list of files, and then also some details, like a list of transactions that have committed, the protocol version for that.
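
As an illustration of "the current state is the result of all the actions", here is a small plain-Python sketch that replays a list of table versions. The action shapes (`metaData`, `add`, `remove`) are simplified stand-ins for what the transaction log records, and the file names are made up.

```python
def replay(versions):
    """Fold every action of every version, in order, into the current table state."""
    metadata = None
    files = set()
    for actions in versions:
        for action in actions:
            if "metaData" in action:
                metadata = action["metaData"]          # latest metadata wins
            elif "add" in action:
                files.add(action["add"]["path"])       # file becomes visible
            elif "remove" in action:
                files.discard(action["remove"]["path"])  # file leaves the table
    return metadata, sorted(files)


log = [
    # version 0: set the schema and add two files
    [
        {"metaData": {"schema": ["id", "ts"]}},
        {"add": {"path": "part-0001.parquet"}},
        {"add": {"path": "part-0002.parquet"}},
    ],
    # version 1: compaction, replace the two files with one
    [
        {"remove": {"path": "part-0001.parquet"}},
        {"remove": {"path": "part-0002.parquet"}},
        {"add": {"path": "part-0003.parquet"}},
    ],
]

metadata, files = replay(log)
# Replaying only a prefix of the log gives the table as of an older version.
_, files_at_v0 = replay(log[:1])
```

Because removing a file only records an action and does not delete the data, replaying a prefix of the log is enough to see an older snapshot, which is the intuition behind time travel.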

Implementing Atomicity

So how does this allow us to get ACID? To actually kind of get these nice properties of transactional databases? So one detail here is when we’re creating these table versions, we store them as ordered atomic units called commits. So I talked about this before. We create version zero of the table by creating this file, 0.json. And the idea here is when Delta constructs that file on the file system, we will use underlying atomic primitives. So on S3, in order to guarantee atomicity all you need to do is upload to the system. And the way they do this is you start your upload by saying, I expect to upload this many bytes. And unless you actually successfully upload that many bytes, S3 won’t accept the write. So you’re guaranteed that you’ll either get the whole file or none of the file. On other systems like Azure or HDFS, what we’ll do is we’ll create a temporary file with the whole contents and then we’ll do an atomic rename, so that the entire file is created or not. So then you can kind of have successive versions. So version one, we added these two files or sorry, in version zero, we added these two files. In version one, we removed them and put in a third. So for example, you could be doing compaction here where you atomically take those two files and compact them into one larger file.
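
The temporary-file-plus-rename approach described for Azure and HDFS can be sketched on a local file system in plain Python. This is only an illustration of the atomicity idea, not Delta Lake’s implementation; `write_commit` is a hypothetical helper, and the file name follows the talk’s "0.json" shorthand.

```python
import json
import os
import tempfile


def write_commit(log_dir, version, actions):
    """Write a commit file so that readers see either the whole file or nothing."""
    final_path = os.path.join(log_dir, f"{version}.json")
    # Write the full contents to a temporary file in the same directory first.
    fd, tmp_path = tempfile.mkstemp(dir=log_dir, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        for action in actions:
            f.write(json.dumps(action) + "\n")
    # The rename is the atomic step: a partially written commit is never visible.
    # Note: os.replace overwrites an existing file, so this shows atomicity only,
    # not mutual exclusion between writers.
    os.replace(tmp_path, final_path)
    return final_path


log_dir = tempfile.mkdtemp()
path = write_commit(
    log_dir, 0, [{"add": {"path": "a.parquet"}}, {"add": {"path": "b.parquet"}}]
)
with open(path) as f:
    committed = [json.loads(line) for line in f]
```

If the process crashes before the rename, only a stray temporary file is left behind, and the table version simply does not exist.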

Ensuring Serializability

Now, another important detail here is we want atomicity for each of these commits, but we also want serializability. We want everybody to agree on the order of changes to this table, so that we can correctly do things like merge into for change data capture and other things that require this property. And so in order to agree on these changes even when there are multiple writers, we need this property called mutual exclusion. If two people try to create the same version of a Delta table, only one of them can succeed. So just to make this a little bit more clear, user one could write version zero of the table, user two could write version one, but if they both try to write version two, then only one of them can succeed. The other one must get an error message saying, sorry, your transaction didn’t go through.
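
Mutual exclusion can be illustrated on a local file system with Python’s exclusive-create mode, which fails if the file already exists. This is a sketch of the property only; `try_commit` is a hypothetical helper, and real storage systems provide the guarantee through their own primitives.

```python
import json
import os
import tempfile


def try_commit(log_dir, version, actions):
    """Return True if this writer created the version, False if someone else already did."""
    path = os.path.join(log_dir, f"{version}.json")
    try:
        # Mode "x" creates the file exclusively and raises if it already exists.
        with open(path, "x") as f:
            for action in actions:
                f.write(json.dumps(action) + "\n")
    except FileExistsError:
        return False
    return True


log_dir = tempfile.mkdtemp()
results = [
    try_commit(log_dir, 0, [{"add": {"path": "user1-a.parquet"}}]),  # user one
    try_commit(log_dir, 1, [{"add": {"path": "user2-a.parquet"}}]),  # user two
    try_commit(log_dir, 2, [{"add": {"path": "user1-b.parquet"}}]),  # user one wins
    try_commit(log_dir, 2, [{"add": {"path": "user2-b.parquet"}}]),  # user two loses
]
```

The losing writer gets a clear signal that version two is taken, which is what lets everyone agree on a single order of changes.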

Solving Conflicts Optimistically

And now you’re probably saying, wait a second, but if anytime two people do something at once it fails, that sounds like I’m wasting a lot of time and a lot of work. That sounds like a lot of complexity for me. And fortunately, this is where we use a third cool trick called optimistic concurrency. And the idea of optimistic concurrency is when you perform an operation on the table, you’re just going to optimistically assume that it’s going to work. And if you have a conflict, you’ll just check to see if that conflict matters to you. And if it doesn’t, you’re allowed to optimistically try again. And in most cases, it actually turns out that the transactions are not overlapping and you’re allowed to automatically remediate this. So to give you a concrete example here, let’s say we have two users and both of these users are streaming into the same table. So when both of them begin their streaming write, they start by reading the version of the table at that moment. They both read in version zero. They read in the schema of the table, so they make sure that the data that they’re appending has the correct format. And then they write some data files out for the contents of the stream that are gonna be recorded in this batch. And they record what was read and what was written from the table. Now they both try to commit, and in this case, user one wins the race and user two loses. But what user two will do is they’ll check to see if anything has changed. And because the only thing they read about the table was the schema, and the schema has not changed, they’re allowed to automatically try again. And this is all hidden from you as the developer. This all happens automatically under the covers. So they’ll both try to commit, and they’ll both succeed.
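
The two-writer example can be sketched as a retry loop over an in-memory log. This is a simplified model under stated assumptions: `TableLog`, `schema_changed`, and `commit_append` are hypothetical names, and the only conflict check modeled is "did the schema change since I read it".

```python
class TableLog:
    """In-memory stand-in for the transaction log; versions[i] holds version i's actions."""

    def __init__(self):
        self.versions = []

    def latest_version(self):
        return len(self.versions) - 1

    def try_commit(self, version, actions):
        # Mutual exclusion: a version can only be created once, in order.
        if version != len(self.versions):
            return False
        self.versions.append(actions)
        return True


def schema_changed(log, since_version):
    """True if any commit after since_version changed the table metadata."""
    later = log.versions[since_version + 1:]
    return any("metaData" in action for actions in later for action in actions)


def commit_append(log, read_version, actions, max_attempts=10):
    """Optimistically commit an append, retrying if the conflict does not matter."""
    attempt_version = read_version + 1
    for _ in range(max_attempts):
        if log.try_commit(attempt_version, actions):
            return attempt_version
        # We lost the race. The only thing we read was the schema, so check that.
        if schema_changed(log, read_version):
            raise RuntimeError("schema changed since it was read; redo the transaction")
        attempt_version = log.latest_version() + 1
    raise RuntimeError("too many conflicting commits")


log = TableLog()
log.try_commit(0, [{"metaData": {"schema": ["id", "ts"]}}])

read_version = log.latest_version()  # both users read version 0
v1 = commit_append(log, read_version, [{"add": {"path": "user1-batch.parquet"}}])
v2 = commit_append(log, read_version, [{"add": {"path": "user2-batch.parquet"}}])
```

User two’s first attempt at version one fails, the schema check finds no conflict, and the retry lands at version two without the caller having to do anything.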

Handling Massive Metadata

Now, the final trick that we have here is tables can have massive amounts of metadata. And those of you who have tried to put millions of partitions into the Hive Metastore are probably familiar with this problem. It can actually, once you get to those data sizes, the metadata themselves can actually be the thing that brings the system down. And so we have a trick for this, which is actually, we’ve already got a distributed processing system capable of handling massive amounts of data. We’ll just use Spark. So we take the transaction log with its set of actions. We read it in with Spark. We can actually encode it as a checkpoint in parquet. A checkpoint is basically the entire state of a table at some version. So when you’re reading the transaction log, rather than have to read the entire transaction log, you can just start with the checkpoint and then any subsequent changes that happened after that. And then this itself can be processed with Spark. So when you come to a massive table that has millions of files, and you ask the question like, “How many records were added yesterday?” What we’ll do is we’ll run two different Spark jobs. The first one queries the metadata and says, “Which files are relevant to yesterday?” And it’ll get back that list of files, and then you’ll run another Spark job that actually processes them and does the count. And by doing this in two phases, we can drastically reduce the amount of data that needs to be processed. We’ll only look at the files that are relevant to the query, and we’ll use Spark to do that filtering.
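
The checkpoint idea and the two-phase query can be sketched in plain Python. In the talk both phases are Spark jobs; here ordinary functions stand in for them, and the checkpoint layout, file names, and `date` field are illustrative assumptions rather than the real on-disk format.

```python
# A checkpoint captures the whole table state at some version.
checkpoint = {
    "version": 10,
    "files": {
        "f1": {"date": "2019-07-01"},
        "f2": {"date": "2019-07-02"},
    },
}
# Only the commits after the checkpoint need to be read and replayed.
later_commits = [
    [{"add": {"path": "f3", "date": "2019-07-02"}}],
    [{"remove": {"path": "f1"}}],
]
# Stand-in for the data files themselves.
storage = {
    "f1": [{"date": "2019-07-01"}] * 100,
    "f2": [{"date": "2019-07-02"}] * 250,
    "f3": [{"date": "2019-07-02"}] * 50,
}


def current_files(checkpoint, later_commits):
    """Start from the checkpoint and apply only the subsequent changes."""
    files = dict(checkpoint["files"])
    for actions in later_commits:
        for action in actions:
            if "add" in action:
                info = dict(action["add"])
                path = info.pop("path")
                files[path] = info
            elif "remove" in action:
                files.pop(action["remove"]["path"], None)
    return files


def files_for_date(files, date):
    """Phase 1: query the metadata to find which files are relevant."""
    return sorted(path for path, info in files.items() if info["date"] == date)


files = current_files(checkpoint, later_commits)
relevant = files_for_date(files, "2019-07-02")
# Phase 2: process only the relevant files.
count = sum(len(storage[path]) for path in relevant)
```

The second phase never touches `f1`, which is the point: the metadata query narrows the work before any data is read.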

Road Map

So before we end and go to questions, I wanna talk a little bit about the roadmap. Like I said before, while this project has been out there for a couple of years now, it’s just recently been open sourced. So we have a pretty exciting roadmap for the rest of the year. Basically our goal is for the open source Delta Lake project to be fully API compatible with what’s available inside of Databricks, and so our roadmap for the rest of the quarter is basically open sourcing a lot of cool features that we have. So a couple of weeks ago we released version 0.2.0, which added support for reading from S3 and also reading from Azure Blob Store and Azure Data Lake. And then this month, we are planning to do a 0.3.0 release. That is going to add Scala APIs for UPDATE, DELETE, MERGE, and VACUUM, and Python APIs will be following shortly. And then for the rest of this quarter, we have a couple of things on our plan. We want to add full DDL support, so that’s create table and alter table. And we also want to give you the ability to store Delta tables in the Hive Metastore, which I think is very important for data discovery in different organizations. And we want to take those DML commands from before, UPDATE, DELETE, and MERGE, and actually hook them into the Spark SQL parser, so you can use standard SQL to do those operations as well. And then moving forward, let us know what you want. So if you’re interested in doing more, I recommend you check out our website at delta.io, which has a high level overview of the project. There’s a quick start guide on how you can get started, and it also has links to GitHub where you can watch the progress, see what our roadmap is, and submit your own issues on where you think the project should be going. So I definitely encourage you to do that, but with that, I think we’ll move over to questions. So let me just pull those up and see what we got.

Okay. So the first question is will the material and recording be available afterwards? And for that, I’m hoping Denny can actually let us know. Denny, are you here? – [Denny] I am, no problem at all. So yes, just as a quick call-out, for everybody who signed up for this webinar, we will actually send out both the slides and also the recording out. It takes about 12 to 24 hours for the process to complete. So you should be receiving the email either later today or early tomorrow.

– [Michael] Awesome, thank you very much. So yeah, all that should be there, so you can check this out later. There’s also videos on YouTube. So do stay tuned for more stuff about Delta Lake. Moving on to other questions. The first one is, does Delta Lake add any performance overhead? Which is a really interesting question. I want to break that down. So first of all, Delta Lake is designed to be a high throughput system. For each individual operation, there is a little bit of overhead in performing it. That’s basically because, rather than just write out the files, we need to write out the files and also write out the transaction log. So that adds a couple of seconds to your Spark job. Now, the important thing here is we designed Delta to be massively parallel and very high throughput. So you get a couple of seconds added to your Spark job, but that is mostly independent of the size of your Spark job. So what Delta Lake is really, really good at is ingesting trillions of records of data or petabytes of data or gigabytes of data. What Delta is not good at is inserting individual records. If you run one record per Spark job, there’ll be a lot of overhead. So the trick here is you want to use Delta in the places where Spark makes the most sense, which are relatively large jobs spread out across lots of machines. And in those cases, the overhead is negligible.

The next question is, since it has ACID properties, will my system be highly available as well? And that’s actually a really good question I want to unpack a little bit. So Delta, again, is designed specifically to take advantage of the cloud and of its nice properties. To me, there’s a couple of nice properties of the cloud. One is the cloud is very stable. You can put tons of data into S3, and it just handles it. It’s generally pretty highly available. So you can always read data from S3, no matter where you are. If you really, really care, there’s even things like replication, where you can replicate your data to multiple regions, and Delta plays very nicely with that. So reading from a Delta table should be very highly available, ’cause it’s really just the availability of that underlying storage system. Now, those of you who are familiar with the CAP theorem might be saying, “But wait a second.” So for writes, when we think about consistency, availability, and partition tolerance, Delta chooses consistency. So if you cannot talk to the central coordinator, the system will pause. What that coordinator is depends on where you’re running: on S3, that might be your own service that you’re running; on Azure, they’ve taken the consistency approach (indistinct), and we use an atomic operation there. But the nice thing here is because of that optimistic concurrency mechanism, that doesn’t necessarily mean you lose that whole job that you might’ve been running for hours. It just means you’ll have to wait until you’re able to talk to that service. So I would say in terms of reads, very highly available; in terms of writes, we choose consistency, but in general, that actually still works out pretty well.

The next question was, do you keep all levels of data? Well, I think I want to clarify the idea behind that bronze, silver, gold. Not everybody keeps the raw data around. Not everybody keeps all of the data. You might have retention requirements that say you’re only allowed to keep two years of data. So really, I think it’s up to you to decide what data makes sense to hold on to. The only thing I would say is I think that the nice thing about data lakes, and how Delta applies to them in general, is you are empowered to hold on to the raw data and as much of it as you want. There are no technical limitations preventing you from keeping all of the data, and as a result, many organizations that I work with do actually keep everything that they are legally allowed to keep for a very long time, and only remove it when they have to get rid of it.

The next question is what do you write that logic in? Are we able to write logic in Scala? So Delta Lake plugs into all of the existing APIs of Apache Spark, and that means you can use any of those. So if you’re a Scala programmer, you can use Scala. If you are a Java programmer, that works as well. We also have bindings in Python, and if you’re an analyst and you don’t want to program at all, we also support pure SQL. So really our idea here is the underlying engine is written in Scala and Delta is also written in Scala, but your logic can be written in whatever language you’re comfortable with. This is another case where I think you need the right tool for the right job. So personally, I do a lot of my stuff in Scala, but when I need to make graphs, I switch over to Python and use that platform. But still Delta gives me the ability to filter through massive amounts of data, shrink it down to something that will fit into Pandas, and then I do some graphing with it.

So the next question is, is Presto part of Delta Lake or is it all only Spark? And that’s a great question. That’s actually something that’s evolving pretty quickly right now. So there’s a couple of different answers to this. I’ll tell you both where we’re at and where we’re going. Right now, there’s a feature inside of Databricks that we’re working on open sourcing, which allows writers for Delta to write out these things called manifest files that let you query a Delta table in a consistent way from Presto or Athena or any of these other Presto based systems. However, we’re also working deeply with Starburst, one of the companies behind Presto, to build a native connector for Presto. We’ve also got active interest from the Hive community and the Scalding community, so there’s a bunch of interest in building connectors. So today, the core of Delta is built in Spark, but I think the really powerful thing about open source and open standards is that anybody can integrate with it. And as a project, we’re committed to growing that ecosystem and working with anybody. So if you’re a committer on one of those projects, please join our mailing list, join our Slack channel, check it out, and let us know how we can help you build these additional connectors.

Next question, can we experiment with Delta Lake in the community edition of Databricks? Yes, you can. Delta Lake is available in community edition, check it out. Everything should be there. Let us know what you think.

The next question is, can Delta tables be queried with Hive? Yeah, so basically the same answer as for Presto. There’s active interest in the community in building this support. It’s not available today, but that’s definitely something that we’d like to build. Next question, how does Delta Lake handle slowly changing dimensions going from raw to gold?

Yeah, so well, that’s a good question, and there’s actually a blog post on databricks.com. If you Google slowly changing dimensions, Delta, it walks you through all of the details, but I think really the right answer here is with the merge operator plus the power of Spark, it’s actually pretty easy to build all of the different types of slowly changing dimensions. And the magic thing that Delta is adding on top of Spark that enables this is those transactions. Modifying a table in place would be incredibly dangerous without transactions, and Delta makes that possible, and therefore enables this type of use case.

Next one was, we usually deal with Azure. We’d like to know whether Delta Lake has any different behavior when it’s running on Azure Event Hub instead of Kafka. And yeah, I’m gonna answer this question a little bit more generally. So I talked about one of the powerful things about Delta being its integration with Spark. And one of the big reasons there is I think of Spark as the skinny waist of the big data ecosystem. There are Spark connectors for almost every big data system in the world. And so if Spark can read from it, it works with Delta Lake. And Event Hub, in particular, has both a native connector that plugs in through Spark data sources and also a Kafka API that works with Spark’s Kafka source. So you can very easily read from Event Hub and do all the stuff I talked about today using Event Hub instead of Kafka. And really that applies to any system that Spark can read from.

And just in general, to answer for Azure a little bit more, Delta is fully supported on Azure, including ADLS. We just recently improved our support for ADLS gen two. So it’s available both for you to download, and it’s also part of Azure Databricks out of the box.

And yeah, so the next question is what exactly is the Scala API for the DML commands like update? And the question was, does it look like Spark SQL, where you call Spark SQL and pass in a string that does that update? And the answer is we’re actually gonna support both. So if you go to the GitHub repository, I believe this code has already been merged, so you can see the Scala API. If not, there’s a design doc that talks about the details on the ticket for adding update. But the idea here is there will both be a Scala function called Update that you can use programmatically without having to do string interpolation, and there is also a SQL way to do it. So you’ll be able to create a SQL string and pass that in. So again, you use the language that you are most comfortable with that is already part of your toolkit, and Delta should work with that automatically.

Next question was, does Delta Lake work with HDFS? Yes, it fully works with HDFS. HDFS has all of the primitives that we need, so you don’t need any kind of extra details, and what I’m talking about there is HDFS has support for an atomic rename that fails if the destination already exists. So as long as you’re running a new enough version of HDFS, which is, it’s not even that new, that should work automatically. And if you check out the Getting Started guide in the Delta docs at delta.io, it has all the different storage systems that we support and details for what you need to do to set that up.

Next question, are update and delete at the single row or record level? And there’s kinda two answers to this. So yes, Delta does allow you to do fine-grained, individual row updates. So you don’t necessarily have to do your updates or your deletes at the partition level. If you do deletes, for example, at the partition level, those are significantly more efficient because we can just drop the metadata. We don’t actually have to do any manual rewriting. But if they’re not at the partition level, if you’re doing a fine-grained single row update or delete, what we’ll do is we’ll actually find the relevant parquet files, rewrite them, commit the adds and deletes to make that operation happen, and then that’s the transaction that does it. So it does support it, but it does involve rewriting individual files. So what I’ll say here is, Delta’s definitely not designed to be an OLTP system. You should not use it if you have lots of individual row updates, but we do support that fine granularity use case.
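
The "find the relevant files, rewrite them, commit the adds and removes" flow for a fine-grained delete can be sketched in plain Python. This models files as lists of rows in a dict; `delete_where` and the `-rewritten` naming are hypothetical and only illustrate the shape of the resulting transaction.

```python
def delete_where(files, predicate, new_name):
    """Rewrite only the files that contain matching rows.

    files maps a path to its list of rows. Returns the log actions for the
    commit and the newly written files.
    """
    actions = []
    new_files = {}
    for path, rows in files.items():
        if not any(predicate(row) for row in rows):
            continue  # file has no matching rows, leave it untouched
        kept = [row for row in rows if not predicate(row)]
        actions.append({"remove": {"path": path}})
        if kept:
            # Write the surviving rows to a new file and add it to the table.
            new_path = new_name(path)
            new_files[new_path] = kept
            actions.append({"add": {"path": new_path}})
    return actions, new_files


files = {
    "part-1.parquet": [{"id": 1}, {"id": 2}],
    "part-2.parquet": [{"id": 3}],
}
actions, new_files = delete_where(
    files,
    predicate=lambda row: row["id"] == 2,
    new_name=lambda path: path.replace(".parquet", "-rewritten.parquet"),
)
```

Deleting one row still rewrites the whole file that held it, which is why many small row-level changes are expensive compared with dropping a partition through metadata alone.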

Do you know exactly when the Scala APIs for Delta Lake will be available? Well, so there’s a couple of answers to that. So Delta Lake reading and writing, in streaming and batch, already work in Scala. That is available today. If you’re talking specifically about update, delete, and merge, I believe most of that code has already been put into the repository. So if you download it and build it yourself, it’s there. We are hoping to make the release in July. So hopefully this month, there’ll be the next release that contains those extra Scala APIs.

Let’s see.

Yeah, so the next question was about data quality. Can we have any other fields for validation purpose apart from timestamp? Yes, so the expectations that we talked about before are just general SQL expressions. So any expectation that you can encode in SQL is allowed. So it could be, in that example, it was a very kind of simple comparison operation with some specific date, but it can be anything you want. It could even be a UDF that is checking the quality of the data. So really the important thing here is that we just allow you to put those in as properties of your data flow, rather than as manual validations that you need to remember to do on your own. So that kind of enforces that globally across anybody that is using the system.

Does Delta Lake support merging from a data frame instead of a temporary table? Yeah, so once the Scala and Python APIs are available, then you can pass in a data frame. Today inside of Databricks, the only thing that is available is kind of SQL DML, and in that case, you do need to register it as a temporary table. But like I said, stay tuned for the end of the month. We’ll have a release that has our Scala APIs, and then you’ll be able to pass in a data frame yourself.

I’ve seen this question a couple of times, so I’ll just answer it one more time. We support both ADLS gen one and gen two, although gen two is going to be faster because we have some extra optimizations there.

The next one is, in the checkpointing example, is the Spark job computing the Delta Lake checkpoint internal or required to be handwritten? That’s a great question. So when you’re using streaming to read from or write to a Delta table or both, if you’re just using it in between two different Delta tables, the checkpointing is handled by Structured Streaming. So you don’t need to do any extra work to construct that checkpoint. That’s built into the engine. The way Structured Streaming works in Spark is, for every source and every sink, there’s a contract that allows us to do that checkpointing automatically. So the source needs to be able to say, I’m processing the data from here to here, and those notions of where they are in the stream, we call them offsets, need to be serializable. We just store those in the checkpoint. We basically use the checkpoint as a write ahead log. So we say, batch number 10 is going to be this data. Then we attempt to process batch number 10, then we write it to the sink, and the guarantee here is the sink must be idempotent. So it must only accept batch number 10 once, and if we try to write it twice due to a failure, it must reject that and just skip over it. And by putting all of these constraints together, you actually get exactly once processing with automatic checkpointing without you needing to do any extra work.
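
The contract described here, offsets recorded in a write ahead log plus an idempotent sink, can be sketched in plain Python. This is not the Structured Streaming API; `IdempotentSink` and `run_batch` are hypothetical names, and a dict stands in for the durable checkpoint.

```python
class IdempotentSink:
    """A sink that accepts each batch id at most once."""

    def __init__(self):
        self.committed_batches = set()
        self.rows = []

    def write(self, batch_id, rows):
        if batch_id in self.committed_batches:
            return False  # already written, skip the duplicate
        self.rows.extend(rows)
        self.committed_batches.add(batch_id)
        return True


def run_batch(wal, sink, source, batch_id, start, end):
    """Record the batch's offsets first, then process and write the batch."""
    # Write ahead log: if the batch was already planned, reuse the recorded offsets.
    wal.setdefault(batch_id, (start, end))
    start, end = wal[batch_id]
    return sink.write(batch_id, source[start:end])


source = list(range(10))
wal = {}
sink = IdempotentSink()

first = run_batch(wal, sink, source, batch_id=10, start=0, end=5)
# Simulate a retry after a failure: the same batch is attempted again.
retry = run_batch(wal, sink, source, batch_id=10, start=0, end=5)
```

The retry replays exactly the same offsets and the sink rejects the duplicate, so the rows of batch 10 appear in the output once.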

Great question here. Why not use polyglot persistence and use an RDBMS for storing ACID transactions? That is a great question, and we actually tried this. In fact, one of the early versions of Delta used MySQL, and the problem here is MySQL is a single machine, and so just getting the list of files out for a large table can actually become the bottleneck. Whereas when you store this metadata in a form that Spark itself can natively process, you can leverage Spark to do that processing. So there’s nothing stopping you from implementing the Delta transaction protocol on top of another storage system. In fact, there’s a pretty long conversation on the GitHub repository right now that’s going back and forth about what it would take to build a FoundationDB version of Delta, and that’s certainly possible, but in our initial scalability testing, we found that Spark was the fastest way to do this, at least out of the systems we tested, and that’s why we decided to do it that way.

Another question, does that mean we don’t need data frames and can do all transformations on Delta Lake instead? And I would say no. Well, I think you can only use update, delete, and merge without using any kind of actual data frame code. You can use pure SQL, but really, I think this is kind of right tool for the right job. Delta Lake does integrate deeply with Spark data frames. And personally, I find that to be a very powerful tool for doing transformations. It’s kinda like SQL plus plus ’cause you have all these relational concepts, but embedded in a full programming language. And that actually I think can be a very productive way to write your data pipeline.

How does Delta Lake manage newer versions of Spark? Yeah, so Delta Lake requires Spark 2.4.3, which is a pretty recent release. And that’s because there were actually bugs in earlier versions of Spark that prevented data sources from correctly plugging into it. So, but in general, we’re working on Spark compatibility. That’s actually one of our kind of core projects for this quarter is making sure that everything in Delta plugs into nice public stable APIs of Spark, so we can work with multiple versions in the future.

One more question, does Delta Lake support ORC? Yeah, that’s actually a really good question that I get quite a bit. Again, there’s a discussion on GitHub about adding that support. I encourage you to go check that out and vote on that issue, if this is something that is important to you. And there’s kind of two answers to this. One is the Delta Lake transaction protocol: the thing that actually goes in the transaction log does support specifying the format of the data that is stored. So it actually can be used for any different file format, text, JSON, CSV. So that is built into the protocol already. Today, we do not expose that as a choice. When you’re creating a Delta table, we only do parquet. And the reason for that is pretty simple. I just think fewer tuning knobs is generally better, but for something like ORC, if there’s a good reason why your organization can’t switch, I think that support would be really, really easy to add and that’s something that we’re discussing in the community. So please go over to GitHub, find that issue, and fill it in.

And then I’m going to take one final question since we’re getting close to time. And the question here is, what is the difference between the Delta Lake that’s included with Databricks versus the open source version? And that’s a question I get a lot. And I think the way to think about this is I’d like to talk about what my philosophy is behind open source. And that is that I think APIs in general need to be open. So any program you can run correctly inside of Databricks should also work in open source. Now that’s not entirely true today because the open source version of Delta Lake is only two months old. And so what we’re doing is we are working hard to open source all of the different APIs that exist. So update, delete, merge, history, all of those kinds of things that you can do inside of Databricks will also be available in the open source version. Managed Delta Lake is the version that we provide. It’s gonna be easier to set up. It’s gonna integrate with all of the other pieces of Databricks. So we do caching, we have a significantly faster version of Spark, and so that runs much faster, but in terms of capabilities, our goal is for there to be complete feature parity here ’cause we’re committed to making this open source project successful. I think open APIs is the correct way to do that. So with that, I think we’ll end it. Thank you very much for joining me today. And please check out the website, join the mailing list…

Advanced: Diving Into Delta Lake

Dive through the internals of Delta Lake, a popular open source technology enabling ACID transactions, time travel, schema enforcement and more on top of your data lakes.

Watch the video