Efficient Distributed Hyperparameter Tuning with Apache Spark

May 27, 2021 03:50 PM (PT)


Hyperparameter tuning is a key step in achieving and maintaining optimal performance from Machine Learning (ML) models. Today, there are many open-source frameworks which help automate the process and employ statistical algorithms to efficiently search the parameter space. However, optimizing these parameters over a sufficiently large dataset or search space can be computationally infeasible on a single machine. Apache Spark is a natural candidate to accelerate such workloads, but naive parallelization can actually impede the overall search speed and accuracy. 


In this talk, we’ll discuss how to efficiently leverage Spark to distribute our tuning workload and go over some common pitfalls.  Specifically, we’ll provide a brief introduction to tuning and motivation for moving to a distributed workflow. Next, we’ll demonstrate best practices when utilizing Spark with Hyperopt – a popular, flexible, open-source tool for hyperparameter tuning. This will include topics such as how to distribute the training data and appropriately size the cluster for the problem at hand. We’ll also touch on the conflicting nature between parallel computation and Sequential Model-Based Optimization methods, such as the Tree-structured Parzen Estimators implemented in Hyperopt. Afterwards, we’ll demonstrate these practices with Hyperopt using the SparkTrials API. Additionally, we’ll showcase joblib-spark, an extension our team recently developed, which uses Spark as a distributed backend for scikit-learn to accelerate tuning and training.


This talk will be generally accessible to those familiar with ML and particularly useful for those looking to scale up their training with Spark.

In this session watch:
Viswesh Periyasamy, Software Engineer, Databricks


Transcript

Viswesh Periyas…: Hi everyone and thanks for coming to my talk today. My name is Viswesh Periyasamy and today I’ll be talking to you all about distributed hyperparameter tuning and specifically some best practices when tuning with Apache Spark. For the purpose of this talk, I’m going to assume a basic understanding of hyperparameter tuning as well as Spark. If you’re familiar with each of these subjects, this should be a good primer to start scaling up your tuning with Spark. So, a little about me, I’m currently a software engineer at Databricks, on the machine learning team, primarily focused on building tools to help improve the model training experience. Prior to that, I was an engineer at Confluent and before that, I got my master’s in machine learning and bioinformatics from The University of Wisconsin, Madison.
And a quick note about the company that I work for. Databricks is a unified platform for data analytics and AI, built on a paradigm we’ve innovated called the Lakehouse architecture. We’re the creators of many open source projects, such as Apache Spark, Delta Lake, and MLflow, and we’ve contributed to several others along the way. Now, before we dive into the details, let’s walk through a quick scenario. You’re a budding data scientist in the analytics department at Krusty Krab LLC and you’re tasked with building a propensity model to predict which Krabby Patty specials will be the most lucrative. And Mr. Krabs gives you transactional data from the Rock Bottom location to help you get started. Fortunately, the first model you whip up performs great, sales are through the roof and now, Mr. Krabs wants to try 10 different specials in 1,000 cities across the ocean. Unfortunately, the customers in other locations don’t respond well to the first model that you trained.
You realize you’re going to have to retrain this propensity model on the entire transaction database and tune the hyperparameters every day. To help you scale, Mr. Krabs gives you access to the team Spark cluster but after your first attempt, the cluster crashes. You realize the cluster is running out of memory, so you down sample the dataset as a quick workaround. Okay, now it’s not crashing but it’s taking days to get any results. Turns out, Mr. Krabs is a cheapskate and is forcing all of his data scientists to share the smallest cluster available. Well, after a hearty debate, you convince Mr. Krabs to give you a dedicated cluster and that seems to then speed things up. But hold on, now the model you got back is performing even worse than before and you’re utterly confused because distributed hyperparameter tuning was supposed to make your life easier. Jokes aside, these problems are inspired by real anecdotes we’ve gathered from speaking to customers in the field. Hyperparameter tuning is hard and throwing Spark in the mix can make it even more challenging.
But fear not, after this talk you’ll be predicting Krabby Patty revenue at scale with ease. So, now that we’ve set the context, let’s get into the agenda for today. First, I’ll provide a brief overview of hyperparameter tuning and some motivation for when and how we’d move to a distributed workflow with Spark. After that, I’ll go over some of the pitfalls you might encounter when applying Spark to your own tuning workloads, as well as best practices for how to mitigate them. And finally, I’ll finish up with a demo showing you some of these principles in action with Hyperopt and joblib-spark, which are two open source frameworks with distributed tuning capabilities.
Before we get started, let’s quickly define what hyperparameters are. If we take a look at this two-dimensional data, we can cluster it using K-means by varying K. For example, K equals two might look something like this, whereas K equals four could look something like this. Now, the centroids of these clusters are called parameters because they’re internal variables that are inferred from the data. But K is specifically referred to as a hyperparameter because it’s an external configuration that controls how the model derives its centroids. In this case, the K-means algorithm requires us to give it a value for K because it cannot be learned via training. Hyperparameters typically fall into two categories, the first being model hyperparameters, which are problem specific and affect the architecture of the trained model. K in K-means can be classified in this bucket because it determines how many clusters we derive.
The second category is what I’ll call optimization hyperparameters, which don’t directly impact the model but instead affect the speed or the quality of the training. Taking K-means as an example once again, we could run the algorithm any number of times to try and refine our clustering, and the number of iterations or epochs can be considered an optimization hyperparameter. Choosing the right hyperparameters is a key step in productionizing a model because it can have a significant impact on the model’s performance. In fact, each time you retrain and deploy your model, you might need to update these values, as they’re really dependent on the data and the problem at hand. But as I mentioned before, these hyperparameters have to be set by us, the user. So, how does one go about choosing these values? Domain expertise can bring some intuition as to how to set them, but in all other cases, we can take a more systematic approach to tuning.
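To make the two categories concrete, here is a minimal scikit-learn sketch (illustrative, not from the talk): n_clusters is a model hyperparameter, while max_iter only affects the optimization.

```python
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs

X, _ = make_blobs(n_samples=500, centers=4, random_state=0)

# n_clusters (K) is a model hyperparameter: it shapes the fitted model itself.
# max_iter is an optimization hyperparameter: it affects training speed/quality,
# not the model architecture. Neither is learned from the data directly.
model = KMeans(n_clusters=4, max_iter=100, random_state=0).fit(X)

print(model.cluster_centers_)  # the learned parameters (the centroids)
```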
Now, there’s a variety of methods to do this but I’ll just touch on a few, the first being grid search. Most people are familiar with this method, which brute-forces every combination in a discrete space. In this example, the contour lines illustrate the optima that we’re searching for, with the yellow peaks, and the global optimum is denoted by the green dot. Another common approach is to do a random search, by sampling from a distribution of our choosing. This method is popular because it can achieve similar or better results than grid search with a much smaller computation budget. However, it’s still a blind search and we may end up wasting a lot of resources on less promising models. The last approach I want to cover is Bayesian optimization. If you’re not familiar with it, it’s an adaptive search algorithm that uses previous samples to help inform the next sample.
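As a quick illustration of the first two approaches, here is how a grid and a random search space might be expressed with scikit-learn utilities; the hyperparameter names and ranges are just examples.

```python
from sklearn.model_selection import ParameterGrid, ParameterSampler
from scipy.stats import uniform

# Grid search: brute-force every combination in a discrete space (3 x 3 = 9 candidates).
grid_candidates = list(ParameterGrid({
    "max_depth": [3, 5, 10],
    "learning_rate": [0.01, 0.1, 0.3],
}))

# Random search: sample candidates from distributions of our choosing.
random_candidates = list(ParameterSampler({
    "max_depth": [3, 5, 10],
    "learning_rate": uniform(0.01, 0.3),   # a continuous range instead of a fixed grid
}, n_iter=9, random_state=0))
```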
In our example, we can see that the samples start to gravitate towards this peak, as they learn the distribution that we’re trying to optimize. This lets us spend our computation budget much more wisely, since we don’t explore less promising models or areas in our search space. There’s a lot of great literature on this topic, so I won’t get too deep into the details. Now, Bayesian optimization is a huge improvement over the other two approaches but it doesn’t solve everything. And that’s because hyperparameter tuning is inherently challenging. For starters, hyperparameters can be really unintuitive. Even with approaches like Bayesian optimization, how do we know which hyperparameters to optimize? Or how large of a search space do we define? And which distributions should we even sample from? These are all really tough questions to reason about without domain expertise. Additionally, there’s a great deal of computational complexity associated with this problem.
We’re solving a non-convex optimization problem, like the one depicted in this polynomial. Essentially, what this means is that we can’t guarantee that we’ll land at the optimal solution in any reasonable amount of time. It’s an NP-hard problem, which is why we need black box strategies like Bayesian optimization. Not only that, but evaluating hyperparameters can get very expensive very quickly. If you’ve heard of the curse of dimensionality, hyperparameter tuning definitely suffers from it. As you can see in the plot on the right, the number of potential configurations will skyrocket with even just a few hyperparameters. At that scale, tuning on a single machine is just impractical due to sheer computational costs. We can maybe build some intuition around the problem over time and that can help with the first point. But what can we do about the second one? Well, as you may have guessed from the title of this talk, we’re going to use Spark to accomplish this.
If you’re unfamiliar with Spark, a cluster consists of a single driver node and several worker nodes. The driver node is the machine that the user interacts with, which takes care of partitioning the workload and scheduling which tasks are going to be executed where. The workers are then responsible for executing their assigned tasks. Coming back to hyperparameter tuning, let’s apply this cluster topology to a tuning workload. In the driver node, you’ll define your hyperparameter search space, the model that you want to train and the objective that you’re going to try and optimize. The worker nodes will then take care of training each model configuration as individual Spark tasks. If you’re familiar with Spark, you might notice that we’re actually repurposing it to execute some black box functions in parallel, as opposed to operating on the core paradigm of Spark, which is resilient distributed datasets or RDDs. That’s totally fine because we can still leverage all of Spark’s built-in functionality to handle the complexities of running a distributed workload.
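Conceptually, the pattern looks something like the sketch below. This is not the internals of Hyperopt or joblib-spark; it is only a conceptual sketch that assumes an active SparkContext named sc and a hypothetical evaluate(params) function that trains and scores one model.

```python
# Each hyperparameter configuration becomes an independent Spark task.
configs = [{"n_estimators": n, "max_depth": d}
           for n in (50, 100, 200) for d in (4, 8, 16)]

results = (
    sc.parallelize(configs, numSlices=len(configs))        # one partition per config
      .map(lambda params: (params, evaluate(params)))      # train + score on a worker
      .collect()                                           # gather scores on the driver
)

best_params, best_score = max(results, key=lambda kv: kv[1])
```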
With this pattern, you can run as many evaluations in parallel as your cluster permits. It’s a great way to quickly accelerate your tuning workloads, by making use of the full compute power of your cluster. In practice, there are a variety of ways to achieve this, using libraries such as Hyperopt or joblib-spark, which I’ll elaborate on more in the demo. Now that we’ve defined what distributed tuning looks like with Spark, let’s get into some pitfalls and best practices that help you get the most out of your cluster. First up, let’s talk trade-offs around parallelism and performance. The parallel task execution model works great for methods like grid search and random search with Spark, since trials don’t need to share any information with each other. However, parallelism does not play very nicely with Bayesian optimization methods. Earlier, we mentioned that Bayesian optimization will use the outcome of previous trials to inform how we should sample our next trial. But when we execute trials in parallel, we’re ignoring that intermediary knowledge from past trials.
Taking this to the extreme, if we reduce the parallelism to one, we take full advantage of Bayesian optimization to find a high quality model. But at that point, we’re wasting the cluster’s resources and have made the search as slow as possible. On the other extreme, we could increase the parallelism to the total number of evaluations and we’d finish every trial in the time it takes to train a single model. However, the trials wouldn’t be able to learn from each other and we’ve effectively devolved into a random search. So, which one should you sacrifice on, speed or performance? Well, ideally neither. Your parallelism is bounded by the number of cores available to you, but a good rule of thumb is to keep it an order of magnitude smaller than the total number of trials or models you want to evaluate. This way, you can distribute a good chunk of your workload while still allowing methods like Bayesian optimization to learn from previous iterations. So, for example, if you had 32 cores available and wanted to run 300 evaluations in total, a good parallelism value would be 16 or 32.
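That rule of thumb could be written down as a tiny helper like the one below. It is purely a heuristic for illustration, not part of Hyperopt or any other library.

```python
def suggest_parallelism(available_cores: int, max_evals: int) -> int:
    """Keep parallelism roughly an order of magnitude below the total number of
    evaluations, and never above the cores the cluster can actually give you."""
    return max(1, min(available_cores, max_evals // 10))

suggest_parallelism(available_cores=32, max_evals=300)  # -> 30
```

With 32 cores and 300 evaluations, anything in the 16 to 32 range is consistent with this heuristic.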
I’d like to add a caveat to this note, which is: what happens when your training library is already distributed? For example, you might be using Spark MLlib, Horovod, or distributed XGBoost for your model training. These libraries already make full use of the cluster’s compute resources for each evaluation. In these scenarios, it doesn’t make sense to evaluate models in parallel; instead, you should train each model sequentially and fully utilize the Bayesian optimization approach. Now that we have a better understanding of the trade-offs around parallelism, let’s get into data distribution and how we can do that as efficiently as possible. One thing you’ll notice is that each trial here is almost identical, the only variable is the set of hyperparameters we’re evaluating within that task. So, we have a really great opportunity to share or reuse a lot of that data.
Spark has a handy mechanism for just that, known as broadcasting. Essentially, what this allows you to do is reduce the number of times the data is sent to each worker, from once per task to once per node. Additionally, you get the added benefit of reducing the memory footprint, since every task can reference the same data in memory. Here’s a snippet of what the default behavior of Spark looks like. If we reference the data object directly, Spark will serialize the objective function along with the data and send it to each evaluation task. With larger datasets, this can stress the bandwidth of the network and requires each task to materialize its own copy of the dataset. There’s another method to data distribution, which is, using a distributed file system. And in that scenario, you’d load the data from within each task. This helps lift the burden from the driver to serialize and send the data but each task still needs to materialize its own copy from the file system.
The best way to improve this is to leverage Spark’s broadcasting mechanism, which can be implemented with just two lines of code. By referencing the broadcast variable instead, Spark will only serialize the objective function and each task can read from its worker’s copy of the data during execution. You can apply this principle to any shared data that your trials need, such as large preprocessors or even models that are in your pipeline. To help illustrate the power of broadcasting, we’ve benchmarked these methods with Hyperopt on both storage optimized and memory optimized clusters. You’ll notice a data point missing for the serialized method in the first plot and that’s because the serialization cost was so high that the workers crashed due to insufficient memory. With smaller datasets, these methods all look comparable. However, in the realm of hundreds or even thousands of megabytes, the distribution method starts to have a big impact. And in some cases, the cost can be high enough to cause network or memory failures. In any case, broadcasting consistently reduces the overhead of data distribution.
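The slide snippets aren’t reproduced here, but the pattern looks roughly like this; the variable names and the train/score helpers are placeholders, and sc is assumed to be an active SparkContext.

```python
# Default behavior: referencing `data` directly means Spark serializes it together
# with the objective function and ships it once per task.
def objective(params):
    model = train(data, params)
    return -score(model, data)

# Broadcasting: ship the data once per worker node and let every task on that
# node share the same in-memory copy.
data_bc = sc.broadcast(data)

def objective(params):
    local_data = data_bc.value           # read the worker-local copy
    model = train(local_data, params)
    return -score(model, local_data)
```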
There’s one caveat to this, which is that broadcasting is limited by the maximum result size in your Spark configuration. For example, on Databricks, this is two gigabytes. In those scenarios, you should default to loading the data from a distributed file system within each task, to avoid overloading the driver. But in all other cases, broadcasting is your friend and should be used generously. The last thing I want to cover is a topic that data scientists may not typically be concerned about but is important when using Spark to scale your workload, and that’s choosing an appropriately sized cluster. If you move to a distributed tuning workflow and still encounter slow execution or even failures, it’s important to understand these limitations and how to address them. First, you’ll want to consider using instance types that are appropriate for the problem at hand because shared or general purpose clusters will typically struggle with specialized workloads. For example, you might want to use compute optimized clusters for data intensive algorithms or a GPU accelerated cluster for deep learning workloads.
Another thing to be aware of is that your training algorithms may be able to use multiple cores and it’s in your best interest to leverage that. Again, this is because Bayesian optimization learns iteratively, so it’s better to have evaluations finish faster in sequence than slower in parallel. For example, if your scikit-learn model has an n_jobs parameter to use multiple threads during the fitting process, it’s typically advantageous to maximize this value. The last thing I want to highlight is that your cluster should have enough workers to match the size of the search space. This one might seem like a no-brainer but hyperparameter tuning is an extremely expensive process and you won’t get far without the right resources. Even with Bayesian optimization, you may still want to try hundreds or even thousands of configurations and you’ll want to make sure that you’re not bottlenecked by the number of nodes in your cluster.
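For instance, many scikit-learn estimators accept n_jobs directly; the snippet below assumes each trial has a worker, or at least several cores, to itself.

```python
from sklearn.ensemble import RandomForestClassifier

# n_jobs=-1 lets the estimator use all cores available to the task; only do this
# when trials aren't competing for the same cores on a worker.
model = RandomForestClassifier(n_estimators=200, n_jobs=-1)
```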
Conversely, if scaling the nodes up is not an option, you should try reducing the total number of evaluations in your search budget. Another solution to this problem is to use an auto-scaling cluster, but you should be careful with this, since some libraries will ask you to provide the parallelism value upfront before the tuning starts. In these cases, you’ll want to set the parallelism to the maximum value you intend to scale to, otherwise the tuning process may be unaware of the new resources available to it. To help illustrate some of these ideas, let’s see them in action from a Databricks notebook.

Viswesh Periyas…: All right. So, here we are in a Databricks notebook, where I want to train a random forest classifier on the credit card fraud detection dataset. For this demo, let’s take a look at how we can accomplish this using Hyperopt and SparkTrials. Hyperopt is an open source tuning library that uses a search algorithm called Tree-structured Parzen Estimators. For those of you who are unfamiliar with it, it’s another form of Bayesian optimization and it’s sort of the bread and butter of Hyperopt. I’ll first show you how we can use Hyperopt for single node hyperparameter tuning but after that, we’ll modify it to use the SparkTrials extension, which is a Spark powered backend that our team contributed. In this notebook, I’m using a memory optimized cluster with 16 cores per worker. I’ll skip over most of the data loading and preparation but a brief look at the data shows that it has 30 features and a label column that denotes whether these transactions were labeled as fraudulent or not.
Most of these features have been anonymized through a PCA transformation. For this example, we’re going to tune four hyperparameters: the number of trees in the forest, the split criterion, the minimum number of samples for a leaf node and the minimum number of samples for a split. And because I don’t have too much intuition on how to define the search space, I’m just using a range centered around the default values. I’m also defining that I want to try 200 different evaluations or models out in total. Over here in the objective function, we’re going to instantiate the model with the hyperparameters that were just passed to this trial. Then we’re going to train the model, calculate the F1 score and then, finally, negate it as our loss function. Before we get into the results, I’m going to quickly show you what Hyperopt’s standard API looks like.
Here, we’re calling the minimization function fmin and we’re passing to it the objective function we just defined, the search space, the algorithm which we want to use, which is TPE, the total number of evaluations and finally, a trials object to return the results. For brevity, I’ve already run these trials and you can see that they took almost six hours to complete, with these hyperparameters being selected as the best. Yikes, that’s a really long time. Now, let’s take a look at how we can quickly speed this up using SparkTrials. The first thing we want to do is broadcast the data to the workers. You can see here that I’ve broadcasted each variable after splitting the data into training and validation sets. This is because I want to make sure the same data split is used in every trial. Next, I’m going to redefine the objective function to grab the broadcasted variables. The rest of this function is exactly the same.
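As a reference, here is a rough reconstruction of the pattern just described. It is a sketch rather than the actual notebook code: the ranges, variable names, and broadcast handles are illustrative, and an active SparkContext named sc plus a pre-split train/validation set are assumed.

```python
from hyperopt import fmin, tpe, hp, Trials, STATUS_OK
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score

# Broadcast the fixed train/validation split so every trial reuses the same copy.
X_train_bc, y_train_bc = sc.broadcast(X_train), sc.broadcast(y_train)
X_val_bc, y_val_bc = sc.broadcast(X_val), sc.broadcast(y_val)

search_space = {
    "n_estimators": hp.quniform("n_estimators", 50, 200, 1),
    "criterion": hp.choice("criterion", ["gini", "entropy"]),
    "min_samples_leaf": hp.quniform("min_samples_leaf", 1, 10, 1),
    "min_samples_split": hp.quniform("min_samples_split", 2, 10, 1),
}

def objective(params):
    model = RandomForestClassifier(
        n_estimators=int(params["n_estimators"]),
        criterion=params["criterion"],
        min_samples_leaf=int(params["min_samples_leaf"]),
        min_samples_split=int(params["min_samples_split"]),
    )
    model.fit(X_train_bc.value, y_train_bc.value)
    f1 = f1_score(y_val_bc.value, model.predict(X_val_bc.value))
    return {"loss": -f1, "status": STATUS_OK}   # negate F1 so fmin minimizes it

# Single-node API; the distributed version described next only swaps the trials object.
best = fmin(fn=objective, space=search_space, algo=tpe.suggest,
            max_evals=200, trials=Trials())
```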
After that, all we need to do is swap out our trials object with a SparkTrials object and set the parallelism. The cluster I’m using has 64 cores available on the workers but since we decided on 200 evaluations, I think 32 should be an appropriate choice here, to stay about an order of magnitude less than the number of evaluations. Looking at the execution time of this run, SparkTrials is able to cut the tuning duration to 16 minutes, that’s a whopping 95% reduction in training time. As we can see, it’s pretty easy to use SparkTrials to make your tuning workloads much faster but there’s actually another way that we can do that, which requires a lot less instrumentation. So, let me switch over to my other demo here. Now, I want to take the same problem and the same dataset but show you how we can boost your performance if you’re in the scikit-learn ecosystem, using joblib-spark. joblib-spark is a Spark powered backend that our team built as an extension to joblib, which is used to parallelize work in scikit-learn.
For this example, let’s take a look at the RandomizedSearchCV API, which is scikit-learn’s random search implementation with cross validation. The majority of this notebook is pretty similar, although you’ll notice a few things. First, the search space only differs in the sampling distributions. We’re using the same ranges and number of evaluations as before but using the SciPy stats API to define the distributions. The next thing is that we don’t need to explicitly broadcast the data, since joblib-spark will handle that for us in the background. We also don’t need to define an objective function because scikit-learn is going to automatically score and cross validate our models for us. Let’s first look at the results when we run this on a single machine. For our call to RandomizedSearchCV, we’ll pass it the model, the search space and the number of evaluations, just like we did for Hyperopt. We’re also going to use twofold cross validation, to keep the workload smaller. But keep in mind that with twofold, we’re really training 400 models in this example.
Lastly, RandomizedSearchCV supports the n_jobs parameter, so we’ll use the 8 cores available on the driver to parallelize as much as we can on a single machine. As you can see, this finished in 47 minutes, which is much faster than the Hyperopt trials, and this is with double the number of evaluations. We can attribute that to the single machine parallelization we set with the n_jobs parameter. But let’s see if we can do better than that. To make this tuning distributed, all we need to do is add this line here to register our Spark backend and then declare that we’re using it with the parallelism of our choice here. Since we’re not using Bayesian optimization, we can use the full 64 cores available across the entire cluster. And if we look at the execution time here, we can see that joblib-spark cut it down to 11 minutes, found an equally strong model and required virtually no extra code.
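Again as a reference only, the joblib-spark version of this workflow could look something like the sketch below; the ranges and variable names are illustrative, and X and y are assumed to hold the features and labels.

```python
from joblib import parallel_backend
from joblibspark import register_spark
from scipy.stats import randint
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import RandomizedSearchCV

search_space = {
    "n_estimators": randint(50, 200),
    "criterion": ["gini", "entropy"],
    "min_samples_leaf": randint(1, 10),
    "min_samples_split": randint(2, 10),
}

search = RandomizedSearchCV(
    RandomForestClassifier(), search_space,
    n_iter=200, cv=2, scoring="f1", random_state=0,
)

# Single machine: parallelize across the driver's cores only, e.g.
#   search.set_params(n_jobs=8); search.fit(X, y)

# Distributed: register the Spark backend and run the same search on the cluster.
register_spark()
with parallel_backend("spark", n_jobs=64):
    search.fit(X, y)
```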

Viswesh Periyas…: All right. So, hopefully that demo helps solidify some of the concepts that we talked about today but I’d like to reiterate them one more time. The first idea we talked about was the trade-off between parallelism and performance when using Bayesian optimization methods. And the key takeaway there was to set the parallelism to be an order of magnitude smaller than the total number of evaluations that you want to run. The second idea we discussed was the benefits of broadcasting your data, so that it can be cached on the worker nodes for all of the evaluations to share. As long as your Spark configuration supports the data size, this is always beneficial, and you can default to loading from a distributed file system in special cases. The last note was about allocating cluster resources and some examples of when you might want to use certain instance types or scale up the number of cores or nodes in your cluster.
And to recap from the demo, I showed you two frameworks that allow you to transition to a distributed tuning workflow. The first was Hyperopt with the SparkTrials extension. As we demonstrated, Hyperopt is extremely flexible and makes use of the Tree-structured Parzen Estimator algorithm, which is a form of Bayesian optimization. While it requires a bit more instrumentation, it’s applicable to any framework or optimization problem. The other framework that we looked at today was joblib-spark, which allows you to distribute scikit-learn workloads by using a Spark powered backend for joblib. It’s extremely easy to use and requires just a few lines of code. However, it is limited to scikit-learn APIs and does not have Bayesian optimization out of the box. In the SparkTrials demo we looked at, we accelerated the sequential tuning by over 20x. And in the joblib-spark demo, we were still able to speed up an already parallel tuning workload by 3x.
Both of these extensions are very straightforward to adopt and can achieve immense improvements in performance. So, I’ll leave it up to you to decide which one is more apt for your use case. I’ve added a tiny URL link to the demo, if you want to check out the notebooks from today. As a final note, I’d like to leave you with some resources, including recent talks, blog posts and documentation that explore some of these topics in depth. And a special thanks to my colleagues, Joseph Bradley and Sean Owen, for help with this talk. They have a ton of great content, some of which are listed here. These slides will be posted, so feel free to come back and check these out. And that’s it, thanks for your time.

Viswesh Periyasamy

Viswesh Periyasamy is a Software Engineer on the Machine Learning team at Databricks, primarily focused on building infrastructure for model training and tuning. Previously, he was a Software Engineer...