How to Monitor Streaming Queries in PySpark

Hyukjin Kwon
Karthik Ramasamy
Alexander Balikov

Streaming is one of the most important data processing techniques for ingestion and analysis. It provides users and developers with low-latency, real-time data processing capabilities for analytics and for triggering actions. However, monitoring streaming data workloads is challenging because the data is processed continuously as it arrives. Because of this always-on nature of stream processing, it is harder to troubleshoot problems during development and in production without real-time metrics, alerting and dashboarding.

Structured Streaming in Apache Spark™ addresses the monitoring problem by providing an Observable API, which enables advanced monitoring capabilities such as alerting and dashboarding with external systems.

Until now, however, the Observable API has been missing in PySpark, forcing users to fall back to the Scala API for their streaming queries in order to get alerting and dashboarding with external systems. The lack of this functionality in Python has become more critical as the importance of Python grows, given that almost 70% of notebook commands run on Databricks are in Python.

In Databricks Runtime 11, we're happy to announce that the Observable API is now available in PySpark. In this blog post, we introduce the Python Observable API for Structured Streaming, along with a step-by-step example that adds alerting logic to a streaming query.

Observable API

Developers can now send streaming metrics to external systems, e.g., for alerting and dashboarding with custom metrics, by combining the streaming query listener interface and the Observable API in PySpark. StreamingQueryListener is an abstract class that you subclass, implementing all of its methods as shown below:

from pyspark.sql.streaming import StreamingQueryListener


class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are the same as in the Scala API.

        Notes
        -----
        This is called synchronously with
        :meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStarted`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are the same as in the Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` will always be
        the latest no matter when this method is called. Therefore, the
        status of :class:`pyspark.sql.streaming.StreamingQuery` may be
        changed before/when you process the event. For example, you may
        find :class:`StreamingQuery` is terminated when you are
        processing `QueryProgressEvent`.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are the same as in the Scala API.
        """
        pass


my_listener = MyListener()

Note that the callbacks are invoked asynchronously, with the exception of onQueryStarted, which, as its docstring above notes, is called synchronously with DataStreamWriter.start.

  • StreamingQueryListener.onQueryStarted is triggered when a streaming query is started, e.g., DataStreamWriter.start.
  • StreamingQueryListener.onQueryProgress is invoked when each micro-batch execution is finished.
  • StreamingQueryListener.onQueryTerminated is called when the query is stopped, e.g., StreamingQuery.stop.

To activate the listener, add it via StreamingQueryManager; it can also be removed later, as shown below:

spark.streams.addListener(my_listener)
spark.streams.removeListener(my_listener)

To capture custom metrics, add them via DataFrame.observe. Custom metrics are defined as arbitrary aggregate functions, such as count("value"), as shown below.

df.observe("name", count(column), ...)
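For example, a minimal sketch of observing two metrics on a streaming DataFrame df might look like the following; the column name error is purely hypothetical and only illustrates counting non-null values. Each aggregate's alias becomes a field of the Row that the listener later receives under the observation name.

from pyspark.sql.functions import count, col, lit

# A sketch only: `df` is assumed to be a streaming DataFrame and `error` a hypothetical column.
observed_df = df.observe(
    "metric",
    count(lit(1)).alias("cnt"),           # total rows per micro-batch
    count(col("error")).alias("errors"))  # non-null values in the hypothetical 'error' column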

Error Alert Scenario

In this section, we describe a real-world use case for the Observable API. Suppose there is a directory into which new CSV files continuously arrive from another system, and you have to ingest them in a streaming fashion. In this example, we use a local file system for simplicity so that the API is easy to understand. The code snippets below can be copied and pasted into the PySpark shell to run and try out.

First, let's import the necessary Python classes and packages, then create a directory called my_csv_dir that will be used in this scenario.

import os
import shutil
import time
from pathlib import Path

from pyspark.sql.functions import count, col, lit
from pyspark.sql.streaming import StreamingQueryListener

# NOTE: replace `basedir` with the FUSE-mounted path, e.g., "/dbfs/tmp", in a
# Databricks notebook.
basedir = os.getcwd()  # "/dbfs/tmp"

# CSV files will be created in this directory later. Clean up 'my_csv_dir' first
# in case you already ran this example before.
my_csv_dir = os.path.join(basedir, "my_csv_dir")
shutil.rmtree(my_csv_dir, ignore_errors=True)
os.makedirs(my_csv_dir)

Next, we define our own custom streaming query listener. The listener will alert when there are too many malformed records during CSV ingestion in each progress update. If malformed records make up more than 50% of the processed records, we print a log message; in production scenarios, you would connect to external systems instead of simply printing.

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")

    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            # Guard against empty micro-batches to avoid division by zero.
            if row.cnt > 0 and row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")

    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")


# Add my listener.
my_listener = MyListener()
spark.streams.addListener(my_listener)

To activate the listener, we add it before starting the query in this example. However, you can add or remove the listener at any point, regardless of when queries start or terminate, because the callbacks work asynchronously. This allows you to attach to and detach from your running streaming queries without halting them.
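Because the listener is registered on the session-wide StreamingQueryManager rather than on an individual query, it receives events for every streaming query running on that SparkSession. As a quick sketch, you can check which queries are currently active:

# List the streaming queries currently active on this SparkSession.
for q in spark.streams.active:
    print(q.id, q.name, q.isActive)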

Now we will start a streaming query that ingests the files in the my_csv_dir directory. During processing, we also observe the number of processed and malformed records. The CSV data source stores malformed records in the _corrupt_record column by default, so we count that column to get the number of malformed records.

# Now, start a streaming query that monitors 'my_csv_dir' directory.
# Every time new CSV files arrive here, we will process them.
my_csv = spark.readStream.schema(
    "my_key INT, my_val DOUBLE, _corrupt_record STRING"
).csv(Path(my_csv_dir).as_uri())
# `DataFrame.observe` computes the counts of processed and malformed records,
# and sends an event to the listener.
my_observed_csv = my_csv.observe(
    "metric",
    count(lit(1)).alias("cnt"),  # number of processed rows
    count(col("_corrupt_record")).alias("malformed"))  # number of malformed rows
my_query = my_observed_csv.writeStream.format(
    "console").queryName("My observer").start()

Now that we have defined the streaming query and the alerting capabilities, let's create CSV files so they can be ingested in a streaming fashion:

# Now, we will write CSV data to be processed in a streaming manner over time.
# This CSV file is well-formed.
with open(os.path.join(my_csv_dir, "my_csv_1.csv"), "w") as f:
    _ = f.write("1,1.1\n")
    _ = f.write("123,123.123\n")

time.sleep(5)  # Assume that another CSV file arrived in 5 seconds.

# Ouch! It has two malformed records out of three. The observer query should raise an alert!
with open(os.path.join(my_csv_dir, "my_csv_error.csv"), "w") as f:
    _ = f.write("1,1.123\n")
    _ = f.write("Ouch! malformed record!\n")
    _ = f.write("Arrgggh!\n")

time.sleep(5)  # OK, all done. Let's stop the query in 5 seconds.
my_query.stop()
spark.streams.removeListener(my_listener)

Here we can see that the query start, progress updates and termination are logged properly. Because there are two malformed records out of three in the second CSV file, the alert is raised with the following message:

...
ALERT! Ouch! there are too many malformed records 2 out of 3!
...
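As a side note, the same observed metrics are also recorded in the query's progress updates, so they can be inspected outside the listener as well. Below is a minimal sketch: lastProgress returns the most recent progress as a dict parsed from the progress JSON, and the observed metrics appear under the name passed to observe (run it while the query is active or right after it stops).

# Inspect the most recent progress update outside the listener.
last = my_query.lastProgress
if last is not None:
    print(last.get("observedMetrics", {}).get("metric"))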

Conclusion

PySpark users can now define their custom metrics and observe them via the streaming query listener interface and Observable API. They can attach such logic to and detach it from running queries dynamically as needed. This feature addresses the need for dashboarding, alerting and reporting to other external systems.

The Streaming Query Listener interface and Observable API are available in DBR 11 Beta and are expected to be available in a future Apache Spark release. Try out both new capabilities today on Databricks through DBR 11 Beta.
