Running Emerging AI Applications on Big Data Platforms with Ray On Apache Spark


With the rapid evolution of AI in recent years, we need to embrace advanced and emerging AI technologies to gain insights and make decisions based on massive amounts of data. Ray (https://github.com/ray-project/ray) is a fast and simple framework open-sourced by UC Berkeley RISELab, particularly designed for easily building advanced AI applications in a distributed fashion. Nevertheless, it is not straightforward for Ray to directly deal with big data, especially data from real-life production environments. Instead of running big data applications and AI applications on two separate systems, we introduce our work on RayOnSpark, which allows users to run Ray programs gracefully on big data platforms. In this session, we will discuss our implementation of RayOnSpark in detail. You will gain an intuitive understanding of how to run various emerging AI applications (including distributed training of deep neural networks, scalable AutoML for time series prediction, distributed reinforcement learning, etc.) on Apache Hadoop/YARN clusters by utilizing Ray and RayOnSpark. In addition, RayOnSpark allows Ray programs to be seamlessly integrated with Apache Spark data processing pipelines and to run directly on in-memory Spark RDDs or DataFrames, eliminating expensive data transfer overhead between different systems.


Video Transcript

– Welcome to our session. In this session, we will present RayOnSpark, a new feature we provide in Analytics Zoo, which allows users to write Ray code directly inline with their Spark code, so that they can run those new emerging AI applications on their existing big data clusters in a distributed fashion.

So this is the agenda for the talk. We will first give a quick overview of the background, Analytics Zoo and then Ray, and then we'll dive into the details of RayOnSpark, as well as some real-world use cases.

So at Intel, we have been working on a lot of initiatives to bring AI to big data. One example is BigDL, a distributed deep learning framework we open-sourced in 2016.

AI on Big Data

It allows users to write new deep learning applications as standard Spark programs. On top of those lower-level deep learning frameworks and big data systems, we have also open-sourced Analytics Zoo, a unified data analytics and AI platform that allows users to apply AI technologies such as TensorFlow, PyTorch, Keras, and so on to big data platforms such as Spark, Flink, Ray, and so on.

Analytics Zoo: Unified Data Analytics and AI Platform for Distributed TensorFlow, Keras and PyTorch

This slide gives you a very high-level overview of the Analytics Zoo technology stack. As I mentioned before, Analytics Zoo is built on top of deep learning frameworks such as TensorFlow, PyTorch, and OpenVINO, as well as distributed analytics systems like Apache Spark, Apache Flink, and Ray. You can run Analytics Zoo on your single laptop, and then transparently scale it to a cluster such as a Kubernetes cluster or a big data cluster. Inside Analytics Zoo, there are three layers. At the bottom, there is what we call the integrated data analytics and AI pipeline, a horizontal layer that allows users to apply AI models and algorithms to their big data clusters in a distributed fashion. For instance, a lot of our users are running distributed TensorFlow on Spark to process very large data sets.

On top of this pipeline layer, there is an automated machine learning workflow layer, which tries to automate many of the machine learning tasks users face when building end-to-end pipelines, for instance, AutoML for time series analysis, automatic distributed model serving, and so on.

The top layer also provides a set of built-in models and algorithms for common use cases, for instance, recommendation, time series analysis, and so on.

Users can directly use those built-in models with the underlying pipelines and workflows. In addition, we also allow users to bring any standard TensorFlow or PyTorch model published by the community and use it directly in Analytics Zoo.

Unified Data Analytics and AI Platform

So before going into the details, I want to provide a high-level overview of what our objective is. The goal we have when building Analytics Zoo is to provide a unified data analytics and AI platform, so that users can easily scale their AI applications from a single laptop to a distributed big data cluster. If you think about the life cycle of a data science project, you will start with prototyping on some sample data on your laptop, for instance, writing a Python notebook. If you're happy with the prototype, then you may want to experiment with your historical data, say last year's data, which could be pretty large and stored on a big data cluster.

And then, if you're happy with the experiments, you may want to deploy your pipeline to your production environment for, say, A/B testing. Today, going from laptop to cluster to production environment is a very complex, error-prone process. You need to rewrite your code, transfer your data, convert your model, and so on. So what we try to accomplish with Analytics Zoo is to allow users to transparently scale from the laptop to a distributed cluster. You can build the entire end-to-end pipeline on your laptop, and then, with almost no code changes, run your single-node notebook on your cluster in a distributed fashion, processing your production data on your big data cluster. So that's the goal we want to achieve with Analytics Zoo, and that's exactly the reason why we provide RayOnSpark in Analytics Zoo. Next, I will let Kai explain what Ray is, how we implemented RayOnSpark, and how it is used by our users. – Okay, so thanks Jason for giving an impressive opening and a high-level overview of our work over these years in enabling the latest AI technologies on big data, especially the Analytics Zoo project that we have been working on. Next, I will continue this session and focus on introducing the RayOnSpark functionality of Analytics Zoo.

Ray

I will elaborate on how to use RayOnSpark to run emerging AI applications on big data clusters. Okay, so I will get started. At the very beginning, in case some of you may not be that familiar with Ray, I will first give a quick introduction to Ray. Ray is a fast and simple framework, open-sourced by UC Berkeley RISELab, which is particularly designed for building and running distributed applications. Ray Core provides simple primitives and a friendly interface to help users easily achieve parallelism.


So for Python users, they only need to add several lines of Ray code to run Python functions or class instances in parallel. This page shows some code segments of using Ray. First of all, to start Ray services, users just need to import Ray and call ray.init. Let's take a closer look at the part of the code that shows how to use Ray to run Python functions in parallel. Given an ordinary Python function f here, which computes the square of a given number, normally if you call this function in a for loop five times, these five function calls are executed sequentially, one after another. However, if you add the ray.remote decorator to this function, it magically becomes a Ray remote function that can be executed remotely by Ray. So again, if you call the remote function in your for loop five times, these five remote function calls are executed in parallel.
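The slide code itself is not captured in the transcript, but a minimal sketch of the remote-function example described above, assuming stock Ray, looks roughly like this:

```python
import ray

ray.init()  # start the Ray services locally

@ray.remote          # turn an ordinary function into a Ray remote function
def f(x):
    return x * x     # computes the square of a given number

# The five remote calls return object refs (futures) immediately
# and are executed in parallel by Ray workers.
futures = [f.remote(i) for i in range(5)]

# ray.get blocks until the results are ready and retrieves them.
print(ray.get(futures))  # [0, 1, 4, 9, 16]
```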

So the only difference in coding you need to pay attention to is that instead of just calling the function name as you would normally do, you need to add .remote to the function name when you call remote functions. So here, you need to call f.remote instead of just f. Finally, you can call ray.get to invoke the execution of the remote functions and retrieve the corresponding return values. In addition, as you may notice, when you add the ray.remote decorator you can specify the resources needed for this function, for example how many CPU cores are needed to run it, and if you specify this, Ray internally will allocate that amount of resources for you. Similarly, for Python classes that are stateful, you can also add the ray.remote decorator to the Python class to make it a Ray actor, and Ray will instantiate the actors remotely. The example on the right creates five remote Counter objects with the count value as their state. Now these counters are all Ray actors, and we can increment the values of the five counters at the same time. Note that you still need to add .remote when you create a Counter object as a Ray actor, and you need to add .remote as well when you call the methods of an actor. Okay, so these are two simple examples of using Ray to achieve simple parallelism with the change of just several lines of code, and such Ray applications can either run locally or scale to a large cluster.
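Likewise, a minimal sketch of the Counter actor example described above (again assuming stock Ray; the resource request is optional) might look like:

```python
import ray

ray.init()

@ray.remote(num_cpus=1)   # optionally reserve resources for each actor
class Counter(object):
    def __init__(self):
        self.count = 0    # the count value is the actor's state

    def increment(self):
        self.count += 1
        return self.count

# Create five remote Counter actors; note the .remote on construction.
counters = [Counter.remote() for _ in range(5)]

# Call the actor method with .remote as well; the five increments run in parallel.
print(ray.get([c.increment.remote() for c in counters]))  # [1, 1, 1, 1, 1]
```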

So actually, Ray is more powerful and useful than simply writing such kind of trivial code. Ray is packaged with several high-level libraries to accelerate machine learning workloads. First of all, Ray Tune is a Python library built on top of Ray for experiment execution and hyperparameter tuning at any scale. Secondly, RLlib provides a unified API for a variety of deep reinforcement learning applications, and RaySGD implements thin wrappers around TensorFlow and PyTorch for the ease of data-parallel distributed training. These three libraries are useful for building emerging AI applications easily. So this is just a quick overview of Ray, and if you want to know more about Ray, you can visit its website for more details.
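As a flavor of what Ray Tune looks like, here is a toy sketch of a grid search over a single hyperparameter; the exact API (such as tune.report) has shifted across Ray releases, so treat this as illustrative only:

```python
from ray import tune

def trainable(config):
    # A toy objective: pretend the "loss" depends only on the learning rate.
    loss = (config["lr"] - 0.1) ** 2
    tune.report(loss=loss)          # report the metric back to Tune

analysis = tune.run(
    trainable,
    config={"lr": tune.grid_search([0.01, 0.05, 0.1, 0.2])},
)
print(analysis.get_best_config(metric="loss", mode="min"))
```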

Motivations for RayOnSpark

So Ray is quite a good framework for building and running emerging AI applications such as hyperparameter tuning and reinforcement learning. And actually, in the industry, there is now more and more demand to embrace emerging AI technologies and apply them to production data to bring benefits. However, we observe that developers face several challenges when they try to do this. First of all, in the production environment, the production data is usually stored and processed on big data clusters. However, quite a lot of effort and steps are required to directly deploy Ray applications on existing Hadoop or Spark clusters. Secondly, it could be a concern for PySpark or Ray users to prepare the Python environment on each node without bringing side effects to the existing cluster.

Last but not least, conventional approaches would set up two separate clusters, one for big data applications and the other for AI applications, and this inevitably introduces expensive data transfer overhead as well as additional effort to maintain separate systems and workflows in production. So it would be great and cost-saving if we could build a unified system for big data analytics and advanced AI applications. That's why we took the opportunity to develop RayOnSpark. RayOnSpark enables users to easily inject the advanced AI applications of Ray into existing big data processing pipelines.


Okay, next I will talk about the design and implementation details of RayOnSpark. We developed RayOnSpark to allow distributed Ray applications to seamlessly integrate into Spark data processing pipelines. As the name indicates, RayOnSpark runs Ray on top of PySpark on big data clusters. In the following discussion, I will take a YARN cluster as an example, but the same logic can be applied to other clusters, such as Kubernetes or Mesos clusters, as well. First of all, for the environment preparation, we leverage conda-pack and the YARN distributed cache to automatically package and distribute the Python dependencies across all the nodes in the cluster at runtime.

Implementation of RayOnSpark

In this way, users do not need to pre-install the necessary dependencies on the nodes beforehand, and the cluster environment remains clean after the tasks finish. The figure on the right gives an overview of the architecture of RayOnSpark. In a standard Spark application, we create a SparkContext object on the driver node, and the SparkContext launches multiple Spark executors across the YARN cluster to perform Spark tasks. In RayOnSpark's implementation, we additionally create a RayContext object on the Spark driver, which utilizes the existing SparkContext to automatically launch the Ray processes across the YARN cluster. The Ray processes exist alongside the Spark executors. One of the Ray processes is the Ray master process, and the remaining ones are Ray slave processes, also called Raylets.

In addition, the RayContext is also responsible for creating a RayManager inside each Spark executor to manage the Ray processes. That is to say, the RayManager will automatically shut down the Ray processes and release the corresponding resources after the Ray application finishes.

So in the setting of RayOnSpark, Ray processes and Spark processes exist in the same cluster, which makes it possible for Spark in-memory RDDs or DataFrames to be directly streamed into Ray applications for advanced AI purposes. This is basically the architecture of RayOnSpark.

Interface of RayOnSpark

Okay, with regard to the usage of RayOnSpark, users only need to add several lines of code to directly run Ray applications on YARN clusters. There are three steps to do this. First of all, you need to import the corresponding packages from our Analytics Zoo project and create a SparkContext object using the init_spark_on_yarn API we provide. Of course, you can use an existing SparkContext if you wish. This Spark-on-YARN setup helps to package and distribute the specified conda environment, with all the Python dependencies, across all the Spark executors on the underlying YARN cluster. When calling this function, you can also specify the Spark configuration, such as the number of executors, the executor cores, et cetera. After we create the SparkContext, step two is to create the RayContext object; the RayContext is the contact point between Ray and Spark. You can also pass in some Ray-specific configurations, such as the object store memory, when you create the RayContext object. Then you call RayContext.init to start all the Ray processes across the YARN cluster. Now, after these two steps, we have both Spark and Ray ready in the YARN cluster, and we can directly write Ray code and make it run on the YARN cluster. The red box on the right is the RayOnSpark code you need to add, and the black box is the pure Ray code that you have already seen in the previous slide, which creates several Ray actors and does the increments.

After the Ray application finishes, you can call RayContext.stop to shut down the Ray cluster. So this is basically the code you need to add to use RayOnSpark, which should be straightforward and easy to learn. And if you want more instructions on running RayOnSpark, you can visit our documentation page for more details.
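Putting the three steps together, a sketch of the RayOnSpark code looks roughly like the following. The import paths and parameter names (for example hadoop_conf and conda_name) follow the Analytics Zoo documentation of that period and may differ slightly across versions; the paths and environment name below are placeholders.

```python
import ray
from zoo import init_spark_on_yarn       # module paths as documented at the time
from zoo.ray import RayContext

# Step 1: create a SparkContext on YARN; the specified conda environment
# is packaged and distributed to all Spark executors automatically.
sc = init_spark_on_yarn(
    hadoop_conf="/path/to/hadoop/conf",   # placeholder path
    conda_name="my_conda_env",            # placeholder conda environment name
    num_executors=4,
    executor_cores=8,
)

# Step 2: create the RayContext (the contact point between Ray and Spark)
# and start the Ray processes across the YARN cluster.
ray_ctx = RayContext(sc=sc, object_store_memory="4g")
ray_ctx.init()

# Step 3: run plain Ray code, e.g. the Counter actors from the previous slide.
@ray.remote
class Counter(object):
    def __init__(self):
        self.count = 0

    def increment(self):
        self.count += 1
        return self.count

counters = [Counter.remote() for _ in range(5)]
print(ray.get([c.increment.remote() for c in counters]))

ray_ctx.stop()  # shut down the Ray cluster when the application finishes
```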

In the last part of this session, I'm going to share some advanced real-world use cases that we have built on top of RayOnSpark, which I suppose many of you might be more interested in. First of all, we have built AutoML in Analytics Zoo for scalable time series prediction. The AutoML framework automates the process of feature generation, model selection, and hyperparameter tuning for time series applications, and we have already seen some initial customer adoption of AutoML. Actually, in this conference, my colleagues have another session that particularly discusses AutoML and its use cases, so here I won't go into further details, but if you are interested in our work on AutoML, you can visit our GitHub page to find more details and related use cases.

Apart from AutoML, we have built a data-parallel deep learning model training pipeline on top of RayOnSpark. In our pipeline, first of all, we allow users to use either PySpark or Ray for parallel data loading and processing. Then we implement wrappers for different deep learning frameworks to automatically set up the distributed environment on big data clusters using RayOnSpark. The RaySGD library I mentioned before has already done some of this work that we can extend and refer to. In addition to using the native distributed modules provided by TensorFlow or PyTorch based on the parameter server architecture, we also allow users to choose the Horovod framework from Uber as an alternative backend for distributed training. With such a data-parallel distributed training pipeline, users do not need to worry about the complicated setup for distributed training on big data clusters. What they need to do is simply write a training script for a single node, and we do the work to make the distributed training happen; you only need to add or modify several lines of your original code to achieve this.
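The Analytics Zoo wrappers themselves are not shown in the talk, but the data-parallel pattern they automate can be illustrated with a small, framework-agnostic sketch: each Ray actor holds a shard of the data, computes a gradient locally, and the driver averages the gradients and applies the update (here with a toy linear model instead of a real deep learning framework).

```python
import numpy as np
import ray

ray.init()

@ray.remote
class Worker(object):
    """Holds one shard of the training data and computes local gradients."""
    def __init__(self, x_shard, y_shard):
        self.x, self.y = x_shard, y_shard

    def compute_gradient(self, w):
        # Gradient of mean squared error for a linear model on this shard.
        pred = self.x @ w
        return 2.0 * self.x.T @ (pred - self.y) / len(self.y)

# Toy data split into 4 shards, one per worker (in RayOnSpark these shards
# would come from Spark partitions on the same nodes).
X, y = np.random.rand(400, 10), np.random.rand(400)
workers = [Worker.remote(X[i::4], y[i::4]) for i in range(4)]

w = np.zeros(10)
for step in range(100):
    grads = ray.get([wk.compute_gradient.remote(w) for wk in workers])
    w -= 0.1 * np.mean(grads, axis=0)   # average the gradients, apply the update
```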

Drive-thru Recommendation System at Burger King

Lastly, I would like to share the successful cooperation between Intel and Burger King to build a recommendation system for Burger King's drive-thru scenario using RayOnSpark. Drive-thru is a common scenario in the fast food industry, where guests purchase food without leaving their cars. The guests first browse the menu on the outdoor digital menu board, and they talk to the cashier inside the restaurant through a microphone system to place their orders; recommendations are displayed on the outdoor digital menu board while they place their order. As a world-famous fast food company, Burger King collects a large number of transaction records every day, and they use Spark on their own big data clusters to perform ETL, data cleaning, and pre-processing steps on their big data. After the data is processed, they conduct distributed training on it. They chose MXNet as their deep learning framework, and before cooperating with us, they allocated a separate GPU cluster dedicated to distributed MXNet training. But they found that such a solution is not very efficient, since a large portion of the total pipeline time is spent copying data from the big data cluster to the GPU cluster. They also need quite a lot of additional effort to maintain the GPU cluster regularly, and for many companies, GPU resources are relatively limited compared with CPU server resources. After adopting the RayOnSpark solution, the entire pipeline becomes more efficient and easier to maintain, since we run the distributed MXNet training on exactly the same cluster where the big data is stored and processed. Similar to RaySGD, we implement a lightweight wrapper layer around the native MXNet modules to handle the complicated distributed environment setup of MXNet on the YARN cluster. Each MXNet worker takes a portion of the data set from Spark on its local node and trains the recommendation model. The MXNet workers and servers both run as Ray processes, and they communicate with each other through the distributed key-value store natively provided by MXNet. In this way, the entire pipeline runs on a single cluster, and there is no extra data transfer needed. Such a solution has been successfully deployed into Burger King's production environment to serve their drive-thru customers, and it has proven to be efficient, scalable, and easy to maintain.
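The wrapper around MXNet is not shown in the talk; as a rough, hypothetical illustration of the pattern only, each MXNet role can be wrapped in a Ray actor that sets the DMLC_* environment variables used by MXNet's distributed KVStore (ps-lite) before importing mxnet. The actor structure and names below are assumptions for illustration, not the actual Analytics Zoo implementation, and the scheduler/server roles and the real training loop are omitted.

```python
import os
import ray

ray.init()

@ray.remote
class MXNetWorker(object):
    """Hypothetical actor wrapping one MXNet worker process."""

    def __init__(self, scheduler_ip, scheduler_port, num_workers, num_servers):
        # MXNet's distributed KVStore discovers its peers through these
        # DMLC_* environment variables; they must be set before importing mxnet.
        # Server and scheduler processes are launched the same way, with
        # DMLC_ROLE set to "server" or "scheduler" respectively.
        os.environ["DMLC_ROLE"] = "worker"
        os.environ["DMLC_PS_ROOT_URI"] = scheduler_ip
        os.environ["DMLC_PS_ROOT_PORT"] = str(scheduler_port)
        os.environ["DMLC_NUM_WORKER"] = str(num_workers)
        os.environ["DMLC_NUM_SERVER"] = str(num_servers)

    def train(self, data_partition):
        import mxnet as mx                # import after the environment is set
        kv = mx.kv.create("dist_sync")    # distributed key-value store
        # ... build the recommendation model and train it on data_partition,
        # the local shard pulled from Spark on the same node (omitted) ...
        return kv.rank
```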

Conclusion

So here comes the end of this session. As a conclusion, in this session we mainly talked about our work on RayOnSpark, which we developed to enable users to directly run emerging AI applications on big data platforms. I introduced the implementation details of RayOnSpark, and our RayOnSpark solution has been adopted by Burger King in their production environment; we are also cooperating with other customers to explore more use cases of RayOnSpark. If you want to review the details of RayOnSpark, don't hesitate to look at our RayOnSpark blog post with the link given here. RayOnSpark is a key feature of Analytics Zoo, and we have developed Analytics Zoo as a unified platform for data analytics and AI. So if you are interested in Analytics Zoo, you can go to our GitHub page or documentation page for more details, and I'm sure you may find other functionalities useful as well. If you have a GitHub account, please kindly give us a star, so that you can find us on GitHub whenever you need. For future work, we are now working on full support and more out-of-the-box solutions for easily scaling out Python AI pipelines from a single node to a cluster based on RayOnSpark, and we will be glad to share our progress and more use cases in the future if we have the chance.

So the last slide here gives an overview of the Intel-optimized end-to-end data analytics and AI pipeline.

Accelerate Your Data Analytics & AI Journey with Intel

So Intel is devoted to helping our customers build optimized solutions on Intel platforms, from the underlying hardware architectures to the software optimizations. If you want to know more about how Intel can help you build your pipeline, you can go to our website, intel.com/ai, or software.intel.com, for more details. So that's pretty much it for this session. Thank you all for choosing this session, and we hope that what Jason and I have talked about will be helpful to you. Thank you so much, and if you have any questions, feel free to raise them.

About Kai Huang

Intel Corporation

Kai Huang is a software engineer at Intel. His work mainly focuses on developing and supporting deep learning frameworks on Apache Spark. He has successfully helped many enterprise customers work out optimized end-to-end data analytics and AI solutions on big data platforms. He is a main contributor to the open source big data + AI projects Analytics Zoo (https://github.com/intel-analytics/analytics-zoo) and BigDL (https://github.com/intel-analytics/BigDL).

About Jason Dai

Intel Corporation

Jason Dai is a senior principal engineer and CTO of Big Data Technologies at Intel, responsible for leading the global engineering teams (in both Silicon Valley and Shanghai) on the development of advanced data analytics and machine learning. He is the creator of BigDL and Analytics Zoo, a founding committer and PMC member of Apache Spark, and a mentor of Apache MXNet. For more details, please see https://jason-dai.github.io/.