Redis + Apache Spark = Swiss Army Knife Meets Kitchen Sink

May 28, 2021 11:05 AM (PT)

Download Slides

We want to present multiple anti-patterns that use Redis in unconventional ways to get the maximum out of Apache Spark. All examples presented are tried and tested in production at scale at Adobe. The most common integration is spark-redis, which interfaces with Redis as a DataFrame backing store or as an upstream for Structured Streaming. We deviate from the common use cases to explore where Redis can plug gaps while scaling out high-throughput applications in Spark.

 

Niche 1 : Long Running Spark Batch Job – Dispatch New Jobs by polling a Redis Queue

· Why? 

o Custom queries on top of a table; we load the data once and query N times

· Why not Structured Streaming

· Working Solution using Redis

 

Niche 2 : Distributed Counters

· Problems with Spark Accumulators

· Utilize Redis Hashes as distributed counters

· Precautions for retries and speculative execution

· Pipelining to improve performance

In this session watch:
Yeshwanth Vijayakumar, Sr Engineering Manager, Adobe, Inc.

 

Transcript

Speaker 1: Hi guys. Today’s talk is going to be about how we combine Redis and Apache Spark to get what we call a Swiss Army knife meets kitchen sink. I’m Yeshwanth Vijayakumar. I’m a senior engineering manager and architect at Adobe in the Adobe Experience Platform group, specifically on the Unified Profile team. We’re extremely heavy users of Spark, and we’re always looking for people to join us in our journey. So if you’re one of them, please do hit me up.
Even before we go into the agenda, I do want to say one thing about Redis. A lot of the time people just dismiss it as a cache. It is not just a cache. It’s probably my favorite tool, personal bias notwithstanding. It is, in my opinion, a database of data structures. A lot of the time we end up implementing these data structures under the hood, in whatever language we happen to be working in. What we have in Redis is a very efficient data structure database that we can reuse across multiple components over the network.
That said, this talk is not sponsored by Redis. With that said, let’s get started on the agenda. This talk is going to be a bit different. This is a complete anti-pattern talk. If you are looking for something much more standard that you can apply, I have two other talks at this summit, so it would be good to check those out. Today’s talk, I promise, will be a bit of fun and will get you thinking in a different way. The first niche that we’re going to talk about is how we implemented a long-running Spark job driven by a Redis queue. The second one will be distributed counters, which will be a lot more straightforward than the first one. So let’s get started.
First, let’s establish what the problem context is, because without setting the problem context, the talk is not going to make any sense. The use case that we’re trying to solve for here is we want to run as many queries as possible, in parallel, on top of a denormalized dataframe.
If this were SQL land, think of it like you have, say, 100 SQL queries, and you have to execute them as fast as possible on top of one dataframe. You’re not changing the data. It is one table, and you’re running 100 queries on top of it. In SQL, we end up in what we call a slight waterfall model. The moment I say this, people will be like, “Hey, have you heard about [inaudible]?” And even with [inaudible] you would get a bit of parallelism, but at the end of the day, you will end up with an effective parallelism factor which is less than, say, the 100 queries or 1000 queries that you want to run. So in our team, what we came up with is a custom query processor, a query engine if I may say, which can run thousands to tens of thousands of queries on top of the dataframes in one single shot.
So we take one row and we execute all 1000 queries on top of it, and then we move to the next one. Of course, we’re not talking about joins or cross-row queries right now; that’s out of scope for this talk. So let’s take a very simplistic use case. We want to run 1000 queries, and these need to be executed on top of this dataframe that we have already loaded into memory. Sounds simple enough, right? And we want to put an interactive element on top of it. We already have this dataframe loaded into memory, so we want to let any number of people, sitting on multiple computers, send queries to this single Spark application. That’s why we say we need a long-running Spark job.
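To make the one-pass, many-queries-per-row idea concrete, here is a minimal Scala sketch. It is not the actual Adobe query engine (which is out of scope here): the SimpleQuery type, the predicate shape, and the counting logic are all assumptions for illustration.

    import org.apache.spark.sql.{DataFrame, Row}

    object MultiQueryPass {
      // A "query" here is just a hypothetical id plus a row-level predicate.
      case class SimpleQuery(id: String, predicate: Row => Boolean)

      def runAll(df: DataFrame, queries: Seq[SimpleQuery]): Map[String, Long] = {
        df.rdd.mapPartitions { rows =>
          // One local counter per query id, accumulated within the partition.
          val counts = scala.collection.mutable.Map[String, Long]().withDefaultValue(0L)
          rows.foreach { row =>
            queries.foreach { q => if (q.predicate(row)) counts(q.id) += 1 }
          }
          counts.iterator
        }.reduceByKey(_ + _).collect().toMap   // merge per-partition counts on the driver
      }
    }

The point is simply that the data is scanned once, while every query is evaluated against every row during that single scan.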
And what we’re going to do is dispatch new jobs by polling a Redis queue, and we want to parametrize every Spark action based on the query that has been submitted by a given user. So every user can submit a different query. If you have 1000 users, some of them might send the same query, right? Okay, give me everybody who, say, works in San Jose, California. So you could have multiple people sending the same query, and what we want is to load the data only once but query N number of times. Now this is the problem statement. And one cost component that we want to think about is that we don’t want to bring up a Spark cluster every time, right? The interactivity element would be lost. Bringing up a cluster means initializing a lot of boilerplate, warm-up, and housekeeping work, and we can’t keep repeating that every single time. So there is a time cost and a monetary cost associated with bringing up a Spark cluster for every job or group of jobs.
So what we need is a long-running Spark job which is running forever, because the interactivity is all that matters, and of course the correctness of the results also matters. And possibly, can we do multi-tenancy? Can we load multiple such dataframes and multiplex queries to them in this setup?
So what I described is kind of like you just have a REST server put on top of the Spark cluster. That’s literally what Apache Livy is. I’ve just taken this diagram from the Livy site, and in fact, Livy was one of the first implementations that we used for our full end-to-end. The way Livy works is, it basically puts a REST server in between your cluster manager and your client. So your client can submit queries. It can look something like this: you submit code samples, which get parsed by the REST server and executed on the long-running Spark context, which is maintained by Livy and the cluster manager.
So why did we not use this, as in, why did we use this and find this [inaudible]? The problem was, we don’t want to be sending code in this format again and again to the Livy server, right? This is very generic, and it’s a great use case when you have a notebook ecosystem like Zeppelin where you need to submit your own custom interpreter-like code to the Spark cluster; that works perfectly. But in our case, speed is everything. We want to strip everything down to the bare bones and have an extremely efficient and fast implementation to run the queries. Add to that, Livy puts an extra REST server in the path, which adds extra latency. And on top of that, the other big problem is that deploying Livy in a multi-tenant environment was also a bit challenging.
There were other alternatives like Spark Job Server that we also evaluated, but the overall extra boilerplate and everything that we needed to manage made us ask: why can’t we have something very simple? Because all we need right now is a way to accept events. So can’t we just solve that somehow? What we decided was to run an event loop. Even before we go there, we’re at a Spark conference, so people will be like, “Okay, you mean to say you have a bunch of queries coming in and you have a bunch of data coming in and you want to run queries based on that. Why can’t you use Structured Streaming for this?” The problem with Structured Streaming, for this problem statement, was the lack of access to a Spark context within the executor. What I mean is, if you do something like dataframe dot map or mapPartitions, what you get within that is a batch dataframe, which is the micro-batch.
So you can do Spark operations on top of the micro-batch, but say I wanted to do something like spark.read of a new file within that executor: that’s not going to work. It just won’t work; we’ll end up getting a null pointer exception. People now will be like, “Hey, why do you want to do that? Instead, just load the dataframe at the driver level and do a join within the executor with the micro-batch, which is your query dataframe.” Now imagine this, right? You have 500,000 rows. They’ve been loaded into the dataframe already and cached. Now you have a query dataframe coming in, which is, say, five or a thousand rows, and you need to do a join. It’s such a cumbersome operation to even visualize, and it just doesn’t scale well with respect to query performance. Structured Streaming gives you a lot of powerful capabilities, right?
This is basically, I would say, a very narrow gap between Spark batch and Spark Structured Streaming, and we just fall through that gap because of the needs that we have. But it’s not really a problem, because let’s just go over the summary of our working solution. We use a blocking pop on Redis lists inside the driver, and we use that as our event loop. The queries are pushed to the Redis queue, we use a blocking pop to consume them, and then we can parametrize the Spark actions on the driver’s side. So the command pattern works extremely well to send and execute queries on the Spark cluster. And we are using the FAIR scheduler, so even if we get multiple batches of queries, we can execute them in parallel. And then we can communicate the status of the job using Redis itself.
So people will be like, “Hey, this is going way over my head.” So let’s actually go through a step-by-step workflow. We start off with a vanilla batch Spark job. The only difference is the driver is in a while loop, an infinite loop; it’s not going to end. Now, the executors are going to come up, they’re first going to load the dataframe that we need to query on top of, and they’re going to persist it or cache it in memory. So the dataframe is cached; it’s all hot and loaded, ready to go.
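A minimal Scala sketch of that “long-running vanilla batch job” shape, assuming a hypothetical input path; popUntilEmpty and the query execution are filled in by the later sketches.

    import org.apache.spark.sql.SparkSession

    object LongRunningDriver {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("interactive-query-engine").getOrCreate()

        // Load the denormalized table once and keep it hot in executor memory.
        val profiles = spark.read.parquet("/data/profiles")  // hypothetical path
        profiles.persist()
        profiles.count()  // force materialization so every later query hits the cache

        // The driver never exits: it keeps polling Redis for new query batches.
        while (true) {
          val queries = popUntilEmpty()                  // blocking pop, sketched below
          if (queries.nonEmpty) runQueries(profiles, queries)
        }
      }

      // Placeholders: these are filled in by the later sketches.
      def popUntilEmpty(): Seq[String] = ???
      def runQueries(df: org.apache.spark.sql.DataFrame, queries: Seq[String]): Unit = ???
    }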
Next, we’re going to submit a query to this interactive API that we have. What this API does is, it first checks: okay, for this query, do I already have the result cached? If not, it pushes the query into the Redis queue. So getting it into the Redis queue is a push operation.
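A minimal sketch of that submit side using the Jedis client; the queue key, result-cache layout, and the QuerySubmitter object are assumptions, and in practice this would sit behind the interactive REST API.

    import redis.clients.jedis.Jedis

    object QuerySubmitter {
      val queueKey = "query-queue"   // hypothetical list key, same one the driver pops

      def submit(queryJson: String): Unit = {
        val jedis = new Jedis("redis-host", 6379)
        try {
          // Only enqueue if we have not already answered this exact query.
          val cached = jedis.hget("query-results", queryJson)   // hypothetical cache layout
          if (cached == null) {
            jedis.rpush(queueKey, queryJson)   // FIFO: producers push to the tail,
          }                                    // the driver pops from the head
        } finally jedis.close()
      }
    }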
So once we have pushed it into the Redis queue, the Spark driver, which is basically in an infinite loop, a while(true), keeps checking: “Hey, pop all the queries in that list.” Because, like I explained before, we have a custom query engine which can run tens of thousands of queries together in one shot. That magic engine takes care of the performance part, so all we need to do is feed it queries very efficiently. So the queries get consumed until there are no queries left.
And after we’ve obtained all the queries from the query list, or the query queue, we execute them. Now comes the fun part. As we execute, for however many rows you have, if a row matches a query’s condition, we increment a counter in Redis saying, “Hey, for this query ID ABC we have a match,” so it’s just plus one. Again, we’re using Redis counters as a feature: you increment that count. We will go into quite a bit of detail on this in the second part, because that deals exactly with it.
And then, if you want to fetch the results, there’s another API which exposes an interface over the Redis counters to give the results in real time. The beauty of this is that, for this extremely narrow use case, you get real-time results. Even if it takes, say, 20 seconds to go over the entire dataframe, across all the rows and all the executors, you will start getting results right from reading row one, because the results are updated as soon as possible.
So people will be like, “Okay, I really still don’t understand what the hell is going on.” So the last thing I can do, to try to get to the bottom of this, is to dig deeper into why we need to replace the functionality that is given by Livy or Spark Job Server. What we need to replace is the REST API or RPC control that those solutions give us. It basically comes down to the fact that on the Spark driver, we already have compute. Why can’t we implement an HTTP-server-like interface, or an RPC interface, on the Spark driver? That’s literally what we’re doing with this function called popUntilEmpty. The event loop is literally this one function plus the query queue. Instead of submitting the query directly to a REST server or over TCP, you’re pushing it to Redis. Redis becomes your transport layer for getting your requests.
So in an API server, instead of somebody making a GET call or a POST call to port 8080, what we’re doing is pushing all the queries, the calls, the payloads, to a Redis server, and the API is pulling the payloads from the Redis queue. That’s how we make this whole event flow asynchronous. So let’s go step by step into how this actually works in real life, right?
So first we load a dataframe for a particular tenant. Now, the popUntilEmpty method needs to return a sequence of queries to execute, so let’s just create a ListBuffer that we can add items to. Until we error out, we get a Redis connection and do a blocking pop on top of the queue. The key thing to understand about how the blocking pop works is that, for a given connection, it will block when there are no elements to pop from any of the lists.
So basically, let’s say there is no activity going on. This entire method is going to block until a new query gets submitted. So it’s pretty efficient in the sense that you’re not churning through an infinite loop; it just waits until you actually get a new item. And this works well. You can have multiple instances of this, because you can share a Redis connection across multiple threads.
So now say we have a bunch of queries in the list. We pop the item from the queue and add it to our mutable list, and we keep going until everything has been consumed. So far, popUntilEmpty basically just fetches a bunch of queries efficiently by blocking until the Redis queue has something. Then we have a wrapper function on top of it, which goes in an infinite loop to make sure the driver doesn’t die, because this is just a vanilla Spark application.
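A minimal sketch of popUntilEmpty using Jedis; the host, port, and the "query-queue" key name are assumptions, and error handling is simplified.

    import redis.clients.jedis.Jedis
    import scala.collection.mutable.ListBuffer

    def popUntilEmpty(host: String = "redis-host", queueKey: String = "query-queue"): Seq[String] = {
      val queries = ListBuffer[String]()
      val jedis = new Jedis(host, 6379)
      try {
        // The first pop blocks (timeout 0 = wait forever), so the driver sleeps
        // until at least one query arrives instead of busy-polling.
        val first = jedis.blpop(0, queueKey)     // returns [key, value]
        queries += first.get(1)

        // Then drain whatever else is already queued, without blocking.
        var next = jedis.lpop(queueKey)
        while (next != null) {
          queries += next
          next = jedis.lpop(queueKey)
        }
      } finally jedis.close()
      queries.toSeq
    }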
So in the infinite loop, it keeps checking whether queries are available. If queries are available, we execute them on top of the dataframe, and then our magic query engine takes care of executing the queries and fetching the results. Now, how can we scale this? People who are familiar with Structured Streaming’s receiver code will find a lot of similarities here. All we are doing is borrowing exactly the same pattern: we set up a fixed thread pool with a bunch of receiver threads, and these receiver threads spawn multiple instances, so that we can make the most of the FAIR scheduler. Say you get queries one to five and start executing, but while they’re executing you get another set of queries. You don’t need to wait until the first set finishes; instead, you spawn a new thread and that thread takes care of the execution.
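A minimal sketch of that scaling idea, reusing spark, profiles, popUntilEmpty, and runQueries from the earlier sketches (all assumptions): several receiver threads share the event loop, and each batch is tagged with its own scheduler pool so concurrent batches do not block each other. This requires spark.scheduler.mode=FAIR on the cluster.

    import java.util.concurrent.Executors

    val numReceivers = 4                               // assumption: tune per workload
    val receiverPool = Executors.newFixedThreadPool(numReceivers)

    (1 to numReceivers).foreach { i =>
      receiverPool.submit(new Runnable {
        override def run(): Unit = {
          while (true) {
            val queries = popUntilEmpty()              // blocks until work arrives
            if (queries.nonEmpty) {
              // Give this batch its own FAIR-scheduler pool so batches run in parallel.
              spark.sparkContext.setLocalProperty("spark.scheduler.pool", s"receiver-$i")
              runQueries(profiles, queries)
            }
          }
        }
      })
    }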
So the second niche that we’re talking about is distributed counters. When I say distributed counters, people will be like, “Wait, aren’t accumulators already there to solve the problem?” The problem with accumulators is non-idempotency, specifically with repeated task execution. What I mean by that is, when task failures happen, Spark automatically retries a task up to the configured max retries, so it can be four or five or whatever you’ve configured. So a particular task can get retried multiple times. The other thing is reusing a particular stage in multiple actions; that can also cause repeated executions. And speculative execution: in cases where you have it turned on, Spark can start duplicate instances of tasks which are running slowly compared to the average, and it basically lets the duplicate tasks race; whichever finishes first, it takes the results from that one.
So again, accumulators fall flat on that. Another problem, not so much with numeric accumulators but with custom ones, is memory pressure: on the driver, when you collect, you end up pulling all the accumulated detail into the driver, so if you have a lot of partitions, you’re going to end up in trouble. Another problem I noticed is that you can’t access per-partition stats programmatically. If I want the accumulated value for partition 10, as far as I know, you can’t get that. If there is a way to do that, please let me know. But the rest of the points still hold.
So let’s go through an example of how this would work. We set up an accumulator, called myAccumulator, a long accumulator. The first stage that we have is basically doing a dataframe dot map: for every row in the dataframe, we increment the accumulator by one. All good, right? And then we do an upstream dot count. It’s completely innocuous code; nothing about it looks problematic. And we’ll get the correct accumulator value. Now say we have another stage: downstream, which uses upstream as its source dataframe, does more transformations on top of it, and then we do a downstream.count.
Now, it’s quite possible, as in we can’t guarantee this: if there is some kind of shuffle failure, or some node goes down after the upstream.count executed, and upstream is not cached, certain tasks within the upstream lineage can get re-run. So we might end up double counting in the accumulator. That’s a summary example of some of the things we were talking about on the previous slide.
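A minimal sketch of that double-count hazard, assuming a toy DataFrame; the names and numbers are illustrative, not measured results.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("accumulator-pitfall").getOrCreate()
    import spark.implicits._

    val myAccumulator = spark.sparkContext.longAccumulator("myAccumulator")

    val df = (1 to 1000).toDF("id")

    // Stage 1: bump the accumulator once per row.
    val upstream = df.map { row => myAccumulator.add(1); row.getInt(0) }

    upstream.count()    // myAccumulator.value is typically 1000 after this action

    // Stage 2 reuses upstream, which is NOT cached. If any upstream task is
    // re-executed here (shuffle failure, lost node, speculative duplicate),
    // the map runs again and those rows get counted twice.
    val downstream = upstream.map(_ * 2)
    downstream.count()  // myAccumulator.value may now exceed 1000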
So what are we going to do? Let’s go through an example. We’re going to use Redis hashes as distributed counters. Think of a Redis hash as a memory-efficient map data structure inside Redis, on which we can do a lot of different operations that we’re going to look at now. So what we have here is a dataframe with a mapPartitions going on, and within each partition, we execute the query. If a row qualifies for that query, we do an increment operation for that accumulator. So the first thing we do is set up a Redis connection at the topmost level of the mapPartitions.
So that’s what we’ve done. Let’s define an accumulator for our query; I’ve just lazily named it “accumulator”. The next thing we do is get the partition ID from the task context. This is going to be important specifically for idempotency. Now, say you have 10 partitions. Before we start executing for any partition, we reset the accumulator value for that partition. So instead of having one global accumulator value, what we have in Redis is a map: the name of the map is the accumulator, and the keys inside that map are the partition IDs of the dataframe we’re talking about.
That way, even if you re-run the task, we’re good, because we always reset the accumulator value for that partition first, so there is no double counting. After that, we do an HINCRBY, a hash increment: if the query executes to a positive match, we increment that counter, with the key being the accumulator and the field being the partition ID. So for that accumulator map, we increment the value of the field that is the partition ID. And then we just close the Redis connection for that partition. Now, if you look at it from the Redis point of view: I wanted to do a demo, but I thought this is a much easier way to just show what happens.
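A minimal sketch of that pattern using Jedis; the key naming, the rowMatches predicate, and the use of foreachPartition for the side-effecting write (the talk does this inside its mapPartitions-based query engine) are assumptions.

    import org.apache.spark.TaskContext
    import org.apache.spark.sql.{DataFrame, Row}
    import redis.clients.jedis.Jedis

    def countMatches(df: DataFrame, queryId: String): Unit = {
      df.foreachPartition { rows: Iterator[Row] =>
        val jedis = new Jedis("redis-host", 6379)
        val accumulatorKey = s"accumulator:$queryId"            // hash name
        val partitionId = TaskContext.getPartitionId().toString // hash field
        try {
          // Reset this partition's field first, so a retried or speculative
          // task starts from zero instead of double counting.
          jedis.hdel(accumulatorKey, partitionId)

          rows.foreach { row =>
            if (rowMatches(queryId, row))                       // hypothetical predicate
              jedis.hincrBy(accumulatorKey, partitionId, 1L)
          }
        } finally jedis.close()
      }
    }

    def rowMatches(queryId: String, row: Row): Boolean = ???    // stands in for the query engine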
So first we do an HGETALL, that is, get me all the fields of that accumulator hash; we see that it’s empty right now. Then I set a value, say partition one to zero, and you can see it returns one: the field has been created. Now the increment operations we showed earlier: for that accumulator hash, we increment partition one twice and partition two once. If you do the HGETALL again and look at the state of the accumulator, partition one has a value of two and partition two has a value of one. So from the accumulation standpoint, at any given point, you can retrieve the value at a per-partition level or across all partitions; for across all partitions, you do an HGETALL.
So you do an HGETALL on that accumulator object to get all the per-partition counts, and for an individual partition you do an HGET on the accumulator hash with the partition ID to get that specific value. And if you want to delete something, you do the HDEL that we use to reset the per-partition count. Voila: we now have an accumulator that is safe from all the cons we described for Spark accumulators. The only cost is a network [inaudible] to Redis.
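A minimal sketch of reading the counts back, assuming the same Jedis client and hash layout as above (fields are partition IDs, values are counts):

    import redis.clients.jedis.Jedis
    import scala.collection.JavaConverters._

    def readCounts(queryId: String): (Map[String, Long], Long) = {
      val jedis = new Jedis("redis-host", 6379)
      try {
        val perPartition = jedis.hgetAll(s"accumulator:$queryId").asScala
          .map { case (partition, count) => partition -> count.toLong }
          .toMap
        // Per-partition counts plus the global total across all partitions.
        (perPartition, perPartition.values.sum)
      } finally jedis.close()
    }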
People will be like, “Hey, I have a lot of load. What do I do? I don’t think Redis can do this.” Throughput-wise, this is a very minimalistic Redis cluster that we have here, and you can see that we’re doing almost 240,000 operations per second at our peak. This is the Redis DB that we use for the long-running job that we showed in the first case. You can see the sporadic load, because the interactive component gets used every once in a while and then goes silent, but it handles a good amount of load very easily.
One optimization that I want to talk about is Redis pipelining, because if you’re doing 240,000 requests per second, you don’t want to do them one by one; that’s not going to work out. There’s no free lunch on that side. So without pipelining, what happens? You send a request, the server gives you an ack back; you send a request again, you get an ack back. With pipelining, you batch up a bunch of requests, send them to the server, and the server takes the bunch of requests, executes them efficiently, and gives a bulk response back. Redis pipelining makes it extremely easy for us to do very high-throughput operations. Your throughput can go 10 to 20x just by modifying your workload to have Redis pipelining enabled.
An example of how this works: again, we’ll use foreachBatch. In this case, we have a micro-batch, and batchDF is the micro-batch dataframe. In that, for each batch, on the mapPartitions, however you want to use it, we set up a Redis connection at the top of the partition, and then, importantly, we set autoFlush to false. This tells the Redis client, “Hey, I don’t want you to send every single command to the Redis server as I issue it; I will tell you specifically when to do it.” Then in our execution code, we do the batching, say, according to a group size that we specify.
And then we can do a bunch of Redis operations just like [inaudible]. Once the group size is hit, we do an explicit flush to tell the client, “Okay, put this onto the network layer and send it all out as one bunch,” and Redis takes care of the rest. And then we explicitly close the connection. This helps scale our workloads extremely well; whether it’s the counters use case or the query use case, it just works. But that said, for pipelining, you can’t just turn it on and call it a day. An important thing to note is that you need to cap the batch size, because sometimes you might send extremely large batches to Redis; I found 1000 to 10,000 to work pretty well. But then, to compensate for that, all of this gets buffered in memory, right?
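A rough sketch of that batched-flush pattern for the counters use case, called with the micro-batch DataFrame from foreachBatch. The talk’s client exposes an autoFlush flag; Jedis expresses the same idea with pipelined() plus an explicit sync() every groupSize commands. The key names and the rowMatches predicate are assumptions carried over from the earlier sketches.

    import org.apache.spark.TaskContext
    import org.apache.spark.sql.{DataFrame, Row}
    import redis.clients.jedis.Jedis

    def countMatchesPipelined(batchDF: DataFrame, queryId: String, groupSize: Int = 1000): Unit = {
      batchDF.foreachPartition { rows: Iterator[Row] =>
        val jedis = new Jedis("redis-host", 6379)
        val accumulatorKey = s"accumulator:$queryId"
        val partitionId = TaskContext.getPartitionId().toString
        try {
          jedis.hdel(accumulatorKey, partitionId)   // idempotency reset, as before

          var pipeline = jedis.pipelined()          // commands are buffered, not sent yet
          var buffered = 0
          rows.foreach { row =>
            if (rowMatches(queryId, row)) {         // hypothetical predicate
              pipeline.hincrBy(accumulatorKey, partitionId, 1L)
              buffered += 1
              if (buffered >= groupSize) {          // explicit flush point
                pipeline.sync()
                pipeline = jedis.pipelined()
                buffered = 0
              }
            }
          }
          pipeline.sync()                           // flush the final partial batch
        } finally jedis.close()
      }
    }

    def rowMatches(queryId: String, row: Row): Boolean = ???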
All of those buffered requests have to live somewhere, so one important thing, if your library supports it, is to use off-heap allocation. If your library uses Netty in the backend, that is one way to make sure the buffered commands aren’t putting pressure on the JVM heap. That’s one very important thing to keep in mind before trying out this anti-pattern. So yeah, hopefully today’s talk gave you two different ideas for using Redis and Spark together, and hopefully it was useful to you. Thank you guys.

Yeshwanth Vijayakumar

I am a Sr Engineering Manager/Architect on the Unified Profile Team in the Adobe Experience Platform; it’s a PB scale store with a strong focus on millisecond latencies and Analytical abilities and ...