Pre-aggregation is a powerful analytics technique as long as the measures being computed are aggregable. Counts reaggregate with SUM, minimums with MIN, maximums with MAX, etc. The odd one out is distinct counts, which are not aggregable. Traditionally, the non-reaggregability of distinct counts leads to an implicit restriction: whichever system computes distinct counts has to have access to the most granular data and touch every row at query time. Because of this, in typical analytics architectures, where fast query response times are required, raw data has to be duplicated between Spark and another system such as an RDBMS.
This talk is for everyone who computes or consumes distinct counts and for everyone who doesn’t understand the magical power of HyperLogLog (HLL) sketches. We will break through the limits of traditional analytics architectures using the advanced HLL functionality and cross-system interoperability of the spark-alchemy open-source library, whose capabilities go beyond what is possible with OSS Spark, Redshift or even BigQuery. We will uncover patterns for 1000x gains in analytic query performance without data duplication and with significantly less capacity. We will explore real-world use cases from Swoop’s petabyte-scale systems, improve data privacy when running analytics over sensitive data, and even see how a real-time analytics frontend running in a browser can be provisioned with data directly from Spark.
– Hello and welcome to the first-ever virtual edition of the Spark+AI Summit. In these pandemic times things are changing so quickly, it's sometimes difficult to know what reality is. And we seek truth: we seek truth and facts, and we seek insights, so we can take better actions. That's what I wanna talk to you about today: how to pull insights from very large amounts of data with Apache Spark, very quickly and very cheaply. In particular, how to do this using an open source library called Spark Alchemy that my company, Swoop, created. We call it Alchemy because it helps you do magic with Spark, and we'll see how in a moment.
I'm the founder and CTO of Swoop, and my company is focused on improving patient outcomes, particularly for one of the patient populations we care most about: people who have rare diseases. I don't know if you know what a rare disease is, but in the US the technical definition is a disease that affects fewer than 200,000 patients. So these are very low prevalence diseases, and because they're so uncommon, doctors don't know how to diagnose them very well. About 30 million people in the US suffer from thousands of different rare diseases, and on average it takes more than five years to diagnose a rare disease, which comes at huge suffering for the patient and the patient's family, and at a huge cost for the health system as a whole. One of the things we do for rare diseases is predict the likelihood of somebody having a rare disease, using some fairly advanced AI models working with petabytes of health and non-health data. But in addition to doing machine learning and AI, we also do a lot of analytics: looking at data, understanding what's happening, looking at changes, and making sure that our AI models are doing the right thing. In particular, in COVID times behavior can change so rapidly that a number of AI models have done poorly and couldn't adapt. Consider, for example, what happens when the President of the United States recommends an unproven therapy for COVID-19.
Our analysis showed that in one day, prescription volume went up 50-fold and then very quickly dropped back down. Other times, you want to create interfaces that let humans derive insights from data. You can think of it as business intelligence; in our case, more unique, custom-tuned interfaces. But the basic idea is to support human decision making, and when a human is on one side of a user interface, they want the queries to produce results at human speed, which means sub-second query times. That creates a problem for us: on one end we have petabytes of data, and on the other end we need sub-second queries. Spark is not designed for very fast query response times; that's not its purpose. Spark queries have meaningful startup and finish costs. So we need something in the middle. The simplest way to think about it is, well, maybe we'll replicate all the data from Spark into some other system, a traditional high-performance database, and then use that to power our business intelligence, cube, custom web app or something like that. That sounds pretty complicated. So let's dig into this problem and see if there are better ways to solve it.
Well, let's start with the basics, and don't laugh, but the secret to high-performance analytics is to just process less data. Digging into this, it actually means four things in the case of Spark.
First, it means processing fewer rows of input data. Second, it means running fewer shuffles. Third, it means that when you do shuffle, you want the shuffles to be as small as possible. The fourth thing, which we'll ignore for the purpose of this talk, is that you also want to make sure that your data is not skewed, that it's evenly distributed. That's an important factor in any distributed system. So how do we approach this problem? Well, typically the go-to strategy for making things fast is divide and conquer.
In the case of an analytic system, we're typically aggregating data, going from lots of rows to data summarized in a manner that a human can grok. So the aggregation usually involves some type of group by, rollup or cube. If we could break that operation into a pre-aggregation phase that gets done once, and then, rather than going back to the original data, use the pre-aggregated data over and over again to satisfy queries, we could run a lot faster. The property of data that allows this to happen is its reaggregability.
For example, if you're computing minimums or maximums, you can compute intermediate minimums and maximums and then take the minimum and maximum of those, because the minimum of minimums is the minimum and the maximum of maximums is the maximum. Count is a little more interesting: when you pre-aggregate with count, you have to re-aggregate with sum, because the count of the intermediate counts isn't the count of the underlying things.
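The reaggregation rule can be checked in a few lines of plain Python. This is a toy sketch, not Spark code: the hypothetical `partitions` list stands in for the rows each worker sees, the partial results play the role of a pre-aggregated table, and the assertions show which re-aggregation function recovers each measure.

```python
# Hypothetical data: each inner list stands in for one worker's partition.
partitions = [[4, 9, 1], [7, 3], [8, 2, 6, 5]]
all_values = [v for part in partitions for v in part]

# Pre-aggregation: compute partial results once per partition.
partial_mins = [min(part) for part in partitions]
partial_maxes = [max(part) for part in partitions]
partial_counts = [len(part) for part in partitions]

# Re-aggregation: MIN of minimums and MAX of maximums give the right answer,
# but counts must be re-aggregated with SUM.
assert min(partial_mins) == min(all_values)
assert max(partial_maxes) == max(all_values)
assert sum(partial_counts) == len(all_values)

# Counting the partial counts only tells us how many partitions there were.
assert len(partial_counts) == len(partitions)
assert len(partial_counts) != len(all_values)
```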
But we have a problem. There's another kind of counting, and that's distinct counting. The problem with distinct counting is that distinct counts are not reaggregatable. If you have a website, and 10 unique visitors visited yesterday and 10 unique visitors visit today, the total number of unique visitors over the two days is somewhere between 10 and 20. We can't just add them up. So what do we do in this situation? It seems like with count distinct, we need to process every single row of the input data to produce a final count of users, and that kind of defeats the purpose of pre-aggregating and re-aggregating.
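The website example can be made concrete with Python sets. In this minimal sketch the visitor IDs are hypothetical: both scenarios have exactly 10 unique visitors on each day, yet the two-day distinct count differs, because it depends on the overlap between days, which the per-day counts discard.

```python
# Hypothetical visitor IDs: yesterday's 10 visitors, plus two possible "todays".
yesterday = {f"visitor{i}" for i in range(10)}
today_same_people = {f"visitor{i}" for i in range(10)}      # all 10 return
today_new_people = {f"visitor{i}" for i in range(10, 20)}   # 10 new visitors

# Both scenarios have exactly 10 unique visitors per day...
daily_counts = [len(yesterday), len(today_same_people), len(today_new_people)]
assert daily_counts == [10, 10, 10]

# ...yet the two-day distinct count differs, because it depends on the overlap
# between the days, which the per-day counts throw away.
two_day_same = len(yesterday | today_same_people)   # 10
two_day_new = len(yesterday | today_new_people)     # 20
assert (two_day_same, two_day_new) == (10, 20)
```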
So, let’s dig into this a little more. And to do this, let’s look at a specific example.
A specific example is a sample of our prescription data related to the COVID analysis I showed you earlier. I pulled a small sample with just a few columns: essentially, when a patient visits a doctor on a particular date and the doctor writes a prescription, I pulled the most basic information about that. It's about 10 billion rows, about a fifth of a terabyte.
We're going to process it on a cluster where all this data comfortably fits in RAM, because we don't want to test how fast Spark does I/O to various storage layers. We want to understand how Spark processes this type of data and computes different types of analytic aggregations. Here's the data. You have a date, a generic drug name, a branded drug name, a drug code (in the US that's a National Drug Code, or NDC), and doctor and patient IDs, which have been hashed for extra privacy. Pretty simple. One of the most basic queries we can ask over this data is: what happened to the number of prescriptions, and how many different generic and branded drugs did doctors write prescriptions for, per month, over some period of time? In this case, the data goes back about five years. It's an interesting query because in pandemic times you don't go to the doctor unless it's really urgent, so we'd expect a reduction in prescriptions and a reduction in the total number of different drugs doctors are prescribing. As we can see here, it ran in a little over three minutes, and if we dig into the analysis a little further, we'll see that it's a query that cannot run quickly enough for interactive use. So let's apply the pre-aggregation strategy. In this case, since we want data by month, we can clearly reduce the date level from days to months. And since we cannot re-aggregate distinct counts, we can group by the generic name and the branded drug name, preserving the unique values while still shrinking the total number of rows with a group by. Now let's see what happens to our query when we run it again. Wow, with this simple change, we're running 20 times faster. Digging deeper into the performance analysis, you see that we've reduced the row count 850-fold, and we also reduced the shuffle size five-fold.
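The pre-aggregation step can be sketched in plain Python, assuming a hypothetical handful of rows with just a date, a generic name and a branded name. Dates are truncated to months and rows are grouped by every dimension we later need distinct counts over, keeping a row count that re-aggregates with SUM. In Spark this would be a group by followed by a count; here a `Counter` plays that role, and `monthly_stats` is a hypothetical helper, not part of any library.

```python
from collections import Counter
from datetime import date

# Hypothetical daily prescription rows: (fill_date, generic_name, branded_name).
rows = [
    (date(2020, 3, 2), "atorvastatin", "Lipitor"),
    (date(2020, 3, 2), "atorvastatin", "Lipitor"),
    (date(2020, 3, 9), "metformin", "Glucophage"),
    (date(2020, 3, 9), "atorvastatin", "Lipitor"),
    (date(2020, 4, 1), "metformin", "Glucophage"),
    (date(2020, 4, 7), "metformin", "Glumetza"),
]

# Pre-aggregation: day -> month, group by all dimensions, keep a row count.
pre_agg = Counter(((d.year, d.month), generic, brand) for d, generic, brand in rows)

def monthly_stats(records):
    """Per month: (prescriptions, distinct generics, distinct brands).

    Each record is (month, generic, brand, row_count). Counts re-aggregate
    with SUM; distinct counts stay exact because the distinct-counted
    columns are kept as group-by keys.
    """
    acc = {}
    for month, generic, brand, n in records:
        entry = acc.setdefault(month, [0, set(), set()])
        entry[0] += n
        entry[1].add(generic)
        entry[2].add(brand)
    return {m: (rx, len(g), len(b)) for m, (rx, g, b) in acc.items()}

raw = [((d.year, d.month), g, b, 1) for d, g, b in rows]
pre = [(m, g, b, n) for (m, g, b), n in pre_agg.items()]

# Same answer from the raw rows and from the smaller pre-aggregated table.
assert monthly_stats(raw) == monthly_stats(pre)
assert len(pre) < len(raw)   # 4 pre-aggregated rows instead of 6
```

The trick only pays off because generic and branded names are low-cardinality; grouping by a high-cardinality column such as a patient ID would barely shrink the table.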
The reason for that is essentially that, because we are processing fewer rows, fewer distinct values end up on each worker, so the workers have to move less data to other workers in the cluster.
The combination of the two is that Spark runs 20 times faster. Now, if this final query ran in a traditional relational database system designed for fast, interactive query response times and high throughput, it would be a sub-second query, because there's so little data to process. But again, we have another kind of problem, and the problem is that we can only do this trick when we're grouping by low-cardinality dimensions. There are only a few thousand generic product names and only a few tens of thousands of branded drug names. So when you go from 10 billion rows to, you know, five years times 12 months, or 60 unique dates, and a few tens of thousands of unique branded drug names (20-something thousand), that's an easy trick, and we get the 850-fold reduction. But what if we wanted a distinct count of patients? Well, there are more than 300 million patients in the US, and that creates a problem. We know that we can't group by the patient ID in addition to everything else; that would hardly reduce data volume at all. I mean, how many different doctors does a typical patient see in a given month, right?
If we just do a straight computation of a distinct count of patients, we see that the situation gets pretty bad. We're now more than two times slower, at over seven minutes. And look at what happened to our shuffle: we're shuffling nearly 150 gigabytes of data in the cluster. Wow, that seems like a lot, given that the entire data set is about 200 gigabytes. So let's do the math here.
Let's say that there are 300 million unique patient IDs and that they're 64-bit integers of eight bytes each. 300 million times eight bytes is roughly two and a half gigabytes. We have 80 cores in the cluster, so 80 times two and a half gigabytes gives an upper limit of about a 200 gigabyte shuffle if every single worker had every single unique patient ID. In practice it's a little less than that, because each worker keeps the set of IDs it processes itself and only needs to shuffle the IDs it's not handling. But you get the idea, right? The upper limit here is close to the size of the data we're processing, because you have to multiply by the number of workers in the cluster. And that's where we have a problem. So what can we do about it? Well, if you've dug into the Spark function library, you'll see that Spark has approximate counting functions such as approx_count_distinct, and the documentation says they can count a little faster. Approximate counting uses a probabilistic data structure called HyperLogLog under the covers. It has some fascinating mathematical properties, and we'll be happy to talk to you about them in the Q&A session after the talk. But let's just try it out: let's replace the count distinct with an approximate count distinct and compute an approximate count of unique patients seeing doctors. That computation is done at the default 5% error, and we see that we're now doing much better: we're running two times faster and the shuffle size has gone way, way down. So let's analyze the situation. The bad news is that we still have to process every single row of input data. The good news is that we have more than a 50-fold reduction in shuffle size. How does this happen? Well, rather than exchanging the individual patient IDs, approximate counting exchanges a binary data structure known as a HyperLogLog sketch.
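The back-of-the-envelope shuffle bound can be written out as plain arithmetic. The figures (300 million unique IDs, 8-byte IDs, 80 cores) come from the talk; using decimal gigabytes is an assumption of this sketch.

```python
# Figures from the talk.
unique_patient_ids = 300_000_000
bytes_per_id = 8          # 64-bit integer IDs
cores = 80

# Worst case: every core holds every unique ID and must exchange it.
per_core_bytes = unique_patient_ids * bytes_per_id
upper_bound_bytes = per_core_bytes * cores

print(f"per core: {per_core_bytes / 1e9:.1f} GB")        # per core: 2.4 GB
print(f"upper bound: {upper_bound_bytes / 1e9:.0f} GB")  # upper bound: 192 GB
```

The measured shuffle of nearly 150 GB sits below this bound, as expected, since each worker keeps its own share of IDs instead of shuffling them.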
A HyperLogLog sketch is a data structure computed by hashing all the IDs that a single worker sees. It doesn't keep the IDs themselves, only a compact summary of their hash values. Those sketches, while they can get large if you want low error, are actually pretty small compared to the 300 million possible unique patient IDs; we talked about two and a half gigabytes of those per worker. So the net result of the good news and the bad news is mixed news. We're running two times faster, but it's still in the minutes range, which is not good enough for the type of queries we want to do. And this is really surprising, because under the covers we seem to have an opportunity to run much faster, right? How could HyperLogLog do what it's doing without shuffling all the patient IDs? Clearly HyperLogLog sketches must have an interesting property that allows them to be combined to compute an approximate distinct count in the reduction phase of the job. Can't we use that ability to combine sketches in a pre-aggregation, re-aggregation pattern? The short answer is yes, we can. But for some unknown reason, the people who implemented HyperLogLog support in Spark for approximate distinct counting didn't expose access to the binary data structures. The folks at Google who work on BigQuery, the people working on Snowflake, and the teams behind a number of other systems understood the power of working directly with HyperLogLog sketches as binary data, and so they provided direct access to them.
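The property that makes this possible is that two sketches built with the same parameters can be merged by taking the maximum of each register, and the result is identical to sketching the union of the inputs directly. The minimal pure-Python HyperLogLog below is written only to illustrate that; it is not Spark's HyperLogLog++ or spark-alchemy's binary format, and the class name `MiniHLL`, the BLAKE2b hash and the precision `p=12` are all assumptions.

```python
import hashlib
import math

class MiniHLL:
    """Illustrative HyperLogLog sketch (hypothetical; not a Spark format)."""

    def __init__(self, p=12):
        self.p = p                      # index bits
        self.m = 1 << p                 # number of registers
        self.registers = [0] * self.m   # only hash-derived ranks are stored

    def add(self, value):
        # One-way 64-bit hash of the ID.
        h = int.from_bytes(
            hashlib.blake2b(str(value).encode(), digest_size=8).digest(), "big")
        idx = h >> (64 - self.p)                  # top p bits choose a register
        rest = h & ((1 << (64 - self.p)) - 1)     # remaining 64 - p bits
        rank = (64 - self.p) - rest.bit_length() + 1  # leftmost 1-bit position
        self.registers[idx] = max(self.registers[idx], rank)

    def merge(self, other):
        # Union of two sketches: element-wise max of the registers.
        if self.p != other.p:
            raise ValueError("sketches must use the same precision")
        merged = MiniHLL(self.p)
        merged.registers = [max(a, b) for a, b in zip(self.registers, other.registers)]
        return merged

    def cardinality(self):
        alpha = 0.7213 / (1 + 1.079 / self.m)
        raw = alpha * self.m ** 2 / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if raw <= 2.5 * self.m and zeros:
            # Small-range correction (linear counting).
            return self.m * math.log(self.m / zeros)
        return raw

# Two "workers" see overlapping patient IDs.
a, b, direct = MiniHLL(), MiniHLL(), MiniHLL()
for i in range(60_000):
    a.add(f"patient-{i}")
for i in range(40_000, 100_000):
    b.add(f"patient-{i}")
for i in range(100_000):
    direct.add(f"patient-{i}")

union = a.merge(b)
assert union.registers == direct.registers   # merging is lossless
print(round(union.cardinality()))            # approximately 100,000
```

Because the merge only needs the registers, workers can exchange a few kilobytes of sketch instead of millions of IDs.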
But for Spark, we actually had to build the Spark Alchemy library to do this, and we added all kinds of other bells and whistles that help it interoperate in beneficial ways, which we'll see in a moment. So here's what we can do with Spark Alchemy. During the pre-aggregation phase, we can process patient IDs and build HyperLogLog sketches from them, stored in a binary column as an opaque data structure.
The key property of HyperLogLog sketches is that they can be merged to create new HyperLogLog sketches. So in the re-aggregation phase, we merge the sketches we created in the pre-aggregation phase into coarser sketches, and then in the presentation phase, we turn those binary sketches into a distinct count. This is what it looks like: we create HyperLogLog sketches with the hll_init_agg function from Spark Alchemy, merge them with hll_merge, and turn the sketches into distinct counts with hll_cardinality. So now we finally have a unified framework for working with the typical types of analytics queries and aggregates we deal with, in a pre-aggregate, re-aggregate, present pattern. Let's see what the impact of that is by building a pre-aggregated table with HyperLogLog sketches.
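The three-phase pattern can be sketched end to end in plain Python, assuming a tiny register-list HyperLogLog in place of spark-alchemy. The functions `sketch_ids`, `merge_sketches` and `estimate_cardinality` are hypothetical stand-ins for the create, merge and cardinality steps described above; they do not reproduce spark-alchemy's binary format, and the precision `P` and the daily data are assumptions.

```python
import hashlib
import math

P = 10            # hypothetical precision: 2**10 = 1,024 registers
M = 1 << P

def sketch_ids(ids):
    """Pre-aggregation: turn raw IDs into a register list (the sketch)."""
    regs = [0] * M
    for value in ids:
        h = int.from_bytes(
            hashlib.blake2b(str(value).encode(), digest_size=8).digest(), "big")
        idx, rest = h >> (64 - P), h & ((1 << (64 - P)) - 1)
        regs[idx] = max(regs[idx], (64 - P) - rest.bit_length() + 1)
    return regs

def merge_sketches(sketches):
    """Re-aggregation: union any number of sketches with an element-wise max."""
    return [max(column) for column in zip(*sketches)]

def estimate_cardinality(regs):
    """Presentation: turn a sketch into an approximate distinct count."""
    raw = 0.7213 / (1 + 1.079 / M) * M * M / sum(2.0 ** -r for r in regs)
    zeros = regs.count(0)
    return M * math.log(M / zeros) if raw <= 2.5 * M and zeros else raw

# Hypothetical raw data: patient IDs seen on each of 7 days, overlapping by day.
daily_ids = {day: range(day * 5_000, day * 5_000 + 20_000) for day in range(1, 8)}

# Phase 1, run once: one sketch per day, stored instead of raw rows.
daily_sketches = {day: sketch_ids(ids) for day, ids in daily_ids.items()}
# Phase 2, run per query: roll days up to a week without touching raw rows.
weekly_sketch = merge_sketches(daily_sketches.values())
# Phase 3: present the approximate weekly distinct count.
estimate = estimate_cardinality(weekly_sketch)

exact = len(set().union(*daily_ids.values()))         # 50,000 unique patients
naive = sum(len(ids) for ids in daily_ids.values())   # 140,000: not reaggregable
```

Summing daily distinct counts overstates the weekly figure nearly threefold here, while the merged sketch stays within HyperLogLog's usual error.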
You see, we just need to change one line in our query. Then let's compute a final query using that pre-computed table.
Wow! You know, we're now more than 60 times faster; we're back in the seconds range.
If you look at the analysis of what happened, you see that we now have the best of both worlds: we have the 850-fold row reduction on the input side, and on top of that, a whopping reduction in shuffle size of more than 300-fold.
The combination of those two pieces of good news is that we run in just a handful of seconds. And again, if this were done in a traditional relational database system, it would be a sub-second query.
In other words, we're done. We now have a pattern for producing sub-second queries in our analytic system. In the real world it's a bit more complicated, because you have many different query types, but we can also create many different pre-aggregate types to handle them. One of the things you have to think about, of course, is that we're no longer counting precisely. We're counting approximately.
Now, in my experience, business people sometimes get very troubled by the notion of approximate counts. They ignore the fact that very often the data we work with is incomplete, or dirty in some way, or biased. When you're approximately counting imprecise things with very low error, you may actually end up with a better outcome than if you're precisely counting the same imprecise things; you never know. The good news about HyperLogLog approximate counting is that the precision of the algorithm is tunable. It all depends on how many bits of precision you allow in your HyperLogLog sketches. The basic rule of thumb is that to halve your error, you need to quadruple the size of your sketch. So even if you want less than 1% error at 99% confidence, you turn a set of IDs (in this case, 64-bit IDs) into at most a 32 kilobyte sketch. And it doesn't really matter how large the data you're hashing is: even if you're hashing much larger values, you still produce the same HyperLogLog sketch size.
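The "quadruple the size to halve the error" rule follows from the standard error of a HyperLogLog estimate, roughly 1.04 / sqrt(m) for m registers (the classic HyperLogLog analysis; Spark and spark-alchemy use HyperLogLog++ variants whose exact sizing may differ). This minimal sketch computes that figure and the smallest power-of-two register count for a target error; the helper names are hypothetical. Note that it gives one standard error, so a 99% confidence bound needs a wider margin.

```python
import math

def relative_std_error(registers):
    """Approximate standard error of a HyperLogLog estimate: 1.04 / sqrt(m)."""
    return 1.04 / math.sqrt(registers)

def registers_for(target_error):
    """Smallest power-of-two register count whose standard error <= target."""
    needed = (1.04 / target_error) ** 2
    return 1 << math.ceil(math.log2(needed))

# Quadrupling the registers (two more index bits) halves the error.
for m in (1 << 10, 1 << 12, 1 << 14, 1 << 16):
    print(f"{m:>6} registers -> ~{relative_std_error(m):.2%} standard error")
```

Sketch memory depends only on the register count and register width, never on the size of the values being hashed, which is why large IDs cost nothing extra.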
So for pretty much any business need, unless you're dealing with a very tight compliance or transactional financial scenario, you can find an approximate counting error threshold that satisfies the business requirements.
There are lots of other benefits to using HyperLogLog in production. HyperLogLog data structures are privacy-safe: they take potentially unsafe things such as IDs and turn them into opaque binary sketches using one-way hashing. You can perform unions and intersections of HyperLogLog sketches, both across rows and across columns. Unions are lossless; intersections can increase the error, and I'm happy to talk more about this in the Q&A session. The basic idea is that if you're comparing cardinalities that are not too far apart, say within 10 to 1,000x, the error is manageable. But if you're comparing things whose cardinalities are many orders of magnitude apart, the error can be quite high.
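Why intersections get worse as cardinalities diverge can be seen with inclusion-exclusion, |A ∩ B| = |A| + |B| - |A ∪ B|, which is one common way to intersect HyperLogLog sketches (an assumption here; systems may use other estimators). Each estimate's absolute error scales with its own cardinality, so the difference inherits an error on the order of the union's, which can dwarf a small intersection. The helpers below are hypothetical, and the error function is an order-of-magnitude heuristic, not a precise bound.

```python
def intersection_by_inclusion_exclusion(card_a, card_b, card_union):
    """|A ∩ B| = |A| + |B| - |A ∪ B|, usable with exact or estimated counts."""
    return card_a + card_b - card_union

def rough_intersection_rel_error(card_union, card_intersection, rel_err):
    """Heuristic: the difference carries an absolute error on the order of
    rel_err * |A ∪ B|, expressed relative to the (smaller) intersection."""
    return rel_err * card_union / card_intersection

# Exact sets confirm the identity itself.
A, B = set(range(0, 1_000)), set(range(900, 1_100))
assert intersection_by_inclusion_exclusion(len(A), len(B), len(A | B)) == len(A & B)

# With 1% relative error per estimate (hypothetical figures):
print(rough_intersection_rel_error(1_000_000, 100_000, 0.01))  # intersection 10x smaller
print(rough_intersection_rel_error(1_000_000, 100, 0.01))      # 10,000x smaller
```

In the first case the intersection is off by roughly 10%; in the second, the expected error is many times larger than the intersection itself.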
In that case, doing intersections with HyperLogLog is not recommended. And last but not least, with Spark Alchemy you have the ability to interoperate with other systems at the binary HyperLogLog sketch level. In particular, you can do it with Postgres and its distributed version, Citus. All you have to do is change one function call: instead of hll_merge, it's hll_union_agg in the Postgres world.
So that's all I have for you today, and I wanna leave you with a couple of calls to action. I really encourage you to take a look at Spark Alchemy and see all the different capabilities of HyperLogLog data structures and the native Spark functions that Spark Alchemy exposes. If you're doing any type of distinct counting, you should be thinking about whether you could be doing it approximately with HyperLogLog sketches. If you have to power dashboards or user experiences for business intelligence and human decision making, you're probably replicating data out of Spark unnecessarily today. Ask yourself if you can keep most of the data in Spark, and instead of moving lots of rows of raw data out of Spark, move HyperLogLog sketches. And last but not least, if you care about saving lives and doing it while making Spark great, maybe you should work at Swoop. Let's talk. Thank you so much for your attention.
Sim Simeonov is an entrepreneur, investor and startup mentor. He is the founding CTO of Swoop and IPM.ai, startups that use privacy-preserving AI to improve patient outcomes and marketing effectiveness in life sciences and healthcare. Previously, Sim was the founding CTO of Evidon (CrownPeak) & Thing Labs (AOL) and a founding investor in Veracode (Broadcom). In his VC days, Sim was an EIR at General Catalyst Partners and technology partner at Polaris Partners where he helped start five companies the firms invested in, three of which have already been acquired. Before his days as an investor, Sim was vice president of emerging technologies and chief architect at Macromedia (now Adobe). Earlier, he was a founding member and chief architect at Allaire, one of the first Internet platform companies whose flagship product, ColdFusion, ran thousands of sites such as Priceline and MySpace.