Building a Feature Store around Dataframes and Apache Spark


A Feature Store enables machine learning (ML) features to be registered, discovered, and used as part of ML pipelines, thus making it easier to transform and validate the training data that is fed into machine learning systems. Feature stores can also enable consistent engineering of features between training and inference, but to do so, they need a common data processing platform. The first Feature Stores, developed at hyperscale AI companies such as Uber, Airbnb, and Facebook, enabled feature engineering using domain specific languages, providing abstractions tailored to the companies’ feature engineering domains. However, a general purpose Feature Store needs a general purpose feature engineering, feature selection, and feature transformation platform.

In this talk, we describe how we built a general purpose, open-source Feature Store for ML around dataframes and Apache Spark. We will demonstrate how our platform, Hopsworks, seamlessly integrates with Spark-based platforms, such as Databricks. With the Feature Store, we will demonstrate in Databricks how data engineers can transform and engineer features from backend databases and data lakes, while data scientists can use PySpark to select and transform features into train/test data in a file format of choice (.tfrecords, .npy, .petastorm, etc.) on a file system of choice (S3, HDFS). We will also show the potential of Koalas for making feature engineering even easier on PySpark. Finally, we will show how the Feature Store enables end-to-end ML pipelines to be factored into feature engineering and data science stages that can each run at a different cadence.


 


Video Transcript

– In this talk, we’re going to talk about our experience in building and operating a feature store for the last year and a half, and we’re gonna share some of our experiences and some of the things that we’ve learned along the way.

Building a Feature Store around Dataframes and Apache Spark

So I'm gonna start by telling you about Twitter, who run a feature store in production. Twitter, along with many hyperscale AI companies, have been using feature stores to manage their machine learning assets, and one of their motivations for this is to increase the reuse of features across teams within the organization. They actually introduced a metric, which they call the number of teams who use another team's features in production, or the sharing adoption metric, and that's basically one of the ways in which they're evaluating their platform. Now, if you talk to any existing team that doesn't have platform support for machine learning assets, that is, for data for machine learning, so they don't have a feature store, this is the kind of reaction you might get.

When Data Engineers are asked to re-use other teams’ features*

Now this is a guy called hide-the-pain-Harold, and hide-the-pain-Harold is putting his thumb up and saying, great, of course I'll share my features with this other team, even though I have 500 million other things to do. He's saying he'll do it, but in reality he won't, because it's gonna be painful. It's gonna be painful because he doesn't have platform support for data for AI. Now, the main hyperscale AI companies that are out there, the likes of Uber, Airbnb, Twitter that I mentioned, Facebook, Netflix, they all have feature stores in production and they've all talked about them.

Known Feature Stores in Production

So there are a few links here you can follow to find out more about some of the existing feature stores in production. We at Logical Clocks built Hopsworks. It was the world's first open-source feature store for machine learning, and it's still the only fully open-source one, and our feature store is running in production. If you want to learn more about where it's running in production, you can go to the previous Spark AI Summit, the European one in 2019, where Swedbank talked about how they were managing 40 terabytes of transaction data and doing fraud detection on it.

Feature Store in Banking: Swedbank

So they're trying to detect fraud. In that particular use case, we're managing the features for training fraud prediction models in the feature store, and we're also using the Hopsworks platform to train the models that classify transactions as fraud or not fraud. So what is the context of what we're talking about here?

Data Teams are moving from Analytics to ML

So I've mentioned that the hyperscale AI companies have feature stores in production. One of the things that we've seen is that existing data teams have already built out their data infrastructure: they've got a data lake, maybe Delta Lake on Databricks, and they've built on BI tools. What they're now asking is, well, we've got all this data, we've got great infrastructure in place, how do we build this out to explore AI? So this is where the feature store comes in. You take your data from all the places where it currently exists and write what we call feature pipelines. The feature pipelines transform the data in the data lake into features, which the feature store will store. That data will then be used for training models, for serving features to online applications, and also for analytical applications that use models to do analytics offline.

Features are created/updated at different cadences

Now, features themselves are not one homogeneous thing. You can have real-time features that are entered by a user, where we have maybe less than two seconds to transform that user input into a feature for a machine learning model, and that's something we experienced with a customer. But a lot of other features have higher latency than those two seconds: maybe they're historical data, maybe user clicks, change data capture from an operational database, maybe user profile information or some other information collected by batch applications. So the data that comes in might have low latency or high latency, but it all comes into the feature store, and we have online applications and data scientists creating training sets in batch applications using it, and they have very different requirements. Online applications need low-latency access to the data, but as we saw with Swedbank, we might need many tens of terabytes of storage capacity, which is not what you would associate with an online, low-latency database. So the main thing to take away from here is that nearly all feature stores are actually two databases; we can call it a dual-database system. You have an online feature store for low-latency serving of features, and an offline feature store for storing historical feature data at scale. And what we can see from this is that the pipeline feeding our features into the feature store now needs to feed into both the online and the offline store, so we're introducing complexity. And that's really because there's no existing database that meets this need of being both scalable and low latency, and also being able to join features, because we want to reuse features, and that really means joining them.

FeatureGroup Ingestion in Hopsworks

So what we did in our platform is we introduced this concept of a feature group, and it's really a dataframe. We have a dataframe API where the features that are coming in, that are being ingested from these backend platforms, are written to our feature store as a dataframe. So you'll have an application, maybe it's written in Spark, Spark Streaming, maybe it's in pandas, and that application will materialize or save its features in a feature group. What our particular framework, the Hopsworks Feature Store, does by providing the dataframe API is hide the complexity of storing data in the online and offline feature stores. As a developer, you don't have to worry about: is this going online? Is it going offline? How do I ensure they're in sync? How do I handle the case where the data made it to one but not the other, and so on. So the feature group is an abstraction we introduced to simplify the usage of the feature store. Now, you may have noticed that this pipeline ends at the feature store, and the observant viewers here will have noted that this is not what we've heard before. We've often heard that machine learning pipelines are end-to-end pipelines: we go from the raw data to a model at the end. In our case, when you have a feature store, you typically don't have end-to-end pipelines.

ML Pipelines start and stop at the Feature Store

What you have is a pipeline that starts where the data resides, so in your data lake, in your data warehouses, in your operational SQL systems, in your file systems, your object stores, and it ends at the feature store. In our platform, Hopsworks, we typically use Airflow to orchestrate these pipelines. Often they'll be written in Spark, Scala Spark or PySpark. When the features arrive in the feature store, and as we saw earlier they may arrive at different cadences, they'll just sit there until data scientists use them to write training pipelines. Those pipelines take the features from the feature store, create training, test, and validation datasets, and then use those datasets to train and evaluate their models. We use Airflow in our platform for these training pipelines as well.
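As a rough illustration of this factoring, and not the actual Hopsworks orchestration code, two Airflow DAGs running at different cadences might look like the sketch below; the operator choice, job paths, and schedules are assumptions.

```python
# Illustrative sketch only: a feature pipeline and a training pipeline
# orchestrated as separate Airflow DAGs at different cadences.
# The SparkSubmitOperator jobs, paths, and schedules are hypothetical.
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Feature pipeline: backend data sources -> feature store, run hourly.
with DAG("feature_pipeline", start_date=datetime(2020, 6, 1),
         schedule_interval="@hourly", catchup=False) as feature_dag:
    ingest_features = SparkSubmitOperator(
        task_id="ingest_features",
        application="jobs/feature_engineering.py",  # writes feature groups
    )

# Training pipeline: feature store -> train/test data -> model, run daily.
with DAG("training_pipeline", start_date=datetime(2020, 6, 1),
         schedule_interval="@daily", catchup=False) as training_dag:
    create_training_data = SparkSubmitOperator(
        task_id="create_training_data",
        application="jobs/create_training_dataset.py",  # reads feature groups
    )
    train_model = SparkSubmitOperator(
        task_id="train_model",
        application="jobs/train_and_evaluate.py",
    )
    create_training_data >> train_model
```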

Feature Store Concepts

So our feature store is built around the concept of features. This example is from the Titanic dataset. We have some features, such as whether a passenger survived on the Titanic or not, and the sex of the passenger, but in this example we can see there's a second feature group called bank accounts. So we're assuming that somebody has somehow discovered the amount of money people on the Titanic had in their bank accounts, and it's now available in the feature store. A data scientist can come along to this feature store and say, well, I'm gonna join a bunch of these features together: I'm gonna take the Titanic passenger features, and I'm actually gonna augment them with this bank account balance, because I think maybe that will help me predict better whether a passenger survived; maybe they bribed their way onto a lifeboat. Once the data scientist has decided on the features to join, they decide on a file format in which to output the data. If you're gonna materialize that training, test, and validation data, you go ahead and say it's TFRecords for TensorFlow or NumPy for PyTorch, and then you decide where to store it, maybe an S3 bucket, maybe a file system. Everything here is versioned: the features, the feature groups, and the train/test datasets are all versioned.

Register a FeatureGroup with the Feature Store

So what does the API look like? We said it's a dataframe API. Well, you just read in a dataframe from a backend system, or maybe, if you're computing derived features, you read from the feature store itself, and then you create a feature group, and that's really just supplying a description for the feature group, the dataframe itself, and whether it should be stored online, offline, or both.
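As a rough sketch of what that registration call looks like with the version-1 hops Python library (argument names are approximate, and a spark session is assumed to be available in the notebook):

```python
# Sketch of the version-1 dataframe API; argument names are approximate.
from hops import featurestore

# `spark` is the SparkSession already available in a Hopsworks/Databricks notebook.
raw_df = spark.read.parquet("hdfs:///datasets/titanic/passengers.parquet")  # hypothetical path

# Ordinary Spark feature engineering on the dataframe.
passengers_fg_df = raw_df.selectExpr("passenger_id", "sex", "age", "survived")

# Register the dataframe as a feature group: a description, the dataframe
# itself, and whether it should be stored online, offline, or both.
featurestore.create_featuregroup(
    passengers_fg_df,
    "titanic_passengers",
    description="Passenger features from the Titanic dataset",
    online=True,  # also materialize to the online feature store
)
```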

Hopsworks Feature Store

So we covered the ingestion of features to the feature store that’s on the left hand side. Now we’ll just look briefly at how we use the feature store. So we look at online applications and then creating training test datasets.

Create Train/Test Datasets using the Feature Store

So, creating a train/test dataset with our feature store. Originally we had this flat namespace of features, so we can see we have this sample_data, which is a dataframe we get by joining these features together; the query planner in our feature store looks for a common join key across the feature groups and gives you back the dataframe. We can then materialize that dataframe to a directory as TFRecords, and you can see the formatting is slightly off there.
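A hedged sketch of those flat-namespace calls, again with the version-1 hops library (feature and dataset names are illustrative):

```python
# Sketch of the version-1 flat-namespace API; argument names are approximate.
from hops import featurestore

# The query planner finds a common join key across the feature groups
# holding these features and returns a single joined dataframe.
sample_data = featurestore.get_features(
    ["survived", "sex", "age", "balance"]
)

# Materialize the joined features as a training dataset in TFRecords.
featurestore.create_training_dataset(
    sample_data,
    "titanic_training_dataset",
    data_format="tfrecords",
)
```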

Online Feature Store

The online feature store serves another user of the feature store: online applications that need features in real time. So this online application is going to say, hey, I have a transaction, I want to classify it as fraud or not fraud, and I have a bunch of input data: the IDs of the customers involved, the banks involved in the event, the amount, the date. But it also needs to go to the feature store to get a lot of historical features, the number of transactions those users have done, and so on; it could even be hundreds of features. In our case, we use MySQL Cluster as the online feature store. We basically pass those keys to the feature store and it performs the join there. We're using a JDBC API because it's language-independent and it's also low latency, and what the application gets back is a feature vector that it can send to a model for prediction. Typically, models will be served over the network, maybe a TensorFlow Serving server or a Flask server, and in our case we've set it up so that everything is highly available. So in the cloud, both the online database and the model serving are highly available across availability zones. I'm gonna hand over to Fabio at this stage. He'll talk about what we've learned between this first version of the feature store and the new version that's coming out now.

– So thanks, Jim. So far we have seen how we work with the feature store with API version one. The APIs have been out for over a year now, and we have learned several things. So we'd like to go through the good decisions that we took in designing API version one, the shortcomings, and how we solved those shortcomings in version two. Regarding version one, I think there are four aspects that we got right, decisions that we believe were good. The first one is that we built the API around the concept of dataframes; we didn't build it around a DSL. A DSL could be added later. The main reason is that it's really hard to make a DSL general purpose.

There are some companies that are already building a feature store internally, and they have built a DSL internally for their specific use case, but generalizing it is quite complicated. The second point is that we see the feature store as a cache for materialized features, not as a library. A library is good if you only want to enforce that the same code used to generate features during training is the same code used to generate features during serving. But as soon as you start hitting limitations in terms of latency when serving features, you soon realize that you have to materialize those features. And even if you don't have this online serving aspect in your pipelines, even if you're only doing batch, you'll soon realize that it's more useful and less expensive to materialize the feature computation instead of computing the features over and over again. And this brings us to the third point: separating the online and offline use cases into two different databases, so we can serve both low-latency and batch workloads efficiently. And finally, features and feature groups are the key components of the feature store, the building blocks you can use to build training datasets. You can share them and reuse them across multiple training datasets, and this means that you will need to join features with other features, which means you need an engine that can do joins, and in our case that's Spark.

What are the shortcomings of API version one? The first one is that we had a flat namespace, as Jim was saying before. The issue is that as soon as you start scaling up your feature store, onboarding new teams, adding features and so on, you will get name conflicts: two features with the same name that represent two different things. So we introduced the concept of feature groups to scope features and basically solve these naming issues. The second point is that, when talking to users and customers, we were seeing over and over again that people want to have a production feature store where only verified, predefined pipelines can write, and let data scientists and engineers play with new features and new experiments in a development feature store. At the same time, people want to allow data scientists and engineers to join features between the development and production feature stores to build new training datasets. So with this in mind, we improved the joining capabilities to allow people to join features from different feature groups and different feature stores, and to be more expressive when defining joins; we'll see that in an example. We also added support for time travel for feature groups, so you can go back in time and see exactly what a feature group looked like at any point in time, and also get the changes that were applied to the feature group. And finally, we tried to be more consistent with the frameworks that are out there, meaning PySpark and pandas dataframes. So this is what I was referring to before.

Connect and Support for Multiple Feature Stores

We have a demo later on where we go through this in more detail, but this is the idea I was describing before, where we work with a production and a development feature store. We get a connection from an instance running on hopsworks.ai, and we get two handles, one for the production feature store and one for the development feature store. Using those handles, we can reach the feature groups that we are interested in and join them together.
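A minimal sketch of getting that connection and the two handles with the hsfs library; the host, project, and production feature store names are placeholder assumptions:

```python
# Sketch: one connection, two feature store handles (hsfs, API version 2).
import hsfs

connection = hsfs.connection(
    host="my-instance.cloud.hopsworks.ai",  # hypothetical instance address
    project="weather",                      # hypothetical project name
    api_key_file="featurestore.key",
)

# The project's own (development) feature store and a shared production one.
dev_fs = connection.get_feature_store()
prod_fs = connection.get_feature_store("production_featurestore")  # hypothetical name
```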

Feature Group Operations

Regarding feature groups, not much has changed relative to the first version. With the first instruction you create a metadata object that lives within your application only, and then you save it. When saving it, you pass the dataframe; the library parses the schema of the dataframe, uses the columns of the dataframe as features, and at that moment persists the metadata in the feature store. Of course, you can read it back and add more data afterwards, and in particular, you can also make the feature group available online by setting the online-enabled flag to true.

One thing we added in this new version is support for tagging. If you have thousands of features across thousands of feature groups, you can easily add tags to them. You can define tags across multiple feature stores, both production and development; you can tag features, feature groups, and training datasets; and the platform will index those tags and allow you to do free-text search on them.
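Continuing from the connection sketch above (reusing the dev_fs handle), a hedged sketch of creating, saving, online-enabling, and tagging a feature group; the column and tag names are illustrative:

```python
# rain_df: a small illustrative Spark dataframe of engineered rain features.
rain_df = spark.createDataFrame(
    [(1, 12.3), (2, 0.0)], ["location_id", "rainfall"]
)

# create_feature_group only builds a metadata object in the application.
rain_fg = dev_fs.create_feature_group(
    name="rain",
    version=1,
    description="Rainfall features per location",
    primary_key=["location_id"],
    online_enabled=True,  # also materialize to the online feature store
)

# save() parses the dataframe's schema, uses its columns as features,
# and persists both the metadata and the data.
rain_fg.save(rain_df)

# Tags are indexed by the platform and searchable as free text.
rain_fg.add_tag("country", "Australia")
rain_fg.add_tag("year", "2010")
```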

Schema Version Management

One more thing we added is support for schema changes that do not require increasing the version. Feature groups and training datasets are versioned, and the versions are meant to represent breaking changes: you remove a feature, or you change the meaning of a feature. This means that if you increment the feature group version, existing pipelines that rely on that feature being there, or existing models that rely on the feature being computed in a certain way, do not break. However, if you just want to add a feature, this is not a breaking change, and we can now do that on the same feature group, with the same version, without asking you to bump the version.
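As a sketch of such a non-breaking change; the append-style method and the Feature class follow later hsfs releases and may not match the exact version shown in the talk:

```python
# Hypothetical sketch: add a feature to the same feature group version.
from hsfs.feature import Feature

rain_fg = dev_fs.get_feature_group("rain", version=1)

# Appending a feature is a non-breaking schema change, so no version bump.
rain_fg.append_features([
    Feature(name="humidity_avg", type="double"),  # newly added feature
])
```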

Exploratory Data Analysis

If I put my data scientist hat on and I'm exploring the feature groups in the feature store, looking for features that I can use, in this case as well we try to be as similar as possible to existing frameworks like PySpark and pandas. So, as you can see here, I can do pretty much the same operations that I can on PySpark dataframes; I can select a bunch of features from a specific feature group.

From a specific feature store, I can call show on them, I can read them, I can filter for a specific value, and this is pretty much the same as a PySpark dataframe.
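A short sketch of that kind of exploration, reusing the dev_fs handle from the connection sketch; filter support may depend on the hsfs release:

```python
# Sketch of exploratory queries against a feature group.
rain_fg = dev_fs.get_feature_group("rain", version=1)

# Peek at a few rows, similar to df.show() in PySpark.
rain_fg.show(5)

# Read the whole feature group back into a Spark dataframe.
rain_spark_df = rain_fg.read()

# Select a subset of features and filter for a specific value.
rain_fg.select(["location_id", "rainfall"]) \
       .filter(rain_fg.rainfall > 10.0) \
       .show(5)
```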

Joins – Pandas Style API

When it comes to joins, this is an example of how we try to give data scientists the opportunity to express their joins in a familiar way, like with pandas dataframes.

In this example, I'm trying to join three feature groups together, one coming from the production feature store and two coming from the development feature store. So I get a handle for all three, I select the features from each of them that I'm interested in, and I can join them together. I can decide to provide the join key explicitly, as in the second join, or I can omit it. If I omit the join key, then the feature store will automatically figure out the largest subset of matching primary keys across the feature groups and use those to join. And of course, I can override this behavior. I can also override which kind of join to apply: by default it's an inner join, but you can override it to be a left join or any other join type. And finally, I can build the dataframe and see the results. While doing this exploration, I can also use the time travel feature of the feature store: I can fetch the feature group and specify either a point in time, so what the feature data looked like at time X in the past, or a range, and the range will give me back the changes that were applied to that feature group during that time range.
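A hedged sketch of such a join across the two feature stores, plus the time-travel calls just described; the feature group and column names are illustrative:

```python
# Sketch: pandas-style joins across feature groups and feature stores.
rain_fg = dev_fs.get_feature_group("rain", version=1)
temperature_fg = dev_fs.get_feature_group("temperature", version=1)
location_fg = prod_fs.get_feature_group("location", version=1)

query = (
    rain_fg.select_all()
    # Join key inferred from the matching primary keys.
    .join(temperature_fg.select(["temp_avg", "temp_max"]))
    # Explicit join key and join type.
    .join(location_fg.select(["city"]), on=["location_id"], join_type="left")
)
joined_df = query.read()

# Time travel (requires a time-travel-enabled feature group): the data as of
# a point in time, or the changes applied during a time range.
historical_df = rain_fg.select_all().as_of("2020-06-01 00:00:00").read()
changes_df = rain_fg.read_changes("2020-05-01 00:00:00", "2020-06-01 00:00:00")
```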

And finally, we create training datasets.

Create Train/Test Data from Joined Features

To create a training dataset, I take the query that we built in the previous slides, and creating the training dataset follows a similar concept to creating the feature groups: I call the create method.

We now have support for user-defined splits, so we can split the training dataset into as many splits as we want, in this case train and test, or by date.

As in version one, we can save the training dataset on HopsFS, or we can save it to an external system like an S3 bucket. You can specify the data format. And again, we call save on this object and we pass the query; the query is going to be reused when we are doing inference.
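A sketch of materializing the query from the join sketch above as a training dataset; the split ratios, data format, sink, and write option key are assumptions:

```python
# Sketch: create a training dataset from the query and materialize it.
td = dev_fs.create_training_dataset(
    name="weather_training_dataset",
    version=1,
    description="Rain and temperature features joined per location",
    data_format="csv",                   # or tfrecords, parquet, ...
    splits={"train": 0.8, "test": 0.2},  # user-defined splits
)

# save() takes the query; the query is stored with the training dataset and
# reused at inference time. The delimiter option key is an assumption.
td.save(query, write_options={"delimiter": "|"})
```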

Get feature vector for online serving

At inference time, we want to augment our inference vector, and the feature store can help us with that. We can ask the feature store for the feature vector of a specific training dataset, and this will return the set of features in the order that was used during training, so we can then go to the online feature store and select the entries from those feature groups. You can also pass a set of keys to restrict the lookup to the specific entries you are interested in.

Okay, so in this demo, we're gonna see how we can work with the HSFS library and hopsworks.ai to interact with the Hopsworks Feature Store from a Databricks cluster. I have already created an instance on hopsworks.ai, and at the same time I have already created and configured a cluster on Databricks: I installed the necessary libraries and applied the necessary Spark configuration. All the instructions are available in the documentation, we have a section describing how to configure your Databricks cluster, and you can check it out at hopsworks.readthedocs.io.
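Circling back to the serving lookup described above before the demo starts, a minimal sketch; the method name follows the hsfs documentation of the time and may differ between releases:

```python
# Sketch: fetch a single feature vector from the online feature store,
# in the same feature order that was used during training.
td = dev_fs.get_training_dataset("weather_training_dataset", version=1)

# Restrict the lookup to the entity keys we are interested in.
feature_vector = td.get_serving_vector({"location_id": 42})
```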

If I go to my demo notebook, the first thing we have to do is get a connection to the specific feature store we are working with, passing the location, the project we want to work with, and how to fetch the API key to authenticate with the feature store. Then we get a handle for the specific feature store we want to work with, in this case the development one, but if we want to work with the production one as well, and join feature groups between the two, we also need to get a handle for the production feature store.

In the next steps, we're gonna do some basic feature engineering. We are going to work with a CSV file containing measurements from weather stations in Australia. We're going to create three dataframes. One is going to be a location dataframe, which will become a location lookup feature group, basically mapping each location, the city as a string, to a location ID. We also build up a feature group for temperature, with a bunch of temperature-related features and some aggregations on them, and the same thing for rain and the aggregations related to rain. Finally, we create the feature groups in the feature store. The first thing we call is create_feature_group. This is equivalent to Spark's createDataFrame: at this stage nothing gets persisted in the feature store, it only creates a metadata object within the application. We pass a name, a version, a description, and a primary key, which means that all entries in my feature group will be uniquely identified by the location ID, and finally we call save. At this stage we actually parse the dataframe object, use the columns of the dataframe as features, and persist everything in the feature store. We do the same for the temperature feature group. In this case we also pass a partition key, which allows you to control how the data is laid out on disk; this becomes useful as you start doing optimizations on joins and so on. Again we save it, and we do the same for the rain feature group.
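A compact sketch of the partitioned temperature feature group described here, reusing the dev_fs handle from earlier; the column names are illustrative:

```python
# temperature_df: a small illustrative dataframe of temperature aggregations.
temperature_df = spark.createDataFrame(
    [(1, 2010, 17.5, 31.2), (2, 2010, 21.0, 35.4)],
    ["location_id", "year", "temp_avg", "temp_max"],
)

temperature_fg = dev_fs.create_feature_group(
    name="temperature",
    version=1,
    description="Temperature aggregations per weather station",
    primary_key=["location_id"],
    partition_key=["year"],  # controls how the data is laid out on disk
)
temperature_fg.save(temperature_df)
```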

If I now put my data scientist hat on and I want to explore what's available in the feature store, I can call get_feature_group, and get_feature_group will give me back a handle. I have to pass a name, in this case rain, and a version; the version is not mandatory, and if you don't provide it, it defaults to one. I can get the metadata and I can also get a sample of the data. All of this is also available from the UI, and in particular, you can see the sample data from the UI. If the feature group was partitioned, we could select a specific partition; if it was also available online, we could select to see sample data from the online feature store. Let's go back. So now we want to build the query, and the query is what we need in order to create a training dataset. To do that, we also get the handle for the temperature feature group, and we do a simple query: we join the temperature and rain features together, and again we just select all, and we don't have to pass any join key.

The feature store is smart enough to figure out that both of them have the same primary key and to use it to join them. But if you want to override this behavior, you can pass your own join keys, and you can also pass the type of join. You can also choose not to select all the features but only a subset of them; you can go into the feature store UI, look at the schema and whatever features you actually need, and use them in your query.

You can execute the query against both the online and the offline feature store, and you can use the query to create the training dataset. Same as for the feature groups, creating the training dataset doesn't persist anything; it just creates a metadata object within the application itself. When you create it, besides the name, description, and version, you can also pass how to split the training dataset, for instance train/test or by date. And when you save it, you can also override the write options: in this case, instead of having a comma, I want to have a pipe between the different columns.

One more thing I want to show you: if you work with a lot of feature groups and a lot of features, it becomes quite hard to find them, and for this you can use tags. You can define tags across the production and development feature stores, and you can select a feature group, a feature, or a training dataset and apply a specific tag. In this case, I already applied a country tag and a year tag, Australia and 2010, and then I can search for them. So if I search for Australia, I can see that in the development feature store there is a feature group with the tag Australia, and then I can go and inspect it, go back to its UI, see the schema and so on. So if you want to try out what you just saw, and the Databricks integration, I suggest you check out the UI and the documentation at hopsworks.io. Thank you very much. If you want to learn more, you can try out the feature store from hopsworks.ai, you can have a look at the open source code on GitHub at logicalclocks/hopsworks, and you can follow us on Twitter at @logicalclocks. I would also like to take this opportunity to thank my colleagues for their contribution to the feature store.

Thank You!



 
About Jim Dowling

Logical Clocks AB

Jim Dowling is CEO of Logical Clocks and an Associate Professor at KTH Royal Institute of Technology. He is lead architect of the open-source Hopsworks platform, a horizontally scalable data platform for machine learning that includes the industry’s first Feature Store.

About Fabio Buso

Logical Clocks AB

Fabio Buso is the head of engineering at Logical Clocks AB, where he leads the Feature Store development. Fabio holds a master’s degree in cloud computing and services with a focus on data intensive applications, awarded by a joint program between KTH Stockholm and TU Berlin.