Benchmark Tests and How-Tos of Convolutional Neural Network on HorovodRunner Enabled Apache Spark Clusters

Download Slides

The freedom of fast iterations of distributed deep learning tasks is crucial for smaller companies to gain competitive advantages and market shares from big tech giants. Horovod Runner brings this process to relatively accessible spark clusters. There have been, however, no benchmark tests on Horovod Runner per se, and very limited scalability benchmark tests on Horovod, the predecessor requiring custom built GPU clusters. For the first time, we show that Databricks’ Horovod Runner achieves significant lift in scaling efficiency for the convolutional neural network (CNN, hereafter) based tasks on both GPU and CPU clusters.

We also implemented the Rectified Adam optimizer for the first time in Horovod Runner. In addition to show test results, we will also discuss lessons we learned on how to do it such as:

  1. cluster settings
  2. distributed model retrieval
  3. how to avoid Horovod timeline but still get training time
  4. how to use Rectified Adam
  5. what type of models will gain scaling efficiency and why, etc.

Watch more Spark + AI sessions here
Try Databricks for free

Video Transcript

– Hello everyone, my name is Wendao. Thanks for coming out to our talk today, today I gonna present with Jing Pan on benchmark tests and how to distribute a deep learning on HorovodRunners. So we both looked at eHealth for those of you not familiar eHouse is an insurance broker who helping people to find best fit and affordable insurance plan and medicare plans. And I work as a Senior Data Scientist at eHealth and I’m working on the engineering and data science are combined so data pipeline and end data product. Currently I’m studying a Doctor in a Business Administration. – Hello this is Jing Pan I am co-presenting with Wendao today. I work at the eHealth as a Senior Staff User Experience Researcher and a Data Scientist. – So in this talk, we start to talk about the Horovod and how it works. Then we gonna jump in to the HorovodRunner and a Horovod Benchmark. Lastly, we’ll have the change of cover in depth of how to use the HorovodRunners. So the first question that I wanted to address here is why do we need a distributed deep learning system?


So as increasing data size and the increasingly different deeper model structure, the change process it can take days and weeks while the way to speed it up is to leverage the distributed deep learning system. There are many systems out there but among all, one of the most popular one is Horovod it is open source and a well-maintained library, which make the distributed deep learning very easy to use and fast.


So use it only needs slightly modified a single note, deep learning code to make a distributed learning on the Horovod and it have demonstrated have great scaling efficiency and currently supporting four popular framework Tensorflow, Keras, pyTorch and MIXNet And it also supporting the two main category of Horovod deep learning system which is data and the model parallelism. So in this model parallelism approach, we are essentially achieving the same model on the different processes unit, whether you fit different parts of the data to the personal unit. But on the other hand model parallelism is a way of changing the same data on the present unit by your split model and fitting into the different processes unit. And most of the cases you can fit your model into one GPU, but not the data. So data is more likely to be the bottleneck. So in this talk we’re just focused on data parallelism and I will walk you through visual intuition of how Horovod works before data Parallelism.

So in terms of a data power listening approach, essentially your data is being divided close to even chunk size of data and sending into the different processing unit. So and also different process unit where it receives the replicate the same model. So when they start training they will have the same model setting and each of the processing unit will calculate gradient independently. But at the end of the each process, you will need to communicate with other devices to making sure they can average gradient. So start to next iteration they were using the same set of the model weight. The intuitive approach to average all this or communicate all this gradients is to sending all the gradients to one single machine and that machine to average all the gradients and send back to the all the processing unit. And that approach is called Parameter Server Approach. Many deep distributed learning system actually implement this such as the TensorFlow. But you will notice that the many know is communicating to the OneNote of which can have… Which will make this particular machine become bottleneck really quickly to the network communication costs. And as we increasing the size the complexity of such system is also gonna increase your model play. So instead the Horovod was used for what’s so-called Ring-AllReduce Approach. To explain this more intuitively I would like to use where the Horovod name come from.


The Horovod is actually named after traditional folk dance, where the dance participants they dance with the liquid hand in this kind of circle shape.

In this graph we are having the 16 dance participants and you can see of one of them, each of them as like a processing unit. And then we giving them the unit number from zero to 15 and once each processing unit finish the category and gradient will start to passing the gradient in this kind of circle shape from zero to one, one to two. And once the one circle back to the ranked zero machine that machine essentially has all the gradients on all the chain. Then you will take an average out and broadcast to the each process unit. So in start to next process, all the model will start to have the same exact way. So in this way we can making sure the motto is changed in a very consistent way.

And this is kind of like a simple by diversion of how Horovod is actually working. But you should have the general idea by now and when Jing Pan walks you all the code details, then you have the second part, your temporary first concept back and hopefully you can help you to understand it better. So how was the scaling efficiency, Uber actually published benchmark answer reports remodel in section ResNet 101 and VGG-16 where the XXS number of GPU while as NH per second which can be used at performance metrics.


You will see transparent rectangle bar that is the optimal scale efficiency. Just imagining that you have linear boozy through GPU you will gather more on performance like this 32 core on GPU we have 32 times faster than the one single GPU. Where the blue bar is the actual performance, the darker blue just means using the data network. So this benchmark really demonstrated it has have really great skill and efficiency, even in case of VGG 16 which has relatively dense network and relatively shallow. But it still skills pretty well. But one thing I wanna mention here is this benchmark is based on the optimized network and infrastructure. So you will require dedicated engineering resources to set it up, including the container MPI and NCCL Fine-tuning is not a trivial task even though many big implemented Horovod and achieve a great scaling efficiency. But there’s a one academic paper trying to replicate such system, no overall skill. In fact so imply that set up such system is not a trivial task and myself we’re a small company from doing so.

So they got break came out and provide the HorovodRunner. So HorovodRunner is a general API to around distributed the deep learning workloads on Databricks using the whole offering book. It was built on top of Horovod and there’s no need to set up on the line infrastructure. If you’re using a wrong time 5.0 or both, you can just enjoy the horrible run out of the box and you can choose a cloud provider from the AWS and agile, and since we’re running on a database so it will run on top of it Databricks Spark Ecosystem. So you will have Data prep and data training. Then we go over a window over around with the Horovod actually assuming you have all the data are preaching or preprocessed already and putting into a different machine. So it’s actually a separate process, but if you’re using the Horovod that you will be in one place and you will also enjoy all the benefit that come with Spark, like a random shuffling, fault tolerance and you were able to using the notebook. So lastly, I wanna mention that Databricks the spark is actually using the barrier execution mode to schedule all the job cause the spark job usually it’s running independently and they just run embarrassingly apparent or but the Horovod actually you require coordination among all the different notes to synchronize all the gradients. Let’s just take a quick look on the HorovodRunners diagram real quick. So HorovodRunners that diagram there you see is running you will have Spark driver and number of processing unit that’s actually runs on the Horovod. and you were using the barrier as solution mode to enable synchronized shootings you synchronize all the gradients and at the end of each batch and they will start all tasks gather and restart all tasks in case of a failure.


So all of this sounds really interesting, but how good does it perform actually. There’s no benchmark available so we decided go ahead and do our benchmark. But before I dive into details I would like to mention that Horovod and Horovodrunner actually come out with something Horovod timeline which can keep track on the each of the units status and performance. But it’s impact on the performance will be badly and even documentation says so, so in Spark here we were just using the standard output log coming from each off the machine and we organize that to get their actual performance.


So the first task of first go of benchmark, where we replicating the example from Databrick for our tutorial, we were just using the very simple CNN model with two on in two convolutional layer. We’re using the AWS C4.2xlarge instance and which is CPU to compare the wrong time on the 50 Epochs from the single instance to this core CPU cluster up to 32 cores. So on the right hand side of the graph on the top, is a single instance performance. It took about 15 seconds, 1500 seconds to finish but as we increasing the number of CPUs in a cluster around 10 review outomatically in the full CPUs in the Horovod cluster he was able to reduce the runtime time more than half but as increasing more and more GPU you will see the skill and efficient kind of died down. But it just because that the data is relatively small, but this demonstrated the HorovodRunner can have great skewing efficiency on the CPU cluster. And we using this array all the epochs, we haven’t done any optimization. So it’s pretty replicatable. And next we are trying to replicate Uber’s Benchmark, and kinda compare Epoch to Epoch to see how good HorovodRunner is actually performed, but it’s not actually the Epoch to Epoch ’cause we are not able to using the same infrastructure. But if you’re using the whole rounded outbox, you will get the same performance and we didn’t do the rest in one-one to two-two versioning capacity issues with HorovodRunner and the tens approach to Horovod that that’s actually what the rest of the Caltech101 was using. So here we are was just presenting the Inception V3 and the VCG-16 benchmark. So the graph is to follow the same format of Uber’s benchmark so(internet breaks) representing the optimal efficiency. So you will see the injection at a core of GPU. (internet breaks) if 80% of us go in efficiency, which quite amazing. And, but as we increasing the number of a GPU, you reduce the skill efficiency to 63% and a 49% respectfully at serge 16 Cores and 32 core. But even the 32 core the 50% of us give efficiency, it’s essentially 16 times faster than running on a single machine. On the other hand the VGG is slightly harder to scale, which is same as Uber’s benchmark, but we were able to scaling Fonda 50% to 20%. So overall he demonstrated it can scale pretty well, but it wasn’t good as performance as Uber’s benchmark.

There other model we also try, we also try to wrong on the graphical convolutional network, which is kind of like a primary motivation launch try it for a run at the first place. ‘Cause graphical convolution now will have multiple performance but it was separate from higher additional costs. We were able to implement it on the HorovodRunner, but currently has no scaling efficiency ’cause the input off the TCA is actually an adjacent matrix which can not be divided so we are not able to leverage the data prevalence benefit. And we think the stock has the GCN might be able to help out but we haven’t tried this out yet. Horovod usually outperforms on the multithreading. So if you have a one single instant with multiple GPU, we will highly recommended and you also try to Horovod. And usually most of the time we were all performing multithreading as well. So this kind of summarize the first part of the talk. Now I will part in to Jing Pan to cover in depth of how you can actually use the HorovodRunners.

– Hello, Databricks previously published the how not to scale the plenty in six easy steps. We are going to talk about how to use Horovodrunner and the avoid the pit hose.

When you set up your Databricks cluster firstmake sure to use Tensorflow 1. Second disable SSL encryption last but foremost run this initiation script on the cluster to fix timeout error for all optimizers except the RMS prop and it not published that anywhere.

Uber summarizes five steps to implement HorovodRunner as a predecessor Horovod.

After the input statement specific to a deeper learning framework and the initialization of Horovod environment, you need to move your entire single node code into your Horovod HVD function, string HVD then pass your Horovod the size to your instance and the HorovodRunner.


Now, congratulations, you’ve achieved the step four


of how to draw a horse and the way are going to add small details.

This slide shows the order and the location of the original five steps in HorovodRunner code. Initialization, pin, wrap, synchronize and checkpoint.

We will explain three additional pin points, parallelize data, retrieve model, and log time.

Next, for ring-all reduced to function. We need to ensure that every worker is using one particular GPU. Instead of using a random GPU, we need to list the all the GPUs in our slaves and then assign an invariant rank to each GPU.


The essence of HorovodRunner comes from data personalization. Conceptually data in the train HBD function equals the data in one GPU and the your entire data set should it be allocated evenly to each GPU. In a indexed solution you pick every next road jumping by the Horovod size into one particular GPU. In this example, all the red rules go to GPU zero, all the green rows go to GPU one and all the purple rolls go to GPU K. In each GPU, the number of roles equals to the floor of the total number of rows divided by the Horovod size. Upon this question, what is the number of steps per epoch, steps per epoch equals the flow of roles in a GPU divided by batch size?

So what if the problem there is absolutely no shuffling for pocket files petastorm can shuffle by default. Then how about images?

We can use imagery data generators to shuffle at each epoch and we need to properly set the steps to avoid repetition of a training images within an epoch.

We can use Imagery Data Generator to share for each epoch and the way you need to properly set the steps to avoid the reputation of training images within an epoch.

Giving the generator batch size, which is the batch size in one GPU, you get the total steps for training which is M in the example. Then you divide the total step size for training by the Horovod size to get the steps per epoch in each GPU which is four in this example. Without the proper step setting, your code will actually run smoothly. The damage I would say isn’t detrimental. However, because a shuffle is down only at the end of each epoch. If you have the most steps per epoch, you will train on the batch again with the same images. This is not as efficient as training on a random in news selected a batch of images.

Next, to avoid too many requests to arrow. You actually need to know that your save model from S3.

Next, when wrapping the single machine optimizer in a distributed fashion, the most important thing to do is to linearly scales the linear rate by the Horovod size.

Here is the logic behind it. First, you want to preserve the same number of epochs in the HorovodRunner to achieve a single machine comparable accuracy by increasing your linear rate. Second, you have less steps per epoch thanks to increase the synchronized the batch size.

Vwallah! You can actually hit two birds with one stone.

Rectified Adam is a new optimizer with linear rate, warm up, fast to convergence and the accurate initial direction finding. We are the first to implement it in Horovodrunner. On top of a parameters specific to rectify the Adam. We need to add three additional callbacks for varying learning rate to work in HorovodRunner. First metric average callback will average metrics across all processes at the end of the epoch. It is useful in conjunction with reduced linear rate on plateau and it needs to be written before the next two Callbacks.

Let me rate warm up. We’ll begin with a much smaller linear rate and then increase it. Reduce some plateau will reduce the linear rate while approaching the plateau.

Earing or radios GPU 0 updates the weights from the average at the gradient.


For all optimizers, you need to broadcast the updated the weight front GPU 0 to the rest of the GPUs. And the checkpoint is the updated the weights from GPU era.


Lastly Horovod timeline takes way too much time and the prevent us from seeing scaling efficiency. How do you get the detail the time usage more than just the wall clock time. We add our own timestamps to this standard the Spark Master Output. With the Python logging library. In fact, you can use ML flow to recall the detail, the log output in from pack a slave nodes.

In the end, we can extract a how much time we spend on each steps from the log.


The log shows the progress of each step of each epoch on each GPU. On the left side, it is the timestamp that we added on the right side. It is the standard output. It contains the information on the HorovodRunner the current step, the total steps per epoch, the current epoch, and the total epochs.

To summarize HorovodRunner is a great choice for distributed a deeper learning out of epochs. It achieves significant lift in scaling efficiency compared to single machine implementation. Even if it is multithreaded, Compared to it’s a predecessor Horovod. It takes away the overhead of engineering effort at a tiny cost of the scaling efficiency. The cold is easy and the simple to ride and the data needs to be divided. Improvement can be achieved with better bandwidth and the easy to instantly store. Do not use timeline and the do use network levels of security. Here are the links to our full code and our paper accepted at a top AI conference workshop triple AI this year. Thank you everyone for attending this session. We tried our best to make it helpful. Please do not forget to rate and review the session because we way want to help you more next time.

Watch more Spark + AI sessions here
Try Databricks for free
« back
About Jing Pan

eHealth Inc.

Dr. Jing Pan is a Sr. Staff Data Scientist/User Experience Researcher at eHealth Inc. She oversees all customer facing modeling projects and technical evaluations of third party services and/or merger-acquisitions. She is passionate about the productionization of deep learning models on Spark clusters. She is the first in the world to apply Rectified Adam optimizer on HorovodRunner enabled spark clusters for distributed deep learning training in 2019. At Fanatics Inc., she was perhaps the first one in the world to serve deep learning model trained on Keras in a distributed fashion on Spark slave nodes in 2017.

About Wendao Liu

eHealth Inc.

Wendao Liu received his master's degree from the prestigious Drexel University's LeBow College of Business. He is a Ph.D student in Business Administration and at the same time works at eHealth, Inc. as a full-stack data scientist. With his rare combination of business mindset and strong technical skills, he can not only tackle data issue, but also leverage data to drive business performance. He identifies business opportunities, optimize product performance and provide recommendations. He builds end to end customer unification data product, which leverage the machine learning techniques to provide reliable linkage across disparate systems.