Stateful stream processing refers to processing a continuous stream of events in real time while maintaining state based on the events seen so far. This allows the system to track changes and patterns over time in the event stream, and enables it to make decisions or take actions based on this information.
Stateful stream processing in Apache Spark Structured Streaming is supported using built-in operators (such as windowed aggregations, stream-stream joins, dropDuplicates, etc.) for predefined logic, and using flatMapGroupsWithState or mapGroupsWithState for arbitrary logic. The arbitrary-logic APIs allow users to write custom state manipulation code in their pipelines. However, as the adoption of streaming grows in the enterprise, more complex and sophisticated streaming applications demand additional features that make it easier for developers to write stateful streaming pipelines.
In order to support these new, growing stateful streaming applications and operational use cases, the Spark community is introducing a new Spark operator called transformWithState. This operator allows for flexible data modeling, composite types, timers, TTL, chaining stateful operators after transformWithState, schema evolution, reusing state from a different query, and integration with a host of other Databricks features such as Unity Catalog, Delta Live Tables, and Spark Connect. Using this operator, customers can develop and run their mission-critical, complex stateful operational use cases reliably and efficiently on the Databricks platform using popular languages such as Scala, Java, or Python.
Many event-driven applications rely on performing stateful computations to trigger actions or emit output events, which are usually written to another event log or message bus such as Apache Kafka, Apache Pulsar, or Google Pub/Sub. These applications typically implement a state machine that validates rules, detects anomalies, tracks sessions, etc., and generates derived results that are used to trigger actions on downstream systems.
Examples of such applications include User Experience Tracking, Anomaly Detection, Business Process Monitoring, and Decision Trees.
Apache Spark now introduces transformWithState, a next-generation stateful processing operator designed to make building complex, real-time streaming applications more flexible, efficient, and scalable. This new API unlocks advanced capabilities for state management, event processing, timer management and schema evolution, enabling users to implement sophisticated streaming logic with ease.
We are introducing a new layered, flexible, and extensible API approach to address the limitations of the existing APIs. A high-level diagram of the layered architecture and the features available at each layer is shown below.
As shown in the figure, we continue to use the state backends available today. Currently, Apache Spark supports two state store backends:

- The HDFS-backed state store provider, which maintains state as objects in JVM memory and persists changes to files in the checkpoint location
- The RocksDB state store provider, which maintains state in embedded RocksDB instances on the executors and is better suited for large state
The new transformWithState operator will initially be supported only with the RocksDB state store provider. We make use of RocksDB functionality such as virtual column families, range scans, and merge operators to ensure optimal performance for the various features used within transformWithState. On top of this layer, we build another abstraction layer that uses the StatefulProcessorHandle to work with composite types, timers, query metadata, etc. At the operator level, we enable the use of a StatefulProcessor that can embed the application logic used to deliver these powerful streaming applications. Finally, you can use the StatefulProcessor within Apache Spark queries based on the DataFrame APIs.
Here is an example of an Apache Spark streaming query using the transformWithState operator:
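Below is a minimal sketch in Scala following the Apache Spark 4.0 API; the processor name, socket source, and checkpoint path are illustrative assumptions:

```scala
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.streaming.{OutputMode, StatefulProcessor, TimeMode, TimerValues, TTLConfig, ValueState}

// Illustrative processor: maintains a running count per key.
class CountProcessor extends StatefulProcessor[String, String, (String, Long)] {
  @transient private var countState: ValueState[Long] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    // Register a value state variable named "count" with the handle.
    countState = getHandle.getValueState[Long]("count", Encoders.scalaLong, TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[(String, Long)] = {
    val newCount = (if (countState.exists()) countState.get() else 0L) + inputRows.size
    countState.update(newCount)
    Iterator.single((key, newCount))
  }
}

val spark = SparkSession.builder.appName("twsExample").getOrCreate()
import spark.implicits._

// Group the stream by key and apply the processor via transformWithState.
val counts = spark.readStream
  .format("socket").option("host", "localhost").option("port", "9999").load()
  .as[String]
  .groupByKey(identity)
  .transformWithState(new CountProcessor(), TimeMode.None(), OutputMode.Update())

counts.writeStream
  .format("console")
  .outputMode("update")
  .option("checkpointLocation", "/tmp/tws-example-checkpoint")
  .start()
```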
With transformWithState, users can now define multiple independent state variables within a StatefulProcessor based on the object-oriented programming model. These variables function like private class members, allowing for granular state management without requiring a monolithic state structure. This makes it easy to evolve application logic over time by adding or modifying state variables without restarting queries from a new checkpoint directory.
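For illustration, here is a sketch of a processor that registers two independent state variables side by side (the names and types are assumptions):

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming._

// Illustrative: two independent state variables, much like private members.
class ActivityProcessor extends StatefulProcessor[String, String, (String, Long)] {
  @transient private var eventCount: ValueState[Long] = _
  @transient private var recentEvents: ListState[String] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    // Each variable has its own name and encoder; more variables can be
    // added later without restarting from a fresh checkpoint directory.
    eventCount = getHandle.getValueState[Long]("eventCount", Encoders.scalaLong, TTLConfig.NONE)
    recentEvents = getHandle.getListState[String]("recentEvents", Encoders.STRING, TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[(String, Long)] = {
    var added = 0L
    inputRows.foreach { e => recentEvents.appendValue(e); added += 1 }
    val total = (if (eventCount.exists()) eventCount.get() else 0L) + added
    eventCount.update(total)
    Iterator.single((key, total))
  }
}
```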
Users can now register timers to trigger event-driven application logic. The API supports both processing time (wall clock-based) and event time (column-based) timers. When a timer fires, a callback is issued, allowing for efficient event handling, state updates, and output generation. The ability to list, register, and delete timers ensures precise control over event processing.
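As a sketch, here is a processor that arms a 30-second processing-time timer per key and emits a record when it fires (the timeout value and output format are illustrative):

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming._

// Illustrative: emit a timeout record if a key is idle for 30 seconds.
// Intended for use with TimeMode.ProcessingTime() in transformWithState.
class TimeoutProcessor extends StatefulProcessor[String, String, String] {
  @transient private var lastSeen: ValueState[Long] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    lastSeen = getHandle.getValueState[Long]("lastSeen", Encoders.scalaLong, TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[String] = {
    val now = timerValues.getCurrentProcessingTimeInMs()
    lastSeen.update(now)
    // List and delete any previously registered timers, then arm a new one.
    getHandle.listTimers().toSeq.foreach(getHandle.deleteTimer)
    getHandle.registerTimer(now + 30 * 1000L)
    Iterator.empty
  }

  // Callback issued when a registered timer fires for this key.
  override def handleExpiredTimer(
      key: String,
      timerValues: TimerValues,
      expiredTimerInfo: ExpiredTimerInfo): Iterator[String] = {
    lastSeen.clear()
    Iterator.single(s"$key timed out at ${expiredTimerInfo.getExpiryTimeInMs()}")
  }
}
```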
State management is now more intuitive with built-in support for composite data structures:

- ValueState: a single value per grouping key
- ListState: an ordered collection of values per grouping key
- MapState: key-value pairs scoped to each grouping key
Spark automatically encodes and persists these state types, reducing the need for manual serialization and improving performance.
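Here is a sketch showing all three state types registered within one processor (the shopping-cart scenario and all names are assumptions):

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming._

// Illustrative: value, list, and map state used together; Spark encodes
// and persists each of them, with no manual serialization required.
class CartProcessor extends StatefulProcessor[String, (String, Double), String] {
  @transient private var total: ValueState[Double] = _            // running total
  @transient private var purchases: ListState[Double] = _         // all amounts seen
  @transient private var countsByItem: MapState[String, Long] = _ // per-item counts

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    total = getHandle.getValueState[Double]("total", Encoders.scalaDouble, TTLConfig.NONE)
    purchases = getHandle.getListState[Double]("purchases", Encoders.scalaDouble, TTLConfig.NONE)
    countsByItem = getHandle.getMapState[String, Long](
      "countsByItem", Encoders.STRING, Encoders.scalaLong, TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, Double)],
      timerValues: TimerValues): Iterator[String] = {
    inputRows.foreach { case (item, amount) =>
      total.update((if (total.exists()) total.get() else 0.0) + amount)
      purchases.appendValue(amount)
      val prev = if (countsByItem.containsKey(item)) countsByItem.getValue(item) else 0L
      countsByItem.updateValue(item, prev + 1L)
    }
    Iterator.single(s"$key total=${total.get()}")
  }
}
```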
For compliance and operational efficiency, transformWithState introduces native time-to-live (TTL) support for state variables. This allows users to define expiration policies, ensuring that old state data is automatically removed without requiring manual cleanup.
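A sketch of a state variable configured with a one-hour TTL (the duration and variable name are illustrative; state TTL is based on processing time):

```scala
import java.time.Duration
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming._

// Illustrative: state values not updated within one hour are removed
// automatically, with no manual cleanup logic in the processor.
class ScoreProcessor extends StatefulProcessor[String, Double, (String, Double)] {
  @transient private var score: ValueState[Double] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    score = getHandle.getValueState[Double](
      "score", Encoders.scalaDouble, TTLConfig(Duration.ofHours(1)))
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[Double],
      timerValues: TimerValues): Iterator[(String, Double)] = {
    val updated = (if (score.exists()) score.get() else 0.0) + inputRows.sum
    score.update(updated)
    Iterator.single((key, updated))
  }
}
```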
With this new API, stateful operators can now be chained after transformWithState, even when using event-time as the time mode. By explicitly referencing event-time columns in the output schema, downstream operators can perform late record filtering and state eviction seamlessly—eliminating the need for complex workarounds involving multiple pipelines and external storage.
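As a sketch of such a chained query, assuming an events Dataset of a case class with user and eventTime fields and a hypothetical SessionProcessor whose output schema includes an eventTime column (the event-time-column overload shown here follows the Apache Spark 4.0 API):

```scala
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.OutputMode
import spark.implicits._ // assumes a SparkSession named `spark` as above

// Mark "eventTime" in the processor's output as the event-time column so
// downstream stateful operators can filter late records and evict state.
val sessionized = events
  .withWatermark("eventTime", "10 minutes")
  .groupByKey(_.user)
  .transformWithState(new SessionProcessor(), "eventTime", OutputMode.Append())

// A second stateful operator chained directly after transformWithState,
// with no separate pipeline or external storage needed in between.
val counts = sessionized
  .groupBy(window($"eventTime", "5 minutes"))
  .count()
```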
Users can initialize state from existing queries, making it easier to restart or clone streaming jobs. The API allows seamless integration with the state data source reader, enabling new queries to leverage previously written state without complex migration processes.
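Here is a sketch of seeding state through a StatefulProcessorWithInitialState; the prior-counts Dataset could, for instance, come from the state data source reader (all names are illustrative):

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming._

// Illustrative: per-key counts are seeded from an initial state Dataset
// before any streaming input is processed for that key.
class SeededCountProcessor
    extends StatefulProcessorWithInitialState[String, String, (String, Long), Long] {
  @transient private var count: ValueState[Long] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    count = getHandle.getValueState[Long]("count", Encoders.scalaLong, TTLConfig.NONE)
  }

  // Invoked once per key present in the initial state Dataset.
  override def handleInitialState(
      key: String, initialState: Long, timerValues: TimerValues): Unit = {
    count.update(initialState)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[(String, Long)] = {
    val c = (if (count.exists()) count.get() else 0L) + inputRows.size
    count.update(c)
    Iterator.single((key, c))
  }
}

// Usage sketch: priorCounts is a Dataset[(String, Long)] of seeded counts.
// val initial = priorCounts.groupByKey(_._1).mapValues(_._2)
// val out = input.groupByKey(identity).transformWithState(
//   new SeededCountProcessor(), TimeMode.None(), OutputMode.Update(), initial)
```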
transformWithState supports schema evolution, allowing for changes such as:

- Adding new fields to a state type
- Removing existing fields
- Widening field types (for example, Int to Long)
- Reordering fields
- Adding new state variables to an existing StatefulProcessor
Apache Spark automatically detects and applies compatible schema updates, ensuring queries can continue running within the same checkpoint directory. This eliminates the need for full state rebuilds and reprocessing, significantly reducing downtime and operational complexity.
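For example (a sketch; the case classes are illustrative, with V1/V2 suffixes used only to show the two versions side by side):

```scala
// Version 1 of the query stored session state with this type:
case class UserSessionV1(userId: String, clicks: Int)

// Version 2 restarts from the same checkpoint with one type widened
// (Int -> Long) and one nullable field added; assuming these fall within
// the compatible changes above, Spark evolves the stored state in place.
case class UserSessionV2(userId: String, clicks: Long, lastPage: Option[String])
```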
For easier debugging and observability, transformWithState is natively integrated with the state data source reader. Users can inspect state variables and query state data directly, streamlining troubleshooting and analysis, including advanced features such as readChangeFeed etc.
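A sketch of reading state with the state data source reader; the checkpoint path and state variable name are illustrative, and the option names follow the Apache Spark 4.0 reader:

```scala
// Inspect a single state variable written by a transformWithState query.
val stateDf = spark.read
  .format("statestore")
  .option("stateVarName", "countState")
  .load("/tmp/tws-example-checkpoint")
stateDf.show()

// Read state changes across a range of batches via readChangeFeed.
val changesDf = spark.read
  .format("statestore")
  .option("stateVarName", "countState")
  .option("readChangeFeed", "true")
  .option("changeStartBatchId", "0")
  .load("/tmp/tws-example-checkpoint")
changesDf.show()
```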
The transformWithState API is available now with the Databricks Runtime 16.2 release in No-Isolation and Unity Catalog Dedicated Clusters. Support for Unity Catalog Standard Clusters and Serverless Compute will be added soon. The API is also slated to be available in open-source with the Apache Spark™ 4.0 release.
We believe that the feature improvements packed into the new transformWithState API will allow for building a new class of reliable, scalable, and mission-critical operational workloads powering the most important use cases for our customers and users, all within the comfort and ease of use of the Apache Spark DataFrame APIs. Importantly, these changes also set the foundation for future enhancements to built-in as well as new stateful operators in Apache Spark Structured Streaming. We're excited about the state management improvements in Apache Spark™ Structured Streaming over the past few years and look forward to the planned roadmap developments in this area.
You can read more about stateful stream processing and transformWithState on Databricks here.