Structured Streaming Use-Cases at Apple

May 27, 2021 11:00 AM (PT)


Structured Streaming plays an important role in our current data infrastructure. In response to growing streaming requirements, we have been actively working on Structured Streaming in Spark over the past few months. In this talk, Kristine Guo and Liang-Chi Hsieh will detail some of the issues that arose when applying Structured Streaming and what was done to address them. Specifically, they will cover:

  • How streaming applications that need to maintain large amounts of state require a scalable state store provider as an alternative to the in-memory solution built into Spark.
  • How Structured Streaming currently lacks session window support, and although the mapGroupsWithState/flatMapGroupsWithState APIs can be used to implement a custom window, this approach does not generalize well across applications and is hard to maintain.
  • Why we focused on Structured Streaming efforts like the RocksDB state store and session windowing.

Finally, they will detail how these features can help to compute aggregates over dynamic batches with minimum size requirements and perform stream-stream joins, while supporting high RPS and throughput.

In this session watch:
Kristine Guo, Software Engineer, Apple
Liang-Chi Hsieh, Software Engineer, Apple

 

Transcript

Liang-Chi Hsieh: Hi, everyone. Welcome to our talk. In today’s talk we will introduce our use-cases in Structured Streaming and how we revive and enhance Structured Streaming features. I’m Liang-Chi Hsieh. I’m a Spark committer focusing on Spark SQL and Structured Streaming, and I’m also a software engineer at Apple. You’re welcome to connect through my GitHub and LinkedIn profiles.

Kristine Guo: And I’m Kristine, and I’m also a software engineer at Apple. I currently focus on cloud platform technologies, and I’m working on developing high-scale backend systems. You’re also welcome to connect with me on LinkedIn as well.

Liang-Chi Hsieh: In today’s agenda, first, we will talk about how we revive previous Structured Streaming efforts from the community. Next, we will talk about the new enhancements we plan for Structured Streaming. And lastly, we will talk about our use-case.
There are some features in Structured Streaming that matter to us, including a new built-in StateStore, session windows, stateful task scheduling enhancements, and checkpoint enhancements.
First, we will talk about how we revive previous Structured Streaming efforts. For some features in Structured Streaming, there were previous efforts in the community, but unfortunately they did not end up in Spark. Driven by our use-case, we want to revive these features in Structured Streaming.
First, let’s talk about the StateStore. What is a StateStore? The StateStore is a component used for state management in Structured Streaming by stateful operators like streaming aggregation and join. These stateful operators get and put key-value pairs in the StateStore. In each micro batch, the StateStore checkpoints its state to a distributed file system. In case of a job failure, Spark can reload the checkpointed state back into the StateStore.
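To make the StateStore’s role concrete, here is a minimal sketch of a stateful query: the per-word running counts are the state kept in each executor’s StateStore, and the checkpoint location is where that state is written every micro batch so a restarted query can reload it. The socket source and paths below are placeholders, not our actual pipeline.

```scala
import org.apache.spark.sql.SparkSession

object StatefulWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("stateful-wordcount").getOrCreate()
    import spark.implicits._

    // Placeholder socket source standing in for a real stream.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()

    // Streaming aggregation: the per-word running counts are the state that
    // lives in each executor's StateStore.
    val counts = lines.as[String]
      .flatMap(_.split("\\s+"))
      .groupBy("value")
      .count()

    // The checkpoint location is where state is snapshotted each micro batch;
    // a restarted query reloads its state from here.
    counts.writeStream
      .outputMode("complete")
      .format("console")
      .option("checkpointLocation", "hdfs:///tmp/wordcount-checkpoint")  // placeholder path
      .start()
      .awaitTermination()
  }
}
```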
So far, there is only one built-in StateStore in Spark: an HDFS-backed StateStore. It keeps state in an in-memory map and checkpoints to an HDFS-compatible file system. There are some disadvantages to this built-in StateStore. First, it is limited by executor memory, so it is obviously not good for large-state use-cases. And even if you have enough memory, it could still impact other tasks due to memory pressure.
So it sounds like we need a new built-in StateStore. We are going to revive a RocksDB-based StateStore as another built-in StateStore in Structured Streaming. Why? Because more and more streaming applications require larger state, and we also know that RocksDB is widely used in the industry. So we created a ticket, SPARK-34198, to add a RocksDB StateStore as an external module, and we received positive responses from the community.
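For context, Spark already lets you swap in a non-default state store implementation through a configuration key, so a RocksDB-backed provider can be plugged in once it is on the classpath. A minimal sketch; the provider class name below is a placeholder, not a specific implementation.

```scala
import org.apache.spark.sql.SparkSession

// Swap the default HDFS-backed provider for a RocksDB-backed one.
// The provider class name is a placeholder for whichever implementation is used.
val spark = SparkSession.builder()
  .appName("rocksdb-state-store")
  .config("spark.sql.streaming.stateStore.providerClass",
          "org.example.state.RocksDbStateStoreProvider")
  .getOrCreate()
```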
We revisited the existing open source RocksDB StateStore implementations. Currently, there are two: one from Chermenin and one from Qubole. The Qubole one was also previously submitted as a pull request to Spark. The good news is that Databricks wants to open source their internal RocksDB StateStore in the future; that is ongoing work.
We ran a simple benchmark on the current open source RocksDB StateStores. The numbers show the Qubole one is somewhat faster than the Chermenin one, so currently we use the Qubole one in internal experiments. We will run a benchmark on the Databricks open source implementation in the future.
Next, let’s talk about the session window. Structured Streaming supports a kind of window operation called the event-time window. It basically groups events into fixed windows. But there is another kind of window operation called the session window. It groups events into different sessions by the distance between events; each session is separated by a so-called session gap.
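For contrast, this is what the existing built-in event-time (tumbling) window looks like: each event falls into a fixed 10-minute bucket, whereas a session window would stay open until a configurable gap of inactivity per key. A minimal sketch using the rate source and made-up column names.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

val spark = SparkSession.builder.appName("fixed-window-sketch").getOrCreate()

// The rate source stands in for a real event stream.
val events = spark.readStream.format("rate").option("rowsPerSecond", "10").load()
  .selectExpr("concat('user', cast(value % 5 AS STRING)) AS userId",
              "timestamp AS eventTime")

// Fixed 10-minute event-time windows; a session window has no fixed boundaries.
val fixedWindowCounts = events
  .withWatermark("eventTime", "1 hour")
  .groupBy(col("userId"), window(col("eventTime"), "10 minutes"))
  .count()
```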
Previously, there was another effort in Spark Structured Streaming to add a session window: SPARK-10816, EventTime based sessionization. But it did not make it into Spark, and the ticket has been inactive over the last few years. So this session window feature is available in other streaming engines, but it has been missing from Structured Streaming. Driven by our use-case, we want to revive it as a built-in window feature.
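Until a built-in session window lands, the usual workaround (the mapGroupsWithState/flatMapGroupsWithState approach mentioned in the abstract) is to maintain sessions by hand. A minimal sketch, assuming a 30-minute session gap; the event and session types, rate source, and checkpoint path are made up for illustration.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(userId: String, eventTime: Timestamp)
case class SessionState(start: Long, end: Long, count: Int)
case class SessionOutput(userId: String, startMs: Long, endMs: Long, count: Int)

object CustomSessionWindowSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("custom-session-window").getOrCreate()
    import spark.implicits._

    val gapMs = 30 * 60 * 1000L  // 30-minute session gap

    val events = spark.readStream
      .format("rate").option("rowsPerSecond", "10").load()
      .selectExpr("concat('user', cast(value % 5 AS STRING)) AS userId",
                  "timestamp AS eventTime")
      .as[Event]
      .withWatermark("eventTime", "1 hour")

    val sessions = events
      .groupByKey(_.userId)
      .flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.EventTimeTimeout()) {
        (userId: String, batch: Iterator[Event], state: GroupState[SessionState]) =>
          if (state.hasTimedOut) {
            // The session gap elapsed with no new events: close and emit the session.
            val s = state.get
            state.remove()
            Iterator(SessionOutput(userId, s.start, s.end, s.count))
          } else {
            // Start a new session or extend the current one with the new events.
            val times = batch.map(_.eventTime.getTime).toVector
            val old = state.getOption.getOrElse(SessionState(times.min, times.max, 0))
            val updated = SessionState(math.min(old.start, times.min),
                                       math.max(old.end, times.max),
                                       old.count + times.size)
            state.update(updated)
            state.setTimeoutTimestamp(updated.end + gapMs)
            Iterator.empty
          }
      }

    sessions.writeStream
      .outputMode("append")
      .format("console")
      .option("checkpointLocation", "/tmp/session-checkpoint")  // placeholder path
      .start()
      .awaitTermination()
  }
}
```

As the abstract notes, this kind of hand-rolled sessionization is hard to generalize and maintain, which is exactly why a built-in session window matters.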
Now let’s talk a bit about session window internals. A session window involves several kinds of state manipulation, like session initialization, restoring, merging, and saving. All of these operations depend on the internal StateStore format. We want the StateStore to be able to efficiently retrieve all session state for a specific session key, and also to be able to partially update the start time and duration of the affected windows.
So far, there are three candidate internal StateStore formats. The first one is, I think, the simplest and easiest to implement: you just need one StateStore, keyed by the session key with a list of sessions as the value. But since each stored state is a long list, it could run into memory issues if there are too many sessions per key, and it does not support partial updates.
The second one is the so-called double list approach. It needs three StateStores to handle the state. The first one maps the session key to the earliest start time. The second StateStore is keyed by the session key and a start time and keeps the double list links between start times per key. And the last one is keyed by the session key and each start time, with each session window as the value.
The good thing about this approach is that it does not store a list, so there is no complex structure in the values and it is efficient to traverse. However, the structure is more complicated and harder to maintain. The last format uses two StateStores. The first one uses the session key as the key and a list of start times as the value. The second StateStore maps the session key and each start time to each session window.
We still need to store a list of session start times, but since each element is just a start time, we are free from the memory issues here. Currently we implement the list StateStore format and co-work with the community on the session window pull request.
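To make the three candidate formats easier to compare, here is a conceptual sketch of their key-value layouts as I read the description above; the names and shapes are illustrative only, not the final design.

```scala
object SessionStateLayouts {
  // Placeholder for the per-window state (start, end, plus aggregated values).
  case class SessionWindow(start: Long, end: Long)

  // Format 1: one StateStore, the whole session list stored as a single value.
  type Format1 = Map[String, List[SessionWindow]]                // sessionKey -> sessions

  // Format 2: three StateStores, no list values; start times are chained together
  // ("double list" links), so every stored value stays small.
  type Format2EarliestStart = Map[String, Long]                  // sessionKey -> earliest start
  type Format2Links         = Map[(String, Long), (Long, Long)]  // (key, start) -> (prev, next) start
  type Format2Windows       = Map[(String, Long), SessionWindow] // (key, start) -> window

  // Format 3: two StateStores, a start-time list plus one entry per window.
  type Format3StartTimes    = Map[String, List[Long]]            // sessionKey -> start times
  type Format3Windows       = Map[(String, Long), SessionWindow] // (key, start) -> window
}
```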
Next, let’s talk about the new enhancements we plan for Structured Streaming. The first one is a stateful task scheduling enhancement. We know that Spark task scheduling is not designed for stateful tasks: the StateStore location in each micro batch is assigned arbitrarily, so a change of the StateStore location in the next micro batch could cause frequent reloading from the remote file system. It is also an obstacle to the checkpoint enhancements in our future work.
To address this issue, we propose a few possible approaches. The first one is leveraging the existing data locality preferences as a simple workaround. The second one is to customize Spark’s task scheduling behavior: we want to add a plugin API to Spark, so that when a new resource offer comes in and the Spark task scheduler wants to schedule a task, our plugin can suggest scheduling preferences to the scheduler and ask Spark to schedule the task where we expect.
So how might the task scheduling plugin work? If in one micro batch, four states are scheduled on four different executors, then in the next micro batch our scheduling plugin wants to schedule each state on the same executor as in the previous batch, so Spark doesn’t need to reload the state from HDFS back into the executor’s StateStore.
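To illustrate the idea only: a purely hypothetical sketch of what such a scheduling-preference plugin could look like. No such API exists in Spark today, and every name below is invented for this example.

```scala
// Hypothetical plugin interface: given a stateful partition, suggest where to run it.
trait StatefulTaskSchedulingPlugin {
  def preferredExecutors(queryId: String, operatorId: Long, partitionId: Int): Seq[String]
}

// Example policy: pin each state partition to the executor that held it in the
// previous micro batch, so the state does not need to be reloaded from HDFS.
class StickyStateScheduling(previousAssignment: Map[Int, String])
    extends StatefulTaskSchedulingPlugin {
  override def preferredExecutors(queryId: String,
                                  operatorId: Long,
                                  partitionId: Int): Seq[String] =
    previousAssignment.get(partitionId).toSeq
}
```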
Next, let’s talk about how we use these features in our use-case.

Kristine Guo: Hi, and thank you to Liang-Chi for walking us through all these new enhancements to Structured Streaming. Now I’d like to take this chance to talk at a high level about a use-case at Apple that in part motivated, and in the future will utilize, all of these new features.
So this use-case uses a dual-stream model, where we have two data streams reading from the same data source. This data source consists of rows that are assigned to application-defined batches. These batches are assigned in the original data source, and importantly, they are dynamic in both their count and the duration over which they are written to the two streams.
After each of the two individual data streams processes and aggregates these batches, we then want to use a stream-stream join to match the aggregates between the two streams and perform the final processing. Importantly, because we are processing the one data source in two different streams, we must account for potential lag between the streams.
So here is a high-level schematic of the use-case I am talking about. As you can see, we have a single data source with custom batch IDs. These rows are written to two parallel streams, both of which aggregate over those batches, and those aggregates are then joined downstream based on the batch ID.
A few things make this streaming architecture challenging. We have relatively high throughput: we are expecting petabytes of data per day. Furthermore, we have a high RPS as well, on the order of tens of thousands of records coming in per second. The size of the data also varies, ranging from one kilobyte to one megabyte per record. And finally, all of these factors together exert high memory pressure at the stream-stream join, so the StateStore will be important here.
Luckily, Spark’s Structured Streaming, with its existing features as well as these future enhancements, is able to handle a lot of these challenges. For example, accounting for potential lag is already solved by an existing feature, watermarking. To handle the dynamic batch aggregation, rather than relying on fixed time intervals or fixed count intervals, we are going to use the session windows that Liang-Chi just talked about. And finally, to help with the stream-stream join pressure, we are going to move to a RocksDB-based StateStore.
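The watermark-plus-join part of this pipeline maps onto the standard Structured Streaming pattern. A minimal sketch, with placeholder Kafka topics, column names, and lag bounds rather than the real pipeline; the per-stream session-window aggregation is omitted here.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

object DualStreamJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("dual-stream-join-sketch").getOrCreate()

    // Placeholder Kafka topics; the real pipeline reads one data source into two streams.
    def read(topic: String, side: String) =
      spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
        .option("subscribe", topic)
        .load()
        .selectExpr(
          s"CAST(key AS STRING) AS ${side}BatchId",
          s"timestamp AS ${side}Time",
          s"CAST(value AS STRING) AS ${side}Payload")

    // Watermarks bound how late events can arrive and therefore how far the
    // two streams may drift apart before join state can be dropped.
    val streamA = read("topic_a", "a").withWatermark("aTime", "30 minutes")
    val streamB = read("topic_b", "b").withWatermark("bTime", "30 minutes")

    // Join the two streams on the application-defined batch id within a time range.
    val joined = streamA.join(
      streamB,
      expr("""
        aBatchId = bBatchId AND
        bTime >= aTime - interval 30 minutes AND
        bTime <= aTime + interval 30 minutes
      """))

    joined.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/join-checkpoint")   // placeholder path
      .start()
      .awaitTermination()
  }
}
```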
Thank you. Hopefully you were able to draw insight from the new enhancements and features that we have planned for Structured Streaming, as well as how they can relate to actual use-cases at a high level. Your feedback is very important to us. So please feel free to reach out at any point and do not forget to rate and review the sessions as well. We are now happy to answer any questions. Thank you.

Kristine Guo

Kristine is a software engineer at Apple focused on cloud platform technologies. She currently works on developing high scale backend systems. Prior to joining Apple, Kristine obtained her Bachelor's ...

Liang-Chi Hsieh

Liang-Chi Hsieh is an Apache Spark Committer and an open source and big data engineer at Apple. Most of his contributions to Apache Spark are in SQL, MLlib modules. He recently works on Structured Str...