Observability for Data Pipelines With OpenLineage

May 28, 2021 11:40 AM (PT)


Data is increasingly becoming core to many products, whether it's providing recommendations for users, getting insights on how they use the product, or using machine learning to improve the experience. This creates a critical need for reliable data operations and for understanding how data is flowing through our systems. Data pipelines must be auditable, reliable, and run on time. This proves particularly difficult in a constantly changing, fast-paced environment.

Collecting this lineage metadata as data pipelines are running provides an understanding of the dependencies between the many teams consuming and producing data, and of how constant changes impact them. It is the underlying foundation that enables the many use cases related to data operations. The OpenLineage project is an API standardizing this metadata across the ecosystem, reducing complexity and duplicate work in collecting lineage information. It enables the many consumers of lineage in the ecosystem, whether they focus on operations, governance, or security.

Marquez is an open source project, part of the LF AI & Data Foundation, which instruments data pipelines to collect lineage and metadata and enable those use cases. It implements the OpenLineage API and provides context by making dependencies visible across organizations and technologies as they change over time.

In this session watch:
Julien Le Dem, CTO and Co-Founder, Datakin

 

Transcript

Julien: Hi, this is Julien. I'm the CTO and co-founder of Datakin. Today, I'm going to talk about data lineage and observability with OpenLineage. The agenda for this presentation is as follows: I'll start by giving a little bit of context, talking about the need for metadata. Then I'll go over OpenLineage, which is an open standard for lineage collection, and Marquez, its reference implementation. And at the end of the presentation, I'll talk about Spark observability with OpenLineage in particular, as an example application. So first, let's talk about the need for metadata. In an organization, you often have multiple teams consuming and producing data and depending on each other. Within a team, people are fairly aware of how things work, how they depend on each other, and how things change, and things work fine. But as soon as you go across teams, there's a lot of friction, and a lot of problems come from changes happening without people being aware of them or really understanding the context of those changes.
And this comes from limited context around data. Often you consume data or you produce data, but when you consume data, you don't know where it's coming from, you don't know what its schema is supposed to be, you may not know who the owner is, and you don't know how often it's updated. And when you produce data, you don't necessarily know who's using it. Basically, it's hard to see beyond one hop: who's consuming your data several hops downstream, or where the data is coming from multiple hops upstream. So there's very limited context. You can get to this information, but it's just hard and time-consuming.
And this is part of the difficulty of getting value out of the data. We often say that a lot of the work of data scientists and data engineers is cleaning up data, fixing things, and making sure things work, and that is a lot of work. So I built this picture, inspired by Maslow's hierarchy of needs. Maslow's hierarchy of needs says that before you can reach happiness, you first need food and shelter, and then, once you have that, you need safety, and once you build on those basic things, at the top of the pyramid, that's how you can reach happiness in your life. There's a similar hierarchy of needs for data: first, before you can get value out of your data, it needs to be available, in a location or a tool where you can start analyzing it.
Second, once the data is available, you need to make sure it's fresh and gets updated on time, which is this notion of data freshness. And third, once the data is available and gets updated on time, it also needs to be correct. Once you build on this foundation, that's when you can start to trust your data and work on how to optimize your business, or how to find new business opportunities with it: the actual getting value out of your data. But first, you need to get those three things out of the way so that you get your head above the water line, which is what this illustration shows.
And that's why we launched OpenLineage last year. The goal was really to enable getting this context, understanding what's happening and how things are changing, and really getting to the bottom of those problems. We were honored to reach out to a group of great tech creators and contributors from major open source projects, get this project started last year, and really define a standard for lineage and metadata. The purpose of OpenLineage is to define an open standard for metadata and lineage collection by instrumenting data pipelines as they are running. That's our mission statement. To put it more simply, the purpose is to be the EXIF for data pipelines. Today, when you take a picture with your digital camera or smartphone, it encodes in the picture itself the coordinates of where it was taken, when it was taken, and so on.
And OpenLineage really serves this purpose for data pipelines. The best time to capture metadata is when the job is running: that's when you know what the version of the code was, what the schema of the input was, what the schema of the output was, and how long it took to produce it, and you can encode and capture this metadata right then. Before OpenLineage, there were a bunch of projects interested in collecting data lineage. Things like Amundsen, DataHub, Marquez, Atlas, and others are all projects interested in visualizing this lineage and understanding it across the data ecosystem, and they were all figuring out how to extract this lineage from the various tools we use to process data. So there was a lot of duplication of effort. Each project had to instrument all of those tools, and it's also very brittle, because those external integrations break whenever each of those projects changes.
So with OpenLineage, we really reached out to all those people in this ecosystem, and we sat together and said, "Okay, let's define a standard way to represent lineage and the metadata around it so that we can simplify this problem." There are multiple advantages. One, the effort of integration is shared: instead of reinventing the same concepts over and over, we can have a standardized, generic representation of lineage. And second, because OpenLineage is a spec, really just a client, an interface, and a definition of lineage, you can actually push it into the ecosystem. Instead of integrating from the outside and extracting lineage from Spark, SQL, or your schedulers, you can make all of those things expose the lineage they understand internally in a standard representation. So it's a lot more robust from an evolution-of-the-ecosystem perspective, and a lot more efficient from a duplication-of-effort perspective. And to clarify what is in scope and what is not in scope for OpenLineage, there's a clear parallel between OpenLineage and OpenTelemetry.
OpenTelemetry captures traces and metrics for services, and OpenLineage captures the similar notion of data lineage and its connected metadata from data pipelines. So it's really a spec and the integrations that expose that spec, plus a configurable backend: the events that capture lineage can be sent to various backends and consumed by various consumers. OpenLineage is really the client-side specification of these lineage events. On the backend side, you can have Marquez, obviously, as the reference implementation, or you can consume those events in DataHub, in Amundsen, in Atlas. How you index and collect this information and use it for various use cases is not in the scope of OpenLineage.
The core model of OpenLineage is defined as a JSONSchema spec. There's a schema that defines how you represent a run of a job and what its inputs and outputs are. This is based on consistent naming of recurring jobs and their input and output datasets. So that's the core model of lineage: there's a run of a job, it starts at a given time, it ends at a given time, it read from these datasets and wrote to those datasets. Now, around that core model, you can attach the notion of facets. Each facet is an atomic piece of metadata that describes one aspect of an entity. For a job, that could be: what was the version of the code of this job, for example a [inaudible]. For a dataset, that could be: what was the schema of this dataset at that time? For a run, it could be: what was the query profile of the job that just finished? So you can capture various pieces of metadata like this, and they're all self-contained little specs, which makes it really easy to extend and customize OpenLineage.
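To make that concrete, here is a minimal sketch of a run event with a facet attached, written as a Python dict. The field names follow the OpenLineage JSONSchema spec as I understand it, and all job and dataset names are invented for illustration.

    # A minimal OpenLineage "run complete" event expressed as a Python dict.
    # All names and values here are illustrative; the authoritative field list
    # is the JSONSchema spec in the OpenLineage repo.
    event = {
        "eventType": "COMPLETE",
        "eventTime": "2021-05-28T19:52:00Z",
        "producer": "https://github.com/OpenLineage/OpenLineage",
        "run": {"runId": "d46e465b-d358-4d32-83d4-df660ff614dd"},  # unique per run
        "job": {"namespace": "my_pipeline", "name": "daily_orders_rollup"},
        "inputs": [{
            "namespace": "warehouse",
            "name": "public.orders",
            "facets": {
                # A dataset facet: the schema observed at read time. Facets are
                # self-describing via _producer and _schemaURL pointers.
                "schema": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage",
                    "_schemaURL": "https://openlineage.io/spec/facets/SchemaDatasetFacet.json",
                    "fields": [{"name": "order_id", "type": "BIGINT"}],
                },
            },
        }],
        "outputs": [{"namespace": "warehouse", "name": "public.orders_daily"}],
    }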
And so, like I said, once you define those events, the protocol is flexible. The only constraint is that they are sent as asynchronous events, which are observations about the running jobs. A unique run ID identifies a run and enables correlating the events that are making observations about a particular run, and those can be sent to a configurable backend. Typically, in the case of the Marquez integration, it's an HTTP backend where you just post events, but it could be Kafka, it could be anything that accepts those events. As an example, you would start with a run start event containing what the source code version was and which parameters were passed to this particular run, and then you would have a run complete event that would say, "Hey, those were the datasets you read from, and that was the schema they had at the time you read from them. Those are the datasets we wrote to, in what version, and what their schema was when we were done writing."
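A minimal sketch of that lifecycle in Python, assuming a Marquez-style HTTP backend at a hypothetical localhost address; the two events share a run ID so the backend can correlate them.

    import json
    import uuid
    from datetime import datetime, timezone

    import requests

    # Hypothetical endpoint; Marquez exposes an HTTP API that accepts posted events.
    OPENLINEAGE_URL = "http://localhost:5000/api/v1/lineage"
    RUN_ID = str(uuid.uuid4())  # the unique run ID correlating START and COMPLETE

    def emit(event_type, **extra):
        """Post one OpenLineage event for this run to the configured backend."""
        event = {
            "eventType": event_type,
            "eventTime": datetime.now(timezone.utc).isoformat(),
            "producer": "https://github.com/OpenLineage/OpenLineage",
            "run": {"runId": RUN_ID},
            "job": {"namespace": "my_pipeline", "name": "daily_orders_rollup"},
            **extra,  # e.g. inputs/outputs on the COMPLETE event
        }
        resp = requests.post(OPENLINEAGE_URL, data=json.dumps(event),
                             headers={"Content-Type": "application/json"})
        resp.raise_for_status()

    emit("START")
    # ... the job does its work ...
    emit("COMPLETE",
         inputs=[{"namespace": "warehouse", "name": "public.orders"}],
         outputs=[{"namespace": "warehouse", "name": "public.orders_daily"}])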
And this notion of facets is really the extensible part. Like I was saying, those facets are atomic pieces of metadata. They're identified by a unique name and can be attached to the core entities. The advantage of this approach is that instead of having a monolithic spec that would take a long time to converge, we were able to converge very quickly on the core spec of what lineage is: this notion of jobs and runs and input and output datasets. Then each of those aspects of metadata can be discussed independently. It's really important to be able to decouple things that are controversial from things that are fairly easy to agree on, so that we can move faster. And different sets of people, whoever is interested in different concepts, can help drive each effort.
Somebody could be driving the effort on column-level lineage, somebody could be driving the effort on data quality metrics, or on schema, or on query profiles for performance. Those are individual things that we can define independently, with quick, independent discussions and agreement on standardizing each of them. And it's also decentralized: you have those core, central facets that are part of the spec, but you can also have your own custom facets, which makes it easy to experiment and add your own notions. If you have a proprietary system, you can add your own custom facets that are defined by you and controlled by you, and embed them in the metadata. So that's an easy way to experiment and decentralize, and to enable using OpenLineage without necessarily going through an approval mechanism for everything.
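As a sketch, a company-specific custom facet might look like this; the name-prefixing convention and the _producer/_schemaURL fields follow my reading of the spec, and everything named "acme" is invented.

    # A hypothetical custom run facet. The name is prefixed ("acme_") to avoid
    # clashing with core facets, and _schemaURL points at a schema you control.
    custom_run_facets = {
        "acme_costTracking": {
            "_producer": "https://github.com/acme/pipeline-tools",
            "_schemaURL": "https://acme.example/schemas/CostTrackingRunFacet.json",
            "costCenter": "data-platform",
            "estimatedDollars": 1.42,
        }
    }
    # This dict would be attached under run["facets"] in the emitted event.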
To give you some examples: at the dataset level, you can have statistics for data quality, you can have the schema information, what the version of the dataset was if you're using a versioned storage layer like Delta Lake or Iceberg, or the column-level lineage if the transformation you're running exposes this information. At the job level, it could be: where is the source code for the current version of the job, what were the versions of its dependencies, what parameters were passed (or the hyperparameters if it's a training job), and what was the query plan for this particular job; and if it's self-contained, like a SQL query, it could contain the SQL query itself. At the run level, you'd have the schedule telling when it started, when it ended, what it was scheduled for, what batch ID it's relevant to, and what the query profile was.
So I've discussed OpenLineage as the spec for collecting metadata and how to instrument various things, and Marquez is the reference implementation that stores and versions everything collected through OpenLineage. Really, the context in which Marquez was built is that the goal was to collect all the lineage information from a data ecosystem, get visibility into the dependencies of everything, and version everything that's happening. This is an illustration of what a data platform could look like. In this example, you have ingestion; you have a storage layer for streaming and batch, with data in motion in Kafka, for example, and storage could be cloud object storage with some abstraction on top like Iceberg or Delta Lake. The compute layer may have streaming with Flink or Kafka Streams or other things, and on the compute layer you typically have a scheduler like Airflow or something else that orchestrates jobs, or SQL running in Snowflake, for example.
And a BI layer on top. So the goal is to instrument all those things (this is really just one example, as an illustration) and keep track of all their lineage and metadata in one place. The difference between the OpenLineage model and the Marquez model is that OpenLineage is making an observation about a running job: there's a run of a job, it started at this point, finished at this point, and read from this and wrote to that. What Marquez is doing is taking all those observations and keeping track of all the changes. It will keep track of whether the dataset schema has changed. It will keep track of whether the job that just finished has changed now that we've changed the version of the code, or whether its inputs and outputs have changed. So it's taking those events and versioning the metadata as it changes, so that you can keep track of the changes and understand how the data evolves over time.
To give an example of what Marquez enables, there's Datakin, which is my company, built on top of this. OpenLineage and Marquez standardize metadata collection: the job runs, what parameters were passed, what the versions were, what the inputs and outputs were, really all that metadata is collected, and Datakin is leveraging this data for understanding operational dependencies, doing impact analysis, and troubleshooting: what has changed since the last time this thing worked? So it's really leveraging the exact same metadata collected with OpenLineage and Marquez to build better lineage analysis, better understanding, and quicker troubleshooting when something goes wrong.
And now, I'll talk more specifically about how we do the Spark integration for OpenLineage: how do we achieve observability for Spark in this particular context? OpenLineage has integrations for schedulers like Airflow, for warehouses like [inaudible] and Snowflake, and for databases and Spark, and there are more integrations on the way that are in progress. In the context of this presentation, I'll dive into the Spark integration. So first, the way you set up the Spark integration is you just add extra Java options to your Spark jobs. It's meant to be configured globally: you can configure it once for all your Spark jobs and start getting visibility into what's happening, and you pass it the information of your OpenLineage endpoint that's going to consume the OpenLineage events.
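A sketch of that setup from PySpark, shown here with Spark conf properties rather than literal Java options. The listener class and property names have varied across releases, so treat them as placeholders and check the OpenLineage repo for your version.

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("daily_orders_rollup")
        # Pull in the OpenLineage Spark integration (version illustrative).
        .config("spark.jars.packages", "io.openlineage:openlineage-spark:0.3.1")
        # Register the listener that observes each job's logical plan.
        .config("spark.extraListeners",
                "io.openlineage.spark.agent.OpenLineageSparkListener")
        # Point the listener at the backend consuming the events (names assumed).
        .config("spark.openlineage.host", "http://localhost:5000")
        .config("spark.openlineage.namespace", "my_pipeline")
        .getOrCreate()
    )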
And then it's going to start collecting metadata. The way this works is that it adds a listener to the Spark context, so it's able to see the logical plan for each of your Spark runs and collect from that. It collects the inputs and outputs of the jobs, where the job is reading from and where it's writing to. From the profile, it also collects the row count and byte size: how many rows were consumed from the input and how many rows were produced in the output. And it collects the logical plan in a standard way. This is all very useful information if you want observability and want to understand what happened and how things change over time, like the typical use case of checking, "Oh, is the row count consistent with previous runs?" Now, when we model the lineage for Spark, there are multiple levels of nesting. So let's take the example of running a Spark job within an Airflow DAG.
The very typical case is to model that as an Airflow DAG, a workflow that, in a simple case, has three steps: you may start a Spark cluster first, then run your Spark job, and then take down your Spark cluster. In the OpenLineage model, the task that starts the cluster is a job that exists but doesn't have any input or output datasets. The Spark job task in Airflow is itself an entity, and it runs the actual Spark job, which will itself have multiple steps: each action in your Spark job will be a different run. So we model each individual action inside a Spark job as its own run with inputs and outputs, and that's what constitutes the lineage. As part of this model, you'll be able to see the nesting: there's a Spark job that's made of multiple actions, and here's the status of each of those actions, whether it was successful or not, and which datasets it consumed or produced.
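The OpenLineage spec has a parent run facet for expressing this kind of nesting; a sketch of what attaching it might look like, with all identifiers invented.

    # Sketch: a run facet linking a Spark action's run back to the Airflow task
    # that launched it, so the backend can reconstruct the nesting.
    parent_run_facet = {
        "parent": {
            "run": {"runId": "d02f2b7e-8a65-4f8e-9c3b-1a2b3c4d5e6f"},  # parent run
            "job": {"namespace": "my_pipeline", "name": "my_dag.run_spark_job"},
        }
    }
    # Attached under run["facets"] on each event emitted by the Spark action.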
To give you an example of what that looks like: once you have the model and you have multiple jobs reading from and writing to datasets, you can start visualizing how they depend on each other in a wider lineage view in Marquez. In Marquez, for example, you have a lineage API that lets you say, "Show me all of the subgraph that's connected to a particular node," and that lets you visualize your lineage graph and see the dependencies.
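For instance, querying that lineage API might look like the following sketch; the endpoint path and nodeId format follow the Marquez API as I understand it, the response shape is assumed, and the dataset name is made up.

    import requests

    MARQUEZ_URL = "http://localhost:5000"  # assumed local Marquez instance

    # Ask Marquez for the lineage subgraph connected to one dataset node.
    resp = requests.get(
        f"{MARQUEZ_URL}/api/v1/lineage",
        params={"nodeId": "dataset:warehouse:public.orders_daily"},
    )
    resp.raise_for_status()
    for node in resp.json().get("graph", []):
        print(node.get("id"), node.get("type"))  # e.g. dataset and job nodes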
Another use case: I mentioned that in the OpenLineage integration for Spark, we collect input size and output size, so that lets you see over time how this changed. When you look at a dataset in Marquez, for example, you'll see the history of the versions of the dataset: every time it was updated, there's a new dataset version, and you'll see the facets attached to it. So there's going to be a statistics facet that captures the row counts and the byte counts written for each of those runs. You can start keeping track of how the size of the dataset evolves over time and spot anything suspicious, like a sudden drop in row counts, which is a core data quality metric. There's also work going on to add more quality metrics from things like Great Expectations and Deequ, to enrich this information if you want to keep track of metrics at the column level.
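As a sketch of that kind of check against Marquez, assuming a dataset versions endpoint, newest-first ordering, and an output statistics facet with a rowCount field (all of which should be verified against your Marquez version).

    import requests

    MARQUEZ_URL = "http://localhost:5000"  # assumed local Marquez instance

    # Pull the version history of a dataset and compare row counts run over run.
    resp = requests.get(
        f"{MARQUEZ_URL}/api/v1/namespaces/warehouse/datasets/public.orders_daily/versions"
    )
    resp.raise_for_status()

    counts = []
    for v in resp.json().get("versions", []):  # assuming newest version first
        stats = v.get("facets", {}).get("outputStatistics", {})  # facet name assumed
        if "rowCount" in stats:
            counts.append(stats["rowCount"])

    # Flag a suspicious drop: newest count less than half the previous one.
    if len(counts) >= 2 and counts[0] < 0.5 * counts[1]:
        print("Row count dropped by more than half since the last run")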
And on that, I'll encourage you to join the conversation. You can find OpenLineage on GitHub: there's an OpenLineage repo, and you'll find all the contact information there, including a Slack channel, a Twitter account, and a Google group for people who prefer email. Similarly, for Marquez, you can go to the Marquez GitHub to find all the information on how to join the Slack channel and join the conversation. That's where you can ask questions on how to use Marquez, what the appropriate way to do something is, or what the roadmap is for adding more integrations to OpenLineage. And on that, I'm going to thank you and encourage you to ask questions. Thank you very much for listening.

Julien Le Dem

Julien Le Dem is the CTO and Co-Founder of Datakin. He co-created Apache Parquet and is involved in several open source projects including OpenLineage, Marquez (LF AI & Data), Apache Arrow, and Apache Iceberg.