One of the most significant benefits provided by Databricks Delta is the ability to use z-ordering and dynamic file pruning to significantly reduce the amount of data that is retrieved from blob storage and therefore drastically improve query times, sometimes by an order of magnitude.
However, taking advantage of this approach over petabytes of geospatial data requires specific techniques, both in how the data is generated and in how the SQL queries are designed, to ensure that dynamic file pruning is included in the query plan.
This presentation will demonstrate these optimisations on real world data, showing the pitfalls involved with the current implementation and the workarounds required, and the spectacular query performance that can be achieved when it works correctly.
Speaker: Matt Slack
– So welcome to this talk on optimizing geospatial queries with Dynamic File Pruning. Just to introduce myself, I'm Matthew Slack, Principal Data Architect at Wejo. I've been there for a couple of years now; before that I'd been working on Spark in various different companies for the last four or five years, heavily involved with performance tuning of both streaming and batch. And before that I was working more widely in BI and Big Data for about 15 years. So what are we going to talk about today? Well, first we're going to have a look at Wejo and what we need to do, and then we're going to look at some typical use cases for geospatial queries within Wejo: what kind of queries do people want to run? Then we're going to look at how traditionally we've been able to lay out the Data Lake to take advantage of indexing strategies that make those queries perform. Then we'll move on to Dynamic File Pruning, introduced by Databricks, and look at how that can optimize those queries further. Then we'll come on to Z-ordering strategies and what type of Z-order we should use for geospatial data, and we'll look at how we can measure the effectiveness of Z-ordering and file pruning. And finally, we'll look at some caveats around queries that use Dynamic File Pruning and how we can structure those queries to activate file pruning and make sure we get the best performance out of our data. So let's first give some background on Wejo. Wejo has the largest data asset of connected vehicle data. We're currently consuming data from, you know, up to 15 million devices, and we have petabytes' worth of historical data about where those devices have been, and from that we can infer lots of really interesting information.
So one of Wejo's main focuses is around data for good: how we can use this connected car data to improve safety, reduce traffic, and look at things like mobility intelligence. We're working heavily with people like retail customers and universities to best understand how we can use this huge dataset. This high-level diagram here illustrates how we consume the data from OEMs, sometimes in stream and sometimes in batch, and then we work with egress partners to stream that data to them or supply it in batch. We also produce derived data insights and machine learning off this dataset, so it's a really exciting dataset to work with. Wejo has been working with Databricks for about two years now, and Databricks is fairly heavily involved throughout our stack. We work entirely with AWS, and Databricks underpins a lot of our work in data science and a lot of our aggregates and derived datasets. We're starting to use Delta Lake to optimize our geospatial queries, as we'll come onto later, and we also use Delta Lake for GDPR and CCPA compliance. So let's talk through some of the typical use cases that we have at Wejo for geospatial data. Well, as you can imagine, our entire dataset is a breadcrumb trail of connected car data, moving predominantly along roads, and so of course all our data is geospatial and temporal in nature. Typical queries are around a particular junction or a road segment, so to do that we have to snap the connected car data to roads, and we do anomaly detection on roads and on traffic. We're also deriving insights into where people are going from and to, to look at things like retail customers: who's going to shops, who's going to baseball games, all this kind of thing. But all of these of course require both geospatial and temporal indexing. So the obvious question is: why don't we use some of the many libraries which already exist for working with geospatial data in Spark?
And you can see some of them listed on the right there, like GeoTrellis, GeoSpark, Magellan and GeoMesa. The answer is we use all of these, but most of them are not very good at improving the performance of reading the data from the underlying storage layer. Because we have huge datasets, most of our data is stored in blob storage, in S3, and so we want to look at ways we can retrieve that data really fast. It would cost too much to store all that data in a faster data store like a NoSQL store, HBase or something like that, so we still want to be able to retrieve data really fast from blob storage. The tools on the right are generally designed to optimize joining geospatial datasets which are already loaded in memory; they don't optimize the reading of data from the underlying storage layer. Yes, GeoMesa, if you can use it, can do that, but it is quite complex to use, it's reliant on the partitioning structure underneath, and it doesn't take advantage of Dynamic File Pruning. So let's have a recap on how, historically, we've been able to structure our Data Lake to get better performance for different types of queries. The main tool at our disposal, as most of you will know, is partitioning. We can partition our Data Lake: every dataset we have can be partitioned by one or more columns, and those columns form a hierarchy. For example, on the right-hand side you can see some common hierarchies that might be used, like year/month/day, country or state, ingest date and hour, or a mixture of a geospatial and a date partition. Now, the choice of the partition columns we use is always a trade-off. We want to make them really selective, so that if we have a filter for a specific day or hour, it's going to be able to narrow down to the individual partition really quickly, which of course corresponds to a folder, as we think of it, in the storage layer. However, we don't want those partitions to be too small.
Otherwise there's a big overhead in retrieving that data from the storage layer, and if you have too many partitions, typically more than 10 or 20,000, it becomes too difficult to manage and the metastore layer will slow things down as well. So it's always a trade-off, and these were the main levers we've had to optimize queries. Then Spark introduced Dynamic Partition Pruning. What this means is that if we have a query with filters on columns that are in the partition structure, then those partitions can be skipped if the filter excludes them. Say, for example, we've got data that's partitioned by date, and we have a filter that says "I'm only interested in a specific day or a specific week." If the columns used in that filter are the same columns that we're partitioning on, the partitions can be dynamically skipped when the query is run, and that obviously drastically reduces the number of files that Spark has to read out of blob storage. So historically, data skipping with effective partitioning strategies, in combination with columnar file formats like Parquet, which we're all familiar with, and ORC as well: those were the main ways we could optimize the Data Lake. Dynamic File Pruning brings something new to the table. So let's talk about Dynamic File Pruning. In a way it's analogous to dynamic partition pruning, but for the files within a partition rather than the partitions themselves. It was introduced earlier this year with Databricks Delta Lake; of course Delta Lake is open source, but the actual Dynamic File Pruning only works within the Databricks runtime. It allows files to be dynamically skipped within a partition: where dynamic partition pruning skips partitions dynamically, this skips files dynamically within a partition, so files that do not match the query filters are now excluded by the optimizer.
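The partition-pruning idea described above can be sketched in plain Python. The partition layout, file names and the `prune_partitions` helper below are all hypothetical, for illustration only; Spark performs this inside its optimizer.

```python
from datetime import date

# Hypothetical partition layout: one folder per ingest date.
partitions = {
    "ingest_date=2020-05-01": ["part-000.parquet", "part-001.parquet"],
    "ingest_date=2020-05-02": ["part-002.parquet"],
    "ingest_date=2020-05-03": ["part-003.parquet", "part-004.parquet"],
}

def prune_partitions(partitions, wanted_dates):
    """Keep only partitions whose ingest_date value matches the filter;
    every other folder is never even listed, let alone read."""
    kept = {}
    for path, files in partitions.items():
        value = date.fromisoformat(path.split("=", 1)[1])
        if value in wanted_dates:
            kept[path] = files
    return kept

# A filter for a single day skips two of the three partition folders.
kept = prune_partitions(partitions, {date(2020, 5, 2)})
```

The trade-off in the text shows up directly here: more selective partition columns mean fewer surviving folders, but also many more folders to track.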
So it's another layer of being able to reduce the amount of data that has to be pulled back from your underlying storage layer. How does Databricks know what data is in which files? Well, it collects metadata on a subset of columns in all the files which are added to the dataset, and this metadata is stored in a folder called _delta_log. Of course, it only works if the files are pre-sorted beforehand, and we'll come onto later how we do that. So let's have a look at this _delta_log folder. In there, there's a JSON file, and this file is used for multiple purposes in Delta Lake: it's used for things like ACID transactions, it's used for time travel, and it has a few other uses which we're not going to come onto today, but you can look at them in the documentation. Within that file there is a row for every file, in this case Parquet files, added to that dataset. You can see in the top-left box we've got a list of files there, and in the box below that I've exploded one of those rows, and you can see within that there's a "stats" column. That "stats" column is shown on the right-hand side, and it shows that for every file we are collecting min/max values and null counts for some of the columns in that file. Now, it's not all the columns; it depends how wide your dataset is, and you can see the setting at the bottom, delta.dataSkippingNumIndexCols, which is set at the table level, and that limits the number of columns that Databricks is going to generate metadata for. So what Databricks can now do is use this JSON file to limit the number of files that have to be read from blob storage. For example, if you're filtering for a specific geohash, in this case, you can see on the right-hand side we've got columns with min/max geohash values; if you're filtering for a geohash which is outside of that range, then that file will simply be ignored.
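The file-skipping logic those min/max stats enable can be sketched in plain Python. The stats values and file names below are made up for illustration; in reality they come from the "stats" column in the _delta_log JSON.

```python
# Hypothetical per-file stats, mirroring the min/max values Delta
# records for each Parquet file in the _delta_log JSON.
file_stats = [
    {"path": "part-000.parquet", "min_geohash": "9v60", "max_geohash": "9v6z"},
    {"path": "part-001.parquet", "min_geohash": "9v70", "max_geohash": "9v7z"},
    {"path": "part-002.parquet", "min_geohash": "9vg0", "max_geohash": "9vgz"},
]

def files_to_read(file_stats, geohash):
    """Skip any file whose [min, max] geohash range cannot contain the
    filter value; only the overlapping files are fetched from storage."""
    return [
        s["path"]
        for s in file_stats
        if s["min_geohash"] <= geohash <= s["max_geohash"]
    ]

# A filter for one geohash only touches the file whose range covers it.
paths = files_to_read(file_stats, "9v6kpv")
```

Note this only helps if similar geohashes really do sit in the same files, which is exactly why the data has to be pre-sorted beforehand.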
So obviously this is, again like partition pruning, going to drastically reduce the amount of data we're having to pull back. Let's talk about our test dataset before we go any further. We're going to use Spark 2.4, because it's the one most people are fairly familiar with now. We've got a medium-size cluster with 10 executors, we've got auto-scaling off because that makes the comparison between tests more reliable, and we're going to do a CLEAR CACHE before every test. Our input dataset is one day of connected car data; as I talked about before, for us that's a pretty huge dataset. It's 16 billion rows, and we're already talking about over a terabyte of data, so it's a reasonably large dataset, not massive but reasonably large, and it's stored as nested Parquet in blob storage, in S3. On the right-hand side is the query we're going to be using: essentially we're going to be looking for data points which fall within this polygon on the right-hand side. The way we're going to do that is basically by putting a bounding box over the top of that polygon. Of course you could then filter down further, but once you've restricted the data to the bounding box around that polygon, the filtering down to just that specific polygon is going to be much, much quicker. So let's look at the speed of this query before we even start to look at Dynamic File Pruning. The first thing we're going to do is create a dataset which has all the geohashes that cover the Austin polygon. For those of you who don't know what a geohash is, it's a geospatial indexing system which creates a grid of cells over your latitude and longitude. There are different precisions that the geohash can have, from precision one through to 10 or 12; we're going to be using precision six, which roughly corresponds to a cell of about 1.2 kilometers by 0.6 kilometers. So we're effectively going to be putting a grid of that size over Austin.
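To make the geohash idea concrete, here is a minimal, self-contained sketch of the standard geohash encoding algorithm in Python (an illustration, not Wejo's production code): longitude and latitude are repeatedly bisected, the resulting bits are interleaved, and the bit string is packed into base-32 characters.

```python
_BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"  # standard geohash alphabet

def geohash_encode(lat, lon, precision=6):
    """Encode a lat/lon pair as a geohash string of the given precision."""
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    bits = []
    even = True  # geohash interleaving starts with a longitude bit
    while len(bits) < precision * 5:  # 5 bits per base-32 character
        if even:
            mid = (lon_lo + lon_hi) / 2
            if lon >= mid:
                bits.append(1); lon_lo = mid
            else:
                bits.append(0); lon_hi = mid
        else:
            mid = (lat_lo + lat_hi) / 2
            if lat >= mid:
                bits.append(1); lat_lo = mid
            else:
                bits.append(0); lat_hi = mid
        even = not even
    chars = []
    for i in range(0, len(bits), 5):
        n = 0
        for b in bits[i:i + 5]:
            n = (n << 1) | b
        chars.append(_BASE32[n])
    return "".join(chars)

# Downtown Austin at precision 6: every point in the same ~1.2 km cell
# shares this six-character string, so it works as a plain string join key.
austin = geohash_encode(30.2672, -97.7431)
```

Because the hash is just a string, covering a polygon reduces to producing a set of these six-character strings and joining on them, which is exactly what the query described next does.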
And I think it's about 300 or so geohashes. In this query, you can see on the right-hand side we've got a filter on ingest date; that's one of our partition columns, so we're automatically reducing the amount of data we're pulling back by filtering on a partition column, great. We've then got an INNER JOIN against our list of geohashes; a geohash is just a string, in this case six characters long. And we're going to look up those geohashes for that day in our base dataset. We can see that the result is around 97 million data points, so it's still reasonably large, but it's much smaller than the 15 billion data points of the entire dataset. However, what you can see with this query is that it's quite slow: even with that medium-size cluster, it took just over five minutes. So what can we do to speed this query up? You can see on the right-hand side, from this simple query which uses input_file_name, that the geohashes for Austin were spread across nearly all 2,500 files in our input dataset. Now, what can we do with Dynamic File Pruning to reduce the number of files that our data is stored in? We're going to use a technique called Z-ordering. This is a technique that's been brought in as part of Delta Lake in the Databricks runtime, and it allows us to co-locate related information in the same set of files. I'm not going to go into Z-ordering in too much detail; there's lots of information on that online. But what it does is allow you to co-locate the same geographical data in the same files, and because of that metadata file that we saw earlier on, Spark knows which files to pull back to return the results of our query. So how do we Z-order the data? Well, you can do it using the SQL API, using the OPTIMIZE statement. So first we have to convert the dataset to Delta.
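As background on what that Z-order actually is, here is a simplified Morton-code sketch in plain Python. This illustrates the bit-interleaving idea only; it is not the Databricks implementation (which range-partitions each column rather than quantizing coordinates directly, a labelled simplification here).

```python
def z_value(x, y, bits=16):
    """Interleave the bits of two integer coordinates (a Morton code).
    Points close in (x, y) tend to be close in z, so sorting by z
    co-locates geographically nearby rows in the same files."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)       # x bits go to even positions
        z |= ((y >> i) & 1) << (2 * i + 1)   # y bits go to odd positions
    return z

def quantize(value, lo, hi, bits=16):
    """Map a float coordinate onto an integer grid (simplification:
    the real runtime derives ranges from the data, not a fixed grid)."""
    scale = (1 << bits) - 1
    return int((value - lo) / (hi - lo) * scale)

# Two nearby points in Austin get nearby z-values; a point in New York
# gets a very different one, so after sorting it lands in different files.
austin_a = z_value(quantize(-97.74, -180, 180), quantize(30.27, -90, 90))
austin_b = z_value(quantize(-97.70, -180, 180), quantize(30.30, -90, 90))
new_york = z_value(quantize(-74.01, -180, 180), quantize(40.71, -90, 90))
```

This is also why a geohash string sorts like a Z-order: its characters encode exactly this kind of interleaved bisection of longitude and latitude.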
If it isn't Delta already, we can use the syntax on the left-hand side; there are other syntaxes available using the Scala API. Then we use this OPTIMIZE table statement, and at the bottom we specify which columns we want to Z-order by, and the Databricks runtime will create a Z-order over those columns. What will happen is that data points with similar location, longitude and latitude, will be co-located within the same files. So off the back of this process we get a new dataset which is just over 6,000 files: same number of rows, a little bit less data because we've reduced the number of columns, same partitions, but now Z-ordered by longitude and latitude. So let's see how this impacts the data. What this image on the left shows is which files contain the same data; the files are color-coded. You can see that the North Midwest is all contained within one file, because it's really sparse, while really busy cities like New York and LA might each be in their own file in their own right. So the files are kept the same size by the Databricks runtime, but the data within them is co-located geospatially. If we run that same query that we ran before, using the input_file_name statement, we can see that our data for the Austin polygon is now contained in just over 340 files. That means that for querying that data, Spark only has to read just over 10% of the files that it had to before. So how do we measure the effectiveness of file pruning, and whether it's even working? Because in lots of cases you think it's working when it's not. Obviously we can use the input_file_name function, which we reviewed before.
But one of the main tools to check it's working is the query plan, which you can obviously get using EXPLAIN; look for a subquery broadcast with dynamic pruning in the query plan, which will indicate that Spark is activating dynamic pruning on your query. Another way we can measure the effectiveness of file pruning is using this handy function from Databricks, which gives you an indication of the percentage of data which is skipped by your file pruning strategy; this doesn't work for dynamic pruning, but it will work for static file pruning. So, which columns do we want to use then? We've got a whole range of geospatial columns: there's obviously longitude and latitude; geohash, which I've talked about, and which is itself a Z-order over longitude and latitude; zip code; state; and there are other approaches like H3, which was developed by Uber, and S2 by Google. One of the great things about Z-ordering, unlike dynamic partition pruning, is that when we Z-order and enable Dynamic File Pruning, any query that has a geospatial component will get a benefit in query performance, as long as that column is in the metadata. So if we Z-order on geohash, that will also improve the performance when we're querying on zip code or state. We did an analysis on which columns were most effective: latitude and longitude, H3 and geohash are all really good across the board, improving the performance for multiple different types of queries. You can see that if we Z-order by geohash, we also get pretty good data skipping on a query by state: 76% of the data was skipped on a query by state, and that meant that query was much, much faster. In the end we settled on geohash, because it's already a column that we use widely in our datasets, but we could equally have used H3. So there are a few gotchas with Dynamic File Pruning, and as I touched on before, you need to check the query plan to make sure that it's being activated.
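That percentage-skipped measurement can be sketched with made-up file statistics. The `skip_percentage` helper and the values below are hypothetical; the unnamed Databricks function works off the real Delta stats.

```python
# Hypothetical per-file min/max stats for a "state" column on a
# dataset that was Z-ordered by geohash.
stats = [
    {"file": "f0", "state": ("AL", "CA")},
    {"file": "f1", "state": ("CO", "KY")},
    {"file": "f2", "state": ("LA", "NY")},
    {"file": "f3", "state": ("OH", "WY")},
]

def skip_percentage(stats, column, value):
    """Percentage of files whose [min, max] range excludes the filter
    value, i.e. how effective data skipping is for this query shape."""
    skipped = sum(
        1 for s in stats if not (s[column][0] <= value <= s[column][1])
    )
    return 100.0 * skipped / len(stats)

# A query for state = "TX" only overlaps one of the four files,
# so three quarters of the files are skipped.
pct = skip_percentage(stats, "state", "TX")
```

This illustrates the point in the text: because Z-ordering on geohash also clusters correlated columns like state, even a state filter gets a high skip percentage.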
You've got three queries here which are almost identical, yet only one of them activates Dynamic File Pruning, and that's the one on the right-hand side. So what's making that query special? Well, it's forcing a broadcast of our table of geohashes. And, strangely, it also has to have a column from your lookup table, in this case the geohash table, in your WHERE clause. Without that column there, which is what you can see on the left-hand side, you don't get the query performance benefit of Dynamic File Pruning. But what you can see is that when Dynamic File Pruning works, there's a massive improvement in performance: our original query took five minutes to run, and with the same size cluster we've already got it down to just over half a minute. So it's already beginning to approach what analysts would want as interactive query performance. There are a few gotchas with this: some filters won't activate Dynamic File Pruning, for example a regular expression. An IN statement will, but only if you have up to 10 values; you'll see that if you go over 10 values in the IN clause, then it will stop working. A join works, but as I touched on before, the lookup table has to be broadcast, and one of its columns must be included in your WHERE clause. I'm guessing at some point Databricks will improve that, but that's what we're working with at the moment. So OPTIMIZE is great, and as we can see, it makes a massive difference to our query performance. However, it's quite expensive to do, especially over these huge datasets that we have at Wejo, and it requires an additional step in your data pipeline, maybe two steps if the data isn't Delta already. For example, OPTIMIZE might take twice as long, in our case, as generating the dataset in the first place. So how can we solve that problem? Well, an alternative to using OPTIMIZE is using repartitionByRange.
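What range repartitioning does to the file layout can be sketched in plain Python. This is a labelled simplification: Spark's repartitionByRange samples the data to choose range boundaries rather than cutting equal-sized slices, but the resulting property, one contiguous key range per output file, is the same.

```python
def repartition_by_range(rows, key, num_partitions):
    """Sort rows by key and cut the sorted order into contiguous slices,
    so each output 'file' covers a narrow range of the keyspace.
    (Sketch only: real repartitionByRange samples data for boundaries.)"""
    ordered = sorted(rows, key=key)
    size = -(-len(ordered) // num_partitions)  # ceiling division
    return [ordered[i:i + size] for i in range(0, len(ordered), size)]

# Made-up geohashes: two in the same Austin-area cell prefix ("9v6..."),
# plus points elsewhere. After range partitioning, the Austin pair
# lands in the same partition, so its file gets tight min/max stats.
rows = ["9v6k", "dr5r", "9v6m", "dr5x", "c216", "c217"]
parts = repartition_by_range(rows, key=lambda g: g, num_partitions=3)
```

Because a geohash is itself a Z-order over latitude and longitude, range-partitioning on it yields file-level min/max stats that are nearly as tight as a full OPTIMIZE, without the extra pipeline step.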
And this has the benefit of working for both batch and stream. It doesn't Z-order the data in the same way as OPTIMIZE does, but it does shuffle data points into files based on the columns you've selected. In our case, we repartition by range using the geohash column, and that creates the same effect as OPTIMIZE would over latitude and longitude, because a geohash is itself a Z-order over latitude and longitude. What this means is we don't need to do the OPTIMIZE step: instead of having this second step in our pipeline, we change the stage where we create the dataset in the first place, and at the end of that we add a repartitionByRange call. That then outputs the data into files which are bucketed in the same way as you would get with the OPTIMIZE function, but it takes that step out. So let's review how our pipelines look overall. When we're importing data, in stream or batch, we bring it in and then we do some processing on it. As part of that processing we add the geohash. The geohash is really important to us as our spatial index, so lots of things hang off the geohash. When we're writing that data out, we repartition by range over the geohash, and what that does is co-locate the same geohashes, the same latitudes and longitudes, in the same files. That's not all we have to do: we then have to convert that data to Delta, because that will create the statistics in the metadata file that I talked about before, and once those statistics are there, Dynamic File Pruning can work. When we're querying the data, if it's a query over a polygon, we can convert it to a range of geohashes, look up that range of geohashes using a broadcast join and, as I touched on before, make sure that a WHERE clause is in there that triggers Dynamic File Pruning. And what's the result?
Well, as shown there, we can easily get a 10 times reduction in query times, and we can get up to a hundred times reduction as well; when the query is already cached, when that metadata is cached, it's really fast. What that does is massively reduce the amount of data we have to pull from the object storage layer, in our case S3. So, just to wrap up: Dynamic File Pruning is a fantastic tool for optimizing any query, but particularly, in our case, geospatial queries which read from object storage. It's great for getting that data off disk quickly; if you then decide to use another library to join it, that's fine, but it's great for improving the time taken to get that data off disk. However, there are a few caveats: you have to get the query right, and you have to set up the data right in the first place. So I hope that's been useful. Please review and rate the session, and any questions are welcome.
Matt leads data strategy at Wejo, coming up with innovative ways of processing petabytes of connected car data, in both stream and batch, using technologies such as Spark, Kafka, Kafka Streams and Akka. He is a strong advocate of Spark, having led on implementations of Cloudera and Databricks in his most recent roles. Matt can regularly be found leading Spark training sessions, or getting stuck into the latest performance tuning challenge.