Adobe’s Unified Profile System is the heart of its Experience Platform. It ingests TBs of data a day and is PBs large. As part of this massive growth we have faced multiple challenges in our Apache Spark deployment, which is used from ingestion to processing. We want to share some of the learnings and hard-earned lessons from reaching this scale, specifically with Structured Streaming.
Know thy Lag
Reading Data In
MicroBatching Best Practices
Adobe Spark Speculation and its Effects
Calculating Streaming Statistics
Yeshwanth Vijay…: Hi, guys. Today, we’re going to talk about how Adobe uses Spark Structured Streaming at scale. I’m Yeshwanth Vijayakumar. I’m a senior engineering manager and architect at Adobe, specifically with the Adobe Experience Platform and [inaudible]. We are extremely heavy users of Spark and we are hiring people who are experienced with Spark. So yeah, hit me up if you’re interested.
And now getting into the agenda for today. We’re going to go over multiple high-level topics. We’ll begin with what lag means in our structured streaming world. We’ll also go over optimizing our upstream processes and how to write some efficient logic on top of them. And lastly, we’ll also go over streaming statistics to understand some nuances there. Before we jump into, let me say, the tips and tricks, we need to understand the ecosystem that we’re dealing with here.
We have a lot of internal solutions, for example, Adobe Campaign, which is an email campaign orchestrator, and Adobe Analytics, which is one of the world’s [inaudible] vendors for analytics software. So you have a fire hose of marketing information flowing into our system. This data can be coming in the form of JSON, [inaudible] or anything that you can think of. All of this is converted into our own JSON schema, which is a standardized marketing Experience Data Model; we call it XDM. This in turn is ingested into what we call the Unified Profile. It gives you a 360-degree view of a customer.
The main advantage of this is that you can do a lot of complex targeting and activation queries like, “Hey, give me everybody who did a trial of Illustrator in the last 10 days and has added Photoshop to the cart, but has not made any purchases yet.” So you can do a lot of complex actions and queries using the Unified Profile on top of your marketing data. What happens after this is, any mutations that happen to the Unified Profile get fed into the fire hose that is the Change Feed, a Kafka topic, to keep track of the logical changes made to the entities. And then this is consumed by various downstream applications, statistics generation, et cetera. Again, Spark all the way, and Kafka providing the transport layer.
With respect to structured streaming, we need to first understand the lag. With a streaming mechanism, it’s always difficult, right? What and how are we going to [inaudible]? So Spark thankfully gives you a very nice StreamingQueryListener, which we can implement to provide our own monitoring. So the thing that we’re focusing on today is the onQueryProgress method. This basically gets invoked as soon as a micro-batch query is successfully executed. So you can see that we have a lot of handles on the progress, like the number of input rows that got taken in by the micro-batch. We also have other metrics like processed rows per second, input rows per second, input rate, et cetera. So this is extremely useful to get a programmatic handle on what Spark, as in what the micro-batch, actually sees. Now we take these metrics and publish them to our own internal metrics system; you can use Prometheus, [inaudible], or any of the things that you have custom-built in your ecosystem.
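As a rough illustration of what a listener might pull out of each progress event, here is a minimal sketch in plain Python. It assumes the progress arrives as the JSON document that Spark's `StreamingQueryProgress` exposes, with fields such as `numInputRows`, `inputRowsPerSecond` and `processedRowsPerSecond`; the sample values and the function name `extract_batch_metrics` are made up for the example, and publishing to a metrics system is left out.

```python
import json


def extract_batch_metrics(progress_json: str) -> dict:
    """Pick the throughput-related fields out of one progress event."""
    progress = json.loads(progress_json)
    return {
        "batch_id": progress["batchId"],
        "num_input_rows": progress["numInputRows"],
        # Rates can be missing on the very first batch, so default to 0.0.
        "input_rows_per_second": progress.get("inputRowsPerSecond", 0.0),
        "processed_rows_per_second": progress.get("processedRowsPerSecond", 0.0),
    }


# Illustrative payload only; the numbers are invented.
sample_progress = json.dumps({
    "batchId": 42,
    "numInputRows": 30000,
    "inputRowsPerSecond": 1000.0,
    "processedRowsPerSecond": 1500.0,
})

metrics = extract_batch_metrics(sample_progress)
print(metrics)
```

In a real listener the same dictionary would be built inside the progress callback and handed to whatever metrics client is in use.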
So this way, what happens is as the query is processing, you kind of know what kind of progress has been made. Okay, how much work did we do? But this is not actually enough, right? Remember we are reading off of a Kafka queue. So when you’re reading off any queue, you need to understand how much progress have you made relative to the load currently in the queue, because that tells you how much more work has to be done.
So what we do for that is, and this method we’ve kind of taken from this excellent blog that I’ve linked below, you create a dummy Kafka consumer group and then you create a new instance of your StreamingQueryListener which takes in this dummy Kafka consumer group. And then the query progress also tells you what offsets have been read so far. Similar to the input rate and all the other stuff that you’ve seen here, we also get an idea of what offsets we have read so far. Now these can be committed back into the consumer group. The dummy consumer group that we created is responsible for keeping track of the data that we have read so far.
Now, how does this help? So what happens here is now we can use Burrow, which is like a monitoring companion for Kafka, to keep track of the lag. So you will be able to figure out the lag per topic. Now, with that dummy consumer that we created, since we are tracking the lag per topic, we commit [inaudible] per topic. So we will be able to figure out, just from looking at the lag of that consumer group in Burrow, how much more work we need to do to keep up with the load on the queue. So this is an example, a random snapshot of a load. You can kind of see we’re going at a pretty high throughput with just… And this is provided by both the consumer and the StreamingQueryListener implementations that we have registered with the Spark context.
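The lag that a tool like Burrow reports boils down to "latest offset in the partition minus last committed offset". A minimal sketch of that arithmetic follows, assuming the offsets read by the micro-batch arrive as the topic-to-partition-to-offset JSON that the Kafka source reports as its end offset. The topic name `profile-changes`, the numbers and the helper `kafka_lag` are hypothetical.

```python
import json


def kafka_lag(end_offset_json: str, log_end_offsets: dict) -> dict:
    """Return the lag per (topic, partition).

    end_offset_json: offsets the streaming query has read up to,
        e.g. '{"topic": {"0": 1200}}'.
    log_end_offsets: latest offsets available in Kafka,
        e.g. {"topic": {0: 1500}}.
    """
    consumed = json.loads(end_offset_json)
    lag = {}
    for topic, partitions in consumed.items():
        for partition, offset in partitions.items():
            latest = log_end_offsets[topic][int(partition)]
            # Never report a negative lag if the two snapshots are slightly out of sync.
            lag[(topic, int(partition))] = max(latest - offset, 0)
    return lag


end_offset = '{"profile-changes": {"0": 1200, "1": 950}}'
latest = {"profile-changes": {0: 1500, 1: 950}}

lag = kafka_lag(end_offset, latest)
total_lag = sum(lag.values())
print(lag, total_lag)
```

Committing the consumed offsets to a dedicated consumer group lets an external monitor do this subtraction, so the streaming job itself does not need to query Kafka for the latest offsets.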
The next important thing, as I would say, is that we need to optimize the ingestion. What do we mean by that? So Kafka, the queue, is the upstream process. The generic flow would be something like this. You have a topic, you have multiple partitions, and then this is being consumed by your Spark Structured Streaming app. The driver tells the executors what partitions to consume from, the executors read the data from the partitions, the business logic gets executed on the executors, and then it gets written to a sink, or probably some other side effect takes place from the executors.
Now it’s quite important for us to optimize this part before we step into optimizing the executor logic itself. Because [inaudible] G profiler tried to optimize a logic that happens quite often. But then architecturally speaking, there are some settings and configurations that you could do to optimize your upstream load.
So here we have an example, a read stream from the Kafka topic. You can see the format is Kafka, indicating how Spark should actually figure out what to read and where to read from. We specify a lot of things, like the bootstrap servers, the topics to subscribe to, et cetera, as part of the normal configuration. But before we optimize anything we need to lay some ground rules. We need to keep the executor resources constant, because if you keep changing your executor resources, or keep changing the type of executor resources, like CPU type or the amount of disk that is there, it’s not easy for us to have a solid experiment, so keep that constant. The first parameter that we want to change is maxOffsetsPerTrigger. For those of you familiar with structured streaming, it deals with creating a lot of micro-batches at scheduled intervals of time. So if you have a trigger interval of say 30 seconds, it means that every 30 seconds you’re going to trigger off a micro-batch to pull data from Kafka.
So now in order to avoid overwhelming the system, we should always specify how many offsets we want to pull from Kafka in order to make progress on our system. So this is actually controlled by maxOffsetsPerTrigger. So if, say, the trigger fires every 30 seconds, then every 30 seconds Spark goes and pulls say 10,000 offsets from Kafka across all the topics that we have specified, from the different partitions, and distributes them to the executors. Now, this is extremely important because this decides what QPS you will finally end up with. So if you want to go at 1,000 records per second, in that case, if you have a 30-second trigger interval, you need to pull at least 30,000 offsets per trigger. So this is something to be extremely conscious of.
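The arithmetic above (target records per second times trigger interval) can be sketched as a small helper that builds the Kafka source options. The option keys `kafka.bootstrap.servers`, `subscribe` and `maxOffsetsPerTrigger` are the ones the Spark Kafka source documents; the broker address, topic name and the helper `kafka_source_options` are placeholders for illustration.

```python
import math


def kafka_source_options(bootstrap_servers: str, topics: str,
                         target_records_per_second: float,
                         trigger_interval_seconds: float) -> dict:
    """Build Kafka source options sized for a target throughput."""
    # To sustain the target rate, each micro-batch must pull at least
    # rate * interval offsets.
    max_offsets = math.ceil(target_records_per_second * trigger_interval_seconds)
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topics,
        "maxOffsetsPerTrigger": str(max_offsets),
    }


options = kafka_source_options("broker-1:9092", "profile-changes",
                               target_records_per_second=1000,
                               trigger_interval_seconds=30)
print(options["maxOffsetsPerTrigger"])

# With PySpark available, these options could be applied roughly as:
#   spark.readStream.format("kafka").options(**options).load()
```

Keeping the calculation in one place makes it easier to change the trigger interval later without silently changing the effective QPS.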
The next important thing is you want to make sure that your processing time is less than or equal to your trigger interval. What do I mean by that? So a lot of the time when we start designing the system, or implementing it, we’re like, “Hey, I want to do a 100k [inaudible].” Yeah, it’s completely fine to have it in our head. But the problem is I can specify 30,000 QPS or 300,000 offsets per trigger, but then once we start processing, the resources at hand might not be enough to actually process that QPS. So we need to keep an eye on the processing time. If the trigger interval is 30 seconds and the processing time is 45 seconds on average, in that case, you’re never going to make up that 15-second gap. You’re always going to be running behind, and you’re never going to catch up.
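To make the "never catch up" point concrete, here is a toy simulation of the backlog, not a model of Spark's scheduler. It assumes a constant arrival rate and that, when processing overruns the trigger interval, the next micro-batch starts as soon as the previous one finishes; the function name and all numbers are illustrative.

```python
def backlog_after(batches: int, offsets_per_trigger: int,
                  trigger_interval_s: float, processing_time_s: float,
                  arrival_rate_per_s: float) -> float:
    """Records still waiting in the queue after a number of micro-batches."""
    # A batch cannot start before the previous one ends, so the effective
    # cadence is the slower of the trigger interval and the processing time.
    batch_duration = max(trigger_interval_s, processing_time_s)
    backlog = 0.0
    for _ in range(batches):
        backlog += arrival_rate_per_s * batch_duration   # new records arrive
        backlog -= min(offsets_per_trigger, backlog)     # one batch is consumed
    return backlog


# 1,000 records/s arriving, 30,000 offsets per trigger, 30 s trigger interval.
falling_behind = backlog_after(10, 30000, 30, processing_time_s=45,
                               arrival_rate_per_s=1000)
keeping_up = backlog_after(10, 30000, 30, processing_time_s=20,
                           arrival_rate_per_s=1000)
print(falling_behind, keeping_up)
```

With a 45-second processing time the backlog grows by 15,000 records per batch in this toy model, while a 20-second processing time keeps it at zero.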
So the only way to figure out how much more headroom we have is to first pick a max offsets per trigger that is much smaller, make sure that your processing time is way less than the trigger interval, and then kind of take an exponential approach and keep increasing it, so that you figure out what your sweet spot is. The next and most important parameter, after you’ve done the first two things that I talked about, is minPartitions. This is not as well publicized as the other two, but what minPartitions does is it enables a fan-out processing pattern. So it maps one Kafka partition to multiple sub-partitions. A lot of the time what happens is, and I made this mistake myself, I was like, “Hey, I think I’m blocked on throughput from Kafka because I have only say five partitions from Kafka.”
I was like, “Okay, let’s go raise a request to increase the number of partitions in Kafka.” And then the pipeline team told me, “Hey, you know what? You probably don’t need that. Why don’t you do a bit more fan-out? You’re probably not blocked, but it’s [inaudible] from Kafka.” And then we explored this parameter. So this way, what happens is every single partition, because it is getting mapped to multiple sub-partitions, lets you pull in way more data and then distribute it across the executors that you already have [inaudible]. So for a visual effect, we will go to the next slide. But the key idea is you have to rinse and repeat these steps to be able to figure out what your actual throughput per core is. Once you have figured that out, you basically know what your application is capable of, and you can figure out your scaling bounds.
So for the minPartitions visualization. So we have a Kafka topic with, let’s say, two partitions, and we have three executors. Now, if I say my minPartitions is equal to six, in that case what’s going to happen is you can see executors one and two have sub-partitions 1.1, 1.2 and 1.3. So we have sub-partitioned partition one, and we have allocated it across executors. Similarly, partition two has been split across executor two and executor three. So this way, your executor cores are getting proper utilization and your throughput is not limited by the number of partitions that are there on the Kafka side. What we have noticed in practice is that reading, as in doing the scan from the partitions in Kafka, is extremely efficient, so very rarely are you going to hit those limits. So as long as you have a good amount of topics to parallelize across, you should not have to keep on increasing the partition count on the Kafka side. Increasing the partition count on the Kafka side does have some negative side effects, but I don’t think we want to go deeper into that rabbit hole right now.
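The fan-out idea can be sketched as splitting each Kafka partition's offset range into smaller ranges, one per task. This is only an illustration of the concept in plain Python; Spark's own splitting logic may differ in its details, and the function name and offsets are invented.

```python
def fan_out(offset_ranges: dict, min_partitions: int) -> list:
    """Split Kafka offset ranges into at least roughly min_partitions tasks.

    offset_ranges: {kafka_partition: (start_offset, end_offset)}, end exclusive.
    Returns a list of (kafka_partition, sub_start, sub_end) tuples.
    """
    total = sum(end - start for start, end in offset_ranges.values())
    if total == 0:
        return []
    tasks = []
    for partition, (start, end) in sorted(offset_ranges.items()):
        size = end - start
        # Give each Kafka partition a share of tasks proportional to its data.
        parts = max(1, round(min_partitions * size / total))
        for i in range(parts):
            sub_start = start + (size * i) // parts
            sub_end = start + (size * (i + 1)) // parts
            tasks.append((partition, sub_start, sub_end))
    return tasks


# Two Kafka partitions, fan-out to six tasks, as in the slide's example.
tasks = fan_out({1: (0, 3000), 2: (0, 3000)}, min_partitions=6)
for task in tasks:
    print(task)
```

Each of the six tuples can be scheduled on a different executor core, which is why throughput is no longer capped by the two Kafka partitions.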
Now, so far we spoke about optimizing the upstream side, but what about the logic itself? The business logic on the executor side. We need to optimize that too. There is no magic bullet over here. So usually when we are expressing the logic, it broadly falls into two categories. Are we doing it row by row, or are we batching it up? So that is why I’ve split this into two parts here: whether we are going to follow the map and foreach pattern, where we are processing row by row, or whether we are going to do mapPartitions and foreachBatch [inaudible], where we are processing a bunch of those. So for the map example, you can see we are applying a filter on some dataset, that the signal value is greater than 10. And then we have this mapping, the device [inaudible] out.
So the pro for this approach is that it’s extremely easy to code, because this is the first thing that comes to mind when we’re looking at a data frame; we are visualizing, “Okay, what happens to every row across every transformation that’s going on?” The con is, this is slow, in most of the cases that we encountered at least. So there is no local aggregation out of the box for this. You need to specify an explicit combiner. A combiner in Spark is one of the things that I don’t like too much; it’s extremely cumbersome to write as an interface. And we have way too many individual tasks that are created in order to maintain the performance of the row-by-row processing.
The other main thing is, specifically if you’re going to write to an external sink, or communicate with an external sink, it’s very hard to get the connection management right. So given this, let’s compare it with the mapPartitions and foreachBatch approach. So the pro of this is we get explicit connection management, because we are doing batch by batch, right? So for every group of records, we can manage the connection. So the batching and the reuse of connections is well established. We can do local aggregations using HashMaps at the partition level, or any other aggregating mechanism that you can think of. You can also use the data frame itself, the micro-batch data frame itself, as an aggregation step. So you have much finer control at that point. The con for this is you need way more memory upfront, and you can’t just say “hey, I’ve converted my map logic to mapPartitions”, and it’s just going to go [inaudible]. You need to tune a bit. Otherwise you’re going to end up with out-of-memory issues. It’s also very ugly to visualize. Because the map and filter paradigm is extremely well-established, batch-wise, you need to apply a bit more thought and see it from a different angle.
The other thing is, and it kind of relates to the first one, you might need a bit more CPU per task. This is more like a side effect. So if by default your spark.task.cpus is set to one, now that you’re doing batching on the executor side, you might need to set it to a higher CPU count for parallelism. As in, the implicit parallelism that you got with map and foreach from the sheer number of tasks that got created, now you are responsible for maintaining it within the mapPartitions and foreachBatch. So it’s not like this just disappears; you need to keep that in mind before trying to use this approach for more performance.
So, an example of the mapPartitions or foreachBatch thing. In this example, what we’re trying to do is create some bloom filters on the executor for every micro-batch, and then send them to that [inaudible]. So you can see that after the micro-batch is defined in batchDF, we are setting up a Redis connection inside the foreachBatch. And once we have set that up, we are doing some local aggregation, using the micro-batch itself. So we are doing [inaudible] on the micro-batch itself. So again, this is a local aggregation of [inaudible]. And then we use the Redis connection that we created at the top of the partition to send the data to the external [inaudible], which is Redis in this use case. And once we’re done, we close the Redis connection. So this way we saw an example of all the things that we spoke about, and the pros and cons.
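The shape of the per-partition pattern (open one connection, aggregate locally, write the aggregate, close the connection) can be sketched without Spark or Redis. In this sketch the bloom filter from the talk is swapped for a plain per-event-type count to keep it short, and `FakeSink` is a hypothetical stand-in for a real client.

```python
from collections import Counter


class FakeSink:
    """Hypothetical stand-in for an external store client such as Redis."""
    opened = 0  # counts how many connections were created overall

    def __init__(self):
        FakeSink.opened += 1
        self.writes = []
        self.closed = False

    def write(self, key, value):
        self.writes.append((key, value))

    def close(self):
        self.closed = True


def process_partition(rows, connect=FakeSink):
    """Handle one partition: one connection, one local aggregate, few writes."""
    conn = connect()                      # opened once per partition, not per row
    try:
        # Local aggregation before anything leaves the executor.
        counts = Counter(row["event_type"] for row in rows)
        for event_type, count in counts.items():
            conn.write(event_type, count)
    finally:
        conn.close()                      # always release the connection
    return conn


rows = [{"event_type": "view"}, {"event_type": "view"}, {"event_type": "purchase"}]
conn = process_partition(rows)
print(conn.writes)
```

Three input rows turn into two writes over a single connection; a function with this shape is what would typically be handed to a per-partition hook.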
The next super important thing that actually helped us is speculative execution. Speculative execution in Spark should not be a huge surprise to anyone who’s come from the Hadoop MapReduce world. Speculative tasks basically get created when a particular stage is slow, like when some tasks are slow. When [inaudible] the speculative execution is that duplicate tasks are launched, and we basically have a race to see which task ends first, taking the results from that task. So it’s extremely useful when the tasks, and the side effects from the tasks, are [inaudible].
And in some of our cases, this was pretty useful. [inaudible] extremely useful, is trying to maintain your average and median times. Because failures are a common part of the cloud, even if you’re doing it in one data center. Some VMs might become very slow, some networking issues will happen, specifically for long-running things. You can think of it as a parallel with a high-throughput API, with hundreds of instances obviously [inaudible]. So it’s always good to have speculative execution on if the conditions work for you. And what you will see is that one important parameter is the speculation quantile.
Now this tells you what fraction of tasks must be completed before speculation is enabled. So 0.9 means at least 90% of the tasks for a given stage have to be completed before speculative execution can be triggered. So we can see here that 10 tasks were probably spun up, and then they got killed because the other ones ended. But the main takeaway is we’re trying to make sure that the time is not going multiple standard deviations away from the average time taken by the tasks.
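A simplified sketch of the rule described above: speculation is only considered once the quantile of finished tasks is reached, and then tasks running much longer than the median become candidates. It is loosely modeled on the `spark.speculation.quantile` and `spark.speculation.multiplier` settings; Spark's scheduler applies further conditions, and the values used here are illustrative rather than defaults.

```python
import statistics


def speculation_candidates(finished_durations: list, running_durations: dict,
                           total_tasks: int, quantile: float,
                           multiplier: float) -> list:
    """Return running tasks that would be eligible for a speculative copy."""
    # Nothing is speculated until enough of the stage has completed.
    if len(finished_durations) / total_tasks < quantile:
        return []
    # A task is a straggler if it runs well past the median finished time.
    threshold = multiplier * statistics.median(finished_durations)
    return [task for task, elapsed in running_durations.items()
            if elapsed > threshold]


# 9 of 10 tasks finished in about 10 s; the last one has been running for 40 s.
stragglers = speculation_candidates([10] * 9, {"task-9": 40},
                                    total_tasks=10, quantile=0.9, multiplier=1.5)
# Only 8 of 10 finished: below the quantile, so nothing is speculated yet.
too_early = speculation_candidates([10] * 8, {"task-8": 40, "task-9": 40},
                                   total_tasks=10, quantile=0.9, multiplier=1.5)
print(stragglers, too_early)
```

A higher quantile makes speculation more conservative, which matters when duplicated tasks have side effects on an external sink.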
Now, a very common thing, one of the most powerful things that we get with Spark structured streaming, is the streaming statistics. So let’s look at some pitfalls during the aggregate step that we have. Let’s take a very simple example. This should look familiar to you. This is an events dataset that we have created from the read stream. So we’re reading from Kafka for some retail company, and the events have purchases, add to carts, clicks, views, et cetera. So let’s take two different scenarios, two different structured streaming queries. So one is event count by hour. So we want to get all the event types that have occurred in a sliding window of one hour, and we also want… Sorry, not a sliding window, a window of one hour. And we also want to get the product count by hour. What products have been sold in this past hour?
So we make a filter on the purchase events, and then we are going to group by the product title. So they look very similar to each other, right? But then how do the results look in the state store? Because Spark has to maintain those results [inaudible]. So it will look something like this: the key would be from 8:00 PM to 9:00 PM for purchase events. You have 500 events that came into the picture. Similarly for add to cart, [inaudible]. So this is how the results table would roughly look.
Now how would the results table look for the second one? Pretty similar, but you can notice that we have product one, product two, and then a product xxxxx. And then add to that the fact that this is on an hourly basis, and the cardinality is going to be pretty high. So the first one is of low cardinality, we are all good to go. We don’t have any big issues over here. The second one, like I said, product IDs could be in, say, the hundreds of thousands. And then since you’re doing a [inaudible] on the hour, 24 hours in a day, it could get really nasty, and can be a very high cardinality to go over.
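A back-of-the-envelope estimate makes the difference between the two queries visible: the number of keys held in the state store is roughly the number of distinct group values times the number of windows retained. The counts below (5 event types, 200,000 products, 24 hourly windows) are assumptions chosen to match the orders of magnitude mentioned, not measured figures.

```python
def estimated_state_keys(distinct_group_values: int, windows_retained: int) -> int:
    """Rough upper bound on state-store keys for a windowed group-by."""
    # One key per (window, group value) pair that has seen data.
    return distinct_group_values * windows_retained


# Event count by hour: a handful of event types.
event_keys = estimated_state_keys(distinct_group_values=5, windows_retained=24)

# Product count by hour: hundreds of thousands of products.
product_keys = estimated_state_keys(distinct_group_values=200_000, windows_retained=24)

print(event_keys, product_keys)
```

The same query shape goes from about a hundred keys to several million, which is where the state store choice starts to matter.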
So the StateStore issues mainly come from the fact that by default, the state is stored in memory that is managed by the JVM, in the HDFS-backed StateStore; this is one of the biggest issues with it. A large number of keys in the StateStore, obviously, is going to cause GC pauses. And more GC pauses means higher latencies and increased lag on your streaming components. So what we want to do is switch to an off-heap StateStore. So one example that we have over here is that in the proprietary version of Databricks, you get the RocksDB StateStore provider. So in this way, you can safely manage way more keys, like a hundred times more, in the StateStore, rather than having failures or GC pauses. You take a slight hit with respect to the write latencies, but it’s not exactly a big problem over there.
You can also implement your own persistent off-heap StateStore provider. There are a lot that are available right now for RocksDB. One example is you can use Redis too. One thing to keep in mind is you want to think about what your output mode is, whether you’re doing a complete output, an append output, or an update. So that will also decide how efficient your StateStore choice is going to be.
Another thing that I quickly want to touch on is skew. Some partitions might be extremely skewed when you’re getting data in. And this is very variable; you can’t always control what data you’ll get [inaudible] from. So in that case, when you’re processing these partitions, you can have out-of-memory issues or connection failures. So even if, say, 99 out of a hundred tasks pass, if that one task fails, your stage is not going to pass, and you’re going to be stuck that way. So one thing that we can do to help out is repartition. So just repartition by whatever column you think might help you. This will be one easy way to [inaudible]. But repartitioning might not be enough. So we want to add some salting to the [inaudible]. Everybody has a unique way of trying to add some salting logic to the columns.
One simple way is you can always have a target partition count in mind, and then you just randomize your salt key so that you have a much more uniform distribution, so that you can split the data uniformly across the various partitions, rather than by some other column which has a much more skewed distribution. One thing that I wanted to go into was the target partition count, but I will leave this for the slide. One thing that I’ll say is, if you are trying to target a partition count, you might want to keep in mind to try to mimic the HDFS block size, because that will help you in the reading of the final block that you have written, in case your final sink is HDFS.
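A minimal sketch of the random-salt idea in plain Python: instead of grouping rows by a skewed key, each row is assigned a random salt in the range of the target partition count. The dataset (one hot customer with 9,000 rows plus 1,000 others) and the target of 8 partitions are invented for the illustration.

```python
import random
from collections import Counter


def salted_partition(target_partitions: int, rng: random.Random) -> int:
    """Pick a partition uniformly at random, ignoring the skewed key."""
    return rng.randrange(target_partitions)


rng = random.Random(42)  # seeded only so the example is repeatable

# Skewed input: one hot key dominates the data.
rows = ["customer-1"] * 9000 + [f"customer-{i}" for i in range(2, 1002)]

# Partitioning by key would put all 9,000 hot rows in the same place.
largest_by_key = max(Counter(rows).values())

# Partitioning by a random salt spreads rows evenly over 8 partitions.
salted = Counter(salted_partition(8, rng) for _ in rows)
largest_salted = max(salted.values())

print(largest_by_key, dict(salted))
```

The trade-off is that rows with the same key no longer land together, so a random salt suits work that does not need key locality, such as writing to a sink.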
One last step which I want to focus on is, if you’re using, say, Redis as your sink, or you’re probably using some other HDFS sink also, you might want to look into pipelining, whether it’s on the HTTP side or on the [inaudible]. So without pipelining, every time a client sends a request, it gets an ack back, and only then does it send the next request. With pipelining, you send a bunch of requests, the server does the batching on its side and then sends the responses back all together.
So one example that we have, building on the foreachBatch example that we saw, is that you get the Redis connection at the per-partition level. You set auto-flush to false so that it tells the client, “Hey, don’t send the request unless I explicitly ask you to.” You group it locally in your foreachBatch, according to your group size, and then you keep adding things as if it’s business as usual, as in doing operations on that. And then you do an explicit flush. So that tells Redis, or whatever client you’re using, “I need to actually send the commands in now.” And then you wait for it and you get the results back. So we’re almost out of time, so I think that’s all that I had for today. Please do leave feedback and let us know how we can improve our stuff, if this was useful to you. And that’s all. Thank you.
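The buffering-then-flushing behaviour described above can be sketched with a toy client that counts network round trips. `PipelinedClient` is a hypothetical stand-in, not the API of any particular Redis library, and the group size of 4 is arbitrary.

```python
class PipelinedClient:
    """Toy client: commands are buffered until flush() is called explicitly."""

    def __init__(self):
        self.buffer = []
        self.round_trips = 0

    def send(self, command: str) -> None:
        # With auto-flush off, sending only queues the command locally.
        self.buffer.append(command)

    def flush(self) -> list:
        if not self.buffer:
            return []
        self.round_trips += 1                      # one trip for the whole group
        responses = [f"OK:{command}" for command in self.buffer]
        self.buffer.clear()
        return responses


def write_in_groups(client: PipelinedClient, commands: list, group_size: int) -> list:
    """Send commands in groups, flushing once per group."""
    responses = []
    for i, command in enumerate(commands, start=1):
        client.send(command)
        if i % group_size == 0:
            responses.extend(client.flush())
    responses.extend(client.flush())               # flush the final partial group
    return responses


client = PipelinedClient()
commands = [f"SET k{i} v" for i in range(10)]
responses = write_in_groups(client, commands, group_size=4)
print(client.round_trips, len(responses))
```

Ten commands cost three round trips instead of ten; the group size trades client-side memory against the number of trips.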
I am a Sr Engineering Manager/Architect on the Unified Profile Team in the Adobe Experience Platform; it’s a PB scale store with a strong focus on millisecond latencies and Analytical abilities and ...