Skip to main content

Introducing Low-latency Continuous Processing Mode in Structured Streaming in Apache Spark 2.3

Now Available on Databricks Runtime 4.0
Joseph Torres
Michael Armbrust
Tathagata Das
Shixiong Zhu
Share this post

Import this notebook on Databricks

Structured Streaming in Apache Spark 2.0 decoupled micro-batch processing from its high-level APIs for a couple of reasons. First, it made developer’s experience with the APIs simpler: the APIs did not have to account for micro-batches. Second, it allowed developers to treat a stream as an infinite table to which they could issue queries as they would a static table.

To take advantage of this, we worked to introduce a new millisecond low-latency mode of streaming called continuous mode in Apache Spark 2.3, now available in Databricks Runtime 4.0 as part of Databricks Unified Analytics Platform.

image3-1

In this blog, we are going to illustrate the use of continuous processing mode, its merits, and how developers can use it to write continuous streaming applications with millisecond low-latency requirements. Let’s start with a motivating scenario.

Low-latency Scenario

Suppose we want to build a real-time pipeline to flag fraudulent credit card transactions. Ideally, we want to identify and deny a fraudulent transaction as soon as the culprit has swiped his/her credit card. However, we don’t want to delay legitimate transactions as that would annoy customers. This leads to a strict upper bound on the end-to-end processing latency of our pipeline. Given that there are other delays in transit, the pipeline must process each transaction within 10-20 ms.

Let’s try to build this pipeline in Structured Streaming. Assume that we have a user-defined function “isPaymentFlagged” that can identify the fraudulent transactions. To minimize the latency, we’ll use a 0 second processing time trigger indicating that Spark should start each micro batch as fast as it can with no delays. At a high level, the query looks like this.

payments \
  .filter("isPaymentFlagged(paymentId)") \
  .writeStream \
   {...}
  .trigger(processingTime = "0 seconds") \
  .start()

You can see the complete code by downloading and importing this example notebook to your Databricks workspace (use a Databricks Community Edition). Let’s see what end-to-end latency we get.

image6-1

The records are taking more than 100 ms to flow through Spark! While this is fine for many streaming pipelines, this is insufficient for this use case. Can our new Continuous Processing mode help us out?

payments \
  .filter("isPaymentFlagged(paymentId)") \
  .writeStream \
   {...}
  .trigger(continuous = "5 seconds") \
  .start

image5-2

Now we are getting less 1 ms latency -- more than two orders of magnitude improvement and well below our target latency! To understand why this latency was so high with micro-batch processing, and how continuous processing helped, we’ll have to dig into the details of the Structured Streaming engine.

Micro-Batch Processing

Structured Streaming by default uses a micro-batch execution model. This means that the Spark streaming engine periodically checks the streaming source, and runs a batch query on new data that has arrived since the last batch ended. At a high-level, it looks like this.

image7-1

In this architecture, the driver checkpoints the progress by saving the records offsets to a write-ahead-log, which may be then used to restart the query. Note that the range offsets to be processed in the next micro-batch is saved to the log before the micro-batch has started in order to get deterministic re-executions and end-to-end semantics. As a result, a record that is available at the source may have to wait for the current micro-batch to be completed before its offset is logged and the next micro-batch processes it. At the record level, the timeline looks like this.

image1-2

This results in latencies of 100s of milliseconds at best, between the time an event is available at the source and when the output is written to the sink.

We originally built Structured Streaming with this micro-batch engine to easily leverage existing batch processing engine in Spark SQL which had already been optimized for performance (see our past blogs on code generation and Project Tungsten). This allowed us to achieve high throughput with latencies as low as 100 ms. Over the past few years, while working with thousands of developers and hundreds of different use cases, we have found that second-scale latencies are sufficient for most practical streaming workloads such as ETL and real-time monitoring. However, some workloads (e.g., the aforementioned fraud detection use case) do benefit from even lower latencies and that motivated us to build the Continuous Processing mode. Let us understand how this works.

Continuous Processing

In Continuous Processing mode, instead of launching periodic tasks, Spark launches a set of long-running tasks that continuously read, process and write data. At a high level, the setup and the record-level timeline looks like these (contrast them with the above diagrams of micro-batch execution).

image2-2

image4-2

Since events are processed and written to sink as soon as they are available in the source, the end-to-end latency is a few milliseconds.

Furthermore, the query progress is checkpointed by an adaptation of the well-known Chandy-Lamport algorithm. Special marker records are injected into the input data stream of every task; we call them “epoch markers” and the gap between them as “epochs.” When a marker is encountered by a task, the task asynchronously reports the last offset processed to the driver. Once the driver receives the offsets from all the tasks writing to the sink, it writes them to the aforementioned write-ahead-log. Since the checkpointing is completely asynchronous, the tasks can continue uninterrupted and provide consistent millisecond-level latencies.

Experimental Release in Apache Spark 2.3.0

In the Apache Spark 2.3.0, the Continuous Processing mode is an experimental feature and a subset of the Structured Streaming sources and DataFrame/Dataset/SQL operations are supported in this mode. Specifically, you can set the optional Continuous Trigger in queries that satisfy the following conditions:

  • Read from supported sources like Kafka and write to supported sinks like Kafka, memory, console (memory and console are good for debugging).
  • Has only map-like operations (i.e., selections and projections like select, where, map, flatMap, filter,)
  • Has any SQL function other than aggregate functions, and current-time-based functions like current_timestamp() and current_date().

For more information, refer to the following:

Closing Thoughts

With the release of Apache Spark 2.3, developers have a choice of using either streaming mode—continuous or micro-batching—depending on their latency requirements. While the default Structure Streaming mode (micro-batching) does offer acceptable latencies for most real-time streaming applications, for your millisecond-scale latency requirements, you can now elect for the continuous mode.

Import this Continuous Processing mode notebook in Databricks see it for yourself.

Try Databricks for free

Related posts

See all Open Source posts