Optimizing the Catalyst Optimizer for Complex Plans

May 28, 2021 11:40 AM (PT)


For more than 6 years, Workday has been building various analytics products powered by Apache Spark. At the core of each product offering, customers use our UI to create data prep pipelines, which are then compiled to DataFrames and executed by Spark under the hood. As we built out our products, however, we started to notice places where vanilla Spark is not suitable for our workloads. For example, because our Spark plans are programmatically generated, they tend to be very complex, and often result in tens of thousands of operators. Another common issue is having case statements with thousands of branches, or worse, nested expressions containing such case statements.

With the right combination of these traits, the final DataFrame can easily take Catalyst hours to compile and optimize – that is, if it doesn’t first cause the driver JVM to run out of memory.

In this talk, we discuss how we addressed some of our pain points regarding complex pipelines. Topics covered include memory-efficient plan logging, using common subexpression elimination to remove redundant subplans, rewriting Spark’s constraint propagation mechanism to avoid exponential growth of filter constraints, as well as other performance enhancements made to Catalyst rules.

We then apply these changes to several production pipelines, showcasing the reduction of time spent in Catalyst, and list out ideas for further improvements. Finally, we share tips on how you too can better handle complex Spark plans.

In this session watch:
Jianneng Li, Developer, Workday
Asif Shahid, Principal Software Development Engineer, Workday

 

Transcript

Jianneng Li: Hello everyone. Thank you for coming to our session on Optimizing the Catalyst Optimizer for Complex Plans. My name is Jianneng and my co-speaker is Asif. We’re both software engineers at Workday. As for the agenda for today, we’re going to first talk about what we do and how we use Spark. And then we’ll go right into what complex plans mean to us, as well as some of the problems we see and how we handle them. Okay. First, we work on a product called Workday Prism Analytics, and we use Spark as the way to execute the jobs. So customers are able to use our UI to build ETL pipelines, and these ETL pipelines are compiled into DataFrames to be executed by Spark. We have both finance and HR use cases, both of which can involve quite complex pipelines. And just to give an example of what a complex pipeline looks like, here’s a Spark UI with our physical plan shown.
And just for reference, the open source Spark logo is basically the top-left pixel. To hear a little bit more about the business side of our product, feel free to check out our talk from two years ago. All right, on to complex plans. Complex plans to us mean thousands of operators, mostly because of many self joins, self unions, and large expressions. It can take Catalyst hours to optimize them and to generate the physical plan. And because the plan is so large, it’s very difficult to understand or inspect visually. Just to give you an example, let’s walk through a data validation use case. We start with a few columns and a few rows. And let’s say the first requirement is simply to filter rows based on some criteria. In this case, the filters are on ID and name, and we have two rows being filtered out.
That’s very simple; in terms of SQL it’s just a WHERE clause. But it gets complex very quickly. Let’s say that if a row is filtered, we also want to make sure that all the other rows with the same group ID are also filtered. Now, suddenly, instead of just doing a single SELECT … FROM … WHERE, we also have to do a left anti join to subtract out the bad rows. And just to make it even more interesting, let’s say we want to compute the other side of the filters as well. To compute all the rows that didn’t pass, we basically have to first get all the ones that didn’t pass the filter, and then union that with all the ones that did pass but have a bad group ID. The SQL here is not really important. The important part is that, for something seemingly simple, we already have many self unions and self joins.
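
As a rough DataFrame sketch of the pattern being described (the column names id, name, and group_id, and the pass/fail criteria, are made up for illustration):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._

    // Split a source DataFrame into "valid" and "invalid" rows, where a bad
    // row also disqualifies every other row that shares its group_id.
    def splitValidRows(src: DataFrame): (DataFrame, DataFrame) = {
      // Rows that fail the per-row criteria.
      val bad = src.filter(col("id").isNull || col("name").isNull)
      val badGroups = bad.select("group_id").distinct()

      // Rows that pass the criteria AND are not in a group containing a bad
      // row -- the left anti join subtracts those groups out.
      val passing = src.filter(col("id").isNotNull && col("name").isNotNull)
      val valid = passing.join(badGroups, Seq("group_id"), "left_anti")

      // Everything else: rows that failed outright, unioned with rows that
      // passed but belong to a bad group.
      val invalid = bad.unionByName(passing.join(badGroups, Seq("group_id"), "left_semi"))
      (valid, invalid)
    }

Notice how the same source DataFrame shows up in several places; that is exactly how the self joins and self unions accumulate.
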
And just for fun, let’s do one more step. Let’s say that for every single one of these filters, we want to show a custom error message. For that, instead of having one WHERE clause with multiple filters, we have to split it into multiple ones and then union them together, just to show a different error message for each filter. And that’s why complex plans are complex. The complexity increases gradually over time. And of course you can ask customers to optimize these, but it is much better if our performance is good out of the box.
So how do we handle complex plans? The first and very common way to handle them is Common Subexpression Elimination. And this is just a fancy term for identifying commonly shared subplans, and then making sure you only do the computation once. So in the example above, we have many self joins and self unions, and we want to make sure all these repeated legs are only computed once. You compute a leg once and you cache it, so that the next time you see something like it, you simply reuse the cached result. There are different ways to implement CSE: you can do it heuristically, or you can do it algorithmically. We started off by using heuristics to insert these cache operators based on, essentially, how the ETL pipelines are structured. But then we realized that’s usually not enough once plans get complex enough. So now we’re using the algorithmic approach of going through each node, creating proper hashes, and then identifying which subplans we can cache.
Because this algorithm is done outside Spark, I won’t go into detail on how it’s done. But I will give you an example. So here we have a union of some joins. Right off the bat, you can notice that dataset A and the parse on top of it are repeated multiple times. So instead of scanning it multiple times, we simply cache it the first time we see it, and the next time we see the same operator, we reference the cache. Next, we see that the join is actually also repeated, so we can cache that as well. And just to illustrate how important CSE is, we have tested a pipeline containing four of the data validation use cases that I described previously. And you see that without CSE, the number of optimized operators becomes tens of thousands, but with CSE, it’s much more controlled. The growth just comes from all these replications of the same operators over and over again. And it shows during runtime as well: without CSE, it’s 10 minutes, but with CSE, it’s much more manageable. 30 seconds is still not great, but it’s much better than 10 minutes.
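
As a toy illustration of the algorithmic direction (this is not the actual implementation, which hashes plan nodes outside of Spark and only caches subplans that really repeat), one could key sub-DataFrames by their logical plans and hand back a persisted copy whenever the same subplan shows up again:

    import scala.collection.mutable
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.storage.StorageLevel

    object SubplanCache {
      // Already-seen subplans, paired with their persisted DataFrames.
      private val seen = mutable.ListBuffer.empty[(LogicalPlan, DataFrame)]

      def dedup(df: DataFrame): DataFrame = synchronized {
        val plan = df.queryExecution.analyzed
        seen.find { case (p, _) => p.sameResult(plan) } match {
          case Some((_, shared)) => shared           // reuse the cached leg
          case None =>
            val shared = df.persist(StorageLevel.MEMORY_AND_DISK)
            seen += plan -> shared
            shared
        }
      }
    }
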
And just a note about logging: when you have plans that are this big, more than tens of megabytes in size, you have to be careful about how you log. You want to make sure that instead of generating the entire string in memory, you stream the logs to disk. And if you’re sending these logs to a centralized log aggregation service, make sure you only send truncated logs, because otherwise it’s going to overwhelm the receiver.
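
A small sketch of that logging hygiene, assuming Spark 3.x (where QueryExecution exposes debug.toFile) and a local file path; the 64 KB limit is just an illustrative number:

    import org.apache.spark.sql.DataFrame

    def logPlans(df: DataFrame, path: String, sendToAggregator: String => Unit): Unit = {
      // Stream the full plans to a file on disk instead of building one giant
      // string on the driver heap.
      df.queryExecution.debug.toFile(path)

      // Only a truncated prefix goes to the centralized log aggregation
      // service, so a multi-megabyte plan doesn't overwhelm the receiver.
      val maxChars = 64 * 1024
      val source = scala.io.Source.fromFile(path)
      try sendToAggregator(source.take(maxChars).mkString)
      finally source.close()
    }
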
Next, let’s move on to large case expressions. What do I mean by large case expressions? It’s simply a case expression with many branches. We’ve seen thousands of branches created in production, where essentially the customer is trying to do some sort of data validation, or categorize data based on some attributes. And this is a problem for multiple reasons. One is that the function you’re referencing in each of the WHEN branches is going to be computed for every single one of these branches, because Spark doesn’t optimize the repetition away during optimization. During code gen, Spark does have something to eliminate these repeated computations, but it doesn’t work well enough for us, for reasons you’ll see soon. And secondly, Spark likes to collapse adjacent project operators together, so that you don’t have additional operators. But if you have these large case expressions and very nested expressions, this collapse actually blows up your plan. And even if it doesn’t, it’s going to generate code so large that Janino compilation runs the JVM out of memory. And just to showcase what this plan looks like in open source Spark today, you can see that the top two project operators are actually collapsed.
So the case expression is replicated in two places. So what can we do? Obviously we want to make sure these two bad things that Catalyst does are mitigated. First, we come up with a way to identify large expressions, so that we can see when a case expression is expensive, and we make sure to not collapse them, making sure the CollapseProject rule does not apply. Secondly, we identify the functions that are shared between different WHEN branches, and if a function is used more than once, we extract it. And we do this during Catalyst time, so when we go from the analyzed plan to the optimized plan, we generate a plan like this, and the entire plan stays sort of skinny as it goes through physical plan generation. With the same example, what we do is take F, which is used in multiple places, and put it into its own project operator. And we create an alias called K, which we reference later on.
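
To make the shape of that rewrite concrete, here is a hand-written DataFrame sketch of the same idea; in the product this happens automatically through a Catalyst rule, and f here stands in for the expensive function shared across WHEN branches:

    import org.apache.spark.sql.{Column, DataFrame}
    import org.apache.spark.sql.functions._

    def categorize(src: DataFrame, f: Column): DataFrame = {
      // Before: every WHEN branch repeats f, and collapsing projects can copy
      // the whole CASE expression into every place its output is referenced:
      //   src.select(when(f > 0, "pos").when(f < 0, "neg").otherwise("zero").as("label"))
      //
      // After: compute f exactly once in its own projection under an alias
      // ("k"), and let the big CASE expression reference only the alias.
      src
        .select(col("*"), f.as("k"))
        .select(
          when(col("k") > 0, "pos")
            .when(col("k") < 0, "neg")
            .otherwise("zero")
            .as("label"))
    }

Note that, as just mentioned, vanilla Spark’s CollapseProject rule can merge the two projections back together and re-inline f, which is why the actual fix also stops that rule from applying to expensive case expressions.
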
And just to show why this is very significant, let’s go through an example benchmark. Starting off, we have a base scan where we create this case expression. Next, we create an outer-level select that takes the inner computed field and creates aliases out of it, CF1 and CF2. Then we use both of these aliases in another case statement. So this is a nested case expression benchmark. And if you look at how performance compares before and after our optimization, you’ll see that with only 100, 200, 300 branches, the open source Spark way of approaching nested expressions quickly becomes not scalable.
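
A sketch of how such a benchmark query could be generated (the table name, column names, and branch predicates are invented for illustration; this is not the exact benchmark code):

    // Builds a nested CASE query: an inner CASE over column c, aliased twice
    // as cf1 and cf2, and an outer CASE that references both aliases.
    def nestedCaseSql(table: String, branches: Int): String = {
      val innerCase = (1 to branches)
        .map(i => s"WHEN c = $i THEN 'bucket_$i'")
        .mkString("CASE ", " ", " END")
      val outerCase = (1 to branches)
        .map(i => s"WHEN cf1 = 'bucket_$i' OR cf2 = 'bucket_$i' THEN $i")
        .mkString("CASE ", " ", " ELSE 0 END")
      s"""SELECT $outerCase AS category
         |FROM (
         |  SELECT inner_cf AS cf1, inner_cf AS cf2
         |  FROM (SELECT $innerCase AS inner_cf FROM $table) inner_q
         |) outer_q""".stripMargin
    }
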
We can see that it must be some sort of exponential algorithm in there, otherwise you wouldn’t have the time start to [inaudible]. And that’s why being able to identify these large case expressions and handle them properly is so important to us. Because in large data use cases, you don’t want compile time and optimization time to actually take longer than the job time itself. Large data is meant to be large data, not large complex plans. Okay. Now, Asif is going to take over and talk about constraint propagation.

Asif Shahid: Thanks Jianneng. Hello everybody. The introduction of aliases in the large case expression fix which Jianneng mentioned interacts with the constraint propagation rule and causes some unexpected problems. Before we dive into that problem, let’s try to understand what constraints are and how the Spark optimizer utilizes them. Constraints are nothing but the filters on columns which the optimizer encounters as it traverses the plan from bottom to top, and it keeps track of those constraints. How are these constraints useful? Well, they can be used to optimize the plan by generating new filters, for example is not null filters. They can be used to prune redundant filters, and they can be used to push down new filters to the other side of a join. Let’s see each of these with an example. In this case, as you see, there’s a filter node with the condition A greater than 10. So the constraint code will store this filter, and it will also generate a new filter, A is not null, and store it as a constraint as well. A is not null is implicitly generated by virtue of the inequality on attribute A.
So the plan after optimization would introduce an A is not null filter on the filter node, along with A greater than 10. Now let’s take the example of pruning redundant filters. If you see here, on the filter node we again have the constraints A is not null and A greater than 10. Then there’s a project node where A is aliased as A1, and on top of it we have a filter A1 greater than 10. Now, since A1 is an alias of A, and we already have a filter A greater than 10, it implies that A1 greater than 10 is actually a redundant filter and should be removed from the plan. So the optimizer would modify this plan and remove the filter A1 greater than 10. Now, let’s take the example of pushdown of a new filter to the other side of the join. In this case, as you see, there’s a join condition on attribute A1, and this attribute A1 is an alias of A. We find that the constraint set has the conditions A greater than five and A is not null, which indirectly means that X, the join key on the other side, is also not null and greater than five. As a result, the optimizer will be able to push down the conditions X greater than five and X is not null to the other side of the join, so the new optimized plan would have these additional filters.
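
In Spark SQL terms, the three uses look roughly like this (t1, t2, and the column names are placeholders; vanilla Spark performs these rewrites when spark.sql.constraintPropagation.enabled is on, which is the default):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("constraints-demo").getOrCreate()

    // 1) New filter generation: a > 10 implies a IS NOT NULL, which the
    //    optimizer adds to the scan of t1.
    spark.sql("SELECT * FROM t1 WHERE a > 10")

    // 2) Redundant filter pruning: a1 is an alias of a, and a > 10 already
    //    holds below, so the outer a1 > 10 filter is removed.
    spark.sql("SELECT * FROM (SELECT a AS a1 FROM t1 WHERE a > 10) t WHERE a1 > 10")

    // 3) Pushdown to the other side of a join: a > 5 and a IS NOT NULL on the
    //    left join key imply x > 5 and x IS NOT NULL on the right join key.
    spark.sql("SELECT * FROM (SELECT a AS a1 FROM t1 WHERE a > 5) l JOIN t2 r ON l.a1 = r.x")
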
Now, how does the current constraint propagation algorithm work? It traverses the tree from bottom to top. When it encounters a filter node, it stores the filter as a constraint, and it creates additional not null constraints and stores them in the constraint set. When it encounters a project node where there are aliases of the original attributes, it will try to create all the possible combinations of constraints. By that I mean the following example. If you see, there are constraints generated at the filter node: A is not null and A greater than 10. And when it reaches the project node, since A is aliased as A1, the constraints are duplicated: we have A is not null and A greater than 10, and then we also have A1 is not null and A1 greater than 10. Apart from that, it also generates another equal null safe constraint, which equates A to A1.
So the number of constraints on the project node has increased because of the alias. To prune a filter, it simply checks whether the filter exists in the constraint set or not; if yes, it is able to remove it. In the previous case, we saw there was A1 greater than 10, and since A1 greater than 10 exists in the constraint set, it is able to remove it. And it’s the same thing for the pushdown of a filter to the other side of the join: it checks whether there is any constraint available on the join key, and if yes, it uses it. Current open source Spark only pushes down filters which reference exactly one key, one column; it is not able to handle compound filters. Now, this algorithm is actually memory intensive. The reason is that it creates constraints for all the aliases, and it is combinatorial logic that creates those constraints.
So, if we have a simple filter F(A, B) which depends on attributes A and B, and the number of aliases of attribute A is M, and the number of aliases of attribute B is N, then the total number of constraints generated for one such filter expression is approximately M times N, plus M plus N, plus M choose two plus N choose two. The first term, M times N, corresponds to the total number of constraints generated for the filter F(A, B) across each combination of its aliases. The M plus N term is the total number of is not null constraints generated for all the aliases. And M choose two and N choose two are the total number of equal null safe constraints, which establish the equality among the various aliases. So the number of constraints increases drastically as the number of aliases increases. If we look at the fix for large case expressions, where we introduced a lot of aliases, this becomes an explosive mix in terms of Catalyst optimization: the number of constraints can result in an OOM, or can take an inordinately long time.
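
Written as a formula, the approximation described above is:

    \text{constraints per filter} \;\approx\;
      \underbrace{M \cdot N}_{F(A,B)\ \text{over alias pairs}}
      + \underbrace{(M + N)}_{\text{is not null}}
      + \underbrace{\binom{M}{2} + \binom{N}{2}}_{\text{equal null safe}}
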
So what is the solution? We have proposed an optimized constraint propagation logic, which also traverses the tree from bottom to top. On the filter node, it stores the existing filter as a constraint and creates an is not null constraint. But when it reaches the project node, instead of duplicating the constraints, we just track the aliases in a list, so that each constraint is stored in terms of its original attribute only. So let’s take a look at this example. On the filter node we have A is not null and A greater than 10. When the project node comes, we just store the original constraints in terms of the original attribute A, so we have A is not null and A greater than 10. And we keep track of the aliases in a list, recording that A1 is equivalent to A.
So how does it prune the filter? We simply rewrite the filter A1 greater than 10 in terms of the original attribute, which gives A greater than 10. And then we check whether this A greater than 10, which is the canonicalized version, exists in the constraint set or not. If yes, it can be removed. So here you see that with [inaudible] this logic, the optimizer is able to remove A1 greater than 10. How does it help in the pushdown of new filters on the other side of the join? Well, we take a look at the join keys and rewrite those join keys in terms of the original attributes. In this case, A1 is an alias of A and B1 is an alias of B, and A1 and B1 are the join keys, so they are rewritten in terms of A and B. And then we check: is there any constraint available on A, on B, or involving both? We find that there is a constraint A plus B greater than five, which indirectly means that X plus Y greater than five is a valid constraint, along with X is not null and Y is not null, and these can be pushed down on the right side. So the new plan after optimization ensures that the compound filter also gets pushed down to the right-hand side of the join.
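
A much-simplified sketch of the bookkeeping being described (the real change is a rewrite of Catalyst’s constraint propagation code, tracked in the Spark JIRA mentioned at the end of the talk, not a standalone class like this):

    import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExpressionSet}

    // Constraints are stored only in terms of original attributes; aliases
    // live in a side map instead of being expanded combinatorially.
    case class TrackedConstraints(
        constraints: ExpressionSet,                  // e.g. {a > 10, isnotnull(a)}
        aliasToOriginal: Map[Attribute, Expression]  // e.g. {a1 -> a, b1 -> b}
    ) {
      // Rewrite an expression in terms of the original attributes.
      private def rewrite(e: Expression): Expression = e.transformUp {
        case attr: Attribute => aliasToOriginal.getOrElse(attr, attr)
      }

      // A filter is redundant if its rewritten, canonicalized form is already
      // implied by the stored constraints (ExpressionSet canonicalizes members).
      def isRedundant(filter: Expression): Boolean = constraints.contains(rewrite(filter))

      // Join keys are rewritten the same way before looking for constraints
      // that can be pushed to the other side of the join.
      def rewrittenJoinKeys(keys: Seq[Expression]): Seq[Expression] = keys.map(rewrite)
    }
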
This is just a comparison of the existing constraint propagation logic and the optimized one. If you look at the number of constraints, in the current algorithm it is combinatorial and highly dependent on the number of aliases, whereas in the optimized algorithm it is independent of the number of aliases: we just store the original constraints themselves. And that benefits memory usage. The memory usage of the current algorithm is high, because it has to generate so many alias combinations.
In the case of the improved algorithm, it is the bare minimum. Filter pushdown for joins: the current algorithm only pushes down single-reference filters, something like X greater than five or Y greater than five, but with the improved algorithm we are able to push down compound filters to the other side of the join. And in terms of the creation of is not null constraints, what we have found is that the current algorithm may miss an is not null constraint in certain situations, while the improved algorithm is able to detect is not null constraints more robustly. So that’s it from my side. I will hand it over to Jianneng, who has done some benchmark studies on this, to take it further. Thank you.

Jianneng Li: Thank you, Asif, for explaining constraint propagation. So after all that theory, let’s actually dive into a benchmark and see how it works in practice. So here on the screen, we again have a base scan, and we’re creating just some aggregate functions to ensure the behavior we need, and then we do a group by. Above that, we have a case expression that references the columns we’ve created. So C01, C02, these are the columns we created, and each column is referenced twice, essentially to trigger the logic for extracting the commonly used functions. Above that, we create a filter on the case expression. And then finally, we do an inner join, a self join with the join condition on this created field. The whole point of this is to trigger the predicate pushdown to the other side of the join. And the effect is quite drastic.
Even with only five, 10, 15 columns, you see that the time it takes for optimization increases exponentially. So on the graph here, the very small bars are the baseline, which is open source Spark. The red ones are with only the large case expression optimizations enabled. And then the right side is with both the large case expression and the constraint propagation improvements. And of course we want to have everything on, so we get the benefit of the large case expression improvement and also the constraint propagation improvement. And just to show that this actually impacts our customer workloads: here we examine a financial use case from a large insurance company. They use nested case statements exactly to validate and categorize data, as I described. And from our benchmarking, we see that baseline Spark takes roughly 600 seconds to compute. If you only enable the large case expression optimization, the driver actually runs out of memory, so we’re unable to complete the job. But with the constraint propagation optimization as well, we’re able to bring the time down even below open source Spark. So that’s a net win for us and also for the customers.
So, closing thoughts. What did we learn from handling large case expressions and all the other aspects of complex plans? This is mostly just summarizing what we’ve touched on. We want to take advantage of CSE, because you’ve seen how drastic an improvement CSE can bring: if you don’t have to compute the same thing multiple times, then obviously that’s going to save you time. Continuing along the same trend, we want to reduce the number of operators in general, and also limit the number of aliases, because the more of these you have, the harder it is for Catalyst to handle them. And of course, if you want to uptake our improved constraint propagation algorithm, please follow the open source Spark JIRA. We will try to push it into Spark, and hopefully you will see it in mainline Spark in the future.
And for future work, we are continuously looking at Catalyst to see what we can improve. Currently we’ve noticed that predicate pushdown and collapse project, both of these Catalyst rules, are a little less efficient than we would like them to be. We’ve been experimenting in this area. We don’t have anything concrete to share yet, but keep an eye out for anything we share in the future. Secondly, if you look at this case expression usage, we’re really trying to implement a rules engine in Spark, and a case expression is not the best tool for that job. So ideally we want to use some of the research that’s been done over the past decades. For example, if the rules are simple enough, maybe we simply convert the case expression to a lookup table. Or if it’s more complex, we can use something like the Rete algorithm to convert this into a graph, and then execute the case expressions much more quickly. And that’s it for our talk. Thank you for your time, and we’re here to answer any questions.

Jianneng Li

Jianneng Li is a Software Development Engineer at Workday (and previously with Platfora, acquired by Workday in late 2016). He works on Prism Analytics, an end-to-end data analytics product, part of t...

Asif Shahid

Asif works in the Spark team of Workday's Prism Analytics division. He enjoys the nitty-gritty of Spark internals, and has over 20 years of experience in the field of distributed caching, SQL and obj...