
Getting machine-learning systems to production is (still) hard. For many teams, building real-time ML systems that operate on real-time data is still a daydream – in most cases, they are tied to building batch prediction systems. There are a lot of challenges on the road to real-time ML, including building scalable real-time data pipelines, scalable model inference endpoints, and integrating it all into production applications.

This blog will show you how you can dramatically simplify these challenges with the right tools. With Tecton and Databricks, you'll be able to build the MVP for a real-time ML system in minutes, including real-time data processing and online inference.

Building a Real-Time ML System in One Notebook

In this example, we'll focus on building a real-time transaction fraud system that decides whether to approve or reject transactions. Two of the most challenging requirements of building real-time fraud detection systems are:

  • Real-time inference: predictions need to be extremely fast – typically model inference should happen in
  • Real-time features: often the most critical data needed to detect fraudulent transactions describes what has happened in the last few seconds. To build a great fraud detection system, you'll need to update features within seconds of a transaction occurring.

With Tecton and Databricks, these challenges can be greatly simplified:

  • Databricks and its native MLflow integration will allow us to create and test real-time serving endpoints to make real-time predictions.
  • Tecton helps build performant stream aggregations that compute features in real time.

Let's walk through the following four steps to build a real-time production ML system:

  • Building performant stream processing pipelines in Tecton
  • Training a model with features from Tecton using Databricks and MLflow
  • Creating a model serving endpoint in Databricks using MLflow
  • Making real-time predictions using the model serving endpoint and real-time features from Tecton

Building stream processing pipelines in Tecton
Tecton's feature platform is built to make it simple to define features, and to help make those features available to your ML models – however quickly you need them. You'll need a few different types of features for a fraud model, for example:

  • Average transaction size in a country for the last year (computed once daily)
  • Number of transactions by a user in the last 1 minute (computed continuously from a stream of transactions)
  • Distance from the point of the transaction to the user's home (computed on-demand at the time of a transaction)
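
As a rough illustration of the third kind of feature, the on-demand distance could be computed with the haversine formula at request time. This is a plain-Python sketch, not Tecton's on-demand feature API; the function name `distance_from_home_km` and its arguments are hypothetical.

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius

def distance_from_home_km(txn_lat, txn_lon, home_lat, home_lon):
    """Great-circle distance (haversine) between a transaction and the user's home."""
    phi1 = math.radians(txn_lat)
    phi2 = math.radians(home_lat)
    dphi = phi2 - phi1
    dlmb = math.radians(home_lon - txn_lon)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))
```

Because the transaction location is only known when the request arrives, a value like this cannot be precomputed and has to be calculated at prediction time.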

Each type of feature requires a different type of data pipeline, and Tecton can help build all three of these types of features. Let's focus on what would typically be the most challenging type of feature, real-time streaming features.

Here's how you can implement the feature "Number of transactions by a user in the last 1 minute and last 5 minutes" in Tecton:

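from datetime import datetime

# Assumption: the Tecton SDK names (stream_window_aggregate_feature_view, Input,
# FeatureAggregation) are imported, and transactions_stream and user are defined
# elsewhere in the feature repository.
@stream_window_aggregate_feature_view(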
    inputs={'transactions': Input(transactions_stream)},
    entities=[user],
    mode='spark_sql',
    aggregation_slide_period='continuous',
    aggregations=[
        FeatureAggregation(column='counter', function='count', time_windows=['1m', '5m'])
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 4, 1),
    family='fraud',
    tags={'release': 'production'},
    owner='[email protected]',
    description='Number of transactions a user has made recently'
)
def user_continuous_transaction_count(transactions):
    return f'''
        SELECT
            user_id,
            1 as counter,
            timestamp
        FROM
            {transactions}
        '''

In this example we take advantage of Tecton's built-in support for low-latency streaming aggregations, allowing us to maintain a count of the number of transactions a user has made in real-time.
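
To make the aggregation concrete, here is a minimal plain-Python sketch of what a continuously updated 1-minute and 5-minute count computes per user. It only illustrates the semantics and is not how Tecton implements streaming aggregations; the class name `SlidingTransactionCounter` is hypothetical, and events are assumed to arrive in timestamp order.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta

# Same window lengths as time_windows=['1m', '5m'] in the feature definition.
WINDOWS = {"1m": timedelta(minutes=1), "5m": timedelta(minutes=5)}

class SlidingTransactionCounter:
    def __init__(self):
        # user_id -> transaction timestamps, oldest first
        self._events = defaultdict(deque)

    def record(self, user_id, timestamp):
        self._events[user_id].append(timestamp)

    def counts(self, user_id, now):
        events = self._events[user_id]
        longest = max(WINDOWS.values())
        # Evict events that have fallen out of the longest window.
        while events and events[0] <= now - longest:
            events.popleft()
        # Count events in the half-open interval (now - width, now].
        return {
            name: sum(1 for ts in events if now - width < ts <= now)
            for name, width in WINDOWS.items()
        }
```

A burst of transactions from one user shows up in the `1m` count immediately, which is the kind of signal a batch pipeline would miss.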


A feature pipeline built in Tecton for computing a count of user’s transactions

When this feature is applied, Tecton starts orchestrating data pipelines in Databricks to make this feature available in real time (for model inference) and offline (for model training). Historical features are stored in Delta Lake, meaning all of the features you build are natively available in your data lakehouse.

Training a model with features from Tecton using Databricks and MLflow

Once our features are built in Tecton, we can train our fraud detection model. Check out the notebook below where we:

  • Generate training data using Tecton's time-travel capabilities
  • Train a SKLearn model to predict whether or not a transaction is fraudulent
  • Track our experiments using MLflow
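
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

# Assumption: ws (the Tecton workspace), fraud_detection_feature_service, and
# feature_service_name are defined earlier in the notebook.

# 1. Fetching a Spark DataFrame of historical labeled transactions
# 2. Renaming columns to match the expected join keys for the Feature Service
# 3. Selecting the join keys, request data, event timestamp, and label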
training_events = ws.get_data_source("transactions_batch").get_dataframe().to_spark() \
                        .filter("partition_0 == 2022").filter("partition_2 == 05") \
                        .select("user_id", "merchant", "timestamp", "amt", "is_fraud") \
                        .cache()


training_data = fraud_detection_feature_service.get_historical_features(spine=training_events, timestamp_key="timestamp").to_spark()
training_data_pd = training_data.drop("user_id", "merchant", "timestamp", "amt").toPandas()
y = training_data_pd['is_fraud']
x = training_data_pd.drop('is_fraud', axis=1)
X_train, X_test, y_train, y_test = train_test_split(x, y)
with mlflow.start_run() as run:
  n_estimators = 100
  max_depth = 6
  max_features = 3
  # Create and train model
  rf = RandomForestRegressor(n_estimators = n_estimators, max_depth = max_depth, max_features = max_features)
  rf.fit(X_train, y_train)
  # Make predictions
  predictions = rf.predict(X_test)
  
  # Log parameters
  mlflow.log_param("num_trees", n_estimators)
  mlflow.log_param("maxdepth", max_depth)
  mlflow.log_param("max_feat", max_features)
  mlflow.log_param("tecton_feature_service", feature_service_name)
  
  # Log model
  mlflow.sklearn.log_model(rf, "random-forest-model")
  
  # Create metrics
  mse = mean_squared_error(y_test, predictions)
    
  # Log metrics
  mlflow.log_metric("mse", mse)
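
The time-travel join behind `get_historical_features` can be pictured as a point-in-time lookup: for each labeled event, take the latest feature value known at or before the event timestamp, so that nothing from the future leaks into training. Below is a stdlib-only sketch of that idea. The function name and the sample history are hypothetical, and this is not Tecton's implementation.

```python
from bisect import bisect_right

def point_in_time_value(feature_history, event_time, default=None):
    """Return the latest feature value with timestamp <= event_time.

    feature_history is a list of (timestamp, value) pairs sorted by timestamp.
    """
    timestamps = [ts for ts, _ in feature_history]
    idx = bisect_right(timestamps, event_time)
    return feature_history[idx - 1][1] if idx else default

# Hypothetical history of a user's transaction count, keyed by epoch seconds.
history = [(10, 1), (20, 2), (35, 3)]
value_at_event = point_in_time_value(history, 25)  # uses the value written at t=20
```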

Creating a model serving endpoint in Databricks using MLflow
Now that we have a trained model, we'll use MLflow in Databricks to create a model endpoint. First, we'll register the model in the MLflow Model Registry:


Register your trained model in the MLflow model registry

Create a new model called “tecton-databricks-fraud-model”
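
The same registration can also be done programmatically instead of through the UI. A sketch, assuming the run ID of the training run and the artifact path `random-forest-model` passed to `log_model` earlier; `mlflow.register_model` is MLflow's registration call, while the two helper names are hypothetical.

```python
def model_uri_for_run(run_id, artifact_path="random-forest-model"):
    """Build the runs:/ URI that points at a model logged in an MLflow run."""
    return f"runs:/{run_id}/{artifact_path}"

def register_fraud_model(run_id, name="tecton-databricks-fraud-model"):
    # Imported lazily so the URI helper can be used without MLflow installed.
    import mlflow
    return mlflow.register_model(model_uri_for_run(run_id), name)
```

In the training notebook this would be called as `register_fraud_model(run.info.run_id)` once the `with mlflow.start_run() as run:` block has finished.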

Next, we'll use MLflow to create a serving endpoint:


Enable serving from the MLflow model registry UI

Once our model is deployed, we'll take note of the endpoint URL.

Now, we have a prediction endpoint that can perform real-time transaction scoring – the only thing left is collecting the features needed at prediction-time.

Making real-time predictions using the model serving endpoint and real-time features from Tecton

The model endpoint that we just created takes features as inputs and outputs a prediction of the probability that a transaction is fraudulent. Retrieving those features poses some challenging problems:

  • Latency constraints: we need to look up (or compute) the features very quickly (
  • Feature freshness: we expect the features we defined (like the one-minute transaction count) to be updated in real-time as transactions are occurring.

Tecton provides feature serving infrastructure to solve these challenging problems. Tecton is built to serve feature vectors at high scale and low latency. When we built our features, we already set up the real-time streaming pipelines that will be used to produce fresh features for our models.

Thanks to Tecton, we can retrieve features in real-time with a simple REST call to Tecton's feature serving endpoint:

curl -X POST https://app.tecton.ai/api/v1/feature-service/get-features \
     -H "Authorization: Tecton-key $TECTON_API_KEY" \
     -d '{
  "params": {
    "feature_service_name": "fraud_detection_feature_service",
    "join_key_map": {
      "user_id": "USER_ID_VALUE"
    },
    "request_context_map": {
      "amt": 12345678.9
    },
    "workspace_name": "tecton-databricks-demo"
  }
}'
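
For callers that are not shell scripts, the same request can be assembled in Python. This is a sketch that mirrors the curl call above using only the standard library; the helper name `build_get_features_request` is hypothetical, and the URL, header format, and payload fields are copied from the curl example rather than from Tecton's API reference.

```python
import json
import urllib.request

TECTON_URL = "https://app.tecton.ai/api/v1/feature-service/get-features"

def build_get_features_request(api_key, user_id, amt):
    """Build (but do not send) the POST request for the fraud feature service."""
    payload = {
        "params": {
            "feature_service_name": "fraud_detection_feature_service",
            "join_key_map": {"user_id": user_id},
            "request_context_map": {"amt": amt},
            "workspace_name": "tecton-databricks-demo",
        }
    }
    return urllib.request.Request(
        TECTON_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Authorization": f"Tecton-key {api_key}"},
        method="POST",
    )
```

Sending it is then a matter of passing the result to `urllib.request.urlopen` and decoding the JSON body of the response.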

Check out the rest of the notebook where we'll wire it all together to retrieve features from Tecton and send them to our model endpoint to get back real-time fraud predictions:

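import requests

# Assumption: my_token (a Databricks access token) and model_url (the serving
# endpoint URL noted earlier) are defined elsewhere in the notebook.
def score_model(dataset):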
  headers = {'Authorization': f'Bearer {my_token}'}
  data_json = dataset.to_dict(orient='split')
  response = requests.request(method='POST', headers=headers, url=model_url, json=data_json)
  if response.status_code != 200:
    raise Exception(f'Request failed with status {response.status_code}, {response.text}')
  return response.json()

amount=12345.0
df = fraud_detection_feature_service.get_online_features(
  join_keys={'user_id': 'user_131340471060', 'merchant': 'fraud_Schmitt Inc'},
  request_data={"amt": amount}
).to_pandas().fillna(0)

prediction = score_model(df)

print(prediction[0])
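
The last step, turning the score into an approve or reject decision, is not shown in the notebook excerpt. A minimal sketch, assuming the endpoint returns a fraud score between 0 and 1; the function name and the 0.5 threshold are hypothetical, and a real threshold would need tuning against the cost of missed fraud versus blocked legitimate transactions.

```python
def decide(fraud_score, threshold=0.5):
    """Map a fraud score in [0, 1] to an 'approve' or 'reject' decision."""
    if not 0.0 <= fraud_score <= 1.0:
        raise ValueError(f"fraud score out of range: {fraud_score}")
    return "reject" if fraud_score >= threshold else "approve"
```

With the code above, this would be applied as `decide(prediction[0])`.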

Conclusion

Building real-time ML systems can be a daunting task! Individual components, like streaming data pipelines, can be months-long engineering projects if built manually. Luckily, Tecton and Databricks remove much of that complexity. You can train, deploy, and serve a real-time fraud detection model in one notebook – and it only takes about 15 minutes.

To learn more about how Tecton is powering real-time ML systems, check out the talks about Tecton at Data and AI Summit: "Scaling ML at CashApp with Tecton" and "Building Production-Ready Recommender Systems with Feature Stores".
