While struggling to choose among different computing and machine learning frameworks such as Spark, Dask, Scikit-learn, TensorFlow, etc. for your ETL and machine learning projects, have you considered unifying them into one ecosystem? In this talk, we will present a framework we developed for that purpose: Fugue. It is an abstraction layer on top of different frameworks, and it also provides a SQL-like language that can represent your pipelines end to end and is highly extensible in Python. With Fugue, it is much easier and faster to create reliable, performant and portable pipelines than with native Spark, especially for non-expert users.
In this talk we will demonstrate how we implemented the Node2Vec algorithm on top of Fugue, so it can run on different computing frameworks and can process graphs with 100 million vertices and 3 billion edges in a few hours using Spark as the backend.
We have also built a unified interactive environment based on Kubernetes, Spark and Fugue, and will demonstrate significant performance improvements in the projects migrated to this system. We will also talk about future plans for the Fugue project, including Fugue ML and Fugue Streaming. Our goal is to create a unified ecosystem for distributed computing and machine learning.
– Hello everyone, my name is Han Wang. Today, we will introduce Fugue, a new programming model for distributed computing that unifies Spark and non-Spark ecosystems for big data analytics in Python. We will first explain our motivation, and then talk about Fugue Core, Fugue SQL, ML and streaming. After that, we will go through Fugue use cases. Finally, we will share our open-source plan. Working on different big data problems, I find that I always have to re-evaluate the frameworks I’m using, and to be scalable and fast, I always have to figure out special treatments.
I think the problem is that we have too many options. Each of them is great at certain things, but many of them overlap and many of them try to exclude the other options. So, what if we could have an abstraction layer to unify different ecosystems?
What if we could focus on describing what to do instead of how to do it? Plus, this layer should not prevent direct use of any underlying system. This is the motivation of Fugue. It is a pure abstraction layer. It unifies and simplifies the core concepts of distributed computing and decouples your logic from any specific solution. It is easy to learn and easy to onboard and offboard. It’s not invasive, not obstructive, and not exclusive. It is not something that hijacks your project.
For example, Node2Vec is a great idea for graph embedding. The authors of the paper open-sourced this algorithm in Python. But when we tried to apply it to large-scale graphs, it couldn’t scale, so we wanted to redesign it.
The original version is built on the NetworkX library on top of native Python. We decided to replace NetworkX with join and transform operations.
So we no longer have a single-machine constraint. By the way, in Fugue, transform is similar to the mapPartitions and groupByKey operations in Spark. We use a Fugue workflow to coordinate the join and transform operations, so it’s purely abstract and can run on native Python, Spark, Dask, or whatever Fugue supports. Later in this presentation, Jintao is going to show its performance. But first, let me show you the Fugue code.
Over 90% of the source code is pure native Python, so I didn’t include it here. The first 15 lines of Fugue code shown here are everything we need to express the high-level logic.
Lines 18 to 20 show how to run it on native Python, Spark, and Dask. You just need to change one parameter.
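The join-and-transform idea can be sketched on a single machine. This is a minimal illustration, not the Fugue Node2Vec code. It assumes an undirected, unweighted edge list and a uniform (unbiased) walk. The names `build_adjacency`, `walk_step` and `random_walks` are hypothetical. Each step joins every path's current end vertex with that vertex's adjacency row. It then transforms each joined row independently, so no global graph object such as a NetworkX graph is needed.

```python
import random
from collections import defaultdict
from typing import Dict, List, Tuple

# Hypothetical adjacency "table": vertex -> neighbor list, built once by a group-by.
Adjacency = Dict[int, List[int]]


def build_adjacency(edges: List[Tuple[int, int]]) -> Adjacency:
    """Group an undirected edge list by vertex, like a group-by-key step."""
    adj: Dict[int, List[int]] = defaultdict(list)
    for u, v in edges:
        adj[u].append(v)
        adj[v].append(u)
    return dict(adj)


def walk_step(paths: List[List[int]], adj: Adjacency, rng: random.Random) -> List[List[int]]:
    """Extend every path by one vertex using a join followed by a transform."""
    # "Join": attach each path's current end vertex to that vertex's adjacency row.
    joined = [(path, adj.get(path[-1], [])) for path in paths]
    # "Transform": each joined row is handled independently of all others.
    return [path + [rng.choice(nbrs)] if nbrs else path for path, nbrs in joined]


def random_walks(edges: List[Tuple[int, int]], walk_length: int, seed: int = 0) -> List[List[int]]:
    """Start one uniform random walk at every vertex and extend it walk_length - 1 times."""
    adj = build_adjacency(edges)
    rng = random.Random(seed)
    paths = [[v] for v in sorted(adj)]
    for _ in range(walk_length - 1):
        paths = walk_step(paths, adj, rng)
    return paths


print(random_walks([(1, 2), (2, 3), (3, 1), (3, 4)], walk_length=5))
```

On a cluster, the list comprehensions would become a distributed join and a partition-wise transform. The per-row logic stays plain Python.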
Now, let’s take a look at the core of Fugue.
On top of different computing platforms, we first built an abstraction layer called ExecutionEngine. It’s an abstraction of basic operations such as map, partition, join, and SQL select statements. On top of ExecutionEngine, we use a directed acyclic graph (DAG) to describe the workflow, similar to Airflow. Then we add the programming interface and built-in extensions such as save, load, and print. At the top level, we provide a new way to express a workflow: Fugue SQL. We also have machine learning and streaming features to be released later this year. So, what is special about the Fugue programming model?
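The engine-abstraction idea can be illustrated with a toy interface. This is a sketch under stated assumptions. `Engine`, `NativeEngine` and `workflow` are hypothetical names and do not reproduce Fugue's actual ExecutionEngine API. The workflow is written only against the abstract methods, so a Spark- or Dask-backed engine implementing the same two methods could be passed in instead without touching the logic.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


class Engine(ABC):
    """Hypothetical minimal engine interface (not Fugue's actual API)."""

    @abstractmethod
    def map(self, rows: List[Row], fn: Callable[[Row], Row]) -> List[Row]:
        """Apply fn to every row."""

    @abstractmethod
    def join(self, left: List[Row], right: List[Row], on: str) -> List[Row]:
        """Inner-join two datasets on a key column."""


class NativeEngine(Engine):
    """Runs everything in-process on plain Python lists."""

    def map(self, rows: List[Row], fn: Callable[[Row], Row]) -> List[Row]:
        return [fn(r) for r in rows]

    def join(self, left: List[Row], right: List[Row], on: str) -> List[Row]:
        # Hash join: index the right side by key, then probe it with each left row.
        index: Dict[Any, List[Row]] = {}
        for r in right:
            index.setdefault(r[on], []).append(r)
        return [{**lrow, **rrow} for lrow in left for rrow in index.get(lrow[on], [])]


def workflow(engine: Engine, users: List[Row], orders: List[Row]) -> List[Row]:
    """Business logic written only against the Engine interface."""
    joined = engine.join(users, orders, on="uid")
    return engine.map(joined, lambda r: {**r, "total": r["qty"] * r["price"]})


users = [{"uid": 1, "name": "a"}, {"uid": 2, "name": "b"}]
orders = [{"uid": 1, "qty": 2, "price": 3.0}, {"uid": 1, "qty": 1, "price": 5.0}]
print(workflow(NativeEngine(), users, orders))
```

In this design, choosing the engine is the single parameter that changes between runs. The workflow body and the row-level lambda stay the same.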
First of all, it’s cross platform. Fugue workflows can run on different Fugue execution engines. Fugue is not only adaptive to the underlying systems, but more importantly it’s adaptive to users.
To use frameworks such as Spark and Dask, you commonly have to use their interfaces, decorators, or data frames. You have to adapt to the frameworks. But in Fugue, the framework adapts to you. With this programming model, you can keep the majority of your logic in native Python with no dependency on Fugue. Also, if you migrate something onto Fugue, you can keep most of your code untouched.
Testability is another pain point of distributed systems, but Fugue makes it easier. It helps you write modularized code that keeps most of your logic native, which is easy to test. The part involving Fugue is also easier and faster to test, because you can run it on the native Python ExecutionEngine with small mock data. For example, we achieved 100% unit test coverage on Node2Vec using the native Python ExecutionEngine. We also ran these tests on local Spark, and it was over ten times slower.
Let’s look at a workflow example. From line one to 12, it’s native Python. Fugue starts at line 14.
The functions plus_n and plus_n_pd do the same thing but with different signatures. We use both of them as transformers at lines 16 and 17, and Fugue adapts to your native functions and provides the data in the types given by their type annotations.
Here plus_n_pd is used as a mapPartitions function, and plus_n is used as a groupby-apply function. In Fugue, transformer unifies the two concepts and can use native Python code directly.
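How a framework can adapt to a function's type annotations can be sketched with the standard library alone. This is a hypothetical illustration, not Fugue's implementation. The column-dict signature stands in for the pandas signature used on the slide, to avoid a pandas dependency. `transform`, `plus_n_rows` and `plus_n_cols` are illustrative names. Without `partition_by`, the data is handled as one partition (mapPartitions style). With it, each key group is handled separately (groupby-apply style).

```python
from itertools import groupby
from typing import Any, Callable, Dict, List, Optional, get_type_hints

Row = Dict[str, Any]
Columns = Dict[str, List[Any]]


def plus_n_rows(df: List[Row], n: int = 1) -> List[Row]:
    """Row-oriented native function with no framework dependency."""
    return [{**r, "v": r["v"] + n} for r in df]


def plus_n_cols(df: Columns, n: int = 1) -> Columns:
    """Same logic on column-oriented input (a stand-in for a pandas signature)."""
    return {**df, "v": [x + n for x in df["v"]]}


def _to_columns(rows: List[Row]) -> Columns:
    keys = list(rows[0]) if rows else []
    return {k: [r[k] for r in rows] for k in keys}


def _to_rows(cols: Columns) -> List[Row]:
    keys = list(cols)
    size = len(cols[keys[0]]) if keys else 0
    return [{k: cols[k][i] for k in keys} for i in range(size)]


def transform(rows: List[Row], fn: Callable, partition_by: Optional[str] = None, **params: Any) -> List[Row]:
    """Hypothetical transform: read fn's `df` annotation and convert the data to match."""
    hint = get_type_hints(fn)["df"]
    if partition_by is None:
        partitions = [rows]  # one partition: map-partitions style
    else:
        key = lambda r: r[partition_by]
        partitions = [list(g) for _, g in groupby(sorted(rows, key=key), key=key)]
    out: List[Row] = []
    for part in partitions:
        if hint == List[Row]:
            out.extend(fn(part, **params))
        elif hint == Columns:
            out.extend(_to_rows(fn(_to_columns(part), **params)))
        else:
            raise TypeError(f"unsupported input annotation: {hint}")
    return out


rows = [{"k": "a", "v": 1}, {"k": "b", "v": 2}, {"k": "a", "v": 3}]
print(transform(rows, plus_n_rows, n=2))
print(transform(rows, plus_n_cols, partition_by="k", n=2))
```

Both native functions stay free of any framework import. Only `transform` knows how to convert between data layouts.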
Now let’s have a closer look at Fugue extensions and the DAG. The Fugue DAG consists of five different types of nodes, which we call extensions, and the edges of the graph are strictly schema-defined data frames. On the driver side, or orchestration side, we can customize creator, processor, and outputter. Each arrow means an input or output data frame.
On the worker side, or computation side, we can customize transformer and cotransformer. Actually, you can write native Spark code inside any driver-side extension, so this architecture does not stop you from using any native Spark features. There are also benefits to using a DAG. Here is an example: given a data frame, we run mapper A and call the result X, and in the next two steps we run different mappers B and C on X and save the results separately.
If you write this in native Spark, the execution order will be A, B, A, C, which is counterintuitive because A is computed twice. If you persist the result of the first step, it becomes A, B, C.
But using Fugue with auto persist enabled, B and C will run in parallel after A is auto persisted. So in Fugue, intuitive expressions can generate optimal executions. However, in order to achieve the same thing in Spark you need extra effort.
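Why A runs twice without persist can be shown with a toy lazy-evaluation model. This is a sketch assuming Spark-like semantics, where an un-persisted dataset recomputes its lineage on every action. `LazyFrame` and `run` are hypothetical names. The sketch only models recomputation. It does not model B and C running in parallel.

```python
from typing import Callable, List, Optional


class LazyFrame:
    """Toy lazy dataset: its lineage is recomputed on every action unless persisted."""

    def __init__(self, name: str, compute: Callable[[], List[int]], log: List[str]):
        self.name = name
        self._compute = compute
        self._log = log
        self._persisted = False
        self._cache: Optional[List[int]] = None

    def persist(self) -> "LazyFrame":
        self._persisted = True
        return self

    def collect(self) -> List[int]:
        if self._cache is not None:
            return self._cache
        data = self._compute()
        self._log.append(self.name)  # record that this step actually executed
        if self._persisted:
            self._cache = data
        return data

    def map(self, name: str, fn: Callable[[int], int]) -> "LazyFrame":
        parent = self
        return LazyFrame(name, lambda: [fn(v) for v in parent.collect()], self._log)


def run(auto_persist: bool) -> List[str]:
    """Build A -> X, then X -> B and X -> C, 'save' B and C, and return the execution order."""
    log: List[str] = []
    x = LazyFrame("A", lambda: [v * 10 for v in [1, 2, 3]], log)
    b = x.map("B", lambda v: v + 1)
    c = x.map("C", lambda v: v - 1)
    if auto_persist:
        x.persist()  # an auto-persist rule could trigger this because X has two consumers
    b.collect()
    c.collect()
    return log


print(run(auto_persist=False))
print(run(auto_persist=True))
```

The first call records A, B, A, C and the second records A, B, C. Because the whole DAG is known before execution, a framework can detect that X has two consumers and insert the persist automatically.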
Besides auto parallelization and auto persist, there are other benefits to using a DAG. More errors can be captured when you construct the DAG, before you submit a Spark job, so you can fail faster. You are also able to resume an execution from checkpoints. Please note the difference between persist and checkpoint. Persist caches data for the following tasks within one execution and is cleared after that execution. Checkpoint caches data for multiple executions and is kept after an execution finishes. As I mentioned, we created a new approach to describe your end-to-end workload: Fugue SQL.
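The persist-versus-checkpoint distinction can be modeled with a toy runner. This is a minimal sketch, not Fugue's implementation. `Runner` and the step names are hypothetical, and storing checkpoints as JSON files is an assumption made only for illustration. The in-memory cache is rebuilt for every execution, while checkpoint files survive between executions.

```python
import json
import os
import tempfile
from typing import Any, Callable, Dict, List, Set

Step = Callable[[Callable[[str], Any]], Any]


class Runner:
    """Toy DAG runner. Persist = in-memory cache for one execution.
    Checkpoint = JSON file on disk that survives across executions."""

    def __init__(self, checkpoint_dir: str):
        self.checkpoint_dir = checkpoint_dir
        self.ran: List[str] = []  # steps actually computed, across all executions

    def execute(self, steps: Dict[str, Step], checkpointed: Set[str]) -> Dict[str, Any]:
        persisted: Dict[str, Any] = {}  # fresh for every execution, i.e. "cleared"

        def get(name: str) -> Any:
            if name in persisted:
                return persisted[name]
            path = os.path.join(self.checkpoint_dir, name + ".json")
            if name in checkpointed and os.path.exists(path):
                with open(path) as f:
                    value = json.load(f)  # reuse a result saved by an earlier execution
            else:
                self.ran.append(name)
                value = steps[name](get)
                if name in checkpointed:
                    with open(path, "w") as f:
                        json.dump(value, f)
            persisted[name] = value
            return value

        return {name: get(name) for name in steps}


steps: Dict[str, Step] = {
    "expensive": lambda get: [i * i for i in range(5)],
    "total": lambda get: sum(get("expensive")),
}
with tempfile.TemporaryDirectory() as tmp:
    runner = Runner(tmp)
    first = runner.execute(steps, checkpointed={"expensive"})
    second = runner.execute(steps, checkpointed={"expensive"})
print(runner.ran)  # ['expensive', 'total', 'total']
```

In the second execution, the checkpointed step is loaded from disk instead of being recomputed. This is what makes resuming a failed workflow cheap.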
Let’s look at an example. We create a data frame with a create statement and assign it to a. Then we transform it using plus_n, partitioned by k. Then we use a standard SQL join statement.
Then, we have a simplified anonymous select statement, and then we print and save this data frame.
Then we do an expensive transformation on b. Notice that the double question mark after df means checkpoint. In the end, we use another outputter extension to assert that the two data frames are equal.
Here is a comparison between Fugue SQL and Spark SQL. The main difference is that Fugue SQL works at the workflow level, while Spark SQL works at the task level. Fugue SQL enriches Spark SQL and can glue Spark SQL and Fugue extensions together into an end-to-end workflow. Also, Fugue SQL is cross-platform; it can run without Spark. In addition, Fugue SQL supports caching and checkpointing. Although it is a SQL-like language, it is designed for workflows. It’s worth mentioning that when running on Spark, Fugue also supports Spark UDFs.
The features of Fugue SQL and the programming interface are at parity. However, they can put you in different mindsets. For example, suppose I want to partition a data frame by a column and get the first record of each partition. If you use the programming interface, a transformer seems nice and clean, but if you are in a SQL mindset, you may first consider using an OVER clause. Which one is better? Different people can have different opinions; here we only provide the options.
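The two mindsets can be compared on the same toy data using only the standard library. This is an illustration, not Fugue code. The column names `k`, `ts` and `v` and the functions are hypothetical. The SQL version uses `ROW_NUMBER() OVER (PARTITION BY ...)` in SQLite, which requires SQLite 3.25 or newer.

```python
import sqlite3
from itertools import groupby
from typing import List, Tuple

Rec = Tuple[str, int, str]  # (k, ts, v)
ROWS: List[Rec] = [("a", 3, "x"), ("a", 1, "y"), ("b", 2, "z"), ("b", 5, "w")]


def first_per_partition(rows: List[Rec]) -> List[Rec]:
    """Transformer style: sort within each key group and keep the first row."""
    ordered = sorted(rows, key=lambda r: (r[0], r[1]))
    return [next(group) for _, group in groupby(ordered, key=lambda r: r[0])]


def first_per_partition_sql(rows: List[Rec]) -> List[Rec]:
    """SQL style: number the rows inside each partition with a window function."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE t (k TEXT, ts INTEGER, v TEXT)")
    conn.executemany("INSERT INTO t VALUES (?, ?, ?)", rows)
    query = """
        SELECT k, ts, v FROM (
            SELECT k, ts, v, ROW_NUMBER() OVER (PARTITION BY k ORDER BY ts) AS rn
            FROM t
        ) WHERE rn = 1 ORDER BY k
    """
    result = [tuple(r) for r in conn.execute(query)]
    conn.close()
    return result


print(first_per_partition(ROWS))
print(first_per_partition_sql(ROWS))
```

Both return the earliest record per key. The choice between them is mostly about which expression reads more naturally to the author.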
Next, my co-speaker Jintao will present Fugue machine learning and streaming. Thank you. – Hello everyone, my name’s Jintao Zhang. I’ll be talking about Fugue machine learning and Fugue streaming.
Fugue ML has four major components. First, we will create adapters to popular ML frameworks so that a Fugue workflow can directly use models created with these frameworks. For example, Fugue can directly train, tune, and run inference with a compatible TensorFlow model. Next, we will create an abstraction layer for popular ML models; for example, a linear regression model in Fugue can have different adapters such as Sklearn and Spark ML. We will collaborate with the NeuronBlocks team on this component. NeuronBlocks is a high-level description language for deep learning that has been open-sourced by Microsoft. We have also created algorithm libraries for popular ML domains and topics, for example, distributed Node2Vec for graph data. There’s another library for time series data, and more are coming.
Finally, we will provide an abstraction layer for common ML utilities, for example, parameter tuning and data validation, so that you don’t have to duplicate work again and again.
This code block is an ML example in Fugue SQL, and it demonstrates how to use the adapters and utilities mentioned in the previous slide. First, we use the scikit-learn adapter, SKModel, with a training dataset. Next, we set some common parameters to define the model. The next section is for hyperparameter tuning. The amazing part is that you can conduct model sweeping and hyperparameter tuning simultaneously in a declarative way. In this example, there are three different models to select from: linear regression, linear SVR, and XGBoost regressor. Finally, you can use n-fold cross validation and scoring for more robust model selection.
We have fully distributed the entire process, including model and parameter sweeping, training, and cross validation. Fugue makes it easy to follow best practices for machine learning.
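What "model sweeping and hyperparameter tuning at the same time" means can be sketched as a single search space that lists candidate models together with their parameter grids. Every candidate is then scored by k-fold cross validation. This is a single-machine sketch with toy one-dimensional models. It does not use SKModel, scikit-learn or XGBoost. `SPACE`, `fit_mean`, `fit_ridge_1d` and `sweep` are hypothetical names. In a distributed setting, each (candidate, fold) pair would be an independent task.

```python
import itertools
import random
from statistics import mean
from typing import Any, Callable, Dict, Iterator, List, Tuple

Data = List[Tuple[float, float]]
Model = Callable[[float], float]


def fit_mean(train: Data, **_: Any) -> Model:
    """Baseline: always predict the training mean of y."""
    m = mean(y for _, y in train)
    return lambda x: m


def fit_ridge_1d(train: Data, alpha: float) -> Model:
    """1-D linear regression with an L2 penalty on the slope."""
    mx = mean(x for x, _ in train)
    my = mean(y for _, y in train)
    sxy = sum((x - mx) * (y - my) for x, y in train)
    sxx = sum((x - mx) ** 2 for x, _ in train)
    slope = sxy / (sxx + alpha)
    return lambda x: my + slope * (x - mx)


# One declarative search space covering both model choice and hyperparameters.
SPACE: Dict[str, Tuple[Callable[..., Model], Dict[str, List[Any]]]] = {
    "mean": (fit_mean, {}),
    "ridge": (fit_ridge_1d, {"alpha": [0.0, 1.0, 100.0]}),
}


def candidates(space) -> Iterator[Tuple[str, Callable[..., Model], Dict[str, Any]]]:
    """Expand every model's grid into concrete (name, fit, params) candidates."""
    for name, (fit, grid) in space.items():
        keys = sorted(grid)
        for values in itertools.product(*(grid[k] for k in keys)):
            yield name, fit, dict(zip(keys, values))


def kfold_mse(data: Data, fit: Callable[..., Model], params: Dict[str, Any], k: int = 3) -> float:
    """Average validation MSE over k interleaved folds."""
    folds = [data[i::k] for i in range(k)]
    scores = []
    for i in range(k):
        train = [p for j, fold in enumerate(folds) if j != i for p in fold]
        model = fit(train, **params)
        scores.append(mean((model(x) - y) ** 2 for x, y in folds[i]))
    return mean(scores)


def sweep(data: Data, space, k: int = 3) -> Tuple[float, str, Dict[str, Any]]:
    """Score every candidate and return (score, model name, params) of the best one."""
    results = [(kfold_mse(data, fit, params, k), name, params) for name, fit, params in candidates(space)]
    return min(results, key=lambda r: r[0])


rng = random.Random(0)
demo_data = [(float(x), 2.0 * x + 1.0 + rng.gauss(0, 0.1)) for x in range(30)]
print(sweep(demo_data, SPACE))
```

Because every (candidate, fold) evaluation is independent, the whole sweep parallelizes naturally. That is the property a distributed tuning utility can exploit.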
Here is an example ML library: Node2Vec, which we briefly discussed in the previous slides. The core step of Node2Vec is to randomly walk from every vertex of the graph to generate a large set of random paths as text. This text is then fed into a word2vec embedding engine for the final embedding results. We have implemented the distributed Node2Vec algorithm on Fugue; it uses adjacency lists to represent a graph and a distributed breadth-first search algorithm for random walks to achieve high scalability.
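The biased, second-order walk that distinguishes Node2Vec from a uniform random walk can be sketched as follows. This follows the transition rule from the Node2Vec paper, using its return parameter p and in-out parameter q. It is not the Fugue implementation described in the talk. It assumes an unweighted, undirected graph stored as adjacency sets, and the function names are hypothetical.

```python
import random
from typing import Dict, List, Set

Graph = Dict[int, Set[int]]


def transition_weights(graph: Graph, prev: int, cur: int, p: float, q: float) -> Dict[int, float]:
    """Unnormalized Node2Vec weights for leaving `cur`, having arrived from `prev`."""
    weights = {}
    for x in sorted(graph[cur]):
        if x == prev:
            weights[x] = 1.0 / p  # distance(prev, x) == 0: step back
        elif x in graph[prev]:
            weights[x] = 1.0      # distance(prev, x) == 1: stay close (BFS-like)
        else:
            weights[x] = 1.0 / q  # distance(prev, x) == 2: move outward (DFS-like)
    return weights


def node2vec_walk(graph: Graph, start: int, length: int, p: float, q: float, rng: random.Random) -> List[int]:
    """Generate one biased walk; the first step is uniform because there is no previous vertex."""
    walk = [start]
    while len(walk) < length:
        neighbors = sorted(graph[walk[-1]])
        if not neighbors:
            break
        if len(walk) == 1:
            walk.append(rng.choice(neighbors))
            continue
        w = transition_weights(graph, walk[-2], walk[-1], p, q)
        nodes = list(w)
        walk.append(rng.choices(nodes, weights=[w[n] for n in nodes])[0])
    return walk


g: Graph = {1: {2, 3}, 2: {1, 3}, 3: {1, 2, 4}, 4: {3}}
print(node2vec_walk(g, start=1, length=6, p=1.0, q=0.5, rng=random.Random(7)))
```

The rule depends only on the previous vertex, the current vertex and their neighbor sets. This is why a distributed version can work on adjacency lists joined to the walk frontier instead of holding the whole graph in memory.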
We used Fugue to run benchmark tests, coordinating the Node2Vec computation on numerous computing frameworks.
The blue curve is the runtime performance of the Node2Vec code from the original authors, and the other curves are the performance of the same Fugue code running on different execution engines. For the graph with 1 million vertices, the Fugue version on Spark is significantly faster.
This is a great showcase of Fugue’s flexibility and its scalability on Node2Vec.
We also conducted tests on very large graphs. For a graph with 10 million vertices and 300 million edges, the Node2Vec process takes two to three hours with 500 cores and 3 terabytes of memory. For a graph with 100 million vertices and 3 billion edges, it takes six to eight hours with 2,000 cores and 12 terabytes of memory.
Another ML library is for time series forecasting with seasonality. Using a Kalman filter, this library has decent performance on noisy datasets. You can simulate special events and anomalies in the data. It accepts time series at any interval: hourly, daily, weekly, or yearly. It can handle a very large number of time series with built-in seasonality.
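The filtering core of such a library can be illustrated with the simplest Kalman filter, the local-level model. This is a minimal sketch, not the library described in the talk. It does not model seasonality, which would require adding seasonal components to the state. The process-noise variance `q` and observation-noise variance `r` are assumed to be known. Missing observations (`None`) skip the update step, so the state uncertainty grows.

```python
from typing import List, Optional, Tuple


def local_level_filter(
    ys: List[Optional[float]], q: float, r: float, x0: float = 0.0, p0: float = 1e6
) -> Tuple[List[float], List[float]]:
    """Kalman filter for the local-level model:
        state:       x_t = x_{t-1} + w_t,  w_t ~ N(0, q)
        observation: y_t = x_t + v_t,      v_t ~ N(0, r)
    Returns the filtered state means and variances."""
    x, p = x0, p0
    means: List[float] = []
    variances: List[float] = []
    for y in ys:
        p = p + q  # predict: the level may have drifted
        if y is not None:
            k = p / (p + r)      # Kalman gain: how much to trust the new observation
            x = x + k * (y - x)  # update with the innovation y - x
            p = (1 - k) * p
        means.append(x)
        variances.append(p)
    return means, variances


noisy = [10.0 + (0.5 if i % 2 else -0.5) for i in range(100)]
means, variances = local_level_filter(noisy, q=1e-4, r=0.25)
print(round(means[-1], 2), round(variances[-1], 4))
```

A large initial variance `p0` lets the first observations dominate. A small `q` relative to `r` smooths noise heavily, which is the behavior that makes this family of filters robust on noisy series.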
Fugue Streaming uses Fugue components to deal with both real-time and non-real-time applications. We have a Fugue Spark Streaming pipeline in production, running efficiently and reliably. Fugue provides an abstraction layer over streaming connectors and core frameworks such as Spark Streaming and Flink. We have tested cases with Kafka connectors for Spark Streaming, and we are looking for partnership on additional Fugue Streaming features.
In the next section, I will demonstrate some Fugue use cases.
First, consider the traditional setup of an interactive development environment with Jupyter Notebook: with Spark, a standby Spark cluster is usually required. In our case, we use Fugue as an abstract computing layer and entry point to interact with any of the computing frameworks. Our clusters run on top of Kubernetes, and each user can start and stop their own cluster in their own notebook session. The latency to start and stop a cluster is on the order of seconds. The system is purely on demand; we don’t use centralized standby clusters. Please be aware that Fugue is an abstraction layer and doesn’t depend on Kubernetes or Spark. The elasticity of this system is introduced by Kubernetes and Spark, while Fugue provides consistency, simplicity, and flexibility. In addition, with EFS (Elastic File System) mounted to each pod, updated code and dependencies are available in the Spark cluster immediately, which makes it very agile for data scientists to try new ideas or packages. And Fugue makes code modular and consistent, so the workload code can be moved to production very easily.
We are also interested in collaborating with Lee Gall from Databricks on running Fugue in a very large-scale, cross-cluster Kubernetes Spark environment scheduled by Uniforum.
I also gave a talk about Spark on Kubernetes scheduling at this year’s AI Summit, which covers the Kubernetes + Spark + Fugue system from the previous slide.
We have collaborated with quite a few product teams to migrate their legacy pipelines to Fugue pipelines. Here, legacy means training each model independently on a different machine, so if you have three models, you need three machines, for instance. The migration created large cost and runtime savings and increased code testability.
All the migrated projects saw performance improvements to varying degrees. The average total CPU hours were reduced by 74.6%, and the average total runtime was reduced by 83.9%.
This is a use case of a multi-region regression problem, with a few hundred region-based models to be trained and tuned. In each run, the legacy pipeline cost about $630 for all models, took more than seven hours, and had a reliability of only about 80%. The Fugue pipeline costs only about $23, finishes all models within 30 minutes, and has a reliability of 99.5%. In other words, the Fugue pipeline reduced cost by more than 95% and runtime by more than 90%.
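The stated savings can be checked directly from the figures quoted for this use case. Only the numbers given in the talk are used. The helper name is illustrative. The runtime figure is a lower bound, because the legacy run took more than seven hours and the Fugue run finished within 30 minutes.

```python
def pct_reduction(before: float, after: float) -> float:
    """Percentage reduction going from `before` to `after`."""
    return 100.0 * (before - after) / before


cost_saving = pct_reduction(630.0, 23.0)        # dollars per run, all models
runtime_saving = pct_reduction(7 * 60.0, 30.0)  # minutes; lower bound (legacy took > 7 h)
failure_before, failure_after = 100.0 - 80.0, 100.0 - 99.5  # failure rates in percent

print(f"cost: -{cost_saving:.1f}%, runtime: at least -{runtime_saving:.1f}%")
print(f"failure rate: {failure_before:.1f}% -> {failure_after:.1f}%")
```

This gives about 96.3% lower cost and at least 92.9% lower runtime, which is consistent with the "more than 95%" and "more than 90%" claims. The failure rate also drops from 20% to 0.5%.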
Another use case is a time series forecasting problem. Suppose you want to forecast the value of some business metric for next week, next month, or next quarter so that you can do better budget planning and decision making. We have used such forecasting models in production. After migrating to Fugue pipelines, the overall cost and runtime savings were both more than 90%.
In summary, Fugue unifies various computing frameworks with uniform interfaces. It requires minimal code changes for pipeline migration.
Fugue SQL is a novel language for workflows. It can optimize your workflow execution.
Kubernetes + Spark + Fugue is a great combination with high flexibility and efficiency for distributed computing.
Many real use cases show that Fugue pipelines achieved significant improvements in cost, runtime, and reliability simultaneously.
The Fugue project aims to be a unified ecosystem for integrating distributed systems and machine learning. We welcome partnership from anyone who is interested.
Next, my co-speaker Han Wang will announce our open-source plan. – Hello everyone, today we have open-sourced Fugue and Node2Vec on Fugue. Currently, Fugue can run on Spark, Dask, and native Python. Fugue SQL is also released. You can pip install them or clone them to your own machine to give them a try. The tutorials are short and fun, and they are a great way to start. We plan to release the first version of Fugue ML by September 2020, including Sklearn and TensorFlow adapters, parameter tuning, and data validation features. And by the end of 2020, we hope to release the first version of Fugue Streaming, including an abstract interface for streaming computation backed by Spark Streaming.
We are looking for collaborators. Let’s work together to make the open-source world more unified.
Thank you for attending our presentation of Fugue. Please pip install Fugue to give it a try.
Han Wang is a Staff Engineer at Lyft, leading the company's Spark machine learning projects. Before Lyft, he worked at Quantlab, Amazon, Microsoft and Hudson River Trading, focusing on distributed computation and machine learning problems.
Dr. Jintao Zhang received his PhD in machine learning from the University of Kansas in August 2012. Since then, he has worked in data science, machine learning, and engineering roles at various companies. Dr. Zhang has extensive experience using Spark and other distributed computing platforms to productionize machine learning pipelines end to end.