Extending Apache Spark – Beyond Spark Session Extensions

If you want to extend Apache Spark and think that you will need to maintain a separate code base in your own fork, you’re wrong. You can customize different components of the framework, like file commit protocols or state and checkpoint stores.

After the talk, you should be aware of the available customization strategies and be able to implement them on your own.

Speaker: Bartosz Konieczny

Transcript

– Hi everyone. Thank you for coming to this digital presentation about Apache Spark customization, and more exactly about the Apache Spark state store. My name is Bartosz Konieczny. By day I'm a data engineering consultant at OCTO Technology in Paris, and by night I blog about Apache Spark, data engineering and cloud computing at waitingforcode.com, and I also teach data engineering at becomedataengineer.com. You can follow me on Twitter or subscribe to my GitHub.

So, a customized Apache Spark, what does it mean? Here you can see my subjective division of how to customize Apache Spark. Generally, from my perspective, we can distinguish three different layers. The first layer we can call the sunny layer, because it doesn't require you to know a lot about the Apache Spark internals: you just define your code, the code is pushed down to the Apache Spark engine, and it is executed. In this category we find everything user-defined, so user-defined functions or user-defined aggregate functions. The second layer requires a bit more knowledge about the Apache Spark internals, because your code directly interacts with them. In this part you will find components like SQL plans, so logical and physical rules, also data sources, data sinks, plugins, which are one of the new features of Apache Spark 3 and also one of the topics of the talks from this year's Data + AI Summit, and finally file committers, checkpoint managers and state stores. And there is also the third layer, which I call the scary layer, because you can still customize it, but it's very hard to find resources on the internet about things like topology mappers or recovery modes, so it requires you to dig a bit more into the Apache Spark internals.

In this talk, I will focus on the state store. I initially wanted to cover the three last items, so file committers, checkpoint managers and state stores, but as you know, I have only 30 minutes, so I prefer to go really deep on the state store and cover the two remaining parts on my blog in a series of follow-up blog posts.

So a customized state store, what does it mean and how does it look like? It's very important to recall the definition of the state store first. The state store is a versioned, key-value hash map which is stored on the partitions, the partitions being obviously managed by the tasks, and the information stored there is used by stateful operations like aggregations, arbitrary stateful processing functions, or, which can sound surprising, the global limits. From this definition, there are two important concepts to retain. The first is that it's key-value: there is the concept of a key, which corresponds to the state key, and of a value, which obviously corresponds to the state value. The second point is about the versions. As we know, Apache Spark executes Structured Streaming queries in micro-batches, and in simple terms, every micro-batch corresponds to one specific version of the state store.

And how to customize the Apache Spark state store? To customize it, you will have to define a class called a state store provider and register it in the spark.sql.streaming.stateStore.providerClass configuration entry. The goal of this provider class is to return a state store implementation, and that implementation will have to implement the StateStore API.
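As an illustration of that registration step, here is a minimal sketch. The com.example.MyStateStoreProvider class is a hypothetical name standing for any class that extends Spark's StateStoreProvider; the configuration key is the real Structured Streaming entry mentioned above.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: plug a custom state store backend into Structured Streaming.
// "com.example.MyStateStoreProvider" is a hypothetical class extending
// org.apache.spark.sql.execution.streaming.state.StateStoreProvider.
val spark = SparkSession.builder()
  .appName("custom-state-store")
  .config("spark.sql.streaming.stateStore.providerClass",
    "com.example.MyStateStoreProvider")
  .getOrCreate()
```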
And why to do that? The main reason for using a customized state store, like for example RocksDB, which is an off-heap, embedded database, is related to performance. The default implementation of the state store uses an in-memory hash map stored on-heap on the JVM, and if you have a lot of keys, you can encounter some unpredictable load and unpredictable garbage collection pauses. To overcome that issue, you can use solutions like RocksDB to store your state elsewhere and not impact the memory which is shared with the tasks.

To implement the state store, you will have to define five categories, five groups of methods. The first group is called CRUD, and it's very straightforward because it lets you interact with the state entries: you can read them with get, and you can put or remove the states. The second category is about transaction management. Yes, it may sound surprising, but the StateStore API lets you confirm that every change you made in the current micro-batch was correctly processed, and in that way you will commit this specific version of the state store. The third category of functions is about state exploration, and here I think you may be a bit surprised to see the iterator and getRange functions, which could also be classified in the first, CRUD group. But if you analyze the code, you will see that iterator and getRange are mostly used to deal with state exploration. The fourth group is about state store metrics, where you will define information about your state store, for example the approximate size taken by all the stored states, the number of keys, and things like that. This will later be exposed to you from the logs, and the logging will of course be performed by the driver. And finally, in the last category, you will find the maintenance job, which is responsible for performing some housekeeping tasks and, as you will see in the default implementation, also for performing performance optimization tasks in case of state reloading. But I will explain these concepts a bit later.

So let's begin with the very basic operation of reading, creating and manipulating the state, in this CRUD category. As you can see, it's very straightforward: we always start by initializing the state store, later we retrieve the state for every input record, we combine this input record with the retrieved state, and finally we write a new version of the state. And how does it look under the hood? Under the hood, everything starts with the mapPartitionsWithStateStore function, which returns a StateStoreRDD. Inside the StateStoreRDD, the state store is initialized with the createAndInit method that you can see on your screen, and later it retrieves the right version for the given micro-batch, or epoch execution in case of continuous mode. The call to getStore returns the state store backend that you will use to deal with the state.
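To make those five groups of methods more concrete, here is a simplified, map-backed sketch that mirrors them. It is only an illustration: a real backend would extend org.apache.spark.sql.execution.streaming.state.StateStore, whose exact signatures vary slightly between Spark versions, and the memory estimate used for the metrics is a made-up placeholder.

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.streaming.state.UnsafeRowPair

// Simplified sketch mirroring the five method groups of the StateStore API.
class MapBackedStateStore(val version: Long) {

  private val entries = new java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRow]()
  private var committed = false

  // 1) CRUD: read, write and delete individual state entries.
  def get(key: UnsafeRow): UnsafeRow = entries.get(key)
  def put(key: UnsafeRow, value: UnsafeRow): Unit =
    entries.put(key.copy(), value.copy()) // copy() because UnsafeRow instances are reused
  def remove(key: UnsafeRow): Unit = entries.remove(key)

  // 2) Transaction management: confirm or discard the changes of the current micro-batch.
  def commit(): Long = { committed = true; version + 1 }
  def abort(): Unit = entries.clear()
  def hasCommitted: Boolean = committed

  // 3) State exploration: iterate over all entries; the range bounds are ignored here,
  //    just like in the default implementation where getRange delegates to iterator.
  def iterator(): Iterator[UnsafeRowPair] =
    entries.entrySet().iterator().asScala.map(e => new UnsafeRowPair(e.getKey, e.getValue))
  def getRange(start: Option[UnsafeRow], end: Option[UnsafeRow]): Iterator[UnsafeRowPair] =
    iterator()

  // 4) Metrics: number of keys and an approximate footprint, later logged by the driver.
  def numKeys: Long = entries.size()
  def approximateSizeBytes: Long = entries.size() * 128L // placeholder estimate

  // 5) Maintenance: housekeeping hook (in Spark it lives on the state store provider).
  def doMaintenance(): Unit = ()
}
```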
But the StateStoreRDD is not the only place interacting with the state store. Another one is the state store manager. You have to know that from time to time, the state value format changes between Apache Spark versions. It happened, for example, in the new Apache Spark 3, where, to fix a bug about stream-to-stream left outer joins, the community members decided to add an extra flag to the state store. And to let users upgrade Apache Spark without losing their state, because as you know streaming applications are intended to run forever, the community added a new state store manager that handles this transition and exposes the state format version corresponding to the initially created states. So if someone started with Apache Spark 2, the state store manager will manipulate the state in the format corresponding to Apache Spark 2, and if the query started with Apache Spark 3, it will use the format corresponding to this new version.

And regarding state exploration, the operation is also quite straightforward, because the goal is to iterate over all states stored in the state store and apply some predicate on every iterated item. Most of the time, this predicate will be watermark-based, so globally it means that the state will be removed when it's older than the watermark of the current query. Under the hood, it looks like this: we are dealing with two read-only methods, getRange and iterator, and you can see an interesting thing, by the way, which is that getRange in the default implementation of the StateStore interface will call the iterator method. It means that the range bounds passed to the function are not used. By the way, it also shows one direction that you can follow if you decide to implement your own state store backend: it's better to have the states stored as close as possible to the main memory, because otherwise you will pay the cost of fetching them, for example from a remote backend, and that may be inefficient in case of low-latency streaming applications.
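As a hedged illustration of that watermark-based predicate, the sketch below iterates over all entries and removes the expired ones. The store parameter stands for any StateStore implementation, and eventTimeOrdinal is a hypothetical position of an event-time column (in milliseconds) inside the state key; it is not part of the real API.

```scala
import org.apache.spark.sql.execution.streaming.state.StateStore

// Sketch: remove every state entry whose event time is older than the current watermark.
def evictExpiredStates(store: StateStore, watermarkMs: Long, eventTimeOrdinal: Int): Unit = {
  store.iterator().foreach { pair =>
    val eventTimeMs = pair.key.getLong(eventTimeOrdinal)
    if (eventTimeMs < watermarkMs) {
      // copy() because the UnsafeRow returned by the iterator may be reused
      store.remove(pair.key.copy())
    }
  }
}
```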
Regarding the third part, namely the state finalization, the workflow is a bit more complicated than in the two previous cases. Once all input items are processed and all expired states are removed from the state store, Apache Spark will call one of two callbacks. The first callback will validate the state store version, and the second one will be performed at the task level, where Apache Spark verifies whether your state store was correctly processed or not. Under the hood, it looks like this: Apache Spark uses one of two different iterators to process the input data, either a CompletionIterator or a NextIterator. Even though they have different names, they have one thing in common: you can specify a callback that will be invoked at the end of the iteration, so when there are no more records to return to the consumer application. And that's the place where the state store commit happens, so it's the place where you, as the state store implementation owner, will have to mark the current version as committed, meaning that starting from this point, this state store version is correct. Once the task is completed, Apache Spark calls a task completion listener that verifies whether the hasCommitted flag, which is closely related to the outcome of this commit, is true or false. If it's false, it means that something wrong happened during the iteration and the callback was not invoked, and obviously in that case Apache Spark will call abort. On the other hand, if everything went well for this and for the other tasks of the processing, at the end Apache Spark will use the SQL metrics that you defined on top of your state store and log them to the driver's logs. To make that possible, these SQL metrics are implemented as accumulators, so they can live freely on the different executors, but once we need the final result, they are combined by fetching the values remotely from every partition, so from every state store.

And the last point is about the state maintenance. The state maintenance is an operation executed in a background thread, every maintenance interval period. And how does it look with the API? It's even simpler than previously, because it consists only of calling the doMaintenance method. In the default implementation, Apache Spark uses this doMaintenance method to do two things. The first one is to clean up old accumulated checkpointed state, because the number of checkpointed state store versions must stay aligned with the number of checkpoint metadata files, otherwise you will not be able to reprocess your application seamlessly. And the second thing is about optimization. You have to know that when Apache Spark moves on and manipulates the state, in every micro-batch it only checkpoints, only materializes, the changes made in that given micro-batch. That accelerates the processing, but it can be costly when you need to reprocess your application, because it means that you will have to fetch these files, which are called the delta files, one by one from the checkpoint location and rebuild the state store map. To overcome this issue and optimize this process, every X micro-batches Spark will take the in-memory map, so all the states from the state store, and write them at once to the checkpoint location as a snapshot. That way, if you need to reprocess or restart your pipeline, and the version you want to restore is the snapshot version, Apache Spark can simply take it and load it directly into the in-memory hash map, and not worry about fetching these small files one by one across the network.
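Here is a simplified sketch of those two maintenance duties for a hypothetical file-based backend. The helper names (latestCommittedVersion, writeSnapshot, deleteVersionsOlderThan) and the two thresholds are assumptions made for the illustration, not Spark API; the default implementation drives the equivalent behaviour with its own internal settings, such as the minimum number of delta files before a snapshot.

```scala
// Sketch of a doMaintenance implementation for a hypothetical file-based state store backend.
class MaintenanceSketch(minDeltasForSnapshot: Int, minVersionsToRetain: Int) {

  def doMaintenance(): Unit = {
    val current = latestCommittedVersion()
    // 1) Compaction: materialize the whole state map once in a while, so that a restart
    //    does not have to replay many small delta files one by one over the network.
    if (current % minDeltasForSnapshot == 0) {
      writeSnapshot(current)
    }
    // 2) Cleanup: keep only the versions that the checkpoint metadata still references,
    //    otherwise the state files and the commit/offset logs get out of sync.
    deleteVersionsOlderThan(current - minVersionsToRetain)
  }

  // Hypothetical helpers, left unimplemented in this sketch.
  private def latestCommittedVersion(): Long = ???
  private def writeSnapshot(version: Long): Unit = ???
  private def deleteVersionsOlderThan(version: Long): Unit = ???
}
```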
And to summarize, there are several points to retain. The first two points are about iterator and getRange. As you saw, in getRange there are no ranges: even though they are present in the API, they are not really used in the code for now. And the second point is about the iterator: as I said, it's involved in the state exploration, so in the operation that iterates over all states present in the state store, and it can limit you in your implementation choice, because if you wanted to use, for example, a completely remote storage for the state store backend, fetching the states every time across the network may be costly and may slow down your application, even more than the unpredictable garbage collection that may happen in the default implementation in case of millions of keys on the executor. The two other points are about, once again, the iterator and the put method, because both use UnsafeRow and UnsafeRowPair objects, which are mutable. You will find more details about that elsewhere, but to give you an example, an UnsafeRow instance can be reused. So, for example, if you have 10 states and you don't call the famous copy method of the UnsafeRow when you store your state, you can end up with only one value, one state, corresponding to the last processed state. So keep in mind that these two classes are mutable and can be reused, and to mitigate that, be aware of it in the implementation of the iterator method and also use copy in case you need to store the UnsafeRow directly in your state store. You can still store, for example, an array of bytes instead, and in this case you will probably not need to call copy.
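The snippet below is a small illustration of that reuse pitfall. It assumes a hypothetical iterator of UnsafeRow values coming from the processing, and shows why copy is needed before buffering them.

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Sketch: buffering states coming from an iterator that may reuse the same UnsafeRow instance.
def collectStates(rows: Iterator[UnsafeRow]): Seq[UnsafeRow] = {
  val buffer = new ArrayBuffer[UnsafeRow]()
  rows.foreach { row =>
    // Without copy(), every buffered element could point at the same reused instance,
    // i.e. 10 input states but only the last processed value actually retained.
    buffer += row.copy()
  }
  buffer.toSeq
}
```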
The next point is about consistency awareness. There is one configuration property which is involved in a lot of places in Structured Streaming, and this property is spark.sql.streaming.minBatchesToRetain. It represents the number of micro-batches, or more exactly the number of micro-batch metadata entries, that are retained in the checkpoint location. So it applies to the state store presented in this talk, but it applies also to the metadata files like commits and offsets, and it's better to keep these aligned, otherwise it can be complicated to restart your application in case of failure without any problems.

The next points are about the state reloading semantics. The first point is about the trade-off that you have to make when you decide whether you prefer to write every time only the changed states, or whether you prefer to also checkpoint the whole state from time to time in order to recover it faster in case of an application restart. And the second point is about the deletes. Let's take a very simple example: in micro-batch number one you create the state for key one, and you delete that state in micro-batch number nine. From one to nine you wrote only the delta changes, so only the changes that occurred in each given micro-batch. Now, if you don't materialize the delete action, when you restore from, for example, the ninth micro-batch, you will still retrieve your key one that was deleted. So keep that in mind; in the default implementation you will find that both actions, the updates and the deletes, are materialized in the delta files.

And finally, two properties related a bit to the exploration. The state store implementation is immutable: once you choose your state store backend, you cannot change it across application restarts. If your application stops, you have no way to change the provider class of your state store, because it's hard-coded into the checkpoint metadata. You can always try to tweak that a bit by manipulating the checkpointed files directly, but in that scenario you will also have to migrate the whole state from your initial backend to your new one. And when the getStore method is called, remember that the state store you are working on in the current micro-batch will be used as the input for the next micro-batch, so play with this version number very carefully in your implementation.

And we are very close to the end. As I said, I will publish some follow-up blog posts about the two remaining components that we can customize, and I will also complete there the things that I couldn't say in this talk due to the 30-minute limitation. You will also find some code on the custom state store, checkpoint managers and file committers in my Data + AI Summit repository. And if you are interested in other components, or if you want to know how to customize them, I invite you to check the different talks that were given at previous Spark + AI Summits. From last year, you will find a talk about data sources; this year you will find a talk about plugins, which are one of the new features of Apache Spark 3; and I think that two years ago there was a talk about custom Apache Spark plans. I also blogged a bit about that on the blog. So thank you very much and have a nice Summit.


 
About Bartosz Konieczny

OCTO Technology

Bartosz is a data engineer enjoying working with Apache Spark and cloud data services. By day he works as a data engineering consultant at OCTO Technology. By night, he shares his data engineering findings on waitingforcode.com and becomedataengineer.com.