Building Lakehouses on Delta Lake with SQL Analytics Primer

May 27, 2021 03:15 PM (PT)


You’ve heard the marketing buzz, maybe you have been to a workshop and worked with some Spark, Delta, SQL, Python, or R, but you still need some help putting all the pieces together? Join us as we review some common techniques to build a lakehouse using Delta Lake, use SQL Analytics to perform exploratory analysis, and build connectivity for BI applications.

In this session watch:
Franco Patano, Senior Solutions Architect, Databricks

 

Transcript

Franco Patano: Hello everyone, and thank you for joining me today. My name is Franco Patano and I'm a senior solutions architect with Databricks. I'm here to talk to you about building lakehouses on Delta Lake and using SQL Analytics for analysis and BI. This is sort of a primer, so you'll be able to take everything that you learn today and hopefully build lakehouses of your own. I like to start these things out with a quote. "If you believe it will work out, you'll see opportunities. If you believe it won't, you'll see obstacles." Keep this in mind when you're working through some of those complex merge statements.
What are we going to be talking about today? First, we're going to be talking about what a lakehouse is. Then we're going to cover the Delta Lake architecture. Then we're going to review the optimizations in the Delta engine. And I'm going to show you the SQL Analytics product and how to use it for analytics. And then we're going to have an implementation example where I show you how to use everything that we're talking about today and actually implement a lakehouse of your own. And then I'm going to cover a concept I like to call frictionless loading. Your feedback is important to us, so remember to rate and review the sessions after you've seen them.
So, what is this lakehouse thing? It's one platform to unify all of your data and AI workloads. But what does this really mean? Essentially, what we have found when working with data teams across these organizations is that they have a lot of data. This could be clickstream data, point of sale data, log data, data from databases, or even other applications like Salesforce, image data, or Internet of Things data. And essentially, data teams have all this data and they need to bring it into their data lake or data management ecosystem. And they could be streaming it or doing batch loading into this type of architecture. And so what they need to be able to do is use tools like Scala, Python, or SQL to load unstructured, semi-structured, or structured data into their data lake. Then, they need to create an optimized query layer for data science, machine learning, and analytics.
And traditionally we call this the silver layer. So you would clean up the data from bronze and you would use the tools and all of these different libraries and packages to derive value out of the data. And then, your business users need to be able to report on and analyze this data. And so they would be using tools like SQL Analytics, Tableau, Power BI, or Looker to be able to analyze all of that data. And all of these operations work on Delta Lake, which is an open source format that uses Parquet with a JSON transaction log on top of your open data lake on S3, ADLS, or Google Cloud Storage. And the open source variant works on HDFS.
So what is this lakehouse architecture pattern with Delta Lake? Essentially, what we see a lot of these organizations doing is loading the data into the bronze layer. And the bronze layer is raw data, and you don't really want to apply a lot of changes here because you want to maintain provenance to your source. So you land the data raw, so you always have a record of what happened to the data. Then we're going to clean the data up. And this is where a lot of data wrangling or data munging happens. Essentially, you're going to clean up nulls and bad dates, and handle text fields. And a very common technique with streaming is to multiplex a lot of different tables over your stream. And so here we see a lot of demultiplexing, or demuxing, of the streams and fanning them out into a lot of different tables.
And then finally, if you're using systems that have funky names for these data fields, like JD Edwards or SAP where the fields don't make any sense, you want to apply friendly names so your business knows what they're querying when they're inside of the system. Then you want to create a layer that all these different tools can do really fast analytics on. And so this is where, in the gold layer, we see a lot of analytics engineering and business model creation, and you're going to create aggregates, facts, and dimensions. And here's where we'll apply business friendly names. Most often the friendly names in the silver layer differ from what the business refers to them as, so we want to make sure that this is a business friendly layer. And finally, we're going to create common logical views over this data so that it can be queried performantly and is understandable by the business. And so essentially what happens is data becomes more usable as you go through this architecture pattern.
So let's talk about some optimizations with Delta Lake. The first thing that you should understand is that in order to improve performance on these data lakes, we essentially had to apply some data management techniques from databases and data warehouses. First off is stats collection. We need to collect statistics on the data inside of these tables and files in order to be able to do things like data skipping. Data skipping is a really important performance feature because essentially if we don't need the data that's in certain files, we can just not load them into the cluster, which means the user gets better performance. What you need to do is account for the fields inside of the table. Numericals and keys should be moved to the left of the 32nd position so that we collect stats on those columns. But long strings and complex data types are kryptonite to stats collection. You're going to consume a lot of compute and not get any real yield at query time or ETL time from collecting stats on them.
So we want to move long strings or complex data types to the right of that 32nd position so that we're not wasting compute and not getting any yield out of it. The next optimization in the Delta engine is optimize and Z-order. Optimize will bin pack our files so that we get nice even chunks, so when we read them into the cluster, it's performant. Z-order will organize the data inside of these files so that when we do data skipping, it's more effective. When should you be using Z-order? Really good candidates for columns to include are keys, primary keys and foreign keys on dim and fact tables, ID fields that are used to join up different tables, and high cardinality query predicates that would be included in WHERE clauses in SQL statements.
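To make that concrete, here is a minimal sketch of an optimize and Z-order step as you might run it in a notebook; the table and column names (gold.fact_trades, account_id, trade_date) are hypothetical and not from the session.

```python
# Minimal sketch: bin-pack the table's files and Z-order by keys that
# commonly appear in joins and WHERE clauses (names are illustrative).
spark.sql("OPTIMIZE gold.fact_trades ZORDER BY (account_id, trade_date)")
```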
So most often what happens next is people ask, "Well, how should I think of partitioning and Z-order? Are they the same? Are they different?" They're a little different. In the future, you probably won't have to reason about partitioning. Databricks is working on that. But essentially, partitioning is really effective for lower cardinality fields. Essentially, you want to create even chunks of data files to consume into the cluster. Z-order is more effective on high cardinality fields, fields that are unique and can be ordered when you organize them. And so numericals, phone numbers, email addresses, these are good candidates for fields that work really well with Z-order. This doesn't mean that Z-order wouldn't be effective on lower cardinality fields, it just means that it's more effective with high cardinality fields.
So how should you be thinking about these layers, and what performance steps do we have for each layer? Essentially, when you're loading the data in raw into your bronze layer, if you're loading files, wherever you drop those files in that directory, that's your bronze layer. And so that's really all you have to do. But if you're streaming, you're going to land the data in Delta in raw format and then turn off the stats collection by setting that configuration to zero. More often than not, we find that collecting stats in this layer isn't really valuable downstream. And what we want to do, when we're merging into the silver layer or the next layer of our data, is optimize and Z-order by the join keys that we join on between bronze and silver. That's going to make the merge more effective.
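As a rough sketch of turning stats collection off on a raw bronze table, a table property like the following does the job; the database and table names here are made up for illustration.

```python
# Hedged sketch: disable per-column stats collection on a raw landing table
# by indexing zero columns for data skipping (table name is hypothetical).
spark.sql("""
  ALTER TABLE bronze.trades_raw
  SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = '0')
""")
```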
Then the silver layer is where we restructure the table to account for those data skipping index columns. And this is where you'll move numericals to the left of that 32nd position, and move long strings or other complex types that won't take advantage of stats collection to the right. And you also want to use Delta cache enabled instances for these types of operations. For ETL, you're going to get a performance improvement on those operations by using this cluster type. Now, you can enable Delta cache on any cluster instance. It turns out that even hard drives with spindles are actually a little bit faster than reading back from object store for these operations, but your mileage may vary. The key here is the faster the local disk, like NVMe, the better the performance for Delta cache.
And then finally, in the gold layer, what we want to do is optimize and Z-order by our join keys in the gold layer. So if we have a dim and fact, or dimensional, model, we want to optimize and Z-order by those keys or high cardinality query predicates, anything that's included either in a join or a WHERE clause to filter data. And then there's another property called the Delta staleness limit. Whenever a query is issued against a Delta table, we cache data on Delta cache enabled instances. And every time you issue a query, it'll check the validity of that cache. Well, if your orchestration only loads the Delta tables once a day or once an hour, you can turn the staleness limit up to align with that orchestration and your analysts will perceive better performance. But in the background, it will refresh the cache if any tables have been updated. So it really is a good way to have the best performance for your analysts and not have random slowdowns affect their queries.
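As a loose illustration of those two gold-layer steps, assuming a hypothetical dimension table and key, and an orchestration that refreshes roughly hourly:

```python
# Sketch: allow queries to read table data up to one hour stale so analyst
# queries don't pay a freshness check between scheduled loads.
spark.conf.set("spark.databricks.delta.stalenessLimit", "1h")

# Optimize and Z-order the gold table by its dimensional join key
# (table and column names are illustrative).
spark.sql("OPTIMIZE gold.dim_customer ZORDER BY (customer_sk)")
```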
And then finally, you use SQL Analytics for your analytics use cases. Your analysts are going to get great out of the box performance for their SQL queries and don't have to reason about all the configurations required to get good performance at runtime. So what is the SQL Analytics product? Essentially, it's a way for SQL analysts to get a native experience on their data lakes, or what we're calling the lakehouse, in an environment that they feel more comfortable in. And we're going to look at it in a little bit, but essentially they get much better price performance on their data lakes, and they're able to use all of the BI tools that they like to use in order to get a really good experience, performance, and reliability on their data lakes.
So why did we create the SQL Analytics service? Well, what we found is that over time, more organizations are standardizing on data lakes for their data strategy. And what we found is that over 40% of the queries that are issued on Databricks are actually in SQL. Most analysts don't feel comfortable issuing SQL queries in a notebook environment. They much prefer a SQL editor-like experience. So that's what we gave to the users. What you get is this SQL native experience, where on the left-hand side, you get a schema browser and you can browse all the metadata, databases, and tables to create your SQL statements. You get the SQL editor box on the top right and an output window on the bottom right. And essentially this is a much more native experience for SQL and data analysts, so they feel much more comfortable here. We also have a feature called alerts, where for any query that you have, you can set a refresh schedule and then evaluate a condition, and if the data changes or meets your evaluation criteria, it'll send you a notification.
You can do this by email, or there are a few other tools that you can extend to; you can look at more in our documentation. So how do you administer and govern this environment? Essentially, we have a few new parts of this product that make this much simpler for administrators. First, you get a SQL endpoint management screen that allows you to simply create endpoints. Under the covers, endpoints are just clusters. Here you give it a name, you choose a size, and you choose a couple of different configurations. If you want high concurrency on these endpoints, you can enable multi-cluster load balancing. If you're interested in low latency on these endpoints, you can enable the new vectorized execution engine that internally we call Photon. It'll probably have a different name, but you turn these features on if you want to enable that type of functionality.
So at a high level, how is SQL Analytics operating? What's going on under the covers? Like we talked about, you get an excellent analyst experience and administrative controls to be able to monitor and administer what's going on in this environment. Then you have optimized SQL endpoints that help manage all of the workloads and the queuing of all these queries. They handle auto scaling and multi-cluster load balancing to handle all of the demand that's coming in. And we enable caching technology for faster queries, because data lakes are abundant and cheap, but they're really slow. So we leverage these different types of caching to give a better analyst experience. All of this uses the data that's on your data lake in Delta Lake, where it's already curated like we've been talking about so far.
And this all works with the new product you heard announced at the keynote, called Unity Catalog, along with Delta Sharing, to be able to share the data that you curate on your data lake. So how do we enable a good BI experience for these data analysts? It's not just one thing, there's a stack of things. First, you have these users that are going to be using their tools, and these could be Tableau, Power BI, Looker, or SQL Analytics. We've worked with these vendors to optimize their connectors so that they're simple and performant. Next, we worked with Simba, who writes the drivers that are used to connect all these different components. We worked with Simba to optimize their driver to get better throughput over internet routes. Then this is where we have the SQL endpoint technology. And essentially, we created a layer on top of clusters to manage the complexity of the clusters in a simpler way. So it's a much more efficient mechanism to manage the compute.
Then we've modified the query planning and the queuing so that the queries are able to be scheduled more efficiently on top of these endpoints, across all of the different clusters that could be computing them. And then finally, the execution. This is where the new vectorized execution engine comes in. The actual execution on the [inaudible] is significantly faster because we've rewritten Spark in C++ to get closer to the hardware. And then all of your data is in the open data lake that we call Delta Lake, using the Delta Lake architecture pattern.
So what does this mean? What can you expect for price performance? When we've compared this platform against other common cloud data warehouses, what we have found is that it can be up to four times better price performance than these other cloud data warehouses. Now, what I want you to take away from this is that we're not really trying to beat them by a whole lot, but what you should understand is that for the same price performance or better that you get from a cloud data warehouse, you get everything that Databricks offers, which is the complete data and AI platform that we call the lakehouse. So you're not going to lose any price or performance opportunity by adopting the lakehouse architecture pattern.
So what are some common use cases that we're seeing data teams adopt with this? If you're connecting your BI tools to the data sets on your data lake, you can use the SQL Analytics endpoints to get much faster execution of your queries. Then, if you need to do exploratory data analysis on your data lake for use cases that weren't already accounted for in your data warehouse reporting or dashboards, and you have a business need that requires you to go look for data, you can do exploratory data analysis really fast and performantly with the SQL Analytics query editor. And finally, you can use the JDBC/ODBC driver to connect to any application, and we're seeing a prevalence of data-enhanced applications, where you use all of the power of the data in your data lake to fuel those applications.
What always happens next is we talk about data security and governance. And here what you have to keep in mind is that it's a very similar experience to database or data warehousing technologies. We leverage SQL table access control lists, also referred to as table ACLs. And essentially you can apply grant statements to the catalog, and they are inherited by the databases and tables underneath, or you can apply grant statements to a database, and they are inherited by all the tables and views underneath it. And essentially, this is how you can manage your data governance strategy on the lakehouse.
So what most organizations reason about next is, how do I apply all of these access controls to my user base? Traditionally, most organizations have adopted groups and roles in a managed system like Active Directory or Azure Active Directory. You can sync those groups and roles to Databricks using SCIM. Then you can apply the SQL grants to those groups and roles so that you have a similar type of experience to what you would have in your database or data warehouse. And then, when you set up the sync, whenever the business adds new users to or removes users from those groups, it will just sync up to Databricks. And all you have to do is reason about the right grant statements on those groups and roles.
What are some common governance models that we see in the wild? Well, traditionally, you'll have enterprise grade data sources. And these are typically managed by a central IT organization, like the ETL team or something like that. And essentially, they'll create data sets where only they have access to modify the data, but any of these different user groups can select data or read data out of them. And these are the types of grant statements you can apply to that database. Then we have business units that might have their own data that is different from the enterprise grade sources. Here you can apply grant statements to those databases where the department can modify the data and all the users can select the data. And then finally, users have a need to ingest their own data sets, maybe from the internet or uploaded from their laptop. And essentially you can create databases that allow the user to ingest their own data, and then other users can select that data if they want to join against other data sets in the organization.
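As a hedged sketch of those three patterns expressed as grant statements from a notebook; the database and group names (enterprise_sources, finance_bu, user_sandbox, and the groups) are illustrative only.

```python
# Enterprise-grade database: central ETL team owns writes, everyone reads.
spark.sql("GRANT ALL PRIVILEGES ON DATABASE enterprise_sources TO `etl_team`")
spark.sql("GRANT SELECT ON DATABASE enterprise_sources TO `all_users`")

# Business-unit database: the department can modify, everyone can select.
spark.sql("GRANT MODIFY ON DATABASE finance_bu TO `finance_analysts`")
spark.sql("GRANT SELECT ON DATABASE finance_bu TO `finance_analysts`")
spark.sql("GRANT SELECT ON DATABASE finance_bu TO `all_users`")

# User sandbox: users can create and load their own tables, and share reads.
spark.sql("GRANT CREATE ON DATABASE user_sandbox TO `all_users`")
spark.sql("GRANT MODIFY ON DATABASE user_sandbox TO `all_users`")
spark.sql("GRANT SELECT ON DATABASE user_sandbox TO `all_users`")
```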
Now, you heard about Unity Catalog in the keynote. And what this will do is provide a much more seamless experience for managing your data on the cloud. You'll have a define once, secure everywhere type of architecture that is fully audited. So you have an audit log that is great for data governance controls, and you can define credentials for any external data sets if you're connecting to external sources. And essentially this is a much simpler way to manage security in the cloud. Now, let's talk about an implementation example where we take all of these things that we've learned today and actually produce an implementation that we can use and see. What we're going to be doing today is another benchmark created by the TPC organization. The benchmark we looked at earlier was a decision support one, primarily for query serving. This benchmark is for ETL, or data integration. And here we're doing the analysis, combination, and transformation of data from various sources and formats into a unified data model representation.
Data integration is key to the data lakehouse, application integration, and business analytics. If you think about those use cases we reviewed, this will cover all of those examples. What is TPC-DI? Essentially, it's a data set produced from a fictitious retail brokerage firm. It includes data from a trading system, an HR system, a CRM system, and even external data. And then we can use this data to do historical loads and incremental loads, so we can touch batch and streaming, and it even has different types of inputs and target types with inter-table dependencies. The important thing to understand here is that the benchmark is technology agnostic, so it's perfect for this use case. What are we using here in TPC-DI? It has two major components, the data generator and the data model. Essentially, the data generator produces files ranging from gigabytes to terabytes, so we can test the system against all different ranges of data. And it produces all types of files: CSV files, CDC (change data capture) feeds from database systems, XML files, and fixed-width text files.
And like I said earlier, it includes historical and incremental loads. And then it has a data model where the transformations are documented, sort of like the requirements documentation for a data warehouse. And we can do data analytics with the dimensional model. Here is the implementation reference architecture. We have the file loads from all of these various systems. We're going to use a concept that I like to call frictionless loading into the silver layer. And then we're going to use the MERGE INTO command to merge into the dimensional model in the gold layer.
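As a hedged sketch, a merge from a staged silver view into a gold dimension might look roughly like this; the view, table, and key names are illustrative, not the exact statements from the benchmark implementation.

```python
# Merge staged silver data into the gold dimension on its business key.
spark.sql("""
  MERGE INTO gold.dim_company AS t
  USING stage_dim_company AS s
    ON t.company_id = s.company_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")
```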
What is frictionless loading? It's a concept that includes two major components of the Databricks lakehouse architecture pattern. The first one is Auto Loader. This allows us to load files from cloud object store with a thing that we call notifications. Essentially, you can put a notification service on top of a directory and it'll create a notification for any new files added. And then we store those in a queue service on the cloud provider. So instead of listing directories, we read from a queue service, which is significantly more price performant on cloud data lakes. Then we use Structured Streaming, but don't think of streaming as real-time or near real-time streaming. We can use a thing called trigger once for batch equivalency. And then it has tools to handle schema inference. You can even apply schema hints, and you can handle schema drift, because we all know that data changes over time and we need to be able to handle that.
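Here is a rough sketch of what an Auto Loader read with notifications and schema hints can look like; the paths, file format, and option values shown are assumptions for illustration, not the exact demo code.

```python
# Hedged sketch: incrementally discover new files via cloud notifications
# instead of directory listing, with schema inference plus hints.
raw_stream = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.useNotifications", "true")   # queue-based file discovery
        .option("cloudFiles.schemaLocation", "/mnt/checkpoints/trades_raw/schema")
        .option("cloudFiles.schemaHints", "trade_id BIGINT, trade_date DATE")
        .option("header", "true")
        .load("/mnt/landing/trades/")
)
```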
Then Delta Lake offers you a flexible construct to load your data, with a streaming source and sink and something I like to call the ultimate state store, which is the Structured Streaming checkpoint protocol combined with the Delta transaction log. Essentially, you don't have to reason about where you left off. It keeps track of wherever you are. Then Delta Lake has schema enforcement and evolution, a thing called time travel so that you can go backwards in time to see what happened before, and you can optimize for analytics using those optimizations that I talked about earlier, along with the new thing that we call change data feed, which essentially allows you to employ CDC-like capabilities on your data lake.
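Continuing that sketch, here is one way the write side might look, with a checkpoint for progress tracking, trigger once for batch equivalency, and change data feed enabled on a curated table; again, paths and table names are hypothetical.

```python
# Write the stream to a bronze Delta table; the checkpoint tracks progress
# so restarts pick up where they left off, and trigger once runs it as a batch.
(
    raw_stream.writeStream
        .format("delta")
        .option("checkpointLocation", "/mnt/checkpoints/trades_raw/1")
        .trigger(once=True)
        .toTable("bronze.trades_raw")
)

# Optionally enable Change Data Feed on a curated table for CDC-like reads.
spark.sql("""
  ALTER TABLE silver.trades
  SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
""")
```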
You should definitely check out the talk we have on change data feed (CDF). There is a talk on Friday on change data feed in Delta by our own Rahul, Etai, and Jose Torres. It essentially allows you to build your ETL pipelines more efficiently on Delta Lake. And for the next generation of end-to-end ETL on Databricks, you should definitely check out the talk on Delta Live Tables from our own [inaudible]. So now that we've learned all these tools, it's demo time. Let's see this stuff in action. Okay. Let's go into the Databricks workspace and see this in action.
So the concept that I've strung together, that I call frictionless loading, essentially consists of a few different notebooks. I created these notebooks to be parameterized so that if you want to script them out and apply them in your own ETL paradigms, you absolutely can. The first thing we need to do is capture the parameters that are passed into the notebook. These are traditionally called widgets, but you can also use them as parameters. I'm going to collect the database name for the database that we're creating and the location of the data files. This can be anywhere on object store; I'm just loading them on DBFS for simplicity. And then we're going to pass the name of the table along with the schema of the table. Then I'm going to create the location if it does not exist, and then we'll create or replace this table, this Delta Lake table, with that location so that we can access this table.
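A simplified sketch of that parameterized setup notebook might look like this; the widget names, defaults, and DDL shown are assumptions rather than the exact demo code.

```python
# Capture notebook parameters as widgets (defaults are just for illustration).
dbutils.widgets.text("database", "tpcdi")
dbutils.widgets.text("location", "dbfs:/tmp/tpcdi/dim_company")
dbutils.widgets.text("table", "dim_company")
dbutils.widgets.text("schema", "company_id BIGINT, name STRING, status STRING")

db = dbutils.widgets.get("database")
location = dbutils.widgets.get("location")
table = dbutils.widgets.get("table")
schema = dbutils.widgets.get("schema")

# Create the database if needed, then create or replace the Delta table
# at an explicit location so it can be referenced by path or by name.
spark.sql(f"CREATE DATABASE IF NOT EXISTS {db}")
spark.sql(f"""
  CREATE OR REPLACE TABLE {db}.{table} ({schema})
  USING DELTA
  LOCATION '{location}'
""")
```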
Next, we're going to go over the frictionless loading notebook, where after I have created all of my database tables, I can load data into those tables. So what I'm going to do here is pass in the parameters, or widgets. The first thing is the checkpoint. Like I talked about earlier, the checkpoint protocol keeps track of where you left off in your files. A very common pattern here is to have a folder for checkpoints and the table name, and then include a number. This is for incremental purposes, in case you want to restart your stream: you just give it a new folder and it starts over from scratch. We're going to give it the database name. We're going to pass in the delimiter if our files are delimited, if we're doing something like a CSV file. We're going to pass the format of the files along with the path to the source of these files, so wherever the bronze layer is, wherever the files are being dropped.
And then we're going to put in the path to the target table, so wherever the Delta table is stored on object store, along with the table that we're loading. So we're going to take these parameters and load them in as variables in our notebook. And then we're going to grab the schema of that table using a little trick. And then we're going to use Auto Loader, like I talked about earlier, with the cloudFiles source to read from that directory. And then we're going to read the files in using all of the customization from the parameters and load them into our destination. And this is essentially frictionless loading. We can seamlessly load data into our Delta Lake tables. And now we're going to talk about the merge process.
And so in this example, I'm going to be loading the company dimension table. Obviously, I need to look at the definition of these tables, so I can use a DESCRIBE TABLE command to look at what fields are in the table. Then what I'll need to do is generate a job ID. This is very common in ETL workloads: we need to understand what job updated this data, and we have a little notebook to help there. A little trick that I like to use to do source-to-target mapping is to grab the actual schema of the table and do a join to see if any of the fields are named the same, so I have a quick start into mapping. And here we find that there are some fields that map, but others that don't. And so we can take that and do further mapping.
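One possible version of that schema-comparison trick, assuming hypothetical source and target tables, is to intersect the two column-name sets:

```python
# Compare source and target schemas by column name to get a quick-start
# source-to-target mapping (table names are illustrative).
src_cols = {f.name.lower() for f in spark.table("silver.finwire_cmp").schema}
tgt_cols = {f.name.lower() for f in spark.table("gold.dim_company").schema}

print("auto-mapped columns:", sorted(src_cols & tgt_cols))
print("need manual mapping:", sorted(tgt_cols - src_cols))
```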
Next, I'm going to create or replace a temp view that has all the data that I want to load in. And then I'll process my merge statement to merge my data from the silver layer into the gold dimensional layer. Look at that, it loaded 10,000 rows in less than 10 seconds. That's amazing performance. So now that we've seen that, how can I actually use this data? Now we're going to go into SQL Analytics, and this is where your analysts can write queries. You get a very similar, SQL native experience with the editor. And here you can see I can execute SQL queries against the data that's in my data lake, and I get pretty good performance. With about a one second runtime, I can start querying data. I've already looked at this data set, and I essentially had to do a bit of cleaning on the data for one of these bronze tables. And so what I can do is analyze that and take a look at what my data looks like on the data lake without having to use another tool.
Interesting. So definitely don't get divorced, because it looks like these people have definitely taken a hit on their income. It must be the lawyers. All right. So what happens if you were able to do the data exploration but you want to take this to a BI tool? What we can do is go into Power BI and create a connection to our endpoint. And I've already done that here. I've created a connection to my endpoint that exists in SQL Analytics. And what I can do is create dashboards based upon that data. Hopefully with that demo, you were able to see everything that we've learned today, along with the implementation example, and you're ready to build lakehouses of your own. With that, there are a couple of sessions that I recommend you check out to learn more about SQL Analytics and Delta Lake in general. And if you want to know how to get started, starting June 1st, check out databricks.com/try. Thank you for joining me today. Have an excellent time.

Franco Patano

Franco Patano is a Senior Solutions Architect at Databricks, where he brings over 10 years of industry experience in data engineering and analytics. He has architected, managed, and analyzed data ...