Every production environment requires monitoring and alerting. Apache Spark has a configurable metrics system that allows users to report Spark metrics to a variety of sinks. Prometheus is one of the popular open-source monitoring and alerting toolkits used together with Apache Spark. Previously, users could use
Apache Spark 3.0.0 will add another easy way to support Prometheus for general use cases. In this talk, we will cover the following and show a demo.
Currently, Apache Spark exposes metrics from the Master, Worker, Driver, and Executors so that it can integrate with an existing Prometheus server easily and with less effort. This is already available in Apache Spark 3.0.0-preview and preview2, so you can try it right now.
– Hi, everyone. Welcome to this session. We are gonna talk about native support of Prometheus monitoring in Apache Spark 3.
My name is Dongjoon Hyun. I’m an Apache Spark PMC member and committer, and I have also been contributing to the Apache ORC and REEF projects. In Spark, I am focusing on the Spark 3 Core and Kubernetes areas. Now, over to DB. – Thanks, Dongjoon. I am a colleague of Dongjoon, and we work together on many interesting Spark projects. I am also a Spark PMC member, a SystemML PMC member, a YuniKorn committer, and a Bahir committer. I’m not showing off my credentials here; I just want to show how much I love open source. Most of my work is in Apache Spark, but I’m also involved with other Apache projects and help incubate them.
In this talk, we want to go over monitoring in Spark. Many tech companies use Spark for ETL, to find insights in their data, or to build machine learning models that drive billion-dollar decisions. You want your Spark jobs running, and you want the data or the model processed as fast as possible and accurately. It may be less visible to users, but as Spark developers it is still important to you that your Spark applications run efficiently and that you are not wasting time and money under the hood of your system. There are a couple of methods people use to monitor Spark jobs. The Spark UI is probably the number one method. Simply open http://<driver IP address>:4040 in a web browser, and you can see a lot of useful information in real time while the job is running: the number of executors, memory usage, storage usage, and even execution plans for SQL queries, along with jobs, stages, tasks, metrics, and garbage collection information. So please raise your hand if you have used the Spark UI to debug your Spark job. One, two, three, pretty much all of you. Okay, I’m joking here, it’s pre-recorded, but my feeling is that everyone has used the Spark UI at some point. You can also view that information after the job has finished by using the history server. The other method people use is logging: you can look at the event logs or Spark process logs after the fact and try to find useful information in them. However, although these are the most popular ways to monitor and debug a Spark job, people mostly use them when jobs start failing or when they get pinged by a downstream consumer saying that the pipeline is broken and the data has not been processed properly.
You could also argue that you can use the Spark UI for real-time monitoring to check the current status, but no one is going to watch the Spark UI in real time for a production job. It’s also really hard to build an alerting system based on the Spark UI.
By that time, the Spark jobs have been failing for a while, and you need to start digging into the history server or logs to see what’s going on and get the pipeline running again. So what if we had a monitoring and alerting framework that actively helps us prevent this kind of situation?
Anyone who has run a complicated distributed system knows that the system is always in some state of partial degradation. The challenge is to keep the overall system working by proactively addressing degradation before it becomes a catastrophic failure: monitoring the metrics in real time and getting alerts to fix the underlying issues. There might be a memory leak here or there, or some misconfiguration that causes some jobs to run slower because resources are not being utilized efficiently. Or maybe the internal state of a streaming job is growing indefinitely.
So Prometheus is the solution here. Prometheus is an open-source, decentralized monitoring tool that you can use to monitor literally everything; you can plug pretty much anything into it. Its data model is multi-dimensional time series, and in many environments, such as Kubernetes, it’s really easy to deploy and operate. It uses a scalable, pull-based architecture. Its query language, PromQL, is designed to handle time-series computations efficiently, and its expressions can be used to trigger alerts. We like it a lot, and it’s a great tool for monitoring everything, from services to databases to whatever you can imagine going wrong in Kubernetes, including Apache Spark. The architecture of Prometheus is pull-based instead of push-based. Prometheus relies on service discovery in Kubernetes to find running jobs, and it pulls and stores the metrics from each application’s endpoint. Since it’s pull-based, you can set up multiple Prometheus instances for high availability, and you can even upgrade Prometheus without restarting the application. One reason Prometheus is not push-based is that when you start a job and submit it to a cluster, it’s sometimes really hard to know whether it started successfully. If it never started, it never gets the chance to push metrics to the Prometheus servers, and if it is terminated at some point, it has no chance to send out metrics again.
However, for some short-running jobs, the push model actually makes more sense: the job finishes so quickly that Prometheus may not get the chance to scrape its metrics. As a result, Prometheus also provides a Pushgateway to support this kind of use case.
Prometheus also has an Alertmanager that takes alerts defined by users’ PromQL expressions and sends out notifications accordingly. The Prometheus web UI is a really useful web interface for visualizing metrics. So in Spark 2.4, there are three ways to use Prometheus.
The first one is using JMX, Java Management Extensions, by enabling Spark’s built-in JmxSink. The Prometheus JMX exporter runs as a Java agent, so its JAR and its configuration file have to be added to your application. Then the JMX exporter port has to be exposed, and the "-javaagent" option has to be added to the driver and executors.
The downside is that this requires an extra JAR file, and the deployment model has to change slightly. The second way is to set up a Graphite server: use Spark’s built-in GraphiteSink and then use the Prometheus Graphite exporter to convert the Graphite metrics into Prometheus metrics. This is good if you already have a Graphite environment in production. However, if you don’t, it takes a couple of extra steps to get this approach working.
Finally, you can also use the Pushgateway approach. The idea is that you implement your own custom sink, or use a third-party library with a Prometheus dependency, to push metrics to Prometheus through the Pushgateway. As we already discussed, the push model is not really recommended unless the job is really short-running, and a typical Spark job is not short-running; it’s a long-running job. So it’s not recommended. This approach also requires bringing extra Prometheus JARs into your application.
So each of these approaches has pros and cons. They are already used in production, they are stable, and they are general ways to use Prometheus. However, they are really difficult to set up in new environments, and some custom libraries have to be added to your Spark application, which people would probably prefer not to do.
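For comparison with the new endpoints, here is a minimal sketch of the Spark 2.4 JMX approach expressed as Spark configurations. It assumes the sink is configured through the "spark.metrics.conf." prefix instead of a "metrics.properties" file, and the agent JAR path, exporter config path, and port 8090 are hypothetical placeholders; the "-javaagent:<jar>=<port>:<config>" form is the syntax the Prometheus JMX exporter agent accepts.

```python
# Sketch: Spark 2.4-style Prometheus monitoring via JmxSink + JMX exporter Java agent.
# JAR path, exporter config path, and port are hypothetical placeholders.
JMX_AGENT_JAR = "/opt/prometheus/jmx_prometheus_javaagent.jar"
JMX_EXPORTER_CONFIG = "/opt/prometheus/jmx_exporter.yaml"
JMX_EXPORTER_PORT = 8090


def jmx_exporter_confs(jar=JMX_AGENT_JAR, config=JMX_EXPORTER_CONFIG, port=JMX_EXPORTER_PORT):
    """Return Spark configs that enable JmxSink and attach the JMX exporter agent."""
    agent_option = f"-javaagent:{jar}={port}:{config}"
    return {
        # Equivalent to "*.sink.jmx.class=..." in metrics.properties.
        "spark.metrics.conf.*.sink.jmx.class": "org.apache.spark.metrics.sink.JmxSink",
        # The agent must be attached to both the driver and the executors.
        "spark.driver.extraJavaOptions": agent_option,
        "spark.executor.extraJavaOptions": agent_option,
    }


def to_submit_args(confs):
    """Flatten a config dict into spark-submit style "--conf key=value" arguments."""
    args = []
    for key, value in confs.items():
        args += ["--conf", f"{key}={value}"]
    return args
```

Each JVM then serves its metrics on the agent port, which is the extra port that has to be exposed, and the agent JAR has to be present on every node.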
So in Spark 3.0, we want to make the experience much smoother. We want to be independent of other frameworks such as JMX or Graphite, and we don’t want to introduce new dependencies into user applications. We also want to reuse existing resources, such as the port Spark already uses, to expose the metrics. And we want to take advantage of Prometheus service discovery in Kubernetes as much as possible, so that Prometheus can find those applications and pull the metrics from these jobs. So, I would like to introduce Dongjoon, who will talk about the technical details of how Spark 3 natively integrates with Prometheus. Thanks. – First, the biggest change is a dependency change. As you can see in this timeline, Apache Spark used Dropwizard Metrics version 3 for a long time.
Although it has been stable, Apache Spark 3 upgrades to version 4 in order to support Java 11. In general, we expect users to see stable and comparable output.
However, there are two notable side effects. First, Dropwizard Metrics 4 adds an additional label named "type". Adding a new label is considered a breaking change in Prometheus monitoring, so you may need to add a relabeling configuration. Second, for some metric types, Dropwizard also adds duplicate metrics, like the example at the bottom right of the slide. This affects the existing Java agent approach, and you may need to scale up your Prometheus server to meet the extra requirements.
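One way to absorb the new "type" label is a "labeldrop" rule under "metric_relabel_configs" in the Prometheus scrape job. The sketch below shows such a rule as a Python dict and mimics its effect on one series’ label set; the sample label values are hypothetical. Note that if "type" was the only difference between two series, dropping it makes them collide, so check this against the duplicate-metric case as well.

```python
import re

# Equivalent rule in prometheus.yml (inside a scrape job):
#   metric_relabel_configs:
#     - action: labeldrop
#       regex: type
LABELDROP_TYPE = {"action": "labeldrop", "regex": "type"}


def apply_labeldrop(labels, rule=LABELDROP_TYPE):
    """Mimic Prometheus labeldrop: remove labels whose *name* fully matches the regex.

    Prometheus anchors relabeling regexes, so fullmatch() is used here.
    """
    pattern = re.compile(rule["regex"])
    return {name: value for name, value in labels.items() if not pattern.fullmatch(name)}
```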
Spark 3 adds ExecutorMetricsSource, a new metric source that provides a rich set of executor memory metrics. Not only JVM memory but also the memory of the whole process tree, including the Python daemon and other processes, is collected. The left box shows JVM metrics, and the right box shows process tree metrics. These metrics are collected from every executor to the Spark driver.
To support Prometheus monitoring more natively, Spark 3 also adds a new metric sink called PrometheusServlet. You can enable it very easily by using the "metrics.properties" file in the Spark configuration directory. There is no additional system requirement.
PrometheusResource is another endpoint, for all executor memory metrics. When the number of executors varies from time to time, as with dynamic allocation, this provides the most efficient and convenient way to discover and collect all executor metrics, because the Spark driver already has all the information. Enabling the PrometheusResource endpoint is even easier than PrometheusServlet: it takes just one configuration, "spark.ui.prometheus.enabled". Both the PrometheusServlet and PrometheusResource endpoints are disabled by default in Spark 3.0 to prevent any interference with existing metric collection pipelines.
Spark 3 also adds a new metric, "spark_info", which follows the standard Prometheus way of exposing the Spark version and revision. This graph shows a Prometheus expression that monitors the number of Spark jobs per Spark version. Lastly, in Kubernetes environments, driver service annotations are also supported in Spark 3. This means Prometheus is able to discover the endpoint from the Spark driver service.
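The slide’s exact expression is not reproduced here, but one PromQL expression that counts Spark applications per version is "count by (version) (spark_info)". The sketch below mimics that aggregation in Python over label sets scraped from several drivers; the sample versions and revisions are hypothetical.

```python
from collections import Counter


def count_by_version(spark_info_label_sets):
    """Mimic the PromQL expression: count by (version) (spark_info).

    Each element is the label set of one scraped spark_info series,
    e.g. {"version": "3.0.0", "revision": "..."}.
    """
    return Counter(labels["version"] for labels in spark_info_label_sets)


# Hypothetical label sets from three drivers.
scraped = [
    {"version": "3.0.0", "revision": "rev-a"},
    {"version": "3.0.0", "revision": "rev-a"},
    {"version": "3.0.1", "revision": "rev-b"},
]
```

Because "spark_info" always has the value 1.0, counting series and summing values give the same answer.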
So far, I have given an overview of the new features. Let’s look under the hood and see how to use them.
Basically, the new PrometheusServlet endpoint follows the style of the existing metrics JSON endpoint, which has been supported since the initial Spark release. You can find the JSON endpoint at "/metrics/json".
The Prometheus endpoint is at "/metrics/prometheus", replacing "json" with "prometheus". The output follows the style of the existing "JMXSink + JMXExporter + javaagent" approach. This table summarizes the port numbers and endpoints for the driver, worker, and master.
And this is one simple output example that you can get from the Spark driver endpoint. If you have used Prometheus monitoring in Spark 2, you might be familiar with this.
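As a rough companion to that table, here is a sketch of where the endpoints live, assuming the default UI ports (4040 for the driver, 8080 for the master, 8081 for the worker), the PrometheusServlet paths from the built-in "metrics.properties" template, and the PrometheusResource path on the driver. Check it against your own deployment before relying on it.

```python
# Assumed defaults: driver UI 4040, master UI 8080, worker UI 8081, with the
# PrometheusServlet paths from the metrics.properties template and the
# PrometheusResource path on the driver.
PROMETHEUS_ENDPOINTS = {
    "driver": (4040, ["/metrics/prometheus", "/metrics/executors/prometheus/"]),
    "master": (8080, ["/metrics/master/prometheus", "/metrics/applications/prometheus"]),
    "worker": (8081, ["/metrics/prometheus"]),
}


def endpoint_urls(role, host):
    """Return the Prometheus scrape URLs for a driver, master, or worker on the given host."""
    port, paths = PROMETHEUS_ENDPOINTS[role]
    return [f"http://{host}:{port}{path}" for path in paths]


def prometheus_path_for(json_path):
    """Derive the PrometheusServlet path from a JSON metrics path by swapping the suffix."""
    if not json_path.endswith("/json"):
        raise ValueError(f"not a JSON metrics path: {json_path}")
    return json_path[: -len("/json")] + "/prometheus"
```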
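If you want to inspect this output without a Prometheus server, for example in a quick test, a small parser is enough for simple lines like these. This is a minimal sketch that handles "name{label="value", ...} number" lines and skips comments; it assumes no timestamps and no escaped quotes in label values, and the sample metric name in the usage is hypothetical.

```python
import re

# name, optional {labels}, value; timestamps and escaped quotes are not handled.
SAMPLE_RE = re.compile(r"^([a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(.*)\})?\s+(\S+)$")
LABEL_RE = re.compile(r'([a-zA-Z_][a-zA-Z0-9_]*)="([^"]*)"')


def parse_line(line):
    """Parse one exposition line into (name, labels, value), or None if it does not match."""
    match = SAMPLE_RE.match(line.strip())
    if match is None:
        return None
    name, label_text, value = match.groups()
    return name, dict(LABEL_RE.findall(label_text or "")), float(value)


def parse_text(text):
    """Parse every non-empty, non-comment line of a scrape response."""
    samples = []
    for line in text.splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        parsed = parse_line(stripped)
        if parsed is not None:
            samples.append(parsed)
    return samples
```

For example, a hypothetical line 'metrics_app_1_driver_BlockManager_memory_memUsed_MB_Number{type="gauges"} 0' parses into its name, the label dict {"type": "gauges"}, and the value 0.0.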
As with all other metrics sinks, the "metrics.properties" file is used to configure PrometheusServlet. This example configuration is already included in the built-in template, so you can enable it by simply copying the template file and uncommenting these Prometheus properties. These four lines enable PrometheusServlet in the Spark master, worker, and driver.
PrometheusResource is likewise designed to provide information similar to the existing JSON REST API. The new endpoint URL is "/metrics/executors/prometheus/", and it is exposed on the driver side.
When you enable PrometheusResource, it’s also recommended to enable process tree metrics, which are especially useful when you use Python. In this Spark shell example, both configurations are enabled, and you can get the executor metrics from the new endpoint as shown at the bottom.
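The four lines below are our understanding of the PrometheusServlet entries in Spark 3’s "conf/metrics.properties.template", uncommented. The sketch parses them and also shows how the same settings could be passed as Spark configurations with the "spark.metrics.conf." prefix, an alternative we assume may be convenient when you would rather not ship a properties file.

```python
# Our reading of the PrometheusServlet entries in conf/metrics.properties.template,
# with the leading "#" removed.
PROMETHEUS_SERVLET_PROPERTIES = """\
*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus
master.sink.prometheusServlet.path=/metrics/master/prometheus
applications.sink.prometheusServlet.path=/metrics/applications/prometheus
"""


def parse_properties(text):
    """Parse simple key=value lines, ignoring blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def as_spark_confs(props):
    """Express metrics.properties entries as Spark configs with the spark.metrics.conf. prefix."""
    return {f"spark.metrics.conf.{key}": value for key, value in props.items()}
```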
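The Spark shell example can be sketched as a command line like the one below. It assumes the Spark 3.0 configuration names "spark.ui.prometheus.enabled" and "spark.executor.processTreeMetrics.enabled", a driver on the default UI port 4040, and a hypothetical driver host name.

```python
# Both settings are disabled by default in Spark 3.0.
EXECUTOR_METRICS_CONFS = {
    "spark.ui.prometheus.enabled": "true",                # PrometheusResource endpoint
    "spark.executor.processTreeMetrics.enabled": "true",  # process tree (e.g. Python) memory
}


def spark_shell_command(confs=EXECUTOR_METRICS_CONFS):
    """Build a bin/spark-shell command line with one --conf per setting."""
    cmd = ["bin/spark-shell"]
    for key, value in confs.items():
        cmd += ["--conf", f"{key}={value}"]
    return cmd


def executor_metrics_url(driver_host, port=4040):
    """URL of the PrometheusResource endpoint on the driver (default UI port assumed)."""
    return f"http://{driver_host}:{port}/metrics/executors/prometheus/"
```

Running the joined command in a shell and then fetching the URL (for example with curl) reproduces the flow shown on the slide.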
Okay, then how can you monitor Spark jobs in a Kubernetes cluster with these endpoints?
There are three key monitoring use cases: batch job memory behavior, dynamic allocation behavior, and streaming job behavior.
This monitoring answers the following questions. Was a job killed by OOM, or is it at risk of being killed in the near future? Is there any unexpected slowness due to resource competition or quota limitations? Is the latency good enough?
Let’s start with batch job monitoring. We need only four configurations. One is "spark.ui.prometheus.enabled", as described so far. The other three are annotations that let Prometheus discover this endpoint. Prometheus accepts these three annotations: scrape, path, and port. You can give proper values for the endpoint you want to expose. This is how to use Prometheus service discovery, and if you install Prometheus with Helm, it works out of the box this way.
When you submit a Spark job, it looks like this. The yellow parts are the configurations required from the previous slide. These commands launch the well-known Spark Pi example, but this job is designed to be killed by Kubernetes because the driver memory is limited to only 2 GB.
Let’s look at the Prometheus monitoring result. In this graph, each line shows one executor, including the driver. The value is the sum of on-heap, off-heap, and direct memory. We can see that the memory consumption of the driver increased continuously and ended up in an OOMKilled situation. With Prometheus monitoring, you can get notifications before an OOMKill happens.
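As a sketch, the four batch job configurations can be generated like this. The "prometheus.io/scrape", "prometheus.io/path", and "prometheus.io/port" annotation names are the convention used by the community Prometheus Helm chart’s pod discovery job. The port and path assume PrometheusResource on the default driver UI port; adjust them if your Prometheus uses different relabeling rules.

```python
def prometheus_pod_annotation_confs(port=4040, path="/metrics/executors/prometheus/"):
    """Spark configs for the batch example: enable PrometheusResource and annotate the driver pod.

    The prometheus.io/* keys follow the common Helm chart convention for pod discovery.
    """
    prefix = "spark.kubernetes.driver.annotation."
    return {
        "spark.ui.prometheus.enabled": "true",
        prefix + "prometheus.io/scrape": "true",
        prefix + "prometheus.io/path": path,
        prefix + "prometheus.io/port": str(port),
    }
```

Passing each entry as "--conf key=value" to spark-submit produces a driver pod that Prometheus can discover.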
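The alert behind this graph can be approximated as "on-heap + off-heap + direct memory is above some fraction of the memory limit". The sketch below assumes you have already extracted those three per-executor values in bytes; the dictionary keys and the 90% threshold are hypothetical choices, not Spark metric names. It returns the executors that are at risk.

```python
MIB = 1024 ** 2
GIB = 1024 ** 3


def total_memory(sample):
    """Sum the on-heap, off-heap, and direct memory of one executor (bytes)."""
    return sample["on_heap"] + sample["off_heap"] + sample["direct"]


def executors_at_risk(samples, limit_bytes, threshold=0.9):
    """Return IDs of executors whose total memory exceeds threshold * limit_bytes.

    `samples` maps executor ID (e.g. "driver", "1") to a dict with hypothetical keys
    "on_heap", "off_heap", and "direct".
    """
    return sorted(
        executor_id
        for executor_id, sample in samples.items()
        if total_memory(sample) > threshold * limit_bytes
    )
```

With a 2 GiB limit, a driver at 1,900 MiB is flagged because 90% of the limit is about 1,843 MiB, which is the kind of early warning the OOMKilled driver above would have triggered.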
Spark 3 supports dynamic allocation in Kubernetes environments. This is a very exciting improvement. For this example, I used four additional dynamic allocation configurations: dynamic allocation enabled, executor idle timeout, shuffle tracking enabled, and max executors.
This job uses a dynamic pi Python script, which performs the pi calculation twice. More specifically, the script computes pi, sleeps for one minute so that the executors shrink down to zero, and then computes pi again.
This is the Prometheus monitoring result. We can see the processed tasks for all executors, where each line is one executor. Dynamic allocation added new executors gradually in a stepwise manner and removed all of them during the idle time. For the second pi calculation, it repeated the same behavior. We can also see that the tasks were evenly distributed across the executors.
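The four dynamic allocation configurations can be written as the sketch below. The property names are standard Spark 3 settings; the idle timeout and max executors values are hypothetical. The check reflects that, without an external shuffle service (the usual situation on Kubernetes), Spark 3.0 needs shuffle tracking before it will enable dynamic allocation.

```python
DYNAMIC_ALLOCATION_CONFS = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.executorIdleTimeout": "10s",  # hypothetical value
    "spark.dynamicAllocation.shuffleTracking.enabled": "true",
    "spark.dynamicAllocation.maxExecutors": "10",  # hypothetical value
}


def check_dynamic_allocation(confs):
    """Raise if dynamic allocation is on but neither shuffle tracking nor a shuffle service is."""
    if confs.get("spark.dynamicAllocation.enabled") != "true":
        return
    tracking = confs.get("spark.dynamicAllocation.shuffleTracking.enabled") == "true"
    service = confs.get("spark.shuffle.service.enabled") == "true"
    if not (tracking or service):
        raise ValueError("dynamic allocation needs shuffle tracking or an external shuffle service")
```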
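The dynamic pi script itself is not shown in the talk, so here is a hypothetical PySpark sketch of the behavior described: estimate pi with a Monte Carlo job, stay idle for a minute so that dynamic allocation can release the executors, then estimate pi again. The partition and sample counts are arbitrary, and PySpark is imported inside "main()" so the helper functions can be exercised without a Spark installation.

```python
import random
import time
from operator import add


def inside_unit_circle(_):
    """Return 1 if a random point in the square [-1, 1) x [-1, 1) lands in the unit circle."""
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x * x + y * y <= 1 else 0


def compute_pi(spark, partitions=100, samples_per_partition=100_000):
    """Monte Carlo estimate of pi; more partitions mean more pending tasks, so more executors."""
    n = partitions * samples_per_partition
    count = (
        spark.sparkContext.parallelize(range(n), partitions)
        .map(inside_unit_circle)
        .reduce(add)
    )
    return 4.0 * count / n


def main(idle_seconds=60):
    # Imported here so the helpers above can be used without PySpark installed.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("DynamicPi").getOrCreate()
    print("Pi is roughly", compute_pi(spark))
    # While idle, executors exceed executorIdleTimeout and dynamic allocation removes them.
    time.sleep(idle_seconds)
    print("Pi is roughly", compute_pi(spark))
    spark.stop()
```

To run it as a job, call "main()" at the end of the file you submit together with the dynamic allocation settings.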
The Spark driver now has two metric endpoints for Prometheus: PrometheusServlet and PrometheusResource. To monitor both endpoints together, you can use driver pod annotations and driver service annotations at the same time, like this. The yellow parts are the new driver service annotation feature in Spark 3.
From the driver metrics, you can get the executor allocation ratio, which is the number of all executors over the number of target executors. These graphs show two jobs of the previous dynamic pi Python script. In the first job, most of the target executors are acquired. In the second job, less than 20% of the target executors are acquired. The cluster might be very crowded at that time, or the user might have set a wrong value for max executors without considering the given resource limits, like I did here: I used 300 for max executors, which was too big.
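A sketch of combining both annotation sets: pod annotations pointing at PrometheusResource, and the new driver service annotations pointing at the PrometheusServlet driver endpoint. The "spark.kubernetes.driver.service.annotation." prefix is the Spark 3 setting for service annotations; the PrometheusServlet sink is enabled here through "spark.metrics.conf." entries rather than a properties file, and the port and paths are assumptions based on the defaults.

```python
def prometheus_annotations(prefix, port, path):
    """Build prometheus.io/* annotation configs under a given Spark config prefix."""
    return {
        prefix + "prometheus.io/scrape": "true",
        prefix + "prometheus.io/path": path,
        prefix + "prometheus.io/port": str(port),
    }


def driver_monitoring_confs(port=4040):
    """Enable both driver endpoints and annotate the driver pod and the driver service."""
    confs = {
        # PrometheusResource: all executors' metrics, served by the driver UI.
        "spark.ui.prometheus.enabled": "true",
        # PrometheusServlet: driver metrics, configured here instead of metrics.properties.
        "spark.metrics.conf.*.sink.prometheusServlet.class":
            "org.apache.spark.metrics.sink.PrometheusServlet",
        "spark.metrics.conf.*.sink.prometheusServlet.path": "/metrics/prometheus",
    }
    # Pod annotations -> PrometheusResource endpoint.
    confs.update(prometheus_annotations(
        "spark.kubernetes.driver.annotation.", port, "/metrics/executors/prometheus/"))
    # Service annotations (new in Spark 3) -> PrometheusServlet endpoint.
    confs.update(prometheus_annotations(
        "spark.kubernetes.driver.service.annotation.", port, "/metrics/prometheus"))
    return confs
```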
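The ratio itself is simple arithmetic. The sketch assumes you read the two values from the driver’s ExecutorAllocationManager gauges (we believe these are named "numberAllExecutors" and "numberTargetExecutors") and uses a hypothetical 20% alert threshold matching the second job above; treating zero targets as fully allocated is our own design choice.

```python
def allocation_ratio(all_executors, target_executors):
    """Executor allocation ratio = all executors / target executors."""
    if target_executors <= 0:
        return 1.0  # nothing requested; treated as fully allocated (our choice)
    return all_executors / target_executors


def under_allocated(all_executors, target_executors, threshold=0.2):
    """True if fewer than `threshold` of the target executors were acquired."""
    return allocation_ratio(all_executors, target_executors) < threshold
```

For example, 50 executors against a target of 300 gives a ratio of about 0.17, which would trigger the warning.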
Streaming job monitoring needs an additional configuration: set "spark.sql.streaming.metricsEnabled" to true. Then you get six useful metrics: latency, input rate, processing rate, state rows, state bytes, and event time watermark. These metric names have a common prefix that includes the metric "namespace" and "queryname" parts, so to track the metrics consistently, it is highly recommended to set the namespace and query name consistently from the beginning.
All streaming metrics are important for alerting, but here are two typical monitoring conditions. First, if the latency is frequently greater than the micro-batch interval, it’s a warning sign. Spark can endure some such situations, but the job needs to be redesigned to prevent a future outage. Second, if the total state grows indefinitely, that’s another warning sign. In Spark 2, a simple Alias addition sometimes deleted the watermark metadata completely. As a result, the state was not cleaned up properly and the job eventually died. This is fixed in Spark 3.0.
To use Prometheus monitoring in a Kubernetes cluster, we recommend using Prometheus federation and following the practice of separation of concerns. Every user namespace has a Prometheus server managed by its user. For example, "namespace1" collects only batch job monitoring metrics, while "namespace2" collects streaming job monitoring metrics. The cluster-wide Prometheus server monitors only a subset of metrics, like "spark_info".
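The streaming configuration and the resulting metric names can be sketched as follows. The namespace and query name are hypothetical, and the naming pattern "<namespace>.driver.spark.streaming.<queryName>.<gauge>" and the six gauge names are our understanding of Spark 3’s streaming metrics source, so verify them against your own output. If no query name is set, Spark falls back to an auto-generated identifier, which is hard to track in dashboards; similarly, "spark.metrics.namespace" defaults to the application ID, which changes with every run.

```python
STREAMING_METRICS_CONFS = {
    "spark.sql.streaming.metricsEnabled": "true",
    # Hypothetical namespace; keep it stable so metric names do not change per application ID.
    "spark.metrics.namespace": "orders_app",
}

# Gauge names we understand Spark 3 registers per streaming query.
STREAMING_GAUGES = [
    "latency",
    "inputRate-total",
    "processingRate-total",
    "states-rowsTotal",
    "states-usedBytes",
    "eventTime-watermark",
]


def streaming_metric_names(namespace, query_name):
    """Dropwizard-style names: <namespace>.driver.spark.streaming.<queryName>.<gauge>.

    The query name comes from writeStream.queryName(...) on the streaming query.
    """
    return [f"{namespace}.driver.spark.streaming.{query_name}.{gauge}" for gauge in STREAMING_GAUGES]
```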
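The two conditions can be expressed as simple checks over a window of recent samples. This is a hedged sketch: the "more than half of the batches" rule and the strictly-increasing test for state rows are hypothetical heuristics, not rules from Spark or Prometheus, and in practice you would express them as PromQL alerting rules.

```python
def latency_warning(latencies_ms, trigger_interval_ms, max_fraction=0.5):
    """Warn when batch latency exceeds the micro-batch interval in more than max_fraction of batches."""
    if not latencies_ms:
        return False
    slow = sum(1 for latency in latencies_ms if latency > trigger_interval_ms)
    return slow / len(latencies_ms) > max_fraction


def state_growth_warning(state_rows, min_points=3):
    """Warn when the total state row count rises at every observation in the window."""
    if len(state_rows) < min_points:
        return False
    return all(later > earlier for earlier, later in zip(state_rows, state_rows[1:]))
```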
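A cluster-wide server typically federates from the per-namespace servers through their "/federate" endpoint with a "match[]" selector. The sketch below builds such a scrape job as a Python dict mirroring the "prometheus.yml" structure; the job name, the per-namespace service host names, and port 9090 are hypothetical.

```python
def federation_job(namespaces, match=('{__name__="spark_info"}',), port=9090):
    """Scrape job for a cluster-wide Prometheus that federates from per-namespace servers.

    Mirrors the prometheus.yml structure; the job name and the
    "prometheus.<namespace>.svc" host names are hypothetical.
    """
    return {
        "job_name": "federate-spark",
        "honor_labels": True,
        "metrics_path": "/federate",
        "params": {"match[]": list(match)},
        "static_configs": [
            {"targets": [f"prometheus.{ns}.svc:{port}" for ns in namespaces]}
        ],
    }
```

Serializing this dict under "scrape_configs" with a YAML library gives the corresponding "prometheus.yml" entry; "honor_labels" keeps the labels set by the per-namespace servers.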
So, limitations and tips. Please note that the new endpoints in Spark 3 are still experimental. The new endpoints expose only Spark metrics starting with "metrics_" or "spark_info". The previous Java agent method can expose more metrics, like "jvm_info", which includes the Java version. PrometheusServlet does not follow Prometheus naming conventions; instead, it is designed to follow the Spark 2 naming conventions for consistency across Spark versions.
Also, as I mentioned before, the number of metrics keeps growing if the configuration is not set properly.
In summary, Spark 3 provides better integration with Prometheus monitoring. Especially in Kubernetes environments, metric collection becomes much easier than in Spark 2. The new Prometheus-style endpoints are independent, additional options, so there is no additional requirement, and users can migrate to these new endpoints or use them together with existing methods.
I'm a software engineer, and my main focus area is fast and efficient data processing. At Apple, as an Apache Spark and ORC PMC member, I develop and maintain the internal distributions powered by Apache Spark and Apache ORC.
DB Tsai is an Apache Spark PMC / Committer and an open source and big data engineer at Apple. He implemented several algorithms including linear models with Elastic-Net (L1/L2) regularization using LBFGS/OWL-QN optimizers in Apache Spark. Prior to joining Apple, DB worked on Personalized Recommendation ML Algorithms at Netflix. DB was a Ph.D. candidate in Applied Physics at Stanford University. He holds a Master's degree in Electrical Engineering from Stanford.