Real-Time Forecasting at Scale using Delta Lake and Delta Caching

Download Slides

GumGum receives around 30 billion programmatic inventory impressions amounting to 25 TB of data each day. Inventory impression is the real estate to show potential ads on a publisher page. By generating near-real-time inventory forecast based on campaign-specific targeting rules, GumGum enables the account managers to set up successful future campaigns. This talk will highlight the data pipelines and architecture that help the company achieve a forecast response time of less than 30 seconds for this scale. Spark jobs efficiently sample the inventory impressions using AMIND sampling and write to Delta Lake. We will discuss the best practices and techniques to make efficient use of Delta Lake. GumGum caches the data on the cluster using Databricks Delta caching, which supports accelerated reads, reducing IO time as much as possible, and this talk will detail the advantages of Delta Lake caching over conventional Spark caching. We will talk about how GumGum enables time series forecasting with zero downtime for end users using auto ARIMA and sinusoids that can capture the trends in the inventory data, and will cover in detail AMIND sampling, Delta Lake to store the sampled data, Databricks Delta Lake caching for efficient reads and cluster use, and time series forecasting.


 

Databricks testen

Video Transcript

– All right, welcome on to Spark + AI Summit 2020. My name is Jatinder Assi. I manage data Engineering team at GumGum. My co-speaker is Rashmina Menon. She’s a Data Engineer at GumGum. And today we’ll be talking about real-time forecasting at scale using Delta Lake and Delta Caching.

Alright, so let’s get started. Just a quick introduction about GumGum. So we are an AI company. We’re located in Santa Monica, California.

Our Divisions

So our divisions are advertising, where we leverage our computer vision and NLP technology to detect unsafe text and imagery, allowing us to deliver ads in a brand safe and a contextually relevant environment.

Our second word goal is Sports Valuation where we help marketers and right media holders capture the full media value of sponsorship across broadcast TV, streaming and social media.

The agenda we will cover today is a Programmatic Inventory Intro just high level overview, where the Programmatic Inventory looks like and the scale that we’re talking about. Then we’ll talk about the Architecture for real-time forecasting. And then we’ll talk about Data Sampling approach that we took. Then Rashmina will take over. She will talk about Search and Forecasts Application, Data Caching with Delta and Forecast Accuracy.

Programmatic Inventory Intro. So let’s talk about what is an Advertising Inventory.

Advertising Inventory Real estate to show potential ads in a publisher page

So it’s a real estate to show potential ads in a publisher page. An ad can be of different forms and different formats and it appears across different formats like mobile, desktop and web browser.

Programmatic Advertising Ecosystem

So let’s talk about Programmatic Advertising Ecosystem. So it’s a technology ecosystem to automatically buy and sell targeted online advertising in real-time.

So on one side of the spectrum, we have publishers. These are some of the publishers that we work with or who produce quality content.

On the other side of the spectrum, we have advertisers. These are some of the brands that we work with, who are looking for cost-effective ways to place media buys.

All the advertisers integrated with DSPs. These are Demand-Side Partners. These are some of the popular Demand-Side Partners that GumGum works with. These are the partners who will actually participate in the bid on behalf of the advertiser.

All the DSPs work with ad exchanges. GumGum is gonna be our exchange where the auction will actually happen.

So during the auction, a publisher will make their inventory available in that auction, all the DSPs get to bid on their inventory. Whoever wins the bid, will actually gets to display the ad.

Why Forecast Inventory?

Alright, so let’s talk about why Forecast Inventory. So our sellers are trying to set up ad campaigns with certain targeting rules and would like to know if GumGum’s publisher network has enough inventory to fulfill it. We also would like to provide faster response time to the forecast to allow our sellers to iterate, propose and sell ad campaigns faster.

So let’s look at some of the scenarios. So here’s a scenario where we want to forecast inventory available in US in cities Los Angeles and San Diego from premium websites for the next 30 days. So that’s one of the scenario. Another scenario could be forecast inventory in US and Canada or pages related to sports and entertainment category, targeting males of aged 25 to 40.

Inventory Forecast At Scale

So let’s talk about the Scale.

As I mentioned, so in order to forecast inventory, our goal is to be able to set up campaigns for success by generating near real-time forecasts on the inventory.

So the scale we’re talking about here is we roughly get 30 billion plus Programmatic Inventory every day. We have Programmatic Ecosystem that I mentioned. That amounts to about 25 terabytes compressed data per day.

And our goal is to be able to provide average of forecast response time of 30 seconds.

So let’s now jump into the Architecture.

All right, so all the data for inventory is sitting in S3 in Raw format. All this data is in Avro format. The first module is data transform. So this is a Spark module, which we’ll read every day, 25 plus terabytes of data apply. Apply transformation on Azure, which is pretty much cleaning, prepping and then applying business rules to transform this data. Once the transformation is complete, we will actually run the sampling algorithm in Spark in a distributed fashion. We will significantly reduce the amount of data.

The next module will be enrichment. So after we have samples from on a daily basis, we will enrich the samples. By enriching that data via DynamoDB and MySQL datasets. Offer enrichment, all the data gets reduced to after sampling enrichment, the data gets reduced to roughly 1.5 Gigs per day in delta format will store all that data in Delta Lake on a daily basis. Rashmina will go on details why we chose delta format, and Delta Lake.

Architecture

All this is encapsulated in a Daily-Data Pipeline. All these modules are Spark modules and they run on a daily basis in airflow.

So let’s say a user is ready to forecast with their targeting trick here. So they will go to our internal dashboard. The internal dashboard internally Will make a search query. It’s a set Spark module which will search the samples based on the user Cartier or pass 365 days. The Search module will Search Samples for past 365 days. Once we get the filter samples, then we will apply that to a Forecasting Model. This is an AR model running on Spark driver, which means I will go and detailed which model you end up using for time series forecasting. Once we apply the forecast model, we will get the forecasting results and we will display to our user in the internal dashboard.

So end to end real-time forecast with this architecture, we are able to get within 30 seconds response time on the 25 terabytes of data we process every day reducing to 1.5 Gigs and running this forecast for past 365 days. We can get response time within 30 seconds.

Alright, so let’s talk about Data Sampling now.

Why Sampling?

So, why sample? That’s a pretty obvious question. So we don’t want to waste a lot of compute in processing all of the inventory for past 365 days, which will be north of nine petabytes of data. And also it will be hard to attain 30 seconds of forecasting response time even the most optimized forecasting model.

So instead, we can pre-process it, the impressions per day using the distributor sampling approach to capture most relevant subset of the inventory population. So what I mean by that is the big circle here is actual all the inventory and the small circle is capturing relevant subset of the inventory and that’s our sample data set.

Sampling Approach

So the Sampling Approach.

So let’s say our user wants to query a data, usually an on Sample Approach, you will go to a Base Data, query the Base Data and you will get the Exact Results. In case of sampling, we will generate a Sample Data which is read from based here using a sampling algorithm. So when the user wants to Query, they will Query the Sample Data.

And once a user queries the Sample Data, we will use an Estimator which is a Scale-up factor which is also generated as part of Sample Data. You will use this Estimator to scale the results back to the original data set. So a user queries the Sample Data, we use Estimator to scale the results up. So the results will be relatable to the exact results but we will get approximate results which will be relatable to exact results.

Types Of Sampling

So let’s talk about Types Of Sampling. So the most common form of sampling is Uniform Sampling. This is where there’s an equal probability of selecting any particular item. The problem here is it will be biased towards commonly-occurring items so which is not great for frequency cap.

Frequency cap is one of the key factors in how GumGum serve ads where we don’t show ad to the same user frequently.

Our criteria could be a set frequency cap once an hour, once a day. We weren’t sure same user and adds more than once a day or more than once an hour That’s a frequency cap. So we end up using a slightly modified version of this, which is Distinct Item Sampling. So in this approach, we will still sample uniformly but from distinct items to support the frequency cap use case. So in this case, distinct item in our use case, would be, we can identify our distinct user by their user ID hash of user.

So algorithm I’m using here is augmented min-hash distinct item sampling, where we will keep up to

AMIND Samping Daily Job

Alright, so let’s talk about the Sampling Daily Job.

So the sampling is paralyzed for every single hour individually and then separate hours are combined to form our daily sample. So let’s see how it looks in action.

So we have all these orange bucket as our raw data. Our real data sitting in S3. Then in Spark, we will paralyze all these our processing of all these hours in parallel and generate sample hours. So all the red buckets are actual sample data for every single hour with up to M distinct hash values.

So then we will group by all these, our real samples by the hash values and generate a pre-daily sample.

Once we have the pre-daily sample, then we’ll sort and click up to M smallest hash values to generate a daily sample. And also we will generate a Scale up factor using this algorithm which will be used later on to relate the result of sample back to the original data set.

AMIND Samping Daily Job parallelized pet haus and hourly Samples are combined

Alright, so the next step will be Rashmina. She will talk about Search And Forecast Application.

– Thanks Jatinder. So I’m going to dive right into Search And Forecast Application.

So let’s take a look at the (indistinct).

Search And Forecast A

of certain forecast application from left to right. So on the left, we have a user for submitting a forecast request from dashboard. There is a thin layer of API, which connects the user to the Search and Forecast Application. The functionality of this API layer is to submit the Spark job using the Databricks Jobs API, to get the results back from the Spark job and give it back to the user. Let’s talk about the Search and Forecast Application.

So the Search and Forecast Application, as the name implies, has two components in it. Search application and forecasting application. I’m going to talk in detail about each of these applications. But search application and forecasting application put together has to complete within 30 seconds. That’s the SLA that we are bound to achieve. So let’s zoom into search application.

The goal of this application is to read past 365 days of sample data along with the multiply data.

Search Application

Filter the sample data based on the user inputs and then generate the time series of the form impressions per day. But number of impressions is number of sample impressions times multiple. So here comes an important question, where does a search application reads the data from? Which takes us to the next slide, Data Caching.

To Cache Or Not To Cache?

The important question here is whether we should cache the data or not cache the data? And if we are indeed caching the data, what is the technology that we should use?

So suppose that we’re not caching the data, suppose for every single forecasting request, we are reading the data directly from S3, of course. We are not going to have too much past but this is going to be terribly slow. We cannot guarantee on the SLA. So we got to cache the data somewhere. We are working on Spark, so we all know Spark works great with in memory cache. This is going to be lightning fast but you The target here is that we’ll need a humongous cluster to cache the entire data in memory, especially for the scale up that we operate in. Also remember that sampling pipelines are running once a day. So we’ll have to refresh this in memory cache daily, which is a bit tedious. So what’s the alternate choice we have? The obvious choice is discussion. This as we know is cheaper than memory. Using discussion also means that we could utilize memory for compute. Which is great. But coming to the point, it’s not very efficient. This so, that if we were to use disk cache, we will at least need 35C for direct slash nodes. The question is, can we do better? Can we reduce the number of instances so that we can reduce the cost? Now similar to in-memory caching, disk cache also needs daily refresh. So let’s come to the question, can we do better than discussion? The answer is yes. Although this is (indistinct) with Delta cache.

Delta Lake

Let’s refresh about Delta Lake. Delta Lake is an open source storage layer that brings ACID transactions to Spark. The basic idea is that the Stream application and Batch application can write the Delta Lake. You can use the cloud storage, whichever you’re using, probably you’re using AWS S3 and Analytics and Machine Learning applications can read seamlessly from Delta Lake without having to worry about the ACID properties on LinkedIn. Delta Lake works on the notion of transaction log. So the transaction log in Delta Lake keeps a record of every single transaction that’s happening on Delta Lake. So when the sampling data pipelines are writing the Delta Lake, the tables are already aware that there’s already new data available and it will refresh the underlying tables automatically, which is great.

Delta Lake (Contd.)

Well, the Delta Lake is pretty straightforward. Generally, when we write data frame, we specify the format option which we are using, whether it’d be package JSON tsp. We do the same for Delta Lake. We specify delta as a format. Now that we know what format we’re using, let’s come to the question, what’s the caching layer that we’re able to use? The caching layer that we are going to use is Delta caching on Databricks.

Delta Cache

Delta caching on Databricks is basically disk caching but it supports accelerated data rates by creating copies of remote files in nodes local storage using a fast intermediate format. So this fast intermediate format is faster than disk caching.

Delta caching is enabled by default on i3.xlarge instances that which we are using for our use case. Now, once we start using a cluster, the delta caching is enabled and we start reading from Delta Lake. We can see interesting statistics under the storage tab in the spotlight. We can see how much data is read from S3, we can see how much data is read to the cache, how much data is getting repeatedly read and so on. So all these parameters will later help us to tune the application.

Let’s look at the common commands that are getting used to create the Delta Table and to enable the Delta caching. So creating a Delta Table is very straightforward. We can use the CREATE TABLE statement. We can point to the S3 location and get done. We have the samples Data Table created.

Now to cache the Delta Lake onto your cluster, we use the cache command, followed by the select statement. Your cache in the sample sample data, which we get for the last 365 days and hence the word class.

To refresh the cache associated with the table, we use the REFRESH TABLE statement.

Now let’s look at the common commands that we use to run maintenance operations under delay.

All important commands is optimize command. Now for all of us who work on Spark, we know how important it is to get the file size correct. How important it is to get the number of files correct. Optimize command comes really handy for this particular use case because the command will compact the files to one GB file size. We can also specifies ZORDER which will collocate same information on same set of files.

Now to delete the already compacted files, we can run the backup command.

Analyze Table command is also equally important. Analyze Table command will help the query analyzer collect statistics about the table which will help the query to be performing. Now how often you run these commands depends on your use case, But for us, we run these commands every single day.

To Cache Or Not Ta Cache? zz

So coming back with the question, whether to cache or not cache, we have an answer. We are using Delta Lake, the Delta Cache. So since this Delta Cache is similar to Disk Cache but increased performance, we get an opportunity to utilize the memory for compute. This is also the least expensive option that we nailed down. So to remember to remind you, that disk cache we were using 35C for direct slash nodes. Now, the Delta Lake and delta caching, we could reduce this to 20 ITF slash nodes. So from the cost perspective, this works great. At one caveat is that the warm up queries can take longer if you’re not refreshing the cache.

Search Application (Cont.)

So this is how the basic search application architecture looks like. Such application as a Spark shop, which reads the data which is cached on cluster. The caching layer is basically Delta caching on Databricks. The data format which we use is Delta Lake and the Delta Lake data is stored on S3.

Let’s revisit the entire workflow once again for search and search application.

So the first step is to basically filter the sample data for the user invoice. So the basic inputs the chooser provides a date product and data which are partition keys for our S3 data. So we do first level of filtering here by applying the part that were used by applying filter on the user inputs based on the partition keys.

The second step is to further filter this data based on other user inputs. Examples would be Country, BrowserType, 3rd party segments, Page categories and so on. We have 15 to 16 user inputs that the user can enter through the dashboard.

The last step is to aggregate this and generate the time series data. So if frequency cap is present, the can number of impression volumes are at the frequency gap. Later, we aggregate this data to build the time series of the form impressions per day.

We multiply the sample number of impressions with the multiplier to get the projected impressions. So this is all about the Search Application. Let’s move on to the next part of the Search and Forecasting Application, which is a forecasting application.

Forecasting Application

The goal of the forecasting application is to forecast a time series for the next N days based on the time series trends in past 365 days. So we know that the search application is spitting out a time series for the last 365 days. Our goal is to generate the forecast for next N number of days.

Forecasting Application (Cont.)

Our time series forecasting model is written in R and we use a ARIMA as the base model. ARIMA is nothing but AutoRegressive Integrated Moving Average. It’s a very popular and common time series forecasting models. It is used to describe the autocorrelation in data. And the variant which we are using is a non seasonal one. But here comes an important question. The kind of targeting rule that a user can enter for a campaign A will differ a day before a campaign B. So how do we use same ARIMA model for different trends?

So the question which we’re trying to answer is, how do we generate different ARIMA models for different user inputs which result in different time series. For this, we use AUTO ARIMA. So AUTO ARIMA finds the best ARIMA model for a given time series. So for a given time series X, if the targeting routes are entirely different from another time series Y. We’re getting different models because AUTO ARIMA is finding different parameters to fit both this time series.

Let’s look at the other models which we are using for our forecasting. For all of us, an app that we know that trends very drastically from quarter to quarter for active. Content which is very important for us is quarterly trends, The data which we have or the traffic trend, which we have in Q1 is entirely different from the main competitive data in Q4. So quarterly trend is very important. We capture the quarterly trend using sinusoids. We also capture the VP trend using sinusoids.

So, the AUTO ARIMA, weekly trend, quarterly trend put together combines the basic forecasting model that we are using. We run the forecasting model on the driver node and our average forecast execution time is less than two seconds. So you can imagine that within the 32nd SLA, the most time is taken by the search application because a forecasting application on its own was really fast, even if it runs only on the driver node.

So suppose the black line here represents the actual and the blue line represents a forecast. How do we know if the forecast is actually good enough. Of course, we have to measure the forecasting accuracy.

Measure Forecasting Accuracy

We compute the mean absolute percentage error for pre-defined forecasting requests. As I mentioned previously, we have 50 to 60 targeting tools that the user can enter through the dashboard. So it’s very difficult for us to generate all the permutation and combination of the inputs that a user can enter. So we predefined certain forecasting requests which are very common from ourselves. And we compute the mean absolute percentage error for these forecasting requests time over time. So the mean absolute percentage error is defined as an error over actual. So A here stands for actual, F stands for forecast. We compute actual minus forecast, which is the error, we divided by actual and we get the average for and observations and that’s mean absolute percentage error. So we compute mean absolute percentage error every single day for these predefined forecasting requests. And we ensure that this doesn’t go above the thresholds that we want to achieve. And for now, we have been, we have never got alerts for the longest time, so we know that the model is really functioning well. So that’s all about our forecasting application.

Thank you for listening to our presentation.


 
Databricks testen
« back
About Rashmina Menon

GumGum

Rashmina Menon is a Senior Data Engineer with GumGum, which is a Computer Vision company. She's passionate about building distributed and scalable systems and end-to-end data pipelines that provide visibility to meaningful data through machine learning and reporting applications.

About Jatinder Assi

GumGum

Jatinder is data engineering manager at GumGum. With 10+ years of experience with software design and development, Jatinder has spend last 5 years with focus on scalable distributed data processing systems and engineering management.