Ray (https://github.com/ray-project/ray) is a framework developed at UC Berkeley and maintained by Anyscale for building distributed AI applications. Over the last year, the broader machine learning ecosystem has been rapidly adopting Ray as the primary framework for distributed execution. In this talk, we will overview how libraries such as Horovod (https://horovod.ai/), XGBoost, and Hugging Face Transformers have integrated with Ray. We will then showcase how Uber leverages Ray and these ecosystem integrations to simplify critical production workloads. This is a joint talk between Anyscale and Uber.
Speakers: Travis Addair and Richard Liaw
– Hi, my name is Richard. I'm a software engineer at Anyscale, and this is Travis, who will be talking in the second half of this talk. He's a software engineer at Uber. We'll be talking about Ray and its growing ecosystem of machine learning libraries.
For a quick overview of today's talk: I know a couple of folks tuning in today haven't heard about Ray, so we'll begin with a quick whirlwind tour. We'll then overview some of the exciting new integrations around Ray's broader machine learning ecosystem. And finally, Travis will dive into the integration of Ray into Uber's open source machine learning ecosystem. So you'll be seeing the forces joined between Horovod, Ludwig, Ray Tune, and beyond, including Dask on Ray.
So I want to start with a quick overview of Ray for those who are not familiar. Ray is a project with a mission to simplify distributed computing. It's a project that we've been working on for the last four years, and it's associated with three different labs at UC Berkeley. These labs have a long tradition of open source, and projects that have come out of these labs include Spark, Caffe, Apache Mesos, and now Ray.
Ray itself consists of two primary components. The first is Ray Core, which is a simple framework for distributed computing. We'll be talking about the primitives that Ray Core exposes in a later section of this talk. The second primary component is the Ray ecosystem: there are many libraries packaged with Ray for domain-specific utilities, including hyperparameter tuning, reinforcement learning, and model serving.
I just want to talk really quickly about where Ray stands in relation to the rest of the compute and Python ecosystem. Ray is very similar to Dask or Celery, if you're familiar with those sorts of systems: it's a framework for distributed computing for Python. With respect to the rest of the compute ecosystem, Ray runs on any cloud, so it works with AWS, GCP, Azure, and even Kubernetes.
And it's also very broadly compatible with other libraries in the Python ecosystem, including NumPy, Pandas, TensorFlow, PyTorch, spaCy, and so on.
So now that we have some context about Ray, you might be wondering: why does Ray exist in the first place? To really understand Ray, we first have to take a look at the history of distributed computing. Now, distributed computing is not new. Almost every decade, a new category of applications became distributed. In the 1980s, we saw the emergence of high-performance computing, or HPC for short. This led to powerful simulators which allowed engineers and researchers to do weather forecasting, molecular modeling, and much more. In the 1990s, we saw the rise of the Internet and the Web. This led to building highly distributed systems to support popular websites with millions of users. Next, in the 2000s, we saw the emergence of Big Data. Many of these internet companies were accumulating vast troves of user data to improve the user experience while also improving their own business strategies. And finally, as we entered the 2010s, we saw the emergence of deep learning. Companies are starting to leverage the data they collected to build powerful feedback systems and models, enabling new applications. And these models are getting larger by the day and are often tied to distributed training capabilities.
So as you can see, over the years, more and more workloads became distributed. And what's interesting is that once they became distributed, they remained distributed. So now, after four decades, we have these four existing workloads: HPC, Big Data, microservices, and deep learning. And a common trend we're seeing is that these workloads are starting to become tightly integrated with each other. This convergence is largely due to the growth of machine learning and AI workloads.
For one, many of these artificial intelligence workloads are already overlapping with high-performance computing. Distributed training frameworks like Horovod leverage MPI underneath the hood, a system developed by the HPC community over the last 30 years. We're also seeing microservices, which power many of today's web services, start to integrate machine learning components. This allows applications to be better tailored towards specific users, ultimately improving the utility of these different microservices. Finally, deep learning applications, AI applications, and Big Data are strongly overlapping, because many of these AI models and applications today require huge amounts of data to train and become performant.
Creating these distributed AI applications is actually quite hard. Despite the convergence of these four different workloads, the systems used to support them are as diverse and as distinct as ever. There's really no single framework built to support these end-to-end distributed applications. So ultimately, the promise of the Ray project is to allow developers to build end-to-end distributed applications with a framework that is both general and easy to use.
So now that we've motivated Ray, let's quickly talk about the core primitives of Ray, which promise to simplify distributed computing. The Ray API mirrors the way that programming is traditionally done, with functions and classes. In Ray, functions are converted into tasks, while classes are converted into actors. I'll elaborate more on this in the next slides, but let's first talk about functions to tasks. So here we have a couple of simple Python functions. The primitive that Ray exposes here is the ray.remote decorator, which takes an arbitrary Python function and designates it as a remote function. You can then invoke these remote functions with function_name.remote, which immediately returns a future. This future is an object reference that represents the result.
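As a rough illustration of the submit-then-fetch pattern described here, the sketch below uses Python's standard-library `concurrent.futures` as a stand-in rather than Ray itself, so that it runs without a cluster. The function names `square`, `add`, and `run_tasks` are made up for this example; the comments map each step to the Ray calls named in the talk.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    """An ordinary Python function we want to run as a task."""
    return x * x


def add(a, b):
    """A second function that consumes the results of earlier tasks."""
    return a + b


def run_tasks():
    # The thread pool stands in for the Ray cluster in this sketch.
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Analogous to calling square.remote(i) on a function decorated
        # with ray.remote: submit returns a future immediately.
        futures = [pool.submit(square, i) for i in range(4)]
        # Analogous to ray.get(futures): block until the results are ready.
        squares = [f.result() for f in futures]
        # A dependent task that uses the earlier results as inputs.
        total = pool.submit(add, squares[2], squares[3])
        return squares, total.result()


squares, total = run_tasks()
print(squares, total)
```

The point of the pattern is that submission is non-blocking: all four `square` calls are in flight before any result is requested, and the caller only waits when it asks for a value.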
In the background, Ray will schedule the function invocation on the cluster, or on your machine on a separate thread, and execute it. This way, many different functions can run at the same time, and you can also construct computation graphs using this API. Now, if you actually want to get the results of this computation, you can call ray.get, which blocks until all the tasks have finished executing and retrieves the results. This API, though simple, already enables the straightforward parallelization of many Python applications. And you can read more about this in the Ray documentation.
The next thing is Ray actors. These are essentially remote classes. Again, here's a simple Python class with a method for incrementing a counter, and you can add the remote decorator to convert it into an actor. Then, if you instantiate this class with Counter.remote, or more generally ClassName.remote, a worker process that contains this class is created somewhere on the cluster. Each method call creates a task that is scheduled on that worker process. In this example, we have two tasks: inc.remote is called twice, and the calls are executed sequentially on that actor. They share the same state because they run on the same actor, which is the semantics of a class. And you can also easily access GPUs with the Ray API by simply specifying a resource request. This allows users to easily parallelize their AI applications.
So given what we see here, this is a really powerful API. And now we're seeing large growth in the number of different libraries using the Ray Core framework to distribute and parallelize their workloads. One area where we've seen enormous growth is the libraries developed on top of Ray. There are basically two categories of ecosystem libraries. The first category is libraries that we developed as part of Ray: the native libraries.
And the second category is third-party libraries that integrate with Ray. Two of the first libraries that we began building as part of Ray were RLlib for reinforcement learning and Tune for hyperparameter search. Today these are some of the most popular libraries in the world for reinforcement learning and hyperparameter search. And more recently, we've begun working on libraries for model serving, deploying models in production, and distributed training.
Now, of course, the most exciting part of this growth is the third-party libraries that are built on top of Ray. spaCy and Hugging Face Transformers, two of the most popular libraries for NLP, integrate with Ray to train on multiple GPUs, tune hyperparameters, and deploy models. You can also scale Horovod, PyTorch, and XGBoost on your Ray cluster and use them with Ray Tune and Ray Serve. Travis will talk a little more about how Uber's open source ecosystem is leveraging this in the second half of this talk. We also see auto ML frameworks such as Ludwig from Uber exploring integrations with Ray, and Travis will explain the roadmap for this integration in a couple of minutes.
In addition to these training libraries, we're seeing major cloud machine learning platforms such as SageMaker and Azure ML integrate with Ray Tune and RLlib to provide training and reinforcement learning services. And this just goes on. Some of the most popular hyperparameter tuning libraries integrate with Ray Tune for hyperparameter search. Dask, which is a popular distributed system in Python with a great data frame library, can now be run on top of Ray. We have Weights & Biases and Seldon, which integrate with Ray Tune and also leverage Ray for massively parallel model explainability. And these are just a small selection of the many libraries integrating with Ray today.
So what we're actually seeing is that Ray is becoming the go-to framework not only for scaling simple Python code but also for scaling Python libraries. The benefit here is not just that you can use one of these libraries, but rather that you can use them all together and scale your entire application chain, with all these different libraries, on a single cluster. So if you're a library developer and you're interested in scaling your library with Ray, do reach out to us; we'd love to help out. So that's it for the Ray ecosystem as it stands today, and Travis will now talk about some exciting new developments for Ray with Uber's open source machine learning stack.
– Thanks, Richard. Yeah, so as Richard said, I'm Travis, a software engineer at Uber. I am the maintainer of the Horovod project, which does distributed deep learning, and I'm also a core maintainer of the Ludwig project, which does auto ML. Today I'm going to tell you a little bit about how we see Ray fitting into the internal work we're doing at Uber to provide ML infrastructure to our internal data scientists, as well as how we're leveraging Ray in the open source projects that we use within Uber and also contribute to externally, in order to provide a more seamless experience for our open source community as well.
So let's start by talking a little bit about Horovod. Horovod is a project that was originally developed at Uber. It's a framework for large-scale distributed training of deep neural networks. Its goal is ultimately to make distributed training fast and easy for any framework on any platform. We originally open sourced Horovod back in 2018, or at least it became a member of the Linux Foundation in 2018. Since then, one of our primary goals with Horovod has been to figure out ways to make it more accessible to people who maybe don't have the kinds of sophisticated high-performance computing clusters that research labs or big companies do. And that's one of the reasons we were really interested in exploring Ray, because it enables this kind of capability to scale up your compute resources very naturally, without needing a lot of data infrastructure expertise.
Horovod on Ray was a project that started just a couple of months ago, I think, and Richard was able to implement it in, I believe, about 400 lines of code. That speaks to the very seamless, very reusable APIs that Ray provides to make such an integration possible.
And as I'll explain in some of the later slides, being able to run Horovod on top of Ray in isolation is cool unto itself, because a lot of people sometimes struggle with launching a Horovod cluster, training a job in the cloud, something like that. But what really makes this particularly powerful is how it interacts with other stages within the machine learning workflow.
To talk a little bit more about that, I want to deep dive into the Ludwig project as a case study. Ludwig is another open source project that came out of Uber AI, and its goal is to make deep learning an abstraction on top of the problem space of: I have some inputs, I have some outputs, I want to train a model. So it's effectively an auto ML toolkit. Given any inputs and outputs, Ludwig will build the right model for the task. To do this, we have a novel architecture that we call Encoder, Combiner, and Decoder, where we take your input features, whatever they happen to be, encode them into a lower-level representation, combine them, and then decode them into the outputs that you're trying to predict.
Ludwig is a good case study for lots of ML problems at Uber because when Ludwig started, and this is also true up to this point, it focused fundamentally on the research problem: how do I predict any attribute I want to predict from any input data I happen to have? Typically in these sorts of scenarios, the researcher will do what's easiest for them on their local machine, in their local development environment, which is usually something like Pandas. It's a very elegant API for doing data processing. And so we read the data in whatever format it's in: CSV, Parquet, etc.
We read a very simple YAML config file that defines what the features are in your data set and how you want to interpret them, as text or categorical data, etc. Then we do the preprocessing on a single worker using Pandas and output the results as a set of training data sets in the form of NumPy arrays. So we have a training set as NumPy, a validation set, etc. In the next stage, we use TensorFlow to actually do the training on these data sets, and we output a trained model. Then we similarly run evaluation on the held-out test set, and finally output the results to some kind of storage layer, usually a local file system, something like that.
And so this is the point where the scalability challenges start to creep in, as you say: okay, this works very nicely on a single machine. It's very easy to iterate on, very easy to develop, but how am I going to scale this up to process very large data sets? How am I going to run this in production, in a retraining scenario, something like that? This is the kind of situation that we encounter very frequently on the machine learning platform at Uber: data scientists in this exact scenario, wanting to scale up their Pandas and local TensorFlow code to very large scale.
So with Ludwig, we have a single worker for preprocessing. We do currently support Horovod for training, but there's a single worker for preprocessing. And one of the biggest constraints is that, because we're using Pandas, the whole data set has to fit in memory. So we definitely have to address that. In terms of other challenges, hyperparameter optimization is a big one. In our situation, we're not just optimizing over learning rate or something like that, or even the architecture of the model; we're also optimizing over the preprocessing steps, or what you might call feature engineering, and trying to find the best total combination that produces the best model.
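To make the idea of a joint search space concrete, here is a minimal sketch, assuming a made-up set of options. The parameter names and values are hypothetical and are not taken from Ludwig's actual configuration schema; the only point is that one trial configuration spans preprocessing, model architecture, and training settings at once.

```python
import random

# Hypothetical search space: names and values are illustrative only.
SEARCH_SPACE = {
    "preprocessing.lowercase_text": [True, False],
    "preprocessing.max_vocab_size": [5_000, 20_000],
    "model.num_layers": [1, 2, 4],
    "training.learning_rate": [1e-4, 1e-3, 1e-2],
}


def sample_trial(rng):
    """Draw one configuration covering preprocessing, model, and training."""
    return {name: rng.choice(values) for name, values in SEARCH_SPACE.items()}


def num_combinations(space):
    """Size of the full grid: the product of the option counts."""
    total = 1
    for values in space.values():
        total *= len(values)
    return total


rng = random.Random(0)  # fixed seed so repeated runs draw the same trials
trials = [sample_trial(rng) for _ in range(3)]
print(num_combinations(SEARCH_SPACE), "possible combinations")
for trial in trials:
    print(trial)
```

Because the preprocessing options differ from trial to trial, each trial may need its own preprocessing pass, so preprocessing cost multiplies with the number of trials rather than being paid once up front.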
So what this means is that if we want to scale this up, we have to think about not only how we scale the training, but also how we scale the preprocessing, and how we scale them together within the hyperparameter optimization.
So how would you conventionally approach this problem? This is how we approach it at Uber today: you break apart your single-worker training script into a series of components that run on specific infrastructure. And this infrastructure is typically heterogeneous: one piece of infrastructure for one stage and a different piece of infrastructure for another. For example, if you're doing a lot of data processing, you're almost always going to be using Apache Spark. It's a very popular framework for horizontally scaling your data processing workloads. Then, once you want to do the training, you'd want to use something like Horovod to distribute the TensorFlow process across multiple GPUs and sync the parameters. And when you do evaluation, because we don't have to sync any state between workers, you might want to switch back to something like Apache Spark, maybe this time using GPUs to accelerate the inference.
So how are you actually going to stitch all these things together? Since these are very heavyweight steps on very heavyweight pieces of infrastructure, you typically use some kind of workflow engine like Apache Airflow to define each of the steps you want to perform and make sure that they spin up the right infrastructure, that you have the right sources and sinks for your data, and that you transition with fault tolerance to the next stage. Because these operations are so heavyweight, you need to be prepared for any mishaps that might occur along the way. So after the training step, you write your model to some large intermediate storage like HDFS, something like that.
And then, because all of this is happening in a fully remote setting, you might need a final deployment step that takes the model from the distributed file system and puts it in your model store.
So why is this not ideal? I think one of the first challenges we came across when we looked at doing this for Ludwig was that we looked at the preprocessing code we had written in Pandas and realized we were going to have to basically do a complete rewrite to make it work with Spark transformers. What this effectively means is that we're having to maintain two distinct code paths, one for local or small-scale training and one for large-scale training. When you talk about scalability, we really don't want to think of it as just going from tens or hundreds of machines to thousands or millions of machines. I mean, that's certainly great, but the real difficulty is going from one machine to tens, to hundreds, to thousands of machines. And that's where something like Ray can make a really big difference.
Similarly, these heavyweight steps, this very heavyweight infrastructure that requires Airflow, is an inhibitor to making this laptop-to-massive-scale story work, because no one is going to want to run these heavyweight steps with Airflow on their local machine as part of their normal development process.
And what about hyperparameter optimization? Airflow is really great for defining a static series of steps that you want to perform. But hyperparameter optimization is a dynamic process that is difficult to model with static workflow definitions. So you really have to think creatively if you're going to solve this using the traditional ML workflow design. And that's why we started looking at the Ray ecosystem.
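One way to see why hyperparameter optimization is dynamic is a successive-halving loop, sketched below in plain Python. Successive halving is chosen here purely as an illustration; the talk does not say which search algorithm is used. The scoring function `train_and_score` is a hypothetical stand-in that trains no real model, and the `log10_learning_rate` key is made up for this example.

```python
def train_and_score(config, budget):
    """Hypothetical stand-in for a training run (no real model is trained).

    The score peaks at log10(learning rate) == -3 and improves slightly
    as the training budget grows.
    """
    return -abs(config["log10_learning_rate"] + 3) - 1.0 / budget


def successive_halving(configs, initial_budget=1):
    """Keep the better half of the trials each round and double the budget."""
    survivors = list(configs)
    budget = initial_budget
    history = []  # (budget, number of trials run) for each round
    while len(survivors) > 1:
        ranked = sorted(
            survivors,
            key=lambda c: train_and_score(c, budget),
            reverse=True,
        )
        history.append((budget, len(ranked)))
        # Which trials continue is only known after this round's scores.
        survivors = ranked[: len(ranked) // 2]
        budget *= 2
    return survivors[0], history


configs = [{"log10_learning_rate": -k} for k in range(8)]
best, history = successive_halving(configs)
print(best, history)
```

Which trials run in the next round, and with how much budget, depends on scores that only exist after the current round finishes. A workflow fixed ahead of time has to either over-provision every trial or re-plan between rounds.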
So one of the great things about Ray is that not only does it simplify the infrastructure component for you, but it also comes with this huge ecosystem, or bag of frameworks, that you can pick and choose from to find the one that fits your scenario best. This is a really powerful thing, because very often, since infrastructure is usually such a heavyweight thing, you're locked in. If your company is investing in Spark, you've got to use Spark for everything. And Spark is great, but sometimes you might say, well, maybe for this problem I want to use Dask. With Ludwig, everything's in Pandas, so it'd be great to use something like Dask or Modin to swap out the Pandas data frame for a distributed data frame, very seamlessly. And that's something Ray makes very simple to try out. Dask is also appealing as a pure Python data processing engine. It's very low overhead: when you're debugging, you don't have Java calls in your call stack, and it has GPU acceleration as a core feature through the RAPIDS API and cuDF.
Horovod, as I mentioned before: at Uber we rely heavily on Horovod to do the heavy lifting in distributed training. It's very fast. It works with TensorFlow, PyTorch, and MXNet, which is important because in Ludwig we want to maintain framework-agnostic code. We currently use TensorFlow, but in the future we also want to be able to support PyTorch, and Horovod makes that very simple. With the Elastic API introduced in version 0.20, we also have fault tolerance and auto scaling out of the box, which can also run on Ray without any special considerations. And it's very flexible too: Horovod puts no restrictions on the structure of your training code. Most other frameworks require you to restructure your code to make it work with their method of doing distributed training.
But with Horovod, you just add a couple of extra lines to your code, wrap your optimizer, and you're good to go. And finally, Ray of course brings everything together into a single infrastructure layer and also provides several additional frameworks that we can't find similar alternatives to and that are really great, like Ray Tune for hyperparameter optimization and Ray Serve for serving. So those capabilities are not only provided by Ray for free, but they're also best in class at what they do. And being able to leverage that is really powerful as well.
So what does Ludwig on Ray look like, then? The nice thing about this setup is that from the user's perspective, in the code that we actually write, everything is for the most part exactly the same as in the single-node scenario, except that we have an additional abstraction layer where we replace the preprocessing side with Dask, which effectively distributes our Pandas code, and we replace the TensorFlow training side with Horovod on Ray, which distributes the training code. We can then do evaluation similarly with Dask, running separate copies of the model on each worker to do large-scale evaluation. And finally, we don't need a separate deployment step, because we can return the trained model directly to the client running on the Ray head node and write it directly to our model store without any additional overhead.
Similarly, with Ray Tune, we can take our entire processing pipeline and scale it up into multiple parallel trials. And because it's all running on a common infrastructure layer, we can even run the Dask distributed preprocessing and the Horovod distributed training within a single trial, using the resources available to us from the cluster.
And let's talk a little bit about serving. As I mentioned before, we can deploy our model to a model store or write it to a file system, something like that.
For our open source users, it is then very simple to take the model that's been written to disk and use the Ludwig API to launch Ray Serve servers on top of the Ray cluster that can serve the model for prediction.
So what's next? This is work in progress that you'll see in the Ludwig 0.4 release. I'm really excited that this work is progressing so quickly, and really thankful to the Anyscale folks for all the work they've contributed to us on the Uber side as well. Going forward, there are some other very interesting things that Ray enables that I want to briefly touch on. One of them is the fact that when we move from preprocessing to training, there's this additional barrier where we have to materialize the data set to disk so that we can hand it off to the training side. People often ask, is it possible to eliminate this somehow? And with Ray, we actually have, for the first time, a very simple way to do that. Because the Ray workers for preprocessing, with Dask or Modin for example, and the Horovod training workers are all running on the same infrastructure, we can theoretically co-locate them, have the Horovod workers directly access the shared memory of the Dask or Modin workers, or the partitions they're operating on, and do the training that way. This is something we're very interested in exploring, and again, it's all possible thanks to the way that Ray is ultimately designed.
So that's it for me. Please check us out on GitHub if you're interested in contributing or trying out Horovod or Ludwig. The links are there, and the URLs are hopefully very simple, Horovod.ai and Ludwig.ai. With that, I'll hand over to Richard to close this out. Thank you.
– So that's it for this presentation on Ray and its growing ecosystem. This project wouldn't be where it is today without an extremely passionate community, and that's something we are really grateful for. If you're looking to try out Ray, Horovod, or Ludwig, we encourage you to get involved and reach out to us on Slack. Go ahead and try downloading Ray, Horovod, or Ludwig, and just let us know if you have any thoughts or questions. Thanks again for coming to this talk.
Travis Addair is a software engineer at Uber and technical lead for the Deep Learning Training team as part of the Michelangelo AI platform. In the open source community, he serves as lead maintainer for the Horovod distributed deep learning framework and is a co-maintainer of the Ludwig auto ML framework. In the past, he's worked on scaling machine learning systems at Google and Lawrence Livermore National Lab.
Richard Liaw is currently a team lead at Anyscale, where he leads a team working on training libraries and integrations on top of Ray. He was previously a PhD candidate in the UC Berkeley RISELab working with Ion Stoica and Joseph Gonzalez, where he focused on problems in distributed deep learning, reinforcement learning, and ML cluster scheduling.