Advances in time series forecasting are enabling retailers to generate more reliable demand forecasts. The challenge now is to produce these forecasts in a timely manner and at a level of granularity that allows the business to make precise adjustments to product inventories. Leveraging Apache Spark™ and Facebook Prophet, more and more enterprises facing these challenges are finding they can overcome the scalability and accuracy limits of past solutions.
Go directly to the forecasting notebook referenced in this post
To see this solution for Spark 2.0, please read the original post here
In this post, we’ll discuss the importance of time series forecasting, visualize some sample time series data, and then build a simple model to show the use of Facebook Prophet. Once you’re comfortable building a single model, we’ll combine Facebook Prophet with the magic of Spark to show you how to train hundreds of models at once, allowing you to create precise forecasts for each individual product-store combination at a level of granularity rarely achieved until now.
Improving the speed and accuracy of time series analyses in order to better forecast demand for products and services is critical to retailers’ success. If too much product is placed in a store, shelf and storeroom space can be strained, products can expire and retailers may find their financial resources tied up in inventory, leaving them unable to take advantage of new opportunities generated by manufacturers or shifts in consumer patterns. If too little product is placed in a store, customers may not be able to purchase the products they need. Not only do these forecast errors result in an immediate loss of revenue to the retailer, but over time, consumer frustration may drive customers towards competitors.
New expectations require more precise time series models and forecasting methods
For some time, enterprise resource planning (ERP) systems and third-party solutions have provided retailers with demand forecasting capabilities based on simple time series models. But with advances in technology and increased pressure in the sector, many retailers are looking to move beyond the linear models and more traditional algorithms historically available to them.
New capabilities, such as those provided by Facebook’s Prophet, are emerging from the data science community, and companies are seeking the flexibility to apply these machine learning (ML) models to their time series forecasting needs.
This movement away from traditional forecasting solutions requires retailers and the like to develop in-house expertise not only in the complexities of demand forecasting but also in the efficient distribution of the work required to generate hundreds of thousands or even millions of ML models in a timely manner. Luckily, we can use Spark to distribute the training of these models, making it possible to predict both demand for products and services and the unique demand for each product in each location.
To demonstrate the use of Facebook Prophet to generate fine-grained demand forecasts for individual stores and products, we will use a publicly available dataset from Kaggle. It consists of 5 years of daily sales data for 50 individual items across 10 different stores.
Next, by viewing the same data on a monthly basis, it’s clear that the year-over-year upward trend doesn’t progress steadily each month. Instead, there is a clear seasonal pattern of peaks in the summer months and troughs in the winter months. Using the built-in data visualization feature of Databricks Collaborative Notebooks, we can see the value of our data during each month by mousing over the chart.
週単位では、売上のピークが日曜日（weekday 0）にあり、月曜日（weekday 1）に大きく落ち込んだ後、残りの平日に徐々に回復する傾向がみてとれます。
As illustrated above, our data shows a clear year-over-year upward trend in sales, along with both annual and weekly seasonal patterns. It’s these overlapping patterns in the data that Facebook Prophet is designed to address.
Facebook Prophet follows the scikit-learn API, so it should be easy to pick up for anyone with experience with sklearn. We need to pass in a two-column pandas DataFrame as input: the first column is the date, and the second is the value to predict (in our case, sales). Once our data is in the proper format, building a model is easy:
import pandas as pd from fbprophet import Prophet # instantiate the model and set parameters model = Prophet( interval_width=0.95, growth='linear', daily_seasonality=False, weekly_seasonality=True, yearly_seasonality=True, seasonality_mode='multiplicative' ) # fit the model to historical data model.fit(history_pd)
Now that we have fit our model to the data, let’s use it to build a 90 day forecast:
# define a dataset including both historical dates & 90-days beyond the last available date, using Prophet's built-in make_future_dataframe method future_pd = model.make_future_dataframe( periods=90, freq='d', include_history=True ) # predict over the dataset forecast_pd = model.predict(future_pd)
That’s it! We can now visualize how our actual and predicted data line up as well as a forecast for the future using the Facebook Prophet model’s built-in .plot method. As you can see, the weekly and seasonal demand patterns shown earlier are reflected in the forecasted results.
predict_fig = model.plot(forecast_pd, xlabel='date', ylabel='sales') display(fig)
This visualization is a bit busy. Bartosz Mikulski provides an excellent breakdown of it that is well worth checking out. In a nutshell, the black dots represent our actuals with the darker blue line representing our predictions and the lighter blue band representing our (95%) uncertainty interval.
Training hundreds of time series forecasting models in parallel with Facebook Prophet and Spark
Now that we’ve demonstrated how to build a single model, we can use the power of Spark to multiply our efforts. Our goal is to generate not one forecast for the entire dataset, but hundreds of models and forecasts for each product-store combination, something that would be incredibly time-consuming to perform as a sequential operation.
Spark DataFrames を使用した時系列データの分散処理
Data scientists frequently tackle the challenge of training large numbers of models using a distributed data processing engine such as Spark. By leveraging a Spark cluster, individual worker nodes in the cluster can train a subset of models in parallel with other worker nodes, greatly reducing the overall time required to train the entire collection of time series models.
The key mechanism for achieving distributed data processing in Spark is the DataFrame. By loading the data into a Spark DataFrame, the data is distributed across the workers in the cluster. This allows these workers to process subsets of the data in a parallel manner, reducing the overall amount of time required to perform our work.
Of course, each worker needs to have access to the subset of data it requires to do its work. By grouping the data on key values, in this case on combinations of store and item, we bring together all the time series data for those key values onto a specific worker node.
store_item_history .groupBy('store', 'item') . . .
We share the groupBy code here to underscore how it enables us to train many models in parallel efficiently, although it will not actually come into play until we set up and apply a custom pandas function to our data in the next section.
Leveraging the power of pandas functions
With our time series data properly grouped by store and item, we now need to train a single model for each group. To accomplish this, we can use a pandas function, which allows us to apply a custom function to each group of data in our DataFrame.
This function will not only train a model for each group, but also generate a result set representing the predictions from that model. But while the function will train and predict on each group in the DataFrame independent of the others, the results returned from each group will be conveniently collected into a single resulting DataFrame. This will allow us to generate store-item level forecasts but present our results to analysts and managers as a single output dataset.
As you can see in the abbreviated code below, building our function is relatively straightforward. Unlike in previous versions of Spark, we can declare our functions in a fairly streamlined manner, specifying the type of pandas object we expect to receive and return, i.e. Python type hints.
def forecast_store_item(history_pd: pd.DataFrame) -> pd.DataFrame: # instantiate the model, configure the parameters model = Prophet( interval_width=0.95, growth='linear', daily_seasonality=False, weekly_seasonality=True, yearly_seasonality=True, seasonality_mode='multiplicative' ) # fit the model model.fit(history_pd) # configure predictions future_pd = model.make_future_dataframe( periods=90, freq='d', include_history=True ) # make predictions results_pd = model.predict(future_pd) # . . . # return predictions return results_pd
Now, to bring it all together, we use the groupBy command we discussed earlier to ensure our dataset is properly partitioned into groups representing specific store and item combinations. We then simply applyInPandas the function to our DataFrame, allowing it to fit a model and make predictions on each grouping of data.
from pyspark.sql.functions import current_date results = ( store_item_history .groupBy('store', 'item') .applyInPandas(forecast_store_item, schema=result_schema) .withColumn('training_date', current_date()) )
We have now constructed a forecast for each store-item combination. Using a SQL query, analysts can view the tailored forecasts for each product. In the chart below, we’ve plotted the projected demand for product #1 across 10 stores. As you can see, the demand forecasts vary from store to store, but the general pattern is consistent across all of the stores, as we would expect.
As new sales data arrives, we can efficiently generate new forecasts and append these to our existing table structures, allowing analysts to update the business’s expectations as conditions evolve.
To generate these forecasts in your Databricks environment, please import the following notebook:
To access the prior version of this notebook, built for Spark 2.0, please click this link.