The Delta Architecture pattern has made the lives of data engineers much simpler, but what about improving query performance for data analysts? What are some common places to look when tuning query performance? In this session we will cover some common techniques to apply to our Delta tables to make them perform better for data analysts’ queries. We will look at a few examples of how you can analyze a query and determine what to focus on to deliver better performance.
Speaker: Franco Patano
– Hello everyone and thank you for joining me today. My name is Franco Patano and I’m a Senior Solutions Architect with Databricks for enterprises. Today I’m gonna be showing you some common strategies to improve performance on your Delta Lakehouse. Here’s what we’re gonna cover. First, strategies with table properties: how to structure your tables at each layer of the process and how to use Z-Ordering to your advantage. Second, Spark and Delta configurations that will help improve execution time for ETL and for queries against Delta Lake tables. And third, query optimizations with hints and join strategies, to help move query execution along. Alright, so table structure. Here’s the thing with Delta tables: stats are collected on the first 32 fields. This is configurable; there is a table property called delta.dataSkippingNumIndexedCols, and Spark and Delta will attempt to collect statistics on the first 32 columns, or on however many columns you have that property set to. What we wanna do is take advantage of this, and the way we do that is by restructuring our data accordingly. Stats collection is really good on numerical fields, so we wanna move numbers and key fields (anything that’s used in a join or in a predicate, and high-cardinality numerical columns used in query predicates) over to the left, into the first 32 positions, or within the number of columns set by that property if you’ve adjusted it. We also wanna move long strings that don’t differ that much to after the 32nd position (or wherever you have that property set). The reason is that long strings are kryptonite to stats collection, and we really don’t wanna waste time collecting stats that won’t help us with query execution.
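To make the column-ordering advice concrete, here is a minimal Python sketch, assuming you already know which columns are join or predicate keys and roughly how long your string columns are. The Column class, the ranking, the 100-character cutoff for a "long" string, and the helper functions are all hypothetical illustrations, not a Delta API; the only real name is the delta.dataSkippingNumIndexedCols property and its default of 32.

```python
from dataclasses import dataclass
from typing import List

# Delta collects per-file stats on the first N columns; N defaults to 32 and is
# controlled by the delta.dataSkippingNumIndexedCols table property.
NUM_INDEXED_COLS = 32

# Hypothetical cutoff for what counts as a "long" string column.
LONG_STRING_THRESHOLD = 100

# Types whose min/max stats are useful for data skipping.
NUMERIC_LIKE = {"tinyint", "smallint", "int", "bigint", "float", "double",
                "decimal", "date", "timestamp"}


@dataclass
class Column:
    name: str
    dtype: str
    is_key: bool = False  # used in joins or query predicates
    avg_len: int = 0      # average length, only meaningful for strings


def rank(col: Column) -> int:
    """Lower rank means the column is placed further to the left."""
    if col.is_key:
        return 0
    if col.dtype in NUMERIC_LIKE:
        return 1
    if col.dtype == "string" and col.avg_len >= LONG_STRING_THRESHOLD:
        return 3  # long strings go last, ideally past the indexed columns
    return 2


def reorder_for_stats(columns: List[Column]) -> List[Column]:
    # sorted() is stable, so columns with equal rank keep their original order.
    return sorted(columns, key=rank)


def long_strings_in_stats_prefix(columns: List[Column], n: int = NUM_INDEXED_COLS) -> List[str]:
    """Names of long string columns that still fall inside the first n columns."""
    return [c.name for c in columns[:n] if rank(c) == 3]


def select_sql(columns: List[Column], source: str) -> str:
    """Build a SELECT that emits the columns in the new order."""
    return f"SELECT {', '.join(c.name for c in columns)} FROM {source}"


if __name__ == "__main__":
    cols = [
        Column("comment", "string", avg_len=500),
        Column("price", "double"),
        Column("customer_id", "bigint", is_key=True),
        Column("name", "string", avg_len=12),
    ]
    print(select_sql(reorder_for_stats(cols), "raw_orders"))
```

The generated SELECT could feed a CREATE TABLE ... AS SELECT when building the next layer. In a narrow table like this one, the long string still lands inside the first 32 columns, which is what long_strings_in_stats_prefix flags.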
So, knowing these things, if we restructure our tables accordingly, it’ll actually help boost performance on our Delta Lake tables. Now, table properties. There’s an awesome new feature called Auto Optimize, specifically Optimized Writes. When you break it all down, an adaptive shuffle happens before the write operation that organizes the data, so that it’s doing a more efficient write to the partitions in the Delta table. This works for all of the different types of write operations, so if you’re writing back to a persisted store, you wanna turn this feature on. You can turn it on per table by setting the table property (delta.autoOptimize.optimizeWrite = true), or you can set it as a default on the cluster. Based on how this feature works and how I see it in the wild, I highly advise just turning it on at the cluster level, unless you have an edge case where it’s really not helping. Where I see the benefit is in all of the write operations, but also in streaming. Say you have a stream that’s constantly adding data to your Delta tables, and you’re running OPTIMIZE every day or on some other schedule. Because Optimized Writes does that adaptive shuffle before the write, the data is already organized, so we actually get better performance on selects in between OPTIMIZE runs. So, in general, I’m seeing really good performance with this setting in the wild, and I advise giving it a shot: unless you have a really weird edge case, you’re gonna see increased performance on your INSERTs, MERGEs and UPDATEs.
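The two ways of turning on Optimized Writes can be written down as configuration. This is a sketch, assuming the Databricks names delta.autoOptimize.optimizeWrite (table property) and spark.databricks.delta.optimizeWrite.enabled (cluster or session Spark conf); the helper functions and the silver.orders table name are hypothetical. The code only builds strings: you would run the SQL with spark.sql() or paste the conf line into the cluster’s Spark config.

```python
from typing import Dict, List

# Per-table switch (Delta table property).
OPTIMIZE_WRITE_TABLE_PROPERTY = "delta.autoOptimize.optimizeWrite"
# Cluster- or session-wide switch (Spark conf), the option recommended in the talk.
OPTIMIZE_WRITE_SPARK_CONF = "spark.databricks.delta.optimizeWrite.enabled"


def set_tblproperties_sql(table: str, props: Dict[str, str]) -> str:
    """Render an ALTER TABLE ... SET TBLPROPERTIES statement."""
    rendered = ", ".join(f"'{k}' = '{v}'" for k, v in props.items())
    return f"ALTER TABLE {table} SET TBLPROPERTIES ({rendered})"


def cluster_conf_lines(conf: Dict[str, str]) -> List[str]:
    """Render 'key value' lines, the format of a cluster's Spark config field."""
    return [f"{k} {v}" for k, v in conf.items()]


if __name__ == "__main__":
    print(set_tblproperties_sql("silver.orders", {OPTIMIZE_WRITE_TABLE_PROPERTY: "true"}))
    print("\n".join(cluster_conf_lines({OPTIMIZE_WRITE_SPARK_CONF: "true"})))
```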
Alright, so if you are one of the few, the proud, with a really, really high-velocity use case, where you have data coming in at thousands of requests per second against S3, we have some tricks to make this more performant. We randomize file prefixes on S3 so we can avoid hotspots in S3’s metadata. To do that, you dedicate an S3 bucket to a Delta table, so the table sits at the root of the bucket, and then you turn on the table property delta.randomizeFilePrefixes. This evens out the requests so that they’re not creating hotspots on S3 and you’re not getting errors from S3 during production runs. So if you do have use cases with thousands of read or write requests per second, you should definitely look into this to speed up your high-velocity tables. Alright, OPTIMIZE and Z-Order. I often get questions about these in the field: when should I be doing this, and how should I be doing this? Essentially, OPTIMIZE bin-packs our files for better read performance, and Z-Order organizes our data for better data skipping. Data skipping is already included, based on the stats collection we talked about earlier, but Z-Order actually organizes the data so that range selections are more effective; data skipping becomes more effective when the data is organized. So how should you be organizing that data? If you’re in the data warehousing realm, or if you have those types of ETL jobs, you wanna Z-Order by the keys: the primary and foreign keys in your dimension and fact tables, which drive the common join operations. And remember, stats work best on numericals, and keys are very commonly numbers, so we wanna Z-Order by these keys. On very large fact tables especially, those keys are prime candidates.
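Why does organizing the data make data skipping more effective? A query can skip any file whose min/max range for a column cannot overlap the predicate. The sketch below is a toy model with made-up data, not Delta’s implementation: it stores 1,000 values in ten "files" two ways, once clustered into contiguous ranges (the kind of layout sorting or Z-Ordering aims for) and once scattered round-robin, and counts how many files a range predicate has to read.

```python
from typing import List, Tuple


def file_stats(files: List[List[int]]) -> List[Tuple[int, int]]:
    """Per-file (min, max) of one column, the kind of stats Delta keeps per data file."""
    return [(min(f), max(f)) for f in files]


def files_to_scan(stats: List[Tuple[int, int]], lo: int, hi: int) -> int:
    """Files whose [min, max] range overlaps lo <= value <= hi cannot be skipped."""
    return sum(1 for mn, mx in stats if mx >= lo and mn <= hi)


values = list(range(1000))
# Clustered: each file holds one contiguous range (what sorting or Z-Order aims for).
clustered = [values[i:i + 100] for i in range(0, 1000, 100)]
# Scattered: round-robin assignment, so every file spans nearly the full range.
scattered = [values[i::10] for i in range(10)]

if __name__ == "__main__":
    for name, layout in (("clustered", clustered), ("scattered", scattered)):
        print(name, files_to_scan(file_stats(layout), 420, 440), "of", len(layout), "files")
```

Both layouts hold the same data and have stats, but only the clustered one lets the predicate 420 <= value <= 440 skip nine of the ten files.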
And if you don’t have traditional dimension and fact tables, but you have ID fields that different tables join on, and you want better performance on those joins, you wanna Z-Order by that key, because it makes the execution more efficient. And then, very commonly, if you have high-cardinality columns in your query predicates, you wanna Z-Order by those, because you’re gonna get excellent performance improvements by leveraging Z-Order.
– Alright, so we talked about high cardinality. What exactly does that mean? How should I be thinking about it? Where does partitioning come into play, and how should I balance these things? Here’s how I see the world, and I provided a few examples here to help visualize what’s going on. With partitioning, most commonly people use date, and that is a very good candidate, because data usually arrives on a cadence: you get a roughly equal number of records per period of time. But say date isn’t an option. Partitioning works well when each partition holds about a gigabyte of data, so for partitioning we want moderate cardinality, something in the middle, and you can see some examples there of common repeating values. What you’re aiming for is nice, even chunks of data about a gigabyte in size; that’s where partitioning is most effective. Z-Order, on the other hand, is a different case. To make Z-Order effective, you want high-cardinality columns, in particular values that increase or decrease, that have ranges and can be ordered. That’s where you get the most effect, because data skipping, which is the key to performance on these types of queries, takes advantage of the min/max stats on your numericals. So, Z-Order helps with high-cardinality columns, and for partitioning we want moderate cardinality. In general, that’s how things work and how you can think about those two constructs.
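The rule of thumb above (partitions of roughly a gigabyte for partition columns, high cardinality for Z-Order columns) can be turned into a quick back-of-the-envelope check. This is a hypothetical helper, not a Databricks feature; it assumes rows are spread evenly across the column’s distinct values, and the 4x tolerance band around the 1 GB target is an arbitrary choice for illustration.

```python
GB = 1024 ** 3


def avg_partition_bytes(table_bytes: int, distinct_values: int) -> float:
    """Average partition size if the table were partitioned by this column."""
    return table_bytes / distinct_values


def suggest_layout(table_bytes: int, distinct_values: int,
                   target: int = GB, tolerance: float = 4.0) -> str:
    """Classify a candidate column, assuming data is evenly spread across values.

    'partition'  -> partitions land near the ~1 GB target
    'zorder'     -> too many tiny partitions: high cardinality, Z-Order instead
    'too_coarse' -> too few, very large partitions
    """
    size = avg_partition_bytes(table_bytes, distinct_values)
    if size < target / tolerance:
        return "zorder"
    if size > target * tolerance:
        return "too_coarse"
    return "partition"


if __name__ == "__main__":
    table = 365 * GB  # e.g. a year of data
    print("order_date:", suggest_layout(table, 365))
    print("customer_id:", suggest_layout(table, 10_000_000))
    print("is_active:", suggest_layout(table, 2))
```

Real data is rarely this even, so a column that passes this check can still produce skewed partitions.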
Alright, now we’re going into some parameters. There are some new things that came out in Spark 3, and one of my favorites is Adaptive Query Execution (AQE). I’m gonna go through a few of its optimizations and how they work, to explain a little about what’s going on behind the scenes and why you should turn these things on if you have those types of use cases. First off, you should turn Adaptive Query Execution on. It’s on by default in Databricks Runtime 7.3 and above, awesome. If you’re on an earlier runtime, it has to be 7.0 or above, because AQE is a Spark 3 feature. Next, there’s a feature in Adaptive Query Execution called coalesce partitions. If you’ve ever had to tune the shuffle partitions setting (spark.sql.shuffle.partitions) to get performance on your job, you know you have to check the cluster to figure out how many cores you have and then pick a multiple of that; there’s a lot of math involved. This makes it significantly simpler: the optimizer attempts to coalesce the shuffle partitions down to a number that makes sense for the execution of that query. So I just turn this parameter on and let Adaptive Query Execution manage my shuffle partitions, which is excellent. Next, if you know you might have skewed data, where some keys carry far more rows than others so a few partitions in a join end up much larger than the rest, you wanna try the adaptive skew join configuration. I’ve found that turning it on really helps with queries that have skew in the data, and it’s less for me to manage: I can just turn it on, and if skew is there, AQE will manage it; if not, in my testing, it really doesn’t hurt very much. So I suggest turning it on and taking advantage of that.
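To give a feel for what coalescing shuffle partitions does, here is a simplified greedy sketch: it merges adjacent small shuffle partitions until a group would exceed a target size. Spark’s real implementation has more inputs (for example the advisory size from spark.sql.adaptive.advisoryPartitionSizeInBytes and a minimum partition count), so treat this as an illustration of the idea only; the function name and the sizes are hypothetical.

```python
from typing import List


def coalesce(partition_sizes: List[int], target: int) -> List[List[int]]:
    """Greedily group adjacent shuffle partitions so each group stays <= target.

    A single partition larger than target still forms its own group.
    Returns lists of original partition indices.
    """
    groups: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for idx, size in enumerate(partition_sizes):
        if current and current_size + size > target:
            groups.append(current)
            current, current_size = [], 0
        current.append(idx)
        current_size += size
    if current:
        groups.append(current)
    return groups


if __name__ == "__main__":
    sizes_mb = [10, 10, 10, 50, 5, 5, 70]  # post-shuffle partition sizes
    print(coalesce(sizes_mb, target=64))
```

Here seven shuffle partitions become three tasks, which is the kind of adjustment you would otherwise make by hand through spark.sql.shuffle.partitions.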
Another feature I really like, especially if you use different cluster instance types (I’ll explain a bit of this in the demo section), is the local shuffle reader. Where it can, it reads shuffle data locally instead of fetching it over the network, which improves performance when you do have shuffles in your jobs, so I recommend turning that on. Then there’s the broadcast join threshold. Very commonly, the broadcast hash join turns out to be very, very performant for a lot of our queries. The key is that we have to broadcast the smaller side. Think of dimension and fact tables: the fact table is large and the dimension table is small, but a lot of times that dimension table is larger than the default threshold, which is 10 megabytes, even though the cluster has enough memory to broadcast more. If we increase this value, we increase the likelihood that the plan will broadcast the smaller table. I recommend bumping it up to 100 megabytes; you can play around with different values, but in most use cases I’ve found 100 megabytes is a good place to start. And finally, don’t prefer sort-merge join. Sort-merge join is great for big data and provides a robust execution mechanism, but more often than not, for a lot of the most common queries, we find that we don’t actually need a sort-merge join; we need a broadcast hash join or something like that, and we don’t want the planner defaulting to sort-merge. So here we say, hey, you know what? Don’t prefer sort-merge join, and see how that works. I’ll show you how these parameters can help a little later. Alright, now the Delta configs.
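The Adaptive Query Execution and join settings discussed here correspond to standard Spark 3 SQL configuration keys. The sketch below collects them with the values suggested in the talk (100 MB written out in bytes) and renders them as Spark SQL SET statements. will_broadcast is a simplified, hypothetical model of the planner’s size check; the real decision also depends on statistics, join type and hints.

```python
from typing import Dict, List

MB = 1024 * 1024

AQE_AND_JOIN_CONF: Dict[str, str] = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    "spark.sql.adaptive.localShuffleReader.enabled": "true",
    # Default is 10 MB; the talk suggests starting at 100 MB.
    "spark.sql.autoBroadcastJoinThreshold": str(100 * MB),
    "spark.sql.join.preferSortMergeJoin": "false",
}


def set_statements(conf: Dict[str, str]) -> List[str]:
    """Render each setting as a Spark SQL SET statement."""
    return [f"SET {k}={v}" for k, v in conf.items()]


def will_broadcast(estimated_bytes: int, threshold_bytes: int) -> bool:
    """Simplified size check: broadcast if the size estimate fits under the threshold."""
    return 0 <= estimated_bytes <= threshold_bytes


if __name__ == "__main__":
    for stmt in set_statements(AQE_AND_JOIN_CONF):
        print(stmt)
    dim_table = 50 * MB
    print("default 10 MB:", will_broadcast(dim_table, 10 * MB))
    print("raised to 100 MB:", will_broadcast(dim_table, 100 * MB))
```

A 50 MB dimension table misses the default threshold but fits under the raised one, which is the situation the talk describes.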
First things first, turn the Delta cache on, whether you’re doing ETL or queries. It’s enabled by default on the Databricks cluster types labeled "Delta Cache Accelerated," but some cluster instance types, which I’ll talk about in the demo, don’t have it turned on by default even though they do have fast local disks, and fast local disks are what the Delta cache relies on. So if you’re using an instance type that has them but isn’t one of the Delta Cache Accelerated types, turn the cache on (spark.databricks.io.cache.enabled) and you’ll get much better performance. The key here is: the faster the disk, the better the performance. You can enable it on clusters without fast disks, for example ones backed by a regular hard drive, and it’s still a little more performant, but on NVMe drives you’re gonna get much better performance. Alright, Delta cache staleness. This one is interesting. When a query is taking advantage of the loaded Delta cache, at the beginning of the query it often checks whether the table has been updated, meaning whether any new data has come in. That check can actually hurt execution time, and if you think about ETL orchestration, your data often isn’t refreshed very frequently. If you have a batch job that refreshes your data every day, every six hours, or maybe every hour, you don’t need to check that frequently or go back to the files. So you can actually increase performance on BI and analytics use cases by increasing what’s called the staleness limit.
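One way to keep the staleness limit aligned with the ETL schedule is to derive it from the refresh interval. This sketch assumes the Databricks configuration names spark.databricks.io.cache.enabled and spark.databricks.delta.stalenessLimit, and assumes the limit accepts a time string such as 1h; the helper functions are hypothetical and only build the settings, they do not apply them.

```python
from datetime import timedelta
from typing import Dict


def to_time_string(interval: timedelta) -> str:
    """Express an interval in the largest whole unit: days, hours, minutes or seconds."""
    seconds = int(interval.total_seconds())
    for unit, size in (("d", 86400), ("h", 3600), ("m", 60)):
        if seconds >= size and seconds % size == 0:
            return f"{seconds // size}{unit}"
    return f"{seconds}s"


def analyst_cluster_conf(etl_refresh: timedelta) -> Dict[str, str]:
    """Spark conf for a BI/analytics cluster whose Delta tables refresh every etl_refresh."""
    return {
        "spark.databricks.io.cache.enabled": "true",
        "spark.databricks.delta.stalenessLimit": to_time_string(etl_refresh),
    }


if __name__ == "__main__":
    print(analyst_cluster_conf(timedelta(hours=1)))
    print(analyst_cluster_conf(timedelta(days=1)))
```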
I generally set this to an hour. If you think about the workflow of a typical analyst, they run a series of successive queries, trying to find data or understand a problem, and an hour is about how long that stream of work holds together, so that’s my default. But if your ETL orchestration only updates the Delta table once a day, you can set it to a day, and your analysts will get pretty good performance throughout the day. Next, use Databricks Runtime 7.3 LTS or above. An excellent new feature called enhanced checkpoints was released; you can turn it on in 7.2 with the Delta checkpoint config writeStatsAsJson. It eliminates a deserialization step in the checkpoint process, reducing latency on short queries, so definitely turn this on for BI use cases. And when you’re writing Delta tables, make sure you’re on 7.3 or above, or you’ll have to deal with different levels of support for this; I just recommend using 7.3 or above. Okay, so now what happens if Adaptive Query Execution isn’t getting the hint? Say I wanted it to prefer a broadcast hash join, but for some odd reason, at execution time it’s doing a sort-merge join. Then I give it a hint: we strongly suggest, hey, take a look at this, by putting query hints inside our SQL statements. For a broadcast hash join or a broadcast nested loop join, one side has to be small, so this doesn’t work if both tables are really large. Shuffle hash join is for when you have to shuffle the data but don’t want to sort it; it can handle large tables, but you risk running into out-of-memory errors if your data is skewed, so be careful there. And then sort-merge join: the most robust one, but it requires both a shuffle and a sort, and it can be slower when the data is small or when both sides aren’t large.
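Spark 3 join strategy hints are written as a comment right after SELECT. The sketch below builds such a query for the strategies discussed here, using Spark’s hint names BROADCAST, SHUFFLE_HASH, MERGE (sort-merge) and SHUFFLE_REPLICATE_NL (the cartesian, nested-loop style). The helper is hypothetical, and the TPC-H tables lineitem and orders serve only as an example.

```python
# Spark 3 join strategy hints and the strategy each one requests.
JOIN_HINTS = {
    "BROADCAST": "broadcast hash join (or broadcast nested loop join)",
    "SHUFFLE_HASH": "shuffle hash join",
    "MERGE": "sort-merge join",
    "SHUFFLE_REPLICATE_NL": "shuffle-and-replicate nested loop (cartesian) join",
}


def hinted_join(hint: str, hinted_alias: str,
                left: str, left_alias: str, left_key: str,
                right: str, right_alias: str, right_key: str) -> str:
    """Build an equi-join query that asks Spark to use a specific join strategy."""
    if hint not in JOIN_HINTS:
        raise ValueError(f"unknown join hint: {hint}")
    return (f"SELECT /*+ {hint}({hinted_alias}) */ * "
            f"FROM {left} {left_alias} JOIN {right} {right_alias} "
            f"ON {left_alias}.{left_key} = {right_alias}.{right_key}")


if __name__ == "__main__":
    # Ask Spark to broadcast the smaller side (orders) of the join.
    print(hinted_join("BROADCAST", "o",
                      "lineitem", "l", "l_orderkey",
                      "orders", "o", "o_orderkey"))
```

A hint is a strong suggestion rather than a guarantee: if the requested strategy cannot be applied to the join, Spark falls back to another one.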
And finally, cartesian. I don’t recommend it unless you have a use case that requires it; I can think of a few, but generally you wanna stay away from it when joining data. Alright, we’ve covered a lot. Now let’s look at the Delta architecture pattern in front of us and align these table structures and tips with it. How should I structure each layer of my Delta architecture? Here’s what I suggest. Bronze tables hold raw data coming in from the source system. You wanna land it raw, don’t change anything, and turn off stats collection (for example, by setting delta.dataSkippingNumIndexedCols to 0), because at this point it’s just extra overhead on data processing. We’re not querying Bronze very often; we’re really just using it as a staging area, so let’s minimize the work there. When we get to Silver, the next layer (filtered and cleaned), this is where we wanna start moving data around, as I suggested earlier. Restructure your columns: move the keys and numericals into the first 32 columns (or however many the data skipping property specifies), and move your long strings after them. That helps with stats collection, and when you OPTIMIZE and Z-Order on those fields, they’ll be within the indexed columns, so Z-Order is effective. Make sure Optimized Writes is on at the cluster level, so that all of our INSERT, UPDATE and MERGE operations are efficient. Then we want to OPTIMIZE and Z-Order our Silver tables by the keys used between the Bronze and Silver layers. This helps with our ETL processing: our MERGE operations get a boost if those keys are organized.
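The Silver-layer advice hinges on one check: are the columns you plan to Z-Order by inside the prefix of columns that Delta collects stats on? A minimal sketch, assuming the schema is just an ordered list of column names; the helpers and column names are hypothetical. Passing 0 as the prefix length models the Bronze recommendation of turning stats collection off.

```python
from typing import List


def stats_columns(schema: List[str], num_indexed_cols: int = 32) -> List[str]:
    """Leading columns that get per-file stats; 0 models stats collection turned off."""
    return schema[:num_indexed_cols]


def zorder_columns_without_stats(schema: List[str], zorder_cols: List[str],
                                 num_indexed_cols: int = 32) -> List[str]:
    """Z-Order columns outside the stats prefix: data skipping has no min/max for them."""
    indexed = set(stats_columns(schema, num_indexed_cols))
    return [c for c in zorder_cols if c not in indexed]


if __name__ == "__main__":
    wide = [f"c{i}" for i in range(40)] + ["order_key"]  # key left at position 41
    restructured = ["order_key"] + [f"c{i}" for i in range(40)]
    print(zorder_columns_without_stats(wide, ["c1", "order_key"]))
    print(zorder_columns_without_stats(restructured, ["c1", "order_key"]))
```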
And when we get to the Gold layer, our business-level aggregates or consumable layer, again we want to OPTIMIZE and Z-Order by common join keys or high-cardinality query predicates, and that gives our analysts really good performance on these Delta tables. Again, turn on Optimized Writes, because we want the MERGE from Silver to Gold to be as efficient as possible. We want to enable the Delta cache, with the fastest disks available to us. And on these clusters, let’s turn the staleness limit up to align with our ETL orchestration: if it runs every hour, use one hour; if it runs every day, use one day. This helps manage the Delta cache and makes the experience more performant for our analysts at runtime. Alright, finally some pro tips. Always use the latest Databricks Runtime; we’re constantly improving performance and adding features, and you wanna take advantage of all that engineering brilliance. The key to fast UPDATE and MERGE operations is to rewrite the fewest files, and Optimized Writes help here. You can also configure the maximum file size for OPTIMIZE (spark.databricks.delta.optimize.maxFileSize), which helps rewrite the least amount of data. We recommend 32 MB; you can go anywhere between 16 and 128 MB, depending on how your operations work. The keys to fast SELECT queries are the Delta cache, OPTIMIZE and Z-Order, and Adaptive Query Execution, so make sure you take advantage of those configuration items and settings. Finally, the Hilbert curve for OPTIMIZE. This is new, so it’s a little bleeding edge: you might not wanna put it in production yet, but you might wanna test it in your pre-production systems to see what kind of performance you get. The multidimensional curve we use right now is the Z-Order curve; if you look at it in two-dimensional space, it looks like a Z, which is why it’s called Z-Ordering.
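To see where the name comes from, here is a minimal sketch of a two-column Z-order (Morton) value, which interleaves the bits of the two coordinates. It assumes small non-negative integers; real implementations typically first map column values into ranks or ranges before interleaving, a step omitted here, and the function name is hypothetical.

```python
from typing import List, Tuple


def interleave_bits(x: int, y: int, bits: int = 16) -> int:
    """Z-order (Morton) value: bit i of x goes to position 2i, bit i of y to 2i + 1."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


def z_path(n: int) -> List[Tuple[int, int]]:
    """Cells of an n x n grid in the order a Z-order sort visits them."""
    cells = [(x, y) for x in range(n) for y in range(n)]
    return sorted(cells, key=lambda c: interleave_bits(*c))


if __name__ == "__main__":
    # On a 2 x 2 grid the visiting order traces a "Z":
    # (0, 0) -> (1, 0) -> (0, 1) -> (1, 1)
    print(z_path(2))
```

Rows that are close in both columns end up close in the sort order, which is what keeps each file’s min/max ranges narrow on both columns at once.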
The Hilbert curve is different: it looks more like a U, and the math is different. What we’re finding is that the Z-Order curve is really effective for one, two, or three Z-Order columns, and then its effectiveness falls off, which is why there’s a bell curve to how many columns you should be Z-Ordering by. With the Hilbert curve, we actually see more even performance for up to five or six Z-Order columns. So take this as you will; it’s something we’re continuing to test and work on, and there’s more to come as we figure out how best to guide everyone. Okay, now let’s look at some code and see whether these tips actually work. Let’s check it out. Alright, thank you everyone. So now we’re gonna look at how, in practice, these configurations help with my ETL and BI performance, and whether I can see it happen in real time. Absolutely. Let’s take a look at a common use case I see in the field. I’m gonna use TPC-H data, which is benchmarking test data; it’s just synthetic, it’s not meant to be real. I’m gonna load that data into Delta Lake tables in a database and perform ETL operations (INSERTs), and then we’re also gonna run some queries. First I create the database in the metastore, and then I create my tables: here are all the table definitions, and I’m using Delta for all of them. Then I list all of my files and collect a list of all the tables. I have a simple function to help me insert the data: it reads the file, creates a temp view on it, and inserts that data into the table.
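The difference between the Z-Order and Hilbert curves can be shown on a tiny grid. This sketch uses the standard Hilbert-curve index algorithm (often called xy2d) next to a bit-interleaving Z-order index and checks locality: consecutive positions along the Hilbert curve are always neighboring cells, while the Z-order curve makes jumps. It illustrates only this geometric property; it is not Databricks’ implementation and does not by itself reproduce the five-to-six column result mentioned in the talk. The function names are hypothetical.

```python
from typing import Callable, List, Tuple

Cell = Tuple[int, int]


def hilbert_index(n: int, x: int, y: int) -> int:
    """Position of cell (x, y) along a Hilbert curve filling an n x n grid (n a power of 2)."""
    d = 0
    s = n // 2
    while s > 0:
        rx = 1 if x & s else 0
        ry = 1 if y & s else 0
        d += s * s * ((3 * rx) ^ ry)
        # Rotate/flip the quadrant so the sub-curve has the right orientation.
        if ry == 0:
            if rx == 1:
                x, y = n - 1 - x, n - 1 - y
            x, y = y, x
        s //= 2
    return d


def z_index(x: int, y: int, bits: int = 8) -> int:
    """Z-order (Morton) position: interleave the bits of x and y."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


def curve_order(n: int, index: Callable[[int, int], int]) -> List[Cell]:
    """All cells of the n x n grid, sorted by their position along a curve."""
    cells = [(x, y) for x in range(n) for y in range(n)]
    return sorted(cells, key=lambda c: index(*c))


def max_step(path: List[Cell]) -> int:
    """Largest Manhattan distance between consecutive cells on the path."""
    return max(abs(a[0] - b[0]) + abs(a[1] - b[1]) for a, b in zip(path, path[1:]))


if __name__ == "__main__":
    hilbert = curve_order(4, lambda x, y: hilbert_index(4, x, y))
    zorder = curve_order(4, z_index)
    print("Hilbert max step:", max_step(hilbert))  # 1: always a neighboring cell
    print("Z-order max step:", max_step(zorder))   # 4 on this grid: (3, 1) -> (0, 2)
```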
I also have a function for optimizing all of my tables; I’ll show you that in the following steps. What we’re doing here is importing the data: we run through all of the tables, create a DataFrame from the CSV files, and insert it into the Delta table we created in the previous step. You can see that this ETL operation, the loading, took about 19 minutes. That’s just taking the files from where they sit in object storage and loading them into Delta tables. Now we want to run some queries on it. Before OPTIMIZE, this query took about 2.7 seconds, which, hey, is not that bad. But now I wanna see what OPTIMIZE and Z-Order really buy me in execution time. So I’m gonna use that OPTIMIZE function I created, driven by what I call a configuration table: if I need to add new optimization keys, I just add them to this table instead of rewriting code, which is key when you think about managing your day-to-day operations. It’s a simple table: each row has a table name and that table’s keys, the join keys. I run through those and optimize each table, and you can see it took about 17 minutes on this cluster. After OPTIMIZE and Z-Order, my query went down to about 1.4 seconds. So before OPTIMIZE and Z-Order, the query took about twice as long as it did after. That’s the kind of effect you can get just by leveraging OPTIMIZE and Z-Order. Now you might be thinking to yourself, is that really the best it can get?
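The configuration-table approach from the demo can be sketched as data plus a loop that turns each row into an OPTIMIZE statement. The table names, key columns, and helper functions below are hypothetical stand-ins for the demo’s table (the key names follow the TPC-H schema); on Databricks you would pass each statement to spark.sql(). Adding a table or a key then means adding a row, not changing code.

```python
from typing import Dict, List

# Hypothetical configuration table: one row per table with its Z-Order (join) keys.
OPTIMIZE_CONFIG: List[Dict] = [
    {"table": "tpch.lineitem", "zorder_keys": ["l_orderkey", "l_partkey"]},
    {"table": "tpch.orders", "zorder_keys": ["o_orderkey", "o_custkey"]},
    {"table": "tpch.nation", "zorder_keys": []},  # small table: bin-packing only
]


def optimize_statement(table: str, keys: List[str]) -> str:
    """OPTIMIZE always bin-packs; ZORDER BY is added only when keys are configured."""
    stmt = f"OPTIMIZE {table}"
    if keys:
        stmt += f" ZORDER BY ({', '.join(keys)})"
    return stmt


def optimize_statements(config: List[Dict]) -> List[str]:
    return [optimize_statement(row["table"], row["zorder_keys"]) for row in config]


if __name__ == "__main__":
    for stmt in optimize_statements(OPTIMIZE_CONFIG):
        print(stmt)  # on Databricks: spark.sql(stmt)
```

For the figures quoted in the demo, 2.7 seconds before and 1.4 seconds after is roughly a 1.9x speedup.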
Absolutely not. We still need to tune all those configurations; I haven’t done any of that here, this is just a simple default Databricks configuration. So what happens if I apply some of those tweaks? Here I’m creating a new database, same steps as before, so I’ll skip over most of the intro material, but this one is gonna be optimized. What am I doing to optimize it? I’m using the c5d.9xlarge instance type, which is compute optimized but also includes fast local disks, and instead of autoscaling, I’m using a static cluster of eight workers. I’m also activating several of the optimizations I talked about today, as well as some of the settings for OPTIMIZE itself. So let’s take a look: I created all my tables, I created my functions, and I processed all of my data. This time, it took 1.03 minutes. Let’s go back: how long did that take on the previous cluster? About 18 minutes, so it’s a significant performance improvement. But performance is one thing; I might just have had better nodes. Does this really account for cost? Let’s take a look. The instance type I used for the previous step was i3.xlarge, and here’s the information about it; what you want to know is that it’s about $3.312 per hour. If I round the first job up to 20 minutes and this one up to 2 minutes (even though it really wasn’t 2), and look at cost performance, running on the better cluster type is actually cheaper once you account for execution time. Remember, folks, in the cloud everything is metered: you pay for infrastructure over time, so if I can finish faster, it can be cheaper, and in this instance it cost about half as much. So make sure you solve for the right things: not only did we solve for time, we also solved for cost.
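The cost argument is simple arithmetic: on metered infrastructure, a job costs its hourly rate times its runtime. The sketch below treats the $3.312 per hour quoted for the first cluster as that cluster’s total hourly cost (an assumption, since it isn’t stated whether the figure covers the whole cluster) and uses the rounded 20-minute and 2-minute runtimes. break_even_rate answers how expensive the faster cluster could be per hour and still come out cheaper. Both function names are hypothetical.

```python
def job_cost(hourly_rate_usd: float, runtime_minutes: float) -> float:
    """Cost of one run on metered infrastructure: rate x time."""
    return hourly_rate_usd * runtime_minutes / 60


def break_even_rate(baseline_rate_usd: float, baseline_minutes: float,
                    new_minutes: float) -> float:
    """Highest hourly rate at which the faster run costs no more than the baseline run."""
    return baseline_rate_usd * baseline_minutes / new_minutes


if __name__ == "__main__":
    baseline = job_cost(3.312, 20)  # rounded runtime on the default cluster
    print(f"baseline run: ${baseline:.2f}")
    print(f"faster cluster is cheaper below ${break_even_rate(3.312, 20, 2):.2f}/hour")
```

With a 10x shorter runtime, the faster cluster can cost up to 10x more per hour before it stops saving money.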
So let’s look at the query’s execution time. We just inserted the data, and we’ve turned Auto Optimize on. Before we even run OPTIMIZE, we’re getting about 1.4 seconds, which is actually pretty decent. Now let’s OPTIMIZE and Z-Order by those key columns and see if we can improve things. What we get is performance going down to... these are milliseconds, so it’s less than a second: 332 milliseconds. That’s a significant performance improvement. So this is great. We’ve covered ETL performance, both cost and time. Now let’s look at some SQL analytics use cases, actually querying the data itself, and the optimizations there. Here I’m using those optimized tables and selecting the data, before applying any of the configuration items we talked about. The first run of this query took about six seconds, 6,279 milliseconds. That’s a cold start: it’s not using the Delta cache, since I cleared the cache beforehand. Not bad, but can we do better? This is an optimized cluster, reading those optimized Delta Lake files I generated in the ETL optimize step, and now I want to analyze that data. Note that here I’m not using the c5d instances, I’m using i3.xlarge, which is a very important thing to keep in mind, and I’ve applied all of those other configurations. Before the Delta cache is even loaded, this query took about one second to execute, which is pretty good for a cold start. That’s what those configurations really gain you: near-instantaneous query execution. And what happens when I leverage the Delta cache? Wow, I’m getting down to the 300s of milliseconds again, like before.
So the key here is that the Delta cache is really, really important: even without a bigger instance type, as long as you have fast local disks, which the Delta Cache Accelerated types have, you’re still gonna get good performance. That’s why I always recommend using the right instance type for each type of operation: for ETL and OPTIMIZE, compute-optimized instances with fast local disks work really well. Thank you everyone for joining me today, and I hope you have a great summer.
Franco Patano is a Senior Solutions Architect at Databricks, where he brings over 10 years of industry experience in data engineering and analytics. He has architected, managed, and analyzed data applications both big and small, with open source and proprietary software, utilizing SQL, Python, Scala, Java, and Apache Spark, as well as experimenting with data science. Prior to Databricks, Franco worked as a Data Architect and Analyst in the Commercial Real Estate, Banking, and Education industries for organizations large and small.