
SparkCruise: Automatic Computation Reuse in Apache Spark


Queries in production workloads and interactive data analytics often overlap, i.e., multiple queries share parts of their computation. These redundancies increase processing time and the total cost for the user. To reuse computations, many big data processing systems support materialized views. However, given the size and evolving nature of query workloads, it is challenging to manually select the common computations in a workload. In this talk, we will present SparkCruise, an automatic computation reuse system developed for Spark. It can automatically detect overlapping computations in the past query workload and enable automatic materialization and reuse in future Spark SQL queries.

SparkCruise requires no active involvement from the user, as materialization and reuse are applied automatically in the background as part of query processing. We can perform all these steps without changing the Spark code, demonstrating the extensibility of the Spark SQL engine. SparkCruise has been shown to improve the overall runtime of TPC-DS queries by 30%. Our talk is divided into three parts. First, we will explain the end-to-end system design, with a focus on how we added workload awareness to the Spark query engine. Then, we will demonstrate all the steps, including analysis, feedback, materialization, and reuse, on a live Spark cluster. Finally, we will show the Workload Insights Notebook. This Python notebook displays the information from the query plans of the workload in a flat table. The table helps users and administrators understand the characteristics of their workloads and the cost/benefit tradeoff of enabling SparkCruise.


 


Video Transcript

– Hi, everybody. This is Priyanka. I’m a developer on the Spark team in Azure Data. I mostly work on building an analytics-as-a-service platform. Today we’re going to look at SparkCruise, an automatic computation reuse system for Spark that we have built.

SparkCruise: Automatic Computation Reuse in Spark

It’s been a crazy time with the pandemic. I’m so glad that the conference was made virtual, and I want to thank you all for attending our session today. Let’s quickly go over the agenda. Today’s talk is split into two parts. In the first half, we’re going to go over SparkCruise, the automatic computation reuse system that we’ve built. We’ll look at the problems we faced and the system design, and go over a demo of how it actually works in action. In the second part of this talk, we’re going to go over the Workload Insights Notebook. It’s a tool that we’ve built for you to analyze whether SparkCruise is something your workload can benefit from. – Thank you, Priyanka. We are in an age where the database administrator is completely overwhelmed. Traditionally, databases were located on premises. These systems had experienced database administrators who could tune the query workloads with sophisticated administration tools. These installations also had offline cycles which could be used to schedule maintenance tasks such as collecting statistics or creating views.

Today, cloud providers like Azure offer fully managed database services. In these cloud-based data services, users can simply point to their datasets and start querying without paying any upfront costs. This has led to a tremendous increase in the number of users and the sizes of query workloads.

In this setting, both database administrators and cloud developers can tune the query workloads, but they lack the tools to analyze and optimize such massive workloads. Also note that serverless systems have no offline cycles that can be used to run maintenance tasks such as updating views.

Workload Optimization

We have invested a lot in building excellent query optimizers. Spark itself added a Cost-Based Optimizer in 2017 and Adaptive Query Execution in Spark 3.0. But these query optimizers still process one query at a time, because the queries are assumed to be independent of each other, and the same set of rules is applied each time.

However, in production workloads the queries interact with each other. The queries can share computations, can have dependencies in terms of execution ordering. For example, the next query can start only after the previous query has finished. And some queries of the workload can share a common template. Hence, we believe that we should optimize the workload, not just individual queries. And there are so many optimization opportunities in the workload. Optimizing the workload will also reduce the total cost for the user.

Now to make this journey from query optimizer to workload optimizer, we need a workload-based feedback loop. We can learn models from past query workloads and apply them again in the future. Our system should also be able to adapt to changing workloads. Therefore we need a continuous feedback loop to the optimizer that can react to changes in the workload.

In our SparkCruise system, we have added a workload-based feedback loop to the Spark query engine.

Computation Reuse Problem

Now, let us look at a specific problem in workload optimization: the problem of computation reuse. In this problem, parts of the computation are duplicated across queries. For example, here we see two queries, Q1 and Q2, processing the same datasets. These two queries share a common filter and a common join. Our analysis revealed that 95% of queries in TPC-DS overlap with at least one other query. We get such a high percentage of overlap in the TPC-DS workload because it has many queries over the same data and the same set of tables.
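To make the overlap concrete, here is a minimal sketch of what such a pair of queries might look like. The table and column names are hypothetical, and we assume a notebook-style SparkSession named spark; the shared filter and join make the join output a candidate for materialization:

    // Hypothetical Q1 and Q2: both apply the same filter on `events` and
    // the same join with `users`, so the filtered join is shared work.
    val q1 = spark.sql("""
      SELECT u.country, COUNT(*) AS cnt
      FROM events e JOIN users u ON e.userId = u.userId
      WHERE e.eventDate = '2020-06-26'
      GROUP BY u.country""")

    val q2 = spark.sql("""
      SELECT u.country, AVG(e.duration) AS avgDuration
      FROM events e JOIN users u ON e.userId = u.userId
      WHERE e.eventDate = '2020-06-26'
      GROUP BY u.country""")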

We’ll present our analysis of the TPC-DS workload later in this talk. Similarly, in production workloads we observed that 60% of queries have overlap.

A challenge for us in applying computation reuse is that things are constantly evolving.

For example, the workloads are not fixed but recurring. In recurring workloads, queries are submitted at a fixed frequency over changing data, possibly with different parameters. We solved this problem by implementing two distinct query hashes; our research paper has the details of their implementation.
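As a rough illustration of the two-hash idea (not the paper’s exact implementation, which computes signatures for every sub-plan rather than the whole plan), one hash can cover the plan with its literals included, while the other masks literal values so that recurring instances of the same query hash to the same signature:

    import java.security.MessageDigest
    import org.apache.spark.sql.DataFrame

    def md5(s: String): String =
      MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8"))
        .map("%02x".format(_)).mkString

    // Strict signature: literals included, so it changes with parameters.
    def strictSignature(df: DataFrame): String =
      md5(df.queryExecution.optimizedPlan.toString)

    // Recurring signature: crude literal masking, so daily instances of
    // the same recurring query produce the same hash.
    def recurringSignature(df: DataFrame): String =
      md5(df.queryExecution.optimizedPlan.toString
            .replaceAll("'[^']*'", "?")      // mask string literals
            .replaceAll("\\b\\d+\\b", "?"))  // mask numeric literals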

Reuse with SparkCruise

Now, let us look at how the SparkCruise system works. Consider the same query workload with the common join operation. SparkCruise can automatically find such repeated computations with high utility.

Now once SparkCruise is turned on and the first query is run again, possibly on a newer version of the dataset, the query will automatically materialize the result of the join operation.

Then, when the second query runs, it can directly read the materialized output rather than computing it again. Note that none of these steps requires any active intervention by the user. SparkCruise is also robust to small changes in query workloads. For example, in this workload the date filter changed from 26 June on day 1 to 27 June on day 2, but SparkCruise was able to handle these kinds of changes.
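Continuing the hypothetical example from above, the day-2 instance differs only in the date literal. Its recurring signature still matches the feedback, so the selected sub-plan is materialized and reused on day 2 just as it was on day 1:

    // Day 1: the filtered join is materialized on first use and reused
    // by later day-1 queries.
    val day1 = spark.sql("""
      SELECT u.country, COUNT(*) AS cnt
      FROM events e JOIN users u ON e.userId = u.userId
      WHERE e.eventDate = '2020-06-26'
      GROUP BY u.country""")

    // Day 2: only the literal changed. The recurring signature still
    // matches, so the same sub-plan is materialized afresh and reused.
    val day2 = spark.sql("""
      SELECT u.country, COUNT(*) AS cnt
      FROM events e JOIN users u ON e.userId = u.userId
      WHERE e.eventDate = '2020-06-27'
      GROUP BY u.country""")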

SparkCruise Design

Now, let us look at the components of SparkCruise that make automatic computation reuse possible.

First, the queries are submitted to the Spark server. We have added listeners that log query plans annotated with identifiers for sub-plans.
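A minimal sketch of such a listener, using Spark’s public QueryExecutionListener interface (SparkCruise’s listener also annotates sub-plans with signatures; here we just log the optimized plan):

    import org.apache.spark.sql.execution.QueryExecution
    import org.apache.spark.sql.util.QueryExecutionListener

    class PlanLoggingListener extends QueryExecutionListener {
      override def onSuccess(funcName: String, qe: QueryExecution,
                             durationNs: Long): Unit =
        // A real system would append this to a workload log store.
        println(s"[$funcName] ${qe.optimizedPlan.treeString}")

      override def onFailure(funcName: String, qe: QueryExecution,
                             exception: Exception): Unit = ()
    }

    spark.listenerManager.register(new PlanLoggingListener)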

These logged query plans are then parsed to generate a workload table.

On this generated workload table, we run a view selection algorithm to select common computations with high utility.

This set of selected computations is then fed back to the optimizer. We use the optimizer extensions feature in Spark, supplementing the traditional Spark optimizer with two rules: one rule for materialization and another rule for reuse.
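A skeleton of how two such rules plug in through Spark’s public extension point. The rule bodies, which would match sub-plans against the feedback file, are elided, and all names here are illustrative rather than SparkCruise’s actual classes:

    import org.apache.spark.sql.SparkSessionExtensions
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule

    case class MaterializeRule() extends Rule[LogicalPlan] {
      // Would rewrite the plan to also save the selected sub-plan's output.
      def apply(plan: LogicalPlan): LogicalPlan = plan
    }

    case class ReuseRule() extends Rule[LogicalPlan] {
      // Would replace a selected sub-plan with a scan of its materialized output.
      def apply(plan: LogicalPlan): LogicalPlan = plan
    }

    class CruiseStyleExtensions extends (SparkSessionExtensions => Unit) {
      def apply(ext: SparkSessionExtensions): Unit = {
        ext.injectOptimizerRule(_ => MaterializeRule())
        ext.injectOptimizerRule(_ => ReuseRule())
      }
    }
    // Enabled via configuration, e.g.:
    //   spark.sql.extensions=<fully-qualified name of CruiseStyleExtensions>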

These rules can directly read the feedback and apply it in an online fashion when the queries are submitted again. Now, Priyanka will present a demo of SparkCruise on Azure Synapse. – Thank you, Abhishek. In today’s demo, we start with a workload of queries which we’ve already run, and we collect the application logs of that workload. After that, we run the second step, the analysis, which goes over these application logs and identifies computations that can be reused. Finally, we re-run the same set of queries and compare the query plans with SparkCruise enabled and without, that is, with materialization and reuse and without. So, let’s take a look at the demo. I have a notebook here in an Azure Synapse Analytics workspace, which is now in public preview to provide a limitless analytics experience for our customers. I ran this notebook just a couple of minutes ago. It’s a simple notebook where we read some sample data and run a set of queries on it: a count and some projections and filters. We’ll take a closer look at this notebook in a few minutes. I also have the Spark history server open for this notebook session, which we’ll use as a reference after we’ve applied SparkCruise. Now, we have a Spark job, which we will make available to users, that analyzes the application logs and produces a feedback file with queries that can be materialized and reused. I’m going to go ahead and run this job against the same workspace, and while it’s running I’ll walk you through exactly what we expect to see.

With the feedback file that’s generated as a result of this job, if I run a query that has already been run, its result first gets materialized as a view, and for subsequent runs of the same query we’ll be able to reuse the result automatically. Depending on your workload and how often your data changes, you can run the analysis job at your desired frequency; this can be every day, every week, or every month. The location of the feedback file is provided as part of the job. In my case, I have provided a location in the primary storage account associated with my workspace, which I can also browse right from here. Once this job has finished running, we’ll see a feedback file generated here. We can also provide a custom location where we want the materialized views to be written. So, let’s give this a few more seconds. All right. We have the file now, and this is what will be used in subsequent runs. So let me go ahead and re-run this same job.

And while it’s running, let’s take a closer look at what the notebook contains. I have a CSV file with some sample data which I read and register as a temporary view, and a set of Spark SQL queries that I run on it. The first one is a SELECT COUNT(*), and you see that there are two occurrences of it. The reason is that the very first time I run it, we’ll know from the feedback file that we have seen this kind of query in the past and decide to materialize it here, and on the second instance of the same query we’ll be able to reuse the query result. Similarly, the first time I run the SELECT DISTINCT market query, which projects one of the columns with the filter querytime like 11, we’ll see that this filter has been applied in the past and decide to materialize the result of that filter. So when I run the second query, which projects different columns but applies the same filter, we’ll just run it on the existing materialized result. Let’s give this a few more seconds, and then we’ll be able to look at the difference between the query plans generated in the old job and the new job. So let me pull this out. This is the old job, on the right side, and this is the currently executing job. Let’s give it a few more seconds to finish running.
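Reconstructed from the demo narration, the notebook’s queries look roughly like this; the file path and the exact LIKE pattern are assumptions:

    // Read the sample CSV and register it as a temporary view.
    val df = spark.read.option("header", "true").csv("/data/sample.csv")  // hypothetical path
    df.createOrReplaceTempView("sampletable")

    // Run twice: the first run materializes the count, the second reuses it.
    spark.sql("SELECT COUNT(*) FROM sampletable").show()
    spark.sql("SELECT COUNT(*) FROM sampletable").show()

    // Same filter, different projections: the filter output materialized
    // by the first query is reused by the second.
    spark.sql("SELECT DISTINCT market FROM sampletable WHERE querytime LIKE '11%'").show()
    spark.sql("SELECT state, country FROM sampletable WHERE querytime LIKE '11%'").show()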

So the first instance of the count is done, and the second instance of the count just ran. We’re now running SELECT DISTINCT market FROM sampletable WHERE querytime LIKE 11.

Followed by selecting state and country where querytime like 11.

All right. So let’s stop this session and view the Spark history server for this session. And to remind you, the old job is on the right side and the new Spark history server will be on the left side.

Great. Now this is the application for the notebook which just ran.

So let’s look at an instance of collect. This is what we saw in the old job, and I’m going to select the collect from the new job. To remind you, the one on the right side is the older job and the one on the left side is the new job. On the right side, we see that we have to scan and aggregate, whereas on the left side both the scan and the aggregation step are skipped and we reuse the existing view. Similarly, let’s take a look at another action that we performed, the show, which was part of applying a filter.

And here again, we can see that the scan and filter have been skipped and we just use an existing view. Another thing to note is that SparkCruise is quite robust to small changes in filters: if you change the querytime to 12, that is fine, and you don’t have to re-run the analysis job. Because it’s a recurring job, maybe a daily or hourly job, and only the filter changed, we will still be able to materialize and reuse views automatically. Another way to make this part of your workload is to add a pipeline with notebooks and include the analysis job in your pipeline right from Synapse. You can also run batch jobs the way I ran this one from IntelliJ, or create a Spark job definition and submit the same job. We evaluated our system on the TPC-DS workload, which consists of 102 queries. The plot here shows the ratio between the query runtimes with SparkCruise enabled and without. A ratio of less than one means that the query ran faster with SparkCruise enabled, and the red bars indicate queries with a ratio above one. We see these red bars because those are the first queries to hit a common sub-plan, which means they pay the materialization overhead. But overall, the workload has many green bars, meaning that most queries benefited from reusing the materialized views.

Overall, we saw an improvement of 30% in wall clock time and CPU time.

SparkCruise Summary

This concludes the first part of our session today. We looked at a hands-free computation reuse system for Spark.

We saw that there’s a workload-based feedback loop in the Spark query engine, and I want to mention that it was built entirely using Spark extensibility points, which means that you don’t need a special fork of Spark to get this to work; it just works out of the box, and the optimizations are applied automatically. I’ll hand over to Abhishek to summarize how you can analyze whether SparkCruise is something your workload can benefit from. – Thank you, Priyanka. We saw that SparkCruise can reduce the overall cost of the workload by automatically reusing computation. To enable this feature, we added a workload-based feedback loop to the Spark query engine. SparkCruise can select high-utility common computations and apply automatic materialization and reuse as part of query processing. None of this requires any changes to Spark itself: the user can simply add our libraries and set a couple of configuration parameters to get started.

Workload Insights Notebook

But users still want to understand whether SparkCruise will benefit their workload. Generally, users can understand individual queries, but it is very difficult to analyze a complete workload.

The Spark history server has a nice visualization for individual queries, but there is no place where we can see workload statistics in aggregate. Therefore, to solve this problem and inform users about the redundancies in their workloads and the potential savings from SparkCruise, we developed the Workload Insights Notebook.

Before we demo the Workload Insights Notebook, let’s illustrate what a workload in Spark looks like. In Spark, a workload contains applications, and an application contains queries, each with a logical plan and an executed plan. These entities also have metadata and metrics associated with them, and within a workload the queries evolve over time. We link all these entities together to create a tabular representation of the workload. This workload table is available for instant querying.

To see what insights we can derive from this workload table, let’s go to the demo of the Workload Insights Notebook. The Workload Insights Notebook is an interactive notebook for analyzing the query workload, and we are running it in Azure Synapse Studio. In this demonstration, we will analyze and derive insights from the TPC-DS workload. We will also show the benefits of SparkCruise on this workload.

The workload analysis job shown in the SparkCruise demo creates a tabular representation of the query workload. The first step is to load the tables and a list of selected views.
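A minimal sketch of this loading step; the paths, file format, and view names below are assumptions, not the product’s actual layout:

    // Hypothetical output locations written by the analysis job.
    val workload = spark.read.parquet("/feedback/workloadTable")
    val selected = spark.read.parquet("/feedback/selectedViews")
    workload.createOrReplaceTempView("workload")
    selected.createOrReplaceTempView("selected_views")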

Now, let’s take a look at a subset from the table.

The attributes in the table include application and query level information like query execution time, operator details, signatures to identify sub-plans as well as metrics like row count and exclusive time. There is one row per operator in the query plans. For example, the first row is an aggregate operator.

We can now start querying the workload table.

How many queries and operators are in this workload? It says that this workload has 102 queries and 3824 operators. Another question we can ask is: what is the frequency of operators in the workload? For example, how many filters and joins do we have? We can see here that we have 933 filters and 735 joins.
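Queries in this style might look as follows; the column names (queryId, operator) are assumptions based on the table description above:

    // How many queries and operators are in this workload?
    spark.sql("""
      SELECT COUNT(DISTINCT queryId) AS queries, COUNT(*) AS operators
      FROM workload""").show()

    // What is the frequency of each operator?
    spark.sql("""
      SELECT operator, COUNT(*) AS frequency
      FROM workload
      GROUP BY operator
      ORDER BY frequency DESC""").show()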

Note that we are querying this workload table using SQL. And we can easily visualize these tables in the Azure Synapse notebook.

Now, let’s check if there are redundancies in the workload. SparkCruise removes these redundant computations to achieve performance gains.

In this workload, we find that 95 out of 102 queries overlap with at least two other queries at non-leaf levels. This is expected, since the TPC-DS workload has many queries over the same set of tables.

Now, let’s try to get a sense of which operators are overlapping, so we group the overlapping computations by operator name. In this table, the first column is the operator name, the second is the operator frequency, the third is the number of repeating computations, and the fourth is the number of distinct repeating computations. We see that 97 filters are repeated 513 times, and there are 18 joins and 10 aggregates in common. Let’s go to a plot of this table.
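A sketch of such an overlap query, treating a computation as repeating when its signature appears in more than one query (again, the column names are assumptions):

    // Operators whose signature occurs in more than one query.
    spark.sql("""
      SELECT w.operator,
             COUNT(*)                    AS repeatedOccurrences,
             COUNT(DISTINCT w.signature) AS distinctComputations
      FROM workload w
      JOIN (SELECT signature
            FROM workload
            GROUP BY signature
            HAVING COUNT(DISTINCT queryId) > 1) r
        ON w.signature = r.signature
      GROUP BY w.operator""").show()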

In this workload, more than 50% of the filters repeat. Since joins and aggregates often appear later in query plans, a smaller percentage of them repeat, but it can still be useful to reuse their results, as they are generally more expensive. It is encouraging to see these redundancies in the TPC-DS workload. Next, we try to identify how much we can reuse with SparkCruise.

This table summarizes the result of our view selection algorithm. The algorithm selected 18 filters that are repeated 138 times, as well as 12 joins and other operators, for reuse. We also report the benefits in terms of computation cost and materialization cost per view. This analysis can be used to identify redundancies in any workload. There are additional sample queries in this notebook that can, for example, report the selectivity of filters, find the sizes of intermediate shuffles, and even identify recurring jobs in the workload independent of the job submission system.

This notebook is extensible, and users can write their own custom queries. We hope it helps you answer questions about your query workload. Thanks for watching this demo. We hope you are excited to try SparkCruise and the Workload Insights Notebook on your own query workloads. SparkCruise is available as an experimental feature on Azure HDInsight, and it will be released on Azure Synapse soon. If you would like more details about the SparkCruise system, please refer to our papers listed on this slide or contact us directly.


 
About Abhishek Roy

Microsoft

Abhishek Roy is a Research Engineer in Gray Systems Lab (GSL) at Microsoft. His research focuses on improving performance of big data query engines in the cloud. Before joining GSL, he was a graduate researcher in the database group at University of Massachusetts Amherst. As part of his dissertation, he collaborated with New York Genome Center to build a big data platform for genomic data pipelines. Ultimately, he hopes that his work will make data processing faster and more resource efficient for the end users.

About Priyanka Gomatam

Microsoft

Priyanka Gomatam is a Software Engineer in Azure Data at Microsoft. Prior to Microsoft, she worked on building an AI and ML platform at Visa. Priyanka completed her Master’s in Computer Science at The Pennsylvania State University. Her interests lie in building cloud-scale Analytics-as-a-Service platforms and big data systems.