Zipline – A Declarative Feature Engineering Framework

Zipline is Airbnb’s data management platform specifically designed for ML use cases. Previously, ML practitioners at Airbnb spent roughly 60% of their time collecting data and writing transformations for machine learning tasks. Zipline reduces this task from months to days by making the process declarative. It allows data scientists to easily define features in a simple configuration language. The framework then provides access to point-in-time correct features for both offline model training and online inference. In this talk we will describe the architecture of our system and the algorithm that makes the problem of efficient point-in-time correct feature generation tractable.

Watch more Spark + AI sessions here

Video Transcript

Hello everyone, I’m Nikhil. I’m here to talk to you about Zipline. I work for a team called ML Infra at Airbnb, and Zipline is a declarative feature engineering framework. That’s a lot to unpack, but we will do that in the next set of slides. So let’s take a step back and look at the end-to-end machine learning process. A data scientist comes up with an idea, goes about exploring data for this idea across the company, then tries to generate features for this business problem and trains a model, and this is a loop until the model is performant enough.

Now once this is done, the real difficulty begins, which is productionizing this model.

So you need to serve these features, and you also need to serve the model and wire it up with an application.

Now the simplistic view is that a data scientist is a single person who is responsible for this whole flow, but a more realistic and practical view is that there are teams of engineers with varying skill sets that enable this whole function. So there is a data engineer who is responsible for managing the data pipelines, then there is a data scientist, your math PhD, who is responsible for training a model, and then there is this new breed of engineers called ML engineers, but in reality they are just plain old systems engineers, people who deal with storage systems and serving fleets.

So the realization here is that a lot of the work that needs to be done is mostly plumbing data around the actual machine learning model. This is an image from a paper by Google, which basically says that about 5% of the work or code is machine learning code and the rest, the 95% of it, is all non-machine-learning glue and plumbing work. What’s the goal of Bighead with this in mind? The goal is to basically eliminate this glue code, the 95% of the work, and make it reusable across different use cases.

And in that sense, it intends to be a general-purpose machine learning platform.

But today we are going to talk about Zipline, specifically for feature engineering. The idea behind Zipline is to take this process that is imperative, that requires data scientists to spell out at each step how to do certain things, and turn it into a declarative specification. So users write a declaration of what their features look like, and Zipline as a system is responsible for generating all the underlying data pipelines. More importantly, the real value here is reducing the time that it takes to get this flow into production from months to days. We looked at the three different functions that are required to do that work, and one of the goals here is to enable data scientists to do all this work by themselves.

So yeah, so what’s the difficult thing about feature engineering? According to some surveys, about 60 to 70% of the work that is involved in getting a model into production is purely feature engineering, not the hard, difficult math work that you think would be the bulk of difficulty.

And another realization here is that good consistent data with a really simple model is always better than bad data or inconsistent data with a really, really advanced sophisticated model.

And just to confirm our bias here, this is a slide from a Google machine learning project, and what it basically says is that more than half of your project’s time is just feature engineering.

Now, we are going to talk about this in the context of a larger product called Bighead.

Towards the end of the slides I have linked to various resources where you can learn about the big picture, but today we are going to focus only on Zipline. And Zipline specifically deals with structured data. By that I mean database records, your data in Hive, your data in Kafka, etc., and not image or audio or media data.

So we feel that the biggest problems exist in the structured data space; the unstructured data space is a harder mathematical challenge but an easier systems challenge to address. The rest of the talk is structured like so: we are going to talk about the problem statement, both for generating features for training and also for serving these features to a model.

We are also going to look at what are the main technical challenges within these problem statements and we are going to describe a solution.

Current Approaches

So before we jump into that, let’s look at current approaches. When you want to generate features for your model, a very common approach is logging and waiting. We are going to refer to this as log and wait from this point forward.

So if you want to add a new feature to this flow, you just have to log it. Basically your model is not seeing this new feature, but you’re logging it for the future and waiting long enough to generate enough experimental data. What I mean by that is, you have a model that requests a feature, the request goes to the feature serving framework or feature serving fleet, and that in turn makes service calls or production database requests. But the log contains more than the response: it has the response that is required for the model, but also the new features that data scientists want to add into their experiments. Now you wait for that to accumulate, generate training data and train a new model. And this is some sort of a continuous loop, right? Now, the red arrow is what makes this whole process take months. To capture seasonality for businesses like Airbnb, you need at least a year’s worth of data. Depending on markets it might vary, and depending on use cases there might be a seasonality requirement or not. But in general, we tend to train with at least six months’ worth of data. So the red arrow is basically the six months’ worth of waiting.

Now there is a way to short-circuit that red arrow, and that leads to the next approach, which is basically trying to replicate the logic of the service calls and production database requests by writing it manually as an ETL pipeline. We call this manual backfills. So the idea here is that you replicate the logic that is required to generate these features, but it comes with consistency problems. Now you have two sets of systems generating similar data that you need to keep in sync, and very often the synchronization will lead to data consistency problems. And the real problem here is that these problems are not visible unless you measure. Your model will happily take in this inconsistent data and produce predictions that are inaccurate. You won’t know until you have labels that your system is having degraded performance, and building a consistency check framework is pretty intense in most production settings. Even at Airbnb, we haven’t seen people build a consistency measuring framework.

Now these are really, really hard to optimize. We’ll see more about this later, but this is the largest problem with manual backfills. The six-month backfill basically ends up taking weeks to months again, right, and it goes against this idea of making the feature engineering iteration very short and performant.

So between these two approaches, the trade-off really is iteration speed versus effort and consistency. The log and wait approach basically gives you full consistency. It’s low effort, but you’re sacrificing speed for it, so it’s your engineering hours that are being wasted. The second approach requires a lot of effort to build something like that, to maintain a replica of the system that produces these feature values offline, and also to keep the data consistent all the time.

So I think we have some idea of what makes feature engineering really hard but let’s drill in a bit more.

Within the backfilling approach itself, one needs to deal with different kinds of data sources.

There is a data lake, and then there are data streams. What I mean by data lake is S3 or HDFS data that is cataloged by a system like Hive, and something like Spark is used to run an ETL across these data sources. Then there are streams of data, which is basically Kafka. In Zipline in particular, we use Flink to deal with streaming data; we will drill more into that later. And then there are live databases and services that you need to make requests to in order to generate a consistent view of the feature.

Now, there are different models of accuracy even within this framework. Some features need to be real-time accurate, as in, get the feature value as of now. Let’s say you’re generating a search result on YouTube. The query you type in will look at the user’s history, and will also look at the features related to the video that is being recommended. Now, the video recommendation features can be snapshot accurate, but the user history needs to be real-time accurate. I say that because your history of viewing and searching in the last five minutes is way more relevant than your average searching history from yesterday or viewing history from a year ago.

And to emphasize this point again, this particular thing to backfill is really, really hard to make performant or optimize.

So let’s look at a visual aid of sorts to give you an idea of what these different data sources look like.

So you have a service fleet and the production database that your users interact with, and this generates event streams: your click stream, your real stream, your search stream. This gets stored in a data lake, something like HDFS or S3 that is cataloged by Hive or Spark. Similarly, for the production database you have change data capture that generates the change log, which again gets stored in the data lake. And additionally, you also have the database snapshots.

Now there is data that is generated by other teams or bought from external sources, which for this scenario we’ll call derived data or ETL data or external data; they’re all interchangeable. And then there is audio, image and video data. As I said earlier, we are not going to talk too much about audio, video and image data; we’re going to deal only with data that is evolving in time rather quickly.

So this is the class of events. These are your fact tables, if you are familiar with data engineering. Then there is the next class of data, which is your database snapshots. This can also evolve in real-time. The observation here is that most companies or organizations don’t capture the change log as a first-class citizen of the data warehouse, but for this to be evolvable in real-time, you will need the change capture log in the data lake, and this cannot be real-time unless there is an equivalent stream somewhere. For the purpose of this discussion, to simplify things, we’re going to treat this as non-real-time, snapshot accurate data. So let’s start with an example. Let’s say I’m trying to predict the likelihood of you going to a particular restaurant. For this particular example, I’m choosing an Indian restaurant. I’m going to make up some fake features to illustrate what Zipline does.

So the first feature is the total visits to Indian places last month and the second one is average rating of this restaurant that I’m trying to recommend to you in the last year.

One observation here is that these are all aggregations.

What I mean by that is, if you break these things down, the first feature, total visits to Indian places last month, is a count operation over the check-in stream of visits in a window of one month. Similarly, the average rating of a restaurant is an average aggregation on the rating column of the ratings table in a window of one year. So to give you an example, this is what the API looks like.

Feature Set Example

The aggregations are defined as this object called group by. This is all in Python, and you can specify your data by using this concept called event source, where you can point to the event stream and the table. The stream is the message bus counterpart of the event log, and the log is something that resides in Hive.

And you can define your projections and filters and you can define what’s a key.

So in this case, the key is the restaurant and the aggregation is the average rating of this restaurant in the last one year.

And for the second example, it’s a database ratings table. So it’s a DB source, and you specify two things: a snapshot table that exists in the warehouse and a change topic. This is something that most companies don’t have, but it is necessary to get the real-timeness of this particular feature. The other alternative is making a service call; we will see later why that is not a good idea. Similar to the last example, you specify what the keys are. In this case, the keys are the user and the cuisine, so we want to get the average rating of this user towards a particular cuisine. Let’s call this the cuisine affinity, and we want to do this in a 30 day window.

And finally, you put these two sources that event log and a database table together into a training set. So this is basically a join.

And you specify what features go into this join. This is a simplistic example of what is actually supported in Zipline. Full example will be available when we open source. Now let’s try to visualize what happens in time when we’re trying to generate features in the warehouse.
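To make the shape of such a declaration concrete, here is a minimal sketch in plain Python. The talk does not show the real Zipline API in text form, so every class, field, table and topic name below is a hypothetical stand-in that only mirrors the concepts described above (event source, DB source, keys, windowed aggregation, training set).

```python
from dataclasses import dataclass
from typing import Optional, Tuple, Union

# Every class, field, table and topic name here is a hypothetical stand-in,
# not the real Zipline API.

@dataclass(frozen=True)
class EventSource:
    table: str  # event log in the warehouse (e.g. a Hive table)
    topic: str  # message-bus counterpart of that log

@dataclass(frozen=True)
class DBSource:
    snapshot_table: str  # database snapshot in the warehouse
    change_topic: str    # stream of change data for the same table

@dataclass(frozen=True)
class Aggregation:
    operation: str         # e.g. "avg" or "count"
    column: Optional[str]  # input column; None for a plain count
    window_days: int

@dataclass(frozen=True)
class GroupBy:
    source: Union[EventSource, DBSource]
    keys: Tuple[str, ...]
    aggregation: Aggregation

@dataclass(frozen=True)
class TrainingSet:
    query_log: str                 # table of (keys, timestamp) prediction requests
    features: Tuple[GroupBy, ...]  # joined point-in-time against the query log

# Average rating of a restaurant over the last year, from an event source.
restaurant_rating = GroupBy(
    source=EventSource(table="ratings_log", topic="ratings_stream"),
    keys=("restaurant",),
    aggregation=Aggregation("avg", "rating", window_days=365),
)

# Average rating a user gives to a cuisine over 30 days, from a DB source.
cuisine_affinity = GroupBy(
    source=DBSource(snapshot_table="db_ratings_snapshot",
                    change_topic="db_ratings_changes"),
    keys=("user", "cuisine"),
    aggregation=Aggregation("avg", "rating", window_days=30),
)

training_set = TrainingSet(
    query_log="recommendation_requests",
    features=(restaurant_rating, cuisine_affinity),
)
```

The point of the sketch is only that the user states what the feature is (source, keys, operation, window) and nothing about how the pipelines that compute it are built.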

So at some point in time, you will want to make a prediction, and at a later point in time, you’ll have a label. So I recommended this restaurant to this user, but did the user actually check in to that restaurant at a later point in time? Did my recommendation actually work? And then there are all these data sources, the check-in stream, the ratings table, etc., that keep changing in real-time.

And the feature values keep changing, and when I say I want the feature values at a point in time p one, I need the exact feature values at those points in time.

And the next feature request or the next time I tried to predict I need the evolved features.

And again, the label will materialize later. So this is a training data set that is used to generate a model but stepping back from machine learning, this is what’s called a temporal join.

So you’re aggregating and generating a time series join of sorts to get the training data, and doing this in Spark over long periods of time with large data is really, really expensive. We’re going to talk about what techniques we use to optimize this particular join in the next set of slides.
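As a reference point, the temporal join can be written naively in a few lines of plain Python: for every row of the query log, rescan the raw events and aggregate the ones that fall inside the window ending at that row’s timestamp. This is a sketch of the expensive baseline only, not of what Zipline runs; the function names and the sample data are made up for illustration.

```python
from datetime import datetime, timedelta

def count_in_window(events, key, as_of, window):
    """Count events for `key` with as_of - window <= timestamp < as_of."""
    start = as_of - window
    return sum(1 for k, ts in events if k == key and start <= ts < as_of)

def naive_backfill(query_log, events, window):
    """For each (key, timestamp) query, rescan all events: O(queries * events)."""
    return [(key, ts, count_in_window(events, key, ts, window))
            for key, ts in query_log]

# Hypothetical check-in events: (user, time of check-in).
check_ins = [
    ("u1", datetime(2020, 1, 1)),
    ("u1", datetime(2020, 1, 10)),
    ("u1", datetime(2020, 2, 5)),
]
# Hypothetical query log: (user, time at which a prediction was made).
queries = [("u1", datetime(2020, 1, 15)), ("u1", datetime(2020, 2, 10))]

rows = naive_backfill(queries, check_ins, timedelta(days=30))
# rows[0] counts the two January check-ins; rows[1] counts only the February one.
```

Each query only sees events strictly before its own timestamp, which is what makes the result point-in-time correct. The cost is that every query rescans the full event history.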

So to reiterate, feature serving for inference asks: what is the value of these feature aggregates now?

And there are basically two modes. One of them is having these features in real-time, which basically boils down to this map. If you have an event log and an event stream, you can generate real-time features. Similarly, if you have historical database snapshots and change data, you can generate real-time features. And just to state it again, this is not the case with all features. Zipline supports snapshot accuracy, or midnight accuracy if you will, for the features where you don’t need real-time accuracy. So the real requirement in feature serving is latency.

Feature Serving

When you’re doing these recommendations, you’ll need to do them at a really, really low latency, and so you need to fetch these features at a really low latency. This is what’s called a point lookup, and the system that we need to build needs to be optimized for point lookups.

And then there is this additional concept that real-time features introduce, which is freshness, and it’s different from latency. Latency is how quickly I can read, and freshness is when the last write accounted for in my system happened. In our case, the latency needs to be under 10 milliseconds, and freshness could be anywhere from a second to midnight, a full day, based on the use case.

We will talk about another thing called batch correction, and why it’s necessary, in the next set of slides. This is an interesting requirement of feature serving.

So we will first look at the harder problem, which is feature computation for training. That is, generating the training data in the image that we saw in the earlier slide, slide 19, I reckon. So how do we generate these point-in-time correct features at those specified points in time where we made predictions? Let’s say you have a query log, which is basically the user and the time at which we wanted to make this prediction. To begin with, you won’t have the features. You haven’t even deployed this into production. What you really need is to take those Python feature definitions that I just showed you and fill in the features at those points in time, as if you had a service that was responding with those responses.

So basically filling in those two features that I showed you, which are evolving in time at those exact points in time.

So, the architecture is more or less, you define your feature declaration which generates the feature backfill, then this generates a model which gets served and to really serve this model, you need to serve also the features.

So we generate these partial batch aggregates, and real-time streaming updates if you need real-time accuracy, and then we provide a feature client that talks to the model to give a consistent view of these features.

Now, the blue boxes are all Zipline, even the dark blue one, the green boxes are the rest of Bighead, the other components in Bighead, and the purple box is your application server. This can be any product that’s trying to use these features to make predictions, and the dotted box basically represents the feature serving side of things.

Now, before we jump into the full solution, there is a prerequisite, which is basically understanding how aggregations work and what guarantees can we provide given an aggregation.

So let’s take a simple example, the sum aggregation, where you want to compute the sum of an array of elements, right? This has the property of commutativity, which basically means that you can arrange the elements in any order, sum them up, and it will give you the same result. And there is another property called associativity, which basically means that I can break up this array into different chunks and add them up together to produce the same sum. It’s basically how counting works. Let’s say you’re trying to count your piggy bank and you have a sister. You can break up the pile in a random order, ask her to count one part while you count the other, and add your totals together. That translates to massive parallelism: you can do this with many computers simultaneously, in any order, without having to sort data. And there is another interesting property, reversibility. If you say, okay, now I’m going to spend some of this money on a PlayStation, you can subtract that from the total to see how much money you will have left, without having to pay for the PlayStation and count again. There are operations that don’t have this property; we will look at those later. And, no need to remember this phrase, but this particular operation forms an abelian group.
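The three properties can be checked directly on a small list. This is an illustrative sketch in plain Python; the values and the helper name `chunked_sum` are made up for the example.

```python
import random

def chunked_sum(values, chunk_size):
    """Sum each chunk independently (as separate workers could), then combine."""
    partials = [sum(values[i:i + chunk_size])
                for i in range(0, len(values), chunk_size)]
    return sum(partials)

piggy_bank = [5, 1, 20, 10, 2]
total = sum(piggy_bank)

# Commutativity: the order of the elements does not matter.
shuffled = piggy_bank[:]
random.shuffle(shuffled)
assert sum(shuffled) == total

# Associativity: partial sums over chunks combine to the same total.
assert chunked_sum(piggy_bank, 2) == total

# Reversibility: removing an element is a subtraction, not a recount.
remaining = total - 20
assert remaining == sum([5, 1, 10, 2])
```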

Now, there is another operation, average. Again, this is an abelian group, so you can break it up randomly, and the idea here is to split the operation into an intermediate representation and a final representation. You keep the sum and the count separately, these are both reversible, and when you finalize, you finalize by division.
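A minimal sketch of that split, assuming a hypothetical `AvgIR` class: the intermediate representation is a (sum, count) pair that supports adding, removing and merging, and the division happens only at finalization.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class AvgIR:
    """Intermediate representation of an average: running sum and count."""
    total: float = 0.0
    count: int = 0

    def add(self, x):
        return AvgIR(self.total + x, self.count + 1)

    def remove(self, x):
        # Reversal: both the sum and the count can be undone.
        return AvgIR(self.total - x, self.count - 1)

    def merge(self, other):
        # Partial results from different chunks combine component-wise.
        return AvgIR(self.total + other.total, self.count + other.count)

    def finalize(self):
        # The division happens only once, at the very end.
        return self.total / self.count if self.count else None

ratings = AvgIR().add(4).add(5).add(3)
merged = AvgIR().add(4).merge(AvgIR().add(5).add(3))
```

Here `ratings.finalize()` and `merged.finalize()` agree, and `ratings.remove(3).finalize()` gives the average of the remaining two ratings without revisiting them. The memory used stays constant no matter how many values are added.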

Now, we have an additional constraint beyond the mathematical abstract algebra definitions which is all this needs to be done in constant or bounded memory.

So you cannot keep a history of these events and call it an abelian group. This constraint is artificial, and in practical terms it translates to us being able to scale storage. So there are two classes of aggregations: one is sum, average and count, and the other is min, max, approx unique, etc. Sum has this property of reversibility, as do all of the operations in that group, and min and max don’t have the reversibility property.

So we will see the implications of them not being reversible in the next set of slides.

Incremental Windowing – with reversibility

So let’s say you’re trying to sum up the number of visits of a user in a certain window; this is one of the features. Now the window slides forward, and to update it you don’t need to recompute the sum over everything in the window again. You can remove what went out of the window and add what came into the window to get your next update. This is really basic math, and you get the updated value of the number of visits in the last year.
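The sliding update can be sketched in a few lines of plain Python. The function name and the sample values are illustrative; the window here is a fixed number of consecutive elements rather than a time range, to keep the example small.

```python
def sliding_sums(values, window):
    """Sum of every `window` consecutive values, updated incrementally."""
    if window <= 0 or window > len(values):
        return []
    current = sum(values[:window])  # the first window is computed in full
    sums = [current]
    for i in range(window, len(values)):
        # Add what came into the window, subtract what went out of it.
        current += values[i] - values[i - window]
        sums.append(current)
    return sums

daily_visits = [1, 2, 3, 4, 5]
three_day_totals = sliding_sums(daily_visits, 3)
```

Each step after the first costs one addition and one subtraction, regardless of how wide the window is.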

Now max, for example, is not reversible, and windowing it this way is not possible. Let’s say you slide your window forward; there is no concept of removing the last element from max. In the previous window four was the max, but now you slide it forward and you don’t know what happens when you remove four, because all you are storing is four. The idea here is a bit more complex, but not too complex. The way to make this efficient is to store a binary tree structure of sorts, where each node is the max of consecutive elements. Since it’s binary, the first level captures two elements, the second level captures four, and so on.

Now the benefit of this structure is that every time I slide the window forward, I don’t need to recompute, or look at every element that was in the window. Instead, I can just look at a smaller set of nodes in this binary tree to compute the next max.

So, in this particular example, we are looking at logarithmic number of events versus linear number of events and this is a huge deal if this linearity is in the order of millions. So, log of a million is basically 20 but a million is a million, so this makes this property or this trick makes non reversible aggregations very easy.

Windowing – without reversibility

So the total time of the algorithm will be n squared versus n log n for computing the windowed array, and this is a space trade-off. We are getting a couple of orders of magnitude of improvement in time and we are spending twice the amount of space, so it’s not free, but it’s worthwhile when the data is pretty large.
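One standard way to get this behavior is an array-backed binary tree of maxima (a segment tree). The sketch below is a textbook version written for illustration, not Zipline’s implementation; the class and function names are assumptions. It stores 2n values for n inputs, which matches the "twice the space" trade-off, and answers each window with a logarithmic number of node lookups.

```python
class MaxTree:
    """Array-backed binary tree: node i holds the max of its two children."""

    def __init__(self, values):
        self.n = len(values)
        # Leaves live at positions n..2n-1, internal nodes at 1..n-1.
        self.tree = [float("-inf")] * self.n + list(values)
        for i in range(self.n - 1, 0, -1):
            self.tree[i] = max(self.tree[2 * i], self.tree[2 * i + 1])

    def query(self, left, right):
        """Max of values[left:right], touching O(log n) nodes."""
        result = float("-inf")
        lo, hi = left + self.n, right + self.n
        while lo < hi:
            if lo & 1:            # lo is a right child: take it, move right
                result = max(result, self.tree[lo])
                lo += 1
            if hi & 1:            # hi is exclusive: step left, take that node
                hi -= 1
                result = max(result, self.tree[hi])
            lo >>= 1              # move both ends up one level
            hi >>= 1
        return result

def sliding_max(values, window):
    """Max of every `window` consecutive values, n log n overall."""
    tree = MaxTree(values)
    return [tree.query(i, i + window)
            for i in range(len(values) - window + 1)]

window_maxes = sliding_max([1, 4, 2, 3, 0, 5], 3)
```

The naive alternative rescans every element of every window, which is where the n squared comes from when the window is on the order of n.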

So to rephrase and to emphasize the importance of groups: a group is what has reversal. If it’s an unwindowed aggregation, there is no need for reversals. If it’s a windowed aggregation, then groups support reversals, so you can simply subtract, and non-groups, basically max, etc., don’t have reversal. The only case where you need to do this tree algorithm is the windowed non-group case; everything else either doesn’t need reversal or is reversible.

So another side note about this whole window business is that we encourage data scientists in the company to use windowed features because they don’t drift. If you’re not windowing your features, they keep increasing all the time. Sum or count, for example, keep increasing; average doesn’t, it stabilizes. But the idea here is that the features keep drifting, and your model needs to be retrained very frequently if the features are not windowed. So we looked at how we need to generate a logarithmic number of nodes for non-reversible aggregations. This is what we call the tiling problem: given a range, left and right, give me all the tiles that I need, the logarithmic number of tiles that I need, to cover the entire window span.

So, there is a really clever way to use bit shift operations to generate these tiles and you can recursively go down to get the logarithmic number of tiles from the split point.
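The talk does not spell out the exact bit-shift procedure, so the following is one possible sketch under that caveat: a greedy, iterative decomposition of a half-open range into aligned power-of-two tiles, using the lowest set bit of the current position to find the largest tile that can start there. The function name `tiles` and the `(start, size)` representation are assumptions made for this example.

```python
def tiles(left, right):
    """Cover [left, right) with aligned power-of-two tiles as (start, size)."""
    out = []
    while left < right:
        if left:
            # Lowest set bit of `left` = largest power of two dividing it,
            # i.e. the biggest aligned tile that can start at `left`.
            size = left & -left
        else:
            # Zero is aligned to every power of two; start above the range.
            size = 1 << (right - left).bit_length()
        # Halve the tile until it fits in what remains of the range.
        while size > right - left:
            size >>= 1
        out.append((left, size))
        left += size
    return out

window_tiles = tiles(3, 11)
```

For the range 3 to 11, the tile sizes grow (1, then 4) until the largest aligned tile is reached and then shrink (2, then 1). At most two tiles of any given size are needed, which is why the count is logarithmic in the length of the range.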

So this description is all hand wavy, there’s more detail in the slide but still not enough and it still doesn’t prove why this is logarithmic. So there will be less hand waving in the paper or in the blog that we’re going to write.

Reversibility – Unpacking Change data

Now this is another important consequence of reversibility. If you want to evolve your data with mutations or change data, change data is basically represented as inserts, updates and deletes. A deletion here is a reversal, and an update is a deletion followed by an insertion at that particular point in time. And an example, so this is not up to date, I guess. A real example here is that, let’s say I’m trying to count the number of people in California, and one person moves from California to Nevada due to the high cost of living. For this person who moved, you see that as an update. That is basically a deletion of the California entry and an insertion of the Nevada entry. So you just need to increase the count for Nevada and decrease the count for California, because it’s reversible. Now the same thing cannot be done with max or unique count, or approximate unique count. If there is a max, there is no way to reconcile this thing when a person moves from California to Nevada, and that’s why reversibility is important to provide that guarantee. In the case where we don’t have that reversibility property, we need what’s called a batch correction. The features that are being served need to be corrected within a certain duration, so that they don’t drift too much because of the lack of reversibility.
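The California and Nevada example can be sketched as a small function that applies one change record to a running count per state. The record layout (`op`, `before`, `after`) is a made-up format for this illustration, not the format of any particular change data capture system.

```python
from collections import Counter

def apply_change(counts, change):
    """Apply one change record to per-state counts, in place."""
    op = change["op"]
    if op in ("delete", "update"):
        counts[change["before"]] -= 1   # reversal of the old row
    if op in ("insert", "update"):
        counts[change["after"]] += 1    # insertion of the new row
    return counts

residents = Counter({"CA": 2, "NV": 1})

# One person moves from California to Nevada: an update is a delete plus an insert.
apply_change(residents, {"op": "update", "before": "CA", "after": "NV"})
```

The same record could not be applied to a stored max: knowing only the current maximum, there is no way to tell what the maximum becomes once the old row is taken away.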

So this is the same example that we showed before, just to reiterate. The larger problem here is, given this log, we want to generate the aggregated features at those points in time. We are going to put together what we learned in the last few slides into a complete algorithm that runs on Spark to generate the full training set.

The feature backfill is essentially a join. A join has a left side and a right side, the left side being the query log and the right side being the raw data that needs to be aggregated and presented as features. This is what we call a temporal aggregating join, and the key here is that you need to fuse the join and the aggregation. Another way to think of this is to roll back, or time travel, your data warehouse to a historical point in time and run these aggregations. If you try to do this for every point in time and for every prediction request, the naive algorithm that I just described is intractable. So we need to fuse these aggregations and joins without having to recompute, or hold all the raw events in memory. The next set of slides are going to describe what that topology looks like.

Another observation here is that the raw data is way, way larger than the query log, and you need to look into history to generate the features. That’s what makes it really big. So this is a variation of the same tree idea, but here events are coming in. This is the raw data that is flowing through our topology, and we need to update the appropriate prediction events, or feature request events, based on this event data. The idea is to use that tree idea again to update only a logarithmic number of nodes, and then collapse these things together later in the topology to give you the full backfill.

Feature Backfill – Topology

So the topology here is not really straightforward, but there will be more detail in the blog.

So you start with this query log, and you group it by the key, and at this point we broadcast it. We use the observation that the left side is really small, and we use that to do a broadcast join.

This is broadcast to all the machines that are streaming the raw data in, and we generate these partial aggregates. This is basically where the tree idea lives, and then we shuffle these partial aggregates. Now the data is already small, because we went through all our raw data and kept the size the same as the query log. We shuffle them and re-aggregate them, or rejoin them, and that’s how we generate the results. So we collapse the tree in the last step to finalize the results.
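The three phases can be simulated in plain Python to show the data flow. This is a sketch under heavy simplification and is not Spark code: partitions are ordinary lists, the "broadcast" is a shared dictionary, the "shuffle" is a loop that merges dictionaries, timestamps are integers, and the per-event update is linear in the number of matching queries rather than using the tree. All names are hypothetical.

```python
from collections import defaultdict

def partial_counts(partition, queries_by_key, window):
    """Phase 2: scan one partition of raw events once, updating per-query partials."""
    partials = defaultdict(int)
    for key, ts in partition:
        for q_ts in queries_by_key.get(key, ()):
            if q_ts - window <= ts < q_ts:
                partials[(key, q_ts)] += 1
    return partials

def backfill_counts(partitions, query_log, window):
    # Phase 1: group the (small) query log by key; this is what gets broadcast.
    queries_by_key = defaultdict(list)
    for key, q_ts in query_log:
        queries_by_key[key].append(q_ts)

    # Phase 2: every partition produces partial aggregates independently.
    per_partition = [partial_counts(p, queries_by_key, window) for p in partitions]

    # Phase 3: bring partials for the same query together and merge them.
    result = {query: 0 for query in query_log}
    for partials in per_partition:
        for query, count in partials.items():
            result[query] += count
    return result

partitions = [[("u1", 1), ("u1", 5)], [("u1", 8), ("u2", 3)]]
query_log = [("u1", 6), ("u1", 10), ("u2", 4)]
features = backfill_counts(partitions, query_log, window=5)
```

What moves between phases 2 and 3 is at most one partial per query per partition, so its size is bounded by the query log and not by the raw data. Merging partials is valid because count is associative and commutative.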

So these are three distinct phases of the topology, or three distinct Spark stages, if you will. There are more nuances to this. We saw only one kind of time, event time, but event time is a distinct thing from ingestion time. Event time is, for example, how many people were born in the window of 1992 to 2011; ingestion time is when that particular user with a 1990 birthdate registered on the Airbnb platform. So ingestion time is when our system, or the company, saw this data point, and event time is the actual date.

So, we showed a simple join example, but this is a multi-join: we join against many raw data sources at once. This particular algorithm also handles time skew, but if we can guarantee that there is no time skew between event time and ingestion time, then we can do something much faster.

Again, more in the paper. This is a bit hand-wavy, but this is what time permits. So let’s briefly talk about feature serving.

The architecture is really simple. To serve these things, we basically break it down into batch partial aggregates and streaming aggregates. We keep them as separate datasets in the storage, and then the feature client will basically read these two things and aggregate. The reason why we need to do this is that we need to correct the batch aggregates because of the lack of reversibility. That correction is why we want to keep it as an independent moving piece, so the feature client can update without losing any availability.

Feature Serving

So another way to visualize this is, basically, the blue things are the streaming head and the batch tail. To get a 30 day window we need to slide forward, so we just add two more days of data in the head, which allows us to do this sliding without losing availability. Again, more detail in the paper.
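A minimal sketch of how a client could combine the two datasets at read time, assuming a simplified layout that the talk does not specify: the batch tail holds one partial count per key per completed day, the streaming head holds the count for the current day so far, and days are plain integers. The function and parameter names are hypothetical, and the extra days of head data mentioned above are not modeled.

```python
def windowed_count(key, today, window_days, batch_daily, stream_head):
    """Combine the batch tail and the streaming head into one windowed count.

    batch_daily: {(key, day): count} for completed days, written by the batch job.
    stream_head: {key: count} for the current day so far, written by the stream job.
    """
    first_day = today - window_days + 1
    tail = sum(batch_daily.get((key, day), 0) for day in range(first_day, today))
    head = stream_head.get(key, 0)
    return tail + head

batch_daily = {("u1", 5): 4, ("u1", 8): 2, ("u1", 9): 1}
stream_head = {"u1": 3}

last_3_days = windowed_count("u1", today=10, window_days=3,
                             batch_daily=batch_daily, stream_head=stream_head)
last_7_days = windowed_count("u1", today=10, window_days=7,
                             batch_daily=batch_daily, stream_head=stream_head)
```

Because the tail and the head are separate datasets, the batch job can rewrite the tail (for example, to correct it) while the head keeps receiving updates, and the client only ever merges the two at read time.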

Why lambda VS. Kappa?

So I’m going to skip this slide given the time, but the idea here is that we cannot really do Kappa, because there’s a need for reversibility and batch correction. Plus, Kappa is too slow: Spark is specifically tuned for processing batch data that is columnar formatted, but there are different requirements for processing. There are links to certain things that are mentioned in the paper and also to Bighead, and then there’s a very neat blog that compares Bighead with other systems.

About Nikhil Simha

Airbnb

Nikhil is a Software Engineer on the Machine Learning infrastructure team at Airbnb. He is currently working on Bighead, an end-to-end machine learning platform. Prior to Airbnb, he built a self-healing scheduler called Turbine and a real-time data processing engine called Stylus at Facebook. He is also the co-author of Realtime Data Processing at Facebook (SIGMOD-16) and Bighead (DSAA-2019). Nikhil got his Bachelor’s degree in Computer Science from Indian Institute of Technology, Bombay.