We present our solution for building an AI Architecture that provides engineering teams the ability to leverage data to drive insight and help our customers solve their problems. We started with siloed data, entities that were described differently by each product, different formats, complicated security and access schemes, data spread over numerous locations and systems. We discuss how we created a Delta Lake to bring this data together, instrumenting data ingestion from the various external data sources, setting up legal and security protocols for accessing the Delta Lake, and go into detail about our method of making all the data conformed into a Common Data Model using a metadata driven pipeline.
This metadata driven pipeline or Configuration Driven Pipeline (CDP) uses Spark Structured Streaming to take change events from the ingested data, references a Data Catalog Service to obtain mapping and the transformations required to push this conformed data into the Common Data Model. The pipeline uses extensive Spark API to perform the numerous types of transformations required to take these change events as they come in and UPSERT into a Delta CDM. This model can take any set of relational databases (1000s in our case), and transform them into a big data format (Delta Lake/parquet) CDM in a scalable, performant way all from metadata. It can then perform schema-on-read to project from this CDM into any requested destination location (database, filesystem, stream, etc). This provides the ability for Data Scientists to request data by specifying metadata, and the pipeline will automatically run producing the schema they require with all data types conformed to a standard value and depositing it to their specified destination.
– Hello everyone, welcome to our session where we will be talking about democratizing data by creating common data models and configuration driven pipelines to enable AI platform. My name is Shiran Algai and I am a senior manager of software development at Blackbaud. Presenting with me is Cindy. – Hi, I’m Cindy Mottershead, AI architect at Blackbaud. – Today, I’m going to be giving you an overview of the problems that we were facing and a brief description of our journey to build our data platform. Cindy will be going over our architectural decision, the common data models that we created, what the configuration driven pipelines are and the transformation building blocks and the AR feedback loop.
Before we get started, just a brief word about Blackbaud and who we are, Blackbaud is the world’s leading cloud software company powering social good, we serve the nonprofit and philanthropic space globally.
Let’s get started by understanding the problems that we were facing.
First we have single tenant and multitenant databases that spend multiple products built on different technology stacks and hosted with different cloud providers. Of course, all of our values stored in those silos.
Second, this has led to similar domain entities being stored very differently by products. For example, a constituent may look like X in product one Y in product two and so on. Sometimes that translates to new fields, sometimes more radical departures, even though describing the same core entity.
Third, through acquisitions and new development efforts over the years, we have actually compounded the problem by adding even more variety.
Then even within each product, data entry and consistencies exacerbates discrepancies in data sets.
Finally, it has become painful to get access to all data and it takes a long time to get access to that data. So how do you scale getting new data sets in any sort of reasonable way? Almost always, we are tied to some legacy product that has all the value. The question becomes how do you get data in and out of those products and make it accessible to microservices deployed in the cloud, so you can modernize and provide insight to customers?
Looking at our journey, we had a few data Lake projects,
they were scattered throughout the company.
So we began to build consensus towards building a common data model, a data platform and being a Microsoft partner, we started off by leveraging existing Azure tooling such as data factory, U-SQL, and Azure data Lake, with a target of using as many paths, tools as possible.
We wanted to keep it relatively simple and only do batch processing of data.
Finally we picked a small project that required getting data from a legacy product that would allow us to test things, from end to end.
This is what our initial architectural look like. The data would flow in from our various data sources into Azure data Lake storage and move from an initial transient zone by data standardization services to the raw zone. From there, the data would be moved by data quality services to a cleanse zone and finally data enrichment and modeling services would further enhance data in the enriched zone. The enriched data would be fronted by a data services API, or be delivered to additional databases to finally be consumed by our products.
It didn’t take much time for us to find some flaws in our approach, which led to some pivots in our architecture.
We found it painful to add new readers for different sources that were not supported such as Avro and Parquet, there were some gaps in the Azure data tooling that we were using for our specific use cases. We realized that support for batch only, processing was insufficient and we wanted to have a single path for both batch processing and streaming of data. We need to have a compacted version of our records to recreate data sets from our legacy products in the data platform to make the data more consumable and useful for the rest of our processes. And finally, we needed to use more standard standardized tools, so we can hire data engineers more easily with the right expertise that can get up and running quickly.
So let’s look at our solution at a high level. We bring data together into a common data Lake that, per data sovereignty region, we catalog data sets and transformations making data easy to find access and manipulate. We’re bringing out, we’re building out common data models, so new services can operate on data and not worry about differences in source systems. We set up and deployed configuration driven pipelines to support new transformations more easily, we developed tooling and infrastructure to allow teams to spin up their own spark pipelines quickly and in a consistent way, we incorporated technology to match records across products and entries creating a unique clinking key and we have infrastructure to support faster time to value by development teams.
Let’s talk a little bit more about the different components of that architecture.
We leverage Delta Lake as a storage layer on top of Azure data Lake, since it offers acid transactions on spark.
The data catalog service captures metadata about our data sets, but it also captures metadata about the transformations that happen within the data platform, to more easily find access and manipulate that data. The Lake authorization service was put in place to automate and better control access to data sets within the data platform. The ingestion service streamlines the flow of data into our data platform and of course the output server standardizes the flow of data out of the data platform and back into our ecosystem. And finally, the ACN contract broker service that seems a bit out of place, but it is our in-house message schema repository that proactively prevents services from releasing breaking schema changes.
So taking a closer look at the ACN contract broker, traditionally this type of functionality would be an application platform service but we saw an opportunity to get developers to do the unimaginable. We got them to actually catalog and annotate their own data. By using the ACN contract, broker developers get to enjoy cogeneration and peace of mind that they won’t introduce breaking changes, through their async messages Schemas. And at the same time, the data platform gets to use the metadata they generate to populate entries in our data catalog. Another problem we solved with this service was that, it was hard to convince development teams to start sending us data, especially when we didn’t have an immediate benefit, we could offer them right away. But we already had a lot of services sending a lot of good data over async communication. And service Bus made it very very easy to add an additional subscriber. So we made a process that lets us automatically subscribe to topics on a voluntary basis and push all produced messages into the data platform. Data that flows via this route is saved as a Delta table almost right away and cataloged as a separate entry in our data catalog. But with the same schema captured in the async contract broker, therefore everyone wins, everybody’s happy and we profit. Now that we’ve talked about architecture in our journey, I’d like to turn it over to Cindy so she can walk through how we leverage all this great data that we have ingested into our data platform.
– Next one.
Now that Shiran has walked us through our journey with the data platform architecture, I want to dive down a little into how we make this data easily available in a conformed and consistent fashion, across all products and clients. We accomplish this by creating a multi terabyte common data model. This common data model has a common defined structure, consistent naming of tables, structures, and fields, consistent representation of common values and consistent across all applications and application types. And it is integrated with our value added services.
A common data model or a CDM is a collection of objects. Each object represents an entity, in this case, the entire entity is a person. The object contains all the information about that person, including demographics, phones, emails, addresses, transactions, et cetera. These objects are stored as a row in a Delta table. Here we see that the name field is a structure of all aspects of the name. The address, phones and emails are arrays as they’re a multiple of each of one for one person.
Let’s look at the phones array.
These are rays of message structures with collections of various information about the phone, including auditing information and core phone information.
If we dive down into the core information, we finally see the phone number along with all the other possible information around a phone number. There is an array entry for each phone type home, mobile, work, et cetera.
The input of the common data model is a stream of change events coming from thousands of tables, CSV files, json, Parquet, across all of our products and across all of our clients. So we have the data is all in all forms, normalized data, denormalized data structures, nested objects, a simple flat files and it comes from many different databases, SQL server, MariaDB, Oracle, flat files, et cetera. Since we are processing streams of change events, we need to be able to update each change event without carrying other tables, considering each source table can be billions of records. We need to be able to update a single field in a structure in a column in the common data model. To do that, we created a configuration driven pipeline.
How do we get the relational data base changes events into this format at scale?
We use the CDP, the configuration driven pipeline. We designed a configuration driven pipeline that has a common ID. A common ID is a requirement in any common data platform. It is required to allow your cables to merge into the common data model using the common ID. Destination data sets are defined by creating a map from the source table specified in the data catalog to the destination table. The map is created by a module that presents a list of available source tables and columns definitions provided by the data catalog and a list of valid transformations, the CDP supports and a list of the available destination table fields. The metadata map is constructed to relate each source field to the destination table where the transformation is required to trans transform that into the correct schema, spark structured streaming processes change events, interrogates the data catalog to determine lineage from this source table to all destinations. It receives this metadata map of source to destination and uses topological storage to resolve dependencies and the map generation. For each lineage, it applies transformations on each field specified in the map and then merges into the destination table. Transformations are basic building blocks that allow the pipeline to take any source schema and transform it into the destination schema All right, transformations.
We’ve decided that, we’ve come up with a list of transformations that are building blocks, that if we can apply these six transformations to any input, any relational input and be able to create the desired common data model schema output. These six filters that we’ve defined are filters, transformations to filters, views, one to one with SQL transformation and Lookup, where we take one field and we put it directly into the destination field with possibly with a SQL transformation. One row to many rows where we take one row of data input change events and end up creating multiple rows to represent that data.
An example of this is when we get denormalized data, we want to break that up into multiple rows and then unpivot operation. We have many rows to a two array in one column, this is a case where we have phones, somebody has phones and multiple rows, their home phone in one row of work phone and another row mobile phone in another row, this transformation will allow us to collect all of these phones for that individual app and put them into one column in the destination table.
And the final transformation that we need in order to be able to transfer and transform any of these is aggregations, be able to do maximum and other types of aggregations on this data. So with these six filters, we’re able to transform any relational data change event, even which is, we just gotta a change event, which is a partial update to our record and we can figure out through the map and through and use one of these transformations to successfully push it into the proper place in their common data model. The filters are straightforward just, but we have pre-filters and transformation filters, where the pre-filters will be applied to the stream as it’s streaming in the change events. And they post their transformation filter will be applied to the transformation once it’s in process. And the view is instead of writing to a destination table, we just create a view from the map, so we can map any combination of source and transformation to destination and create a view of that. So this allows us to, for example, an easy way for us to provide non PII data pro from a huge table, by just making a view that hashes out the PII data and provides the rest of the data. So with these transformations, we’re able to do the configuration different pipeline can create any common data model or any destination model.
In this demo, I will show how the configuration driven pipeline processes, incoming change events to update a multi terabyte common data model. The change events in this example, here there are no denormalized data for phones associated with a person are processed by a spark structured stream. When the change of vent arrives, the configuration driven pipeline calls the data catalog service to return maps, to any destination data set that has a lineage from this source. This map specify the source field, any transformation needed and the final destination table field.
When these common configuration different pipeline processes all of these maps, it pushes the results up and updates the terabyte common data model person in this example.
So, that’s how the CDP receives these change events over a stream, reapplies the transformations and pushes them into a Delta table of other common data model. Now, we will look more specifically at an example of what spark API calls are used to do this. Here, we will start with a change events that are just name events, a very simple, just name events. We have a for product a, we have person ID one and their first name and last name and ID two with their first name and last name. These change events come in, we interrogate the data catalog service and ask for any lineages based on new sources. And we returned this map of source field transformation destination field, where you can see that the source field here, is the ID and we’re going to turn that into a common ID in order to go into a common data model by simply applying a transformation, that’s a CONCAT of the product name, product day with the ID and then we’ll push that into the destination field. So let’s apply that map right now and to apply it is very simple, we take our deltas that are our change events that came in and we add a new column, that’s the destination column, common ID. And we apply the transformation, the SQL transformation that was specified and we use a source and fortify ID. And so here, we simply created a new column of ID applying to transformation and we have our new column, common ID with a new common ID, which is productA colon one and the productA colon two.
At this point, we’ll process the remaining two or three in the map, which are very simple as well. If you remember the map is for two and three it’s just first underscore name goes to the destination of name dot first and last underscore name dot last. So we apply that transformation and we can see that, we have our change events that came in, we have our common ID that we’ve added, now we have this column name dot first and name dot last. At this point, we have finished processing all of our transformations and the CDP is ready to just hand it off to the CDP finisher that takes any combination of transformations and then cleans them up and pushes them. And the way what it does, the CDAP finisher is responsible for create structures. It will determine what destination structure columns were extracted and extract the nested fields necessary to fill in those destination structures. It also conforms the rest of the STIC schema to the destination schema, setting up fields as necessary and schema formats as necessary. And then it merges this Delta table into the destination table.
As you can see here name dot first name dot last, obviously look like structures, but there are two columns here so that the CDM finisher will take a look at that and say, we want an asset field named name and the sub fields are going to be name dot first name dot last. So to create that, to gather this structure, we simply do a new column, nested field of name. So we’re going to say new column named name, and we’re going to create a structure out of all of the columns that are sub fields of that structure in this case name dot first name dot last. So if we run that, we will see that now we have a name column with, a structure is underneath it named diverse.
Then the part of the CDP finisher that looks to make sure that the schema are the same as the destination schema we’ll look at this and say it, the destination schema has a middle name as well. So in order to set that up, we’ll just simply add that column to the nested columns, but we’ll just make it a literal a blank. So this time, when we gather up our structures, we will gather them up with the fields that we’re providing and the fields that are necessary to support the schema and the destination. At this point, this is ready to be pushed up into the destination, so we will just select the destinations part of the CDP finishes will just select the columns that are necessary to be pushed into the destination. In this case, all we need is the common ID, which we’re going to join on and the name column, because that’s all we want to push into the destination. So to upsert this into the common data model, we will do a merge, we’ll merge into the destination table and we will use the common ID is to join on. So I’ll join on common ID and when it’s matched, then we’ll start. We have the CDP finisher has gone through and created an update list based on all of the schema and destination and source and so in this case, it’ll say destination name dot SQL, source name dot first. But you can see, we did not do the middle name because we’re not updating the middle name because that did not come from the source, so we will just update the source fields within a structure that we have new data for. When it’s not matched, when this is a new row to be added to the common data model, we will just simply insert common ID and name and in that case, since we have set up the schema to be the proper schema for the destination table, with middle name as well, we will be able to insert this into the table with the proper schema. So with that, we have the destination table, it says we would want it to be where we have the structures nested, nicely, and a common ID. So that’s it for the simple transformation processing.
Now that we have a common data model that has all of the data across all the products and clients in a conformed way, in an easily accessible way where they can, where data scientists, or other product people can use, our link authorization service to simply request access to this table or they can access a view, that has no PII in it.
Now we are in a position where we can start creating an AI feedback loops. We can have the full cycle of model development, where we can have a model that pulls data from our common data model and trains a model and then we deploy that model and it starts scoring some new input data and we can turn that, the result, the labels that are created from that model back into our system and that data will become available for them. We can retrain our model on the labels that were created by the model itself and do a complete AI feedback loop.
– So when we look at it all together, data is not flowing from various products and services. It is automatically ingested into our data platform. We can transform the data through configuration driven pipelines and into common data models. Data sets can then be leveraged, much more easily and generically to produce value to our customers. And finally, we have baked in feedback loops so we can, further iterate and improve.
So how did we democratize data? We made it easy for our data scientists and engineers to access data sets from common data models in a secure manner that limits our liability and maximizes the benefits, that we can draw from them. They no longer have to worry about the, how, when and where of getting access to data center. They can more easily manipulate data, create insights, develop models, and output results to our ecosystems. Thank you very much for your time.
Cindy Mottershead is an experienced, hands-on software architect involved in the architecture, design, and implementation of big data, real time, Machine Learning, and distributed systems. She is an effective change agent driving strategic decisions and implementations across the company. She has lead the architecture of scalable, performant, distributed solutions into production, pivoting when necessary to maintain a competitive edge. Her last startup, attentive.ly, was acquired by Blackbaud, where she is currently involved in leading the AI Architecture.
"Shiran Algai is a Senior Manager of Software Development in Blackbaud’s Data Intelligence Center Of Excellence. Blackbaud is the world’s leading cloud software company powering social good. Shiran started his career at Blackbaud in 2006 after graduating from Clemson University with a BS in Computer Engineering. After 10 years as a software engineer working and leading several initiatives in Blackbaud’s wide portfolio of products, Shiran took an opportunity to move into the management side of software engineering. For the past two years, Shiran has managed Blackbaud’s Data Platform team and initiative, which is at the center of Blackbaud’s analytical transformation. Shiran has been a volunteer with the TEALS program, a Microsoft Philanthropies program that connects classroom teachers with tech-industry volunteers to create sustainable CS programs, for the past 3 years."