Build Real-Time Applications with Databricks Streaming

May 28, 2021 11:40 AM (PT)


In this presentation, we will walk through a use case we implemented recently with a large, metropolitan fire department. Our company has already created a complete analytics architecture for the department based upon Azure Data Factory, Databricks, Delta Lake, Azure SQL and SQL Server Analysis Services (SSAS). While this architecture works very well for the department, they would like to add a real-time channel to their reporting infrastructure.

This channel should serve up the following information:

• The most up-to-date locations and status of equipment (fire trucks, ambulances, ladders, etc.)

• The current locations and status of firefighters, EMT personnel and other relevant fire department employees

• The current list of active incidents within the city

The above information should be visualized through an automatically updating dashboard. The central component of the dashboard will be a map which automatically updates with the locations and incidents. This view should be as close to real time as possible and will be used by the fire chiefs to assist with real-time decision-making on resource and equipment deployments.

In this presentation, we will leverage Databricks, Spark Structured Streaming, Delta Lake and the Azure platform to create this real-time delivery channel.

In this session watch:
Bennie Haelen, Architect, Insight Digital Innovation

 

Transcript

Bennie Haelen: Hello, and welcome to my talk. My name is Bennie Haelen, and I'm a Principal Architect with Insight Digital Innovation. We're both a Microsoft and a Databricks partner, and we're very proud to have received the Databricks APAC partner of the year award last year. We have recently implemented a solution for one of the largest municipal fire departments in the country, and we'd like to share some things we did for them. Specifically, we built a real-time reporting channel for them with Azure PaaS services, Databricks, Spark Structured Streaming, and Power BI. So let's get going.
So I want to first quickly describe what we already implemented for the fire department. This is a large metropolitan fire department, and we implemented a complete modern data warehouse architecture on Azure for them. This architecture is based on Insight's repeatable MDW framework architecture. With this architecture, they can ingest data from a variety of sources; it could be SQL, REST APIs, SaaS services, et cetera. We ingest the data and land it in the raw zone. From the raw zone we can then cleanse and transform the data and land it in a staging area. And from the staging area, we can build the dimension and fact tables and land those in the MDW area.
From the MDW area, we can then push the data either to Azure Analysis Services or Azure Synapse, and then finally the visualization itself is performed by Power BI. In this architecture, we have a set of reusable, templatized ADF pipelines, and these ADF pipelines are used for both data movement and scheduling. This architecture works great; it's been in production for quite a while. The fire department recently came back to us and asked for a real-time reporting channel. So we'll take a look at what their requirements were in the next slide.
So what was the extension of our use case here? Well, the fire department wanted this real-time reporting channel, where they would get an up-to-date location and status of all their equipment, so that includes fire trucks, ambulances, et cetera. They also want to know the location and the status of all their personnel, including the firefighters, EMT personnel, and really anybody working for the department. And then finally they want to know what's going on in their city. So they want to know: what are all my active incidents, what accidents are happening, where are the fires, et cetera.
And they wanted to take this data and provide a real-time visualization for it. So they wanted to create an automatically updating dashboard, and a very important component of that dashboard was a map which would provide automatic updates of locations and incidents. Importantly, that map should update automatically based on data triggers. The map and the complete solution would be used by fire chiefs to make real-time move-up decisions, where they preemptively move up equipment and resources as incidents are happening throughout the city. And it's anticipated that down the road, we'll actually help them implement some AI algorithms to help drive those decisions.
So let's take a deeper look at what we need to build here. The incidents themselves are sourced from quite a variety of systems, the main one being the 911 system itself, but then we have GPS trackers, et cetera. All these events land in the fire department's central database. We leverage a Change Data Capture (CDC) infrastructure to get those events, and then forward them to an ESB, an Enterprise Service Bus, which in this case was [TIBCO] [inaudible]. All of this was existing infrastructure that was set up by the fire department. Insight taps into the Enterprise Service Bus through a WebSockets interface. So we built a component that receives the events through the WebSocket interface and then uses an Azure Function to forward those events to our cloud ingestion channel. And we'll take a look at our architectural requirements for our cloud infrastructure next.
So what are our architectural requirements? Well, we need to ingest an event stream; we anticipated at least 1,000 events per second, so we need a very high ingestion rate. That means we need a really high-performance, fault-tolerant ingestion service. We need to stream those events off the ingestion service, and then we need to do some domain-specific conversion and real-time streaming analytics on that data.
Once we have streamed off the events and converted them, we want to store them in a very high-performance data store where we can have keyed access to the data, so we can refer to units by a unique identifier. And then importantly, we need to perform UPSERT operations, right? Because we want a real-time representation, a real-time view of what's going on. So we need to be able to perform an UPSERT where we merge newly arrived events with already existing data.
And then last but not least, we need to visualize that data in a real-time dashboard. And as we mentioned before, we want updates to be triggered by data changes in the underlying data store. So now that we know the requirements, let's see how we actually decided to build the solution. In our solution architecture, we chose Azure Event Hubs as our ingestion channel. Azure Event Hubs is a very robust ingestion service that can literally process millions of events per second, so well beyond our ingestion rate. It's high-performance, fault-tolerant, and it also has a nice Kafka interface for both producers and consumers, so it allows us to integrate with that popular ecosystem. We chose Databricks as our analytics platform; Databricks offers a great solution for our cloud platform, which in this case is Azure. And then we combined Databricks with Spark Structured Streaming. Structured Streaming is a great streaming analytics service, and what's nice about it is that it allows us to use the same semantics as we do for batch processing, since Structured Streaming is ultimately based on the same Spark SQL engine.
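As an aside on that Kafka compatibility: the demo below uses the Azure Event Hubs Spark connector, but the same Event Hub could in principle also be consumed through Spark's built-in Kafka source. The following is only a hypothetical sketch; the namespace, hub name, and secret scope are illustrative placeholders.

```python
# Hypothetical sketch (not shown in the talk): reading an Event Hub through its
# Kafka-compatible endpoint with Spark's built-in Kafka source. The namespace,
# hub name, and secret scope are illustrative placeholders.
conn_str = dbutils.secrets.get(scope="demo-scope", key="eventhub-connection-string")

raw_kafka_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "my-namespace.servicebus.windows.net:9093")
    .option("subscribe", "units-events-hub")  # the Event Hub name acts as the Kafka topic
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config",
            'org.apache.kafka.common.security.plain.PlainLoginModule required '
            f'username="$ConnectionString" password="{conn_str}";')
    .load())
```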
And then we need to land the data. We chose Databricks Delta Lake as our data store because it's a high-performance ACID data store. It gives us keyed access to the data, it allows us to do those UPSERTs that we mentioned earlier, and we can use SQL to access it, so it's very easy to work with. It really gives us that highly scalable ACID big data store that we need to implement the solution.
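To make that UPSERT idea concrete, here is a minimal sketch of a Delta Lake MERGE expressed in SQL and run through spark.sql(); the table and column names are assumptions rather than the project's actual schema.

```python
# Illustrative only: an UPSERT into a Delta table expressed in SQL.
# Both table names and the key column are assumptions for this sketch.
spark.sql("""
  MERGE INTO unit_status AS target
  USING unit_status_updates AS source
    ON target.unit_name = source.unit_name
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")
```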
And finally, we selected Power BI as our visualization solution. It's easy to integrate, and Power BI can seamlessly query Spark and Delta tables directly with DirectQuery. So we can issue direct queries and then directly visualize those results, and Power BI also has the capability to trigger updates through data changes. So it's a great solution here.
What we are going to look at next is a quick little demo. Since we can't tap into the fire department's live feeds of course, I created a small streaming event simulator, a C# .NET application that simulates a little red firetruck that produces events. We create an Event Hubs namespace with one single Event Hub called Units Events Hub. These events are streamed off the Event Hub with a generic streaming Notebook. This Notebook really only reads the events off the Event Hub, does some normalization, and then calls into a Unit Status Event processor. That architecture allows us to separate the individual event processors from the generic streaming infrastructure, so it's a nice pattern to implement.
The event processor will update our Unit Status Table, which is [inaudible] that real-time view, and we visualize it through a very, very simple Power BI Premium report. The Delta Table itself is built by a generic Create Delta Table Notebook that's called by another Notebook, and we'll take a look at that in just a second.
So with that, let's talk about how we're going to organize this demo. First we'll take a quick look at what resources we stood up to make this happen. Then we'll do a code review where we look at the event simulator, how we created the Delta Table, our Spark Streaming Notebook and our stream processor. And then finally we'll do a quick run of the demo.
Next, let's take a look at our infrastructure. So we're here in the Azure portal, in our streaming resource group. First, let's take a look at our Event Hubs namespace. This is your standard Event Hubs namespace, nothing really special here. What we'll do later is, from this shared access policy, we'll take our connection string primary key, and that is one of the items that we will need to store in our key vault.
So next, let's go take a look at Event Hubs, and you'll see we have a single Event Hub called Units Events Hub, and that will be the Event Hub we will use to actually store our events. That's what we'll be streaming from. Let's go back to our resource group. Second, we have our storage account. This is a standard ADLS Gen2 storage account, and it has a single container called Data Lake. Within that Data Lake you see the classical organization that you see for a lot of modern data warehouses, where you have a bronze, silver and gold area in the Data Lake. The bronze is typically where extracted source data is landed, then the data is [inaudible] in silver, and then finally the dimension and fact tables are built and land in the gold area, and from there they are loaded either into Azure Synapse or into Azure Analysis Services.
Next, let's go back to our resource group and take a quick look at the key vault we mentioned. In the vault we have only the few secrets we really need: we have the Client ID and the Client Secret, that's the service principal that we use in Databricks to go and access the Data Lake. We have the connection string that we mentioned earlier for Event Hubs, and we have the [Tenant ID] for Azure [inaudible]. Okay. So we covered everything except, of course, Databricks itself.
So let's go and launch a workspace. We have a [inaudible] here because we'll do some things with Azure [EPs], where we have a premium instance, and here's our workspace. Let's take a quick look at the clusters. You'll see we just have a demo cluster here; I created a small cluster with a maximum of two nodes, nothing really special, except for the libraries: we did install the Spark 2.1 library, that is, the Azure Event Hubs Spark connector library that we will use to talk directly to Event Hubs, to pull events from the Event Hub.
For the rest, let's take a look at the workspace. We have a couple of different folders. We have a Startup folder with a Notebook that mounts the Data Lake (we'll take a quick look at that in a second), and then we have the streaming folder where we create a Delta table and implement the actual streaming code.
Finally, I want to quickly point out that in the data area here, when you look at DBFS, we have set up a checkpoint directory. So this tmp unit status directory is actually the checkpoint location for our streaming process [inaudible]. A checkpoint location set up like this will allow us to seamlessly stop and restart our streaming process without any issues. So it's very important if you want to productionalize your streaming process.
And that pretty much covers our resources in Azure. Besides the Azure resources, I created a small C# .NET program, and this .NET program is only used to simulate a firetruck. So we have a little red firetruck here that will move across the screen, and we'll see that in the demo. That concludes our infrastructure walkthrough. Thank you.
Now that we have taken a look at our infrastructure, let’s take a look at our actual code, the code we wrote to implement our streaming algorithm.
Next, let's take a quick look at our code. We're here in the Databricks workspace, and let's start by looking at the Startup folder. We have a generic Notebook here that can mount any ADLS Gen2 storage. You'll see we have three parameters: we take a storage account name, the file system that we want to mount, and the mount point path, that is, how we want it to look in our Databricks workspace. So here we set up our widgets. We pull some secrets from the key vault, because we need a service principal under which we will be accessing the account; that's where our Client ID and Client Secret come in. We also need to pull a Tenant ID. We get our actual parameters, and then it's a very simple [inaudible] to set up our configuration, where we pass in the Client ID and Client Secret and our Tenant ID, and then ultimately use the ABFSS syntax for the Data Lake and mount the actual storage with [inaudible]. So that's really all there is to it, a very, very simple Notebook.
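A minimal PySpark sketch of such a generic mount Notebook might look like the following; the widget names, secret scope, and secret keys are assumptions, not the project's actual values.

```python
# Hypothetical sketch of a generic ADLS Gen2 mount Notebook; widget names,
# secret scope, and secret keys are illustrative placeholders.
dbutils.widgets.text("storage_account_name", "")
dbutils.widgets.text("file_system_name", "")
dbutils.widgets.text("mount_point_path", "")

storage_account = dbutils.widgets.get("storage_account_name")
file_system = dbutils.widgets.get("file_system_name")
mount_point = dbutils.widgets.get("mount_point_path")

# Service principal credentials pulled from a Key Vault-backed secret scope
client_id = dbutils.secrets.get(scope="demo-scope", key="ClientId")
client_secret = dbutils.secrets.get(scope="demo-scope", key="ClientSecret")
tenant_id = dbutils.secrets.get(scope="demo-scope", key="TenantId")

configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type":
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint":
        f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}

# Mount the container using the ABFSS driver, as described in the walkthrough
dbutils.fs.mount(
    source=f"abfss://{file_system}@{storage_account}.dfs.core.windows.net/",
    mount_point=mount_point,
    extra_configs=configs,
)
```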
Next, let's go take a look at our actual streaming code. First we need to go and create our Delta Table. Again, we created a very simple generic method, create Delta Table, that takes a database name, a table name, a schema, and the location of the underlying Delta files. What we do first is a simple little trick: we use the schema to create an empty DataFrame out of that schema. So we pass in the schema here and [inaudible] we have an empty DataFrame, which will have the correct schema. We then write it in Delta format to the specified location, and next we drop and re-create the database and the table. And then ultimately we quickly run a SQL statement, a DESCRIBE statement, to make sure the table was created successfully.
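A hypothetical sketch of that generic create-table pattern, with illustrative function and parameter names:

```python
# Hypothetical sketch of the generic "create Delta table" pattern described above;
# names and the empty-DataFrame trick follow the description, but are illustrative.
def create_delta_table(database_name, table_name, schema, delta_location):
    # Trick: create an empty DataFrame that carries the desired schema
    empty_df = spark.createDataFrame([], schema)

    # Write it out in Delta format to the specified location
    empty_df.write.format("delta").mode("overwrite").save(delta_location)

    # Drop and re-create the database and the table over that location
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
    spark.sql(f"DROP TABLE IF EXISTS {database_name}.{table_name}")
    spark.sql(f"""
        CREATE TABLE {database_name}.{table_name}
        USING DELTA
        LOCATION '{delta_location}'
    """)

    # Sanity check: describe the table to confirm it was created successfully
    display(spark.sql(f"DESCRIBE TABLE {database_name}.{table_name}"))
```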
We can invoke this Notebook from our Create Unit Status Table Notebook. The only additional items we need to take care of here are to set up a database name, a table name and our location, and to create our schema. So this is the schema for the incoming events.
And then we just call the create Delta Table method that we just saw in the other Notebook, passing in all the parameters, and this runs the DESCRIBE statement where we can nicely see that our table has been created successfully. So that's the Delta Table creation.
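An illustrative sketch of that calling Notebook, assuming the create_delta_table function from the previous sketch; the database, table, path, and field names are placeholders.

```python
# Illustrative sketch of the calling Notebook: define the event schema and invoke
# the generic create_delta_table logic (shown here as a direct function call;
# the talk invokes it as a separate Notebook). Names and paths are assumptions.
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

database_name = "fire_dept"
table_name = "unit_status"
delta_location = "/mnt/datalake/gold/unit_status"

unit_status_schema = StructType([
    StructField("unit_name", StringType(), False),
    StructField("unit_type", StringType(), True),
    StructField("status", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("event_timestamp", TimestampType(), True),
])

create_delta_table(database_name, table_name, unit_status_schema, delta_location)
```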
Now finally, let's take a look at the streaming code itself. We really have two parts here. We have a generic first Notebook that just sets up the skeleton of the streaming, and then we have a processing function that actually processes the events, the Unit Status Event processor. This is a generic pattern that we tend to use a lot, and it makes it really easy to create these processing functions, which you can then hook up to the more generic streaming code.
So, let's take a look at the code itself. We go and pull the [inaudible] connection string, we set up our configuration, and then we call readStream here with [inaudible] Event Hubs. What we get back is a Streaming DataFrame, so this is our unbounded Streaming DataFrame. We see the schema here; this is the [inaudible] schema for Event Hubs. A couple of things to note: you see, for example, that the body is binary, which is a little bit hard to work with. So what we want to do is take this schema and [inaudible] it into something that's a little bit more usable, which is what we do here with our good old PySpark withColumn method.
So what we do, for example, is convert the body from binary into a string type, which will make it really easy in our processing function to go and pull the JSON out of the message. You see now when we compare these two schemas, we have a nice string body here. And our final step, the main processing logic, is just taking the messages DataFrame and writing it with writeStream. Well, we'll skip the checkpoint for a moment, we'll come back to that. So we do our writeStream and we do a foreachBatch, which means: take each micro-batch and pass the DataFrame for that particular batch, so the events for that batch, on to this processing method, and then of course we call the start method.
A couple of things to note: messages is a streaming DataFrame because we read it here with readStream, but the DataFrame that is passed to our micro-batch method will actually be a regular DataFrame, which is [inaudible] because that means we can count it, we can extract records from it. It's just a regular, standard DataFrame that's very easy to work with.
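Putting those pieces together, a minimal sketch of the generic streaming skeleton might look like this, assuming the Azure Event Hubs Spark connector is installed on the cluster; the secret scope, checkpoint path, and the process_unit_status_events function name (defined in the processor sketch further down) are illustrative.

```python
# Hypothetical sketch of the generic streaming skeleton, assuming the
# azure-event-hubs-spark connector on the cluster; secret scope, checkpoint
# path, and the processor function name are illustrative.
from pyspark.sql.functions import col

connection_string = dbutils.secrets.get(scope="demo-scope", key="eventhub-connection-string")

# The connector expects the connection string to be encrypted
eh_conf = {
    "eventhubs.connectionString":
        sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string)
}

# Unbounded streaming DataFrame with the raw Event Hubs schema (binary body, offsets, ...)
raw_events = spark.readStream.format("eventhubs").options(**eh_conf).load()

# Normalize the raw schema: cast the binary body to a string so the JSON payload
# is easy to parse in the processing function
messages = raw_events.withColumn("body", col("body").cast("string"))

# Hand each micro-batch (a regular, bounded DataFrame) to the event processor,
# with a checkpoint location so the stream can be stopped and restarted safely
query = (messages.writeStream
         .option("checkpointLocation", "/tmp/unit_status/_checkpoint")
         .foreachBatch(process_unit_status_events)  # defined in the processor Notebook
         .start())
```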
So next let's go take a look at our processing function itself. This is a very, very simple method. We have our incoming schema that we set up, so let's also take a quick look at our processing method. You see here we get in this DataFrame that I mentioned earlier, which is a regular, standard DataFrame now, and we get a batch ID that allows us to track [inaudible]. We set up our schema, and then next, because what we're really interested in is the body of the message, we're going to extract the fields out of the body and then do some transformations. So we apply some transformations to convert timestamps and to pull out things like battalion, division, bureau, et cetera, select all those out, and get a nice clean Unit Status DataFrame here. We then drop our duplicates to make sure we don't have any duplicate records, and do some [inaudible] printouts.
And then finally we go retrieve our table: we use the DeltaTable for [inaudible] method to get a reference back to our Delta Table. And then we can just do our simple UPSERT. So we can just say deltaTable.alias [inaudible], use our standard merge statement on name, because name is the key of our table, and we can say: when not matched, we're going to insert, and when matched, we're going to update. So, standard UPSERT functionality, nothing really special here.
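A hypothetical sketch of that processing function, reusing the unit_status_schema from the earlier sketch; the table name, key column, and transformations are assumptions based on the description above.

```python
# Hypothetical sketch of the micro-batch processing function; column names,
# table name, and transformations are assumptions, and unit_status_schema is
# the schema from the earlier sketch.
from delta.tables import DeltaTable
from pyspark.sql.functions import from_json, col

def process_unit_status_events(batch_df, batch_id):
    # batch_df is a regular (bounded) DataFrame containing one micro-batch of events
    parsed = batch_df.select(from_json(col("body"), unit_status_schema).alias("event"))

    # Further domain-specific transformations (battalion, division, bureau, timestamp
    # conversions, etc.) would go here; the sketch keeps only the select and dedup.
    unit_status_df = (parsed
                      .select("event.*")
                      .dropDuplicates(["unit_name"]))

    # UPSERT the batch into the real-time Unit Status Delta table, keyed on unit_name
    delta_table = DeltaTable.forName(spark, "fire_dept.unit_status")
    (delta_table.alias("target")
        .merge(unit_status_df.alias("source"), "target.unit_name = source.unit_name")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())
```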
So, that pretty much concludes the walkthrough of the code. What we'll do next is take this code and do a quick run of all of this so we can see it in action. Thank you.
So now that we've taken a look at our code, let's run our simulator and let's run our Notebook so we can actually see this in action. We'll start out here with our small C# .NET event simulator. What we have here is a little red firetruck, [inaudible] fire department, right? What we do here is we create the Producer Client, create an Event Batch, take the position of our truck, serialize that into an event, serialize it into a JSON string, add it to the Event Batch, and send it to the Event Hub. Once we send that event, we increment the position of our truck slightly along the [inaudible] here and do this all over again. And as you can see, we have a total loop here of about 10,000 iterations. So that's our C# event simulator.
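The simulator in the talk is a C# .NET program; a roughly equivalent sketch in Python, using the azure-eventhub SDK, might look like this. The connection string, hub name, and payload fields are illustrative.

```python
# Hypothetical Python equivalent of the C# firetruck simulator, using the
# azure-eventhub SDK; connection string, hub name, and payload are illustrative.
import json
import time
from azure.eventhub import EventHubProducerClient, EventData

producer = EventHubProducerClient.from_connection_string(
    conn_str="<event-hubs-namespace-connection-string>",
    eventhub_name="units-events-hub",
)

latitude, longitude = 34.0522, -118.2437  # arbitrary starting position

for i in range(10000):
    # Serialize the truck's current position into a JSON event
    payload = {"unit_name": "LadderTruck-1", "status": "EnRoute",
               "latitude": latitude, "longitude": longitude}

    batch = producer.create_batch()
    batch.add(EventData(json.dumps(payload)))
    producer.send_batch(batch)

    # Nudge the truck slightly along its route and repeat
    longitude += 0.0001
    time.sleep(0.1)

producer.close()
```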
So next let's go look at our streaming Notebook. Here we have our Event Hub Spark Streaming Notebook, and let's go ahead and get this Notebook into a running state. As things are being set up here, we can see, just like in the code walkthrough, the connection string that we read from the key vault. We then use that connection string to do a readStream from Event Hub with our configuration. We see the schema here, so we've got a body that's binary, offsets, et cetera.
Next, in our next statement, we do some simple withColumn PySpark transformations so we can transform these columns into something that's a little bit more usable, especially the body. You can see here we've taken the body and cast it nicely to a string type, and that will make it a lot easier for us later to get the JSON object out of that string. So here we now have a very nice clean schema with a string body and our timestamps.
Next, we simply take our messages DataFrame, and our messages DataFrame is of course a Spark Streaming DataFrame. For all practical purposes it's the same as a regular DataFrame, but it is unbounded, which means rows will keep being added at the bottom, and we cannot do things like take a count or take out the [inaudible] because it is an unbounded DataFrame. So we take the DataFrame, we perform a writeStream, and we say: for each batch that we collect, call this batch processing function. That's the batch processing function we looked at when we did our code walkthrough.
One more thing to point out here is that we specify a checkpoint location, as we mentioned earlier in our infrastructure walkthrough. So let's go take a look at our stream. You see at the moment, of course, we've got nothing coming in because we haven't started our C# program yet.
So let's go and do that now. We'll crank up this guy, and you'll see that pretty soon here we should start [inaudible], here we go. So let me go back, and we'll see a slight delay here; it always takes a couple of seconds for the status screens to update, but we should shortly see our events pick up here. As we start seeing our events, there we go. You can see our events coming in, and you see of course our batch rate going up, as we now have actual events to process.
We can also go look at our data. So here, for example, we see we have some 6,300 rows per second, so we can see what our processing rate is, et cetera, [inaudible] kind of go take a look here, switching back to our dashboard. And of course, at any point in time, since we are ultimately updating the Unit Status Delta table, we can go and run a standard SQL query to get the actual up-to-date status and coordinates [inaudible]. So here we see the lat and long coordinates of the firetruck. And of course, as we re-run those queries, we see that position being updated.
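Such an ad-hoc check against the Unit Status Delta table could be as simple as the following sketch; the table and column names are assumptions.

```python
# Illustrative ad-hoc query against the real-time Unit Status Delta table;
# table and column names are assumptions from the earlier sketches.
display(spark.sql("""
    SELECT unit_name, status, latitude, longitude
    FROM fire_dept.unit_status
    ORDER BY unit_name
"""))
```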
Finally, we can go take a look at our Power BI report, and we again see the position here, so we have [inaudible] which is the current position of our firetruck, and we see it moving along the line there. Since this report has been set up to automatically update when [inaudible] changes, it will reflect those updates without me having to refresh the report. This is a nice feature of Power BI [inaudible]. Okay. That was a quick walkthrough. Thank you for your time.
The need for large-scale real-time stream processing is becoming more evident every day, because organizations really need the ability to respond quickly to an ever-changing business climate. And what we've seen in this short presentation is how easy it is to integrate Spark Structured Streaming to add a real-time channel to your architecture. It's a very simple extension on top of Spark SQL. So this is really a call to action: go and implement streaming in your solution and let me know how it goes. Thank you very much.

Bennie Haelen

"Bennie is a principal architect with Insight Digital Innovation. Insight Digital Innovation is a Microsoft and Databricks partner. Insight was honored to receive the Databricks national consulting an...