The architectural tradeoffs between the map/reduce paradigm and parallel databases has been a long and open discussion since the dawn of MapReduce over more than a decade ago. At Facebook, we have spent the past several years in independently building and scaling both Presto and Spark to Facebook scale batch workloads, and it is now increasingly evident that there is significant value in coupling Presto’s state-of-art low-latency evaluation with Spark’s robust and fault tolerant execution engine. To this end, we’ll present Presto-on-Spark, a highly specialized Data Frame application built on Spark that leverages Presto’s compiler/evaluation engine with Spark/Cosco’s execution engine. In this talk, we’ll take a deep dive in Presto and Spark’s architecture with a focus on key differentiators (e.g., disaggregated shuffle) that are required to further scale Presto. We’ll then present the Presto-on-Spark project in detail, and discuss the motivation, design and current status of this project. We believe this is only a first step towards more confluence between the Spark and the Presto communities, and a major step towards enabling unified SQL experience between interactive and batch use cases.
– Hello everyone. So it was a best of times, it was the worst of times. Today we are going to tell a tale of the two computation entries. I’m Wunlei, I’m a research scientist working data platforms in Facebook and today Andrii and I will present about Presto Spark.
And Andrii is also working Facebook platform as a software engineer. I will talk about the introduction and the what motivates us to do this Presto Spark. And Andrii will follow up with the design implementations and our current status. Okay, let’s start with the introduction part. So, let’s start with the feature use case at Facebook. Roughly speaking, there are three SQL use cases. The first is the so called reporting and the dashboarding. This includes serving custom reporting for both internal and external developers for business insights. And also in Facebook, a lot of those AB tests infrastructure is also built on Presto. The characteristics of this use case is low latency. So think about it requires tens to hundreds of milliseconds with very high QPS requirement. And not surprisingly this case group is almost exclusively using Presto since Presto is designed for that. The second use case is adhoc analysis. This is an interesting use case as Facebook internal users such as data scientist, business analytics, they want to perform complex adhoc analysis to understand for example, usage trends and how to improve the product.
Usually this means moderate latency. So thinking about seconds to minutes because the query is crafted adhoc. So you cannot expect a milliseconds latency and the QPS is quite a low because users has to type those queries. Interesting note, if users usually craft the query and iterate over those results. So user is kind of waiting and this category use Presto but we also use Spark CGU occasionally.
the final use case, which we call the batch pipelines. Essentially those are scheduled jobs. That’s really every day or hour whenever the data is ready. This often contains queries over very large volumes of data. And the latency can be up to tens of hours. Thinking about the largest pipelines when you’re on Spark today in Facebook. for this use case both Presto and Spark are used and generally we find Presto are more welcome for small batch jobs, say, last for a few hours and the Spark becomes dominator for those large batch jobs.
So as we discussed before for reporting dashboard and the adhoc analysis we mainly use Presto while for batch jobs, we use a mix of Presto and Spark. And this is mainly due to Presto doesn’t scale for large batch pipelines at Facebook. Usually such pipelines is to run many hours or to enjoin an aggregation of a huge amount of data.
This is definitely not ideal because it is quite costly and is inconsistent with SQL experience. First of all, we all know the two SQL engines has slightly different SQL dialects.
Another thing we find is about these subtle semantic difference. For example although both Presto SQL and the Spark SQL follows SOC2, SOC2 specs just say, in some case SOC2 spec says it’s up to the vendor engine to decide whether to return null or throw an exception.
Other cases, including structured data behavior, such as I rate.
You’ll probably, for companies running multiple SQL engines, the problem is user defined function difference is also a well known problem. Finally, we have also seen users develop a different best practice when writing SQL on different engines such as query hint or join ordering.
So this really pose a pain point for users. So for example, a user might want to test their query adhoc mode use Presto and then later they have to convert it to Spark when they really running a batch pipeline in production. And also someday user might want to say Hey, this is a dashboard the queries running very well for many months. But we want to run a batch as well over our larger volume. And in such case use, often you’d have to translate their query between the two SQL engines which is a huge pain point.
So now we want to dig a little bit into the Presto scalability.
So let’s do a first quick reveal of Presto and Spark architecture.
So as you can see, Presto is designed for low latency and follows this classic MPP architecture It’s used in memory streaming shuffle to achieve low latency. And also we are trying to schedule as much as possible queries on the same Presto worker to get a better multitenancy. And in the meanwhile Spark is designed for scalability from the very beginning.
So not surprisingly, it follows the MapReduce Architecture. The shuffle is disaggregated from computation by materializing to disc, as we will discuss later. Also Spark maintains the isolated executor for each query, which we have seen for bash jobs. It reduces the operational overhead.
So, okay, let’s go to this question about why Presto or other MPP they have doesn’t scale? This has been an open discussion for decades and the still get asked in our recent paper, EVB VLDB 2009 which compare the MapReduced style data processing systems and the traditional parent habit. So to this end, let’s examine our very simple aggregation queries. So essentially this query goes over the orders table in TPCH and doing aggregation of custom key computes the total price, the sum of total price. And, as we said, Presto leverages this in memory shuffle.
And so to execute it, Presto will do a shuffle on the custom key after reading the data and the doing aggregation for the same key on each worker. So, doing in memory shuffle means standards will rise to the in-memory multiple buffer and the wait for the data being fetched by the receiver. As a result, we have to execute all the tasks, before and after the exchange at the same time. So thinking about in the MapReduce world, all the mappers and reducers has to be run concurrently. This makes memory shuffle an all or nothing execution model. So for example, this causes inflexible scheduling and photography becomes more difficult because everything is running concurrently. And also, in the aggregation phase it might exceed the memory limit because everything has to be held in the main memory. So, this motivates the Presto Unlimited work. The high level idea here is to bring this MapReduce style disaggregator shuffle to an MPP runtime. And we did this by adding a materialization step right after the shuffle. So as you can see this intermediate shuffled data is now returned to disc and in Presto, this is modeled as a temporary partition in the payroll.
This indeed brings more flexible execution after shuffle. So thinking it has, in the reducer phase, we can now have better scalability because we can do petition level retry. We can schedule only a few reducers at the same time to reduce peak memory limitation, peak memory consumption.
So what’s the key here? The shuffle is now disaggregated from computation on the reducer side. Unfortunately not on the mapper side. So we improve scalability, but the mapper, it still doesn’t scale well.
so, finally that’s why Presto-on-Spark comes. Essentially we try to execute Presto Evaluation Library on Spark Runtime, as Andrii will talk about the details in the next part. So with Spark being used at the real time now we can do a fully disaggregated shuffle on custom key for both mapper and the reducer side.
And this means all mappers and reducers can be independently scheduled and can be independent and re-triable. We also bring other good things from Spark, including spectrum of execution and better resource management.
So, finally one question we often get asked is why we want to do Presto-on-Spark instead of making Presto Unlimited more scalable. So to answer that, that let’s recall what is missing for Presto Unlimited it to truly scalable. So first we need a fully disaggregated shuffle and we also find when fully disaggregated, the shuffle is dumped. Isolated executer, and the other interesting features such as the speculative execution and the schedule and tailor them for a batch job, batch jobs also quiet. So notice those actually lays down a foundation for general purpose parity of data processing systems such as barcode test. Such data processing system has its own usage and the well-defined program extractions. So instead of embedding such a mini Spark Runtime inside the Presto, we believe we should really leverage existing well developed system to scale large batch jobs. For example, Spark, which is the most successful parent in that processing system in the big data ecosystem. We also believe such collaboration between the Presto-on-Spark will help in general the whole big data community to better understanding the abstraction between SQL engines and the general data parallel systems as well as evolve and refine the execution primitives. So with this, I will now hand over to Andrii to talk about the actual design and implementation. – In the second part of this presentation I would like to discuss some key design principles and also dive in more details around implementation of Presto-on-Spark.
The key design principle in Presto-on-Spark is the Presto code is run as a library in Spark environment. So the classic Presto cluster is not actually needed to run queries with Presto-on-Spark From the Spark point of view Presto-on-Spark is just a custom batch application.
Presto query is passed as a parameter to that application. And another key a design detail is that Presto-on-Spark is implemented with LEDs and it doesn’t use data frame API and all the operation done by Presto code is completely passed to the Spark engine. So Presto-on-Spark doesn’t use the distributed file system client provided by Spark. It doesn’t use any file format decoder and coders. All of that is provided by the Presto library. So on the right side there is an example of Spark submit command that is used to run Presto-on-Spark. So it basically takes the artifacts. It takes package, it takes configuration and query as a parameter. So no conventional Presto cluster is involved. We start at the execution is started by running the Presto code on Spark driver to do pre-process the Presto query.
so first we run Presto parser and optimizer to generate logical plan. As you can see for this simple joint query we have a pretty simple logical plan. It’s basically a scan of two tables, applying filter and joining them together.
And then based on the logical plan we run Presto distributed planner to generate distributed plan. And the distributed plan is the same for both Presto, Presto-on-Spark, and classic Presto. In the future we may also add some additional stage, an adaptation stage that basically does some transformation for the Presto plan to make it more, it would make it better and more optimal for Presto-on-Spark model of execution. The next step is the translation of Presto distributed plan into a Spark RDD.
So the first step in this translation process is
a split enumeration. So we enumerate all the splits and events and then we create parallel RDDs by calling a sparkContext parallelize. And then we map the splits into a list of players by applying Presto evaluation as an effect mapper. So basically each output payer contains a partition ID as a key and the row as its value.
And then we run, then we repartition the output by the key with applying partition by function. And finally we zip partition together and run Presto evaluation to actually perform, to actually joint two tables together. For joints with more than two inputs we have a custom RDD that is based on zip RDD. The custom RDD allows to zip arbitrary number of inputs like for in way joins or for unions or for this type of operations.
On this slide you can see…
And the generated Spark DAG based on the Presto distributed plan.
So it has stage, it has three stages, stage zero and stage one. They run mapper operations on the input tables, and then basically stage two runs joint operation. To create a processor function, we serialize and send a plan for a specific fragment to an executer. And then based on the plan fragment, we create a local execution plan that is used to create a processor function. So the processor interface is very simple. So in case we fully fragment the process processor takes a set of splits and produces a set of ropes. For an intermediate fragment, the process takes a list of inputs containing rows from upstream stages and produces a set of rows as an output. It is important to mention that Presto Evaluation Engine is columnar based. So Presto operates on columnar data structure called PAGE. And as part of this project we had to develop row based representation for Presto data. It is needed, to be able to use Spark shuffle. And it is by itself fundamentally row-based operation.
So before the data is supplied through Spark it is converted from columnar pages to row based representation. It is also important to understand the efficiency implication of this conversion. So in conventional Presto, a page has to be serialized to something we called serialized page. So it can be sent over the wire to a downstream stage.
In Presto-on-Spark, we replaced the serialization with directly translating page into a list of serialized rows. Based on that we don’t expect a significant efficiency loss caused by this conversion. Another interesting query shape is the broadcast join. So in case of the broadcast join, the distributed plan has only two fragments.
One fragment scans one of the tables. The other one scans the other table and joins it with the first table without performing any shuffle operations. So the first table has to be broadcast it to every note that reads the second thing.
So how do we translate it to our RDD?
So the RDD interface doesn’t have the built in support for broadcasting data in Spark. So for this query shapes we actually need to create two separate RDDs.
So first RDD, it scans and filters the other table.
And then the result is collected on the driver and broadcast variable is created containing this, the rows from the first table.
Then the broadcast variable containing this, the result of the first RDD, is injected into the fragment that scans line item.
So as you can see, they’re like two separate jobs that are linked with the broadcast variable.
On this slide we can see how the first and second job look like, so basically these two jobs are just a simple map operations.
And the connection between them is basically a broadcast variable that is actually not visible on this page.
So the execution of a broadcast join is very similar. So the broadcast fragment executer API looks exactly the same as the leaf executer of your line item partition join. However, the joint fragment executer now accepts a broadcast variable containing the data for the first table.
So a couple of words about a Spark execution model versus Presto-on-Spark execution model.
So on Spark, the thread management is done by Spark executer. So Spark executer runs multiple tasks, one thread per task. Each task accepts a single partition produced by a shuffle.
So the shuffle has to produce one partition per thread.
With Presto-on-Spark, we always run a single Spark task per executor. The thread management is done by Presto-on-Spark itself. So single Presto-on-Spark tasks accept a single partition. Then it does another level of local in memory shuffle to assign sub-partition for every thread. And then it runs every sub-partition in parallel using the thread pool managed by Presto-on-Spark internally. So with this model, it allows us to reduce load on the shuffle service as now. It has to produce only a single partition per executer which is one partition per thread. Also it allows us to save memory of broadcast join as the internal hash table or presentation for broadcasted tables can be shared by all the threads within a single Presto-on-Spark task.
Another thing, another interesting problem that we encountered is dependency management. When we first tried to run Presto-on-Spark, we saw a lot of incompatible dependency clashes.
Basically at some point we realized that reshading our dependencies doesn’t really scale. It is not sustainable longterm. The dependencies tend to change from one version to another and it’s very difficult to keep track of these changes to support forward and backward compatibilities between different version of Presto-on-Spark and Spark. So instead of trying to relocate classes within dependencies, we decided to simply run Presto code and the isolated classloader.
So Presto-on-Spark, at Presto-on-Spark we actually create two artifacts. One artifact is the launcher and the other artifact is the Presto-on-Spark package. So we start Presto-on-Spark with the launcher
and then we pass the package as a parameter to the launcher. Then launcher knows how to extract this package and how to bootstrap Presto Classloader and all the Presto services based on this package. So the Presto itself also uses classloaders to run plugins.
So plugins are also supplied and the Presto-on-Spark package along with the main code with the main Presto code. Ideally the Spark engine should provide this classloader isolation by default, internally. Running all user code in a separate classloader. So this isolation might be generally useful for custom Spark application. And maybe at some point we would even want to contribute this classloader isolation to the opensource Spark. A couple of words about current status of the project. So the project is still under active development of GitHub and most of the query shapes are supported. We are still working on supporting some flavors of union.
And we also are going to invest some time in making this future being publicly available. We are going to work on the command commits so everybody knows how to write run it.
And we also conducted some initial scalability tests. So we were managed to scale
Presto-on-Spark to be run on 10,000 mappers and 10,000 grid users. So it is almost 10X the size of our existing Presto clusters.
We also were able to run queries that would otherwise require more than 50 terabytes distributed memory when run in classic Presto. And we’ve seen because we can run the query with much higher parallelism, we’ve seen a very nice wall time reduction for large batch queries. So the query that takes more than six hours in Presto, we managed to run in under two hours with Presto-on-Spark.
That’s gonna be it from us. Thank you for joining our presentation.
I work in the Data Platform Team at Facebook, with a specific focus on large-scale distributed database systems. I'm excited to be a part of growing and scaling Presto at Facebook. I received my PhD in Databases from Cornell University.
I work in the Data Platform Team at Facebook focusing on scaling distributed database systems. Previously i was working at Teradata on developing scalable, open source data processing systems. I received my degree in Applied Mathematics from Lviv National University.