In the analysis of big data there are often problem queries that don’t scale because they require huge compute resources to generate exact results, or don’t parallelize well. Examples include count distinct, quantiles, most frequent items, joins, matrix computations, and graph analysis. Algorithms that can produce accuracy guaranteed approximate answers for these problem queries are a required toolkit for modern analysis systems that need to process massive amounts of data quickly. For interactive queries there may not be other viable alternatives, and in the case of realÂ-time streams, these specialized algorithms, called stochastic, streaming, sublinear algorithms, or ‘sketches’, are the only known solution. This technology has helped Yahoo successfully reduce data processing times from days to hours or minutes on a number of its internal platforms and has enabled subsecond queries on real-time platforms that would have been infeasible without sketches. This talk provides a short introduction to sketching and to DataSketches, an open source library of a core set of these algorithms designed for large production analysis and AI systems.
– Hello, this is Lee Rhodes of Verizon Media. And today, I’m gonna talk about production quality sketching library for the analysis of big data.
The three major topic areas I’m gonna discuss today. The first is problematic queries of big data, and these are kinds of queries that are very common, but yet traditional analysis methods don’t work very well. Then I’m gonna talk about an area of science and streaming algorithms, is called approximate analysis using sketches, and how we can use probabilistic analysis and stochastic processes to analyze these big data and answer these queries very quickly. And the third area I’m gonna quickly introduce the open source Apache DataSketches library, which addresses a lot of these problems.
So what is the challenge we have with big data? Well, our data is quite varied, often, very messy, and it’s often organized in many different dimensions and including time. And the data does not appear nicely organized in rows and columns like this, but it’s oftentimes comes sporadically with the key, value pairs and we have to analyze this data as it comes in and be able to make sense out of it. And of course, the real challenge here is that there’s billions of rows or billions of these key value pairs that we have to deal with and analyze this data in near real time.
Here’s some common queries that almost everyone that is struggling with big data, has to think about. One is unique identifiers or unique values in the stream. Oftentimes these identifiers, whether they’re for people or IOT devices, have many duplicates in the stream. And so we wanna be able to eliminate those duplicates and count only the distinct values or the unique values in the stream. Also, we’d like to be able to perform some sort of analysis on these sets.
We may have, like we do at Verizon Media, we may have visitors to our sports page or to our finance page, and we wanna say, okay, what is the union of those users between those two products and intersect that with, say another union of C and D and then excluding another set. So, this is what we call set expressions. Another very common query is where we wanna understand the distribution of values in our database. We might have, a lapse time for instance, or latencies are measured in seconds or minutes. And we wanna be able to say, what is the 95th percentile of our distribution or the median or the fifth percentiles. And that basically allows us to understand the full distribution of our data. Never a close cousin to this is to be able to display that as histograms or probability mass functions, PMFs. Very convenient to be able to do that. Going down to the lower left. Another very common query is where we want to find whether the most frequently occurring items in our stream or in our data. Suppose we are Apple with iTunes and where we are dealing with millions of requests for different tunes every hour or every day. And we wanna say which tunes were the most popular that we may wanna change our advertising or our pricing strategy based on popularity of those items. And we wanna be able to do that quickly. And other kinds of analysis that is also common, particularly in AI, is where we have large vectors or matrices, and we want to be able to do a decomposition of those matrices in an efficient manner. Graph analysis is another kind of analysis that’s very common. Where, we all live in networks, whether in a relationship, people-relationship networks or in communications networks. And we wanna be able to understand the characteristics of those networks. And then of course, there’s various types of sampling, both weighted and uniform sampling that we wanna be able to do correctly and be able to combine those samples across different data sets and do that correctly. One of the common characteristics across all these types of queries is that they’re not additive.
When the data gets large or resources are severely limited. All of these queries are problematic, because the aggregations or the attempt to try and combine different groups or even add individual items are not additive. In others, you can’t take say, the unique count of some group A and unique count of group B and combine those two numbers. Because it does not include the identifiers that were in common across both A and B. And so we’ll get the wrong answer. And the same thing goes, if you have a current result and you just want to add one new item.
Suppose you’re talking about a distribution of values. You want the 95th percentile, you may have the value, that’s the 95th percentile, but you add a new value.
And it’s no longer the 95th percentile. There’s no way to know what that change is, just from the values itself.
Okay, let’s talk for a minute about why these particular kinds of analysis queries are difficult for big data. And it’s in this traditional systems where you wanna do exact analysis. All of these different types of queries require that you have local copies of the data in the query engine. Suppose your big data is off on discs or in a cloud or wherever in order to answer these queries, you would literally have to copy sometimes all of the big data or vast proportion of the big data into the local system, in order to be able to complete the analysis. And of course, this is very problematic. If the data is very big and the bigger the data, the more difficult this is. So the size of your local query engine now begins to approach the size of your big data. And an additional problem is that in the query processing, in the analysis of these particular queries, sorting is often required. And so sorting billions of items is very costly, okay.
Well, you might think as a knee jerk reaction, let’s just parallelize this. We’ll put it on a major cluster, like a MapReduce and that will be able to solve our problem. Well, it doesn’t really, so parallelization does not really help. Because the nonadditivity, you still have to keep all the copies somewhere. And even after you have split all the data, you’ve got very expensive shuffles in the middle of the process. And then you still have to do the distinct count or whatever the nonadditive computation for each of the partitions. So it is not cheap to do this.
Time windowing is another example. Suppose we want analysis of say the last 30 days, so that we can report what are some or some aggregate function is over the last 30 days and redo that every day. So you get always a view of a window of say 30 days or 30 minutes. It doesn’t matter. But the problem in traditional analysis is that means you have to process each data item that many times. So if you’re looking at a 30 day window, you need to process that same data 30 times. As each day advances through the window, you’ve got to combine all the data together, again, and do your recomputation for the summary for that 30 day window. So this is even more expensive.
So, let’s challenge a fundamental premise. And that is that our results must be exact. If we can allow for some approximation, along with accuracy guarantees, we can achieve orders of magnitude improvement in speed and reduction of resources.
So here, I’m gonna introduce the sketch. It’s also known as a stochastic streaming algorithm, and the basic task here is that we want to model the problem that we’re trying to solve as a stochastic process, and then analyze this process using probability and statistics. So, a typical sketch has four major components to it. It has a stream processor at the beginning, which takes the stream of items that you want to analyze in, one at a time. It does has a random selection process inside that throws out some data, generally throws out a vast majority of the data, but chooses that intelligently and then stores a few items, or keeps a representation, not necessarily a copy, but it’s representation of what it is seeing in the next stage, which is the data structure. And that’s what we call the stochastic process. It is intentionally random. And we often add randomizing functions into this stream processor in order to improve the ability to do the analysis. Then the next stage is this data structure. It’s typically quite small. Typically only kilobytes. It can be larger. It can be in megabytes in size, depending on the user, how they want to configure the sketch for the accuracy trade off. The third basic box here is the query processor where it does the probabilistic analysis of the data structure when a query comes in and then returns a result that is approximate, but it has well known or well understood error distribution of that particular result. Another part of the sketch, which we have that is required for distributed analysis is a merge and set operations where it receives instead of a sketch of items or a stream of items. It receives a stream of sketches and then is able to merge these other sketches, which is really the data structure of these sketches into the current data structure. And then returns a result sketch on the output. That is important if you wanna do set operations with a downstream of this particular process.
So how and why do these sketches achieve superior performance for systems processing massive data?
Here’s some of the major properties of a sketch. The first, it has a small storage size, but not only is it important that it starts small, but it remains small as the size of the input stream or size of the input data gets larger. So that’s what we call sub-linear. So the size of the sketch is sub-linear in space with respect to the size of the input stream. So in this little graph in the middle, you see the stream size, say growing on the x-axis and the sketch size then becomes limited. It’s not even linear proportional. It is in fact sub-linear. So it’s like logarithmic in growth, or even grows up to some maximum size and stops. And that makes it very suitable for analyzing very massive data because it keeps the size limited. The third major property is what we call single-pass or one-touch. It only touches each item once, and doesn’t need to go back and look at older items in the stream at all, keeps going, it’s what we call one-touch. And that’s very important for real time analysis. This fourth major property is what we call data insensitive. That it really doesn’t matter how the input stream is organized. It can come in different orders. It’s insensitive to the distribution of the data. It doesn’t matter whether it’s Power Law or galcion or however your data is distributed. It is insensitive to that and produces basically the same and answer with the same error distribution.
This next property, Mergeable. I spoke about earlier, but is critical because for distributed systems, you wanna split your data into many fragments and be able to analyze all these fragments separately and then converge, merge the sketches to data together. So you’re not really merging the answers. What you’re doing is merging the data structures of these sketches together. And that’s also very vast. And of course, the nature of sketches in general, is they’re approximate and probabilistic. However, these are not just empirical algorithms. They have mathematically proven error properties that are very important to us. Now, some may say, well, how is this different than just sampling? Well, there’s some similarities. Some sketches are sampling type sketches. And that means that actually keeps in its data structure some resemblance of or transformation of the item it has seen in the input stream. And there are sketches that do not, and there are sketches that keep only a probabilistic record of it seeing a certain item, but it is not really sampling. And there’s types of sampling of course, that are not sketching. So it’s sort of a Venn diagram as you see here, but it depends on the basic sketch.
So the first win, the first major advantage of sketching algorithms is that the query space is small, compared to the previous diagram where we had to store virtually data that was nearly the size of the input data. Here, the query space is very tiny. So the sketch itself is say, typically kilobytes in size. And, so when the user submits his query the processing now, because the sketche’s small means it can also operate very quickly. There’s not much data in it.
Typical speeds for these sketches, are 10 to 40 nanosecond per update items which means 100 million operations or so, anywhere from 50 to 100 million operations per second to be able to analyze the input string.
The second major advantage or win is the mergeability. And so now we can, as I mentioned before, we can partition our stream into many different fragments and we can do our sketching on each of these fragments separately and it’s can be infinitely parallelized. So it’s embarrassingly parallelizable into many different sketches and then these sketches can be merged for the answer at the end.
The next two major wins using sketching is we can achieve near real-time query speed and very much simpler architecture. So in this diagram, what I’m showing is you have say billions of rows in your source data. And if you process this data offline or in a ETL process where you compute a lot of your sketches beforehand, then you can place these sketches because they’re so small as if they were individual values in a data row, in a database. And so now you create a data mart or a hyper-cube structure that is uploaded from your back end process with all the precomputed sketches. And now the query engine, which is on the right can query and choose based on the predicates of the database, which rows it wants to query and merge those into the final answer which appears on the right. And these sketches of course can be merged by any dimensions, including time. And it’s all a choice in the query side of this space.
Even in time windowing, we see a major advantage and here’s the fifth win.
We can put these sketches now in a windowing sequence that could be virtually any length. And now the advantage is that the merging really only meets each item only needs to be touched once. Unlike the exact case, we have a huge speed advantage. So even though you have 30 days or 400 days, or whatever in your window your item, each item coming in is only processed once. And, so the only thing that goes on during the summary is the merging of the sketches across that window.
Here’s an example of an actual system that we built, Showing how this can be done. We had a product called Flurry and Flurry’s customers were mobile app developers, and we wanted to be able to provide real-time statistics to the mobile app developers about how their users are using their apps and what the user experience is with their apps in real-time. And so on, the top part of this diagram, you see we had a continuous stream of events coming off the web, off the internet for users that are then split and partition by storm in real-time into, what’s basically a large time window. It had a 48 hour history with one minute resolution of the sketches. And then the query process then chose across many dimensions and across time what results they wanted to compute for the users. And it was sent immediately to the user’s browsers. Now, the reason it was 15 seconds, and not even less is because we didn’t wanna overload the browsers. So we basically gave updates to the browsers every 15 seconds. And so the freshness of this data that the users actually experience was within 15 seconds of real-time. Down at the bottom, we show that we were able to integrate this with a longer term processing that was performed by Hadoop, Hive in the back end so that the user could make a query. I wanted to see a summary of the last week or last month or last year of how my data looks over that period of time and integrate it with the real-time process. And so both could be achieved really with both within the same process.
Last major win is lower system costs because the processing of these sketches is so much simpler than trying to do it brute force. That what you can see here is that the actual system compute cycles, which exactly translates into hardware costs, was a factor of four smaller. And this is not just for answering one query. This is the overall system, which was answering many other Dyke types of queries at the same time. This is the entire product Flurry was offering to its customers. So it went from before sketches in terms of virtual core seconds, went from 80 billion core seconds per month down to 20 billion per month. So it’s 1/4 the amount of resources to provide answers in matter of seconds where before sketches, it was virtually impossible to provide anything close to real-time.
So now I wanna talk a little bit about the Apache DataSketches Library.
Here’s our team, it’s small but we have a number of distinguished scientists that are working with us that do the heavy lifting in terms of the mathematical analysis and are all published in major publications, and scientific publications about these algorithms and an engineering team that I like to say, our team is made up of engineers that love science and scientists that love engineering.
So it’s a nice combination. And we have a number of consultants from around the world, actually, that also very interested in this project. What’s good is it allows the scientists to see their theoretical algorithms actually implemented and used in real life.
Our mission for our project is to combine deep science with exceptional engineering and to develop production quality sketches that address these difficult queries.
The Apache DataSketches Library has around five or so major families or family groups. Different types of sketches. And in the cardinality area, which is counting number of uniques or distincts in your data. We have the well known HyperLogLog, HLL sketch of Flajolet Martin, as well as what we call a CPC sketch, which is really a much more recent sketch that was developed by our team that actually beats the accuracy per space performance of the HyperLogLog sketch. We have Theta sketches that allow set expressions, union, intersection, difference, and they also operate on and off heap. And Tuple sketches, which are associative sketches which really, the tuple sketch produces a small data table as its result, and allows you to do more extensive analysis. We have three flavors of Quantiles sketches, two that are in the library now, and one under development for doing quantile in histograms, PMFs, CDFs kinds of queries on your data. And we have two major families in the frequent items, heavy-hitters sketches, both weighted and unweighted frequent items, as well as they frequent directions which is a approximate singular value decomposition analysis, which is a vector sketch. We have other sketches for doing sampling, reservoir and VarOpt, which is Edith Cohen’s famous optimal variance analysis of weighted sampling. And we have some, a number of specialty sketches. We’re doing things like customer engagement, frequent distinct tuples maps, and so on. Now all of these sketches we’re developing to be compatible across C++, Java and Python with a binary compatibility. Which means you can say, do your back end analysis with Java and have a front end analysis engine or query engine based on C++. And they’re compatible across both.
There’s a bright future for sketching, technology and solutions. What you see in this table in the red items or items that we’ve already implemented in our library and other areas that are in black or in the future. So there’s a lot of areas where we can make contributions using this technology, and we’re working very much with our science teams to develop the theory in these areas where we can actually realize these in our library.
Thank you. I wanna make an open invitation for collaboration in developing this library. And we would very much like to hear from you.
Lee Rhodes is a Distinguished Architect at Verizon Media (Yahoo). In 2012, Lee started the DataSketches project, which has been widely adopted into many of Yahoo's data analysis systems. In October, 2015, the DataSketches project was open-sourced, and is now being migrated to the Apache Software Foundation as a top-level project dedicated to production quality sketch implementations. Lee's education background includes MS EE from Stanford and a bachelor's degree in Physics. Lee has been awarded over 15 patents and a co-author of some key papers in the field of streaming algorithms.