Broadcast join is an important part of Spark SQL’s execution engine. When used, it performs a join on two relations by first broadcasting the smaller one to all Spark executors, then evaluating the join criteria with each executor’s partitions of the other relation. When the broadcast relation is small enough, broadcast joins are fast, as they require minimal data shuffling. Above a certain threshold, however, broadcast joins tend to be less reliable or performant than shuffle-based join algorithms, due to bottlenecks in network and memory usage. This talk shares the improvements Workday has made to increase the threshold of relation size under which broadcast joins in Spark are practical. We cover topics such as executor-side broadcast, where data is broadcast directly between executors instead of being first collected to the driver, as well as the changes we made in Spark’s whole-stage code generator to accommodate the increased threshold for broadcasting. Specifically, we highlight how we limited the memory usage of broadcast joins in executors, so that we can increase the threshold without increasing executor memory. Furthermore, we present our findings from running these improvements in production, on large-scale jobs compiled from complex ETL pipelines. From this session, the audience will be able to peek into the internals of Spark’s join infrastructure, and take advantage of our learnings to better tune their own workloads.
– Hello and welcome, my name is Jianneng and I am a software engineer at Workday. Today I’ll be talking about Broadcast Joins.
So, legal stuff aside, today’s agenda is first a little bit about how Spark works in my organization, then broadcast joins and how we improved them, and finally a production case study.
So, how Spark works with us. Spark is the engine we use as our data processing backbone.
My organization works on a product called Prism Analytics. It serves customers in various ways, but at its core, customers use our self-service product to build data transformation pipelines, which are then compiled to DataFrames and ultimately executed by Spark. We have both financial and HR use cases: financial use cases, where we help manage your credits and debits, and HR use cases, where we gather insights from your workforce. As you can imagine, financial use cases have the traditional big-data sort of volume, with a lot of different transactions coming in on a daily basis, whereas HR use cases may not have as much data. What’s interesting about HR is that the plans are much more complex, and this talk is going to focus on the HR use case. So if the hardware and data I’m using don’t look big, that’s because the focus is on the complexity of the plans. Just to give you a sense of how complex things are: on the right side is a diagram of the physical plan of our HR use case. If you’re used to seeing physical plans in the Spark UI, note that the Apache Spark logo on the top left of this picture is just a few pixels. That’s how big the plans are.
It’s typical to see hundreds or even thousands of operators, so the plans are often very difficult to analyze.
If you are interested in seeing how Spark is integrated into our workload, feel free to look at our presentation from the previous year.
Okay, now on to broadcast joins in Spark. First, a little review of what a broadcast join means. Let’s say you have two nodes and two data sets, the blue table and the red table, and you want to join them together. A broadcast join broadcasts the smaller table so that it exists in its entirety on both nodes. In this case, the red table gets broadcast to both nodes, and then you can perform the join. This is fast because you don’t have to shuffle the bigger table, and each partition can execute standalone.
Just to compare this with the other popular join strategy, shuffle join: broadcast join is better because it avoids shuffling the bigger side, whereas shuffle join shuffles both sides. And because there’s no shuffling involved, broadcast join naturally handles data skew: you don’t have to worry about skewed data landing in one partition and making that partition take much longer to process than the rest, whereas shuffle join can have this problem because you are doing the shuffling. In a similar vein, broadcast join is better for selective joins. Imagine you have multiple joins, and the last one happens to be very selective, producing very few rows. In the broadcast case you can execute all the joins in one shot, whereas with shuffle join, after each join finishes you have to shuffle to disk, which can produce many unnecessary intermediate results. For all the benefits broadcast brings, there are some drawbacks: namely, we have to broadcast the data, which means the data needs to fit in memory. And lastly, broadcast joins cannot be used for all joins. For example, with a left outer join it’s not possible to broadcast the left side, because then you cannot know whether a row exists on the left side but not on the right; for left outer joins you can only broadcast the right side. For full outer joins you cannot use broadcast join at all. Shuffle join is versatile in that regard.
So, all this considered, broadcast join really should be faster than shuffle join, as long as memory is not an issue and the join can be planned as a broadcast.
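The mechanics described above can be sketched in plain Python. This is a toy model, not Spark’s actual implementation: the small side is materialized as a hash table available to every partition, and each partition of the large side probes it independently, with no shuffle of the large side.

```python
from collections import defaultdict

def build_hash_table(rows, key):
    """Build a key -> rows map from the broadcast (small) relation."""
    table = defaultdict(list)
    for row in rows:
        table[row[key]].append(row)
    return table

def broadcast_hash_join(large_partitions, small_side, key):
    """Inner join: every partition probes its own copy of the hash table.
    In real Spark the table is shipped to each executor; here we just
    reuse one in-process copy."""
    table = build_hash_table(small_side, key)
    results = []
    for partition in large_partitions:
        joined = []
        for row in partition:
            for match in table.get(row[key], []):
                joined.append({**row, **match})
        results.append(joined)  # each partition is independent: no shuffle
    return results

# Toy data: two partitions of the "blue" table, one small "red" table.
blue = [[{"k": 1, "a": "x"}],
        [{"k": 2, "a": "y"}, {"k": 3, "a": "z"}]]
red = [{"k": 1, "b": 10}, {"k": 2, "b": 20}]
results = broadcast_hash_join(blue, red, "k")
```

Note that the row with k=3 simply produces no output in its partition, without any coordination between partitions.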
Now let’s talk about how broadcasting works in Spark.
Broadcasting in Spark works by sending the data from the executors to the driver, and then having the driver broadcast it back to the executors.
In other words, the driver does a collect to gather all the data, and then it broadcasts the data back to the executors. As you can see, this is a little inefficient because the driver becomes the bottleneck, and in our workloads the driver is typically not provisioned with a lot of memory because it really doesn’t do very much. But when broadcasting is involved, we have to make sure the driver has enough memory.
Going a step further, broadcast joins in Spark are built on top of this broadcast mechanism.
The data is first collected to the driver, and then a broadcast variable is used to broadcast it to the executors. When planning which join strategy to use, Spark decides based on size estimations, on a per-join basis, using a configurable threshold (spark.sql.autoBroadcastJoinThreshold, which defaults to 10 megabytes).
There are two types of broadcast joins in Spark. One is broadcast hash join, where the driver builds an in-memory hashtable and distributes it to the executors. The other is broadcast nested loop join, which is basically a nested for loop.
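The nested loop variant can be sketched in a few lines (again a toy model, not Spark’s implementation): it evaluates an arbitrary predicate against every pair of rows, which is what makes it work for non-equi joins, at quadratic cost.

```python
def broadcast_nested_loop_join(large_partition, broadcast_side, predicate):
    """Evaluate an arbitrary join predicate against every pair of rows:
    O(|partition| * |broadcast_side|) comparisons."""
    return [
        (left, right)
        for left in large_partition
        for right in broadcast_side
        if predicate(left, right)
    ]

# Non-equi join: pair each value with every broadcast value it exceeds.
pairs = broadcast_nested_loop_join([1, 5, 9], [2, 7], lambda l, r: l > r)
```

The quadratic comparison count is exactly why this strategy degrades badly on large inputs, as mentioned next.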
Broadcast nested loop join is very good for non-equi joins or Cartesian joins, but it’s disabled in our workload for stability reasons. The main reason is that we don’t know what the customer is going to do: customers can build an arbitrary variety of pipelines, and broadcast nested loop join is known to not perform well on large input data sets. So for stability reasons we prefer to just use sort merge join in those cases.
Okay, then on to how we are improving broadcast joins.
The goal, of course, is to have more broadcast joins, because as we’ve just said, when possible we really want to do more broadcast joins, since they’re more performant.
Then the question is: is broadcast join really better as long as the data fits in memory? That seems to be the only real restriction, other than the join type. The answer, as we will soon see, is: it depends.
The methodology we’re going to use is to increase the threshold and basically see what breaks.
Spoiler alert: many things actually go wrong before the driver runs out of memory, and we will soon see why. To start off simple, let’s consider a single join.
The experimental setup uses the TPC-H dataset, if you are familiar with it, at the 10 gigabyte scale. We have 60 million rows in the fact table, called lineitem, which joins to the second largest table, called orders, which has 15 million rows; the join is on the order key. The driver has one core and 12 gigabytes of memory; the executor has 18 cores and 102 gigabytes of memory.
Here are the results of simply running the join, and you see that sort merge join is actually faster than broadcast join. So why is that the case? Well, I hinted at this earlier: if you dig into the logs, what you see is that the driver spends time collecting all 15 million rows from orders back to its own JVM, builds the hashtable, and then broadcasts the hashtable via a broadcast variable to all the executors (in this case, only one), and the executor deserializes the hashtable. This is inefficient because the executor already has the data, but it has to send it to the driver just so that the driver can send it back to the executor. We are kind of giving sort merge join every benefit here by having only one node, so there’s no network to worry about, but even so, the broadcast join incurs a lot more network usage than sort merge join.
Then what is the solution? Naturally, the solution is to not do driver-side broadcast.
So the answer is executor side broadcast.
We based this on a prototype from pre-existing Spark (mumbles) and then added our own improvements where we saw differences from the prototype. The concept is simple: instead of being sent back to the driver, data is sent directly from executor to executor, and the only thing the driver does is what it always does, which is keep track of where the data is.
Comparing the two implementations: with driver-side broadcast the driver has to build and hold the hashtable, whereas with executor-side it doesn’t, so that’s better. And less data is shuffled across the network, because the executors communicate among themselves without having to go through the driver for the data.
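A back-of-envelope comparison of the bytes on the wire makes the difference concrete. This is a simplification I’m introducing for illustration; real Spark transfers go through a torrent-style broadcast with its own chunking, so treat the formulas as rough accounting only.

```python
def driver_side_broadcast_bytes(table_mb, num_executors):
    """Data is first collected to the driver, then shipped from the
    driver to every executor: the driver handles all of it twice over."""
    collect = table_mb                      # executors -> driver
    distribute = table_mb * num_executors   # driver -> each executor
    return collect + distribute

def executor_side_broadcast_bytes(table_mb, num_executors):
    """Executors exchange the data among themselves; the driver only
    tracks block locations. Each executor still ends up with a full
    copy, but no copy flows through the driver."""
    return table_mb * (num_executors - 1)

# With 4 executors and a 1024 MB table:
via_driver = driver_side_broadcast_bytes(1024, 4)    # all through one node
peer_to_peer = executor_side_broadcast_bytes(1024, 4)  # spread across peers
```

Beyond the smaller total, the key point is that the driver-side total all funnels through a single, usually under-provisioned, node.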
I guess one con of executor-side broadcast is that, because the data lives entirely in the executors, it’s a little trickier to get the size of the broadcast to show in the UI. But all things considered, I think it’s safe to say that executor-side broadcast hash join is better than driver-side. Okay, then let’s run the benchmark again and see what happens. We have the same graph as before, but now with executor-side broadcast, and as you see, yes, executor-side broadcast is faster than driver-side; however, it’s still not as fast as sort merge join. Why is that the case?
Well, to understand what’s really happening here, let’s compare the cost models of the joins. Assume we have n cores and two tables, A and B, where B is the smaller table, meaning it’s the one we want to broadcast.
For sort merge join, the steps are: first we read A with n cores, sort it, and then write A out using n cores.
We do the same thing for B, and then finally we read the sorted A and B using n cores and perform the join.
If you assume that I/O is the dominant cost, which is usually the case, and that reading and writing take about the same time, then the total I/O cost comes out to roughly 3A/n plus 3B/n, because each table is read and written three times in total (read, shuffle-write, read again).
It’s a similar story for broadcast hash join. Again assuming n cores and tables A and B, what it does instead is read B using n cores, build the hashtable, and write it out as a single partition (or write it out first and build the hashtable afterwards, at a similar cost). Then, in the second step, you simply read A using n partitions, and each partition reads B from that single partition and performs the join.
Again, considering only the I/O cost, the cost is roughly: read A using n cores, read B using n cores, and write B using a single partition.
Comparing these two costs by subtracting them, after things cancel out you are left with roughly the time to read A and B a second time using n cores, versus the time to write B as a single partition. Canceling further, comparing sort merge join against broadcast join comes down to comparing the time to read A and B combined using n cores with the time to read B using a single core. If you squint a little and normalize the size of B to one, this very back-of-envelope analysis tells you that if (A + 1) / n is greater than one, in other words if A plus one is greater than n, then sort merge join is slower.
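The cost model above can be written down directly. A caveat of my own: the exact constants depend on how you count the broadcast-side passes, so the break-even should be read as a rule of thumb, not an exact crossover.

```python
def sort_merge_io_cost(a, b, n):
    """Each side is read, shuffle-written, and read again: three
    parallel passes over each table, all using n cores."""
    return 3 * (a + b) / n

def broadcast_hash_io_cost(a, b, n):
    """One parallel pass over each side, plus the broadcast side
    written out through a single partition."""
    return (a + b) / n + b

def break_even_cores(a, b):
    """The talk's rule of thumb with B normalized to 1: sort merge
    join loses once a/b + 1 exceeds the core count. Constant factors
    are dropped, so treat this as order-of-magnitude."""
    return a / b + 1

# lineitem is roughly 4x the size of orders, so the break-even lands
# near 4/1 + 1 = 5 cores, close to the benchmark's crossover.
cores = break_even_cores(4, 1)
```

This matches the two insights that follow: larger n favors sort merge join, larger A favors broadcast.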
From this formula you can deduce two insights: one, the more cores you have, the better sort merge join performs, because it benefits from the parallelism; and two, the larger A is, the better broadcast join performs, because it benefits from not having to read A multiple times. Okay, now let’s try the benchmarks again, this time varying the number of cores while keeping the memory and the executor the same and running the exact same query. This is where you see a difference between executor-side broadcast and driver-side broadcast, both compared to sort merge join. Going back to the rough math: in this case the big table is four times the size of the small table, so four plus one is five, which means with fewer than five cores broadcast join is going to do better, and with more than five cores sort merge join is going to do better. That calculation works remarkably well in this diagram, because six cores is roughly the break-even point: as you decrease the number of cores, broadcast join starts to outperform, and as you increase them, sort merge join starts to outperform.
Then the other insight: what if we increase the size of A?
We indeed see that broadcast join does better as the size difference grows, because A becomes larger, A divided by the same-size B yields a larger number, and sort merge join gets comparatively slower.
Okay, and here is the money slide that really showcases the benefit of executor-side broadcast. In this case I increased not only A but also B: B is now 150 million rows instead of 15 million. Sort merge join takes a very long time to shuffle lineitem (A), as expected, but executor-side broadcast doesn’t have to shuffle A, so it’s much faster. Driver-side broadcast just completely fails, because it’s not able to broadcast at all. This is interesting if you dig deeper into the statistics. The join key is an integer; 150 million integers come out to around a gigabyte or so of raw space, but once this lives in memory as a hashtable, it takes many gigabytes.
The hashtable takes eight to 15 gigabytes of space, and because we configured the driver with only 12 gigabytes, it cannot broadcast that much data. It can’t even collect the data back to the driver, because it doesn’t have enough memory to hold it.
Therefore driver-side broadcast doesn’t really work here, and executor-side broadcast is clearly the winner. This is what the technique is meant to showcase, and we were able to prove it in this experiment.
There are some other experiments and improvements we made, and I’ll mention a few here. One that is actually pretty important is to tune the starting JVM heap size, Xms, and also the Metaspace size, which is where the JVM keeps all of its loaded classes. If you don’t set them big enough initially, the JVM will conservatively run GC before expanding to take up all the allotted space, and because we don’t want GC in general to interfere with performance, we set them larger up front.
For my use cases, I set the Metaspace size to one gigabyte just to be safe. The Xms, I always set to be the same as Xmx, which is the largest heap size the JVM can use.
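In spark-defaults.conf terms, that tuning might look like the fragment below. The values are illustrative, matching the executor sizing used earlier in this talk; note that Spark derives -Xmx from spark.executor.memory, and -Xmx itself is not allowed in extraJavaOptions.

```properties
# Illustrative spark-defaults.conf fragment (values are examples).
spark.executor.memory            102g
# Start the heap at full size (-Xms) and pre-size Metaspace so the JVM
# does not run conservative GCs before growing into the allotted space.
spark.executor.extraJavaOptions  -Xms102g -XX:MetaspaceSize=1g
```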
Another thing we noticed is that the broadcast variables were all being fetched sequentially: during codegen, the generated code determines the order in which variables are read, and because all of the threads execute the same code, all of them block on one broadcast variable at a time. Instead of doing that, we made the fetches concurrent.
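The shape of that fix can be sketched with a thread pool in plain Python (the actual change is inside Spark’s generated code; fetch_broadcast here is a hypothetical stand-in for the blocking fetch): instead of pulling broadcast variables one at a time, kick off all the fetches at once, so total latency is roughly the slowest single fetch rather than the sum of all of them.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch_broadcast(var_id):
    """Hypothetical stand-in for a blocking broadcast-variable fetch."""
    time.sleep(0.05)  # simulate network / deserialization latency
    return f"data-{var_id}"

def fetch_sequential(var_ids):
    """Original behavior: each variable fetched one after another."""
    return [fetch_broadcast(v) for v in var_ids]

def fetch_concurrent(var_ids):
    """Improved behavior: all fetches issued at once; results come
    back in the original order thanks to pool.map."""
    with ThreadPoolExecutor(max_workers=len(var_ids)) as pool:
        return list(pool.map(fetch_broadcast, var_ids))

data = fetch_concurrent(["b0", "b1", "b2"])
```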
There are also other memory improvements we made related to whole-stage codegen in the context of broadcast joins, but since they’re not directly related, I won’t talk about them here. We do plan to contribute all of this back to open source; if you click on the link, it will take you to a page describing the issues we faced and how we plan to address them.
Okay, moving on, we will talk about a production case study.
First, before we dig into the case, we ran some statistics to understand the types of joins we run in production. As you can see in the diagram, about 98% of our joins are either inner joins or left outer joins, and that’s great news: we have almost no full outer joins, which means the vast majority of our cases can hopefully benefit one way or another from broadcast joins.
Then, just because the join type allows broadcasting doesn’t mean the threshold is met. This diagram shows the size estimate for the smaller side of each join, to see what threshold we would need to set for a broadcast join to be planned. In this case, if we raise the threshold from the default of 10 megabytes to 100 megabytes, then 80% of our joins can be broadcast.
Again, you may be wondering whether this is really big data, and I want to remind you that we are focusing here on really complex pipelines. The data may not be big, but the jobs take a very long time because of the sheer number of physical operators they have to go through.
Okay, so back to the customer pipeline I hinted at earlier. This is the same picture, with many hundreds of physical operators. It may look very complex, but if you dig into it, you’ll see there are really only 30 tables. We populated it with example data, 29 of the tables having 10,000 rows and one having 3 million rows.
If you execute this pipeline, you see roughly 160 joins. The exact number of joins changes a bit depending on the threshold, increasing or decreasing slightly. We run this with the same setup as before, 18 executor cores, and we assume memory is not a concern here.
And the question we want to ask is, can broadcast joins make the pipeline run faster? In other words if we increase the threshold, can it run faster?
Here are the benchmarks we ran: we set the threshold to zero megabytes, 10 megabytes, and one gigabyte. Zero is the case where nothing is broadcast, 10 is the default, and one gigabyte is where basically everything that can be broadcast is broadcast.
As you can see, the pure sort merge join case does not perform very well, and using some broadcast joins at the default threshold gives about a 20 to 30% improvement. As noted, executor-side broadcast is always faster. But once we increase the broadcast threshold to one gigabyte, there is no evident further performance gain.
Well, okay, maybe this is not a (mumbles) distribution. What if we increase the three million row table and see what happens?
The story is actually similar: with 10 megabytes it’s better, but with one gigabyte it’s actually slower. Can you guess why?
The answer is actually something I’ve mentioned before.
It has to do with the joins themselves. Think back to the formula from earlier: the cost of broadcast join really depends on the relative sizes of the two tables, and our pipelines turn out to have many self joins and left outer joins where the left side is actually the smaller side. That means that when you increase the threshold this much, the big table gets broadcast, and that’s far from ideal: you pay the overhead of building the hashtable and sending it across the network; the hashtable build runs on a single thread, which reduces parallelism; and most importantly, it takes up storage memory, because the hashtable needs to stay in memory. Because of that, self joins basically make broadcast joins not perform as well as they should. This is something we really need to look into, to see whether we can improve the performance of self joins over large tables when using broadcast joins.
That is the end of the production use case; let me leave you with some closing thoughts. What conclusions did we draw? Well, broadcast joins are better, but with some caveats. One thing we know for sure is that executor-side broadcast is better than driver-side broadcast: in every diagram we saw, executor-side performs equivalent to or better than driver-side, so that’s good. But when evaluating whether a broadcast join is better, you need to consider, as the formula shows, the number of cores available, the relative sizes of the bigger and smaller tables, and of course whether the table you are broadcasting fits in memory. And finally, you need to watch out for self joins and outer joins where the side you cannot broadcast is actually the smaller table.
Looking forward to future improvements, I think adaptive query execution in Spark 3.0 is going to help tremendously, because for one thing, it makes size estimation much better: it looks at actual sizes as the query runs and uses those newer, more accurate statistics to decide whether subsequent joins are broadcast or sort merge. Another thing: when you broadcast 150 million rows, it takes around 50 seconds in my benchmark just to build the hashtable. For large-scale workloads that may not be so bad, but when the workload’s total runtime is in the low minutes, that time becomes significant. If we could use multiple cores to build it, that would be great; although this goes against the executor model of one thread per task, maybe we can work something into the model to accommodate it.
And because hashtables take so much space, we should work to make them more efficient. There are implementations of the hashtable where, if the hash key is a long, a more compact hashtable is used; in my case it saved about 30 to 40% of memory, but I don’t think that’s enough in the larger context of things. And finally, a bit of forward-looking speculation.
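The idea behind a denser long-keyed table can be illustrated with open addressing over flat arrays. This is a toy sketch of the general technique, not Spark’s actual implementation: storing 8-byte keys in a primitive array avoids per-entry object overhead entirely.

```python
from array import array

class LongKeyMap:
    """Open-addressed map from long keys to long values, stored in flat
    primitive arrays instead of per-entry boxed objects. Toy version:
    fixed capacity, no resizing, and -1 is reserved as the empty
    sentinel, so key -1 cannot be stored."""
    EMPTY = -1

    def __init__(self, capacity=16):
        self.capacity = capacity
        self.keys = array("q", [self.EMPTY] * capacity)  # 8-byte slots
        self.vals = array("q", [0] * capacity)

    def _slot(self, key):
        i = key % self.capacity
        while self.keys[i] not in (self.EMPTY, key):
            i = (i + 1) % self.capacity  # linear probing on collision
        return i

    def put(self, key, val):
        i = self._slot(key)
        self.keys[i], self.vals[i] = key, val

    def get(self, key, default=None):
        i = self._slot(key)
        return self.vals[i] if self.keys[i] == key else default

m = LongKeyMap()
m.put(42, 7)
```

Compared to a general-purpose hash map of boxed objects, every entry here costs exactly 16 bytes plus load-factor slack, which is where the memory savings come from.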
We can actually make sort merge join better at handling skew, and you do that by broadcasting the skewed values instead of shuffling them: if you send the skewed values to all the other nodes, the skew is effectively handled. In essence, Adaptive Query Execution will make this better by being able to broadcast the skewed values, making sort merge join even faster; in short, making joins run faster. And with that, that concludes my presentation. I want to take a few seconds to thank my colleagues for making this talk possible: thank you Kevin, thank you Mike, thank you Alex, and thank you Andre, and all of my co-workers who provided feedback. With that, I can take questions. Thank you.
Jianneng Li is a Software Development Engineer at Workday (and previously with Platfora, acquired by Workday in late 2016). He works on Prism Analytics, an end-to-end data analytics product and part of the Workday ecosystem, which helps businesses better understand their financial and HR data. As part of the Spark team in the Analytics org, Jianneng specializes in distributed systems and data processing. He enjoys diving into Spark internals and has published several blog posts about Apache Spark and analytics. Jianneng holds a Master's degree in EECS from UC Berkeley and a Bachelor's degree in CS from Cornell University.