Funnel Analysis with Apache Spark and Druid

May 26, 2021 12:05 PM (PT)

Download Slides

Every day, millions of advertising campaigns are happening around the world.

As campaign owners, measuring the ongoing campaign effectiveness (e.g., “how many distinct users saw my online ad vs. how many distinct users saw my online ad, clicked it and purchased my product?”) is super important.

However, this task (often referred to as “funnel analysis”) is not easy, especially if the chronological order of events matters.

One way to mitigate this challenge is combining Apache Druid and Apache DataSketches, to provide fast analytics on large volumes of data.

However, while that combination can answer some of these questions, it still can’t answer the question “how many distinct users viewed the brand’s homepage FIRST and THEN viewed product X page?”

In this talk, we will discuss how we combine Spark, Druid and DataSketches to answer such questions at scale.

In this session watch:
Etti Gur, Developer, Nielsen
Itai Yaffe, Principal Solutions Architect, Imply

 

Transcript

Itai Yaffe: Hey everyone. Etti and I are very happy to be here at the Data+AI Summit 2021.
But before we start, I want to share some numbers with you. So digital advertising is a multi-billion dollar industry. In 2019, internet advertising spending worldwide was over $290 billion. Apple spent over $100 million on iPhone and TV+ advertising during September and October 2019. And GM is said to be shifting significant dollars to connected TV advertising. So huge amounts of money are spent each year on digital advertising campaigns, and that’s why everybody wants to measure campaign efficiency. But how do you do that? Glad you asked. This is going to be the focus of our talk today: funnel analysis with Apache Spark and Druid.
So, now I can introduce us. My name is Itai Yaffe, I’m Principal Solutions Architect at Imply and, previously, a Big Data Tech Leader at Nielsen. I’ve been dealing with big data challenges since 2012.
Co-presenting with me is Etti Gur who’s a senior big data engineer at Nielsen, and she’s been building their pipelines using Spark, Kafka, Druid, Airflow and more. You can reach us both over Twitter and LinkedIn.
So a little bit about Nielsen, and specifically Nielsen Identity, which is the division that Etti works for. Nielsen is a data and measurement company, famously known for measuring the TV ratings in the US and, generally speaking, measuring media consumption. Now, Nielsen Identity has the unique role of unifying many proprietary data sets in order to generate a holistic view of consumers. We are talking about anonymous data, of course, and it serves as a single source of truth of individuals and households across a broad list of products, in order to provide better and more accurate insights.
Nielsen Identity, some numbers about us. So there are over 10 billion events flowing into our main Kafka cluster every day. Those events are stored in our S3-based data lake, which receives 60 TB of compressed data every day, to a total of 5 PB. We process the data using Spark, and we launch 6,000 nodes per day. And finally, we ingest tens of TB of data every day into Druid, which is our OLAP database.
So you can probably imagine that this massive infrastructure comes with challenges, mainly scalability, fault tolerance and cost efficiency. So what are we going to talk about today? We’re going to talk about how to overcome the technical challenges of funnel analysis using three Apache tools: Apache Spark, Druid and DataSketches. And why should we even care?
So we are talking about advertising campaigns, and we’ve all been there, right? Browsing the web, clicking different advertisements. So from the user’s point of view, these are the four main campaign phases. From left to right, we have the awareness phase, where our user becomes exposed to a campaign, for example by viewing an online ad. Then there’s the consideration phase, where interest is expressed, for example by clicking an ad. Then we have the intent phase, where actual steps are taken towards making a purchase, for example adding a product to the cart. And finally, there’s the purchase phase. Internally, we divide those into two: the awareness phase is called the tactic, because we essentially have different ways, or different tactics, to advertise our products. And the other phases are called stages, and we’ll touch on that later.
Now, from the campaign owner’s point of view, we are talking about massive numbers of users. Maybe millions or billions of users reach the awareness phase, meaning they are being exposed to a campaign. Then, after a significant drop-off, we have the consideration phase. After another drop-off, we have some users reaching the intent phase. And finally, if we were lucky enough, we have some users that actually purchase the product.
Now, why is it even called a funnel? Take this made-up example, and we’ll talk about the term unique users later. Let’s say that 100 million unique users were exposed to an advertisement. Then, after a drop-off, 15 million of them reached the homepage, 10 million reached the product page, and finally 3 million reached the checkout page. So visually speaking, this resembles a funnel. Now we need to analyze the funnel, hence funnel analysis.
And we mentioned unique users, so let’s see what that means. Here’s another made-up example of an eCommerce website. You can see we have seven views across different pages in this website, but those views were made by only two unique, or distinct, users. And that led to only two purchases. So you can see that it doesn’t necessarily mean that the more views we have, the more purchases we have. That’s why campaign owners and website owners care about measuring the number of unique or distinct users. Now, measuring or counting the number of views is a relatively easy task, technically speaking. But measuring the number of unique users is more complicated, and we’ll talk about it in the next slides.
So remember this guy: everybody wants to measure campaign efficiency, but how do we do that technically? First, we need to collect a huge stream of events, or what we call user activities, while the campaign is live. Then we need to map those events to funnel stages. Remember that we said that an ad exposure is a tactic and the other phases are called stages. Finally, we need to provide insights quickly, so campaign owners can take active measures toward making their campaigns more efficient.
Now you probably think, “Well, there are existing alternatives, right?” That’s true, but there are some caveats with those off-the-shelf products. Some of them lack scalability or have limited scalability. Some of them lack access to the raw data. And some of them, when it comes to count-distinct operations, or basically counting the number of unique users, do it very slowly. Luckily for us, we have Apache Druid.
What is Druid? It’s a real-time analytics database: an OLAP engine that is very fast, scalable, time-series oriented, et cetera. Now, it’s really cool because it can store trillions of events and petabytes of data. It can serve sub-second analytic queries, it’s highly scalable and cost-effective, and it has a decoupled architecture, as we’ll see in the next slides.
Now, I mentioned that Druid is a time-series database. So it partitions the data, first and foremost, by some kind of a timestamp column, like this timestamp column here. The other types of columns are dimension columns, like this website column, which you can slice and dice the data by, and metric columns, which you can aggregate your data by. All those types of columns comprise what we call in Druid a data source, which is basically the equivalent of a table in a relational database. Druid has a very powerful feature called roll-up. If you think about Druid, we are talking about aggregation queries, an OLAP engine. So we don’t really care about the raw data, but rather we want the final result. In this example, we want to know how many views our website a.com received for a given date.
So what we’ll do at ingestion time, meaning while loading data into Druid, is use a certain set of aggregators to aggregate the data. The data will then be stored in Druid in a much more compact, smaller form, which is only the final result.
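To make the roll-up idea concrete, here is a minimal PySpark sketch of the same kind of ingestion-time aggregation. The column names are hypothetical and this only illustrates the concept; it is not how Druid implements roll-up internally.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("rollup-illustration").getOrCreate()

# Hypothetical raw events: one row per page view.
raw = spark.createDataFrame(
    [("2021-05-26 10:01:00", "a.com"),
     ("2021-05-26 10:02:00", "a.com"),
     ("2021-05-26 11:15:00", "b.com")],
    ["event_time", "website"],
)

# Roll-up: truncate the timestamp to a day and keep only the aggregated count.
# Conceptually, this compacted form is what Druid stores after ingestion-time roll-up.
rolled_up = (
    raw.withColumn("event_date", F.to_date("event_time"))
       .groupBy("event_date", "website")
       .agg(F.count("*").alias("views"))
)

rolled_up.show()
```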
Now, we mentioned that Druid has a decoupled architecture, so let’s talk about that. Basically, Druid has three types of processes: processes to ingest the data, processes to query the data, and processes to manage all that orchestration. Talking about ingestion, we can ingest data in real time, for example through Kafka, into what are called, in Druid, Real-Time Nodes. We can also ingest data in batch, using MapReduce jobs for example, into a component called Deep Storage, which is essentially some kind of storage like S3 or HDFS. Now, as I mentioned, during ingestion we’re doing some kind of roll-up or aggregation, and we store the data in a proprietary file format, what we call segment files, in Deep Storage.
Those segment files will be loaded, in a background process, into a second set of nodes called Historical Nodes, because they serve historical data. Now, once a client comes in and wants to query the data, it can either use the native query language, which is JSON-based, or send a SQL query to a third type of node called Broker Nodes. So a Broker will receive the query, pass the relevant sub-queries on to the relevant nodes holding the parts of the data, and upon retrieving the partial results, it will perform the final aggregation and return the result to the client. And some of you may have noticed this is basically an implementation of the Lambda Architecture.
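As a small illustration of the query side, here is a hedged Python sketch that sends a SQL query to a Broker over Druid's HTTP SQL endpoint. The Broker address, datasource and column names are assumptions for the example.

```python
import requests

# Assumptions: a Broker listening on the default port 8082, and a datasource
# named "campaign_events" with a "website" dimension and a "views" metric.
BROKER_SQL_URL = "http://localhost:8082/druid/v2/sql"

sql = """
SELECT website, SUM(views) AS total_views
FROM campaign_events
WHERE __time >= TIMESTAMP '2021-05-01 00:00:00'
GROUP BY website
"""

response = requests.post(BROKER_SQL_URL, json={"query": sql})
response.raise_for_status()
for row in response.json():
    print(row)
```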
So Druid is used by data companies of all shapes and sizes like Airbnb, Netflix, the Israeli AppsFlyer, Nielsen, of course, and Imply which was founded by the original creators of Druid. It’s also being used for many different use-cases like clickstream analytics and funnel analysis, network performance, or application performance monitoring, IoT use-cases and OLAP, and a lot more.
So, in a nutshell, Druid is a real-time analytics database, which is a time-series, column-oriented store. It can ingest and store trillions of events and serve analytic queries in sub-seconds. It’s highly scalable and cost-effective, and it’s widely used among data companies for different use-cases, and specifically the funnel analysis use-case, which is the focus of our talk today.
Now, why is Druid suitable for the aforementioned task? Because, as we’ve seen, it’s highly scalable, it can store trillions of events, and with regards to count-distinct operations, it can do sub-second approximate count-distinct with set operations.
Let’s explain what that means. So, sub-second is obvious. Approximate means we’ll estimate the number of unique elements from a given set of events. Set operations means unions and intersections between attributes or between sets. So, for example, a query we’d like to answer is, “How many unique devices were used by females in the US that were interested in technology?” Druid does all that using the ThetaSketch module, but we haven’t explained the ThetaSketch module yet, right?
So let’s see what ThetaSketch is. Essentially, it’s a mathematical formula, a generalization of the KMV algorithm, or K Minimum Values. K stands for the number of samples we will take from an incoming stream of events, so we pre-define the size of the sample we take from the incoming set of events. And Minimum Values refers to the process of selecting those K samples. Now, using that sample, we can estimate set cardinality, or basically estimate how many unique elements are in that set. And it also supports set-theoretic operations such as intersection or union.
Now, I mentioned that ThetaSketch is an approximation algorithm, so it comes with a built-in error rate. But you can see that the bigger the K value, the more accurate a result we’ll get. Obviously it’s a trade-off, because the larger the K, meaning the more samples we take from the incoming set, the more memory and storage we will need to handle that. But you can see that for K equals 32,000, we’ll get around a 1% error rate, which is really good for huge data volumes.
Cool, time for a demo. So let’s take a very low K, K=4. And if you follow this error rate box here, you can see that it’s going to be very high, right? It’s going to be dozens of percent. Now let’s take a bigger K, so K=4,000, and we’ll follow the same error rate box. And you can see that the error rate is about 1% or 2%, which is really good. So I hope that gives you some kind of intuition about how ThetaSketch works.
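For a rough intuition of how a K Minimum Values estimator behaves, here is a toy Python sketch. It is deliberately simplified and is not the Apache DataSketches implementation: hash every element into the unit interval, keep the K smallest hash values, and estimate the cardinality from the K-th smallest one.

```python
import hashlib

def kmv_estimate(stream, k=1024):
    """Toy K Minimum Values estimator: hash each item into [0, 1),
    keep the k smallest distinct hash values, and estimate the number
    of distinct items as (k - 1) / kth_smallest_value.
    (A real implementation streams the data and keeps only k values in memory.)"""
    hashes = set()
    for item in stream:
        digest = hashlib.sha1(str(item).encode()).hexdigest()
        hashes.add(int(digest, 16) / 16 ** 40)  # normalize the 160-bit hash to [0, 1)
    smallest = sorted(hashes)[:k]
    if len(smallest) < k:
        return float(len(smallest))  # fewer than k distinct items: the count is exact
    return (k - 1) / smallest[-1]

# 1,000,000 distinct "users", each appearing three times in the stream.
stream = (f"user-{i % 1_000_000}" for i in range(3_000_000))
print(f"estimated distinct users: {kmv_estimate(stream):,.0f}")
# With k=1024 the expected relative error is roughly 1/sqrt(k), i.e. about 3%.
```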
Lucky for us, there’s an actual implementation of ThetaSketch in Druid, which is part of the awesome Apache DataSketches library, originally created by Yahoo. Now, at ingestion time, what happens is that sketches, which are data structures, are created and stored in those Druid segment files that we mentioned earlier. At query time, those objects, those sketches, are aggregated, for example by union, intersection or difference between sketches, and the result will be the estimated number of unique entries in the aggregated sketch. So going back to the same example, we can estimate how many unique devices, such as smartphones, tablets, et cetera, were used by females in the US that were interested in technology. We will share the slides later, but in this video, you can see my colleague talking about ThetaSketch and Druid.
Now, just to go back to how we do roll-up with count-distinct in Druid. Taking the same example, now we want to count how many unique devices viewed the a.com website for a given date. So we will use the ThetaSketch aggregator, which is just a different type of aggregator. You can see that this device actually appears more than once, so we only need to count it one time, as one unique device. There’s another device for the same website for the same date. So we will store the number two, because two unique devices viewed this website on the given date. Now, obviously we cannot store the number two itself, because that won’t allow us to do the set operations we mentioned, right? So what we actually store in Druid is a ThetaSketch object, and the actual result is calculated at query time, meaning once the query is executed, and that allows us to do unions and intersections.
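To illustrate what such a query-time calculation can look like, here is a hedged Druid SQL example held in a Python string. The datasource name campaign_funnel and the user_id_sketch column are hypothetical, and the sketch functions require the druid-datasketches extension to be loaded.

```python
# Hedged example: estimate distinct devices per day and website at query time
# from the ThetaSketch column built at ingestion.
sql = """
SELECT
  TIME_FLOOR(__time, 'P1D') AS "day",
  website,
  APPROX_COUNT_DISTINCT_DS_THETA(user_id_sketch) AS unique_devices
FROM campaign_funnel
GROUP BY TIME_FLOOR(__time, 'P1D'), website
"""
# This string can be POSTed to the Broker's /druid/v2/sql endpoint,
# as in the earlier snippet.
```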
Now Etti will walk us through how to leverage all those tools in order to build funnel analysis pipelines.
To you Etti.

Etti Gur: Cool. So back to funnel analysis. The simple use-case of funnel analysis goes as follows, how many unique users viewed an online ad versus how many unique users viewed an online ad and also viewed a specific product page.
So this is a screenshot from the system that we give our clients, and obviously the numbers are made up, but let’s look at an example. Here we can see a tactic, which is some kind of an online advertisement, and we have around 8 million unique users viewing that ad. And of all of these users, 3,100 unique users reached the homepage. So let’s see how we do that technically.
This is our data pipeline, and it starts with our S3-based data lake, holding the raw event data partitioned by date. Since this data is quite huge, as Itai said, 5 PB in total and 60 TB of compressed data per day, we are going to use Spark to process it. So periodically, once a day, we run a Spark application called the Mart Generator, which reads the files of the last day, performs some kind of transformation on this data, and creates data marts for all the campaigns that are currently running.
I just want to mention that the term data mart is meant to describe a projection of the raw events, and it can be a subset, a transformation or an aggregation. So these data marts are then written back to S3, partitioned by campaign and date. After that, we have another Spark application called the Enricher, which reads the files per campaign, enriches the data, and writes its output back to S3, partitioned by date and campaign. And finally, we load this data, per campaign, into Druid, our OLAP database.
So I want to take a closer look at each of these components. As I said, our raw event data is partitioned by date, and we have a stream of events. This is a simplified example, of course; we have billions of events. So we have the event time column, which is a timestamp; the user ID, which is the device ID that performed some kind of action; and the attribute, which is the action that was made, and it can be viewing an online ad, clicking into a homepage or a product page, et cetera. After the first Spark application is done, we have the results partitioned by campaign and date. We still have the event time, the user ID and the attribute, but now we map these events to the matching stages in the funnel. So an event for an online ad will be mapped to a tactic, a homepage will be a stage, and a product page will be another stage.
So, after the enrichment is done, we end up with this structure. We have the data partitioned by date and campaign. The event date will now be truncated to remove the time part, because we only care about the daily trends of our campaigns. We’ll have the user ID, and now we’ll have the tactic and the stage. So we can see a row with the tactic being an online ad and the stage being a homepage, and another row with the tactic being an online ad and the stage being a product page.
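A hedged PySpark sketch of this kind of enrichment step could look like the following. The paths, attribute values and column names are made up for illustration; this is not the actual production code.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("enricher-sketch").getOrCreate()

# Hypothetical data-mart layout written by the Mart Generator:
# event_time, user_id, attribute, already filtered to a single campaign and date.
mart = spark.read.parquet("s3a://bucket/data-marts/campaign_id=123/date=2021-05-26/")

tactics = (mart.filter(F.col("attribute") == "online ad")
               .select("user_id", F.col("attribute").alias("tactic")))
stages = (mart.filter(F.col("attribute").isin("homepage", "product page"))
              .select("user_id", "event_time", F.col("attribute").alias("stage")))

# Pair each stage event with the tactic the same user was exposed to, and
# truncate the timestamp to a date, since only daily trends matter here.
enriched = (stages.join(tactics, on="user_id")
                  .withColumn("event_date", F.to_date("event_time"))
                  .select("event_date", "user_id", "tactic", "stage")
                  .dropDuplicates())

enriched.write.mode("overwrite").parquet(
    "s3a://bucket/enriched/date=2021-05-26/campaign_id=123/")
```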
The next stage is to load this data into Druid, and we are using a Hadoop MapReduce ingestion task. The next few slides describe the specification of this task, and I’m not going to go into the details right now. Of course, we are going to share the slides so you can take a closer look. But what you need to know now is that we end up with the data in data sources in Druid.
A data source is like a table, and we have a table per campaign. The schema is: the time column; the dimension columns, which are the tactic and the stage; and the metric column, which is the user ID sketch, the ThetaSketch object created at ingestion time based on the user ID field. Once we have this data in Druid, we can then query it. So this is an example of a SQL query that returns the estimated number of unique users that viewed the online ad and also viewed the homepage, within a certain period of time, for a specific campaign.
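A hedged version of such a SQL query, with a hypothetical per-campaign datasource called campaign_123, could look like this:

```python
# Hedged example: the estimated number of distinct users who saw the online ad
# and also reached the homepage within a date range, for one campaign.
# The datasource (one per campaign) and its columns are hypothetical.
sql = """
SELECT APPROX_COUNT_DISTINCT_DS_THETA(user_id_sketch) AS unique_users
FROM campaign_123
WHERE tactic = 'online ad'
  AND stage = 'homepage'
  AND __time BETWEEN TIMESTAMP '2021-05-01 00:00:00'
                 AND TIMESTAMP '2021-05-26 00:00:00'
"""
```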
Now, I wanted to revisit the simple use-case. We can see we have a tactic, some kind of an online advertisement, that has around 8 million unique devices exposed to it.
And now I want to talk about the next phases of the funnel, which are the homepage and the product page. And here we can see we have 3,100 unique devices reaching the homepage, a drop-off of 2,500 unique users, and 1,000 unique devices on the product page. But if you’re paying close attention, you can see that these numbers do not add up. So let’s try to understand why this happens.
If we go back to what a funnel looks like, it basically shows that we have many users exposed to an ad. Some of them continue to the homepage, some of them continue to the product page and then to the checkout. But as we all know as internet users, you can access the product page directly from external sources, for example by Googling the product, and not necessarily go through the previous stages of the funnel.
So now our use-case gets a bit more complex: how many unique users viewed an online ad versus how many unique users viewed an online ad first, and then viewed a specific product page. This is what we call a sequential funnel, because the chronological order of events is important. The data pipeline will be very similar, but now we will have to take into account only the events that happened in the predefined order of the funnel. That way we can better represent the efficiency of a specific tactic; we can really know which ad contributed to the campaign’s success.
So if we go back to our pipeline and try to see what we do differently this time, let’s look at an example of the raw event data. We have a user that, at 9:15 in the morning, clicks into the product page. After that, at 10:10, the same user is exposed to an online ad, and one minute later the user views the homepage. So you can see the user did not really follow the predefined order of the sequential funnel.
Our Mart Generator will take these events and map them to the stages of the funnel. So we can see we have a stage happening, after that a tactic, and after that another stage. So our funnel is a little bit broken. Normally, what our Enricher would do is take these events and map them to the following rows: one with the event date, the user ID, the tactic being the online ad and the stage being the product page, and another row with the tactic being the online ad and the stage being the homepage. But we all remember that this user viewed the product page before he saw the ad. Therefore, we cannot really say that this ad contributed to the user going to the product page. This event does not conform to the predefined order of the funnel, so we would want to discard it, along with all the other events that don’t conform, and remain with just the events that do.
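One hedged way to express that filtering in Spark, simplified to a single tactic and using made-up paths and column names, is to keep, for each user, only the stage events that happened after that user's first ad exposure:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("sequential-funnel-sketch").getOrCreate()

mart = spark.read.parquet("s3a://bucket/data-marts/campaign_id=123/date=2021-05-26/")

# Earliest ad exposure (tactic) per user.
first_tactic = (mart.filter(F.col("attribute") == "online ad")
                    .groupBy("user_id")
                    .agg(F.min("event_time").alias("tactic_time")))

# Keep only the stage events that happened AFTER the user's first exposure,
# discarding everything that breaks the predefined order of the funnel.
ordered_stages = (mart.filter(F.col("attribute").isin("homepage", "product page"))
                      .join(first_tactic, on="user_id")
                      .filter(F.col("event_time") > F.col("tactic_time")))
```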
This Druid query is quite huge, so I’m not going to read it now. But as I said, we are going to share our slides, so you can take a closer look later on. It returns the estimated number of unique users for the drop-off between the homepage and the product page. And now, when we go back to our screenshot after making the changes in our pipeline, we can see that the numbers are now correct, and they do add up, because we eliminated all the events that did not conform to the sequential funnel’s predefined order. And if we look at it another way, from the funnel point of view, we basically discarded all the events of users going into a stage in the funnel without going through the previous stages.
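A hedged sketch of that kind of drop-off query, again with a hypothetical datasource and columns, uses a theta-sketch set difference:

```python
# Hedged example of the drop-off between two funnel stages: distinct users who
# reached the homepage but did NOT reach the product page, computed as a
# theta-sketch set difference (requires the druid-datasketches extension).
sql = """
SELECT THETA_SKETCH_ESTIMATE(
         THETA_SKETCH_NOT(
           DS_THETA(user_id_sketch) FILTER (WHERE stage = 'homepage'),
           DS_THETA(user_id_sketch) FILTER (WHERE stage = 'product page')
         )
       ) AS homepage_to_product_dropoff
FROM campaign_123
WHERE tactic = 'online ad'
  AND __time BETWEEN TIMESTAMP '2021-05-01 00:00:00'
                 AND TIMESTAMP '2021-05-26 00:00:00'
"""
```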
So that’s it for me. And now Itai is going to sum things up.

Itai Yaffe: Thanks Etti, for walking us through the simple and more complex use-cases for funnel analysis.
I want to share a few tips with you with regards to the technology stack we’ve just seen. So we described how you can use Druid with ThetaSketch for fast approximate count-distinct, along with set operations such as intersection and union aggregations.
Using the same example: how many unique devices were used by females in the US that were interested in technology. Etti showed you how you can use Spark to pre-process incoming events, which allows you to take into account only events that happened in the predefined order of the funnel. You can check out our talk, Optimizing Spark-Based Data Pipelines, to get more tips on optimizing your Spark pipelines. And finally, you can further optimize your ingestion process by writing ThetaSketch objects directly from your Spark application and loading them into Druid using the isInputThetaSketch flag.
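As a rough illustration of that last tip: the Apache DataSketches project ships Python bindings in the datasketches package, and something along these lines could build serialized sketches for Druid to ingest. The exact API shown (update_theta_sketch, compact, serialize) is an assumption and worth verifying against the version you install; this is a sketch of the approach, not the actual Nielsen implementation.

```python
# ASSUMPTION: the Apache DataSketches Python bindings (pip install datasketches)
# expose update_theta_sketch with update()/compact()/serialize(); verify the
# exact API against the version you install.
from datasketches import update_theta_sketch


def build_user_sketch(user_ids, lg_k=12):
    """Build a compact, serialized theta sketch from an iterable of user IDs."""
    sketch = update_theta_sketch(lg_k)
    for uid in user_ids:
        sketch.update(uid)
    return bytes(sketch.compact().serialize())


# In a Spark job this could run once per (date, campaign, tactic, stage) group,
# with the resulting bytes written out for Druid to ingest using
# "isInputThetaSketch": true in the thetaSketch aggregator of the ingestion spec.
print(len(build_user_sketch(["user-1", "user-2", "user-3"])), "bytes")
```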
So what have we learned today? We learned that funnel analysis is very important for advertisers, because huge amounts of money are spent on advertising each year. We’ve also learned that it’s not an easy task to solve technically, especially if the chronological order of events matters.
We discussed Druid as a very powerful tool for real-time analytics, because it’s highly scalable, can ingest and store trillions of events, and serves analytic queries in sub-seconds. And it’s being used for many different use-cases.
And lastly, Etti showed you how combining these three Apache tools, Apache Spark, Apache Druid and Apache DataSketches, can help you mitigate this very complex technical task, so you can pre-process events before ingesting them into Druid and decide how to handle out-of-order events.
Great. So just before we wrap things up, a few things we care about. Women in Big Data is a worldwide program that aims to inspire, connect, grow and champion the success of women in the big data and analytics field. There are over 30 chapters worldwide, and everyone can join regardless of gender. So we really encourage you to find a chapter near you using the Women in Big Data website.
We have a couple of interesting conference talks. So Yakir Buskilla, my colleague, and I are going to talk about Druid in Practice in the upcoming Berlin Buzzwords Conference in June.
There’s also a talk about Migrating Airflow-based Spark Jobs to Kubernetes that my colleague Roi Teveth and I gave at the previous Data+AI Summit Europe.
And there’s also the Nielsen RND Tech Center Blog, and specifically a post about data retention and deletion in Apache Druid.
Cool. That’s it for us. We’ll be taking questions now. We really appreciate the time you’ve spent with us. And please don’t forget to rate our session in the Data+AI Summit website.
Thanks everyone.

Etti Gur

Senior Big Data Engineer with over 20 years of experience in the software industry. In the last 8 years, I've been working as a senior big data engineer at Nielsen, building big data pipelines using Spa...
Read more

Itai Yaffe

Itai Yaffe is a Principal Solutions Architect at Imply. Prior to Imply, Itai was a big data tech lead at Nielsen Identity, where he dealt with big data challenges using tools like Spark, Druid, Kaf...
Read more