To get started with SparkR, download Apache Spark 1.5 or sign up for a 14-day free trial of Databricks today.
Apache Spark 1.5 adds initial support for distributed machine learning over SparkR DataFrames. To provide an intuitive interface for R users, SparkR extends R’s native methods for fitting and evaluating models to use MLlib for large-scale machine learning. In this blog post, we cover how to work with generalized linear models in SparkR, and how to use the new R formula support in MLlib to simplify machine learning pipelines. This work was contributed by Databricks in Spark 1.5. We’d also like to thank Alteryx for providing input on early designs.
Generalized Linear Models
Generalized linear models unify various statistical models such as linear and logistic regression through the specification of a model family and link function. In R, such models can be fitted by passing an R model formula, family, and training dataset to the
glm() function. Spark 1.5 extends
glm() to operate over Spark DataFrames, which are distributed data collections managed by Spark. We also support elastic-net regularization for these models, the same as in R’s
Since we extend R’s native methods for model fitting, the interface is very similar. R lets you specify the modeling of a response variable in a compact symbolic form. For example, the formula
y ~ f0 + f1 indicates the response
y is modeled linearly by variables
f1. In 1.5 we support a subset of the R formula operators available. This includes the
. (include all), and intercept operators. To demonstrate glm in SparkR, we will walk through fitting a model over a 12 GB dataset (with over 120 million records) in the example below. Datasets of this size are hard to train on a single machine due to their size.
The dataset we will operate on is the publicly available airlines dataset, which contains twenty years of flight records (from 1987 to 2008). We are interested in predicting airline arrival delay based on the flight departure delay, aircraft type, and distance traveled.
> airlines <- read.df(sqlContext, path="/home/ekl/airlines", source="com.databricks.spark.csv", header="true", inferSchema="true") > planes <- read.df(sqlContext, "/home/ekl/plane_info", source="com.databricks.spark.csv", header="true", inferSchema="true") > joined <- join(airlines, planes, airlines$TailNum == planes$tailnum)
We use functionality from the DataFrame API to apply some preprocessing to the input. As part of the preprocessing, we decide to drop rows containing null values by applying
dropna() to the DataFrame.
> training <- dropna(joined) > showDF(select(training, “aircraft_type”, “Distance”, “ArrDelay”, “DepDelay”)) aircraft_type | Distance | DepDelay | ArrDelay ----------------------------|----------|-----------|---------- "Balloon" | 23 | 18 | 20 "Fixed Wing Multi-Engine" | 815 | 2 | -2 "Fixed Wing Single-Engine" | 174 | 0 | 1
The next step is to use MLlib by calling
glm() with a formula specifying the model variables. We specify the Gaussian family here to indicate that we want to perform linear regression. MLlib caches the input DataFrame and launches a series of Spark jobs to fit our model over the distributed dataset.
> model <- glm(ArrDelay ~ DepDelay + Distance + aircraft_type, family = "gaussian", data = training)
Note that parameter “lambda” can be used with glm to add regularization and “alpha” to adjust elastic-net constant.
As with R’s native models, coefficients can be retrieved using the
> summary(model) $coefficients Estimate (Intercept) -0.5155037863 DepDelay 0.9776640253 Distance -0.0009826032 aircraft_type__Fixed Wing Multi-Engine 0.3348238914 aircraft_type__Fixed Wing Single-Engine 0.2296622061 aircraft_type__Balloon 0.5374569269
Note that the
aircraft_type feature is categorical. Under the hood, SparkR automatically performs one-hot encoding of such features so that it does not need to be done manually. Beyond String and Double type features, it is also possible to fit over MLlib Vector features, for compatibility with other MLlib components.
To evaluate our model we can also use
predict() just like in R. We can pass in the training data or another DataFrame that contains test data.
> preds <- predict(model, training) > errors <- select( preds, preds$label, preds$prediction, preds$aircraft_type, alias(preds$label - preds$prediction, "error"))
Since the returned DataFrame contains the original columns in addition to the label, features, and predicted value, it is easy to inspect the result. Here we take advantage of the built-in visualizations from Databricks to examine the error distribution with respect to the aircraft type.
> display(sample(errors, F, .0001))
In summary, SparkR now provides seamless integration of DataFrames with common R modeling functions, making it simple for R users to take advantage of MLlib’s distributed machine learning algorithms.
To learn more about SparkR and its integration with MLlib, see the latest SparkR documentation.
R formula support in other languages
SparkR implements the interpretation of R model formulas as an MLlib feature transformer, for integration with the ML Pipelines API. The RFormula transformer provides a convenient way to specify feature transformations like in R.
To see how the RFormula transformer can be used, let’s start with the same airlines dataset from before. In Python, we create an RFormula transformer with the same formula used in the previous section.
>>> import pyspark.ml.feature.RFormula >>> formula = RFormula( formula="ArrDelay ~ DepDelay + Distance + aircraft_type")
After the transformation, a DataFrame with features and label column appended is returned. Note
that we have to call
fit() on a dataset before we can call
fit() step determines the mapping of categorical feature values to vector indices in the output, so that the fitted RFormula can be used across different datasets.
>>> formula.fit(training).transform(training).show() +--------------+---------+---------+---------+--------------------+------+ | aircraft_type| Distance| DepDelay| ArrDelay| features| label| +--------------+---------+---------+---------+--------------------+------+ | Balloon| 23| 18| 20| [0.0,0.0,23.0,18.0]| 20.0| | Multi-Engine| 815| 2| -2| [0.0,1.0,815.0,2.0]| -2.0| | Single-Engine| 174| 0| 1| [1.0,0.0,174.0,0.0]| 1.0| +--------------+---------+---------+---------+--------------------+------+
Any ML pipeline can include the RFormula transformer as a pipeline stage, which is in fact how SparkR implements
glm(). After we have created an appropriate RFormula transformer and an estimator for the desired model family, fitting a GLM model takes only one step:
>>> import pyspark.ml.Pipeline >>> import pyspark.ml.regression.LinearRegression >>> estimator = LinearRegression() >>> model = Pipeline(stages=[formula, estimator]).fit(training)
When the pipeline executes, the features referenced by the formula will be encoded into an output feature vector for use by the linear regression stage.
We hope that RFormula will simplify the creation of ML pipelines by providing a concise way of expressing complex feature transformations. Starting in Spark 1.5 the RFormula transformer is available for use in Python, Java, and Scala.
As part of this blog post, we would like to thank Dan Putler and Chris Freeman from Alteryx for useful discussions during the implementation of this functionality in SparkR, and Hossein Falaki for input on content.