Fine-Grained Time Series Forecasting at Scale With Facebook Prophet and Apache Spark: Updated for Spark 3

Advances in time series forecasting are enabling retailers to generate more reliable demand forecasts. The challenge now is to produce these forecasts in a timely manner and at a level of granularity that allows the business to make precise adjustments to product inventories. Leveraging Apache Spark™ and Facebook Prophet, more and more enterprises facing these challenges are finding they can overcome the scalability and accuracy limits of past solutions.


In this post, we’ll discuss the importance of time series forecasting, visualize some sample time series data, and then build a simple model to show the use of Facebook Prophet. Once you’re comfortable building a single model, we’ll combine Facebook Prophet with the magic of Spark to show you how to train hundreds of models at once, allowing you to create precise forecasts for each individual product-store combination at a level of granularity rarely achieved until now.

The importance of accurate demand forecasting and of preventing stockouts

Improving the speed and accuracy of time series analyses in order to better forecast demand for products and services is critical to retailers’ success. If too much product is placed in a store, shelf and storeroom space can be strained, products can expire and retailers may find their financial resources tied up in inventory, leaving them unable to take advantage of new opportunities generated by manufacturers or shifts in consumer patterns. If too little product is placed in a store, customers may not be able to purchase the products they need. Not only do these forecast errors result in an immediate loss of revenue to the retailer, but over time, consumer frustration may drive customers towards competitors.

New expectations require more precise time series models and forecasting methods

For some time, enterprise resource planning (ERP) systems and third-party solutions have provided retailers with demand forecasting capabilities based on simple time series models. But with advances in technology and increased pressure in the sector, many retailers are looking to move beyond the linear models and more traditional algorithms historically available to them.

New capabilities, such as those provided by Facebook’s Prophet, are emerging from the data science community, and companies are seeking the flexibility to apply these machine learning (ML) models to their time series forecasting needs.

This movement away from traditional forecasting solutions requires retailers and the like to develop in-house expertise not only in the complexities of demand forecasting but also in the efficient distribution of the work required to generate hundreds of thousands or even millions of ML models in a timely manner. Luckily, we can use Spark to distribute the training of these models, making it possible to predict not just overall demand for products and services, but the unique demand for each product in each location.

Visualizing demand seasonality in time series data

To demonstrate the use of Facebook Prophet to generate fine-grained demand forecasts for individual stores and products, we will use a publicly available dataset from Kaggle. It consists of 5 years of daily sales data for 50 individual items across 10 different stores.

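Let’s start by looking at the overall yearly sales trend for all products and stores. As the chart below shows, total product sales rise steadily year over year, without large swings.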

Sample Kaggle retail data used to demonstrate the combined fine-grained demand forecasting capabilities of Facebook Prophet and Apache Spark

Next, by viewing the same data on a monthly basis, it’s clear that the year-over-year upward trend doesn’t progress steadily each month. Instead, there is a clear seasonal pattern of peaks in the summer months and troughs in the winter months. Using the built-in data visualization feature of Databricks Collaborative Notebooks, we can see the value of our data during each month by mousing over the chart.

At the weekly level, sales peak on Sunday (weekday 0), drop sharply on Monday (weekday 1), and then gradually recover over the rest of the week.

Demonstrating the difficulty of accounting for seasonal patterns with traditional time series forecasting methods
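The monthly and weekday views described above come down to two group-by aggregations. The following is a minimal pandas sketch on made-up numbers, not the Kaggle data; the column names date and sales are assumptions. Note that pandas numbers weekdays from Monday=0, so the sketch shifts them to match the Sunday=0 convention used in this post.

```python
import pandas as pd

# Made-up daily totals for two weeks; 2017-01-01 falls on a Sunday.
sales = pd.DataFrame({
    "date": pd.date_range("2017-01-01", periods=14, freq="D"),
    "sales": [30, 18, 20, 21, 23, 25, 28, 32, 19, 21, 22, 24, 26, 29],
})

# pandas uses Monday=0 ... Sunday=6; shift so that Sunday=0 and Monday=1.
sales["weekday"] = (sales["date"].dt.dayofweek + 1) % 7
sales["month"] = sales["date"].dt.month

# Average sales per weekday and total sales per month.
avg_by_weekday = sales.groupby("weekday")["sales"].mean()
total_by_month = sales.groupby("month")["sales"].sum()

print(avg_by_weekday)
print(total_by_month)
```

With real data spanning several years, the same two aggregations would surface the weekly and annual patterns that the charts show.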

Building a simple time series forecasting model with Prophet

As illustrated above, our data shows a clear year-over-year upward trend in sales, along with both annual and weekly seasonal patterns. It’s these overlapping patterns in the data that Facebook Prophet is designed to address.

Facebook Prophet follows the scikit-learn API, so it should be easy to pick up for anyone with experience with sklearn. We need to pass in a two-column pandas DataFrame as input: the first column is the date, which Prophet expects to be named ds, and the second is the value to predict (in our case, sales), which Prophet expects to be named y. Once our data is in the proper format, building a model is easy:

import pandas as pd
# Prophet 1.0 and later is imported from `prophet`; earlier releases used `fbprophet`
from prophet import Prophet
 
# instantiate the model and set parameters
model = Prophet(
    interval_width=0.95,
    growth='linear',
    daily_seasonality=False,
    weekly_seasonality=True,
    yearly_seasonality=True,
    seasonality_mode='multiplicative'
)
 
# fit the model to historical data
model.fit(history_pd)
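The fit call above assumes history_pd already has the shape Prophet expects: a date column named ds and a numeric column named y. As a minimal sketch of that preparation step, assuming a raw extract with hypothetical columns date and sales, the conversion might look like this:

```python
import pandas as pd

# Hypothetical raw extract; the column names 'date' and 'sales' are assumptions.
raw_pd = pd.DataFrame({
    "date": ["2017-01-03", "2017-01-01", "2017-01-02"],
    "sales": [20, 30, 18],
})

# Rename to the ds/y names Prophet expects, parse the dates, and sort by time.
history_pd = (
    raw_pd
    .rename(columns={"date": "ds", "sales": "y"})
    .assign(ds=lambda df: pd.to_datetime(df["ds"]))
    .sort_values("ds")
    .reset_index(drop=True)
)

print(history_pd)
```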

Now that we have fit our model to the data, let’s use it to build a 90-day forecast:

# define a dataset including both historical dates & 90-days beyond the last available date, using Prophet's built-in make_future_dataframe method
future_pd = model.make_future_dataframe(
    periods=90, 
    freq='D',
    include_history=True
)
 
# predict over the dataset
forecast_pd = model.predict(future_pd)

That’s it! We can now visualize how our actual and predicted data line up as well as a forecast for the future using the Facebook Prophet model’s built-in .plot method. As you can see, the weekly and seasonal demand patterns shown earlier are reflected in the forecasted results.

# plot actuals and predictions, then display the figure that model.plot returns
predict_fig = model.plot(forecast_pd, xlabel='date', ylabel='sales')
display(predict_fig)

Comparing the actual demand to the time-series forecast generated by Facebook Prophet leveraging Apache Spark

This visualization is a bit busy. Bartosz Mikulski provides an excellent breakdown of it that is well worth checking out. In a nutshell, the black dots represent our actuals, the darker blue line represents our predictions, and the lighter blue band represents the 95% uncertainty interval we requested with interval_width=0.95.
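The same three elements can be read directly from the forecast output, where Prophet returns the prediction as yhat and the interval bounds as yhat_lower and yhat_upper. The sketch below uses small hand-written stand-ins for history_pd and forecast_pd (the numbers are invented for illustration) to check which actuals fall inside the band:

```python
import pandas as pd

# Hand-written stand-ins: in the post, history_pd holds the actuals and
# forecast_pd is the output of model.predict(future_pd).
history_pd = pd.DataFrame({
    "ds": pd.to_datetime(["2017-01-01", "2017-01-02", "2017-01-03"]),
    "y": [30.0, 18.0, 26.0],
})
forecast_pd = pd.DataFrame({
    "ds": pd.to_datetime(["2017-01-01", "2017-01-02", "2017-01-03", "2017-01-04"]),
    "yhat": [29.0, 19.0, 21.0, 22.0],
    "yhat_lower": [25.0, 15.0, 17.0, 18.0],
    "yhat_upper": [33.0, 23.0, 25.0, 26.0],
})

# Line up each prediction with its actual; future dates have no actual (NaN).
compared = forecast_pd.merge(history_pd, on="ds", how="left")
has_actual = compared["y"].notna()

# Flag actuals that land inside the uncertainty band.
compared["inside_band"] = has_actual & compared["y"].between(
    compared["yhat_lower"], compared["yhat_upper"]
)

# Share of historical points covered by the band.
coverage = compared.loc[has_actual, "inside_band"].mean()
print(compared)
print(coverage)
```

On a real forecast, a coverage well below the requested interval width would be one hint that the model settings deserve a second look.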

Training hundreds of time series forecasting models in parallel with Facebook Prophet and Spark

Now that we’ve demonstrated how to build a single model, we can use the power of Spark to multiply our efforts. Our goal is to generate not one forecast for the entire dataset, but hundreds of models and forecasts, one for each product-store combination (50 items across 10 stores gives 500 combinations), something that would be incredibly time-consuming to perform as a sequential operation.

Building models at this scale lets, for example, a grocery store chain forecast precisely how much milk each store should order, based on the differing demand at each location.

Using Spark DataFrames to distribute the processing of time series data

Data scientists frequently tackle the challenge of training large numbers of models using a distributed data processing engine such as Spark. By leveraging a Spark cluster, individual worker nodes in the cluster can train a subset of models in parallel with other worker nodes, greatly reducing the overall time required to train the entire collection of time series models.

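Of course, training models on a cluster of worker nodes requires cloud infrastructure, and this comes at a cost. But with the easy availability of on-demand cloud resources, companies can quickly provision the resources they need, train their models, and release those resources just as quickly, allowing them to achieve massive scalability without long-term commitments to physical assets.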

The key mechanism for achieving distributed data processing in Spark is the DataFrame. Loading the data into a Spark DataFrame distributes it across the workers in the cluster. This allows these workers to process subsets of the data in parallel, reducing the overall amount of time required to perform our work.

Of course, each worker needs to have access to the subset of data it requires to do its work. By grouping the data on key values, in this case on combinations of store and item, we bring together all the time series data for those key values onto a specific worker node.

(
    store_item_history
        .groupBy('store', 'item')
        # . . .
)

We share the groupBy code here to underscore how it enables us to train many models in parallel efficiently, although it will not actually come into play until we set up and apply a custom pandas function to our data in the next section.

Leveraging the power of pandas functions

With our time series data properly grouped by store and item, we now need to train a single model for each group. To accomplish this, we can use a pandas function, which allows us to apply a custom function to each group of data in our DataFrame.

This function will not only train a model for each group, but also generate a result set representing the predictions from that model. But while the function will train and predict on each group in the DataFrame independent of the others, the results returned from each group will be conveniently collected into a single resulting DataFrame. This will allow us to generate store-item level forecasts but present our results to analysts and managers as a single output dataset.
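To make the split, apply, and combine flow concrete, here is a small local analogy in plain pandas rather than Spark. It is only a sketch: naive_forecast is a hypothetical placeholder that predicts each group's mean instead of training a Prophet model, and the data is invented.

```python
import pandas as pd


def naive_forecast(group_pd: pd.DataFrame) -> pd.DataFrame:
    """Placeholder for a per-group model: predicts the group's mean sales."""
    return pd.DataFrame({
        "store": [group_pd["store"].iloc[0]],
        "item": [group_pd["item"].iloc[0]],
        "yhat": [group_pd["y"].mean()],
    })


# Invented history for two store-item combinations.
history = pd.DataFrame({
    "store": [1, 1, 1, 2, 2, 2],
    "item": [1, 1, 1, 1, 1, 1],
    "y": [10.0, 12.0, 14.0, 20.0, 22.0, 24.0],
})

# Split by key, run the function on each group independently,
# then stack the per-group outputs into one result set.
results = pd.concat(
    [naive_forecast(group_pd) for _, group_pd in history.groupby(["store", "item"])],
    ignore_index=True,
)

print(results)
```

Spark follows the same pattern, except that the groups are processed on different worker nodes rather than in a local loop.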

As you can see in the abbreviated code below, building our function is relatively straightforward. Unlike in previous versions of Spark, we can declare our functions in a fairly streamlined manner, using Python type hints to specify the type of pandas object we expect to receive and return.

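Within the function definition, we instantiate our model, configure it, and fit it to the data it has received. The model then makes a prediction, and that data is returned as the output of the function.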

def forecast_store_item(history_pd: pd.DataFrame) -> pd.DataFrame: 
    
    # instantiate the model, configure the parameters
    model = Prophet(
        interval_width=0.95,
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=True,
        yearly_seasonality=True,
        seasonality_mode='multiplicative'
    )
    
    # fit the model
    model.fit(history_pd)
    
    # configure predictions
    future_pd = model.make_future_dataframe(
        periods=90, 
        freq='D',
        include_history=True
    )
    
    # make predictions
    results_pd = model.predict(future_pd)
    
    # . . .
    
    # return predictions
    return results_pd


Now, to bring it all together, we use the groupBy command we discussed earlier to ensure our dataset is properly partitioned into groups representing specific store and item combinations. We then pass the function to applyInPandas, which fits a model and makes predictions on each grouping of data.

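The dataset returned by applying the function to each group is then stamped with the date on which we generated our predictions (the training_date column in the code below). This helps us keep track of the data generated by different model runs as we eventually take this functionality into production.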

from pyspark.sql.functions import current_date

# result_schema must describe the columns returned by forecast_store_item
results = (
    store_item_history
        .groupBy('store', 'item')
        .applyInPandas(forecast_store_item, schema=result_schema)
        .withColumn('training_date', current_date())
)
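The call above passes a result_schema that the abbreviated code does not define. applyInPandas accepts the output schema either as a StructType or as a DDL-formatted string, and the columns of the returned pandas DataFrame need to line up with it. The sketch below shows one possible layout. The column list is an assumption made for illustration, not the schema used in the notebook, and align_to_schema is a hypothetical helper.

```python
import pandas as pd

# Assumed output layout, written as a Spark DDL string (illustrative only).
result_schema = (
    "ds date, store int, item int, y float, "
    "yhat float, yhat_upper float, yhat_lower float"
)

# Column names in the order the schema declares them.
result_columns = [field.split()[0] for field in result_schema.split(", ")]


def align_to_schema(results_pd: pd.DataFrame) -> pd.DataFrame:
    """Keep only the declared columns, in the declared order."""
    missing = [c for c in result_columns if c not in results_pd.columns]
    if missing:
        raise ValueError(f"missing columns: {missing}")
    return results_pd[result_columns]


# Toy frame with an extra column ('trend') and a shuffled column order.
toy_pd = pd.DataFrame({
    "yhat": [1.0], "ds": [pd.Timestamp("2017-01-01")], "trend": [0.5],
    "store": [1], "item": [1], "y": [1.2],
    "yhat_upper": [1.5], "yhat_lower": [0.5],
})
aligned_pd = align_to_schema(toy_pd)
print(list(aligned_pd.columns))
```

A helper like this could be called just before the function returns its predictions, so that a mismatch shows up as a clear Python error rather than a failure inside the Spark job.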

Next steps

We have now constructed a forecast for each store-item combination. Using a SQL query, analysts can view the tailored forecasts for each product. In the chart below, we’ve plotted the projected demand for product #1 across 10 stores. As you can see, the demand forecasts vary from store to store, but the general pattern is consistent across all of the stores, as we would expect.

Sample time series visualization generated via a SQL query

As new sales data arrives, we can efficiently generate new forecasts and append these to our existing table structures, allowing analysts to update the business’s expectations as conditions evolve.

