Delta Lake を開始する

Simplify and Scale Data Engineering Pipelines

Denny Lee. Developer Advocate at Databricks
Denny Lee is a Developer Advocate at Databricks. He is a hands-on distributed systems and data sciences engineer with extensive experience developing internet-scale infrastructure, data platforms, and predictive analytics systems for both on-premise and cloud environments. He also has a Masters of Biomedical Informatics from Oregon Health and Sciences University and has architected and implemented powerful data solutions for enterprise Healthcare customers.

Series Details

This session is part of the Getting Started with Delta Lake series with Denny Lee and the Delta Lake team.

Session Abstract

A common data engineering pipeline architecture uses tables that correspond to different quality levels, progressively adding structure to the data: data ingestion (“Bronze” tables), transformation/feature engineering (“Silver” tables), and machine learning training or prediction (“Gold” tables). Combined, we refer to these tables as a “multi-hop” architecture. It allows data engineers to build a pipeline that begins with raw data as a “single source of truth” from which everything flows. In this session, we will show how to build a scalable data engineering data pipeline using Delta Lake.

Delta Lake is an open-source storage layer that brings reliability to data lakes. Delta Lake offers ACID transactions, scalable metadata handling, and unifies streaming and batch data processing. It runs on top of your existing data lake and is fully compatible with Apache Spark APIs.

In this session you will learn about:

  • The data engineering pipeline architecture
  • Data engineering pipeline scenarios
  • Data engineering pipeline best practices
  • How Delta Lake enhances data engineering pipelines
  • The ease of adopting Delta Lake for building your data engineering pipelines

What you need:
Sign up for Community Edition here and access the workshop presentation materials and sample notebooks.

Video Transcript

– [Instructor] Hi everybody. Welcome to Simplify and Scale Data Engineering Pipelines with Delta Lake. My name is Denny Lee, I’m a Developer Advocate here at Databricks.

Subscribe Today!

I want to let you know that we’re also livestreaming this on our Databricks YouTube channel. So you can go to dbricks.co/youtube here. So if you want to go listen to this livestream if you have to jump off a little bit early or if you want to listen to a little bit later on today. This and other of these Delta Lake online tech talks are gonna be available at the Databricks YouTube channel.

Today’s Speakers

Well, allow me to introduce myself a little bit before we dive into it. My name is Denny Lee, as noted. I’m a Developer Advocate at Databricks. I’m a hands-on distributed systems and data science engineer with experience in internet-scale infrastructure, data platforms, and predictive analytics systems. I used to work at Microsoft and helped build what is currently known as HG Insights, worked with SQL Server customers, and now I’ve been working with Apache Spark since 2005. So gives you a little background in my data engineering experience. Some quick logistics. This recording and the slides will be available after this tech talk, after this webinar. As noted, it’ll be eventually also posted onto YouTube, so the Databricks YouTube channel. Since everybody’s muted, please put your questions in the Q&A panel, not in the chat panel. I’ll be looking for questions inside there. And we will also provide a link to anybody that logged in who saved a spot through Zoom or through the YouTube channel. So that will give you all the information you need. This does include, by the way, any of the notebooks that we’re using for this demo. It’s actually included in this presentation, the link, as well, we’re going to provide it inside the YouTube channel, okay?

So, let’s get started.

The Data Engineer’s Journey…

If some of you have been attending some of the previous sessions, we did talk a little bit about the data engineering journey, or the data engineering pipeline.

I’m going to do just a quick call-out just to provide people with context, but if you want to dive deeper into it we had last week’s Delta architecture introducing, basically, going beyond Lambda architecture, introducing the Delta architecture. And the previous week, we also had getting data ready for data science with Delta Lake and MLFlow, where we discussed a little bit about this. But the quick context here is, if you look at this particular diagram on the left side you see the event, it’s going into Apache Kafka or some other streaming mechanism, like whether that’s Kinesis or Azure Event Hubs or Cosmos DB, irrelevant of which approach, typically you have to build a Lambda architecture because Lambda architecture allows you to deal with real-time processing in the top part of it and batch processing at the bottom. The top is in Apache Spark or Structure Stream, that’s processing the data from Kafka, and then pushing this data into this unified view to the right of the database. So that’s what goes for AI reporting. But then that’s for what you’re constantly streaming per se. Then there’s also the batch data, which is to your left here. And the left data basically streams down and gets written continuously into some table that you can do normal batch processing against. To validate this data you’re gonna need to actually build a unified view that does the validation between the streams to the top and the stream to the left between the processing you’re doing in the stream, structure stream, and the processing you’re doing ultimately that goes into the tables so that way there’s a reconciliation process to ensure that the data’s actually the same.

Once you get your data written to this table, as it’s continuously written there, you’re gonna run batch processing. This batch processing allows you to reprocess data into yet another table, and it’ll get compacted every hour because the size of your files, you have a lot of small files that are generated from the streams, so we’re gonna go ahead and compact those files together so that that way we can ensure better performance. When you basically have too many small files in any distribute system, basically it’s gonna slow down the performance of the system.

It’s here where you’ll be able to do updates and merges against that data, and then also if there’s any late arriving data, you’re able to go ahead and reprocess it at this bottom table that’s right in the middle. Yet another Spark batch process will then take that data from the table, place it into this unified view, so that way you can do your AI reporting. That’s what traditionally a Lambda architecture would look like. And the usual question you want to ask is, can this be simplified? And so, this is basically what the data engineer’s dream is, right?

A Data Engineer’s Dream..

You want to be able to process data continuously and incrementally as the new data arrives in a cost-efficient way without having to choose between batch or streaming. When we created Spark streaming, or structured streaming, those few years back, the whole premise of us creating data frames, when we called it things like static data frames versus dynamic data frames, was that you didn’t actually have to think of a difference between how to deal with low-latency data and batch data. In other words, you would treat and run the queries exactly the same whether you were running against a streaming data set or running against a batch data set. So that’s what you really want to do, so that way it’s the same APIs or the same SQL syntax that you’re applying whether it is a streaming data set or a batch data set. Now, it would significantly simplify not just the maintenance of the code, but just as important, the type of thinking that’s involved that you mentally don’t actually have to change your thought process just because you’re doing things for batch or streaming. And so that dream, then, basically is on the left side, your Kafka, Kinesis, Event Hub, your Data Lake, doesn’t matter if it’s a streaming source or a batch source, it doesn’t matter, Spark takes it, props it into some store, and this is where Delta Lake gets involved, just a little hint there, and then Spark processes it again, and then you have your AI reporting. So, that’s what you want to do.

What’s missing?

What’s missing though in order to be able to do this? Right now, the things that Spark is pretty close to solving all of these problems, but you’re going like, but there’s a bunch of things that are missing to be able to actually achieve this goal. So let’s talk about the issues here. The first one’s the ability to read consistent data while data is being written. As noted in some of our previous sessions, we were really talking about the concept of ACID transactions. ACID, back to this concept is atomicity, consistency, isolation, and durability. That’s what ACID stands for, and this concept of transactions is that you want to be able to trust the idea that when data is written to the store or to the disk or whatever it is being written to you are sure that it actually happened, and there is no chance of corruption. If you look at traditional distributed systems when you write them to disk, whether it’s as their blob storage or ADLS or Google storage or S3, it’s the same concept. These are these are BASSE in comparison to ACID, so there’s a chemistry joke inside here, but the concept of BASSE here is actually basically available soft state eventually consistent, and the most important aspect here is that eventually consistent and what that means is that, you know, traditionally when we have these cloud storage or even with the Hadoop system, by default there’s three copies of the data written to disk or into storage. So the idea is that potentially client one hits node 1 and client two hits node 2 of this, basically, the three copies of the data. Well eventually consistent in this environment basically means that I’ve written the data to node one but I haven’t written to note two yet because there’s some delay, but it’s possible that client one, client two hit node one, node two at exactly the same time. So node for client one that hits node one, they’ll see the data. For client two it hits node two, they don’t see the data. All right, this is this concept of eventual consistency. You’re not consistent in whether the data exists or not. Well this is important, this concept of consistency is extremely important if you want to be able to look at streaming and batch data concurrently. Because if you’re streaming data and you’re constantly writing to disk extremely quickly with all these small files, right, the distribute storage that you’re writing to, whether it’s on Prem Hadoop, whether it’s your own environment, whether it’s cloud storage, doesn’t really matter. There’s multiple copies of that data sitting somewhere else, and if, again back to the example I use, client one is hitting node one and client two is hitting node two, and client one sees that data and client two does not see the data, those clients are about to perform some action against the data in which you’ve got inconsistent data. So for example, if I’m gonna do an update or delete or something else, client one sees it, client two does not see it, their actions are gonna be quite different or the results of those actions are going to be quite different. So this ability to have consistent data is extremely important.

The second call-out, the ability to read incrementally from large tables with good throughput. That’s a super important concept, right? The larger the table, the more resources it takes. You want to actually have some mechanism to have consistent processing that data. You want to have that ability to roll back in case of bad writes, you know. For example, I write the data to attempt to write the disk. There are let’s just say five tasks that have to write as part of this one job. The fourth task, the fourth and fifth task fails.

What happens normally in this case? Well the other three, and possibly the the fifth one, will write to disk and the fourth one will roll back and fail. Well what happens then, that means four of the five tasks have written to storage while the fourth one, like one of the four, one of the five, ends up actually not being able to write it. Okay, so it fails writing, so you get the error except now your data is left in a bad state. So in other words, you’re not sure what change has just happened to your data, and that’s already bad enough if you’re just inserting data, because if you’re just inserting data maybe you could just say, I failed a checkpoint or I failed a queue. I know that I ran this by this batch ID or at this point in time, just roll everything back, delete everything manually myself. And that’s cool, but it gets a little bit more complicated, especially, firstly it’s already a pain just to do all that maintenance, but even if you had no problem doing it running the updates, or running the deletes against that data, once you’ve done that and somewhere halfway through the system it breaks, how do I rollback that update? How do I rollback that delete? So you want that ability to rollback so in case there is an error, whether it’s error the disk level or error in the source system or the business logs. Doesn’t matter where the error is, but in case there’s a bad write of some type, you’re able to rollback.

You also want to be able to replay historical data along with new data that arrives. So in other words, take all historical data and also, in other words, have a single table per se, that actually looks at both the new data that’s coming in as it’s coming and replay historical data at the exact same time. So again you, as the data engineer, are looking at data from the standpoint of, here’s all of my data at the point the query, whether it’s brand new or old as opposed to, let me take the new data and let me take the old data, let me go merge them together, let me go ahead and write a query with that. And just as important with all those concepts that we’re talking about, the ability to handle late-arriving data without have actually having to delay downstream processing.

So. What is the answer?

So what’s the answer to this? The answer really basically is combining this concept of structured streaming and Delta Lake together in order to create this Delta architecture. We talked a lot about this last week, but as we noted this session is about simplify and scale data engineering pipeline, so I just want to provide this real quick context first before we go into how do we do that. But the real call-out here is basically unified batch and streaming with a continuous data flow, infinite retention to replay and reprocess historical events as they’re needed, and you can actually have independent elastic compute and storage. They can skill independently from each other, so that way you can balance costs. That’s actually how we want to do this. So let’s try this concept all with Delta Lake instead.

The A

So what we implied with the data engineering journey is basically broken out in this concept of bronze, silver, gold data quality levels. So when you look at the data engineering pipeline, we actually showed like one database icon, but really it’s broken down into different data quality levels. Now, some people gonna ask the question, do I actually need to have three separate tables? Are these three physical implementations? And the quick answer is that in some cases, yes, in some cases, no. It depends on the cleanliness of your source data. If you had already processed and cleansed data upstream, you probably don’t need these three levels. You probably can just go right to gold, but if you’re taking the raw data source where you do need to ingest and keep the data because the store, the REST API that you’re calling from is actually non-persistent. In other words, it only holds the data for a short time. Like, Kafka will, let’s say you set up your Kafka, Kinesis, or Event Hubs to only hold data for about a day or for a few hours, you need somewhere to have that data sit. So that’s what this concept of a bronze table is. It’s the raw ingestion. Make sure you have a source where that data resides. Then you go to this concept of silver. Now that you ensure that you’ve written the data to storage, to disk, let me go ahead and filter it. I don’t need all these login calls, or these IP traces or whatever else. Clean it, so in other words there’s data that actually has, with simple business logic I have to remove this description or I have to change this ID or I want to filter out all from this particular geo region. This cleansing concept, so that’s what silver data does Oh, and also augment it. So in other words if I have other sources of data that I want to join with this data, this is where you would normally do that. And then finally you’re at the gold level. The gold level is basically, now that I’ve filtered, cleansed it, and augmented it, I also will possibly aggregate it as well. So in other words I don’t need to look at the eight million transactions that just happened per hour. I can look at them by minute, as opposed to PI second, using that as an example. Because I only need minute latency for the purpose of my reporting. So if I only need minute latency for the purposes of reporting, I can shrink the minutes down to hours, yeah actually, the other way around. I only need hour reporting as opposed to minute reporting. And so, irrelevant of the whatever business logic requires, the idea is that this goal table is smaller, more compact, has exactly what you need for your streaming analytics or your AI reporting to go ahead and piggyback off of. And so this concept of basically these data quality levels for the Delta architecture, which is what you see in terms of these data quality levels, it allows you to incrementally improve the quality of your data until it is ready for consumption. That’s the important call-out.

So if you’re a data engineer, I would love to be able to, online, be able to just ask you, raise your hands, how many of you are data engineers and how many of you are enterprise data warehousing folks?

The Data Lifecycle

What does it remind you of? Well this concept reminds you of basically, okay, your standard data lifecycle. The bronze table is more like your data lake, you just dumped all the data in, and you’re you’re good to go. Then you build a staging database, or staging table, which basically allows you to go ahead filter, cleans, and augments the data. Sorry, and then you build a data mark that actually has those business level aggregates. So this concept that we’re talking about is very similar, very close to this idea of a data lifecycle. It’s just that with the Delta architecture it’s not just a single table in a single database.

The ideas that we’re talking about different tables applied in a distribute manner in a distributed system. So that’s what the architecture is about, and it also allows you to handle both streaming and batch data concurrently. All right, so how do we transition from this traditional data lifecycle to the Delta Lake lifecycle?

The A

So like we, said raw ingestion, the bronze is this idea of dumping ground of raw data. You often will have a very long retention, errors, that often is in the years. You want to avoid error prone parsing, so in other words, it’s really about just storing that data. That’s really this concept of the bronze.

As we know with this silver, you’ve got intermediate data with some cleanup applied. It’s query-able for easy debuggings, so for example if you are a data scientist and you want to go ahead and, not necessarily work with the business level aggregates, but you want to go one level deeper or more detail, you can potentially just run your analysis on the silver data. This is also common for debugging. So for example, I’m just gonna use this as an example, I’m an airline, I have data, and I want to be able to debug or provide customer support for somebody who logged into my website to order a ticket. This is that filtered and cleansed augmented data. Usually there’s enough information inside there so you could debug your way out, and it’s all inside here.

And then once you’re good to go, you’ve got business level aggregates, you’ve got clean data consumption, it’s now ready to read with Spark or Presto. Actually, the stars there, I should remove the star, but the call-out is that as of Delta Lake 0.5.0, we’ve included the ability to create manifest files and with those manifest files, both Athena and Presto actually are able to go ahead and read a Delta Lake table as well. So the idea is that Delta Lake is not just for Spark. It certainly started with Spark, and I’m gonna show you an example using it. But other systems are absolutely able to make use of Delta Lake because they’re able to read the manifest file.

All right, and then as we’re sort of noting here, the streams move data through the Delta Lake whether it’s low latency or manually triggered, it eliminates the management of schedules and jobs. So I forgot if I included this particular slide in this deck here, but one of the cool examples I like talking about which was covered in Spark and AI summit in San Francisco last year, was Comcast, and Comcast went ahead and dropped from 80-so jobs down to three because even though they were doing a sessionization process that did not necessarily require them to have everything streaming, because they were able to run low latency streaming jobs, they were able to replace all the 80-plus plus batch jobs and shrink it down to three jobs. The combination of ACID transactions at Delta Lake and they’ll be able to to look at the problem, whether it was streaming or batch in the same manner, allow them to decrease the complexity of their job structure and they’re able to now maintain that system in a much easier way. So this is an important aspect. Even if you cannot necessarily take advantage of the streaming, per se, because you don’t have a streaming job in a traditional sense, you can certainly break it down into small micro batches, so that way you have a job that’s consistently running to reduce the complexity of what you’re building. Now Delta Lake, in order to be able to provide you all this capacity, now it basically revives your DML, the ability to do insert, updates, deletes, merges, overrides. Now, you can certainly do inserts and gold and do overrides in bronze, so this isn’t necessarily meant as a catch-all, it’s just simply stating that traditionally in a bronze level you’re either inserting, updating traditionally, in silver you’re deleting data so you can shrink it down, and traditionally in gold at this point, you’re merging or overriding data. But those DMLs, those data manipulation language, those statements can actually be applied all throughout. It allows you to do retention, allows you to do corrections, and it also even allows for GDPR, general data privacy regulations. And this is actually an important concept within newer systems where if you’re holding a lot of data CCPA, GDPR, these concepts of GRC or governance risk and compliance, risk management and compliance, it’s an important aspect of how to ensure your data is not just safe but you ensure the privacy of the individual behind that data. And so that’s an important aspect which we actually are going to cover next week, by the way. So in next week’s session of tech talk, we are going to be talking about how to address GDPR or CCPA by utilizing Delta Lake. So please do join us next week for that session, as well. So a little shout out to that session. But with Delta Lake the concept is that, you know, I can run, if I need to recompute, if the business logic changes, I can clear the silver, I can clear the gold, I could just run deletes, and then I could restart the streams or reprocess the data. I could scale the environment or scale the systems to go ahead and process more data and then you’re good to go. And so now, let’s talk about demos. The remaining 20 minutes or so, 25 minutes, are purely demo. So now that you’ve been patient enough to let me give you some of the context, let’s go right into the demo part. Okay, all right.

So like I noted, this notebook actually is available for you to download and use. I’m actually only going to be using a small portion of it, but just to sort of give you some context about how to build these scalable pipelines. So I’m going to stick to just streaming and processing. That’s all I’m actually gonna do. So this notebook is currently running in Databricks Community Edition, and because we’re doing it here, there we go, that’s better, actually the data set’s actually in Databricks data sets, but you can actually run this yourself as in a Jupiter notebook if you wanted to, and the data set actually, we include a link here on where you get the data. So you can certainly go run this data on your own environment. You do not actually have to run this on Databricks, but if you do, you can run this on Databricks Community Edition and it’s free. All right, so right now what we have, I just want to start off, we’ve got some data and it has a schema like this. So you have a load ID, a funded amount, a paid amount, and address state. This is loan data that we downloaded from Kaggle. Like I said, we included the link there. All right, and so how many records does it have? Right now this particular one has only 14,000 rows inside there. So it’ll be important to call this out later, but that’s a that’s a quick important call-out there. Okay, so I’m gonna go ahead and run this particular function here.

And refresh.

All right, there we go. Okay, I’m gonna run this function. Now I’m not gonna dive too deep into it, but the whole point here is that I’m actually gonna create this, generate and append data stream. The purpose of the stream is basically, I’m going to take data that was based off of the loan data that I have, but I’m gonna insert this into the same location but I’m gonna do it in Parquet. So, originally near the top here, I’m just gonna scroll back here just to give you some concept, I create this Parquet path. So basically I created this particular folder. So my table, this loans Parquet table, is based off of the data that’s residing inside here, this temp SAS EU 19 demo loans. So I’m gonna go ahead and run this generate function, which basically does that, generate and append data stream. It’s gonna put data into that Parquet location. All right, so now when I run it basically a structured streaming job is gonna kick off. So we’re gonna wait a few seconds for it to kick off.

And so we’re good, so we’re gonna initialize a stream, and what’s happening here is that the reason I’m showing this in Databricks Community Edition is that it has a cool little thing where I’m actually able to show you the input processing rate and batch duration in terms of, okay, we’re processing about 20.6 records per second, so we’re putting data in. So far, so good, all right. So let’s see if any data is being added into it. So I’m gonna go ahead and just run a quick count against that same Parquet path, or where that loans Parquet table is. So I’m just gonna run it directly off of here, and when I run this you’ll notice that there’s 170 rows inside here. Okay, that number seems a little off and I’ll talk to you about that in a second, but what happens if I try to run a second stream? And so this is an important aspect of, if I’m trying to go ahead and scale my data engineering pipelines, I’ve got more than one source kicking in.

(sneezes) Sorry, I hope you did not hear me sneeze. Apologies for that. So I have more than one source because it’s a distributed multiple REST APIs or distributed source or whatever, and so I want to run a second stream to go to that exact same table. What happens if I try to do that against a Parquet table?

So it’s gonna kick in, it’s gonna try to start writing to that same location. Except you what you’ll notice is that, sure the batch duration is almost a second, but there’s zero records per second. And so that is nothing’s going in. All right, so the second stream can’t write to the same location as the first stream. So, data is still going in from the first stream. It did jump up to 570 as you can tell from here, so the data is still going in as you’ll notice from this one but it’s not going in here. It’s just not happening. And why is that happening? Because ultimately what’s happening is that when you’re writing data from two disks, there is this concept of those ACID transactions, which every single write is protected and there’s a transaction around them protecting this information. What’s actually happening is in essence there’s a lock, if you want to think of traditional EDW, or in essence as a lock to the table it’s writing to disk, so the first stream is able to do it but the second stream just can’t do it. So what ends up happening more times than not, you end up creating multiple tables and then you have to merge those table together, I.e. increase the operational complexity of your system. But wait, remember how I started off? It said that there are 14,705 rows? And right now there’s certainly less than that. I mean, I’m streaming data in, so it’s probably a little higher now, so we’ll just run it, 870, but certainly not the 14,705 that we talked about. So if I look at the data and look at it, you’ll notice that in addition to loan ID, funded amount, paid amount, and address state, I also have timestamp and value inside here. Okay so the schema changed. Well that’s our problem here. So I’m gonna stop the streams as I explain this concept.

Well, what’s basically happening is that because we had the streaming query, one of them because a second wouldn’t work, write into the Parquet table because it’s a structured streaming job, automatically structured streaming jobs, we include the additional column of timestamp and value. So for example if I go back and look at the code here, so I’ll just open up here, my stream data is actually right here.

So I’m reading this data, I’m asking for the loan ID, funded amount, paid amount and address state. I did go ahead and do a write stream, which is basically, you’ll notice that a stream data dot write stream format table format, that was one of the input parameters which was Parquet. The option is… The checkpoint location it’s just a location so we can ensure that our stream runs correctly, so we can skip that for now. We do a dot trigger, which is every 10 seconds we’re gonna go process the data, I.e. write the data, and then basically it goes into that table path, that table path is the same Parquet path that we had listed originally. So that’s great. You’ll notice that I never actually specified timestamp and value, so it was automatically added as part of the streaming process. So because I actually automatically added these columns, this timestamp and value, what happened is that I have two different schemas. I’ve one schema which is the old one that only has four columns, and then we have a new schema with six columns, and so because I have six columns, in essence I basically overwrote my original table. So that’s why when I query the data, I’m only seeing the 850 or less than a thousand. I’m not seeing my original 14,000. So this is a problem of Parquet There’s no concept of schema enforcement to ensure that the data actually coming in is actually going to be the schema that’s already existing inside the table. And there there’s no interoperability, between not just back to streaming workloads, I can’t actually even have two streaming workloads write into the same table concurrently, which sort of sucks. So let me just restart this process over again. I’m just gonna go ahead and clear out the data, and I’m gonna republish the data, so I’m just gonna run that process.

And once this is finished, I’m gonna go ahead and run this step, with just a basic create this as a Delta table. So in other words here’s the original Parquet file. Sorry, Parquet file that we have here. I’m gonna go ahead and run this now, instead of the Parquet path, I’m gonna go ahead and do it as a Delta path. So I’m gonna go ahead and store this as a Delta table. That’s what this line is.

So as you can tell, it’s pretty easy to switch from Parquet to Delta. In other words, I read from Parquet and then I write with Delta. So exact same format, read dot format Parquet, write dot format Delta. If I want to read a Delta table and it’d be read dot format Delta. Pretty straightforward. So now I’ve created this Delta table. I also happened to created a temporary view as well, just in case. So let’s see the data one more time.

Okay, and we should be back to 14,705 as you see here, and we want to look at the schema real quick.

All right, so the schema is in fact four columns. So I’m back to where I used to be now, but the only difference is that I’ve got a Delta table as opposed to a Parquet table, and one thing I’m gonna go do is that I’m also gonna create a Delta loan still to stream. So I have a loans Delta table, which is for batch queries. I’m also gonna create a little Delta stream, which is for streaming queries, but they both go to the exact same location, this Delta path. So it’s the same file system whether I’m running a batch query or if I’m running a streaming query, same thing. All right, so now that I’ve got that, so this is giving my quick count, woo-hoo, 14,705 let’s go ahead and try to run this query again. So same one again, but remember I actually had six columns not four columns. So with this one, I have six columns, not four columns, but guess what, because I’m using a Delta table, I’m seeing a schema mismatch now. It tells you right here, if I want to merge the schemas together I can use dot option word schemas true. So in other words if I wanted those additional columns to be included all I had to do was change the code that I was using to include that option, and I could have allowed the two additional columns to go into my data. But I don’t, for now, want to do that, but it’s a good call out. So in other words, if I have good business justification or good business reasons to do so, there you go. But it’s calling you out right here. Here’s your table schema, these four columns, loan ID funded amount, paid amount, address state, and here’s my current data schema. I’ve got the timestamp and value. So it’s warning me that there’s a problem here. All right, so let’s instead just simply go fix this. So in this case, what I’m doing is that the stream data, because I know the stream data automatically includes the timestamp and value because it’s streaming, let me just go ahead and specifically specify dot select where I’m only going to include the four columns. Now that I’ve only done that, when I write this data down to my Delta path location, it’ll only write it with four columns, not six.

So that’s a great thing about Delta Lake because there’s both this concept of schema enforcement, I.e. we prevent data from going in to potentially corrupt the data you have, and also we allow schema evolution. So if you have a good reason that you want to change over time, we can merge the new schema, particularly if you want to include timestamp and value, we can merge the new schema with the old schema, but it won’t corrupt the data. The old schema will still be there. So that’s what’s pretty cool. So now let’s go ahead and run this. It’ll take a couple seconds for it to kick in, but so now we have the streaming query kick in.

So let’s go ahead and kick this off. Boom, so now we have a dashboard. It’s trying to process records, so now it’s trying to put data inside it. It’s right now running about 54 or 55 records per second, which is cool. Let’s go back up here and you’ll notice right away the number changed, 15,255. And over time, as that stream is kicking in, it’ll be able to put more data in, obviously it’ll increase. But what’s also cool about it is that because I’m using a Delta Lake table, I’m actually able to go ahead and run this again. So in other words, I have a second stream. So this is my stream query 3. Same concept, same code, I’m just gonna be running, for sake of the argument, this would be representative of two different sources, but they have the same schema, same data that are all trying to write to this thing at the same time. And sure enough, right away, stream query 3 is jumping up to 147 records per second, and if you go back up you’ll notice the numbers keep on steadily increasing here. So, this is what we mean by simplifying what you create. It’s simplifying your data engineering pipelines. The fact is that, irrelevant of a single source, multiple sources, whatever, if I decide that I need to put everything down into a single table, which is what this example is, I can because I have multiple streaming jobs that can hit the table concurrently, and because I have ACID transactions, I’m protecting the data underneath this at the entire time. And let’s go ahead and run this. Remember I said that we were looking at loans underscore Delta underscore read stream, that’s actually telling me what the read stream looks like, but I can also look at the batch. So in other words, the loans for Delta. This is a batch table that’s looking at the exact same source. And again now I have a batch query read, which is telling me right now it’s 23,455. I’ve got two streaming writes all happening at the same time. So this is how powerful it is, that you can actually, because I can have multiple streams read and write, multiple batches read and write, now I can actually simplify everything because everything’s shown in a single view. I can then organize my jobs to be streaming jobs as opposed to just a bunch of batch jobs. Then instead of having 20 batch jobs, I could potentially run a bunch of single micro batch or streaming job to actually simplify things.

So let’s take a look at the file system underneath the covers. Remember, like I said, we’re gonna stop all the streams, and then let’s take a look at it. And so this is the Delta path that we were talking about before.

What you’ll have here is basically, if you’ll notice, it’ll be just a bunch of Parquet files. So here we go. Same thing as you’re used to working with Parquet. The main difference is there is this Delta log folder. Otherwise that, all the original Parquet. All these little Parquet files are from the stream, and you probably could see this big one, larger one, that’s the original Parquet file that had the original data.

So if you look at the log, what is the log? Basically it’s showing you all the different versions of the data, so every single insertion has a JSON around it, that JSON describes and tells you exactly what happens inside that transaction. Sort of cool that way. In fact, let me go ahead and just choose this one for the fun of it.

You open up the JSON file and take a look at it. So it tells you the timestamp, the commit information. who it was that did it, for example that’s me, what what type of information, like it was a streaming update, and all the transactions, this is a transaction that happened, and all the values associated with that. All that’s stuffed inside here. That’s actually what’s inside that JSON. So underneath the covers, we’re creating this ACID transaction to basically protect the data underneath the covers. And actually to make it look a little nicer, we actually create this history, so you just go describe history, and then you see all the versions of the data, so all the streaming versions, all of it right here.

So before I jump into any more of this, let me go back to the slides really quick. So let me do that, oops, sorry, there we go.

Connecting the dots…

So, let’s connect those dots back together, since we’ve got about seven minutes left for the presentation, I still want to leave some time for Q&A. Let me connect the dots. Am I able to read consistent data? With Delta Lake, I am able to because I’ve got snapshot isolation between the writers and the readers, just as you saw I was able to go ahead and run multiple stream writes and read all at the same time. Am I able to incrementally read from a large table? Yes I can. Actually, I’ve optimized file source with scalable metadata handling. This scalable metadata handling, actually let me go back and just show you this real quick. Sorry, I gotta get this right. You’ll notice that there’s this checkpoint Parquet here.

So every tenth JSON inside here actually has a dot Parquet inside it. So there’s a question that just came in asking me, you know what what is that Parquet? They managed to know it. So they saw the JSON, but still wondering about the Parquet. So let me answer that right now. The Parquet here basically is to go ahead and say, all right, after every tenth transactions, I’m going to convert this to Parquet. So instead of actually trying to read each individual JSON file, I’m just gonna, Spark will be able to read the Parquet file, such that later on, if I shut down the cluster and then I’ll relaunch the cluster at a later date, instead of trying to read every single JSON file, I’m just gonna read the Parquet files. And so that’ll make things a lot easier. It’s already in the format, it will stream into memory and boom you’re on. So basically, instead of reading 31 JSON files, I’m gonna read three Parquet files and then I’m actually gonna read the one additional JSON to be up to speed on what the current transaction is.

All right perfect.

Okay, and so am I able to roll back? Yes, you can. I actually just have this concept of time travel. I don’t have time in today’s session to actually go ahead and show you time travel, but if you look at the previous YouTube. it’s on YouTube right now, excuse me, previous Tech Talk on getting data ready for data science, we actually show a lot about roll backs and time travel. This idea that because you are able to see the history of the table, it’s not just you see the history, the data of that table still resides there. So that means I can actually roll back to that previous view of the data, which is pretty cool.

So for example I can go ahead and roll back to like the nineteenth version of that data. So for sake of the argument, I could do something like, I forgot how to write the version statement. This is what happens when you try to do stuff live. Fortunately I actually have the version statement down here, so I’m just gonna copy it.

Sorry.

So, the good news of what I’m showing here is I’m actually showing you the full notebook, that full notebook actually shows you the roll backs and everything inside here, so it’s pretty cool. But let me go ahead and actually get the time travel correct.

You know what, probably with the time that’s left, I’m probably gonna skip this, but let me go ahead and actually finish off what I was talking about, and then I’ll get back to the demo. Cool. So back to presenting here. Sorry about that.

All right, so we can also replay historical data, stream the back-filled historical data through the same pipeline, which is sort of nice. So in other words, whether it’s historical data or streaming data, I can actually merge the two concepts together without any problem whatsoever. And this is important because I want to be able to look at the data without any problem whatsoever. I want to be able to look at the data, irrelevant if it’s new data or old data, I want to be able to look at it without actually going in and saying that. I have to build a separate table for streaming and a separate table for batch. No, instead I can just go ahead and look at the same table that has both my batch data and my streaming data at the same time, which is sort of nice. So that’s an important concept as well. All right and then also just as important, I want to be able to stream any late-arriving data and add it to the table as they get added. This is basically this concept of like, well I have data that’s coming in. When the data is coming in, I want to be able to go ahead and make updates to the data or I wanna be able to do deletes to that table. Well guess what, with Delta Lake I’m actually able to do that now because the I have ACID transactions protecting this data the entire time. Because I’m able to protect the data without any problem, then sure enough I’m able to go ahead and reprocess it, and this goes back to why we talked about the silver, gold, blue, silver, sorry, bronze, silver, gold concept because that way I can go ahead and look at the data, reprocess the data from the original bronze concept, and delete it and reprocess it all over again if I had to. All right, so altogether this allows Delta Lake to basically put everything, allow us to put everything together. How we can build this Delta architecture, is because Delta Lake allows us to do all of these things to allow us to simplify our data engineering pipeline. So some quick questions. Who’s using Delta Lake?

Used by 1000s of organizations world wide

Well there’s tons of organizations, tons of customers. Last month, there’s actually two exabytes processed, not one, two exabytes processed in last month just alone. So I talked about the Comcast Universal case. There are many other customer scenarios that you can check out on the Spark and AI summit, which actually has that information. Oh here we go, I did include the slide.

COMCAST

So for example, the Comcast was sessionization with Delta Lake.

The petascale jobs because in order to improve their reliability they had these petascale jobs that they need to make more reliable by using Delta Lake. Not only did they get the 84 jobs down to three, they also have the data latency and just as important, if not more important, they actually had 10x lower compute. Instead of 640 instances, they were really able to knock it down to 64, which is pretty cool.

So how do you use Delta Lake? Right, to use it, you can basically add a Spark package, you can use Maven, or like I said with dataframe instead of dot format Parquet, just switch to doc format Delta and bam now you’re using Delta.

Get Started with Delta using Spark APIS

So you can build your own Delta Lake right now if you want to. And then I actually wanted to just do the time-travel little trick now that I remember the versioning.

It helps if I actually go ahead and save this.

Helps if I go to Delta.

So this is actually supposed to run against a Delta table, so again I guess in this case I screwed it up. So my bad on this one. I’ll actually have to give you a different version of the code base in a second when I actually have a chance to go ahead and switch to the API. Looks like the SQL context actually isn’t working, so my bad on that one, haha.

Nevertheless we’ll go back to the session here.

Notebook from today’s session

So, you want to use this notebook. Well this notebook actually is relatively straightforward. You just download the notebook at dbricks.co/sais-eu19-delta. We actually are gonna go ahead and put this link inside the YouTube channel link, and also if anybody who signed up for the session, they’ll get to go ahead and log into it. So either way I think you’ll be good to go. So by doing that then I think you’ll be able to go ahead and try it out and give it a whirl.

Delta Lake Connectors

All right, I did want to call out some quick other things. Delta Lake is, as you can see here, is rapidly becoming a standard for us, and so because it’s rapidly becoming a standard there’s a question here, will it work with Hive? Yes, in fact we’re working with the community right now to go ahead and figure out how to basically work with Hive. So this is right now in private preview right now. And so now that it’s in private preview, we’ll make it to public preview shortly, but the idea is that we’ll be able to go ahead and make it available in Hive to work with. We can also, give me one second here, like I noted in the Delta Lake 0.50 blog, we’re actually gonna be able to go ahead and showcase it using with Presto and Athena. We’re also working with Redshift and also Snowflake, and this is just a current numbers that we have right now. So we actually will have more shortly, but as you can tell here there actually is a great number of connectors already able to work with Delta Lake, which is pretty sweet.

Delta Lake Partners and Providers

Okay, also Delta Lake providers. Those partners and providers are already working with Delta Lake. You can go ahead and, as you can see here, working with Tableau. You can also work with Privacera, Attunity, talend, Streamsets, Qlik, WANDisco, informatica, and in addition to that, Google Dataproc. Google Dataproc recently made an announcement to actually be able to go ahead and work with Delta Lake, as well. We’re expecting more and more announcements to come out shortly.

So basically, take your time and we’re gonna go ahead and be able to showcase that.

Users of Delta Lake

And then, users of Delta Lake. This is just a small sample, small example of that, but there’s a lot of cool examples of customers that are currently using Delta Lake. whether they’re using Databricks or they’re not using Databricks, doesn’t matter they’re all using Delta Lake, so it’s a pretty cool thing.

Subscribe Today!

So saying that, I’m gonna leave a few minutes left to go ahead and answer some questions, but if you want to listen to this session and you can subscribe today by using dbricks.co/youtube or relisten, and in between all of that, I did want to go ahead and rerun that demo because I kept on screwing up. So you notice originally I have, let’s see here we go, at least 23,000 inside my loans Delta table, so I’m gonna go ahead and run this little code snippet instead. This will say what version 19 looks like, so now I’m gonna go run that.

This is at the nineteenth version of our stream here.

So I’m actually down to 22,455. What was it look like when it’s version 0? I’m gonna run that again. So this is when the table was initially just created the first time, and it’s 14,705. So this is this concept of time travel that I was talking about. So my apologies for the delay here, but that’s more or less the concept, that you’re able to run time travel to roll back, also it’s very handy for GDPR purposes and that’s also, like I said, a great segue for us to go ahead and talk about next week, we actually have a session on how do you address GDPR and CCPA using Delta Lake? So a few questions left. So let me go ahead and try to answer them. If you do have questions, like I said, place them in the Q&A. The first question I’m gonna answer is, can I use Delta Lake for OLTP workloads? And the answer quick answer is it is not designed for OLTP purposes, it’s really designed for the purpose of data warehousing or BI-type queries, the data warehousing and data workload.

If you want to use it for OLTP, I would actually just suggest an actual TP system. We’re not actually trying to simplify that part of the process. We’re trying to simplify the process where you get to analyze the data and go ahead and run machine learning against it. I believe there was also another question about, can we create this using Hive? As noted, the Hive connector is currently in private preview. It will become publicly available shortly.

Can I use Delta Lake on-premise? That’s a great question, and you absolutely can use Delta Lake on-prem. Delta Lake itself actually is, basically, for all intents and purposes, just a jar that you add to your Spark process. So for example, you go to stash packages so you can basically include the jar or include the Maven coordinates, or basically use Maven if you’re compiling or SVT or whatever else, but the point is that once you’ve done that, now the Delta jar is included. So what’s instead as if you’ve got an on-premise Spark environment, you absolutely can go do that, just include the jar, the latest Delta Lake job, that matches your Spark version and your Scala version, and then you’re pretty much good to go.

And I’m gonna finish this up with the answer, how likely can we use this in production? And the fact is actually this project Delta Lake has already been in production for the last two years. When we originally created Delta, the back story for Delta was that we were trying to build it to address some of the oops, like as in, I made a mistake, errors that people made into their data, and/or we’re trying to address the issues that we had for streaming. Because we were running into those issues, we actually built Delta as originally a Databricks project, which is sort of an add-on to Spark. But due to its popularity due to all the questions asked, due to everything that people really wanted to see, we realized that this project made a lot of sense, not just for the Spark community or, for that matter, Databricks, but not just for the Spark community but for the data engineering community as a whole. That’s why we open-source the project last year, actually. So it’s an open source project you can use on-prem, you can use it for your own environment, EMR, HG Insight, you just have to make sure everything’s matching. Just a long as you’re using the correct versions of Spark, you’re lining up the Spark version and the Scala versions, you will be able to use it. Now saying all this, if you want to learn more about how we got here, in terms of like how we can use in production and things like that, the other thing that I want to actually ask you to do is go to dbricks.co/youtube, our Databricks YouTube channel and check out the genesis of Delta Lake. This is where I interviewed Burak Yavuz, senior software engineer at Databricks, who actually was part of the creation of this project. So you can learn a little bit more about the back story behind it, as well. Okay, well that’s it for today. I realized that there are other questions. I apologize I couldn’t get into all of them, but I want to be a cognizant of the timing here. So I do thank you for your time, for attending the session. Please do, number one, be patient with me for my mistake in trying to get that particular time travel query done for you, but at least I got it done, so thank goodness for that. Number two, you’ve got comments. Go ahead and ping me directly at my Twitter handle, @dennylee, or just as likely, go ahead and go to the Databricks YouTube channel and go ahead and chime in, and put your comments directly on the YouTube channel. I will regularly go login and answer those questions. And finally, also go to Delta.io, that’s the Delta Lake page. So the latest videos, the latest notebooks, tutorials, all of them are actually there.

Advanced: Diving Into Delta Lake

Dive through the internals of Delta Lake, a popular open source technology enabling ACID transactions, time travel, schema enforcement and more on top of your data lakes.

動画を見る