Monitor Apache Spark 3 on Kubernetes using Metrics and Plugins

May 26, 2021 12:05 PM (PT)


This talk will cover some practical aspects of Apache Spark monitoring, focusing on measuring Apache Spark running on cloud environments, and aiming to empower Apache Spark users with data-driven performance troubleshooting. Apache Spark metrics allow extracting important information on Apache Spark’s internal execution. In addition, Apache Spark 3 has introduced an improved plugin interface extending the metrics collection to third-party APIs. This is particularly useful when running Apache Spark on cloud environments as it allows measuring OS and container metrics like CPU usage, I/O, memory usage, network throughput, and also measuring metrics related to cloud filesystems access. Participants will learn how to make use of this type of instrumentation to build and run an Apache Spark performance dashboard, which complements the existing Spark WebUI for advanced monitoring and performance troubleshooting.

In this session watch:
Luca Canali, Data Engineer, CERN


Transcript

Luca Canali: Hello and welcome. My name is Luca Canali and my presentation today is about monitoring Apache Spark 3.x on Kubernetes, using metrics and plugins. I work at CERN as a Data Engineer with the data analytics and database services. I’m passionate about methods and tools for measuring and improving the performance of data platforms. Today we will cover the Apache Spark monitoring ecosystem, the Spark Metrics System, Spark 3 plugins, metrics for Spark on Kubernetes and cloud storage, and you’ll see how you can run a Spark performance dashboard. As Spark users, we care about performance at scale. The key to good performance is to run good execution plans and to remove serialization points and bottlenecks in general. To do that, we need instrumentation, data and tools that allow us to investigate and find bottlenecks in the workload.
Apache Spark Monitoring Ecosystem. Let’s have a quick recap. The Web UI is the first entry point into Spark instrumentation, with details on jobs, stages, tasks, SQL, streaming, et cetera. The REST API and the programmatic Spark Listener API expose task metrics and executor metrics, which are also very useful for troubleshooting.
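As a quick illustration of the Listener API mentioned above, here is a minimal sketch in Scala of a custom SparkListener that prints the executor CPU time of each finished task. The class name is illustrative, not part of the talk's material.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Minimal sketch of the programmatic Spark Listener API: log executor CPU time
// per completed task. The class name is illustrative.
class CpuTimeListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    if (metrics != null) {
      // executorCpuTime is reported in nanoseconds
      println(s"stage ${taskEnd.stageId}: task CPU time = ${metrics.executorCpuTime / 1e6} ms")
    }
  }
}

// Example usage from spark-shell:
//   spark.sparkContext.addSparkListener(new CpuTimeListener)
// or register it with --conf spark.extraListeners=<fully qualified class name>.
```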
Then there is the Spark Metrics System, which is instrumentation we are going to use in the rest of this talk. Spark metrics are emitted from the driver, the executors and other components into configured sinks. There are several sinks available, as we will see. Metrics are many and varied. They cover many details related to Spark execution. For example, number of active tasks, jobs and stages completed and failed, executor CPU, run time, garbage collection, shuffle metrics, I/O metrics, and memory metrics.
If you want to explore the Spark Metrics System, you can use the WebUI Servlet Sink. Metrics are made available in JSON format, and are particularly useful when running in local mode. This also comes with experimental support for Prometheus. The JmxSink allows for more flexibility. For example, you can explore metrics using JConsole and other tooling for JMX.
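For reference, a configuration sketch for the JmxSink (the same key, without the spark.metrics.conf prefix, can go in metrics.properties instead); the application name is illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: enable the JmxSink so metrics can be browsed with JConsole or other
// JMX tooling. The metrics servlet is available by default on the driver UI at
// /metrics/json; spark.ui.prometheus.enabled adds the experimental
// Prometheus-format endpoint.
val spark = SparkSession.builder()
  .appName("jmx-metrics-demo")  // illustrative
  .config("spark.metrics.conf.*.sink.jmx.class", "org.apache.spark.metrics.sink.JmxSink")
  .config("spark.ui.prometheus.enabled", "true")
  .getOrCreate()
```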
In the rest of this talk, we will discuss the Spark Metrics System in the context of a Spark monitoring pipeline. This slide provides the architectural overview. The Spark driver and executors collect their workload metrics and sink them to a Graphite endpoint in an InfluxDB instance. Metrics are then visualized using custom Grafana dashboards that read from InfluxDB.
This is how you can configure the metrics system to sink into a Graphite endpoint. Simply edit the metrics.properties configuration file. As an alternative, you can use the spark.metrics.conf.* parameters, as highlighted in this slide.
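A sketch of the spark.metrics.conf.* variant for the Graphite sink, with the host, port, and prefix as placeholders for your own InfluxDB Graphite endpoint (the same keys, without the spark.metrics.conf prefix, can go in metrics.properties):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: sink all Spark metrics to a Graphite endpoint exposed by InfluxDB.
// Host, port, and prefix values are placeholders.
val spark = SparkSession.builder()
  .appName("graphite-sink-demo")  // illustrative
  .config("spark.metrics.conf.*.sink.graphite.class", "org.apache.spark.metrics.sink.GraphiteSink")
  .config("spark.metrics.conf.*.sink.graphite.host", "influxdb-host")  // placeholder
  .config("spark.metrics.conf.*.sink.graphite.port", "2003")           // default Graphite port
  .config("spark.metrics.conf.*.sink.graphite.period", "10")
  .config("spark.metrics.conf.*.sink.graphite.unit", "seconds")
  .config("spark.metrics.conf.*.sink.graphite.prefix", "sparkdash")    // placeholder namespace
  .getOrCreate()
```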
One of the key advantages of building dashboards is the possibility to visualize metrics as a function of time. This slide shows two particularly important metrics. Number of Active Tasks is useful as it shows how well Spark is able to parallelize the workload, and highlights periods when this does not happen, which could be because of long tails, stragglers, partition skew, insufficient partitioning, and several other reasons. CPU Usage by Executors is also very important, to understand how the workload is utilizing the underlying platform.
Spark is widely used in cloud environments, and there is a need for improved instrumentation in such cases. Rather than adding this instrumentation into Spark core, Spark 3 offers new opportunities with the Spark Plugins interface. In this talk, we discuss plugins with metrics for Spark on Kubernetes, where you can measure pod resource usage, and also plugin instrumentation for cloud filesystems.
This slide provides an overview of Spark Plugins in the context of metrics extensions. Plugins allow you to run user-provided code at the start of the executors. The plugin code can interact with external packages, and register metrics that will flow towards the sink together with the rest of the Spark metrics. This effectively expands the Spark Metrics System.
This is how the Spark Plugin API works. You can create your custom class extending SparkPlugin, and use the metricRegistry from the PluginContext to register the metrics of interest. In this example, a very basic one, the plugin metric simply reports a constant value (a minimal sketch is shown after this paragraph). If you want to experiment with Spark Plugins, you may want to head to the repository shown in this slide and run these two basic plugin examples. To do that, you just need to add a package and a configuration parameter. Here is an example of a more interesting plugin that you might find useful when running Spark on Kubernetes. The plugin reads workload data from cgroup instrumentation, and emits metrics with the pod resource usage. This slide shows how you can use it.
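A minimal sketch of such a basic plugin, assuming Spark 3.x and the Dropwizard metrics library that Spark bundles; the class and metric names are illustrative:

```scala
import java.util.{Map => JMap}
import com.codahale.metrics.Gauge
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

// Minimal sketch: a Spark 3 plugin whose only metric reports a constant value,
// just to show how plugin metrics flow to the configured sinks.
class DemoMetricsPlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = null  // no driver-side instrumentation here

  override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin {
    override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
      ctx.metricRegistry.register("constantGauge", new Gauge[Int] {
        override def getValue: Int = 42  // a constant value, for illustration only
      })
    }
  }
}

// To activate it: --conf spark.plugins=DemoMetricsPlugin (fully qualified class name).
```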
The metrics reported by this plugin are CPU, memory, and network I/O. Measuring CPU usage this way is useful, as it gives information on CPU used by the JVM, and possibly by other components running in the pod, for example Python UDFs when using PySpark. Network throughput metrics are also quite useful as they give information on remote I/O and shuffle workload. Another plugin that you might find useful allows measuring access to cloud filesystems such as S3, or any other Hadoop-compatible filesystem. The metrics reported are bytesRead, bytesWritten, readOps and writeOps. This is how you can configure the plugin.
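A configuration sketch for enabling these two plugins; the package coordinates and plugin class names below follow the author's SparkPlugins repository as I recall them, so treat them as assumptions and check the repository for the exact artifact version and parameters:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: enable the cgroup (Kubernetes pod) and cloud filesystem plugins.
// Coordinates and class names are assumed from the cerndb/SparkPlugins repo;
// verify the current version there. The cloud filesystem plugin also takes a
// parameter naming the filesystem to instrument (for example "s3a"); see the
// repository README for the exact configuration key.
val spark = SparkSession.builder()
  .appName("k8s-plugin-metrics-demo")  // illustrative
  .config("spark.jars.packages", "ch.cern.sparkmeasure:spark-plugins_2.12:0.1")  // assumed version
  .config("spark.plugins", "ch.cern.CgroupMetrics,ch.cern.CloudFSMetrics")
  .getOrCreate()
```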
It has to be said that there is a bit of complexity in setting up the infrastructure for a Spark dashboard. The repository mentioned in this slide provides code and examples that you can use to get started. You can run the instrumentation in a Docker container, which takes care of configuring InfluxDB and comes with pre-built Grafana dashboards. At the same repository you can also find details on how to install the dashboard using a Helm chart.
This is a recap of the provided tooling for Spark plugin instrumentation on Kubernetes and cloud filesystems. There is a package that you can use, and the configuration that you need to set if you want to use these plugins. In this slide, you can see how to put all this together. For example, you can run the Docker container with the infrastructure, and then you can run your spark-shell, spark-submit, or PySpark session with the additional instrumentation, which will show up in the dashboard.
The dashboard allows you to visualize Spark metrics stored in InfluxDB. It can be used for real-time monitoring, or to explore historical data. The metrics are many; only a few are shown in the provided dashboards. You can see gauges that summarize metrics across time intervals, and other graphs that represent metrics as a function of time. The final goal of this exercise is to feed monitoring data into performance troubleshooting and root cause analysis. An additional piece of configuration is about the dashboard annotations. Annotations allow you to link data about the start and end time for job IDs, stage IDs and SQL IDs to the graphs in the dashboards. This is how you can configure it. And now, a short demo of the Spark Performance Dashboard.
This demo shows you how to run a performance dashboard for Apache Spark. We will use the tooling and instructions available in the repository shown in the video. First, a quick recap of the architecture. The Spark driver and executors collect workload metrics and send them to a Graphite endpoint in an InfluxDB instance. Metrics are visualized using Grafana dashboards that read from InfluxDB. The dashboard infrastructure components, namely InfluxDB and Grafana, run in a Docker container that you can start as highlighted here. I’ve added additional configuration to the Spark jobs used for the [inaudible] to configure when and how to sink the metrics. You can see the configuration highlighted here, and I have also added additional configuration for annotations, for query, job, and stage start and end times.
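If I recall the setup correctly, the annotation events are written to InfluxDB by a sparkMeasure listener; a hedged configuration sketch, with the artifact version and the InfluxDB URL as assumptions and placeholders, would look like this:

```scala
import org.apache.spark.sql.SparkSession

// Sketch (assumption: annotations are produced by the sparkMeasure InfluxDBSink
// listener, as I recall from the spark-dashboard documentation). Query, job,
// and stage start/end events land in InfluxDB, and Grafana shows them as
// dashboard annotations. The artifact version and InfluxDB URL are placeholders.
val spark = SparkSession.builder()
  .appName("dashboard-annotations-demo")  // illustrative
  .config("spark.jars.packages", "ch.cern.sparkmeasure:spark-measure_2.12:0.17")  // assumed version
  .config("spark.extraListeners", "ch.cern.sparkmeasure.InfluxDBSink")
  .config("spark.sparkmeasure.influxdbURL", "http://influxdb-host:8086")  // placeholder
  .getOrCreate()
```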
I have used plugins from the repository shown here to extend the Spark instrumentation with metrics for Kubernetes and cloud filesystems. Spark plugins require additional configuration, as shown in the repository here. Here you can see one of the provided dashboards. In the background, a Spark [inaudible] with 12 executors on a Kubernetes cluster is running the TPCDS benchmark at scale 1500. Use the drop-down to select the user and application ID you want to monitor. During this demo, you will see a real-time data refresh every 10 seconds. The controls on the top-right of the dashboard allow you to change the time interval and the refresh rate.
The first set of gauges that we want to look at shows cumulative values for metrics of interest, such as task run time, CPU usage, garbage collection, metrics for tasks and jobs, metrics for memory usage, and metrics for I/O. This gauge here shows the number of active tasks at the current moment in time. Having metrics values as a function of time is useful to understand how the workload evolves. The number of active tasks is particularly important as it shows how well Spark is able to parallelize the workload, and highlights periods of time when this doesn’t happen, which could be due to long execution tails, stragglers, [inaudible] partitioning, partition skew, et cetera. As you can see, as I hover on top of the graphs, the details of the metric values per executor appear. CPU usage is also very important to understand how the workload evolves. Garbage collection is shown here. This graph summarizes how the executor task run time breaks down in terms of CPU usage, garbage collection, serialization, and also uninstrumented time, which is [inaudible] and other components.
We have here metrics about memory usage. You can see JVM on-heap memory, then Spark unified memory, divided into execution and storage memory. There is the driver memory here. Then you have metrics for Spark I/O, HDFS I/O, and shuffle I/O. An additional set of metrics that is very useful when running Spark on Kubernetes [inaudible] on the plugin instrumentation with cgroups. This graph shows the CPU used, measured directly from the pods. Here you see the CPU used by Spark, and in case we were using PySpark, that would also be displayed here. Then you have the metrics for network bytes coming into the pods. This includes the I/O reads we are doing, and also part of the shuffle, as you can see here. These are the network bytes that go out, which also contain part of the shuffle and the data transfer from the executors towards the driver. And we have metrics for memory. An additional set of metrics is about cloud storage. If you are using, for example, S3 or any other compatible filesystem, you can measure here the throughput in terms of bytes read and written, and read and write operations.
I’d like to show you an additional piece of instrumentation that has to do with annotations. Here we can see when the queries have started. For example, in the last interval of 15 minutes, the last query that started and that we are measuring is query 99. So we can go to the Web UI and see the details of query 99. Here is the [inaudible]. This makes the link between the dashboard instrumentation and the standard Web UI instrumentation. This ends our short tour of the Spark dashboard. I hope you find this useful, and I wish you good luck with your Spark performance troubleshooting.
I hope you enjoyed the demo. As a more advanced topic, I’d like to mention that Spark plugins and the metrics system allow you to instrument custom code and libraries. As an example, this slide details how we instrumented the read time for S3, HDFS, and other filesystems, for a benchmark that we ran across these five filesystems, available here. To do that, we used the infrastructure of Spark plugins and the metrics system, and some custom code. You can find the link in this slide.
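As a rough illustration of that pattern (not the author's actual benchmark code), custom code can time an operation and publish the accumulated elapsed time through the plugin's metric registry, for example with a Dropwizard counter; all names here are illustrative:

```scala
import java.util.{Map => JMap}
import com.codahale.metrics.Counter
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

// Rough illustration: accumulate the time spent in an instrumented read path
// and expose it as an executor metric that flows to the configured sinks.
object ReadTimeInstrumentation {
  // Registered by the plugin below, updated by user or library code.
  @volatile var readTimeMillis: Counter = _

  def timedRead[T](block: => T): T = {
    val start = System.nanoTime()
    try block
    finally {
      if (readTimeMillis != null) readTimeMillis.inc((System.nanoTime() - start) / 1000000L)
    }
  }
}

class ReadTimePlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = null

  override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin {
    override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
      // The counter is published alongside the rest of the Spark metrics.
      ReadTimeInstrumentation.readTimeMillis = ctx.metricRegistry.counter("readTimeMillis")
    }
  }
}
```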
Just before the conclusions, I would like to share some of the lessons learned developing and running this type of infrastructure for a couple of years. One of the things we do is provide the Spark dashboard as an optional configuration for the CERN Jupyter-based data analysis platform. Users are able to select this type of instrumentation if they want to. This works nicely, but it has to be said that there is a cognitive load in understanding the available metrics and the troubleshooting process. In practice, users do this type of troubleshooting together with experts from the Spark service. An additional technical point is data retention.
In general, we pay attention not to overload InfluxDB with too much data. Another point I wanted to bring up is about Spark development in general. For this infrastructure, for example, more things can be done; it’s still not a complete job. A native InfluxDB sink would be quite useful: at the moment we are using a Graphite sink with InfluxDB 1.x. You can make it work, but it would be nice to have a native InfluxDB sink, or maybe also a Prometheus sink. Also, Spark is not fully instrumented; a few areas are yet to be covered, for example instrumentation of I/O time and of the Python UDF run time.
This brings me to the conclusions. Spark metrics and the dashboard provide extensive performance monitoring data. They are a nice complement to the Web UI when doing performance troubleshooting. Spark plugins, introduced in Spark 3.0, make it easy to augment the Spark Metrics System. For example, we have seen in this demo plugins to monitor Spark on Kubernetes and cloud filesystem usage.
You can get started building and using a Spark performance dashboard based on InfluxDB and Grafana with the tooling shown in this presentation and the demo. I hope this also motivates you to build your own plugins and dashboards, and to share your experience. I would like to thank all the people who made this work possible. I have also put some links in this slide, and I would like to thank you all for your attention.

Luca Canali

Luca is a data engineer at CERN with the Hadoop, Spark, streaming, and database services. Luca has 20+ years of experience with designing, deploying, and supporting enterprise-level database and data ...