Delta Lake: Optimizing Merge

This talk breaks down merge in Delta Lake (what is actually happening under the hood) and then explains how you can optimize a merge. Some code snippets and sample configs are shared as well.

Speaker: Justin Breese


– Hello and good afternoon, Data and AI Summit Europe. My name is Justin Breese, and today we’re gonna talk about optimizing Merge on Delta Lake. I’m a senior strategic solutions architect at Databricks, and I live in the Los Angeles area. I work with some pretty big and fun customers here on the West Coast of the United States. And when I’m not hanging out and having fun with them, you can hear me playing drums or guitar, or see me on the soccer (or football) field, and I love old Porsches. So if you ever wanna talk about old Porsches or engines, please feel free to reach out. But in the meantime, today, we’re gonna talk about Delta and optimizing Merges. To do that, we’re first gonna start at a high level with what is actually happening under the hood when we do a Merge within Delta Lake. We will then talk about partition pruning and file pruning, operation metrics, and how we can use operation metrics to make informed decisions. We’ll also talk about large Merges and some tips, tricks and considerations. And I will also provide you with some sample configurations. Throughout, I’m gonna be sprinkling in various ramblings and observations. Let’s get going.
So, a Merge within Delta Lake can really be broken down into three key phases. In phase one, we need to find the input files in the target that are touched by the rows that satisfy the join condition, and we wanna verify that no two source rows will modify the same target row. We do this via an innerJoin. In the second phase, we read those touched files again, and we write new files with updated and/or inserted rows. In the third phase, we leverage the Delta protocol to atomically remove the touched files and add the new files. So again, three phases: the first phase is an innerJoin to find out which files need to be modified, the second is where we actually modify those files, and the third is the atomic commit.
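To make the three phases concrete, here is a toy model in plain Python. It is only a sketch and not how Delta Lake is implemented: a "table" is assumed to be a dictionary of file names to rows, the name `toy_merge` and the `.v2` file naming are invented for illustration, and the duplicate check is stricter than the one described in the talk (it rejects any repeated source key).

```python
from typing import Dict, List, Tuple

Row = Tuple[int, str]          # (primary key, value)
Table = Dict[str, List[Row]]   # file name -> rows stored in that file


def toy_merge(target: Table, source: List[Row]) -> Table:
    """Upsert `source` into `target`, rewriting only the files that are touched."""
    source_by_key = dict(source)
    # Simplified guard: no two source rows may share a key, so no two
    # source rows can modify the same target row.
    if len(source_by_key) != len(source):
        raise ValueError("duplicate keys in source")

    # Phase 1: inner join on the key to find the touched files.
    touched = {
        name for name, rows in target.items()
        if any(key in source_by_key for key, _ in rows)
    }

    # Phase 2: re-read the touched files and write new files that hold
    # the copied (unchanged) rows plus the updated rows.
    matched_keys = {
        key for name in touched for key, _ in target[name]
        if key in source_by_key
    }
    new_files: Table = {}
    for name in sorted(touched):
        new_files[name + ".v2"] = [
            (key, source_by_key.get(key, value)) for key, value in target[name]
        ]
    # Source rows that matched nothing are inserts.
    inserts = [(key, value) for key, value in source if key not in matched_keys]
    if inserts:
        new_files["inserts.v2"] = inserts

    # Phase 3: the "commit" removes the touched files and adds the new
    # files in one step; untouched files are carried over as they are.
    committed = {name: rows for name, rows in target.items() if name not in touched}
    committed.update(new_files)
    return committed
```

Note that updating a single row in a file copies every other row in that file into the new file, which is the cost the operation metrics later in the talk help you measure.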
But, let’s double-click on the second phase, because that can vary depending on what you’re trying to do. The second phase is reading those touched files and then writing the new files with the updated and/or inserted rows. At the end of the day, there’s a join happening underneath the hood here, and the type of join varies depending on the conditions of the Merge. For example, if we’re doing an insert-only Merge, meaning we have no updates or deletes, then we’re gonna do a leftAntiJoin on the source and the target to understand what the actual inserts are going to be. So again, that’s a leftAntiJoin. If we’re doing a matched-only clause, so only when matched, that’s gonna be a rightOuterJoin. Else, that means we have updates, deletes and inserts, and in that case we do a fullOuterJoin. This matters because, if you know Spark well, you know that a leftAntiJoin can be broadcast, and so can a rightOuterJoin, but a fullOuterJoin may not be. So now we understand, “Oh, there are things that can be broadcast, sounds good.” So Merge is really about these three phases. Phase one, find the files that I need to do something to. Phase two, do something to those files. And phase three, make the atomic commit. Now that we understand these three phases, we can figure out how to optimize each of them.
But before we even jump into that, before we even talk about any Delta or Spark configuration, let’s use the right instance type. For example, in these windows that you see here, it’s the exact same Merge statement and the exact same data, run one after another; the only difference is the instance type. For the top window, I used a 16xlarge. The second is a 2xlarge. Both with 1,200 cores. Let’s take a look at these windows, reading from the bottom up. On the first phase, we can see it took 1.1 minutes with the 2xlarge. With the 16xlarge, the first phase took us 4.7 minutes. That’s a big difference. Let’s compare.
Second phase, 14 minutes versus 34 minutes. Third phase, 26 minutes versus 42 minutes. All I’ve done is change the instance type, and this is a pretty material impact considering not much was different. So really the TL;DR here is: choose the right instance type. Personally, I’m a big fan of 2xlarge and 4xlarge. I wouldn’t really go above that for Merge.
So remember how we were breaking down Merge into these three phases? In this case we have the inner join, and then I have inserts, updates and deletes, so I have that full outer join. Now that we understand that, we can understand what is actually going on here. The first part down below here is the inner join. The second part is the full outer join and the optimizedWrite. And then finally we are actually writing to S3. If we want to go faster, that’s when we need to start considering things like partition pruning and file pruning.
But before we even get to that, let’s be good data stewards. Let’s make sure we unpersist DataFrames within Spark that we don’t need. Clear up your memory. I see it a lot of times with customers: there are random DataFrames that are pegged to memory and chewing it up. It’s unnecessary, so be good: df.unpersist, System.gc, clear up memory. The other thing we need to think about depends on the use case. Depending on whether it’s write optimized or read optimized, we may need to consider changing the Delta file size. By default, the Delta file size is one gigabyte, but if we’re doing a lot of writes, for example, we need to distribute data throughout many, many small files versus coalescing them into larger one-gigabyte chunks. So if it’s write intensive, don’t be scared, you can use a file size of 32 megabytes or less. I wouldn’t really go below 16 megabytes, but at the end of the day, you have your own free will.
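The two housekeeping points above could look roughly like the sketch below. The helper names are hypothetical, and the `delta.targetFileSize` table property is an assumption about what your runtime supports, so check the documentation for your Delta Lake or Databricks version before relying on it. `release_cached` only calls `unpersist()` on whatever DataFrames you pass in and then runs Python's garbage collector; it does not trigger a JVM `System.gc`.

```python
import gc
from typing import Iterable


def release_cached(dataframes: Iterable) -> None:
    """Unpersist DataFrames that are no longer needed, then collect garbage."""
    for df in dataframes:
        df.unpersist()  # pyspark.sql.DataFrame.unpersist()
    gc.collect()        # Python-side collection only


def target_file_size_sql(table_name: str, size_mb: int) -> str:
    """Build an ALTER TABLE statement that sets a target file size in MB.

    Assumes the table supports the `delta.targetFileSize` property.
    """
    if size_mb <= 0:
        raise ValueError("size_mb must be positive")
    return (
        f"ALTER TABLE {table_name} "
        f"SET TBLPROPERTIES ('delta.targetFileSize' = '{size_mb}mb')"
    )
```

On a cluster you would pass the generated statement to `spark.sql(...)`; for a write-heavy table, `target_file_size_sql("events", 32)` (with `events` standing in for your table name) matches the 32 megabyte suggestion from the talk.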
For read-intensive workloads, the default file size of one gigabyte is more than enough, and yes, we are working on changing that for you in the future, automatically modifying those Delta file sizes depending on your use case. Also, normal Spark rules apply: partition size, shuffle partitions. We’re not gonna dive into those today, but feel free to check out either some sessions during Spark Summit on that or Daniel Toms on YouTube. He’s a great friend and a great peer of mine, and he has some awesome hour-and-a-half-long videos that help demystify what’s going on in Spark under the hood and really get into the nitty-gritty on partition size and shuffle partitions.
Prunes: not just the delicious juice that my grandparents love. In all seriousness, partition pruning is not a new concept to Spark, to Delta, or to databases and data warehouses in general. What partition pruning does is allow us to disregard specific partitions. File pruning is very similar, except it lets us disregard specific files within a given partition. Currently within Spark and Delta Lake, we have to be very, very explicit about both of these. I’ve linked to a KB article on this topic, but just know that right now with Delta Lake, if we need to partition prune, we have to tell it, and the same goes for file pruning. We’re gonna go into an example on that in a minute. We are also going to improve this in the future to be more automagic, but the reality is, right now, be explicit. And when we can, we wanna prune on the left, so we prune in our source, and we wanna prune on the right, in the target.
Let’s go through a partition pruning example. All I’m doing here is working with my source DataFrame. Let’s assume my target is partitioned by date. So I wanna go into my source and say, “List all the dates in the source DataFrame.” Then I wanna make a string of those dates. So now all that I’m left with is a long string of dates.
2020-10-12, 2020-10-18, for example. Now, I wanna go to my source and filter to make sure I only have data for those given dates, ’cause again, we wanna prune on the left. In the next step, I wanna prune on the right, so I’m actually going to use an IN clause here. I’m gonna say, “I’m gonna Merge,” where the baseline date is in the partition prune string. So now I’m just saying, “Hey, go to the target/baseline, and retrieve only the partitions that are part of the string.” This lets me skip all the other partitions in that table, and this is gonna be great for your performance. The next thing is I just have the matching primary key, so that Merge knows what that joining condition effectively is, in this case where the baseline’s PK equals my inputs’ PK. And I know, I’m using source, inputs, baseline, and target interchangeably, but I’m sure you’re following along.
The other thing I’d like to point out here is broadcast. Remember I was saying we can broadcast the inner join, and we can broadcast the anti join within the second phase. So we have to think about these things, and if we can broadcast, let’s take advantage of it. Remember, there is a Spark configuration to change the size threshold for what gets broadcast. By default, that’s about 10 megabytes, but you can definitely increase that much, much higher.
And ultimately, how do we know if things are actually being pruned? Well, we can go to the physical plan and see there’s something all the way at the bottom called partition count. So let’s just assume I have a hundred partitions in this given table. If my partition count is less than a hundred, that means I’m successfully partition pruning. So on the bottom I listed out: if partition count is less than total partitions, then congrats, bro, you’re partition pruning. Alright, that’s partition pruning.
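The slide code is not part of this transcript, so the following is a reconstruction under assumptions rather than the speaker's exact code. It assumes the target is partitioned by a column named `date`, the key column is named `pk`, and the aliases are `baseline` (target) and `inputs` (source), as in the talk. `partition_prune_condition` is a hypothetical helper that builds the explicit IN-list condition; `merge_with_pruning` shows how it might be passed to the Delta Lake Python API and needs `pyspark` and `delta-spark` on a live cluster.

```python
from typing import Iterable


def partition_prune_condition(dates: Iterable[str],
                              target_alias: str = "baseline",
                              source_alias: str = "inputs",
                              partition_col: str = "date",
                              key_col: str = "pk") -> str:
    """Build a merge condition with an explicit IN-list on the partition column."""
    distinct = sorted(set(dates))
    if not distinct:
        raise ValueError("source has no partition values to merge")
    in_list = ", ".join(f"'{d}'" for d in distinct)
    return (
        f"{target_alias}.{partition_col} IN ({in_list}) "
        f"AND {target_alias}.{key_col} = {source_alias}.{key_col}"
    )


def merge_with_pruning(spark, source_df, target_path: str) -> None:
    """Upsert source_df into the Delta table at target_path with explicit pruning."""
    # Imported lazily so the helper above can be used without Delta installed.
    from delta.tables import DeltaTable

    # List the dates present in the source (the "prune on the left" input).
    rows = source_df.select("date").distinct().collect()
    condition = partition_prune_condition(str(r["date"]) for r in rows)

    (DeltaTable.forPath(spark, target_path).alias("baseline")
        .merge(source_df.alias("inputs"), condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())
```

The broadcast threshold mentioned above is the standard Spark setting `spark.sql.autoBroadcastJoinThreshold`, whose default is 10485760 bytes (10 MB); it can be raised with `spark.conf.set(...)` if the source side is small enough to broadcast.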
File pruning is very, very similar. The code is gonna be pretty much the same until we get to here. Remember, I still have my normal baseline date IN with the string of dates that I want, but Delta Lake also has the ability to do zOrdering. With zOrdering, we can co-locate and cluster like IDs within the same files. So let’s just say I have a column that I’ve zOrdered, and I’m calling it zOrder column. I have my partition pruning, but now I also want where the zOrder column is less than one, two, three. So what we do for that Merge is go to the target and query the column statistics to see whether one, two, three falls into a file’s min-max range, and we retrieve those files accordingly. So we’ve partition pruned, so we’re grabbing only a subsegment. And now we’re file pruning, so we’re grabbing a subsegment of the subsegment. This just allows us to be super efficient and fast with our Merges.
Operation metrics. You may have noticed that if you run the SQL command DESCRIBE HISTORY with the Delta table name, you’ll see a column called operation metrics. As of Databricks Runtime 6.5, the operation metrics got a lot better, so definitely use 6.5+. To me personally, they are the source of truth for DML events: my updates, my inserts, but especially Merge. So let’s really talk through what’s happening with the Merge, and here are some things to think about. numTargetRowsCopied: personally, this is the one I focus on a ton. Let’s just say we have a million rows and we have two files, so each file has 500,000 rows. If I need to modify one row in one file, I am copying 499,999 rows. Now that may not seem like a big deal, but let’s extrapolate to much larger tables. If I’m only modifying one single row in every given file, I’m now gonna rewrite the entire table, and you don’t wanna do that. So, to me, this is a data point.
I like to think about numTargetRowsCopied relative to the total number of rows in the table. If it gets pretty high, to me, that’s a good data point, and we’ll actually talk about that in the next slide. numOutputBytes: this is about 2.8 terabytes, which is what was written in this given Merge command. Again, I like to think about that 2.8 terabytes relative to the size of the table. If it’s 100 terabytes, we’re talking about just under 3%. If it’s 10 terabytes, we’re talking about 28%. So we just need to keep track of these types of things. numTargetFilesAdded: this is also a good one. It’s just under 900,000 files that have been added. If I’m using smaller Delta file sizes like I was talking about before, with 32 megabytes, again, that’s not necessarily a big deal, but it’s a data point. And then finally, numTargetRowsInserted/Updated/Deleted. These are crucial, because you’re expecting to insert, update or delete a certain number of rows within each micro-batch or batch that you’re running. So you should really check whether the rows you expected to insert match the rows actually inserted in Delta, and understand why if there is a difference.
So really, my takeaway is, if I look at inserted, updated and deleted, I have 1.2 million, 1.3 million and about 900,000, so I’ve got about 3.2 or 3.3 million rows. So the TL;DR is that to modify 3.3 million rows, I’m copying billions. That, to me, is telling me that perhaps I need to lay out my data differently on disk. This is what I was talking about: if my numTargetRowsCopied is insanely high relative to the total number of rows in the entire table, this tells me that maybe we need to partition differently, maybe we need to leverage a zOrder, maybe we need smaller Delta file sizes, or maybe it’s a combination of all three. But again, when I look at the operation metrics, they are the source of truth. They provide me with data points so I can make informed decisions. Large Merges.
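As a small illustration of the operation-metrics reasoning above, the sketch below relates rows copied to rows changed and to the table size. The function name `merge_copy_report` and its output keys are made up for this example; the input is assumed to be the `operationMetrics` map from one DESCRIBE HISTORY row, where values arrive as strings.

```python
from typing import Dict


def merge_copy_report(metrics: Dict[str, str], total_table_rows: int) -> Dict[str, float]:
    """Compare rows copied by a merge with rows changed and with table size."""
    def count(name: str) -> int:
        # Metrics that are absent are treated as zero.
        return int(metrics.get(name, "0"))

    changed = (
        count("numTargetRowsInserted")
        + count("numTargetRowsUpdated")
        + count("numTargetRowsDeleted")
    )
    copied = count("numTargetRowsCopied")
    return {
        "rows_changed": changed,
        "rows_copied": copied,
        # How many untouched rows were rewritten per row actually changed.
        "copied_per_changed_row": copied / changed if changed else float("inf"),
        # Share of the whole table that was rewritten without being changed.
        "copied_fraction_of_table": copied / total_table_rows,
    }
```

With the two-file example from the talk (one updated row, 499,999 copied rows, a million rows in the table), the report shows that almost half the table was rewritten to change a single row, which is the signal to rethink partitioning, zOrdering or file size.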
I told you we’d talk about it, and now we’re going to do it. My personal opinion: when you have a large table, and we’re talking about many terabytes of data, give each table its own S3 bucket. And if you’re doing a structured stream, give that checkpoint its own S3 bucket as well. The main reason is that S3 parallelization, or parallelism, is defined by the prefix. So let’s take a look and understand what a prefix is. This is an actual S3 bucket that I have, and no, it’s not public: jbreese-databricks-bucket. Underneath that, I have partitions, year=2019 and year=2018. This is good because year=2019 is a prefix and year=2018 is another prefix, and S3 parallelism is defined at the prefix. The prefix limit is 3,500 writes per second and 5,500 reads per second. So each prefix here, which is also a partition, gets these limits. That means in total, for this table right now, I can do 11,000 reads per second and 7,000 writes per second.
Now let’s look at a bad architecture. Same bucket, jbreese-databricks-bucket. I have a prefix under that called data. Underneath that, I have table A and table B, and the same thing, year=2019 and year=2018. But the prefix is what is directly under the S3 bucket, so in this case it’s data. So now everything under data, no matter how nested it is, is subject to these S3 prefix limits of 3,500 writes and 5,500 reads. That tells me that for data, so that’s table A, table B, and all these other partitions underneath, I’m subject to 3,500 writes and 5,500 reads in total. That’s why, to me, when we have a large table, give it its own S3 bucket and partition just under that. Now you get all of these prefixes and partitions and all of this glorious, glorious parallelism. Now I know, someone is probably writing in right now in the comments, or by email or messenger pigeon, that S3 will eventually re-partition.
And I completely agree, it will eventually re-partition, but that can take time and I’m not the most patient person in the world. And I just despise getting throttled by S3. So that’s why, to me, just give each huge table its own S3 bucket. The alternative is to reach out to your AWS TAM. They are super helpful in helping you pre-partition. But again, these are just extra steps that you have to go through. I’d rather just give each large table its own S3 bucket; it’s a lot easier and a lot cleaner.
Some other large Merge tips, especially if we’re using large clusters, and we’re talking more than 900 cores. In addition to what we were just talking about, there’s optimizedWrites. Delta has a great feature called optimizedWrites, plus random prefixes, as well as writing at the root. optimizedWrites ensures that one core writes to one partition, and it does this via a final shuffle. It does it up to the bin size that you specify, which by default is 512 megabytes. So if I need to write a gigabyte to a given partition, we’ll actually use two cores: we shuffle all the data going to that partition to one core, up to the bin size of roughly 512 megabytes, and then we’ll grab a second core for that given partition. Doing this helps prevent lots of small files, and it also helps keep S3 from throttling you for having all 900 cores writing tiny, tiny files to the same partition at the same time. So it’s a super helpful feature. Remember: huge clusters, optimizedWrites.
The Delta random file prefixes are big as well. If we were to ls on an S3 bucket, instead of year=2019 we’d see a random prefix, a series of three or four numbers and letters. The Delta log then keeps track of which files belong in which partition. We do this because we really wanna prevent hotspotting within S3. Each of these random prefixes gets all that lovely parallelization that we were just talking about.
And so by doing that, we really help spread the load across S3 so we don’t get throttled. So, for huge clusters: optimizedWrites, random prefixes, and write at the root, just like we were talking about. Some other configs that you’re going to want to consider: the multi-part threshold is super helpful. 204 is completely fine, but it really depends on the file sizes that you’re writing. optimizeWrite, yes. The num shuffle blocks by default is about 50,000. You could definitely up that to 200,000 if you have many terabytes that you’re trying to do, but I really wouldn’t go above that. So don’t go above a hundred thousand. Randomized file prefixes, we just talked about it. And make sure you enable dynamic file pruning. To prove that this works: I had a 2.7 terabyte change set and ended up writing it with 2,400 cores in 1700 minutes. More importantly, there was zero S3 throttling or complaining, because I followed these three things above: optimizedWrites, random file prefixes and writing at the root.
Final recap. We talked about Merge basics and the three phases that are actually happening during a Merge. We talked about partition pruning and file pruning, and about operation metrics and using those to make informed decisions, and to ask informed questions for that matter. We went through large Merge tips and sample configs. So, I can’t thank you enough for attending this. Feedback is super important. Reach out to me personally if you’d like. I hope you have a wonderful rest of your day, evening, or morning, wherever you are. And thanks for attending.

About Justin Breese


Justin Breese is a Senior Solutions Architect at Databricks where he works with some of the most strategic customers. When he isn't working you'll find him on the soccer field, working on his old Porsche, or playing the drums and guitar. He lives in Topanga, CA.