Performant Streaming in Production: Preventing Common Pitfalls when Productionizing Streaming Jobs

Running a stream in a development environment is relatively easy. However, some topics can cause serious issues in production when they are not addressed properly. In this presentation we want to cover 4 topics that, when not addressed, can lead to serious issues for streams in production. The first topic considers what happens if input parameters of your stream are not properly configured. This can result in your stream having to suddenly process much more data than anticipated, causing considerable performance degradation.

The second topic will be about stateful streaming parameters and the consequences of not tuning these parameters correctly. This can lead to infinite state accumulation, and can be another source of degraded performance, as well as memory issues. In the third topic we discuss Structured Streaming output parameters. When not addressed, this can lead to a severe case of the small files problem. In the final topic, we will cover what to think about when you want to modify your streaming job while it is already in production and checkpoints are involved. We will provide practical hands-on examples of when the aforementioned issues manifest and how to prevent them from occurring in your production streams. By the end of the talk you will know what to look out for when designing performant and fault-tolerant streams.

Video Transcript

– Hello, everyone, and thank you for joining us for the “Performant Streaming in Production” presentation. So this is a two-part series. And in this session, we’re going to cover part one, which I will be presenting.

So a quick introduction about us, my name is Max Thone. I’m a Resident Solutions Architect at Databricks and my colleague who will be presenting part two, is Stefan van Wouw, who is a Senior Resident Solutions Architect at Databricks as well. So what are we going to cover in these two sessions?

So, in the first part, we are going to discuss briefly what a Spark Structured Streaming ETL pipeline actually is, and what parts of this stream should be tuned. Once we have covered this, we’re going to talk about the first set of parameters, which are the input parameters. In the second part, Stefan will take you through how to tune the state parameters in a Spark Structured Streaming pipeline. And lastly, he will also discuss how to tune the output parameters of the stream. Once he has discussed how you can tune your stream, Stefan is then going to discuss how you can deploy your stream into production and what considerations you then need to take into account.

So, I’m now going to talk briefly about what is actually a stream.

Suppose we have a stream set up like this

So, a typical Spark Structured Streaming pipeline can look something like this, right? It’s a message-source-based stream where we use Kafka as a source and Delta Lake as a sink. The query can look something like what you see on the right. So on the top right, in the dark-colored code, is all the code for reading from a Kafka message source. The red part of the code is a bunch of transformations that we then want to do on our input streaming data frame. Then in the yellow part, we want to write our streaming data frame to a sink, in this case the Delta Lake sink. Lastly, we have a bunch of parameters where we configure, for example, how often we want to run each mini-batch, which in this case is every minute, and where we want to store our checkpoint files.
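The slide code is not part of the transcript, so the following is only a minimal PySpark sketch of the kind of query described above. The function name, the broker address, the topic, the paths, and the placeholder transformation are all hypothetical. The function takes an existing SparkSession, so nothing needs to be imported here.

```python
def start_kafka_to_delta(spark, bootstrap_servers, topic, output_path, checkpoint_path):
    """Read from Kafka, apply a transformation, write to a Delta Lake sink."""
    # Input part: read from a Kafka message source.
    raw = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .load()
    )
    # Transformation part (placeholder): decode the message payload.
    transformed = raw.selectExpr("CAST(value AS STRING) AS value", "timestamp")
    # Output part: write to a Delta Lake sink, one mini-batch per minute,
    # keeping progress information in the checkpoint location.
    return (
        transformed.writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_path)
        .trigger(processingTime="1 minute")
        .start(output_path)
    )
```

The three commented parts correspond to the input, transformation, and output parts of the query; the trigger and the checkpoint location are the extra parameters mentioned at the end.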

The stream can, of course, also look like this, where instead of a message-based source we use a file source, for example a Delta Lake file source. The stream, of course, looks very similar. The input part of the stream now uses Delta Lake as a source. But again, in the red part we just do another transformation.

Or a stream like this

We, again, write to a Delta sink. And lastly, we then have these parameters where we configure, for example, the trigger and the checkpoint location.

Maybe even a stream like this (joins)

There are also more complex streams. For example, the streaming query where we want to apply a join to a static data frame. Or in other words, where we actually make use of some sort of external state.

The query could look something like this. Again, we have the readStream part, but then we also do a join on a static data frame before we write our streaming data frame to a Delta Lake sink.

We also have streams where we have stateful operations, most notably groupBy operations. In this example that you see here, we groupBy some column called item ID and then do a count before we write our data frame again to a Delta Lake sink.

So what is important in all these streams that we saw is that there are two dimensions along which a stream can scale once it grows bigger. It can grow bigger in terms of input data, which you see on the left. So we have records from one to N, but N can of course grow very large, especially when you move your stream from some dev dataset to a production dataset.

The second way a stream can grow bigger is through the state, which can grow larger and larger. So here we have some sort of state, which is itself a set of records, and that can grow larger as well.

So on the right, you see a graph and this kind of shows you how a stream can scale in both of these dimensions. So horizontally we have our mini-batch size that can grow bigger and bigger.

Scale dimensions

And vertically we have the state size that can grow bigger and bigger as well.

So the main question is: if a stream grows bigger and bigger, especially in cases where you move, for example, a stream from dev to production, how do we then correctly tune the parameters to make sure our stream can still handle these increasing input sizes and state sizes?

Let’s use this example!

So to explain all this, we’re going to make use of this example stream that I’ll explain here. We’ll be using a Delta Lake file source. The data frame that we’re going to read from our Delta Lake file source will be a simple sales data frame that contains sales data by timestamp.

We’re then going to join our streaming data frame to some static data frame, where we want to look up the category for each item that’s in the sales table. And this will then be through a stream-static join. Finally, we then want to do an aggregation because we want to, for example, total the sales for each category to get the revenue per category.

So in the first part, we’re now going to discuss the input parameters. So as we just saw, we have a Delta Lake file source from which we stream our sales data frame. We are now going to talk about what parameters we really need to think about when reading this data frame.

Limiting the input dimension

So more precisely, what we really want to see is how we can limit the input dimension, or how we can limit the size of each mini-batch that comes into our streaming query. So more specifically, we want to limit N in the O(N × M) complexity.

Why are input parameters important?

Yep. So why are input parameters important? Well, most importantly, as I just mentioned, when you configure your input parameters correctly it really allows you to control the size of your mini-batch. And if you have an optimal mini-batch size in your Spark Structured Streaming query, this means you use your cluster optimally. If you do not have an optimal mini-batch size, this can really lead to a performance cliff, especially when your mini-batch is way too large. So when you have a shuffle step in your streaming query, this can then lead to shuffle spill for a mini-batch that’s too large. The second thing that can happen is that a different SQL query plan is used under the hood. Because for a large mini-batch, for example, a sort-merge join can occur instead of a broadcast join, which can also lead to suboptimal performance.

What input parameters are we talking about?

So what input parameters are we talking about? Well, actually we’re pretty much talking about only one input parameter, but it’s just called differently depending on what source you use for your stream. When it comes to a file source, we’re talking about maxFilesPerTrigger, which essentially denotes the maximum number of files you want to take into your stream for each mini-batch. For a message source the equivalent is maxOffsetsPerTrigger for Kafka, fetchBufferSize for Kinesis and maxEventsPerTrigger for Event Hubs. But they all accomplish the same thing, right? They all allow you to control the mini-batch size. And again, this is really important in relation to the number of shuffle partitions when you have a shuffle step.
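As a quick reference, the option names listed above can be collected in a small lookup. This is only a sketch: the option names are the ones given in the talk, while the dictionary keys and the helper name are hypothetical labels chosen for illustration.

```python
# Option that limits the mini-batch size, per source, as named in the talk.
# The keys are illustrative labels, not necessarily Spark format names.
RATE_LIMIT_OPTION = {
    "file": "maxFilesPerTrigger",
    "delta": "maxFilesPerTrigger",
    "kafka": "maxOffsetsPerTrigger",
    "kinesis": "fetchBufferSize",
    "eventhubs": "maxEventsPerTrigger",
}


def rate_limit_option(source):
    """Return the name of the mini-batch size option for a given source label."""
    try:
        return RATE_LIMIT_OPTION[source.lower()]
    except KeyError:
        raise ValueError(f"no known rate-limit option for source {source!r}")


print(rate_limit_option("Kafka"))  # maxOffsetsPerTrigger
```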

Input Parameters Example: Stream-Static Join

So in our example we’re going to do a stream-static join. So we’re going to join a streaming data frame to a static data frame, and this induces a shuffle step. And as we just mentioned, when we have a shuffle step we need to think about the size of the shuffle partitions in our stream. And because of this, when we are doing the readStream, so salesSDF, read stream from Delta, that’s really when we need to think about how big our mini-batch size should be in order to make sure our shuffle partitions are not too large.


So what will happen if we do not set maxFilesPerTrigger when reading our stream? Well for Delta, the default option is 1000 files. In our example, each file is 200 megabytes. So if we would just leave maxFilesPerTrigger to its default value, this would mean we would have mini-batches of 200 gigabytes. Which is, of course, extremely large and this would lead to massive spill. And this would lead to extreme performance degradation.

For other sources, such as Kafka or other file sources you don’t even have the default option of 1000, but it will be unlimited. So then you will get a mini-batch that tries to take in all your data, which of course also will not run smoothly at all.
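The arithmetic behind the 200 gigabyte figure is simple enough to sketch. The helper name is hypothetical, and the sketch follows the talk in treating 1 GB as 1000 MB and in looking only at the on-disk size of the files.

```python
def mini_batch_size_gb(files_per_trigger, file_size_mb):
    """On-disk size of one mini-batch in GB (1 GB = 1000 MB, as in the talk)."""
    return files_per_trigger * file_size_mb / 1000


# Delta default of 1000 files per trigger, with 200 MB files.
print(mini_batch_size_gb(1000, 200))  # 200.0
```

The same helper can be used to sanity-check any candidate maxFilesPerTrigger value before running the stream.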

So next I’ll show you a demo of what happens when you do not tune maxFilesPerTrigger.


In this demo, we’re going to look at how to choose input parameters for a Spark Structured Streaming pipeline. By tuning our input parameters correctly we make sure our cluster is fully utilized in terms of CPU and memory. We avoid performance loss due to shuffle spill. Finally, we make sure that we obtain an optimal SQL query plan for our Structured Streaming pipeline. This ensures we have the highest throughput per second that we can get. We use the following datasets. So first of all, we use a sales streaming data frame that uses a Delta table as a stream source and has the schema that you can see here in command 10. So it has the timestamp and the item ID and then the amount of sales associated with the timestamp and item ID. Secondly, we make use of a static data frame denoted as item DF, which has item ID and the category as its schema. And the goal of this demo, then, is to join our static data frame item DF to our streaming data frame sales SDF.

To read in our streaming data frame, we make use of the following helper function, construct input stream, which takes in the table name, which will be our Delta file source. And next it will take in our input parameter, maxFilesPerTrigger, which you can see here. So how do you actually set maxFilesPerTrigger? Well, you put in a dot option after readStream and then in there you say maxFilesPerTrigger and then you put in the value that you want. So in the first case, we’re going to join our sales SDF to item DF. But we’re going to do this while making use of a large maxFilesPerTrigger value when reading in sales SDF. So in command 15, you can see the code that we use. First of all, we set our shuffle partitions to 20, which is equal to the number of cores we have in our cluster, which is also 20. Secondly, we construct our input stream, sales SDF, and we set maxFilesPerTrigger to 40. Since each file in our Delta source table is around 200 megabytes, this represents a mini-batch of eight gigabytes on disk, which is, of course, quite large. In line three we then see that we are joining our sales SDF to our static data frame, and as such we perform the stream-static join over here. Finally, we use a helper function called write to noop to trigger the stream. As you can see in the dashboard, we achieve a processing rate of around 8.2 million records per second, which is actually not too bad. But now let’s have a look at the Spark UI.
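The notebook code is not included in the transcript, so here is a minimal PySpark sketch of what such a helper and the untuned join could look like. The function names, the table name `sales`, and the join column `item_id` are assumptions, and `readStream.table` requires Spark 3.1 or later. Both functions take an existing SparkSession and data frame, so no imports are needed.

```python
def construct_input_stream(spark, table_name, max_files_per_trigger):
    """Stream a table, capping the number of files read per mini-batch."""
    return (
        spark.readStream
        .option("maxFilesPerTrigger", max_files_per_trigger)
        .table(table_name)
    )


def build_stream_static_join(spark, item_df, num_cores=20, max_files_per_trigger=40):
    """Untuned variant from the first demo: 40 files per mini-batch."""
    # One shuffle partition per core, as in the demo.
    spark.conf.set("spark.sql.shuffle.partitions", num_cores)
    # "sales" is a hypothetical table name for the Delta source.
    sales_sdf = construct_input_stream(spark, "sales", max_files_per_trigger)
    # Stream-static join on the shared item ID column (column name assumed).
    return sales_sdf.join(item_df, on="item_id")
```

Keeping maxFilesPerTrigger as a function argument makes it easy to rerun the same query with different values while watching the Spark UI.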

So we can see in the stage in which we perform the join, which is here stage 79, that we actually have a significant amount of shuffle spill. 20 gigabytes of data is actually being spilled from the worker nodes to disk. And this represents a severe performance degradation. And that’s because the worker nodes have to keep interacting with the disk in order to perform the join. And in a good scenario, you would really want the worker nodes to hold all the data in memory instead of interacting with the local disk.

So the goal in this demo is to somehow remove the shuffle spill. Now, before we leave the Spark UI tab I also want to quickly show you what join is actually being performed. To this end we go to the SQL tab and then we go to the SQL query that’s associated with our job ID. So let’s just go to ID 84.

And here we can see the SQL plan. So on the left you can see the streaming data frame that’s being read. Here we also see the 40 files associated with files per trigger and on the right, we see the item DF, which is our static data frame. And then these data frames are joined together by making use of the sort-merge join algorithm under the hood. And this is basically the bread and butter for any join in Spark. And this will become relevant in the later part of the demo where we will try to optimize the SQL query plan for this join.

In any case, the main takeaway for this particular demo is that we have significant shuffle spill and this will lead to reduced performance, because worker nodes will need to read or write shuffle files to disk. And this leads to suboptimal use of your cluster. So as we just saw, right, by not tuning maxFilesPerTrigger we got shuffle spill and this has led to a streaming query that is perhaps suboptimal.

Input Parameters: Tuning maxFilesPerTrigger

So the next question then of course arises: how do we actually tune maxFilesPerTrigger? As we already said before, it’s really closely tied to the shuffle partition size. So let’s first have a look at two rules of thumb. So first of all, in order to make sure that your shuffle partition sizes are optimal, we want them to be 100 to 200 megabytes. This ensures that you will not get shuffle spill while you also make optimal use of the memory of your cluster. The second rule of thumb is that you want to set your shuffle partitions equal to the number of cores that you have. And the reason for this is that this tends to give you the best trade-off between the throughput of your stream and the latency of each batch. So what you should then do using these rules of thumb is use the Spark UI to tune maxFilesPerTrigger until you get to this optimal partition size. So this means you just start with some maxFilesPerTrigger number. You then run the query, you look at the Spark UI, you look at the size of the partitions, and based on what you see, you rerun the stream with a new number for maxFilesPerTrigger.

Demo 2: Show how to tune maxFilesPerTrigger based on Spark UI

So in this demo, we’re then going to show what the stream will look like once we have tuned our maxFilesPerTrigger.

So we’re now going to continue our input parameters demo by reducing our maxFilesPerTrigger, so that we get rid of our shuffle spill. Because once we get rid of our shuffle spill, it will mean that our sort-merge join is much more efficient. And this in turn means that a higher throughput will be observed. So how do we tune this maxFilesPerTrigger? Well, it turns out that tuning maxFilesPerTrigger in this case, in our stream-static join case, is closely related to how large our shuffle partitions should be. So before we tune our maxFilesPerTrigger, we should look at two rules of thumb. The first rule of thumb is that the number of shuffle partitions should be equal to the number of cores we have available in our cluster. So in our case, we have a cluster with 20 cores and we want to set our shuffle partitions to 20 as a result. The reason we want to set this equal to the number of cores is that this tends to give us the best trade-off between latency and throughput. And this tends to be quite a general rule of thumb for any kind of Spark Structured Streaming ETL pipeline. The second rule of thumb is that we want one shuffle partition to be around 150 to 200 megabytes. This way we make optimal use of our cluster memory while avoiding shuffle spill. However, this assumes we are using a memory-optimized cluster that has around eight gigabytes per core. So it will be different for other types of clusters, most notably compute-optimized nodes that only have two gigabytes per core. Then you would have a smaller shuffle partition size. So how do we then tune maxFilesPerTrigger? Well, we should look at our Spark UI and then specifically at the stage where the join happens. And then we should look at what our shuffle partition size is. And we should tune our maxFilesPerTrigger until we no longer have shuffle spill and we have a shuffle partition size of around 150 to 200 MB.
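One step of this tuning loop can be sketched as a simple proportional adjustment. This is not something shown in the talk: the helper name is hypothetical, and it assumes that the shuffle partition size scales roughly linearly with the number of input files per mini-batch, which is a simplification. The observed partition size in the usage line is an invented reading, not a number from the demo.

```python
import math


def next_max_files_per_trigger(current_files, observed_partition_mb, target_partition_mb=175):
    """Suggest the next maxFilesPerTrigger value to try.

    current_files: the maxFilesPerTrigger used in the last run
    observed_partition_mb: shuffle partition size read from the Spark UI
    target_partition_mb: desired size, somewhere in the 150 to 200 MB range
    """
    scaled = current_files * target_partition_mb / observed_partition_mb
    # Never go below one file per mini-batch.
    return max(1, math.floor(scaled))


# Hypothetical reading: 40 files per trigger produced ~1000 MB partitions.
print(next_max_files_per_trigger(40, 1000, target_partition_mb=150))  # 6
```

The suggestion is only a starting point for the next run; the Spark UI remains the source of truth for spill and partition size.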

So in this case, we actually already did the tuning. We already looked at the Spark UI quite a bit and then we figured out that a maxFilesPerTrigger of six should be pretty optimal. And indeed, when we look at the processing rate, we’re now at around 10 million records per second. And this is of course much faster than 8 million. We now actually have a 20% improvement in throughput. Also, when we have a look at our Spark UI, we can now see there’s no longer any shuffle spill. There’s only the shuffle read, which is perfectly fine. And so there’s no longer any shuffle spill in our join step. Also, when we have a look at the summary metrics, we see that the partition size is around 175 megabytes per shuffle partition. And this is of course right in this range of 150 to 200 megabytes per partition. So this indicates that we make optimal use of the memory of our cluster.

So brief summary, by reducing our maxFilesPerTrigger from 40 to six, we now have removed shuffle spill and we have increased our throughput from 8 million per second to 10 million per second.

Tuning maxFilesPerTrigger; Result

So as we have seen in our demo, we’ve got a significant performance improvement by removing the shuffle spill that we saw in the Spark UI. So we reduced maxFilesPerTrigger to six files, and our number of shuffle partitions was tuned to 20, which is equal to the number of cores that we used in the demo. And because of this, we increased the number of records processed per second by 30%. So that’s of course a significant improvement just because we removed shuffle spill.

Sort Merge Join vs Broadcast Hash Join

So we’re not done yet, right? So we have now optimized our stream for doing a Sort Merge Join, but it turns out that the static data frame we used in the join in our demos is actually small enough to broadcast. Broadcasting essentially means that we copy our static data frame to the internal memory of all the worker nodes, and because of this, you can make use of a Broadcast Hash Join, which means that we can join our static data frame to the mini-batch of our streaming data frame without using a sort or a shuffle. And this can really lead to significant performance improvements. So we will see in the next demo that this will lead to a 70% increase in throughput. Because of this, we can also increase our maxFilesPerTrigger, because shuffle spill is no longer really this upper limit that we need to take into account when tuning maxFilesPerTrigger. So we can increase it by a lot. This is also something that we will see in the next demo.

So in our last demo, we showed how we can tune maxFilesPerTrigger in order to create an optimal stream-static join, or more specifically to optimally join our sales SDF to item DF. However, it turns out we can get even better performance. And this is by actually changing our Sort Merge Join to a Broadcast Hash Join. We do this by broadcasting our item DF table. Broadcasting essentially means we copy the data of our static data frame into the memory of all our worker nodes. Because of this, when performing the join, a shuffle step and a sorting step can be avoided. And this can enable a throughput increase of up to 70%, as you can see in the processing rate diagram below. We have now achieved a throughput of 17 million records per second, which is almost 7 million records per second higher than 10 million records per second. So the takeaway here is, of course, to perform a Broadcast Hash Join whenever that’s possible. But this is only possible when your static data frame is small enough.
And we’re typically talking in the region of a hundred or a few hundred megabytes.

For anything bigger than that, a Broadcast Hash Join typically is not possible. So how do we actually execute a Broadcast Hash Join? We simply apply the hint method with “broadcast” as a string to item DF, which you can see in command three.
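The hint described here might look as follows in PySpark. This is a sketch rather than the notebook's code: the function name and the join column `item_id` are assumptions, while `DataFrame.hint("broadcast")` is the standard way to request a broadcast of the hinted side.

```python
def broadcast_stream_static_join(sales_sdf, item_df, key="item_id"):
    """Join the streaming data frame to a small static data frame via broadcast."""
    # The hint asks the optimizer to copy item_df to every worker node,
    # so the join can run without a sort or a shuffle of the mini-batch.
    return sales_sdf.join(item_df.hint("broadcast"), on=key)
```

Whether the hint takes effect can be checked in the SQL tab of the Spark UI, where the plan should show a Broadcast Exchange and a Broadcast Hash Join.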

So let’s have a look at our Spark UI.

So when we go to the SQL page, we now click on the query that’s associated with our stream-static join.

We can actually see that there’s no longer the Sort Merge Join, right, but instead there is a Broadcast Exchange and a Broadcast Hash Join on the left. So the Broadcast Exchange is the step where our item DF, which is here on the top right, gets copied into the memory of all our worker nodes. Once that’s been done, the query plan simply executes a Broadcast Hash Join. And a Broadcast Hash Join does not need any sort or any shuffle, so because of this, we do not have to rely on any shuffle step. And because of this, we also do not have to care anymore about potential shuffle spill. And this in turn means that we can then increase maxFilesPerTrigger to much higher than six, because the mini-batches can now be really large, since there’s no longer this upper bound above which we would get shuffle spill. So in this particular example, we actually increased maxFilesPerTrigger from six to 20. And we see that the query is still running perfectly fine.

So the question that then arises is: how should we choose maxFilesPerTrigger once this upper limit that would cause shuffle spill is no longer an issue? Well, it just turns out that the processing rate will always increase with increasing maxFilesPerTrigger, but the batch duration will also increase. So I think the main recommendation here is to first think about what you really want your batch duration to be. So say you want it to be a max of two minutes, then you can increase your maxFilesPerTrigger accordingly up until you get this batch duration of two minutes. In any case, whenever you can do a Broadcast Hash Join, this is definitely something you should think about, because it does tend to lead to a much higher processing rate for your streaming query.
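The recommendation above amounts to picking the largest setting whose batch duration still fits the budget. A minimal sketch, with a hypothetical helper name and invented measurements (none of the durations below come from the demo):

```python
def largest_setting_within_budget(measurements, max_batch_seconds):
    """Pick the largest maxFilesPerTrigger whose batch duration fits the budget.

    measurements: dict mapping maxFilesPerTrigger -> observed batch duration (s)
    Returns None if no measured setting fits.
    """
    within = [files for files, secs in measurements.items() if secs <= max_batch_seconds]
    return max(within) if within else None


# Hypothetical observations from three runs of the stream.
observed = {6: 35, 20: 95, 40: 170}
print(largest_setting_within_budget(observed, max_batch_seconds=120))  # 20
```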

Input Parameters: Summary

So as we just saw in the last demo, we increased our stream to 17 million records per second, while we started with 10 million records per second. So here’s just a brief summary of what we actually did to achieve this result. So first of all, we made sure that the number of shuffle partitions was equal to the number of cores. Next we tuned maxFilesPerTrigger so that we would end up with 150 megabytes to 200 megabytes per shuffle partition. Lastly, we checked the Spark UI and we made sure that the query plan was using a Broadcast Hash Join instead of a Sort Merge Join. And together this has really increased throughput, again from 10 million records per second to 17 million records per second, which is an improvement of 70%.
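The relative improvements can be checked from the processing rates quoted in the demos (8.2, 10, and 17 million records per second). The helper name is hypothetical.

```python
def improvement_pct(before, after):
    """Relative throughput improvement in percent."""
    return (after - before) * 100 / before


# Rates in million records per second, as quoted in the demos.
print(round(improvement_pct(8.2, 10), 1))  # removing shuffle spill: 22.0
print(round(improvement_pct(10, 17), 1))   # broadcast hash join: 70.0
```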

Performant Streaming in Production: Part 2

And this brings us to the end of part one. Thank you very much for listening.

– Hello everyone, and welcome to part two. My name is Stefan van Wouw. Max already introduced me, so I’ll stay brief.

So in this part we’re actually going to continue to see how we can tune our stream. So in the previous part, Max focused on the input parameters. And in this part, we are going to focus more on the state parameters, the output parameters and something regarding deployment.

So first off, state parameters. So we actually need to make sure that we limit the state dimension of our stream, because if we don’t, then it might actually be possible that the number of records that we compare against will grow over time.

Limiting the state dimension

For example, if you’re joining against a dataset that keeps growing over time. So Max already explained how the input size can be limited. And we now also want to limit the number of records that we compare against, so that we can have reliable performance in production.

So first off let me explain a bit more about what we exactly mean by state because people might have different ideas with what we mean by state.

So with state we can actually mean two things. But we actually stick to the more conceptual explanation of state, which covers both. So first off, we can consider any operation in Structured Streaming that uses the State Store as a backend. So we can think of things like windowed aggregations, running counts, etcetera. Drop duplicates is one of them, and for example stream-stream joins. These all use the State Store, which is stored in the checkpoint directory of the stream, which means it’s bound to a specific deployment of the stream, which is an important thing to note. Then secondly, we can also do operations in the stream where we do not use the State Store at all but we might actually store state somewhere else. So for example, we could use the Stream-Static Join as Max showed, or do things like merge operations for Delta, etcetera. These all do not require the State Store but we still consider them state.

Why are state parameters important?

So why are these state parameters important? So actually, optimal parameters mean optimal cluster utilization. And if you do not control them properly, strange things can happen in production that are actually unexpected. Things like slower performance over time, heavy shuffle spill, or even out-of-memory errors. And actually this is quite a tough problem to solve, because you might not see the same issues in development, since your dataset sizes might be way smaller.

What parameters are we talking about?

So what parameters are we talking about? So we can split them into State Store specific ones, again, the State Store being stored in the checkpoint directory and used by stateful operations in the stream. And on the other side we have State Store agnostic ones, such as the Stream-Static Join. So on the State Store specific side, we can tune a couple of different things. You can use watermarking to limit how much state we accumulate and how much comparison we do against the history. So in short, watermarking is a way to limit the state: we can set a watermark on the stream, which is basically a point in time, and late-arriving data older than that point is no longer considered.
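To make the effect of a watermark on state size concrete, here is a small pure-Python simulation of a windowed count. It is a conceptual sketch only and not Spark's implementation: the function name, the data layout, and the exact eviction rule are simplifying assumptions.

```python
def process_batch(state, events, max_event_time, delay_s, window_s):
    """Add one mini-batch to a windowed count and evict expired windows.

    state: dict mapping (window_start, item_id) -> running count
    events: list of (event_time_seconds, item_id) tuples
    max_event_time: latest event time seen in earlier batches
    Returns the updated maximum event time.
    """
    # The watermark trails the latest event time seen so far.
    watermark = max_event_time - delay_s
    for ts, item in events:
        window_start = ts - ts % window_s
        if window_start + window_s <= watermark:
            continue  # too late: this window was already finalized
        key = (window_start, item)
        state[key] = state.get(key, 0) + 1
        max_event_time = max(max_event_time, ts)
    # Evict every window that ended at or before the new watermark.
    new_watermark = max_event_time - delay_s
    for key in [k for k in state if k[0] + window_s <= new_watermark]:
        del state[key]
    return max_event_time


state = {}
latest = process_batch(state, [(10, "a"), (70, "a")], 0, delay_s=300, window_s=60)
print(len(state))  # 2 windows are kept
latest = process_batch(state, [(1000, "a")], latest, delay_s=300, window_s=60)
print(len(state))  # 1: the two old windows fell behind the watermark
```

A larger `delay_s` keeps more windows alive, which is the trade-off between tolerating late data and limiting state.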

And another thing that we can do on the State Store side is we can actually select what backend for the State Store we use. And some backends might be able to store more state with a smaller memory footprint than others. Here we then go to the agnostic side. So for example, just doing a Stream Static Join with some other table, we can actually limit the state by providing query predicates such as limiting how far we look back, like on a date column or something like that.

State parameters example

All right, so in this state parameters example, we’ll use the same code example that Max used. He mainly focused on point one and point two that you see on the right, so reading the stream and joining it together with the static data frame in point two. And here we will focus a bit more on point three, which is aggregating the sales per item per a certain time window, which can be for example per hour. And one thing that I want to note here is that if you look at point two on the right side of this slide, the join, this is actually not using the State Store. But we would consider what is joined, so the item table, to be state. And in the second part, at point three with the groupBy sum, this implicitly uses the State Store in the checkpoint directory. And this is important to note because it will make you understand the demo better. And we’re actually going to show a demo on this, which is one of the longer demos in this presentation. So now we want to tune the stateful parameters, and mainly we want to do this to limit the state accumulating so that no state explosion occurs. So we’ve actually used the pipeline run so far as a base.

So we actually use all of the settings from the first part that were quite optimal. So broadcasting, setting maxFilesPerTrigger and tuning shuffle partitions. So with that, we now want to extend the example of just enriching the sales with items by doing a calculation using some aggregates here.

So what we want to do is calculate the sales revenue for certain items over certain periods. So in this case what we can do, and this is kind of a little helper function here, is we can say we have this data frame and we set a specific watermark. So in short, a watermark is the point in time where we no longer consider data that’s older than that point in time. So we have a certain tolerance. So we can set that very high or very low. Meaning if we set it very high, we keep a lot of state in the State Store, and if we set it very low we keep only a little, but then late data might miss the State Store and we might not be able to deliver a correct and complete calculation. So there’s a bit of a trade-off. Then we can do this groupBy. So in order to calculate the sales revenue, we want to group by a certain time window, which can be of a certain size. So for example, for a certain minute or a certain hour, we want to calculate how many items were sold and for what price. And then we aggregate on an item category or item ID, for example, and then we calculate the total revenue here.

So what does this look like if we use this helper function here? So, first off we make sure that we use the State Store provider that’s the default. So we use the HDFS-backed State Store provider. We then, over all the items, want to calculate per item ID, for every 10 seconds, what the revenue is for that item. And then we keep a history of one year in the State Store so that we are able to handle and recalculate late-arriving data up to one year back. So we purposely chose these extremes to demonstrate what happens when you do not really tune anything correctly. So what does this look like? So actually, as you can see, the stream has been running for a while now, and it was last updated 17 minutes ago, meaning that the last mini-batch only completed 17 minutes ago. This does not really make sense, because if you look at the batch duration in the middle, it actually only took one minute here and it’s going up. So actually what’s happening here is that because we kept such a long history with such a small time interval to groupBy on, we’re keeping a lot of distinct keys.

And that’s what you can see here. We have a key for every time window and item combination. And with 10 seconds and many different items, it’s going to add up pretty fast. So what happens here is that the processing rate is going down, the batch duration is going up and the state is getting accumulated. And ultimately this will run out of memory. It might actually already have run out of memory on some worker here, and it’s now trying to retry. So this is not very good, and we try to avoid this. One option is to not change any of these settings, the one year and the 10 seconds, but just change the State Store provider so that it can hold more keys. In that case, we can run this much longer before we run into similar problems. So if we look here, we can see that by changing it to the RocksDB State Store provider provided by Databricks, we start off with a lower processing rate than we would get with the default. So it comes at a trade-off of lower throughput using this one. But it still keeps on running. It was last updated three minutes ago, and it just keeps accumulating, and you can already see it holds more keys than the previous one ever got to.
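Switching the State Store provider is a single Spark configuration setting. The sketch below assumes the provider class names documented for Apache Spark and for Databricks Runtime; the helper function name is made up, and the exact class available depends on your runtime version.

```python
# Spark SQL setting that selects the state store implementation.
STATE_STORE_CONF_KEY = "spark.sql.streaming.stateStore.providerClass"

# Default provider that ships with Apache Spark.
HDFS_PROVIDER = (
    "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"
)
# RocksDB-based provider as documented for Databricks Runtime
# (assumed to be the one used in the demo).
ROCKSDB_PROVIDER = "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"


def use_state_store(spark, provider_class):
    """Set the provider; do this before the streaming query is started."""
    spark.conf.set(STATE_STORE_CONF_KEY, provider_class)
```

Calling `use_state_store(spark, ROCKSDB_PROVIDER)` on an active session would select the RocksDB-backed store for queries started afterwards.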

So this doesn’t always end up being fine, because ultimately, if we don’t limit anything properly, we’ll also run out of memory at some point.

So what we actually should do is limit the state by doing two things. First off, we can limit it by reducing the lateness threshold that we provide. If we tolerate less late data arriving in the stream, we do not have to keep the state for so long, meaning that we can discard those keys much earlier. So what we can see here is that this one ran for a similar time, but it doesn’t keep track of more than around 60 million keys. It just keeps sticking around there because of the lateness threshold going down. So anything older than five minutes will be removed. And then at the same time we also increase the window size here. That means that we only keep one distinct key for every item ID and minute combination instead of every 10 seconds. So it’s already six times fewer keys for any period.
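To make the effect of the two settings concrete, here is a back-of-the-envelope estimate in plain Python. It assumes, as a simplification, that every item has sales in every window, so it is an upper bound rather than a measurement; the function names and the item count are hypothetical.

```python
import math


def windows_kept_per_item(lateness_seconds, window_seconds):
    """Rough number of time windows that stay in state for one item."""
    return math.ceil(lateness_seconds / window_seconds)


def estimated_state_keys(num_items, lateness_seconds, window_seconds):
    """Upper bound on state keys: one per (window, item) combination."""
    return num_items * windows_kept_per_item(lateness_seconds, window_seconds)


ONE_YEAR = 365 * 24 * 3600   # lateness threshold of the untuned stream
FIVE_MINUTES = 5 * 60        # lateness threshold of the tuned stream

print(windows_kept_per_item(ONE_YEAR, 10))      # 3153600 windows per item
print(windows_kept_per_item(FIVE_MINUTES, 60))  # 5 windows per item
print(estimated_state_keys(1_000, FIVE_MINUTES, 60))  # 5000 for a hypothetical 1,000 items
```

Most of the reduction comes from the shorter lateness threshold; the larger window contributes the factor of six mentioned above.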

So this actually stabilizes our stream, so we can stop here if we want to use the State Store. I want to show you one more thing before we close this off, and that is that you do not have to use the State Store to do similar things. So this is kind of new, in the sense that it wasn’t possible before, but because Delta is now so mature and the new streaming APIs allow for it, specifically the foreachBatch sink, we can actually do something cool.

So what we can also do is not use the State Store at all. The one downside of the State Store is that it’s stored in the checkpoint directory, which you might sometimes need to wipe out and which is specific to the deployment. And it’s not very transparent what’s stored in the State Store. Other than looking at the number of distinct keys, you cannot easily read it, because it has a specific format. So in this case, we’re not using the State Store at all. And you can easily see this because the aggregation state is missing. Instead, what we’re doing is we created a Delta-backed state. In short, this is just a Delta table that we use to keep track of the state. Then, instead of using the stateful operators available in the streaming API, we use foreachBatch, which we call on our streaming data frame. And then we execute this custom function that we wrote, Aggregate Sales Revenue Delta Backed. So quickly, to give you an idea of what the Delta code looks like, I won’t go into detail. It’s creating a Delta table at the specific location and it’s called Delta-backed state. That’s one. Then the write with foreachBatch looks like this. So we do a writeStream, then we do foreachBatch, and then we pass the function. And then this part is just for benchmarking, selecting a new checkpoint location every time we do this.

And then what’s most interesting is this function, the Aggregate Sales Revenue Delta Backed. What we get in here is the updates data frame, which is just a mini-batch and doesn’t contain any previous data. So what we do first is calculate the sales revenue specifically for this data, per minute and item ID group.

Then we look up the target Delta-backed state table, which we use both to write the output to and to combine with our input. And then, to simulate the watermark, we look up the most recently seen timestamp in this target table, which is the Delta-backed state, and we calculate the watermark time manually. In this case, we tolerate five minutes of lateness. What we then do is merge the newly calculated sales revenue here with the existing revenue that we already calculated before. And that’s happening here in the whenMatchedUpdate clause. So we add the new revenue to the one already existing, and if there’s nothing existing for a specific group, so for a specific time window and item ID, then we just insert it. And here are some clauses that we apply on the join condition in the merge to make sure that we will not scan the full history every time. We only look back as far as five minutes in this case, so that we do not scan the entire Delta table every time.
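The steps just described can be sketched roughly as follows, using the PySpark and Delta Lake Python APIs. This is not the code from the demo: the table location, the column names (`event_time`, `item_id`, `revenue`, `window_start`) and the `merge_condition` helper are assumptions, and dropping rows older than the cutoff is one possible way to mimic a watermark.

```python
STATE_PATH = "/tmp/delta_backed_state"  # hypothetical Delta table location


def merge_condition(cutoff):
    """Join condition for the merge into the state table.

    `cutoff` is the manually computed watermark as a 'yyyy-MM-dd HH:mm:ss'
    string, or None while the state table is still empty.
    """
    condition = "t.window_start = u.window_start AND t.item_id = u.item_id"
    if cutoff is not None:
        # Bounding the target side keeps the merge from scanning all history.
        condition += f" AND t.window_start >= timestamp'{cutoff}'"
    return condition


def aggregate_sales_revenue_delta_backed(updates_df, batch_id):
    """foreachBatch handler: aggregate one mini-batch and merge it into state."""
    # Imported lazily so this module loads without Spark or Delta installed.
    from delta.tables import DeltaTable
    from pyspark.sql import functions as F

    # 1. Revenue per one-minute window and item, for this mini-batch only.
    batch_revenue = (
        updates_df
        .groupBy(F.window("event_time", "1 minute"), "item_id")
        .agg(F.sum("revenue").alias("revenue"))
        .select(F.col("window.start").alias("window_start"), "item_id", "revenue")
    )

    # 2. Simulate the watermark: latest window already in state minus 5 minutes.
    target = DeltaTable.forPath(updates_df.sparkSession, STATE_PATH)  # PySpark 3.3+
    cutoff = (
        target.toDF()
        .agg(F.date_format(F.max("window_start") - F.expr("INTERVAL 5 MINUTES"),
                           "yyyy-MM-dd HH:mm:ss"))
        .first()[0]
    )
    if cutoff is not None:
        # Assumption: rows older than the cutoff are dropped, as a watermark would.
        batch_revenue = batch_revenue.where(
            F.col("window_start") >= F.lit(cutoff).cast("timestamp"))

    # 3. Add to the existing revenue where the group exists, insert it otherwise.
    (
        target.alias("t")
        .merge(batch_revenue.alias("u"), merge_condition(cutoff))
        .whenMatchedUpdate(set={"revenue": "t.revenue + u.revenue"})
        .whenNotMatchedInsertAll()
        .execute()
    )
```

A handler like this would be passed to `writeStream.foreachBatch(...)` together with a `checkpointLocation` option, and the Delta table at `STATE_PATH` has to exist before the first mini-batch arrives.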

So that’s just a brief summary. And if we look at the actual performance of this, we can see that the processing rate is quite comparable to the State Store-backed one. We can only see that the batch duration is a bit higher. So that means that you get similar throughput with more flexibility, because the Delta table is easier to manage and you can just read it using a Delta read command, but the latency is higher.

And with that, this summarizes the demo for state parameters.

State Parameters: Summary

Okay, so you’ve now seen in this demo how to tune the state parameters such that we limit the state dimension. In this case there are a couple of things that you can do. In short, you can limit the state accumulation by setting appropriate watermarks and by thinking about whether you want to use a very fine-grained aggregation or a more coarse-grained aggregation. More fine-grained detail, like an item level versus an item category, for example, means you will keep track of more state. And we have also shown you an alternative that does not use the State Store, which provides more flexibility at the cost of a bit more latency.

So to sum things up basically, we can actually focus on the last topic regarding a single stream, which is the output parameters. So what output parameters should we actually look at when tuning a stream?

How output parameters influence the scale dimensions

First off, if we look at these dimensions of limiting the state size and the input size, output parameters don’t directly influence them for the current stream, the stream that you’re running. Instead, they influence the state and the input of downstream streams. So you can be a bully by not tuning anything here, and your stream will still perform just fine. But then maybe somebody else, which can actually be yourself, will try to read this data, and it ends up not performing so well.

Why are output parameters important?

So the main thing that I will be talking about here is the fact that if you do not take any of these parameters into account, a stream will generate many small files, and reading many small files is slow. So anyone then reading your output will have a fairly slow job. And that’s what you want to prevent. So that’s why they are important. This is just a very quick one, actually.

What Output parameters are we talking about?

So there are two ways to think about this. One is that you can manually repartition the last stage of your streaming job to make sure that you do not create a lot of files, because by default Spark creates one file per partition in the last stage. And if you did a lot of shuffles and you didn’t tune that, you probably get up to a hundred files every time a mini-batch is processed. The other way is to just use Auto-Optimize, which is a feature specific to Delta Lake on Databricks that will automatically choose the appropriate number of files based on the actual size of the data being output.
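Both options can be sketched in a few lines. The configuration keys below are the Databricks session settings documented for optimized writes and auto compaction, which is assumed to be what Auto-Optimize refers to here; the function names, paths and the default of 8 files are hypothetical.

```python
# Databricks-specific Delta settings behind Auto-Optimize (assumed to be what
# the demo toggles; check the documentation for your runtime version).
AUTO_OPTIMIZE_CONF = {
    "spark.databricks.delta.optimizeWrite.enabled": "true",
    "spark.databricks.delta.autoCompact.enabled": "true",
}


def enable_auto_optimize(spark):
    """Option 2: let Delta on Databricks pick the number of output files."""
    for key, value in AUTO_OPTIMIZE_CONF.items():
        spark.conf.set(key, value)


def start_compact_stream(stream_df, path, checkpoint, num_files=8):
    """Option 1: manually shrink the last stage before writing."""
    return (
        stream_df
        # One output file per partition, so fewer partitions means fewer files.
        .repartition(num_files)
        .writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint)
        .start(path)
    )
```

The manual variant adds an extra shuffle per mini-batch, so `num_files` is worth choosing based on how much data a typical mini-batch contains.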

And actually this is also what we’re going to show to you in a short demo. Suppose we now want to tune our output parameters. Remember the only thing we want to prevent is that we create a lot of small files because that’s slow. So in this demo, I quickly demonstrate that there’s a real performance impact in that scenario. So what we do first off here we have our example that we’ve seen before. So we calculate the item sales per minute. And now to actually demonstrate that generating a lot of small files can cause a big performance impact, we up the shuffle partitions to 1000 and this is actually a realistic scenario if you write to an output that has many partitions on disk.

So we pre-ran this, and it turns out that if you do not do anything to reduce the number of files you output, reading the same data back takes around two minutes in this case. And that’s with Auto-Optimize off and without any manual repartitioning. Then we write the same data out, but with Auto-Optimize turned to true in this case. Similarly, you can do that manually using repartition. If we then read back the data we produced, it only took around seven seconds. So that means that you go from minutes to seconds by just turning this simple setting on. So I definitely recommend you do this. Okay, so what we have seen in this short demo is that generating a lot of small files can impact performance very badly.
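A quick calculation shows how fast the file count grows when every partition of every mini-batch writes its own file. The one-batch-per-minute rate and the target of 8 partitions are hypothetical values chosen for illustration.

```python
def files_written(output_partitions, num_batches):
    """Upper bound on files when each partition writes one file per mini-batch."""
    return output_partitions * num_batches


BATCHES_PER_HOUR = 60  # hypothetical: one mini-batch per minute

print(files_written(1000, BATCHES_PER_HOUR))  # 60000 files per hour
print(files_written(8, BATCHES_PER_HOUR))     # 480 files per hour
```

A downstream reader has to list and open every one of those files, which is where the difference in read time comes from.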

Output Parameters: Summary

And by just setting some simple settings, you can easily gain up to a 10X speed difference. But you have to remember to do this. So we’ve seen input parameters, state parameters and output parameters. Now we’ll go into a couple of small things to think about when you actually have your stream running in production. First off, you might decide that some streams do not really warrant their own cluster.

Multiple streams per Spark cluster

They’re so small and you want to, for cost optimization purposes, pack them into one Spark application. That’s actually a good idea if you think of a cost optimization because the multiple streams will run on the same cluster. But in some deployment scenarios, that means that they will use the same driver process. And actually because they use the same process, they could potentially influence each other’s performance. So that’s one thing to think about. That if you have really mission critical jobs, you probably do not want to deploy them together and instead spin up a separate cluster for those.

Temporary changes to load (elasticity)

And secondly, we really focused on the number of shuffle partitions, and ideally you want to set that to the number of cores to get good performance. But this is a bit of a trade-off, because if you want to be elastic and flexible enough to handle load spikes that you don’t expect, you might want to be able to scale up your cluster for a specific period of time.

And then scale back down again. But if you have already set your shuffle partitions to the number of cores, you cannot easily change that, because the number of shuffle partitions is bound to the deployment: it’s stored in the checkpoint directory. So this is something to think about. You may want to trade off a bit of performance by setting the shuffle partitions a bit higher, so that you have more flexibility in scaling out.
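The trade-off can be illustrated with a small calculation. It assumes equally sized tasks and one task per core at a time, which is a simplification; the function names and the cluster sizes are made up for the example.

```python
import math


def task_waves(shuffle_partitions, cores):
    """Number of sequential rounds of tasks needed for one shuffle stage."""
    return math.ceil(shuffle_partitions / cores)


def core_utilization(shuffle_partitions, cores):
    """Fraction of core slots doing work across all waves (1.0 = no idle cores)."""
    return shuffle_partitions / (task_waves(shuffle_partitions, cores) * cores)


# Partitions fixed at the original core count: doubling the cluster does not help.
print(task_waves(16, 16), core_utilization(16, 16))  # 1 1.0
print(task_waves(16, 32), core_utilization(16, 32))  # 1 0.5

# Partitions set higher up front: the same stream can use 16, 32 or 64 cores.
print(task_waves(64, 16), core_utilization(64, 16))  # 4 1.0
print(task_waves(64, 32), core_utilization(64, 32))  # 2 1.0
```

With 64 partitions the per-task scheduling overhead is somewhat higher on the small cluster, which is the bit of performance being traded for elasticity.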

Permanent changes to load (capacity planning)

And finally, you have to think a bit about capacity planning. Some of the settings that we touched upon in this presentation are bound to the deployment, and shuffle partitions is one of them. It’s all stored in the checkpoint directory, meaning that this setting is fixed when you first start the stream, and thus you need to wipe out the checkpoint directory if you want to change it. So you need to think of a strategy for increasing capacity over time. And also, if you use the State Store in the checkpoint directory, you need to think of a strategy for how you are going to recover the state that you wipe out, if you do a checkpoint wipe-out. So this gets us to the summary.

Yeah, so to summarize, we touched on four different areas: input parameters, state parameters, output parameters and a bit on the deployment side of things. If we look at the input parameters, you want to limit the input size so that you have a reliably performing stream, and by tuning the shuffle partitions and enforcing broadcasting when possible, you can ensure way better performance than if you don’t look at it at all. Then secondly, the state parameters can be tuned so that you limit the state accumulation. By limiting how far you look back, you can limit the number of comparisons that happen in each mini-batch and therefore keep a reliable state. Then thirdly, on output parameters, you can very easily prevent generating a lot of small files, which is kind of being a bully to downstream systems that consume your data. And on the deployment side, we need to do some capacity planning, because some parameters that we discussed are bound to the deployment and we cannot simply change them on the fly when we need to.

So with that this presentation comes to an end. Actually, we hope we have provided you with sufficient knowledge to actually go tackle these issues yourself with your streaming job.

About Stefan van Wouw


Stefan is a performance and scalability subject matter expert at Databricks. He has a background in parallel distributed systems and has years of experience in the Big Data Analytics field. More recently, he is focusing on deploying Structured Streaming applications at scale, advising clients on how they can build out their pipelines from proof of concepts to production grade systems.

About Max Thone


Max is a machine learning and performance expert at Databricks. He has a background in machine learning and analytics, and has worked in analytics teams in a variety of industries. At Databricks he has been focusing on advising clients with implementing and tuning their Spark production jobs, ranging from solving skew problems to deploying streaming jobs at scale.