How Adobe Does 2 Million Records Per Second Using Apache Spark!


Adobe’s Unified Profile System is the heart of its Experience Platform. It ingests TBs of data a day and is PBs large. As part of this massive growth we have faced multiple challenges in our Apache Spark deployment, which is used from ingestion to processing. We want to share some of our learnings and hard-earned lessons as we reached this scale.

  • Repeated Queries Optimization – or the Art of How I Learned to Cache My Physical Plans. SQL interfaces expose prepared statements; how do we use the same analogy for batch processing?
  • Know thy Join – Joins/Group By are unavoidable when you don’t have much control over the data model, but one must know what exactly happens underneath, given the deadly shuffle that one might encounter.
  • Structured Streaming – Know thy Lag – While consuming off a Kafka topic which sees sporadic loads, it’s very important to monitor the consumer lag. It also makes you respect what a beast backpressure is.
  • Skew! Phew! – Skewed data causes so many uncertainties, especially at runtime. Configs which applied on day zero no longer apply on day 100. The code must be made resilient to skewed datasets.
  • Sample Sample Sample – Sometimes the best way to approach a large problem is to eat a small part of it first.
  • Redis – Sometimes the best tool for the job is actually outside your JVM. Pipelining + Redis is a powerful combination to supercharge your data pipeline.

We will present our war stories and lessons for the above, and hopefully they will benefit the broader community.


Video Transcript

– [Yeshwanth Vijayakumar] Hi guys, this is Yeshwanth Vijayakumar. I’m a project lead at the Adobe Experience Platform, specifically with the Unified Profile Team. Today’s talk is going to be about how Adobe does a significant amount of throughput using Apache Spark.

Today’s goals: I wanna keep it as simple as possible. The first thing is to share tips and experiences from our different use cases. The second thing is, hopefully it saves you at least an hour of your time. One thing I’ve kind of realized over the years is we always think we’re doing something really niche, and then the more people I speak to, they go, “Oh my God, why didn’t you tell me this before?” So, it’s kind of a combination of those kinds of tips and stuff that I’ve come across, and I wanna share them.

What do you mean by Processing? Agenda!

Setting the agenda, right? What do I mean by processing? I wanna actually look at it at multiple levels, so I’ve split the talk into three different parts. The first is ingestion: when we say throughput, we also have ingestion, how we get the data in, so we have a throughput associated with that. The second part is evaluation, as in query processing; that’s the more common throughput that we usually talk about. The third thing is Redis. I just wanna go a bit into how it applies to a lot of the use cases that we’ve seen, and how we can get a bit of extra juice from it.

Let’s talk about the ingestion scenario of it now.

Unified Profile Data Ingestion

The Unified Profile team basically ingests data from a lot of different sources, like Adobe Campaign. Adobe Campaign, for those who don’t know, is an email targeting solution, with batch-and-blast and stuff like that; a lot of marketing use cases there. AEM is for website building. Adobe Analytics is one of the largest web analytics vendors out there. And then there’s also Adobe Advertising Cloud. So you have a plethora of marketing information flowing in for all these different clients that we have. The data can be in multiple formats: it could be JSON, Parquet, Protobuf. But the key thing is, once it lands into the platform it’s in XDM, or the Experience Data Model; this is a standardized data model that we have for the marketing use cases. Once it gets there, it gets into the Unified Profile store, which basically gives you a 360 degree view for marketing use cases. I could make a query like, “Hey, get me all the people who saw an advertisement for Photoshop, but did a trial of Illustrator in the last three weeks, and did not get an email from Adobe in the last 30 days.” So you can make a lot of complicated and powerful segmentation use cases, just with your first-party data or third-party data.

The key thing that we’re gonna focus on, though, is the statistics creation once it gets into the Unified Profile store. With respect to ingestion, all of this is getting piped into, say, a Kafka topic, so you have a fire hose of data flowing in, and we’re gonna build statistics on top of it. That’s the setting of what we’re gonna talk about with respect to throughput.

The first thing we’re gonna go over is knowing what the lag is. Before we go into throughput, we need to know what we need to measure. Specifically with such a high volume of data flowing in, we want to be able to measure a lot of things, starting with the simple things that Spark structured streaming gives you right out of the box through the StreamingQueryListener. You get a handle on every query progress: whenever a micro batch completes, this handle gets called, and you are able to get the number of rows that got ingested in the micro batch, and also things like how many rows got processed per second and the batch durations. So, you get a good amount of information from just the vanilla metrics that come with onQueryProgress. But then, like I mentioned, we’re listening to a fire hose of a Kafka topic, so more often than not you’re gonna fall behind. So, apart from the throughput, you also need to know where you are at that point in time when you’re processing the information.
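To make onQueryProgress concrete, here is a minimal sketch of a listener that just logs the vanilla per-batch metrics mentioned above (numInputRows, processedRowsPerSecond, batch durations). The logging format is mine, not from the talk, and it assumes a SparkSession named spark is in scope.

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()

  // called once per completed micro batch
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    println(s"batch=${p.batchId} rows=${p.numInputRows} " +
      s"rows/sec=${p.processedRowsPerSecond} durationMs=${p.durationMs}")
  }
})
```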

So, I came across this really cool blog which went into how to keep track of the lag that you have on the Kafka topic. The neat trick is: we create a dummy consumer group for the same topic that we are listening to. Then we fetch the progress information from Spark, from the structured streaming query progress, and we write that checkpoint state back into the consumer group. So, you have one topic, and you have a specific consumer group just to keep track of how much we have read and fed into the Spark streaming queries. And once you have this registered, you can use Burrow, for those of you familiar with Kafka Burrow.
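A hedged sketch of the dummy-consumer-group trick follows: after every micro batch, commit the end offsets that Spark reports back into a monitoring-only consumer group so Burrow can track lag against it. The broker addresses, group name, topic, and offset values are illustrative assumptions; in practice the offsets come from parsing the endOffset JSON in the query progress shown above.

```scala
import java.util.Properties
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "broker-1:9092")                     // hypothetical brokers
props.put("group.id", "profile-ingest-lag-monitor")                 // dummy group: it never actually consumes
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
val monitorConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)

// end offsets reported by the last micro batch (values made up for illustration)
val offsets = new java.util.HashMap[TopicPartition, OffsetAndMetadata]()
offsets.put(new TopicPartition("profile-events", 0), new OffsetAndMetadata(1234567L))
offsets.put(new TopicPartition("profile-events", 1), new OffsetAndMetadata(1198432L))
monitorConsumer.commitSync(offsets)                                 // Burrow now sees this group's "progress"
```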

Use Burrow to keep track of the Lag

And you can keep track of the lag that you have. So, now you can get an idea of, okay, on May 23rd at 12 midnight, we had a lag of 2 billion. As we keep going across multiple time periods, you know how well we are doing, or how much more we need to do. That’s a key thing to measure. The other thing is, from the vanilla metrics you can also keep track of the records per second, and right away you can have a spoiler: you can see we’re doing tops of 3 million records per second, pretty easy to do. So, now that we know what we need to measure and how to measure it – I know we went through it in a very fast way, but hopefully that gives you an idea of what it is – we can get into, okay, how do we actually achieve that? With respect to optimizing the ingestion, that is, the read.

But even before we go there, let’s look at the generic flow that I described. We have a Kafka topic, which has some partitions. Then we have a Spark streaming application which is reading in the data and distributing it to the executors. The executors have some business logic, and using this business logic we do some processing and write it to an external DB. So far so good, cool. Now, let’s get into the actual optimization of the read side. This part of the code that I’m showing you, a lot of you must be familiar with it, because this is kind of like Kafka 101 on the structured streaming side. But we’re gonna dig into a couple of important aspects through which we can tune our applications. The first thing that we wanna do is set the number of executors and keep it constant; let’s pick a number, I’m gonna just draw a line on this, and don’t go too low, otherwise you’re gonna have a big problem. Then, once you keep this constant, we are going to pay attention to this parameter, maxOffsetsPerTrigger. The structured streaming connector for Kafka is predominantly a pull-based interface, so we decide how many offsets we are going to pull from Kafka per trigger interval. Meaning, if your trigger interval for the micro batch is, say, 30 seconds, and if this value is, say, a hundred thousand, then every 30 seconds we are gonna pull in a hundred K entries from Kafka, and that’s going to be distributed across your executors. So, this becomes the first gate that we need to optimize on.

To begin with, let’s say we have been very optimistic; let’s just pick a number and say, this is the target QPS that I wanna go for. So calculate that and set maxOffsetsPerTrigger based on it.
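A minimal sketch of that read side, assuming a SparkSession named spark is in scope; the broker addresses, topic name, and target numbers are illustrative assumptions, and the console sink stands in for the real business logic.

```scala
import org.apache.spark.sql.streaming.Trigger

// illustrative target: ~100K records/sec with a 30 second micro batch
val targetQps = 100000
val triggerSeconds = 30

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker-1:9092")                     // hypothetical brokers
  .option("subscribe", "profile-events")                                  // hypothetical topic
  .option("startingOffsets", "latest")
  .option("maxOffsetsPerTrigger", (targetQps * triggerSeconds).toString)  // records pulled per micro batch
  .load()

val query = events.writeStream
  .trigger(Trigger.ProcessingTime(s"$triggerSeconds seconds"))
  .format("console")                                                      // stand-in for the real sink
  .start()
```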

The next thing that we’re gonna do is use all the monitoring that we set in place to watch the processing time, and we’re gonna compare it with the trigger interval. For example’s sake, say our trigger interval is 30 seconds. Every 30 seconds we’re pulling in a hundred K records, but say our processing time, which we’re monitoring separately on the dashboard that you set up, or even through the Spark UI, takes 40 seconds. So for every 30 seconds of data you pulled in, you’re taking 40 seconds to process it; you’re never gonna catch up. That’s not your ideal throughput. So, we need to rinse and repeat. If we see that the processing time is well under the trigger interval, you know that you have some headroom to grow QPS-wise, so increase it. An easy approach is to step in doubles, kind of like a binary search. You keep increasing until you find the sweet spot for maxOffsetsPerTrigger, such that your processing time is as close to your trigger interval as possible. Once you’ve gone through that process, the next thing that we wanna look at is this interesting parameter called minPartitions. minPartitions basically enables a fan-out pattern: it maps one Kafka partition to multiple sub-partitions across the executors. We’ll go much deeper in the next slide so you get a visual representation of that. But the key aspect is that as we repeat this process multiple times, we end up with a proper throughput per core.

This is of course assuming that, you know, you’re all rock stars here, and we have some very well optimized business logic code which knows what it’s doing. So what this assumes is, for a particular piece of code and a particular instance type, you have calculated the throughput per core. And now we can horizontally scale.

There are some caveats here, specifically around network bandwidth and stuff like that. But usually that’s not it; if you think the network is your problem, nine out of 10 times it’s probably not. So let’s dig a bit deeper now into the case where we set minPartitions greater than the number of partitions on Kafka.

Flow with MinPartitions > partitions on Kafka

The example that we have here is: let’s take a topic that has two partitions, and we have three executors. Now, if I set minPartitions equal to six, what’s gonna happen is each Kafka partition gets split into multiple sub-partitions. You can see executor one has partitions 1.1 and 1.2, and the third sub-partition that partition one gets split into goes to executor two. This is a very interesting thing to do, because in the initial stages, when I was getting pretty hung up on throughput, I used to tell the pipeline team, “Hey guys, I think I’m saturated. Can you increase the number of partitions on Kafka?” And they’d be like, “No, man, this is hard. We can’t just keep increasing the partitions as and when you want.” And of course they were right; you just can’t keep doing that. Instead, the easiest thing to do is pull in more data and then do a fan-out pattern within your own application. That is something minPartitions helps you do with respect to scaling your application, instead of being bound by how many partitions there are in Kafka. What you are bottlenecked by is how fast you can read from Kafka; that doesn’t change, and that is where the network actually plays a big part.
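A minimal sketch of that fan-out, with the same hypothetical brokers and topic as before: with two Kafka partitions and minPartitions set to six, each Kafka partition is split into offset sub-ranges so that all executors get work.

```scala
val fannedOut = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker-1:9092")   // hypothetical brokers
  .option("subscribe", "profile-events")                // hypothetical topic
  .option("minPartitions", "6")                         // > Kafka partition count => fan-out into sub-partitions
  .load()
```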

So, now with this clarification in place – you know how I was saying this business logic is rock solid, that we’ve kind of optimized it?

MicroBatch Hard! Logic Best Practices

Say you’ve probably already done all the benchmarking, you’ve done the micro benchmarking so you know exactly which JSON parser you’re using, all that is done. But one thing that I did realize is that, in spite of all of that, you still can make some mistakes. One thing I noted in the evolution of my own code was the difference between these two paradigms: map or forEach, which work on a record by record basis, versus mapPartitions or forEachBatch, which inherently have a batching concept to them, as the names suggest. If you look at map or forEach, in terms of pros, it’s easy to code; almost every single hello-world example uses a map or a forEach. It’s super simple to visualize, and that’s probably the easiest way to get started. But with respect to cons: it’s slow. I’m sure a lot of you know what you’re doing; I’m just saying from my own experience that nine out of 10 times it was slow and I had to rewrite it as a mapPartitions or forEachBatch, so there is a lot of bias baked into my take. As to why it’s usually slow: you don’t have a lot of local aggregations, and even if you want to do local aggregations, you need to specify explicit combiners. Specifying combiners in Spark is painful. That’s one thing that I don’t actually enjoy in the Spark API. The next thing is, it also spawns a lot of individual tasks.

Whether that’s good or bad depends on the situation. And the main thing, I feel, is that a lot of the time we have downstream processors, so it’s very hard to get the connection management right when you’re working record by record. Of course, the ForeachWriter is a notable exception, but I like a bit more granular control with my stuff. If you get to the mapPartitions or forEachBatch paradigm, these are the pros: you have explicit connection management – we will look at an example of it next – which helps you batch things much more easily and reuse the connection more efficiently. The other thing is, you can have local aggregations using any data structure, like HashMaps or sets; you can use them at the partition level, so you get local aggregations and you have complete control over them. With respect to cons: when you look at it, it’s not exactly easy to visualize.

And you also need a lot more upfront memory. On the map or forEach side you’re doing record by record; here, you are actually simulating the batching process, so your input load is gonna be significantly higher. So you would need to re-evaluate your executor memory and so on based on this workload. It’s not just, “Oh, I have the exact same instances, I’m just going to port it over to mapPartitions and batch it.” No, it doesn’t work that way. And of course, like I said, it’s uglier to visualize. Oh, and there is one more thing: memory is one thing, but there’s also CPU. Usually we don’t bother to change spark.task.cpus; it’s usually set to one or two. But depending upon how well you are batching, or parallelizing within your mapPartitions or your forEachBatch, you might wanna add a little bit more juice to spark.task.cpus.

An Example

Now, let’s look at an example. This is from another talk at the same Spark summit. It basically creates some bloom filters off an event stream that’s flowing in; we’re using a forEachBatch construct, and then we build the bloom filters and ship them off to Redis.

If you take a closer look at this, at the top of the forEachBatch we have a common Redis connection, a thread-safe Redis connection that can get reused for anything that goes downstream. In the next part, you can see there is a local aggregation: getting all the unique products in a batch, which is just a set. So you’ve gotten the local aggregation at the forEachBatch level. Once you’ve got that, you can see they’re doing an update of the bloom filter to the downstream Redis, using the common connection that we have. The key thing to note is that this forEach could easily be parallelized with threads. And with respect to connection management, we can issue an explicit close, so that we can rest easy that we are not causing a lot of issues with connection management on the downstream database side.
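A hedged sketch of the same pattern follows: one connection per micro batch, a local aggregation of unique products, one write downstream, and an explicit close. The talk ships bloom filters; this sketch writes to a plain Redis set (SADD) to stay within vanilla Lettuce commands. The stream name (eventStream), column name, Redis endpoint, and checkpoint path are assumptions, not the talk’s actual code.

```scala
import io.lettuce.core.RedisClient
import org.apache.spark.sql.DataFrame

val writeBatch = (batchDF: DataFrame, batchId: Long) => {
  // one connection per micro batch, reused for everything downstream
  val client = RedisClient.create("redis://localhost:6379")        // hypothetical endpoint
  val connection = client.connect()
  val commands = connection.sync()

  // local aggregation: unique product ids seen in this micro batch
  val uniqueProducts = batchDF.select("productId").distinct()      // hypothetical column
    .collect().map(_.getString(0))

  if (uniqueProducts.nonEmpty) {
    commands.sadd("products:seen", uniqueProducts: _*)             // one call for the whole batch
  }

  // explicit close so we don't leak connections on the Redis side
  connection.close()
  client.shutdown()
}

eventStream.writeStream
  .option("checkpointLocation", "/tmp/checkpoints/products")       // hypothetical path
  .foreachBatch(writeBatch)
  .start()
```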

Wrapping up this example, let’s go to the next slide.

Speculate Away!

Another common thing while dealing with such high throughput, when the input itself is coming in at such high volume, is that your process might start off doing fine, but these are long running apps, and after a day or two, or even hours, you’re gonna see some degradation. It could be in the machine, it could be garbage collection or something like that. So some set of your tasks might slow down, and you’ll be like, why should that happen? It ran well for two hours or three hours or 24 hours; why is this happening now? To get around that, Spark has this extremely neat feature called “speculative execution”. What happens is that, if speculative execution is set to true and you have some slow running tasks, they are launched in parallel on other executors, and if the new tasks complete before the slow running tasks, the old tasks get killed off. So we are speculating ahead of time that those tasks are going to be slow. This is extremely useful, because even if some nodes get really slow, with speculative execution on we’re able to get to around the same average timings that we had before, so the throughput doesn’t drop drastically. One cool parameter, apart from just turning on speculative execution, is spark.speculation.quantile.

That tells you what fraction of tasks must be complete before speculation is enabled. So, 0.9 means 90% of the tasks must have completed before the speculative execution monitoring even begins. If you look at the example that we have here, you can see that each step usually takes just two to three seconds, but for the two highlighted steps, the seven second and six second ones, you can see a good number of tasks have been killed. That just means that speculative execution decided things were going haywire and not normal, so it spawned off new tasks and made sure that they caught up. The net effect is that it doesn’t cause a huge fluctuation in the timings; it enables you to keep to around the same times. This personally has been a huge lifesaver for these long running processes.
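A minimal sketch of the speculation settings discussed above; the values and app name are illustrative, not the ones used in production at Adobe.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("profile-ingest")                      // hypothetical app name
  .config("spark.speculation", "true")            // re-launch suspiciously slow tasks on other executors
  .config("spark.speculation.quantile", "0.90")   // only start speculating once 90% of tasks have finished
  .config("spark.speculation.multiplier", "1.5")  // a task counts as slow at 1.5x the median task time
  .getOrCreate()
```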

We went through the ingestion throughput tips. Now we’ll switch gears and go into the evaluation scenario, mainly with respect to queries and how we evaluate them.

What are we processing?

First, let’s get some context on what we are going to evaluate and what the structure is like. The main goal here is that we wanna run as many queries as possible, in parallel, on top of a denormalized data frame. What do I mean? If you were to visualize it, say you have query one through query 1000. In the specialized use case that I have for my team, we wanna take these thousand queries and run them on top of a large denormalized data frame.

The key thing here is that we don’t rely on the Spark cost-based optimizer. The main reason is, every query needs to look at every row, and the subset of fields the queries are interested in could be all the columns in the data frame. So all the push-downs and filters don’t help; we don’t need a cost-based optimizer, because it’s always a full table scan. Given this perfect storm, how do we optimize? The next thing is that we wanted an interactive processing layer. As soon as someone submits a query, someone’s on the other end waiting behind the UI to get real-time results. How do we get real-time results for them? And by real time I mean the query needs to finish in seconds, like 10 seconds, 20 seconds at max.

So, given this problem statement, here are some of the things we found were blocking us.

The Art of How I Learned to Cache My Physical Plans

So, we are gonna look at them. The first thing is how we learned to cache our physical plans. This was a good one, mainly because in the case where you’re repeating queries over the same data frame, we took some inspiration from RDBMS land.

For Repeated Queries Over Same DF

Specifically, we have prepared statements in RDBMSs, like the example that we have here, the INSERT INTO products statement: you can see that the values are templatized. This avoids repeated query planning by just reusing that template. What that means is the compilation process – the parsing, optimization and translation to a physical plan of execution – happens way ahead of time, and then the values are substituted into it. So, if we are doing the same processing again and again in Spark, is it possible for us to have something like a prepared statement?
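For reference, a minimal JDBC prepared-statement sketch on the RDBMS side: the plan for the templatized INSERT is compiled once and the values are bound per execution. The connection string, table, and values are assumptions for illustration.

```scala
import java.sql.DriverManager

// hypothetical JDBC endpoint and credentials
val jdbcConnection = DriverManager.getConnection("jdbc:postgresql://localhost:5432/shop", "user", "pass")

val ps = jdbcConnection.prepareStatement("INSERT INTO products (id, name) VALUES (?, ?)")
ps.setInt(1, 42)              // values are bound into the already-compiled plan
ps.setString(2, "Photoshop")
ps.executeUpdate()
ps.close()
jdbcConnection.close()
```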

So, I have the example over here, but then in the next few slides, we’ll actually go over this example itself, and see how we can cache this query plan.

Now a lot of you will be like, “Okay, that’s fine. Why do you still wanna do this? What was the problem that made you want to cache this?” Our data frame has thousands of nested columns. Just printing this query plan, when you go into debug mode, causes an overflow while printing to the logs.

Literally, you can’t copy-paste it. And the time for query planning varies a lot; it is almost entirely a factor of the number of columns that we have. It took two to three seconds – usually you see that in milliseconds – two to three seconds, sometimes even five. So, depending on which organization we’re running this query for, it could go crazy. And when you’re talking about interactive queries where the total runtime is under 10 seconds, 30% of the time spent on planning is a good amount of overhead. So that’s the very interesting set of constraints that led us to go in this direction.

So, with respect to how this works, we will take a simple data frame once we go to the next slide.

We’ll take a simple data frame, like an event data frame with a bunch of events flowing into it. What we’re gonna do is, for each partition – following all the good practices that we’ve specified before – do a forEach over the rows within that partition and do some IO operation. In this case the IO operation is a print; it just prints the event type, which is enough to simulate our case.

Another thing that we did, in order to simulate the same conditions as our code base, is that we also cached this event data frame. So good, we cached it too. If you look at the query plan over here, you can see that it’s actually doing an InMemoryTableScan, DeserializeToObject, mapPartitions. All good, looks good, right? That’s fine. The time taken is one minute and 40 seconds, but the query planning stage of this took around eight to 10 seconds on top of that. So next, what we did is try out the caching-the-query-plan thing. Every query internally, in Spark’s structured execution pipeline, has this queryExecution attribute, which holds the physical plan. The physical plan is kind of a misnomer: it doesn’t keep track of which dataset or data frame it is actually executing on; it’s more like the query plan – okay, a projection of these fields – more schema-based. That’s what it keeps track of. So, what we did was hook onto the queryExecution attribute and get the RDD out of it. But that gives you an RDD of InternalRows.

So, we cached that RDD. But of course, now you don’t have a data frame of events; instead you have an RDD of InternalRows. So we also pre-process, and build a row encoder out of the existing schema. Now, once we got this, when we run the query on top of the cached internal rows, the query execution time went down to 60 seconds, versus the one minute 40 seconds before – around 40 seconds saved. In this example, of course, we used a significantly larger number of fields than we would use in our actual use cases, so you’re seeing a difference of almost 40 seconds, but you get the gist. And you can also see the difference in the query plan that got generated out of this.
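A hedged sketch of the idea, assuming Spark 3.0–3.4 style encoder APIs (RowEncoder and createDeserializer have moved around across versions): plan once, cache the RDD of InternalRow, and use a deserializer built from the original schema to turn InternalRows back into Rows for repeated queries. The parquet path and column name are hypothetical; this is not the exact code from the talk.

```scala
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val eventsDF = spark.read.parquet("/path/to/events")            // hypothetical source

// plan once, keep the RDD of InternalRow; copy() because unsafe rows are reused per partition
val plannedRdd = eventsDF.queryExecution.toRdd.map(_.copy())
plannedRdd.cache()

// a deserializer built once from the original schema turns InternalRow back into Row
val deserializer = RowEncoder(eventsDF.schema).resolveAndBind().createDeserializer()

// repeated "queries" now run over the cached internal rows and skip query planning entirely
val eventTypeCounts = plannedRdd
  .mapPartitions(_.map(deserializer(_)))                        // InternalRow -> Row
  .map(row => row.getAs[String]("eventType") -> 1L)             // hypothetical column
  .reduceByKey(_ + _)
  .collect()
```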

So, this helped us get to the interactive capabilities. Now, the next thing is, for any query that you run into joins.

Join Optimization For Interactive Queries (Opinionated)

Join optimization, mainly for interactive queries – I’m gonna have a very opinionated take on it. For interactive queries, the best way to deal with joins is: don’t join.

I know how that sounds, but yeah, try denormalizing as much as possible. You need to make that trade-off, because if you’re looking at interactivity, you just can’t keep hopping from place to place. But say you can’t avoid it at all: try to broadcast the join table if it’s small enough, so that you get a hash join. Sometimes, though, you end up with a table that would suit a hash join but isn’t small enough to broadcast. One cool thing you can do is put it into Redis. Redis does a very good job with high-throughput reads, and it makes for a very easy replacement for a broadcast join; in theory, you still get the characteristics of a broadcast join.
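A minimal sketch of the broadcast case; the table and key names are assumptions. When the lookup side is too big to broadcast, the same shape of lookup can instead be served from Redis inside mapPartitions, as discussed above.

```scala
import org.apache.spark.sql.functions.broadcast

// force a broadcast hash join while the lookup side is small enough to fit on every executor
val joined = profilesDF.join(broadcast(productLookupDF), Seq("productId"))
```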

Once the data for these interactive queries starts getting really large, and you have a join, you’re going to have a shuffle. You don’t like shuffles, and shuffles don’t like you; you’ll see that very quickly. A sort-merge join would be the default join, unless the cost-based optimizer kicks in and tells you to do something else. So, the best thing, again, is if you can avoid the join altogether.

The next big thing is skew. Skewed data.

One of the worst things that you can see in the Spark UI is: “Oh, you know, 99% of my tasks have completed, one hasn’t completed.” And then you feel like, “Hey shit! This is not going to work out.” Even if 99% of the job got done, that one task that didn’t finish will fail your job. And usually that is because the default partitioning scheme of your data frame might not be good: some partitions can have too much data, so processing them can cause out-of-memory errors or connection failures.

So, repartitioning is your best friend in this case. Spark has this neat utility, df.repartition, where you can repartition on some column. A lot of you will be like, “Hey, I know this already, that’s fine.” But sometimes this might not be enough. So, what we do is add some salt on top of it. What I mean by salt is: we create a new, derived column which gives you a uniform distribution with respect to a target partition count that you want to achieve. If you wanna keep the same partition count, that’s still fine, but you can also pick a new target partition count and repartition across that, as in the sketch below.
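A minimal salting sketch; the data frame name, salt column, and target count are assumptions (see the sampling heuristic further down for one way to estimate the target).

```scala
import org.apache.spark.sql.functions.{col, floor, rand}

val targetPartitionCount = 200   // hypothetical target; see the sampling heuristic below

val salted = skewedDF
  .withColumn("salt", floor(rand() * targetPartitionCount).cast("int"))  // uniform values in 0..target-1
  .repartition(targetPartitionCount, col("salt"))                        // even spread regardless of key skew
```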

Now, the question would be: okay, fine, the salt key sounds good. You get a uniform distribution and you repartition on it. But how do I know what this target partition count that I keep referring to should be?

How to get the magic targetPartitionCount?

One quick trick that actually helped, without us spending a lot of resources on it, is that we just sampled – a bit of random sampling. A key aspect that came up in a lot of our investigations was the recommendation that every parquet partition should be around 128MB in size.

And this recommendation did hold up, because it kind of mimics the HDFS block size too.

So, what we do is sample a small portion of the large data frame, estimate the size of each row, and extrapolate. The sample function that I have here just looks at the top, say, 5,000 rows or so, looks at what size that is, and gets a size per row. Then, based on that, we see how many 128MB partitions we need, and that gives you your estimated partition count. This heuristic has actually worked out really well.
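A hedged sketch of that heuristic, not the actual sample function from the talk: it uses Spark’s SizeEstimator on a small collected sample, so it measures JVM object size rather than on-disk parquet size, which is fine for a rough extrapolation. The function and variable names are assumptions.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.util.SizeEstimator

def estimateTargetPartitionCount(df: DataFrame,
                                 sampleRows: Int = 5000,
                                 bytesPerPartition: Long = 128L * 1024 * 1024): Int = {
  val sample = df.limit(sampleRows).collect()                          // small sample of the big DF
  val bytesPerRow = SizeEstimator.estimate(sample) / math.max(sample.length, 1)
  val totalBytes = bytesPerRow * df.count()                            // extrapolate to the full DF
  math.max((totalBytes / bytesPerPartition).toInt, 1)                  // aim for ~128MB per partition
}

val targetPartitionCount = estimateTargetPartitionCount(skewedDF)      // feeds the salting step above
```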

We’ve been testing this for over a year, and this kind of has worked out.

If you guys have a better one, I’ll definitely be interested in it.

All put together! Dataframe Size: 13 Million entries

So, let’s put together all this stuff that we’ve been talking about, starting with the interactive query processing. We ran a test. We have a data frame with roughly 13 million entries in it, and the schema has around 800 nested fields, so you know it’s gonna run into the query plan problem. Before, you can see that it took 23 seconds, which is not bad, and it had around eight partitions that we were operating on top of. After we put everything that I spoke about into practice, we repartitioned to around 32 sub-partitions in the parquet, and with all the other query practices that we put into place, it took a mere six seconds to process. That’s about 2.1 million records per second. So all of this put together definitely helped us in achieving the throughput that we wanted for the interactive case.

One thing that I do want to touch on, apart from this, is a common design pattern of trying to put Redis everywhere. I’m kind of guilty of that, but it just fits in a lot of places and helps us out.

Using Redis With Spark Uncommonly

With respect to Redis, I’m not gonna go too deeply into every single thing, but some uncommon use cases are: you could maintain bloom filters and HLLs on Redis but write to them from Spark; you could get interactive counting – accumulators have a lot of issues, so instead of using accumulators, you could use Redis counters with pipelining to get your interactive counts; and if you want to simulate a kind of event queue system for a long running Spark job, you could use Redis to convert any batch Spark job into an interactive Spark job. A key thing while using Redis with Spark – specifically Redis inside Spark – is the pipelining and batching aspect. So, to end, I’m gonna touch a bit on Redis pipelining, because it commonly features in both the ingestion optimization as well as the interactive processing optimization.
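As one illustration of the counter idea, here is a hedged sketch of pushing per-partition counts to a Redis counter instead of a Spark accumulator, so dashboards can read progress while the job runs. The data frame name, endpoint, and key are assumptions.

```scala
import io.lettuce.core.RedisClient

resultDF.rdd.foreachPartition { rows =>
  val client = RedisClient.create("redis://localhost:6379")       // hypothetical endpoint
  val connection = client.connect()

  // count locally within the partition, then push one increment
  var processed = 0L
  rows.foreach(_ => processed += 1)
  connection.sync().incrby("pipeline:processedRows", processed)   // hypothetical key, readable live

  connection.close()
  client.shutdown()
}
```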

Digging into Redis Pipelining Spark

For Redis pipelining: without pipelining, if you were to simulate a bunch of operations, the client says, “Do something.” The server says, “Yeah.” The client says, “Do something again.” The server says, “Yeah.” So you have this bouncing back and forth. With pipelining, the client says, “Hey, do A, B, C and D,” and just pushes it, like a batch. The server reads everything together, says, “Okay, fine, I’m gonna do all of this together,” executes each operation, and writes all the results back at the same time. With Redis this works really well, because on the server side it’s able to read multiple commands with the same read call from the socket, and it’s able to send all of the results back at the same time. So, in terms of throughput with pipelining, we get a huge benefit in Redis. And when you put the whole micro-batching approach we’ve been proposing – batch as much as possible – together with pipelining, they make a pretty good fit.

So, if you take the example of the code that we have over here: we’ve created this forEachBatch on an event stream. We first set up a common Redis connection on top; to enable pipelining, we set the flush to false. That means that as soon as a command comes, it’s not placed on the wire and it doesn’t go all the way to Redis. Then we do some local aggregations with respect to the operation that we want to do. And before we start writing to Redis, we micro-batch this – we batch it within the micro batch itself. We pick a group size, say 10, or whatever you feel is right for your operations, and once you have that, you can start pumping in messages. All you’re doing is repeatedly adding messages; the key thing to note is that at this point not a single message has gone to Redis yet. Once the individual batches are done, we call an explicit flush, and only then does anything go to Redis. So now all 10 commands for the set-add have gone together. This way, you’re gonna see a huge jump in your throughput. I find that this neat trick actually helped a lot with our throughput increase; I would easily credit it with at least a 10x improvement in a lot of our throughput use cases. And of course, when you put the whole forEachBatch/mapPartitions paradigm together with this, the forEach that you have here could be parallelized even more with threads – the Redis connection is thread-safe – so you can try to crank a bit more juice out of it.
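A hedged reconstruction of that pipelining pattern using Lettuce: disable auto-flush, queue commands in groups, and flush each group explicitly. The stream name, column, key, group size of 10, endpoint, and checkpoint path are assumptions from the narration; in production you would likely also await the returned futures before closing the connection.

```scala
import io.lettuce.core.RedisClient
import org.apache.spark.sql.DataFrame

val pipelinedWrite = (batchDF: DataFrame, batchId: Long) => {
  val client = RedisClient.create("redis://localhost:6379")     // hypothetical endpoint
  val connection = client.connect()
  val async = connection.async()
  async.setAutoFlushCommands(false)                             // nothing hits the wire until we flush

  // local aggregation first: unique product ids in this micro batch
  val uniqueProducts = batchDF.select("productId").distinct()   // hypothetical column
    .collect().map(_.getString(0))

  // batch within the micro batch: queue commands in groups of 10, then flush each group
  uniqueProducts.grouped(10).foreach { group =>
    group.foreach(p => async.sadd("products:seen", p))          // queued locally, not sent yet
    async.flushCommands()                                       // the whole group goes out in one pipelined write
  }

  connection.close()
  client.shutdown()
}

eventStream.writeStream
  .option("checkpointLocation", "/tmp/checkpoints/pipelined")   // hypothetical path
  .foreachBatch(pipelinedWrite)
  .start()
```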

So, that’s all the tips and tricks that I have for today.

If you guys have any more questions, feel free to reach out to me, but that’s it.

About Yeshwanth Vijayakumar

Adobe, Inc.

I am a Sr Engineering Manager/Architect on the Unified Profile Team in the Adobe Experience Platform; it’s a PB-scale store with a strong focus on millisecond latencies and analytical abilities, and easily one of Adobe’s most challenging SaaS projects in terms of scale. I am actively designing/implementing the interactive segmentation capabilities which help us segment over 2 million records per second using Apache Spark. I look for opportunities to build new features using interesting data structures and machine learning approaches. In a previous life, I was an ML Engineer on the Yelp Ads team building models for snippet optimizations.