
Introducing Easier Change Data Capture in Apache Spark™ Structured Streaming


Summary

  • Simplified state tracking: New change feed and snapshot features in the State Reader API simplify debugging and analyzing state changes in Apache Spark™ Structured Streaming.
  • Accelerated development: Track state changes across micro-batches with minimal queries and reconstruct state snapshots for precise diagnostics.
  • Enhanced accessibility: Streamline insights for both engineers and business users by enabling seamless integration with dashboards and analysis tools.

This blog describes the new change feed and snapshot capabilities in Apache Spark™ Structured Streaming’s State Reader API. The State Reader API enables users to access and analyze Structured Streaming's internal state data. Readers will learn how to leverage the new features to debug, troubleshoot, and analyze state changes efficiently, making streaming workloads easier to manage at scale.

A simple way to handle state changes

In the ever-evolving landscape of data engineering, Apache Spark Structured Streaming has become a cornerstone for processing real-time data at scale. However, as streaming workloads grow in complexity, so does the challenge of developing, debugging, and troubleshooting these systems. In March 2024, Databricks took a significant step forward by introducing the State Reader API, a powerful tool designed to address these challenges head-on by making it easy to query state data and metadata.

Databricks has introduced significant enhancements to the State Reader API, building on its existing capabilities to further streamline state tracking and analysis. These improvements leverage the state store's changelog data to provide a change feed with output in the standard Change Data Capture (CDC) format. Another new capability makes it possible to reconstruct a view of the state starting from a chosen snapshot in the checkpoint directory.

In this blog post, we'll delve into these new features, demonstrating how they streamline state change tracking, data transformation auditing, and state snapshot reconstruction. The change feed accelerates development by offering a simpler way to observe how state values change over time. This was possible with the previous version of the State Reader API, but it required extra code to iterate over and inspect each state version. Now, just a few options suffice to build the change feed.

Beyond development and testing, these enhancements facilitate data accessibility for analysts. For example, a scheduled query could now easily populate AI/BI Dashboard visualizations, bridging the gap between complex streaming data and actionable insights.

Prerequisites

The State Reader API Change Feed requires that delta-based state checkpointing be enabled. Here, "delta" means "diff," not Delta Lake. The HDFS-backed state store implementation uses delta-based state checkpointing by default. When using the RocksDB-based state store implementation, an additional Spark config is needed to enable changelog checkpointing.
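For example, with the RocksDB provider, changelog checkpointing is controlled by the spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled configuration. A minimal sketch, set before the streaming query starts:

```python
# A minimal sketch: use the RocksDB state store provider with changelog
# checkpointing enabled. Set these before starting the streaming query.
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
)
spark.conf.set(
    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
    "true",
)
```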

State Reader API review

The basic statestore format has the following options:

  • batchId: the target batch for which we want to read state store values. If not specified, the latest batchId is used by default.
  • operatorId: the target operator for which state store values are sought. The default value is 0. If multiple stateful operators exist in the stream, the other operators' state can be accessed using this option.
  • storeName: the target state store name from which to read. This option is used when the stateful operator uses multiple state store instances. For a stream-stream join, either storeName or joinSide must be specified, but not both.
  • joinSide: used to read one side of a stream-stream join's state. Valid values are "left" and "right" (see the sketch after this list).
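As an illustration, here is a minimal sketch of reading one side of a stream-stream join's state; the checkpoint path is a placeholder:

```python
# A minimal sketch: read the left side of a stream-stream join's state.
# The checkpoint path is a placeholder.
left_state_df = (
    spark.read.format("statestore")
    .option("joinSide", "left")  # or "right"; mutually exclusive with storeName
    .load("/Volumes/mycheckpoint-path")
)
left_state_df.printSchema()
```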

The output DataFrame schema includes the following columns:

  • key: the key for a stateful operator record in the state checkpoint.
  • value: the value for a stateful operator record in the state checkpoint.
  • partition_id: the checkpoint partition containing the stateful operator record.

These basic options make it easy to see what was in the state store for a given batchId.

Example

The example below shows how the statestore Spark data source format helps us query state store data. Imagine that we're investigating userId 8's count value. Before the new State Reader API options, which we will review in the next section, if we wanted to observe the change of userId 8's data across micro-batches, we would have to re-run the query below for various batchIds (by changing the batchId option in the sketch below).
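The following is a minimal sketch: the checkpoint path is a placeholder, and the key.userId field assumes the stateful operator's key schema includes a userId field.

```python
# A minimal sketch: inspect the state store contents for a specific batchId.
# Re-run with a different batchId value to see another micro-batch's state.
state_df = (
    spark.read.format("statestore")
    .option("batchId", 2)  # the micro-batch to inspect; defaults to the latest
    .load("/Volumes/mycheckpoint-path")
)

# Assumes the state key includes a userId field; adjust to your key schema.
state_df.filter("key.userId = 8").show(truncate=False)
```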

Before the new options, observing the change of a key's value was tedious and would require multiple queries. Let's now look at how the new options make this easier.

Introducing new options

The following new options are part of the State Reader API's new change feed and snapshot capabilities:

Change feed:

  • readChangeFeed: when "true", enables the change feed output.
  • changeStartBatchId: required. The batchId at which the change feed should start.
  • changeEndBatchId: optional. The last batch to use in the change feed.

Snapshot:

  • snapshotPartitionId: required when using snapshotStartBatchId. If specified, only this specific partition will be read.
  • snapshotStartBatchId: required when using snapshotPartitionId.
  • snapshotEndBatchId or batchId: optional. The last batch to use in the generation of the snapshot values.

Be mindful of the values used for the batchId options. By default, one hundred historical checkpoints and their related state files are retained; the spark.sql.streaming.minBatchesToRetain property can be used to override the default. If you try to access state data for a batch that has aged out and no longer exists, you will see an error message like this one: [STDS_OFFSET_LOG_UNAVAILABLE] The offset log for 92 does not exist, checkpoint location: /Volumes/mycheckpoint-path.
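If you need older batches to remain queryable, the retention window can be raised before the stream runs. A minimal sketch (the value 300 is only an illustration):

```python
# A minimal sketch: retain more historical checkpoints so older batchIds
# remain queryable. The default is 100; 300 is only an illustration.
spark.conf.set("spark.sql.streaming.minBatchesToRetain", "300")
```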

Change feed example

In the example below, we use the change feed to observe changes for the key userId value 8. The change_type field can be helpful during development, debugging, or when investigating a production data issue. The change feed lets you quickly see how a key's value changed over several micro-batches. Because the state key in this example includes a window, you can also see how the partition_id changed.
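A minimal sketch of such a change feed query follows; the checkpoint path is a placeholder, the batch range is illustrative, and key.userId assumes the operator's key schema includes a userId field.

```python
# A minimal sketch: read the change feed across a range of micro-batches.
changes_df = (
    spark.read.format("statestore")
    .option("readChangeFeed", "true")
    .option("changeStartBatchId", 0)  # required
    .option("changeEndBatchId", 10)   # optional; defaults to the latest batch
    .load("/Volumes/mycheckpoint-path")
)

# Each row describes how a key's value changed in a given micro-batch,
# with the kind of change recorded in the change_type column.
changes_df.filter("key.userId = 8").show(truncate=False)
```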

Snapshot example

State store corruption is unlikely, thanks to Apache Spark's fault tolerance: micro-batches are planned (offsets are written to the checkpoint location) before they run, and commits are completed (and synced with state data to the checkpoint location) afterward. However, human error or bugs are always possible. The snapshot feature of the State Reader API is a helpful tool for reconstructing the state from changelog data alone, bypassing any subsequent snapshot files. It requires a starting batchId (via the snapshotStartBatchId option) for which a snapshot file exists; from there, it replays the changelogs to construct a picture of the state as of the batchId specified with the snapshotEndBatchId option.

If using the RocksDB state store, the underlying checkpoint file structure looks something like the illustrative listing below (version numbers and layout will vary by query):
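```
<checkpoint-path>/state/0/0/
    ...
    1739.changelog
    1740.changelog
    1740.zip          <- periodic snapshot of the full state
    1741.changelog
    1742.changelog
    ...
    1800.changelog
```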

To build a picture of the state as of batch 1800, starting from the snapshot in 1740.zip, you would use code like this:
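A minimal sketch follows; the checkpoint path and partition ID are placeholders, and the batch numbers match the listing above.

```python
# A minimal sketch: reconstruct the state as of batch 1800 by replaying
# changelog files from the chosen snapshot onward.
snapshot_df = (
    spark.read.format("statestore")
    .option("snapshotPartitionId", 0)      # required with snapshotStartBatchId
    .option("snapshotStartBatchId", 1741)  # a batchId backed by a snapshot file
    .option("snapshotEndBatchId", 1800)    # reconstruct state as of this batch
    .load("/Volumes/mycheckpoint-path")
)
snapshot_df.show(truncate=False)
```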

You may notice that in the listing of the checkpoint files, the snapshotted data is in 1740.zip, whereas the State Reader API call uses a snapshotStartBatchId of 1741. The reason is that the file-naming convention uses a 1-based index, whereas the batchId numbers shown in the Spark UI start at 0.

Conclusion

The new features of the State Reader API open up new opportunities for auditing, exploring, and visualizing state changes. They make developers more efficient: without them, separate queries are needed to extract state values across a range of batches. The beneficiaries go beyond development and support staff, though; business stakeholders may also be interested in the insights the change feed data makes possible. In either case, building queries and dashboards to surface the data is now easier, thanks to the State Reader API enhancements.

The change feed allows for detailed tracking of state changes across micro-batches, offering invaluable insights during development and debugging. The snapshot feature is a handy diagnostic tool, enabling engineers to reconstruct the state from changelog files and build a complete view of the state at a specific point (batchId).

You can read more about the State Reader API here, or view a demo here.

