Fine-Grained Time Series Forecasting at Scale With Facebook Prophet and Apache Spark: Updated for Spark 3
Advances in time series forecasting are enabling retailers to generate more reliable demand forecasts. The challenge now is to produce these forecasts in a timely manner and at a level of granularity that allows the business to make precise adjustments to product inventories. Leveraging Apache Spark™ and Facebook Prophet, more and more enterprises facing these challenges are finding they can overcome the scalability and accuracy limits of past solutions.
Go directly to the forecasting accelerator referenced in this post. To see this solution for Spark 2.0, please read the original blog post here.
In this post, we'll discuss the importance of time series forecasting, visualize some sample time series data, and then build a simple model to show the use of Facebook Prophet. Once you're comfortable building a single model, we'll combine Facebook Prophet with the magic of Spark to show you how to train hundreds of models at once, allowing you to create precise forecasts for each individual product-store combination at a level of granularity rarely achieved until now.
Accurate and timely forecasting is now more important than ever
Improving the speed and accuracy of time series analyses in order to better forecast demand for products and services is critical to retailers’ success. If too much product is placed in a store, shelf and storeroom space can be strained, products can expire and retailers may find their financial resources tied up in inventory, leaving them unable to take advantage of new opportunities generated by manufacturers or shifts in consumer patterns. If too little product is placed in a store, customers may not be able to purchase the products they need. Not only do these forecast errors result in an immediate loss of revenue to the retailer, but over time, consumer frustration may drive customers towards competitors.
New expectations require more precise time series models and forecasting methods
For some time, enterprise resource planning (ERP) systems and third-party solutions have provided retailers with demand forecasting capabilities based on simple time series models. But with advances in technology and increased pressure in the sector, many retailers are looking to move beyond the linear models and more traditional algorithms historically available to them.
New capabilities, such as those provided by Facebook’s Prophet, are emerging from the data science community, and companies are seeking the flexibility to apply these machine learning (ML) models to their time series forecasting needs.
This movement away from traditional forecasting solutions requires retailers and similar organizations to develop in-house expertise not only in the complexities of demand forecasting but also in the efficient distribution of the work required to generate hundreds of thousands or even millions of ML models in a timely manner. Luckily, we can use Spark to distribute the training of these models, making it possible to predict not only overall demand for products and services but also the unique demand for each product in each location.
Visualizing demand seasonality in time series data
To demonstrate the use of Facebook Prophet to generate fine-grained demand forecasts for individual stores and products, we will use a publicly available dataset from Kaggle. It consists of 5 years of daily sales data for 50 individual items across 10 different stores.
To get started, let's look at the overall yearly sales trend for all products and stores. As you can see, total product sales are increasing year over year with no clear sign of convergence around a plateau.
Next, by viewing the same data on a monthly basis, it’s clear that the year-over-year upward trend doesn't progress steadily each month. Instead, there is a clear seasonal pattern of peaks in the summer months and troughs in the winter months. Using the built-in data visualization feature of Databricks Collaborative Notebooks, we can see the value of our data during each month by mousing over the chart.
At the weekday level, sales peak on Sundays (weekday 0), drop sharply on Mondays (weekday 1), then recover steadily over the rest of the week.
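The same three views (yearly, monthly and weekday) can also be computed directly rather than read off a chart. Here is a minimal pandas sketch, assuming a DataFrame with date and sales columns (the column names are our assumption); the helper name seasonal_summaries is hypothetical and the sample data is made up for illustration, not taken from the Kaggle dataset. Note that pandas numbers weekdays from Monday = 0, so the sketch shifts the index to match the Sunday = 0 convention used above.

```python
import pandas as pd


def seasonal_summaries(sales_pd: pd.DataFrame) -> dict:
    """Aggregate daily sales by year, by calendar month, and by weekday."""
    dates = pd.to_datetime(sales_pd["date"])
    # pandas uses Monday=0 .. Sunday=6; shift so that Sunday=0, Monday=1, ...
    weekday = (dates.dt.dayofweek + 1) % 7
    return {
        "yearly": sales_pd.groupby(dates.dt.year)["sales"].sum(),
        "monthly": sales_pd.groupby(dates.dt.to_period("M"))["sales"].sum(),
        "weekday": sales_pd.groupby(weekday)["sales"].mean(),
    }


# made-up sample: two weeks of daily sales starting on Sunday, 2017-01-01
demo_pd = pd.DataFrame({
    "date": pd.date_range("2017-01-01", periods=14, freq="D"),
    "sales": [10, 4, 5, 6, 7, 8, 9] * 2,
})
summaries = seasonal_summaries(demo_pd)
print(summaries["weekday"])
```

In this toy sample the weekday series is highest at index 0 (Sunday) and lowest at index 1 (Monday), mirroring the shape described above.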
Getting started with a simple time series forecasting model on Facebook Prophet
As illustrated above, our data shows a clear year-over-year upward trend in sales, along with both annual and weekly seasonal patterns. It’s these overlapping patterns in the data that Facebook Prophet is designed to address.
Facebook Prophet follows the scikit-learn API, so it should be easy to pick up for anyone with experience with sklearn. We need to pass in a two-column pandas DataFrame as input: the first column, which Prophet expects to be named ds, is the date, and the second, named y, is the value to predict (in our case, sales). Once our data is in the proper format, building a model is easy:
import pandas as pd
# in Prophet 1.0 and later the package is named prophet: from prophet import Prophet
from fbprophet import Prophet

# instantiate the model and set parameters
model = Prophet(
    interval_width=0.95,
    growth='linear',
    daily_seasonality=False,
    weekly_seasonality=True,
    yearly_seasonality=True,
    seasonality_mode='multiplicative'
)

# fit the model to historical data (columns: ds, y)
model.fit(history_pd)
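The history_pd DataFrame passed to fit needs to be in the ds / y layout before the model can use it. As a minimal sketch, assuming the raw records carry date, store, item and sales columns (our assumption about the column names), the helper below sums sales per day across all stores and items and renames the columns. The name to_prophet_frame is hypothetical, and the sample rows are made up for illustration.

```python
import pandas as pd


def to_prophet_frame(sales_pd: pd.DataFrame) -> pd.DataFrame:
    """Sum sales per day and rename the columns to the ds / y names Prophet expects."""
    return (
        sales_pd
        .assign(date=pd.to_datetime(sales_pd["date"]))
        .groupby("date", as_index=False)["sales"]
        .sum()
        .rename(columns={"date": "ds", "sales": "y"})
        .sort_values("ds")
        .reset_index(drop=True)
    )


# made-up sample rows, deliberately out of date order
raw_pd = pd.DataFrame({
    "date": ["2017-01-02", "2017-01-01", "2017-01-01"],
    "store": [1, 1, 2],
    "item": [1, 1, 1],
    "sales": [5, 3, 4],
})
history_pd = to_prophet_frame(raw_pd)
print(history_pd)
```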
Now that we have fit our model to the data, let's use it to build a 90-day forecast:
# define a dataset including both historical dates & 90 days beyond the last available date,
# using Prophet's built-in make_future_dataframe method
future_pd = model.make_future_dataframe(
    periods=90,
    freq='D',
    include_history=True
)

# predict over the dataset
forecast_pd = model.predict(future_pd)
That's it! We can now visualize how our actual and predicted data line up as well as a forecast for the future using the Facebook Prophet model's built-in .plot method. As you can see, the weekly and seasonal demand patterns shown earlier are reflected in the forecasted results.
predict_fig = model.plot(forecast_pd, xlabel='date', ylabel='sales')
display(predict_fig)
This visualization is a bit busy. Bartosz Mikulski provides an excellent breakdown of it that is well worth checking out. In a nutshell, the black dots represent our actuals with the darker blue line representing our predictions and the lighter blue band representing our (95%) uncertainty interval.
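To complement the visual check, we can also put numbers on how closely the predictions track the actuals. The sketch below is one possible approach, not part of the original walkthrough: it joins actuals to predictions on ds and reports mean absolute error, root mean squared error, and the share of actuals that fall inside the uncertainty interval. It relies on the yhat, yhat_lower and yhat_upper columns that Prophet's predict returns; the function name forecast_errors is hypothetical, and the two small DataFrames are made-up stand-ins for history_pd and forecast_pd.

```python
import numpy as np
import pandas as pd


def forecast_errors(history_pd: pd.DataFrame, forecast_pd: pd.DataFrame) -> dict:
    """Compare actuals (y) with predictions (yhat) on the dates both frames share."""
    merged = history_pd.merge(
        forecast_pd[["ds", "yhat", "yhat_lower", "yhat_upper"]],
        on="ds",
        how="inner",
    )
    error = merged["y"] - merged["yhat"]
    inside = (merged["y"] >= merged["yhat_lower"]) & (merged["y"] <= merged["yhat_upper"])
    return {
        "mae": float(error.abs().mean()),
        "rmse": float(np.sqrt((error ** 2).mean())),
        "coverage": float(inside.mean()),
    }


# made-up stand-ins: three days of actuals, four days of predictions
history_pd = pd.DataFrame({
    "ds": pd.date_range("2018-01-01", periods=3, freq="D"),
    "y": [10.0, 12.0, 14.0],
})
forecast_pd = pd.DataFrame({
    "ds": pd.date_range("2018-01-01", periods=4, freq="D"),
    "yhat": [11.0, 12.0, 12.0, 13.0],
})
forecast_pd["yhat_lower"] = forecast_pd["yhat"] - 1.5
forecast_pd["yhat_upper"] = forecast_pd["yhat"] + 1.5

metrics = forecast_errors(history_pd, forecast_pd)
print(metrics)
```

The inner join drops the future dates, for which no actuals exist yet, so the metrics describe in-sample fit only.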
Training hundreds of time series forecasting models in parallel with Facebook Prophet and Spark
Now that we've demonstrated how to build a single model, we can use the power of Spark to multiply our efforts. Our goal is to generate not one forecast for the entire dataset, but hundreds of models and forecasts, one for each product-store combination, something that would be incredibly time-consuming to perform as a sequential operation.
Building models in this way could allow a grocery store chain, for example, to create a precise forecast for the amount of milk they should order for their Sandusky store that differs from the amount needed in their Cleveland store, based upon the differing demand at those locations.
How to use Spark DataFrames to distribute the processing of time series data
Data scientists frequently tackle the challenge of training large numbers of models using a distributed data processing engine such as Spark. By leveraging a Spark cluster, individual worker nodes in the cluster can train a subset of models in parallel with other worker nodes, greatly reducing the overall time required to train the entire collection of time series models.
Of course, training models on a cluster of worker nodes (computers) requires more cloud infrastructure, and this comes at a price. But with the easy availability of on-demand cloud resources, companies can quickly provision the resources they need, train their models, and release those resources just as quickly, allowing them to achieve massive scalability without long-term commitments to physical assets.
The key mechanism for achieving distributed data processing in Spark is the DataFrame. By loading the data into a Spark DataFrame, the data is distributed across the workers in the cluster. This allows these workers to process subsets of the data in a parallel manner, reducing the overall amount of time required to perform our work.
Of course, each worker needs to have access to the subset of data it requires to do its work. By grouping the data on key values, in this case on combinations of store and item, we bring together all the time series data for those key values onto a specific worker node.
store_item_history
    .groupBy('store', 'item')
    . . .
We share the groupBy code here to underscore how it enables us to train many models in parallel efficiently, although it will not actually come into play until we set up and apply a custom pandas function to our data in the next section.
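The grouping idea can be tried on a single machine before moving to a cluster. The sketch below is a local pandas analogue, not Spark code: it splits a small made-up dataset by store and item, applies a function to each group independently, and stitches the per-group results back into one DataFrame. The naive_forecast function is a hypothetical stand-in that simply returns each group's mean sales; it takes the place of real model training only to keep the example small.

```python
import pandas as pd


def naive_forecast(group_pd: pd.DataFrame) -> pd.DataFrame:
    """Stand-in for model training: 'predict' the mean sales of one store-item group."""
    return pd.DataFrame({
        "store": [group_pd["store"].iloc[0]],
        "item": [group_pd["item"].iloc[0]],
        "yhat": [group_pd["sales"].mean()],
    })


# made-up sample covering three store-item combinations
sales_pd = pd.DataFrame({
    "store": [1, 1, 1, 1, 2, 2],
    "item": [1, 1, 2, 2, 1, 1],
    "sales": [10, 20, 5, 7, 30, 50],
})

# split by key, process each group on its own, then combine the pieces
pieces = [naive_forecast(group) for _, group in sales_pd.groupby(["store", "item"])]
results_pd = pd.concat(pieces, ignore_index=True)
print(results_pd)
```

Because each group is processed without reference to the others, the per-group work can be handed to different workers, which is what Spark does for us at scale.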
Leveraging the power of pandas functions
With our time series data properly grouped by store and item, we now need to train a single model for each group. To accomplish this, we can use a pandas function, which allows us to apply a custom function to each group of data in our DataFrame.
This function will not only train a model for each group, but also generate a result set representing the predictions from that model. But while the function will train and predict on each group in the DataFrame independent of the others, the results returned from each group will be conveniently collected into a single resulting DataFrame. This will allow us to generate store-item level forecasts but present our results to analysts and managers as a single output dataset.
As you can see in the abbreviated code below, building our function is relatively straightforward. Unlike in previous versions of Spark, we can declare our functions in a fairly streamlined manner, using Python type hints to specify the type of pandas object we expect to receive and return.
Within the function definition, we instantiate our model, configure it and fit it to the data it has received. The model makes a prediction, and that data is returned as the output of the function.
def forecast_store_item(history_pd: pd.DataFrame) -> pd.DataFrame:
    # instantiate the model, configure the parameters
    model = Prophet(
        interval_width=0.95,
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=True,
        yearly_seasonality=True,
        seasonality_mode='multiplicative'
    )

    # fit the model
    model.fit(history_pd)

    # configure predictions
    future_pd = model.make_future_dataframe(
        periods=90,
        freq='D',
        include_history=True
    )

    # make predictions
    results_pd = model.predict(future_pd)

    # . . .

    # return predictions
    return results_pd
Now, to bring it all together, we use the groupBy command we discussed earlier to ensure our dataset is properly partitioned into groups representing specific store and item combinations. We then simply apply the function to our DataFrame with applyInPandas, allowing it to fit a model and make predictions on each grouping of data.
The dataset returned by the application of the function to each group is updated to reflect the date on which we generated our predictions. This will help us keep track of data generated during different model runs as we eventually take our functionality into production.
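from pyspark.sql.functions import current_date

# result_schema (not shown in this abbreviated code) describes the columns returned by forecast_store_item
results = (
    store_item_history
    .groupBy('store', 'item')
    .applyInPandas(forecast_store_item, schema=result_schema)
    .withColumn('training_date', current_date())
)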
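The abbreviated code leaves out both the definition of result_schema and the step that shapes the function's output to match it. The sketch below shows one way these could fit together; it is an assumption on our part, not the code from the accelerator. It writes the schema as a DDL-formatted string, which applyInPandas accepts in place of a StructType, and uses a hypothetical helper, shape_results, to attach the store and item keys and the actuals to Prophet's output. It assumes each group's history_pd carries store, item, ds and y columns, and the sample frames are made up so the helper can be tried without a cluster.

```python
import pandas as pd

# hypothetical output schema as a DDL string (column names and types are assumptions)
result_schema = (
    "ds date, store int, item int, y double, "
    "yhat double, yhat_upper double, yhat_lower double"
)


def shape_results(history_pd: pd.DataFrame, forecast_pd: pd.DataFrame) -> pd.DataFrame:
    """Attach group keys and actuals to the forecast and order columns like result_schema."""
    out = forecast_pd[["ds", "yhat", "yhat_upper", "yhat_lower"]].merge(
        history_pd[["ds", "y"]], on="ds", how="left"
    )
    # every row in a group shares the same store and item
    out["store"] = int(history_pd["store"].iloc[0])
    out["item"] = int(history_pd["item"].iloc[0])
    out = out.astype({"store": "int32", "item": "int32"})
    # plain date objects line up with the 'date' type declared in the schema
    out["ds"] = out["ds"].dt.date
    columns = [field.split()[0] for field in result_schema.split(",")]
    return out[columns]


# made-up stand-ins for one group's history and its forecast
history_pd = pd.DataFrame({
    "store": [1, 1],
    "item": [2, 2],
    "ds": pd.date_range("2018-01-01", periods=2, freq="D"),
    "y": [10.0, 12.0],
})
forecast_pd = pd.DataFrame({
    "ds": pd.date_range("2018-01-01", periods=3, freq="D"),
    "yhat": [9.5, 12.5, 11.0],
    "yhat_lower": [8.0, 11.0, 9.5],
    "yhat_upper": [11.0, 14.0, 12.5],
})
shaped_pd = shape_results(history_pd, forecast_pd)
print(shaped_pd)
```

Under these assumptions, forecast_store_item could end with something like return shape_results(history_pd, results_pd). Future dates have no actuals, so y is left empty for those rows.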
Next steps
We have now constructed a forecast for each store-item combination. Using a SQL query, analysts can view the tailored forecasts for each product. In the chart below, we've plotted the projected demand for product #1 across 10 stores. As you can see, the demand forecasts vary from store to store, but the general pattern is consistent across all of the stores, as we would expect.
As new sales data arrives, we can efficiently generate new forecasts and append these to our existing table structures, allowing analysts to update the business’s expectations as conditions evolve.
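Once forecasts from several runs sit side by side, analysts will usually want the most recent prediction for each store, item and date. The pandas sketch below illustrates one way to pick those rows using the training_date column; it is a hypothetical illustration rather than part of the accelerator, the function name latest_forecasts is our own, and the sample rows are made up.

```python
import pandas as pd


def latest_forecasts(forecasts_pd: pd.DataFrame) -> pd.DataFrame:
    """Keep, for each store, item and forecast date, the row from the newest training run."""
    return (
        forecasts_pd
        .sort_values("training_date")
        .drop_duplicates(subset=["store", "item", "ds"], keep="last")
        .sort_values(["store", "item", "ds"])
        .reset_index(drop=True)
    )


# made-up sample: 2018-01-02 was forecast in two different runs
forecasts_pd = pd.DataFrame({
    "store": [1, 1, 1],
    "item": [1, 1, 1],
    "ds": pd.to_datetime(["2018-01-01", "2018-01-02", "2018-01-02"]),
    "yhat": [10.0, 11.0, 11.5],
    "training_date": pd.to_datetime(["2017-12-31", "2017-12-31", "2018-01-01"]),
})
current_pd = latest_forecasts(forecasts_pd)
print(current_pd)
```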
To generate these forecasts in your Databricks environment, check out our Solution Accelerator for Demand Forecasting.
To access the prior version of this notebook, built for Spark 2.0, please click this link.