Announcing Ray Autoscaling support on Databricks and Apache Spark™
Ray is an open-source unified compute framework that simplifies scaling AI and Python workloads in a distributed environment. Since we introduced support for running Ray on Databricks, we've seen many customers successfully deploy machine learning use cases ranging from forecasting and deep reinforcement learning to fine-tuning LLMs.
With the release of Ray version 2.8.0, we are delighted to announce the addition of autoscaling support for Ray on Databricks. Autoscaling is essential because it allows resources to dynamically adjust to fluctuating demands. This ensures optimal performance and cost-efficiency, as processing needs can vary significantly over time, and it helps maintain a balance between computational power and expenses without requiring manual intervention.
Ray autoscaling on Databricks can add or remove worker nodes as needed, leveraging the Spark framework to enhance scalability, cost-effectiveness, and responsiveness in distributed computing environments. This integrated approach is far simpler than implementing OSS autoscaling yourself, because it eliminates the need to define complex permissions, cloud initialization scripts, and logging configurations. With a fully managed, production-capable, and integrated autoscaling solution, you can greatly reduce the complexity and cost of your Ray workloads.
Create a Ray cluster on Databricks with autoscaling enabled
To get started, install the latest version of Ray (2.8.0 or later), for example by running `%pip install "ray[default]>=2.8.0"` in a notebook cell.
The next step is to configure the Ray cluster, which we start with the `ray.util.spark.setup_ray_cluster()` function. To use autoscaling, specify the maximum number of worker nodes that the Ray cluster can use, define the compute resources allocated to each node, and set the `autoscale` flag to `True`. It is also critical that the Databricks cluster itself has been started with autoscaling enabled. For more details, please refer to the documentation.
With these parameters set, autoscaling functions exactly as Databricks autoscaling does once the Ray cluster is initialized. Below is an example of setting up a Ray cluster with the ability to autoscale.
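A minimal sketch of such a setup, assuming the Ray 2.8.x parameter names (`num_worker_nodes`, `num_cpus_per_node`, `num_gpus_per_node`, `autoscale`); newer Ray releases may name these differently. The helper names `build_cluster_config` and `start_cluster` and the node sizes are hypothetical and chosen only for illustration.

```python
def build_cluster_config(max_workers=4, cpus_per_node=4, gpus_per_node=0):
    """Keyword arguments for ray.util.spark.setup_ray_cluster (Ray 2.8.x names assumed)."""
    return {
        # With autoscale=True this is treated as the upper bound on worker nodes.
        "num_worker_nodes": max_workers,
        "num_cpus_per_node": cpus_per_node,
        "num_gpus_per_node": gpus_per_node,
        "autoscale": True,
    }


def start_cluster(config):
    """Start Ray on the current Spark cluster and connect to it."""
    # Imported lazily so the config helper can be used without Ray installed.
    import ray
    from ray.util.spark import setup_ray_cluster

    setup_ray_cluster(**config)
    ray.init()  # connects to the cluster that was just started


config = build_cluster_config(max_workers=8, cpus_per_node=4)
# start_cluster(config)  # run this inside a Databricks notebook
```

Keeping the configuration in a plain dictionary makes it easy to review or log the settings before the cluster is actually started.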
This feature is compatible with any Databricks cluster running Databricks Runtime version 14.0 or above.
To learn more about the parameters that are available for configuring a Ray cluster on Spark, please refer to the setup_ray_cluster documentation. Once the Ray cluster is initialized, the Ray head node will show up on the Ray Dashboard.
When a job is submitted to the Ray cluster, the Ray Autoscaler API requests resources from the Spark cluster by submitting tasks with the necessary CPU and GPU compute requirements. The Spark scheduler scales up worker nodes if the current cluster resources cannot meet the task's compute demands and scales down the cluster when tasks are completed and no additional tasks are pending. You can control the scale-up and scale-down velocity by adjusting the `autoscale_upscaling_speed` and `autoscale_idle_timeout_minutes` parameters. For additional details about these control parameters, please refer to the documentation. Once the process is completed, Ray releases all of the allocated resources back to the Spark cluster for other tasks or for downscaling, ensuring efficient utilization of resources.
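As a rough illustration of the two velocity parameters, the sketch below adds them to a cluster configuration before it is handed to `setup_ray_cluster`. The helper `with_scaling_velocity` is hypothetical, and the values shown are illustrative rather than recommendations.

```python
def with_scaling_velocity(config, upscaling_speed, idle_timeout_minutes):
    """Return a copy of config with the scale-up and scale-down controls set."""
    tuned = dict(config)  # leave the caller's dictionary untouched
    # Higher values let the autoscaler request more nodes at once.
    tuned["autoscale_upscaling_speed"] = upscaling_speed
    # How long a worker may sit idle before it is released.
    tuned["autoscale_idle_timeout_minutes"] = idle_timeout_minutes
    return tuned


base = {"num_worker_nodes": 4, "num_cpus_per_node": 4, "autoscale": True}
tuned = with_scaling_velocity(base, upscaling_speed=2.0, idle_timeout_minutes=5.0)

# from ray.util.spark import setup_ray_cluster
# setup_ray_cluster(**tuned)  # requires Ray 2.8.x on a Databricks or Spark cluster
```

A longer idle timeout keeps workers around between bursts of work, at the cost of paying for them while they wait.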
Let's walk through a hyperparameter tuning example to demonstrate the autoscaling process. In this example, we'll train a PyTorch model on the CIFAR10 dataset. We've adapted the code from the Ray documentation, which you can find here.
We'll begin by defining the PyTorch model we want to tune.
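A sketch of such a model, following the small convolutional network used in the Ray Tune PyTorch tutorial. The layer sizes `l1` and `l2` are the tunable parts; the factory function `build_net` and the helper `conv_pool_output` are hypothetical names used here so that the size arithmetic can be checked without PyTorch installed.

```python
def conv_pool_output(size, kernel=5, pool=2):
    """Spatial size after an unpadded convolution followed by max pooling."""
    return (size - kernel + 1) // pool


# CIFAR10 images are 32x32: 32 -> 14 after the first block, 14 -> 5 after the second.
FLAT_FEATURES = 16 * conv_pool_output(conv_pool_output(32)) ** 2


def build_net(l1=120, l2=84):
    """Build the network; l1 and l2 are the widths of the two hidden layers."""
    import torch.nn as nn
    import torch.nn.functional as F

    class Net(nn.Module):
        def __init__(self):
            super().__init__()
            self.conv1 = nn.Conv2d(3, 6, 5)
            self.pool = nn.MaxPool2d(2, 2)
            self.conv2 = nn.Conv2d(6, 16, 5)
            self.fc1 = nn.Linear(FLAT_FEATURES, l1)
            self.fc2 = nn.Linear(l1, l2)
            self.fc3 = nn.Linear(l2, 10)  # 10 CIFAR10 classes

        def forward(self, x):
            x = self.pool(F.relu(self.conv1(x)))
            x = self.pool(F.relu(self.conv2(x)))
            x = x.view(-1, FLAT_FEATURES)
            x = F.relu(self.fc1(x))
            x = F.relu(self.fc2(x))
            return self.fc3(x)

    return Net()
```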
We wrap the data loaders in their own function and pass a global data directory. This way we can share a data directory between different trials.
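One way this could look, assuming `torchvision` is available on the cluster. The function name `load_data`, the default directory, and the normalization constants (commonly used CIFAR10 channel statistics) are assumptions for this sketch.

```python
# Commonly used per-channel mean and standard deviation for CIFAR10.
CIFAR10_MEAN = (0.4914, 0.4822, 0.4465)
CIFAR10_STD = (0.2023, 0.1994, 0.2010)


def load_data(data_dir="./data"):
    """Return the CIFAR10 train and test datasets stored under data_dir."""
    # Imported lazily so this module can be loaded without torchvision.
    import torchvision
    import torchvision.transforms as transforms

    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize(CIFAR10_MEAN, CIFAR10_STD)]
    )
    # Every trial points at the same directory, so the data is downloaded once.
    trainset = torchvision.datasets.CIFAR10(
        root=data_dir, train=True, download=True, transform=transform
    )
    testset = torchvision.datasets.CIFAR10(
        root=data_dir, train=False, download=True, transform=transform
    )
    return trainset, testset
```

If several trials may start at the same moment, guarding the download with a file lock is a reasonable precaution.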
Next, we define a function that ingests a config and runs a single training loop for the torch model. At the conclusion of each trial, we checkpoint the weights and report the evaluated loss using the `train.report` API. This is done so that the scheduler can stop ineffectual trials that do not improve the model's loss characteristics.
Next, we define the training loop, which runs for the total number of epochs specified in the config. Each epoch consists of two main parts:
- The Train Loop - iterates over the training dataset and tries to converge to optimal parameters.
- The Validation/Test Loop - iterates over the test dataset to check if model performance is improving.
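The two parts of an epoch can be sketched as follows. The function names `run_epoch` and `summarize` are hypothetical; `net`, the data loaders, `criterion`, and `optimizer` are assumed to be standard PyTorch objects created by the caller.

```python
def summarize(val_loss_sum, val_steps, correct, total):
    """Average the validation loss over batches and compute accuracy."""
    return {"loss": val_loss_sum / val_steps, "accuracy": correct / total}


def run_epoch(net, trainloader, valloader, criterion, optimizer, device="cpu"):
    """Run one training pass and one validation pass; return validation metrics."""
    import torch  # imported lazily so summarize() works without PyTorch

    # Train loop: update the parameters on every batch of the training set.
    net.train()
    for inputs, labels in trainloader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        loss = criterion(net(inputs), labels)
        loss.backward()
        optimizer.step()

    # Validation loop: measure loss and accuracy without computing gradients.
    net.eval()
    val_loss_sum, val_steps, correct, total = 0.0, 0, 0, 0
    with torch.no_grad():
        for inputs, labels in valloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = net(inputs)
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            val_loss_sum += criterion(outputs, labels).item()
            val_steps += 1

    return summarize(val_loss_sum, val_steps, correct, total)
```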
Finally, we save a checkpoint and then report the validation loss and accuracy back to Ray Tune. Ray Tune can then use these metrics to decide which hyperparameter configuration leads to the best results.
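A sketch of the checkpoint-and-report step, assuming the `ray.train` API available from Ray 2.7 onward (`train.report` and `Checkpoint.from_directory`). The helper names and the checkpoint file name are hypothetical.

```python
import os
import tempfile

CHECKPOINT_FILE = "checkpoint.pt"  # hypothetical file name


def epoch_metrics(val_loss, accuracy):
    """Metrics dictionary sent to Ray Tune at the end of an epoch."""
    return {"loss": float(val_loss), "accuracy": float(accuracy)}


def checkpoint_and_report(net, optimizer, val_loss, accuracy):
    """Save model and optimizer state, then report metrics with the checkpoint."""
    # Imported lazily; this function must be called from inside a Tune trial.
    import torch
    from ray import train
    from ray.train import Checkpoint

    with tempfile.TemporaryDirectory() as tmp_dir:
        torch.save(
            (net.state_dict(), optimizer.state_dict()),
            os.path.join(tmp_dir, CHECKPOINT_FILE),
        )
        # Report while the directory still exists so Ray can persist its contents.
        train.report(
            epoch_metrics(val_loss, accuracy),
            checkpoint=Checkpoint.from_directory(tmp_dir),
        )
```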
Next, we define the main components to start the tuning job by specifying the search space that the optimizer will select from for given hyperparameters.
Define the search space
The configuration below expresses the hyperparameters and their search selection ranges as a dictionary. For each of the given parameter types, we use the appropriate selector algorithm (i.e., sample_from, loguniform, or choice, depending on the nature of the parameter being defined).
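A sketch of such a dictionary, modeled on the Ray Tune PyTorch tutorial. The parameter names (`l1`, `l2`, `lr`, `batch_size`) and their ranges are assumptions made for illustration.

```python
import random

LAYER_SIZES = [2 ** i for i in range(2, 9)]  # powers of two from 4 to 256
BATCH_SIZES = [2, 4, 8, 16]


def build_search_space():
    """Search space with one selector per hyperparameter."""
    from ray import tune  # imported lazily so the constants are usable without Ray

    return {
        # sample_from: draw a value with a custom function on each trial
        "l1": tune.sample_from(lambda _: random.choice(LAYER_SIZES)),
        "l2": tune.sample_from(lambda _: random.choice(LAYER_SIZES)),
        # loguniform: learning rates spread evenly across orders of magnitude
        "lr": tune.loguniform(1e-4, 1e-1),
        # choice: pick one entry from a fixed list
        "batch_size": tune.choice(BATCH_SIZES),
    }
```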
At each trial, Ray Tune randomly samples a combination of parameters from these search spaces. After selecting a value for each parameter within the bounds of the configuration defined above, it trains a number of models in parallel to find the best-performing one. To cut short parameter combinations that aren't working well, we use the ASHAScheduler, which terminates ineffective trials early, i.e., trials whose loss metrics are significantly worse than those of the current best-performing set of parameters from the run's history.
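To give a feel for when early termination can happen, the sketch below lists the iteration counts at which a successive-halving scheduler would compare trials. This is a simplified picture: the real ASHAScheduler is asynchronous and its internal details may differ. The helpers `rung_milestones` and `build_scheduler` and the chosen `grace_period` and `reduction_factor` values are assumptions.

```python
def rung_milestones(grace_period, reduction_factor, max_t):
    """Iterations at which trials are compared and weak ones may be stopped."""
    milestones = []
    t = grace_period
    while t <= max_t:
        milestones.append(t)
        t *= reduction_factor
    return milestones


def build_scheduler(max_num_epochs=20):
    """Create an ASHAScheduler; metric and mode are left to the Tune configuration."""
    from ray.tune.schedulers import ASHAScheduler  # imported lazily

    return ASHAScheduler(
        max_t=max_num_epochs,  # no trial runs longer than this many iterations
        grace_period=1,        # every trial gets at least one iteration
        reduction_factor=2,    # roughly half of the trials survive each rung
    )


print(rung_milestones(grace_period=1, reduction_factor=2, max_t=20))
```

With these settings, a poorly performing trial can be stopped after only one or two epochs instead of consuming all 20.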
Tune API
Finally, we call the Tuner API to initiate the run. When starting the run, we pass additional configuration options that define the resources Ray Tune may use per trial, the default storage location for checkpoints, and the target metric to optimize. Refer here for more details on the various parameters that are available for Ray Tune.
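A sketch of how these pieces might be wired together, assuming the Ray 2.7+ `tune.Tuner`, `tune.TuneConfig`, and `train.RunConfig` APIs. The function names, the `"loss"` metric name, and the default `num_samples` are assumptions; `trainable`, `search_space`, and `scheduler` are supplied by the caller.

```python
def trial_resources(cpus_per_trial, gpus_per_trial=0):
    """Resources that each individual trial is allowed to reserve."""
    return {"cpu": cpus_per_trial, "gpu": gpus_per_trial}


def run_tuning(trainable, search_space, scheduler, cpus_per_trial,
               gpus_per_trial=0, num_samples=10, storage_path=None):
    """Launch the tuning run and return the best result by validation loss."""
    from ray import train, tune  # imported lazily

    tuner = tune.Tuner(
        # Attach the per-trial resource request to the training function.
        tune.with_resources(
            trainable, resources=trial_resources(cpus_per_trial, gpus_per_trial)
        ),
        param_space=search_space,
        tune_config=tune.TuneConfig(
            metric="loss",       # must match a key passed to train.report
            mode="min",
            scheduler=scheduler,
            num_samples=num_samples,
        ),
        # Where checkpoints and results are written.
        run_config=train.RunConfig(storage_path=storage_path),
    )
    results = tuner.fit()
    return results.get_best_result("loss", "min")
```

The per-trial resource request is what the autoscaler reacts to: the more trials that are pending, the more CPU or GPU capacity is requested from the Spark cluster.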
In order to see what happens when we run this code with a specific declared resource constraint, let's trigger the run with CPU only, using cpus_per_trial = 3 and gpu = 0 with total_epochs = 20 for the run configuration.
We see the autoscaler start requesting resources as shown above, and the pending resources logged in the UI shown below.
If the Ray cluster's current demand for resources cannot be met, autoscaling of the Databricks cluster is initiated as well.
Finally, the run finishes. The job output shows that some of the poorly performing trials were terminated early, which saves compute.
The same process works with GPU resources as well, without any code change. Feel free to clone the notebook and run it in your environment:
What's next
With support for autoscaling Ray workloads, we take another step toward tightening the integration between Ray and Databricks and helping you scale your dynamic workloads. Our roadmap for this integration promises even more exciting developments. Stay tuned for further updates!