Achieving Lakehouse Models with Spark 3.0


It’s very easy to be distracted by the latest and greatest approaches with technology, but sometimes there’s a reason old approaches stand the test of time. Star Schemas & Kimball is one of those things that isn’t going anywhere, but as we move towards the “Data Lakehouse” paradigm – how appropriate is this modelling technique, and how can we harness the Delta Engine & Spark 3.0 to maximise its performance?

This session looks through the historical problems of attempting to build star-schemas in a lake and steps through a series of technical examples using features such as Delta file formats, Dynamic Partition Pruning and Adaptive Query Execution to tackle these problems.

Speaker: Simon Whiteley


– Hello, and welcome to another Data+AI Summit. My name is Simon Whiteley. I work for Advancing Analytics, a company in the UK that does consultancy around advanced analytics: all sorts of data engineering, data science and that kind of thing, largely in lakes. I didn’t start off in lakes; I started off in the world of warehouses and BI and all that star schema type stuff, and jumping into the world of lakes is very different. Recently, only in the past year or so, we reached a turning point where all those things that we used to do in a warehouse, you can now start doing in the lake. We call that the data lakehouse, and you’ve been hearing a lot about it at this conference. That’s what we’re gonna talk about today: achieving the lakehouse using Delta and Spark 3.0. So let’s get started.
Okay, so we need to cover a couple of things. Firstly, why lakehouse? What this thing is, and why we’re even talking about this approach, this paradigm, this technology. Then we’ll talk Kimball: why traditionally you couldn’t do Kimball, why people say you can’t do Kimball in a lake, and what the problems are that prevent you. And then there are three main bits of technology I wanna talk through. First, SCD merge: how do we do a slowly changing dimension, that core fundamental tenet of Kimball, in the Spark ecosystem? That’s actually Delta, not just Spark 3.0. We’ll then have a look at dynamic partition pruning, which sounds really boring, but it is so fundamental to how people have to work with Kimball, and to how you get people from the warehousing world to work inside Spark.
We’ll talk about AQE, Adaptive Query Execution, which is one of the big key features in Spark 3.0 in terms of how you get performance without having that deep level of Spark engineering knowledge, so that people from a warehousing background can work effectively, just the way they always did, without worrying about performance. And then we’ll talk about how we put it together: what that means for the lakehouse, and why the lakehouse is such a big thing for that warehousing kind of person.
Okay, so let’s start off with the data lakehouse, this whole approach. What is it? Where did it come from? And how did we get here? It’s kind of important to notice that there have been a few shifts, one or two misfires, in terms of how we actually got to this point in time. We started off in the world of warehousing. Then there was this knee-jerk reaction: we shifted and went into lakes. At that point lakes weren’t that mature, but they solved a problem. We were dealing with CSVs, then we got a bit more mature and moved to Parquet, and now we’re talking about being more mature than that and moving to Delta. Delta is the key thing that has enabled the lakehouse approach, and it’s that evolution that we’re talking about. So in that lake area, when we’re talking about CSVs: CSV is schema-on-read, the files don’t carry that structure. Trying to build a fairly robust, managed data model using just CSVs was painful; it’s really, really hard. Then we moved to a slightly more mature environment. We’ve got Parquet, which provides structure: we have the schema embedded inside the file. That means we can actually build out fairly fixed, fairly managed data models. But then doing things like writing SQL, doing updates, doing merges, there are lots of things that we can’t do.
So Parquet was fantastic at legitimizing the data lake, but in terms of making it accessible to people from outside of the big data ecosystem, it wasn’t; it was really hard. Delta gives us that maturity, that manageability, that accessibility for people coming from the outside world, and that has been huge in terms of how we actually talk about lakes. But the real big thing for me is the shift from warehousing to lakes, where people were saying: warehouses are too slow, they’re too inflexible, they’re too hard to manage; let’s build a lake, which is super flexible, super scalable, super cheap. That was a move away from warehousing. It’s not the same when we’re shifting to the lakehouse. We’re not saying let’s abandon lakes and start something new. Instead, what we’re saying is that we need to be looking at a combination of these things. It’s not one, then the other, then the other. When we’re talking about a data lakehouse, we’re saying we need a lake-based platform that has all the good bits of data warehousing, all the maturity, the learnings and the tradition of warehousing, applied inside that lake structure. So we’re getting the best of both worlds. And that’s not a new story; we’ve been trying to achieve that for quite some time.
Okay, so this thing, the modern data platform, is what we’ve been building for years. It’s this idea of saying we wanted to have the best bit of a lake and the best bit of a warehouse, and so we built a solution that has both. We kind of pile both of them into the same thing, and all of our business users, all of the people in that data ecosystem, are working in different sections. So we’ve got our early data preparation part, using Databricks, getting data into a shape where our data scientists can be involved.
So they’re at the top, working in Databricks in the modern data warehouse, doing their flexible, interesting ad-hoc data exploration, all that stuff. Then we’re taking data out of that lake and putting it into a warehouse, where our BI developers, our analysts and all those traditional BI people get involved, building out a warehouse and building out reporting systems, which they then expose to our end users right at the end through a BI system. That’s fairly common; we’ve been doing it that way for quite some time. The shift to lakehouse is taking that and saying: well, actually, there’s a whole step there that maybe we can combine.
So this is our lakehouse approach. This is saying we’re no longer talking about a lake layer and a warehouse layer; we’re talking about a single layer that can achieve both. Our data scientists, our data engineers, our BI analysts can all use the same platform. We’re doing traditional warehousing and we’re doing ad-hoc, next-generation advanced analytics, all from the same platform. And then we’re also using that to serve out to our report consumers, the people who aren’t that data savvy and just wanna drag and drop and slice facts by some dimensions through a BI tool. It’s fundamentally important that the platform is accessible to all those different users for the approach to be successful. That’s what we’re talking about: how do you actually achieve that, taking all these different people and all these different ways of working, and saying we’ve got a single platform that can do all of them? Historically it’s not been possible; there have been some barriers which have stopped us from achieving this approach. So let’s talk about what those barriers are.
There are a few different warehousing activities that people would normally try and do which are just a lot harder in traditional Spark, or at least were a lot harder in traditional Spark. And that’s why I can be really excited now: a lot of these problems have been fixed. So we can talk about these challenges, and for each challenge we’re going to talk about how you overcome it these days. A big, big part of that is Delta and Spark 3.0.
Okay, so the traditional warehousing approach. When we say we’re gonna build a warehouse, we do this: we build a star schema. Honestly, Kimball is so baked into warehousing. Or maybe you’re doing Data Vault, or whatever your BI preference happens to be. But this idea of saying I’ve got a huge, narrow but deep fact table and a whole constellation of dimensions, and I’m writing queries which join off that fact table and filter it by those dimensions, is so common in the world of warehousing. Now, you take that pattern and say: I want to take that data modeling approach, and all of the good things around slowly changing dimensions, degenerate dimensions, junk dimensions, all of that really mature thinking about data management, and I’m gonna go and put it into a lake. And there’s a common thing that people say, which is “you can’t do Kimball in a lake”. It’s anecdotal; that’s not a real quote, so don’t quote me on it. But it’s just common knowledge. People automatically go: yeah, but you can’t do that in a lake, because historically it led to bad performance. You were taking a square peg to a round hole: you were taking a set of approaches built for a relational engine and trying to apply it to something that doesn’t have the same performance constraints.
And that’s why, historically, people have written off taking that approach. They’ve said you just don’t do that; it’s not something we try and do, because the platform isn’t meant for it. And yet, if we’re saying lakehouse, if we’re saying we’re gonna open up this world for our BI analysts, for our report consumers, for this whole wealth of data specialists, then we have to talk about how we take those principles and apply them, so people can work the same way they always worked, but in our lake environment.
Okay, so these are the three historical challenges. And the irony is that our first historical challenge is tracking history. So, slowly changing dimensions: this classic Kimball idea of saying I want to apply an upsert, essentially. I’m getting some change data and applying it to a table, but I want to keep both copies of the record, the one before the change and the one after the change, and be able to write queries that tie different records back to different versions of that same dimensional record. That is so fundamentally core to what we do in Kimball, and it used to be quite hard to achieve. Writing that in Python is completely fine, you can do it; it just takes so many steps, it’s convoluted.
Second, filtering by dimensions. Again, I’ve got my big fact table, I’ve got all my little dimensions, and I just wanna filter a dimension and apply that to the fact. Now, you can do that, absolutely; that’s just an easy Spark query. But doing it in such a way that it’s performant, that it hits partition filters, that it actually results in a good query, without you having to drop down and start writing out join hints and partition logic and all of that kind of stuff yourself? People are gonna do that activity whether or not you want them to, so how do you make it perform properly? And third, general SQL query performance.
How can people just take any query that they’d normally write and have it run better, without having to have a huge amount of underlying knowledge?
Okay, so let’s talk through our first example: slowly changing dimensions, this idea of building history tracking inside a table. This is a fairly critical thing. I’ve got my table at the top; that’s my original table with my original records. And I’m applying a change. I’ve got a new record coming in, so I mark the old record as updated: I’ve changed its current flag, so it’s marked as no longer current, and I’ve given it an end date. And I’ve added my new record. So any time someone queries that table, they can choose which version of history they go to. That’s all SCD is; in this case this is Type 2, managing the history inside our table. It’s so fundamentally common to everything in warehousing that everyone will know it. But doing that in Python means I need to read my entire table up into memory, perform a join and apply that logic, then write those changes back down to disk, and then switch out the old file for the new file. It’s just this little circle of logic, which is not hard. But saying to someone coming from a warehousing background, “oh, it’s fine, you can do it, but you have to go around five or six different steps”, makes it unapproachable, makes it hard to adopt.
So the big thing that’s been enabled now in Delta is the merge functionality. Here we have a straightforward merge command, and I’ve used the SQL command here. The same merge command is available in Python and Scala, so as a data engineer I can write something using a Python-based merge command, throw a load of metadata at it, and have the same command able to merge any data set into any table as a whole variable-driven thing. But that’s not why we’re here.
We’re here to say that my BI analysts, my warehousing experts, my data modelers can lift and shift a lot of the existing code they’ve got, a lot of the existing merge commands that they’ve been using in their SQL warehouse, and those same commands work. That is huge. Again, it’s all about accessibility. And again, this isn’t a Spark 3.0 thing; this is all to do with Delta. If you’re using Delta as your file format, suddenly all of this becomes enabled, just by adopting Delta.
Next example: I’m gonna talk about dynamic partition pruning. How do we actually get partition filters, get partitioning, to work when we’re talking about a star schema, when we’ve got a big giant fact and small dimensions? Historically, that was really, really bad for performance in Spark. And this is a big thing that has changed in Spark 3.0 to enable this kind of activity.
Okay, so classic Spark partitioning. We’ve got a giant folder, and it’s split into subfolders for Hive-style partitioning: month equals one, month equals two, month equals three, etc. You can see we’ve run a query saying select star where month equals three, and that’s come down and isolated the single folder that contains the data we care about. That’s just baked into Spark; that is old-hat Spark, the way it’s always worked. We all know and love Hive-style partitioning. And that’s great, because essentially we don’t have an index. Our only way of really adding performance to a Spark query is to make sure we hit that partition filter, to make sure we’ve included the partition column in the search predicate of our SQL query, so that it limits the data to what we need. That’s really, really important. However, it starts to fall apart when we have multiple different tables in the mix.
So that’s great for a single table. In this example, we’ve got a partitioned fact table and we’ve got a DimDate, a dimension table. Now, if we’re using pre-Spark 3.0 and we run this query, you can see at the top that we’re joining to our date dimension. I’ve left out the join criteria, but that’s fine. We’re joining to our date dimension and adding a search predicate on the date table, so we’re filtering the joined table, not the main fact. In that case, Spark won’t realize the filter could be carried across to the partitioned fact, and therefore you’re gonna get a table scan of the entire fact. That’s obviously not great performance; that’s fairly terrible. And that’s one of the accepted reasons why you don’t build a star schema in a lake: dimensional filtering has historically been terrible. Now, I’m glad to say that’s one of the things that’s fixed in Spark 3.0. We’ve got this thing called dynamic partition pruning, which attacks that particular problem.
Okay, so now that same example, that same query, over in Spark 3.0. We’re gonna filter through the date dimension, and it’s gonna pass that across. It’s going to realize: oh, actually, the filter you’ve applied to the date means we don’t need all of the fact table, so we’ll just bring back the partitions that we need. And that happens automatically, out of the box. That is absolutely huge: you can run your same query with Spark 3.0 and it’ll just run that much faster, because it’s gonna hit partitions.
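As a rough illustration of the difference described above, the following plain-Python sketch models a month-partitioned fact table and a small date dimension. It is not Spark code and does not reflect how Spark implements dynamic partition pruning internally. The names fact_partitions, dim_date, query_without_pruning and query_with_pruning, and all of the data, are hypothetical. It only shows why evaluating the dimension filter first lets the scan skip folders.

```python
# Toy model of dynamic partition pruning. Plain Python, not Spark code.
# All table names, columns and values are hypothetical.

# Fact table stored Hive-style: one "folder" of rows per month value.
fact_partitions = {
    month: [{"month": month, "fare": month * 10 + i} for i in range(3)]
    for month in range(1, 13)
}

# Small date dimension carrying an attribute the fact table lacks.
dim_date = [{"month": m, "quarter": (m - 1) // 3 + 1} for m in range(1, 13)]


def query_without_pruning(quarter):
    """Filter lives on the dimension only, so every fact partition is read."""
    wanted = {d["month"] for d in dim_date if d["quarter"] == quarter}
    rows, partitions_read = [], 0
    for month in sorted(fact_partitions):
        partitions_read += 1  # nothing tells the scan which folders to skip
        rows += [r for r in fact_partitions[month] if r["month"] in wanted]
    return rows, partitions_read


def query_with_pruning(quarter):
    """Evaluate the dimension filter first, then read only matching folders."""
    wanted = {d["month"] for d in dim_date if d["quarter"] == quarter}
    rows, partitions_read = [], 0
    for month in sorted(wanted.intersection(fact_partitions)):
        partitions_read += 1
        rows += fact_partitions[month]
    return rows, partitions_read


full_rows, full_reads = query_without_pruning(quarter=1)
pruned_rows, pruned_reads = query_with_pruning(quarter=1)
print(full_reads, pruned_reads)  # 12 3
```

Both functions return the same rows; only the number of partitions touched differs, which is the saving the talk attributes to Spark 3.0.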
And the amount of time I have spent trying to trick users into hitting the right partition column, just by how I’ve named it, how I’ve structured it, where I’ve placed it in the table! That is kind of a thing of the past. If people can have a nicely modeled data set with a central fact and then dimensions, and filter on those dimensions, and that works in a performant way, then suddenly it becomes so much more approachable, so much more accessible to all of our user base.
Okay. Finally, the last major area is Adaptive Query Execution. This is about how we’re attacking just general SQL queries: people coming in and being able to write the queries that they would normally write, without having to have really deep, specialist Spark performance knowledge. And there are a few different bits and pieces that AQE does for us when it’s turned on in Spark 3.0.
Okay, so three main cases. First, coalescing shuffle partitions, and that’s all about what happens inside a particular job. We’ve submitted the Spark job, and it’s worked out the various stages it has to go through. In each stage we have a variable number of RDD blocks: how many tasks are needed for each of those stages of our main job? That’s getting into Spark execution and performance tuning. Now, what this does is look at it all the way through. Each time it finishes a stage, it checks and says: is there a smarter way I can be doing this? Actually, I need fewer partitions at that point, fewer RDD blocks, fewer tasks. And it’ll go all the way through, making sure each query is performed in the most optimal way. Now, we could always do that, but only by manually interjecting, by telling it at various stages: actually, repartition by this, coalesce by this, I want you to go down to two RDD blocks.
Or: I want you to repartition by this column. And we need to know that. We need to have upfront knowledge about the size of our data and what needs to be done in terms of performance. That’s a big ask for people coming in from the SQL community, who don’t necessarily have that background in Spark engineering. Second, switching join strategies: I was expecting to do a sort-merge join, but oh, the data is actually a lot smaller than I thought it was, so I can just do a broadcast. And that is huge. Third, optimizing skew: in the middle of a query, if it realizes one of the RDD blocks is way, way bigger than expected, it can adapt and change its query plan. It’s the first one I wanted to talk about quickly, coalescing shuffle partitions.
Okay, so this is the classic example, right? I’ve got a couple of tasks, and then it does a shuffle, because I did a join, because I had a group by, whatever caused my data to shuffle, and it goes to 200. In old-school Spark, as in pre-3.0, that was our default shuffle partitions number, and it’ll just force it to 200. So I’d write a query that only needs a couple of RDD blocks, and it would force it to 200, which is almost never the right answer. And then you’ve got to force it back down when you’re writing out in a specific format. Knowing what that’s doing, and why that’s actually happening inside the query, is a big ask for people who are fairly fresh to Spark.
Now we’re gonna see it after AQE. As soon as we’re in Spark 3.0, you can turn AQE on, and from Databricks Runtime 7.3 it looks like it’s turned on by default, so you don’t even need to know this. What’s gonna happen is that when you run that same query, it’ll be looking all the way through and going: do I need that? Oh, I don’t need 200 there. Actually, I can get away with two or six or eight, or whatever is right for the cluster at the time.
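The coalescing idea above can be sketched with a simple greedy pass over the sizes of the shuffle partitions. This is a simplified illustration in plain Python, not Spark’s actual algorithm: the function name coalesce_shuffle_partitions, the 1 MB partition sizes and the 64 MB target are all assumptions chosen for the example.

```python
# Toy model of coalescing shuffle partitions. Plain Python, not Spark code.
# The function name, sizes and target are hypothetical illustration values.

def coalesce_shuffle_partitions(sizes_mb, target_mb):
    """Greedily merge adjacent partitions while a group stays within target_mb.

    Returns a list of groups, each a list of the original partition indexes
    that would be read together by a single task.
    """
    groups, current, current_size = [], [], 0
    for index, size in enumerate(sizes_mb):
        # Start a new group once adding this partition would exceed the target.
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


# A shuffle that produced 200 tiny partitions of roughly 1 MB each.
shuffle_sizes_mb = [1] * 200
coalesced = coalesce_shuffle_partitions(shuffle_sizes_mb, target_mb=64)
print(len(shuffle_sizes_mb), "->", len(coalesced))  # 200 -> 4
```

A partition that is already larger than the target simply stays in a group of its own. Dealing with oversized partitions is the skew case mentioned in the talk, which this sketch does not attempt to model.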
So it’s gonna dynamically change what it’s doing to improve your query, without you doing anything. And again, that is huge. You can run the same queries you always ran, without underlying knowledge of exactly how you need to shape them to avoid some of these common performance pitfalls. Essentially, you can just write some SQL and it will work a lot better. Again, it’s all about making it more accessible to people coming from that warehousing, SQL world.
Okay, so here we’re looking at SQL merge first. This is on Databricks Runtime 7.3, and most importantly we’re using a SQL notebook. So this is entirely SQL; we don’t have to have any Python or anything else in here. To start off with, we’re doing a quick little cleanup, getting rid of an existing table so we can start off cleanly. But the main thing is that we’re gonna create a sample table. I’m using a data and AI addresses table, with a few columns in there about addresses. The main thing is that we’ve got those SCD columns, the slowly changing dimension columns. We’re using the Delta format, which is very important for being able to do the merge, and it’s going into a location in our lake. Then we’re inserting three quick records, each with a primary key and an address. But most importantly, those three slowly changing dimension columns: current is true, so it’s an active record; it’s got an effective date; and it doesn’t have an end date. So, real simple, we’re just setting up some sample data. We can run that and have a look at what it actually looks like: a really straightforward sample bit of data that we can then work with. Okay, real simple, nothing particularly special there.
Now, what we need to do is an update to that. So I’m creating a little temporary view called updates, and I’ve got two dummy records in there.
There’s one for 11, which is an existing record, so we wanna do an update to that record. We’ve got some SCD information with it: current is gonna be true, and we’ve got a new effective date. And then we’ve got another record, record 99, which doesn’t exist currently, so it’s gonna be an insert. This is a straight new record. Now, I’ve got a bit of a weird query. I’m saying give me the results from that updates table, but also join that updates table to my existing table, so I’ll have two records for anything that is an update. That enables me to take the existing records, set them as closed, set their end date, and not worry about it. So I’ve got two copies of everything that’s going to be an update. That’s fairly common in terms of how people work with merge statements.
Okay, I’ve then got my SQL merge. I’m saying merge into my address table, and I’m using that same query, my union query, to get two copies of the update data into my merge updates. I’m joining it based on the primary key to a merge key, so I can control whether or not it matches. For the ones that do match and that are currently current, we update the SCD information: set current to false and set the end date to the new effective date, closing off those historical records. And I’m inserting everything else, so all my new records, and the new copies of those updates, are inserted as new records into my SCD table.
Okay, so we just run that. It’s so classic for how people coming from the warehousing background, people working in SQL Server, work. We can see for record 11 I’ve got my brand new record, which is now effective and has no end date, and I’ve got my old record, which has been closed and has an end date. I’ve also got record 99, my new record. A really straightforward merge, but it enables so much for people coming from the warehousing background. Okay, we’ll switch over and have a look at dynamic partition pruning.
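The merge-key trick from the demo above can be modelled in a few lines of plain Python. This is only a sketch of the logic, not the notebook’s SQL and not the Delta Lake API: the function names stage_updates and scd2_merge, the column names and the sample addresses are assumptions, with ids 11 and 99 borrowed from the talk.

```python
# Toy model of a Type 2 SCD merge using a "merge key". Plain Python only;
# function names, column names and sample values are hypothetical.
from datetime import date

addresses = [
    {"id": 11, "address": "1 Old Street", "current": True,
     "effective_date": date(2020, 1, 1), "end_date": None},
    {"id": 12, "address": "2 High Road", "current": True,
     "effective_date": date(2020, 1, 1), "end_date": None},
    {"id": 13, "address": "3 Mill Lane", "current": True,
     "effective_date": date(2020, 1, 1), "end_date": None},
]

# Incoming changes: id 11 already exists (update), id 99 does not (insert).
updates = [
    {"id": 11, "address": "5 New Avenue", "effective_date": date(2020, 11, 1)},
    {"id": 99, "address": "9 Park Row", "effective_date": date(2020, 11, 1)},
]


def stage_updates(target, updates):
    """Build the 'union' source: every update keyed by its id, plus a second
    copy with a null merge key for each update that hits a current row."""
    current_ids = {row["id"] for row in target if row["current"]}
    staged = [dict(u, merge_key=u["id"]) for u in updates]
    staged += [dict(u, merge_key=None) for u in updates if u["id"] in current_ids]
    return staged


def scd2_merge(target, updates):
    """Close matched current rows and insert every unmatched staged row."""
    merged = [dict(row) for row in target]  # work on copies of the rows
    current_by_id = {r["id"]: r for r in merged if r["current"]}
    for src in stage_updates(target, updates):
        match = current_by_id.get(src["merge_key"])
        if match is not None:
            # Matched: close off the old version of the record.
            match["current"] = False
            match["end_date"] = src["effective_date"]
        else:
            # Not matched: insert as the new current version.
            merged.append({"id": src["id"], "address": src["address"],
                           "current": True,
                           "effective_date": src["effective_date"],
                           "end_date": None})
    return merged


merged = scd2_merge(addresses, updates)
for row in merged:
    print(row["id"], row["address"], row["current"], row["end_date"])
```

The null merge key is what forces the second copy of an updated record down the insert branch, so one pass both closes the old row and adds the new one.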
So we’re using NYC Taxi, everyone’s favorite sample data for big data experiments. Let’s do a quick describe and have a look at that table. There’s loads of information in there, and if we scroll down to the bottom, we can see we’ve got partitioning in there. That big, big, big table is partitioned by year, month and day. Until dynamic partition pruning came in, anyone who came in and queried that would have a really slow query unless they included year, month or day as a search predicate. Now we can actually write this query: I’m selecting from NYC Taxi, doing a join to my date dimension on my pickup date to calendar day, and including a filter based on calendar month. So I’ve got a search predicate which is not at all related to my fact table; it’s on a completely different table, and I’m going through that join to evaluate it. Because of dynamic partition pruning, this is hitting my partition filter, which is great.
If we have a look at the Spark UI, we can go into the associated SQL query and see what it’s actually doing, and it gives us a nice small query plan. The first thing it does is a quick scan of our date dimension. It’s reading a few files, it’s doing that filter, and it works out which dates were in that month. It then takes that, does a quick exchange, and uses that filtered date table to determine which partitions from my fact table I need. It’s automatically done that as part of my query. You can see that of the 900 or so files it could read, it’s actually reading 54. It’s done partition pruning as part of this query, without me having put a search predicate on the partition key. We can also look at it another way: we can run an explain plan to see what’s actually happening in here, what’s going on in terms of my execution plan.
And we can see a few things in here. We’ve got this dynamic partition pruning: dynamic pruning is now part of my explain plan, inside the partition filters on that particular table. That’s with no code changes; I’ve just written a straight query, and because I’m using Spark 3.0, it’s understood that.
With AQE, it’s a similar idea. This is before AQE, on an old runtime. We’re doing a query on year, month, day, again from our NYC Taxi, and I’m doing a group by. You can see it’s built this job into a couple of stages, about 50 tasks in one stage and 42 tasks in another, and it’s taking time to do a fair amount of work, crunching through a lot of data. Now, if we take that and say, well, what happens when we turn AQE on, we should see a drastically different thing. Running that exact same query, this time on Databricks Runtime 7.3, we can see we’ve just got one stage, with 16 tasks in it. Having AQE turned on has made a vast difference to how quickly that query runs, because it’s changed what it’s doing; it made a different plan.
Okay, so those bits and pieces all come together, and they all spell that same message of making it accessible. Whether we’re talking merges and the ability to bring the existing code that we’ve already got and run it in that same place, or whether we’re talking dynamic partition pruning and AQE, both of which give massive performance benefits to people working the way they normally would. They’re not having to change their behaviors. They’re not having to relearn all the things they used to know and ask: how does it work in Spark? Do I need to learn a load of Python? Do I need to consider, all the way through my query, when I should coalesce and repartition and all that stuff?
Now, I’m not saying all of that goes away. We’re still gonna have the problem when people are trying to do really, really big, problematic queries, and then they’ll need some specialist performance tuning. What we’re saying is that the 80% of queries that people are trying to write in SQL, the ones that are just doing your normal BI-style activities, are gonna get a lot faster. They’re gonna hit a lot of these things, which are safeguards against performance pitfalls. So we’re talking about the familiarity of SQL: take what you know and apply it inside this lakehouse. It means we don’t need a separate system just for the SQL people to be able to bring their own stuff. It means removing technical barriers. We’re saying you don’t need to learn a ton of info about partitioning, a ton of background about RDDs and tasks and stages and how it’s all working inside. Sure, you’ll be a better Spark developer if you do know that stuff, absolutely; it’s not gonna go away. However, your report consumers, the people accessing it through BI tools, the kind of people who aren’t that data savvy: we’re now saying this is gonna be accessible to them. If they can do their normal warehousing activities, if they can use and consume reports in the same way they always did, then adoption is gonna be a lot better. And that’s a key thing behind the lakehouse model. So Spark 3.0 and Delta, through enabling all these common behaviors, through thinking about what warehousing people and BI people are going to build when they adopt this platform, suddenly make it all a lot more accessible. It’s about accessibility. And that is how we make a successful lakehouse.

About Simon Whiteley

Advancing Analytics

Simon is a Microsoft Data Platform MVP, awarded in recognition of his contributions to the Microsoft Data Platform Community. Simon is a seasoned Cloud Solution Architect and technical lead with well over a decade of Microsoft Analytics experience. A deep techie with a focus on emerging cloud technologies and applying "big data" thinking to traditional analytics problems, Simon also has a passion for bringing it back to the high level and making sense of the bigger picture. When not tinkering with tech, Simon is a death-dodging London cyclist, a sampler of craft beers, an avid chef, and a generally nerdy person.