Introduction
Databricks Delta Lake is a unified data management system that brings data reliability and fast analytics to cloud data lakes. In this blog post, we take a peek under the hood to examine what makes Databricks Delta capable of sifting through petabytes of data within seconds. In particular, we discuss Data Skipping and ZORDER Clustering.
These two features combined enable the Databricks Runtime to dramatically reduce the amount of data that needs to be scanned in order to answer highly selective queries against large Delta tables, which typically translates into orders-of-magnitude runtime improvements and cost savings.
You can see these features in action in a keynote speech from the 2018 Spark + AI Summit, where Apple's Dominique Brezinski demonstrated their use case for Databricks Delta as a unified solution for data engineering and data science in the context of cyber-security monitoring and threat response.
Get an early preview of O'Reilly's new ebook for the step-by-step guidance you need to start using Delta Lake.
How to Use Data Skipping and ZORDER Clustering
To take advantage of data skipping, all you need to do is use Databricks Delta. The feature is automatic and kicks in whenever your SQL queries or Dataset operations include filters of the form "column op literal", where:
column
is an attribute of some Databricks Delta table, be it top-level or nested, whose data type is string / numeric / date/ timestampop
is a binary comparison operator,StartsWith / LIKE 'pattern%', or IN
literal
is an explicit (list of) value(s) of the same data type as a column
AND / OR / NOT
are also supported, as well as "literal op column" predicates.
As we'll explain below, even though data skipping always kicks in when the above conditions are met, it may not always be very effective. But, if there are a few columns that you frequently filter by and want to make sure that's fast, then you can explicitly optimize your data layout with respect to skipping effectiveness by running the following command:
More on this later. First, let's take a step back and put things in context.
How Data Skipping and ZORDER Clustering Work
The general use-case for these features is to improve the performance of needle-in-the-haystack kind of queries against huge data sets. The typical RDBMS solution, namely secondary indexes, is not practical in a big data context due to scalability reasons. If you're familiar with big data systems (be it Apache Spark, Hive, Impala, Vertica, etc.), you might already be thinking: (horizontal) partitioning. Quick reminder: In Spark, just like Hive, partitioning 1 works by having one subdirectory for every distinct value of the partition column(s). Queries with filters on the partition column(s) can then benefit from partition pruning, i.e., avoid scanning any partition that doesn't satisfy those filters. The main question is: What columns do you partition by? And the typical answer is: The ones you're most likely to filter by in time-sensitive queries. But... What if there are multiple (say 4+), equally relevant columns? The problem, in that case, is that you end up with a huge number of unique combinations of values, which means a huge number of partitions and therefore files. Having data split across many small files brings up the following main issues:
Metadata becomes as large as the data itself, causing performance issues for various driver-side operations
In particular, file listing is affected, becoming very slow
Compression effectiveness is compromised, leading to wasted space and slower IO
So while data partitioning in Spark generally works great for dates or categorical columns, it is not well suited for high-cardinality columns and, in practice, it is usually limited to one or two columns at most.
Data Skipping
Apart from partition pruning, another common technique that's used in the data warehousing world, but which Spark currently lacks, is I/O pruning based on Small Materialized Aggregates. In short, the idea is to:
Keep track of simple statistics such as minimum and maximum values at a certain granularity that's correlated with I/O granularity.
Leverage those statistics at query planning time in order to avoid unnecessary I/O.
This is exactly what Databricks Delta's data skipping feature is about. As new data is inserted into a Databricks Delta table, file-level min/max statistics are collected for all columns (including nested ones) of supported types. Then, when there's a lookup query against the table, Databricks Delta first consults these statistics in order to determine which files can safely be skipped. But, as they say, a GIF is worth a thousand words, so here you go: On the one hand, this is a lightweight and flexible (the granularity can be tuned) technique that is easy to implement and reason about. It's also completely orthogonal to partitioning: it works great alongside it, but doesn't depend on it. On the other hand, it's a probabilistic indexing approach which, like bloom filters, may give false-positives, especially when data is not clustered. Which brings us to our next technique.
ZORDER Clustering
For I/O pruning to be effective data needs to be clustered so that min-max ranges are narrow and, ideally, non-overlapping. That way, for a given point lookup, the number of min-max range hits is minimized, i.e. skipping is maximized. Sometimes, data just happens to be naturally clustered: monotonically increasing IDs, columns that are correlated with insertion time (e.g., dates / timestamps) or the partition key (e.g., pk_brand_name - model_name). When that's not the case, you can still enforce clustering by explicitly sorting or range-partitioning your data before insertions. But, again, suppose your workload consists of equally frequent/relevant single-column predicates on (e.g. n = 4) different columns. In that case, "linear" a.k.a. "lexicographic" or "major-minor" sorting by all of the n columns will strongly favor the first one that's specified, clustering its values perfectly. However, it won't do much, if anything at all (depending on how many duplicate values there are on that first column) for the second one, and so on. Therefore, in all likelihood, there will be no clustering on the nth column and therefore no skipping possible for lookups involving it. So how can we do better? More precisely, how can we achieve similar skipping effectiveness along every individual dimension? If we think about it, what we're looking for is a way of assigning n-dimensional data points to data files, such that points assigned to the same file are also close to each other along each of the n dimensions individually. In other words, we want to map multi-dimensional points to one-dimensional values in a way that preserves locality. This is a well-known problem, encountered not only in the database world, but also in domains such as computer graphics and geohashing. The answer is: locality-preserving space-filling curves, the most commonly used ones being the Z-order and Hilbert curves. Below is a simple illustration of how Z-ordering can be applied for improving data layout with regard to data skipping effectiveness. Legend:
Gray dot = data point e.g., chessboard square coordinates
Gray box = data file; in this example, we aim for files of 4 points each
Yellow box = data file that's read for the given query
Green dot = data point that passes the query's filter and answers the query
Red dot = data point that's read, but doesn't satisfy the filter; "false positive"
An Example in Cybersecurity Analysis
Okay, enough theory, let's get back to the Spark + AI Summit keynote and see how Databricks Delta can be used for real-time cybersecurity threat response. Say you're using Bro, the popular open-source network traffic analyzer, which produces real-time, comprehensive network activity information2. The more popular your product is, the more heavily your services get used and, therefore, the more data Bro starts outputting. Writing this data at a fast enough pace to persistent storage in a more structured way for future processing is the first big data challenge you'll face. This is exactly what Databricks Delta was designed for in the first place, making this task easy and reliable. What you could do is use structured streaming to pipe your Bro conn data into a date-partitioned Databricks Delta table, which you'll periodically run OPTIMIZE on so that your log records end up evenly distributed across reasonably-sized data files. But that's not the focus of this blog post, so, for illustration purposes, let's keep it simple and use a non-streaming, non-partitioned Databricks Delta table consisting of uniformly distributed random data. Faced with a potential cyber-attack threat, the kind of ad-hoc data analysis you'll want to run is a series of interactive "point lookups" against the logged network connection data. For example, "find all recent network activity involving this suspicious IP address." We'll model this workload by assuming it's made out of basic lookup queries with single-column equality filters, using both random and sampled IPs and ports. Such simple queries are IO-bound, i.e. their runtime depends linearly on the amount of data scanned. These lookup queries will typically turn into full table scans that might run for hours, depending on how much data you're storing and how far back you're looking. Your end goal is likely to minimize the total amount of time spent on running these queries, but, for illustration purposes, let's instead define our cost function as the total number of records scanned. This metric should be a good approximation of total runtime and has the benefit of being well defined and deterministic, allowing interested readers to easily and reliably reproduce our experiments. So here we go, this is what we'll work with, concretely:
case class ConnRecord(src_ip: String, src_port: Int, dst_ip: String, dst_port: Int)
def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".")
def randomPort(r: Random) = r.nextInt(65536)
def randomConnRecord(r: Random) = ConnRecord(
src_ip = randomIPv4(r), src_port = randomPort(r),
dst_ip = randomIPv4(r), dst_port = randomPort(r))
case class TestResult(numFilesScanned: Long, numRowsScanned: Long, numRowsReturned: Long)
def testFilter(table: String, filter: String): TestResult = {
val query = s"SELECT COUNT(*) FROM $table WHERE $filter"
val(result, metrics) = collectWithScanMetrics(sql(query).as[Long])
TestResult(
numFilesScanned = metrics("filesNum"),
numRowsScanned = metrics.get("numOutputRows").getOrElse(0L),
numRowsReturned = result.head)
}
// Runs testFilter() on all given filters and returns the percent of rows skipped
// on average, as a proxy for Data Skipping effectiveness: 0 is bad, 1 is good
def skippingEffectiveness(table: String, filters: Seq[String]): Double = { ... }
Here's how a randomly generated table of 100 files, 1K random records each, might look like:
SELECT row_number() OVER (ORDER BY file) AS file_id,
count(*) as numRecords, min(src_ip), max(src_ip), min(src_port),
max(src_port), min(dst_ip), max(dst_ip), min(dst_port), max(dst_port)
FROM (
SELECT input_file_name() AS file, * FROM conn_random)
GROUP BY file
Seeing how every file's min-max ranges cover almost the entire domain of values, it is easy to predict that there will be very little opportunity for file skipping. Our evaluation function confirms that:
skippingEffectiveness(connRandom, singleColumnFilters)
Ok, that's expected, as our data is randomly generated and so there are no correlations. So let's try explicitly sorting data before writing it.
spark.read.table(connRandom)
.repartitionByRange($"src_ip", $"src_port", $"dst_ip", $"dst_port")
// or just .sort($"src_ip", $"src_port", $"dst_ip", $"dst_port")
.write.format("delta").saveAsTable(connSorted)
skippingEffectiveness(connRandom, singleColumnFilters)
Hmm, we have indeed improved our metric, but 25% is still not great. Let's take a closer look:
val src_ip_eff = skippingEffectiveness(connSorted, srcIPv4Filters)
val src_port_eff = skippingEffectiveness(connSorted, srcPortFilters)
val dst_ip_eff = skippingEffectiveness(connSorted, dstIPv4Filters)
val dst_port_eff = skippingEffectiveness(connSorted, dstPortFilters)
Turns out src_ip lookups are really fast but all others are basically just full table scans. Again, that's no surprise. As explained earlier, that's what you get with linear sorting: the resulting data is clustered perfectly along the first dimension (src_ip in our case), but almost not at all along further dimensions.
So how can we do better? By enforcing ZORDER clustering.
spark.read.table(connRandom)
.write.format("delta").saveAsTable(connZorder)
sql(s"OPTIMIZE $connZorder ZORDER BY (src_ip, src_port, dst_ip, dst_port)")
skippingEffectiveness(connZorder, singleColumnFilters)
Quite a bit better than the 0.25 obtained by linear sorting, right? Also, here's the breakdown:
val src_ip_eff = skippingEffectiveness(connZorder, srcIPv4Filters)
val src_port_eff = skippingEffectiveness(connZorder, srcPortFilters)
val dst_ip_eff = skippingEffectiveness(connZorder, dstIPv4Filters)
val dst_port_eff = skippingEffectiveness(connZorder, dstPortFilters)
A couple of observations worth noting:
It is expected that skipping effectiveness on src_ip is now lower than with linear ordering, as the latter would ensure perfect clustering, unlike z-ordering. However, the other columns' score is now almost just as good, unlike before when it was 0.
It is also expected that the more columns you z-order by, the lower the effectiveness.
For example,ZORDER BY (src_ip, dst_ip)
achieves 0.82. So it is up to you to decide what filters you care about the most.
In the real-world use case presented at the Spark + AI summit, the skipping effectiveness on a typical
WHERE src_ip = x AND dst_ip = y
query was even higher. In a data set of 504 terabytes (over 11 trillion rows), only 36.5 terabytes needed to be scanned thanks to data skipping. That's a significant reduction of 92.4% in the number of bytes and 93.2% in the number of rows.
Conclusion
Using Databricks Delta's built-in data skipping and
ZORDER
clustering features, large cloud data lakes can be queried in a matter of seconds by skipping files not relevant to the query. In a real-world cybersecurity analysis use case, 93.2% of the records in a 504 terabytes dataset were skipped for a typical query, reducing query times by up to two orders of magnitude.
In other words, Databricks Delta can speed up your queries by as much as 100X.
Note: Data skipping has been offered as an independent option outside of Databricks Delta in the past as a separate preview. That option will be deprecated in the near future. We highly recommend you move to Databricks Delta to take advantage of the data skipping capability.
Read More
Here are some assets for you:
Delta Lake Product Page
Databricks Delta User Guide AWS or Azure
Databricks Engineering Blog Post on Databricks Delta
Interested in the open source Delta Lake?
Visit the Delta Lake online hub to learn more, download the latest code and join the Delta Lake community.
To be clear, here we mean write.partitionBy(), not to be confused with RDD partitions. ↩
To get an idea of what that looks like, check out the sample Bro data that's kindly hosted by www.secrepo.com. ↩