Mark Paul

Engineering Manager, Healthdirect Australia

Mark Paul has 15 years of experience in large-scale software development. Having worked in frontend, backend, data engineering, and architecture roles, he has gained practical knowledge of how to build distributed software solutions that scale. He currently works for Healthdirect Australia (an Australian government agency), solving complex data quality issues in the public health space.

PAST SESSIONS

Building a Federated Data Directory Platform for Public Health - Summit 2020

Healthcare directories underpin most healthcare systems around the world and are often a core component that enables initiatives like 'Care Coordination'. For example, if your Doctor needs to refer you to a Specialist, they use a healthcare directory to find the Specialist, or if your Hospital needs to send out a discharge summary to your Doctor, it uses a secure messaging lookup that's powered by a healthcare directory. Because of these kinds of critical use-cases, healthcare directories often become a "single point of failure" for healthcare systems. This is especially true in the event of bad data quality within the directory.

In our session, we will present how the NHSD** implemented a 'Federated Data Directory Platform' that ingests data from multiple sources (Authoritative Systems of Record) and performs data operations like validation, matching, merging, enrichment, and versioning whilst generating and maintaining comprehensive data lineage, attribution and provenance in a quest to continually improve data quality, governance and completeness of Australia’s national directory of health services and practitioners. We will also cover how we currently ‘rank’ (promote/demote) input data sources based on manual audit outcomes and how we intend to use machine learning to achieve auto classification of preferred data sources. We will also detail our solution architecture built on Databricks Delta Lake and Spark Structured Streaming.

** Launched in 2012, the National Health Services Directory (NHSD) is a national directory of health services and the practitioners who provide them. This key piece of national digital health infrastructure was established by an Australian Health Ministers’ Advisory Council (AHMAC) agreement. It is jointly funded by Departments of Health within state and federal governments and managed by Healthdirect Australia.


Video Transcript

- Hi everybody, my name is Mark Paul, and I'll be joined by Anshul Bajpai. Our talk is titled, Building a Federated Data Directory Platform for Public Health. We've got a lot to cover today. We're gonna start by talking about the problems with Centralized Data Directories. We'll then move on to a solution in the form of a Federated Data Directory Platform. We'll talk about some design patterns, we'll move on to a specific use case called Intelligent System of Record Ranking, and then we'll end with some architecture patterns that you could take home with you. So, we're from Healthdirect Australia. We're a national, government-owned, not-for-profit organization that provides trusted health information and advice to all Australians. Within Healthdirect Australia we have the National Health Services Directory. This is essentially Australian digital health infrastructure, the national directory of health services and the practitioners who provide them.

So let's begin by looking at the problems with Centralized Data Directories.

Healthcare Directories - Critical Healthcare Infrastructure

Healthcare Directories are essentially critical healthcare infrastructure, as they underpin most of the healthcare systems around the world. And this is because they enable care coordination. So for example, if your doctor needs to refer you on to a specialist, they use a Healthcare Directory to locate that specialist. Or if you were to go to hospital, a discharge summary is sent out to your doctor at the end of your hospital visit using secure messaging, which is powered by a Healthcare Directory. So Healthcare Directories are single points of failure. This is especially amplified if you have bad data quality, because then we have clinical risk to patients. For example, what happens if you can't locate your closest emergency department during an accident? So we need to look at this problem in a more proactive way.

Current Healthcare Directories are essentially centrally managed database applications, with data updated via content management systems and call centers. This model is highly reactive and inefficient, mainly because of the high frequency of change of the data within the directory. For example, healthcare services constantly change their operating hours, and practitioners constantly move between healthcare services. So there's high data volatility, and we need a more proactive way to deal with this problem.

Whether you're in healthcare or any other industry that struggles with the issues around centralized data stores, one solution, which we have adopted, is to move to a Federated Data Directory Platform.

Federating Data is a powerful concept.

You might be familiar with a federated database, which basically maps multiple autonomous database systems into a single federated datastore. There is no aggregation happening here. Instead, it's the abstraction of multiple data stores.

Then you've got the Federated Data Platform, which is basically controlled aggregation to create gold-standard data by using multiple autonomous origin data sources. And the go-to architecture pattern here is what we call event sourcing, which you may be familiar with. We'll discuss that in detail on a later slide.

So data federation for Healthcare Directories can be described as building a Federated Data Puzzle. If you look at the screen up there, to your right, you would see what we consider our gold standard puzzle. Now, we're not the masters of this data. Instead, what we do is enroll Systems of Record to create this puzzle, and we coordinate that creation via pipelines. So our role, again, is not to create the puzzle but to enable its creation by using Systems of Record. In that example, we've got Opera, which is a System of Record, and we've got Medicare, which owns specific pieces of this puzzle, and then we coordinate this creation.

Let's look at some Design Patterns for Federated Data Platforms.

The entire process begins with what we call Source Classification. Basically, we identify Authoritative Systems of Record. Once we do that, they can play one of three roles. They could be a Source of Truth, the authoritative owner of a subset of our data. They could be a Source of Validation, which we then use to validate against and improve our data quality. Or they could be a Source of Notification, essentially to increase our Data Currency. Data Currency is basically the extent to which our data is up to date.
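As a rough illustration of this classification step, here is a minimal sketch in Scala (our pipelines are written in Scala, but the names below are hypothetical, not the actual NHSD code) of how the three source roles might be modelled:

```scala
// Minimal sketch of Source Classification: each enrolled Authoritative
// System of Record plays one of three roles with respect to our data.
sealed trait SourceRole
case object SourceOfTruth        extends SourceRole // authoritative owner of a data subset
case object SourceOfValidation   extends SourceRole // used to validate against and improve data quality
case object SourceOfNotification extends SourceRole // used to increase data currency

// Hypothetical registry of enrolled sources and the role each one plays
final case class SystemOfRecord(id: String, name: String, role: SourceRole)

val enrolledSources = Seq(
  SystemOfRecord("sor-medicare",    "Medicare",    SourceOfTruth),
  SystemOfRecord("sor-healthscope", "Healthscope", SourceOfNotification)
)
```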

Once we have that in place, we move on to what we call Entity/Channel setup. We create our gold entities, which are our final entity models, for example healthcare practice, organization and practitioner. We have our raw entities, which are the raw, source-specific entities in a pre-map stage that we then convert into our gold entities. And we have our source channels, basically our pipeline channels, that transition these raw entities into our gold version entities.

We next move on to a process that we call Attribute Sourcing. Essentially, this is identifying the relationship Systems of Record have with our Data Entities. So in that example on screen, you see a healthcare service. Towards the middle of the screen, you see this thing called Practitioner Relations, basically the practitioners who work in this clinic. For that, we use a System of Record called Healthscope, and we use them as a Source of Notification to keep this specific attribute set up to date. So this goes back to that example of building a federated data puzzle that I spoke about a few slides earlier.
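To make the idea concrete, attribute sourcing could be expressed as configuration along the lines of the sketch below (illustrative names only, not the real NHSD configuration):

```scala
// Illustrative attribute sourcing configuration: each entity attribute is
// mapped to the System of Record responsible for it and the role it plays.
final case class AttributeSourcing(entity: String, attribute: String,
                                   sourceId: String, role: String)

val attributeSourcingConfig = Seq(
  AttributeSourcing("HealthcareService", "practitionerRelations", "sor-healthscope", "SourceOfNotification"),
  AttributeSourcing("HealthcareService", "operatingHours",        "sor-medicare",    "SourceOfTruth")
)
```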

Okay, with that, we'd like to introduce you to our Curated Data Lake. We're gonna go into specific details on a later slide, but at a high level, on the left of your screen you see what we consider our Data Sourcing: basically enrolling our Systems of Record. Data from those Systems of Record enters our Curated Data Lake through a pre-processing layer, we then move it through a raw, stage, gold pattern, which you'll be familiar with, and gold standard records then exit using a publishing application, which makes that gold standard data available to our consumer products downstream. Let's now dive into some of those specific layers and talk about them.

Pre-Processing Layer

So it all begins in the pre-processing layer. For us, these are basically notebooks in our environment. In these notebooks, we hit origin APIs, get data extracts, or pick up data from S3, for example. The main goal of this layer is to generate what we call a Source Data Event Object, which contains two attributes. It contains the data payload, which is the raw entity payload, and it contains the provenance, which is then used for source origin identification.
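As a minimal sketch, assuming a Scala notebook environment and hypothetical field names, a Source Data Event Object could look something like this:

```scala
// Sketch of the Source Data Event Object produced by the pre-processing layer:
// the raw entity payload plus provenance used for source origin identification.
final case class SourceDataEvent(
  payload: String,                 // raw entity payload as received (e.g. JSON)
  provenance: Map[String, String]  // source origin details, described further below
)

val event = SourceDataEvent(
  payload    = """{"serviceName":"Example Clinic"}""",
  provenance = Map("sourceId" -> "sor-medicare", "rawSourceRef" -> "s3://landing/medicare/extract-001.json")
)
```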

Raw Processing Layer (Bronze)

Once pre-processing is complete, we move on to our raw processing layer, or the bronze layer. Here, we perform routine, high-level parsing and cleansing of data, nothing specific to the entity itself. We look at things like: is this file format correct, or is this even valid JSON, for example? Once that's done, we generate what we call a Core Data Event Object, which is the fundamental building block that carries through to the downstream layers, and we capture transition/operational changes too. We then generate what we call an Event Trace ID, the end-to-end traceability identifier. We also do this thing called Data Lineage Capturing, in the form of Data Lineage Objects, which begins at this layer but also happens at downstream layers as well.
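A rough sketch of what the bronze layer produces, again with assumed names rather than the actual NHSD schema, might be:

```scala
import java.util.UUID

// Sketch of the Core Data Event Object built in the raw (bronze) layer.
// The Event Trace ID gives end-to-end traceability for the event as it
// flows through the stage and gold layers.
final case class CoreDataEvent(
  eventTraceId: String,            // end-to-end traceability identifier
  entityType: String,              // e.g. "HealthcareService"
  payload: String,                 // parsed and cleansed raw entity payload
  provenance: Map[String, String]  // carried forward from the Source Data Event
)

def toCoreEvent(entityType: String, payload: String, provenance: Map[String, String]): CoreDataEvent =
  CoreDataEvent(UUID.randomUUID().toString, entityType, payload, provenance)
```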

Stage Processing Layer (Silver)

Once our raw layer is complete, we move on to our stage processing layer, or the silver layer. Now, this is where the bulk of the operations happens, and there are many things that happen here. At a very high level, we begin with a mapping operation, where we convert from the raw source entities to our gold entity models. We then proceed to a referencing operation, which is basically enrichment using reference data lookups. We then head to our merging operation, where we get the last known best version and create a new version of the data based on the required change set. And finally, we head into a validation operation, where we enforce our final validation against our gold standard rules. Essentially, if you pass the validation operation, you have a new version of our gold standard data.
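Conceptually, the silver layer can be thought of as a chain of operations. The sketch below is only an illustration of that chaining; the function bodies, names and types are placeholders, not the real implementation:

```scala
// Illustrative stage (silver) layer pipeline: map -> reference -> merge -> validate.
final case class GoldEntity(id: String, attributes: Map[String, String], version: Long)

def mapToGold(rawPayload: String): GoldEntity =
  GoldEntity("svc-1", Map("name" -> "Example Clinic"), 0L)                    // mapping operation
def enrich(e: GoldEntity): GoldEntity =
  e.copy(attributes = e.attributes + ("timezone" -> "Australia/Sydney"))      // referencing operation
def merge(e: GoldEntity, last: Option[GoldEntity]): GoldEntity =
  e.copy(version = last.map(_.version + 1).getOrElse(1L))                     // merging operation
def validate(e: GoldEntity): Either[String, GoldEntity] =
  if (e.attributes.contains("name")) Right(e)
  else Left("missing mandatory attribute: name")                              // validation operation

val lastKnownBest: Option[GoldEntity] = None
val newGoldVersion: Either[String, GoldEntity] =
  validate(merge(enrich(mapToGold("""{"serviceName":"Example Clinic"}""")), lastKnownBest))
```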

Now let's look at that merging operation in a bit more detail. How it works is: we get a request to change a specific data attribute, we then match and get the last version of the attribute using a primary key, and we merge onto that last version using a process called Delta Determination, essentially looking for the changes that we need to implement. Then we generate the new version of the data. Along the way, we do this thing called Metadata Attribution, where we log every change to every attribute, on every event, and this lets us make some runtime decisions by looking at who last changed an attribute. We also generate data lineage objects in this layer. In fact, we generate quite a lot of data lineage objects in this layer, to capture things like attribute exceptions, violations, status changes, etc.
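As a hedged sketch of the idea (not the actual merge code), Delta Determination plus Metadata Attribution could look roughly like this:

```scala
import java.time.Instant

// Compare the incoming attribute values against the last known version, keep
// only the attributes that actually changed, and record who changed what and
// when (metadata attribution).
final case class AttributeChange(attribute: String, oldValue: Option[String],
                                 newValue: String, changedBy: String, changedAt: Instant)

def determineDelta(last: Map[String, String],
                   incoming: Map[String, String],
                   sourceId: String): Seq[AttributeChange] =
  incoming.collect {
    case (attr, value) if !last.get(attr).contains(value) =>
      AttributeChange(attr, last.get(attr), value, sourceId, Instant.now())
  }.toSeq

// Example: only "phone" changed, so only one AttributeChange is produced.
val delta = determineDelta(
  last     = Map("phone" -> "02 9999 0000", "fax" -> "02 9999 0001"),
  incoming = Map("phone" -> "02 9999 1234", "fax" -> "02 9999 0001"),
  sourceId = "sor-healthscope"
)
```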

Gold Processing Layer

Once we're done with the stage layer, we move on to our gold processing layer. The gold processing layer's responsibility is to ensure entity relationship validation and to prevent orphans, so it makes sure those entity relationships are intact. It's also the layer where we do reprocessing and replay. So basically, if we want to reapply some reference data or business rules, we can replay from this layer, or if we want to roll back to an older version of our data, we can use this layer as well. The gold layer is also a data science layer, so our data analysts are constantly looking at the lineage and the history of our data updates, and they derive data quality benchmarks that we use to improve our data quality.
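Replay and rollback of this kind are a natural fit for Delta Lake time travel. The snippet below is only a sketch of that pattern, assuming a Databricks notebook where `spark` is in scope and using hypothetical table paths:

```scala
// Read the gold table as it was at an earlier version (or timestamp)...
val goldPath = "s3://curated-data-lake/gold/healthcare_service"

val previousSnapshot = spark.read
  .format("delta")
  .option("versionAsOf", 42)                 // or .option("timestampAsOf", "2020-06-01")
  .load(goldPath)

// ...then reapply updated reference data or business rules and write a new version.
previousSnapshot
  .transform(df => df /* reapply enrichment and validation here */)
  .write.format("delta").mode("overwrite").save(goldPath)
```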

Data Provenance Object

Let's look at our Data Provenance Object. This object, like I mentioned, is generated in our pre-processing layer. It's used to trace an event back to its exact origin. We can look at things like which exact source came up with this change, we can trace it back to the raw source it originated from, or even trace it back to an external identifier, like a source-side Jira ticket number, for example. We also trace source intention. In other words, what is the source trying to do with this specific data change?
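A hypothetical shape for that object, with field names assumed for illustration, might be:

```scala
import java.time.Instant

// Sketch of a Data Provenance Object, generated in the pre-processing layer.
final case class DataProvenance(
  sourceId: String,             // which enrolled source produced this change
  rawSourceRef: String,         // the raw file/extract/API call the event originated from
  externalRef: Option[String],  // e.g. a source-side ticket number
  intention: String,            // what the source is trying to do: create, update, retire...
  receivedAt: Instant           // when the change was received
)
```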

Data Lineage Object

And then we have the Data Lineage Object, or DLO. This DLO is generated in every layer. It encapsulates the operational outcomes that occurred to entity events, and helps us capture deviation of data quality over time. So we capture things like exceptions and warnings: exceptions are used to fix our data, and warnings are used to improve our data quality. We also use this for end-to-end data flow visibility, so we know exactly what's happening to our data. Now, we capture a lot of these DLOs. For example, over the last couple of months, we have captured more than 25 million issues that can be traced back to specific sources. And these DLO objects play a critical role in our data improvement strategies, which I'm going to describe next.
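For illustration only, a flattened DLO record (of the kind later consolidated into a central Delta table) might carry fields along these lines:

```scala
import java.time.Instant

// Sketch of a flattened Data Lineage Object record; one is emitted per
// operation outcome, in every layer, for every event.
final case class DataLineageObject(
  eventTraceId: String,   // ties the DLO back to the event it describes
  layer: String,          // "raw" | "stage" | "gold"
  operation: String,      // e.g. "mapping", "merge", "validation"
  severity: String,       // "EXCEPTION" (fix the data) or "WARNING" (improve quality)
  message: String,        // what went wrong or what could be improved
  sourceId: String,       // lets issues be traced back to a specific source
  recordedAt: Instant
)
```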

We'd like to move on to a topic that we call Intelligent System of Record Ranking. This has been an internal process we have been using, and this is the first time we are publicly sharing it, so I'm pretty excited to share this with you.

As you've seen so far, we are a system heavily built on Dedicated Systems of Record, because we are a federated data store. These dedicated SoRs, Systems of Record, have full update authority over our data attributes. The problem with this is that upstream data quality regressions flow into our system, which we don't want. And also, some of these Systems of Record have a very low frequency of change, and that makes it look as though our data doesn't change very often. So we have poor data currency, which is also something we don't want.

So the solution we've come up with is what we call Candidate Systems of Record, or CSoRs. You can think of these as alternate Systems of Record that compete to update the same data attributes as the Dedicated System of Record. In that example over there, we have a healthcare service, and it has its operating hours and contact details. SoR A, which is the Dedicated System of Record, has the ability to update these, but our Candidate Systems of Record B and C could potentially also fill in and update them if need be.

Manual Ranking

At the moment, we encourage and enable this competition to update data by using what we call Manual Ranking. This is basically ranking assigned based on business priority. In that example over there, we have System of Record A, which has priority one to update the opening hours and the contact details, and Candidate SoRs B and C have a lower priority to update contact details. As we use streaming applications, in the scenario where we have multiple sources competing to update the same attribute at the same time, we use this ranking to know which to honor over which. So basically, who wins over whom if there is a race condition to update data.
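As a minimal sketch (with hypothetical source IDs and priorities, not the real configuration), resolving such a race condition with a manual ranking could look like this:

```scala
// The lowest priority number wins the race to update a given attribute.
final case class AttributeUpdate(attribute: String, value: String, sourceId: String)

val manualRanking: Map[(String, String), Int] = Map(
  ("sor-a", "contactDetails") -> 1,   // Dedicated SoR, highest priority
  ("sor-b", "contactDetails") -> 2,   // Candidate SoR
  ("sor-c", "contactDetails") -> 3    // Candidate SoR
)

def resolve(competing: Seq[AttributeUpdate]): Option[AttributeUpdate] =
  competing.sortBy(u => manualRanking.getOrElse((u.sourceId, u.attribute), Int.MaxValue)).headOption

// If SoR A and Candidate SoR B both try to update contactDetails in the same
// micro-batch, SoR A's update is honored.
val winner = resolve(Seq(
  AttributeUpdate("contactDetails", "02 9999 1234", "sor-b"),
  AttributeUpdate("contactDetails", "02 9999 0000", "sor-a")
))
```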

Automatic Ranking

Once we have Manual Ranking in place, it enables us to do what we call Automatic Ranking. This is basically Data Lineage outcomes aggregated over the last 30 days, and it's powered by those Data Lineage Objects I was talking about. So in this example here, we have System of Record A, which you can see made 10 updates to contacts, which resulted in four warnings and two errors, whereas Candidate SoR B made eight updates, which resulted in only one warning. By looking at these aggregations, we're able to boost priority based on the recent performance of sources. Originally, System of Record A has priority to update contact details, but by looking at the performance of Candidate System of Record B, we give it a higher priority, as we feel it can improve our data quality.
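A rough sketch of that 30-day aggregation, assuming a consolidated DLO Delta table with hypothetical path and column names and `spark` in scope in a notebook, might look like this:

```scala
import org.apache.spark.sql.functions._

// Summarise each source's recent performance per attribute from the DLO table
// to drive automatic ranking.
val dlos = spark.read.format("delta").load("s3://curated-data-lake/lineage/dlo")

val recentPerformance = dlos
  .filter(col("recordedAt") >= date_sub(current_date(), 30))   // last 30 days
  .groupBy(col("sourceId"), col("attribute"))
  .agg(
    count("*").alias("updates"),
    sum(when(col("severity") === "WARNING", 1).otherwise(0)).alias("warnings"),
    sum(when(col("severity") === "EXCEPTION", 1).otherwise(0)).alias("errors")
  )
  .withColumn("issueRate", (col("warnings") + col("errors")) / col("updates"))

// Sources with a lower recent issue rate can be boosted above the manual default.
recentPerformance.orderBy(col("attribute"), col("issueRate")).show()
```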

Intelligent Ranking - Future State: Healthcare Service Entity Features

Where we are heading is what we consider Intelligent Ranking. We say this is a future state, but we've already started implementing it. Our features are growing; in other words, the source-based metrics we're collecting about our data quality are growing. In that example over there, we have System of Record A: we know how many updates it's trying to make, how many warnings and errors those resulted in, how many public complaints can be traced back to data updates from that System of Record, and we're even collecting longitudinal data quality metrics on completeness, accuracy, etc. At the same time, our sources are also growing, so we're enrolling more Systems of Record and Candidate Systems of Record. We've also seen a scenario we call Seasonal Data Regression, where we noticed that some Systems of Record, at certain points of the year, go through a data cleansing, which could either improve or worsen the quality of their data. So it's no longer feasible for us to just look at the recent performance of sources.

What we have come up with is a source-specific data quality model. Essentially, what this gives us is a confidence score based on past performance, which we can then apply in real time. We're not far from implementing this. And this then becomes the foundation for what we call Organic Data Quality Improvement, because once we have this in place, all we have to do is enroll as many sources as possible, and they will naturally compete to increase our data quality in this environment.

And with that, I'd like to jump into some architecture patterns. And I'd like to invite Anshul, who's gonna present this segment. - Thanks Mark, for setting the context.

Let's now look at the architectural patterns that we have in our Federated Directory. Talking of architectural patterns here at Healthdirect, we basically have a few major components in the architecture: data producers, data consumers, the development control plane, the data control plane and the operational control plane. Now, data producers are the ones who are either pushing data to us or we are pulling data from, and Healthdirect essentially has data sharing agreements with several health integrators. Depending on the type of data source, we have a variety of ways to fetch and collect change sets, either through API endpoints or through secure FTP sitting on top of mounted S3 buckets. Since the federated directory heavily relies on external reference data for enriching an incoming event, we also receive taxonomy reference data, for example from a third-party vendor. And we in fact have one of our internal data sources also pushing content updates through a Kinesis stream. All of these data sources flow through a pre-processor to begin with, which is responsible for data acquisition and data ingestion as two key steps. All of these pre-processors are built using Databricks notebooks, with Scala as the choice of programming language and Spark as the choice of engine for distributed computation. Now, data consumers are the ones who rely on a consolidated and aggregated view of information available from the Federated Directory of Healthcare Services. A data event, when successfully processed inside the data control plane, gets published to a Kinesis stream, which then gets loaded onto one of the DynamoDB tables. This in turn gets exposed to the external health integrators and internal users, like the Healthdirect Service Finder, via consumer APIs, file APIs and other channels.

Now, talking of the development control plane, this plane is meant to define the tools and IDEs used by the engineering teams to carry out development work: building new features, applying bug fixes, preparing reports, maintaining CI/CD pipelines, etc. Databricks notebooks are one of the most widely used tools across teams in Healthdirect to build scripts, applications and dashboards, generate reports, run ad hoc queries, and also perform compute-intensive aggregations for that matter. Since AWS is used as the enterprise-wide cloud platform within Healthdirect, AWS CodeCommit, in conjunction with CodeDeploy and CodePipeline, for example, is also used across the platform to manage automated pipelines for faster time-to-market outcomes.

So, talking of the data control plane and the operational control plane. The data control plane is primarily a collection of various real-time processing applications, built using Scala as a language supporting the functional paradigm, with Spark Structured Streaming as the distributed engine, to facilitate an event-based data pipeline hosted on the Databricks platform, inherently utilizing the power of Delta Lake. Now, each layer inside the data processing pipeline has a function to perform. The raw layer, for example, along with the process of change set determination, is also meant to perform preliminary data cleansing by doing basic validation, like file format validation to check if it's a valid JSON-formatted file, for example, and provenance-level validation to check if all mandatory source identification information is available. The stage layer, on the other hand, is primarily responsible for matching, merging and versioning the entities, along with metadata attribution and additional source entity validation, followed by final entity validation as per the defined data quality benchmarks. The gold layer, however, then finally prepares entities to be published in an aggregated, presentable format, while making sure that the domain entity relationships are intact, so that any unrelated entities, or so-called orphan entities, are held back for future cascaded retrospective corrections. All of these layers read and write intermediate results in Delta tables, which are partitioned by entity type. The Delta table schema has been designed in such a way that it is independent of the underlying domain entity model. All of these streaming applications run on interactive clusters, with Delta cache accelerated driver and worker nodes underneath. All of these apps individually rely on their corresponding checkpoints and explicit data checks to keep track of events being processed, so as to avoid any accidental replays.

The operational control plane is basically where we deal with things like how the data can be securely accessed by the various teams involved, the necessary administration setup to facilitate seamless data access, and how the content is going to be actively managed through an internal management UI, etc. Broadly speaking, we've got two types of teams requiring access to data and the Databricks platform: engineering teams and operations teams. These teams require different levels of privileges to be able to manage the data processing pipeline and the analytics or reporting needs, respectively. As a result, we have created two categories of interactive clusters associated with two different instance profiles, essentially two different IAM roles with the necessary AWS policies and permissions attached. These underlying AWS IAM roles precisely define what data someone can access and what operations they are allowed to perform. All the batch jobs, streaming jobs, reporting jobs and analytics jobs, whether notebook-based or jar-based, are deployed through automated CI/CD pipelines. These CI/CD pipelines use the Databricks Jobs API to manage jobs with the necessary configuration, including the schedule.
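For illustration, one of those layer-to-layer streaming applications could be structured roughly like the sketch below (the paths, table layout and transformation body are assumptions, not the actual Healthdirect code):

```scala
import org.apache.spark.sql.SparkSession

// Read the raw Delta table as a stream, apply the stage layer's processing,
// and append to the stage Delta table. The checkpoint keeps track of processed
// events so accidental replays are avoided.
val spark = SparkSession.builder().appName("raw-to-stage").getOrCreate()

val rawEvents = spark.readStream
  .format("delta")
  .load("s3://curated-data-lake/raw/events")

val stagedEvents = rawEvents
  .transform(df => df /* mapping, referencing, merging and validation go here */)

stagedEvents.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "s3://curated-data-lake/checkpoints/raw-to-stage")
  .start("s3://curated-data-lake/stage/events")
```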

Now, let's look at a slightly lower-level architecture. Considering S3 as the landing layer, an event flows through the raw, stage and gold layers before it is available to the end consumers. The stage layer, as you can see, depends on other static data sources, like RF data and Medicare data, for either enrichment or quality improvement through validation. Every step of the way, an incoming event produces a data lineage object, which is a reflection of all the operations, and their corresponding outcomes, whether success or failure, performed on the event throughout the pipeline. These DLOs, through a separate streaming application sitting on top of the intermediate raw, stage and gold data tables, get consolidated and stored in a centralized Delta table as flattened DLO records, one per operation outcome from each processing layer, for each data event flowing through the pipeline.

Data Plane & Processing Pipeline

Now, this DLO Delta table is then used by the operations team to do exception reporting, for example, or ranking, as my colleague mentioned in his presentation. Since the entire pipeline always works in append mode, meaning any new data event only results in the creation of a new entity version, there is no overwrite happening. This has the potential to keep growing these Delta tables, and that does impact performance over time. As a result, we have a few housekeeping jobs in place which run on a given schedule to move historical versions of entities from, let's say, the main Delta table to archive Delta tables, based on a time threshold, which leaves behind only the latest version of any entity in the so-called main Delta tables at any given point in time. So this way, the gold table in Delta Lake becomes the source of the current snapshot of the directory data.
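A sketch of such a housekeeping job, using assumed paths and column names (`entityId`, `version`, `versionCreatedAt`) rather than the real schema, and assuming `spark` is in scope, could be:

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Move all but the latest version of each entity, once past a time threshold,
// from the main Delta table into an archive Delta table.
val mainPath    = "s3://curated-data-lake/gold/healthcare_service"
val archivePath = "s3://curated-data-lake/archive/healthcare_service"

val byEntity = Window.partitionBy("entityId").orderBy(col("version").desc)

val historical = spark.read.format("delta").load(mainPath)
  .withColumn("rank", row_number().over(byEntity))
  .filter(col("rank") > 1 && col("versionCreatedAt") < date_sub(current_date(), 90))
  .drop("rank")

// 1. Copy the historical versions into the archive table.
historical.write.format("delta").mode("append").save(archivePath)

// 2. Delete them from the main table, leaving only the latest version per entity.
DeltaTable.forPath(spark, mainPath).as("m")
  .merge(historical.as("h"), "m.entityId = h.entityId AND m.version = h.version")
  .whenMatched().delete()
  .execute()
```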

Continuous Streaming Applications

So, essentially, there are quite a few advantages to continuously running real-time streaming applications. It not only enables us to have true event sourcing, as and when it happens, from systems like Kinesis, S3 and Delta tables, but running micro-batches also helps us interact with smaller and more manageable data volumes. And of course, recoverability through checkpoints is a bonus which comes by default with Spark streaming, along with the added reliability of using Delta tables. Now, before I end my presentation today, I want to briefly speak about the most commonly occurring data issues and how we tend to resolve them using some of the key pillars of our architecture.

Now, here's the problem statement. So imagine a downstream health integrator is complaining about an unanticipated special Unicode character in the service description, which is sort of breaking their integration.

Some of the steps that we generally follow to address these kinds of data issues: as a first step, we go and grab the latest version available in the system and check the payload, and then analyze the metadata attribution associated with the latest version to figure out two levels of insight: one, when was this particular attribute modified, and two, who was the originator of this modification? Depending on the identified timeline, we grab all the historical versions meeting the time threshold criteria, then isolate the faulty version by comparing the payloads between versions. We also grab the provenance of the faulty version to trace it back to the source of its origin, and the data lineage of the faulty version to find out all the operations performed on that event at that point in time, and see if any additional warnings were generated. That way, we are also able to inspect the other events from the originally ingested file to find out if other entities are impacted as well. And finally, we are then in a position to replay the latest version of all the affected entities with the payloads corrected, creating fresh new versions. All of this has been possible purely because of four important and significant factors: one, the ability to traverse through the historical versions; two, the metadata attribution associated with each version to isolate the faulty one; three, visibility of the series of operations performed on the faulty version through the data lineage; and four, tracing it to its origin source through the provenance, which is actually supported to millisecond precision in our system. This way, we have not just been able to achieve complete auditability but have also been able to generate data quality reporting for any System of Record.
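To give a flavour of what that investigation might look like in practice, here is a rough sketch (hypothetical table paths, columns and entity ID) that pulls the historical versions of the affected entity, isolates the version that introduced the unexpected character, and joins back to its lineage records:

```scala
import org.apache.spark.sql.functions._

// 1. Grab all versions of the affected entity, oldest to newest.
val versions = spark.read.format("delta")
  .load("s3://curated-data-lake/archive/healthcare_service")
  .filter(col("entityId") === "svc-1")
  .orderBy(col("version"))

// 2. Isolate the first version whose description contains a non-ASCII character.
val faulty = versions
  .filter(col("description").rlike("[^\\x00-\\x7F]"))
  .orderBy(col("version"))
  .limit(1)

// 3. Trace that version back through its lineage (and, via provenance, its origin).
val faultyTraceIds = faulty.select("eventTraceId")
spark.read.format("delta").load("s3://curated-data-lake/lineage/dlo")
  .join(faultyTraceIds, "eventTraceId")
  .show(truncate = false)
```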

And with that, that's pretty much it from us today. Thanks a lot for spending your valuable time listening to our story, and I would now like to open up