Let’s Dumb-Proof Data Pipelines

May 27, 2021 05:00 PM (PT)

Developing and deploying data pipelines in production is easy. Maintaining data pipelines is hard, because most often it’s not the same engineer or team that is responsible for operating and maintaining data pipelines in production. If your data pipelines are not parameterized and configurable, you need to recompile your source code and go through your release process even for simple configuration changes. But making your data pipelines configurable is not enough: bad user input can result in many classes of issues, such as data loss, data corruption, and data correctness problems.

In this talk, you’ll walk away with techniques to make your data pipelines dumb-proof.
1. Why do you need to make your data pipelines configurable?
2. How to seamlessly promote your data pipelines from one environment to another without making any source code changes?
3. How to reconfigure your data pipelines in production without recompiling the ETL source code?
4. What are the pros and cons of using Databricks Notebook widgets for configuring your data pipelines?
5. How to externalize configurations from your ETL source code, and how to read and parse configuration files?
6. Finally, you’ll learn how to take it to the next level by leveraging Scala language features and the PureConfig and Typesafe Config libraries to achieve boilerplate-free configuration code and configuration validations

In this session watch:
Ganesh Chand, Data Engineer, Databricks


Transcript

Ganesh Chand: Hello everyone. Thank you for attending my talk. My name is Ganesh Chand. I’m a Staff Solutions Consultant at Databricks, where I get to solve the world’s toughest engineering problems for our customers.
The title of the talk is Let’s Dumb-Proof Data Pipelines. What I mean by this is that in the real world, deployment issues and production outages do happen. And often, these are caused by user errors. When these mistakes do occur, it may take minutes, hours, or days to recover. And depending on the severity of the issues, it could impact your team and the company in many ways. For instance, missed SLAs can result in penalties. You could impact downstream pipelines and systems. Your business could be impacted by not being able to make key decisions because of data unavailability. And prolonged outages can cause data decay, making the data no longer valuable.
Let’s see the common causes of ETL accidents caused by human error. First, configuration errors, such as incorrect input paths, output paths, and filter conditions. Resource misuse, such as accidentally deleting jobs, clusters, or storage paths. Operator errors, such as accidentally re-running data pipelines, resulting in data duplication and corruption. And a bug in your data pipeline causing data leakage and corruption, right? These situations typically occur when you are deploying a pipeline from dev to staging to prod. They also occur when you are deploying hotfixes to respond to production outages, and during regular maintenance activities such as fixing defects, housekeeping, upgrading runtime environments, upgrading your dependency libraries, and scaling data pipelines in response to business requirements.
So this talk is not about how to recover from these mistakes. This talk is about how we make such mistakes hard to make, right? So there are obviously many things we can do, for instance, having a proper CI/CD and release management process around your code, implementing and enforcing security policies around your resources, and setting table [inaudible], table properties, and data governance. But these problems are already solved by many tools and products, which are most likely available in the platform that you’re using. What is not solved is the interaction between data pipelines and users, and how we process the user inputs and catch errors. So that is going to be the focus of this presentation.
We will go through a series of steps and see how we can improve our data pipeline, starting from a hard-coded data pipeline. We’ll parameterize it and we will externalize the parameters. And finally, we will see if it’s possible to reduce all the boilerplate code related to configuration loading and validation.
Let’s look at a typical hard-coded data pipeline. So what is wrong with this pipeline? Right? If you see the reader option here, where we’re saying rows per second is one, this value is hard-coded. And similarly, the Spark config tuning parameters are hard-coded, and the storage paths where we are maintaining the checkpoint and storing the data are hard-coded as well. What this means is that when you want to deploy this pipeline from a development to staging to production environment, you will essentially have to make source code changes. So all the testing that you did in dev and staging before deploying to prod becomes pretty useless, because you are changing source code before deploying to production. So, how do we make sure that environment changes do not have any impact on our code, and how do we catch user errors?
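As a concrete illustration, here is a minimal sketch of that kind of hard-coded streaming pipeline. The rate source, the tuning value, and the bucket paths are hypothetical stand-ins for the slide code, not the speaker’s exact example.

```scala
// Everything environment-specific is baked into the source code.
spark.conf.set("spark.sql.shuffle.partitions", "8")          // hard-coded tuning parameter

val stream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1L)                                // hard-coded reader option
  .load()

stream.writeStream
  .format("delta")
  .option("checkpointLocation",
          "s3://my-dev-bucket/checkpoints/events")            // hard-coded checkpoint path
  .start("s3://my-dev-bucket/tables/events")                  // hard-coded output path
```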
We are going to fix this problem by parameterizing the values that change across environments. We’re going to implement user input validation and we’ll refactor the code.
So if you’re a Databricks customer, you’re familiar with this interface. This is a Databricks Notebook. And what we are using here is the Databricks dbutils widgets API to define the interface and capture user inputs. And then we process the user input and we implement validation here. So, if there are some incorrect user input values, this pipeline will immediately fail here, before even your Spark [inaudible] starts.
So what we have done here is we have parameterized our data pipeline. You can see the parameterization and handling of the user inputs and enforcing requirements on the top right. And our ETL code is now fully parameterized and it’s agnostic to environments. So you can safely deploy this code from development to staging to production, without making any source code changes.
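Here is a minimal sketch of what that widget-based parameterization can look like in a Databricks notebook; the widget names, defaults, and the ETL body are illustrative, not the speaker’s exact code.

```scala
// Define the input interface with dbutils widgets (available in the Databricks notebook runtime).
dbutils.widgets.text("rowsPerSecond", "1", "Rows per second")
dbutils.widgets.text("outputPath", "", "Output path")
dbutils.widgets.text("checkpointPath", "", "Checkpoint path")

// Widgets always return strings, so cast and validate up front --
// the notebook fails here, before any Spark job starts.
val rowsPerSecond  = dbutils.widgets.get("rowsPerSecond").toInt
val outputPath     = dbutils.widgets.get("outputPath")
val checkpointPath = dbutils.widgets.get("checkpointPath")

require(rowsPerSecond > 0, "rowsPerSecond must be a positive integer")
require(outputPath.nonEmpty, "outputPath must not be empty")
require(checkpointPath.nonEmpty, "checkpointPath must not be empty")

// The ETL below only refers to validated parameters -- no environment-specific literals.
spark.readStream
  .format("rate")
  .option("rowsPerSecond", rowsPerSecond.toLong)
  .load()
  .writeStream
  .format("delta")
  .option("checkpointLocation", checkpointPath)
  .start(outputPath)
```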
So we have solved the environment-change issues. We’re able to validate user inputs and fail our pipeline early if there are any mistakes, but can we do better? If you have many input parameters, dbutils widget-based interactions will probably be difficult to manage. And because the dbutils widgets API treats every input parameter as a string, you have to do the type casting yourself, and that can sometimes lead to errors. And often we might need complex data types as input parameters. For example, you might want to pass a collection of strings. And we also want to be able to track parameter changes. So if the pipeline fails, we’d like to know whether the input parameters to this data pipeline changed, who changed them, and when they were changed. So how are we going to solve this problem?
So now we will look at how we can externalize the data pipeline configuration to solve those problems. But it’s a multi-step process, right? You need to first choose the configuration format, and then choose the configuration library. And you’ll have to refactor the code. So first let’s look at some of the configuration formats that are available and pick one for our example.
So there are many configuration formats available. And the one that I see often is, of course, JSON. But JSON isn’t really a good configuration language because it’s very verbose. There’s no support for comments, multiline strings, or substitutions. So what I’m going to talk about is the HOCON, Human-Optimized Config Object Notation, configuration format. It’s a purpose-built configuration format because it’s very concise. It supports comments, multiline strings, substitutions, and includes. And it’s actually a JSON superset, so any valid JSON configuration is also valid HOCON. And there are libraries available in Python, Java, and Scala. If you want to learn more about this specification, you can follow this link.
So now I’m going to start from a JSON configuration, and then incrementally improve it using the HOCON format, and then finally present a side-by-side comparison to show why the HOCON format is really great. So in this example, what I have is… My data pipeline requires configuration parameters for dev and prod.
So as you can see here, the JSON format requires me to use double quotes and curly braces, and there are lots of repeated values: for example, the tuning field is part of both dev and prod, but the values are identical, so I’m repeating this configuration for both environments. So here’s the first iteration. Basically I’ve removed the unwanted curly braces, colons, and double quotes. But can we do better?
So in iteration two, I extracted the common values to a common config object. Notice that the checkpoint path is now relative to the output path, the output path is constructed using the root path in the common config object, and tuning is referenced in both dev and prod. What this means is that if, for any reason, you want to migrate your storage path from one bucket to another, you would simply make the change in this one line here. So this really enforces the DRY principle, right? So we’re not repeating ourselves.
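To make the iterations concrete, here is a sketch of what that second iteration might look like in HOCON; the keys, paths, and tuning values are hypothetical, not the exact slide content.

```hocon
# Common values live in one place; dev and prod reference them via substitutions.
common {
  root-path = "s3://my-bucket/pipelines"
  tuning {
    "spark.sql.shuffle.partitions" = 8
  }
}

dev {
  rows-per-second = 1
  output-path     = ${common.root-path}/dev/events
  checkpoint-path = ${dev.output-path}/_checkpoints
  tuning          = ${common.tuning}
}

prod {
  rows-per-second = 100
  output-path     = ${common.root-path}/prod/events
  checkpoint-path = ${prod.output-path}/_checkpoints
  tuning          = ${common.tuning}
}
```

Changing `common.root-path` is now a one-line edit that flows through every environment.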
So here is the side-by-side comparison. On the left, you have the JSON configuration, on the right you have the HOCON configuration, and it really shows why HOCON is a readable and maintainable configuration format.
So we picked our configuration format. Now we need to pick a configuration library, and here I’m going to talk about Typesafe Config. It’s a configuration library for JVM languages, written and maintained by Lightbend. It’s implemented in plain Java with zero dependencies. It supports files in three formats: you can read java.properties files, JSON files, and of course HOCON files. And if you are a Databricks customer, this library is already part of the Databricks Runtime. And this code snippet here shows how easy it is to load the configuration file, parse it, and use it in your data pipeline.
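As a sketch, loading and reading a HOCON file with the Typesafe Config library can be as short as this; the file path and keys are hypothetical and follow the sample config shown earlier.

```scala
import com.typesafe.config.ConfigFactory
import java.io.File

// Parse the HOCON file and resolve substitutions such as ${common.root-path}.
val config = ConfigFactory.parseFile(new File("/dbfs/configs/pipeline.conf")).resolve()

// Read typed values by path.
val rowsPerSecond = config.getInt("dev.rows-per-second")
val outputPath    = config.getString("dev.output-path")
```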
So now we have picked our configuration format and configuration library. We’ll now go ahead and refactor our code. First, we need to define an input to read the configuration file path, and then the environment in which we want to run. Right? So notice how the number of input parameters went from six to just two. Now, in line number 9 and 10, we are just reading the path that the user provided to us, and we’re parsing the configuration. In line 13 through line 17, we’re extracting the parameter values. And on line 18, we’re enforcing our requirements. In this case, I just have one validation, but you can write as many validations as you want.
So now we’ve finished implementing loading and parsing our configuration. The core ETL should not change. So as you can see here, we still don’t have anything hard-coded, and all of the configuration now comes from the configuration file that was passed to us by the user.
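Putting it together, a sketch of the refactored notebook might look like the following, assuming the hypothetical config file and keys from earlier; only the config path and the environment remain as user inputs, and the widget names are illustrative.

```scala
import com.typesafe.config.ConfigFactory
import java.io.File

// Only two inputs: where the config file lives, and which environment block to use.
dbutils.widgets.text("configPath", "", "Path to HOCON config file")
dbutils.widgets.text("env", "dev", "Environment (dev | staging | prod)")

val configPath = dbutils.widgets.get("configPath")
val env        = dbutils.widgets.get("env")

// Load the file, resolve substitutions, and select the requested environment block.
val conf = ConfigFactory.parseFile(new File(configPath)).resolve().getConfig(env)

// Extract typed parameter values.
val rowsPerSecond  = conf.getInt("rows-per-second")
val outputPath     = conf.getString("output-path")
val checkpointPath = conf.getString("checkpoint-path")

// Enforce requirements before any Spark job starts.
require(rowsPerSecond > 0, s"rows-per-second must be positive, got $rowsPerSecond")

// The core ETL is unchanged and contains no environment-specific literals.
spark.readStream
  .format("rate")
  .option("rowsPerSecond", rowsPerSecond.toLong)
  .load()
  .writeStream
  .format("delta")
  .option("checkpointLocation", checkpointPath)
  .start(outputPath)
```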
So we have fixed the environment-change issues, and we have implemented user input validation, so we’re able to catch user errors early on. It really doesn’t matter if the number of input parameters grows over a period of time. And because the configuration is managed in a file, which can be stored in your Git repository alongside your ETL code, we’re able to track changes. But can we do better, right? As your data pipelines grow, you would essentially need to come up with some patterns around your configurations, and you’d start developing some data model around that. And then what you will essentially be doing is reading configuration files and deserializing them into the data model that you have developed, right? At that point, you start writing a lot of boilerplate code. And because the Typesafe Config library is written in Java, if you’re using Scala to develop your data pipeline, you have to deal with type conversion between Java and Scala. So our next goal is: how can we reduce boilerplate code and write some advanced user validation?
So loading configuration has always been a tedious and error-prone procedure, and a common way to do it consists of writing a lot of code to deserialize each field in the configuration. And you have to reason about mapping the fields in your configuration to the fields in your classes. So PureConfig is a great Scala library for loading configuration files that supports HOCON, Java properties, and JSON files. It has excellent support for sealed traits, case classes, Scala collections, and optional values out of the box. And you can use this library to generate configuration files as well. If you want to learn more about PureConfig, please follow this link.
So now let’s take a look at an example where we have a streaming pipeline, and we want users to decide whether they want to operate this pipeline in trigger once mode or in trigger interval mode. Right? So, I have a data model that allows me to define that the streaming trigger can be either trigger once or trigger with an interval.
And then I define the PipelineConf class with the output path, checkpoint path, rows per second, and streaming trigger, and tuning is a sequence of strings. And I define my requirements here, which will basically validate the user inputs. And I do a pattern match here based on the user input, and then return the Structured Streaming trigger, which can then be used on line number 52, which will basically decide whether this pipeline runs in trigger once mode or trigger interval mode. So, the main thing that I wanted to highlight here is that on line number 48, notice how easily we’re able to read our configuration, parse it, and deserialize this configuration object into our pipeline configuration class. And once we have the PipelineConf object initialized, we can simply refer to our output path, checkpoint path, rows per second, streaming trigger, tuning, and so on.
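Here is a sketch of that PureConfig-based model; the class names, field names, and file path are hypothetical, and it assumes the pureconfig-generic module for automatic reader derivation.

```scala
import org.apache.spark.sql.streaming.Trigger
import pureconfig._
import pureconfig.generic.auto._
import scala.concurrent.duration.FiniteDuration

// The data model: the config chooses between trigger-once and trigger-interval modes.
sealed trait StreamingTrigger
case object TriggerOnce extends StreamingTrigger
case class TriggerInterval(interval: FiniteDuration) extends StreamingTrigger

case class PipelineConf(
    outputPath: String,
    checkpointPath: String,
    rowsPerSecond: Int,
    streamingTrigger: StreamingTrigger,
    tuning: Seq[String]) {
  // Validation lives with the data model, so bad input fails as soon as the config is loaded.
  require(rowsPerSecond > 0, "rowsPerSecond must be positive")
  require(outputPath.nonEmpty, "outputPath must not be empty")
}

// One line to load, parse, and deserialize the file into the case class -- no boilerplate.
val conf = ConfigSource.file("/dbfs/configs/pipeline-prod.conf").loadOrThrow[PipelineConf]

// Map the configured trigger onto Structured Streaming's trigger API.
val trigger = conf.streamingTrigger match {
  case TriggerOnce            => Trigger.Once()
  case TriggerInterval(every) => Trigger.ProcessingTime(every)
}
```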
So we’ve solved the environmental-change problems. We’ve implemented user validation, so we’re able to catch user errors early on. Because we have externalized the configuration parameters in a file, the number of input parameters doesn’t matter. We have a library that is doing the type casting for us. We are able to track parameter changes. We’ve reduced the boilerplate code, and we have an enhanced validation feature as well.
So, in summary, user errors are inevitable. We should really make it hard, if not impossible, to make mistakes. And we should try to catch errors and fail as early as we can. And we should externalize configuration so that we’re able to track changes, and use libraries to reduce boilerplate code. The source code and examples will be available at this GitHub repo. Thank you.

Ganesh Chand

Ganesh Chand is a data engineering consultant at Databricks with 10+ years of industry experience in building enterprise-scale data solutions. He is particularly passionate about solving the world's tough...