Modern Industrial IoT Analytics on Azure - Part 2
Introduction
In part 1 of this series on Modern Industrial Internet of Things (IoT) Analytics on Azure, we walked through the big data use case and the goals for modern IIoT analytics, shared a real-world, repeatable architecture that organizations are using to deploy IIoT at scale, and explored the benefits of the Delta format for each of the data lake capabilities required for modern IIoT analytics.
The Deployment
We use Azure’s Raspberry Pi IoT simulator to simulate real-time machine-to-machine sensor readings and send them to Azure IoT Hub.
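The exact payload depends on how the simulator is configured; for this walkthrough, assume each device sends a small JSON message body along these lines (the field names are illustrative and match the fields extracted later in the pipeline):
# Illustrative (assumed) telemetry payloads sent by the simulator as JSON message bodies
turbine_msg = {"deviceId": "WindTurbine-1", "timestamp": "2021-06-01T12:00:00Z", "rpm": 7.9, "angle": 6.3}
weather_msg = {"deviceId": "WeatherCapture", "timestamp": "2021-06-01T12:00:00Z",
               "temperature": 24.1, "humidity": 0.57, "windspeed": 8.8, "winddirection": "NW"}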
Data Ingest: Azure IoT Hub to Data Lake
Our deployment has sensor readings for weather (wind speed & direction, temperature, humidity) and wind turbine telematics (angle and RPM) sent to an IoT cloud computing hub. Azure Databricks can natively stream data from IoT Hubs directly into a Delta table on ADLS and display the input vs. processing rates of the data.
# Read directly from IoT Hub using the EventHubs library for Azure Databricks
# ehConf (connection settings) and schema (the JSON payload schema) are assumed to be defined earlier in the notebook
from pyspark.sql import functions as F

iot_stream = (
  spark.readStream.format("eventhubs")                                         # Read from IoT Hub directly
    .options(**ehConf)                                                         # Use the Event Hub-compatible connection string
    .load()                                                                    # Load the data
    .withColumn('reading', F.from_json(F.col('body').cast('string'), schema))  # Extract the payload from the messages
    .select('reading.*', F.to_date('reading.timestamp').alias('date'))         # Create a "date" field for partitioning
)
# Split our IoT Hub stream into separate streams and write each into its own Delta location
write_turbine_to_delta = (
  iot_stream.filter('temperature is null')                           # Keep turbine telemetry (weather readings carry a temperature field)
    .select('date', 'timestamp', 'deviceId', 'rpm', 'angle')         # Extract the fields of interest
    .writeStream.format('delta')                                     # Write our stream to the Delta format
    .partitionBy('date')                                             # Partition our data by date for performance
    .option("checkpointLocation", ROOT_PATH + "/bronze/cp/turbine")  # Checkpoint so we can restart streams gracefully
    .start(ROOT_PATH + "/bronze/data/turbine_raw")                   # Stream the data into an ADLS path
)
Delta allows our IoT data to be queried within seconds of it being captured in IoT Hub.
%sql
-- We can query the data directly from storage immediately as it streams into Delta
SELECT * FROM delta.`/tmp/iiot/bronze/data/turbine_raw` WHERE deviceid = 'WindTurbine-1'
We can now build a downstream pipeline that enriches and aggregates our IIoT application data for analytics.
Data Storage and Processing: Azure Databricks and Delta Lake
Delta supports a multi-hop pipeline approach to data engineering, where data quality and the level of aggregation increase as data moves through the pipeline. Our time-series data will flow through Bronze (raw), Silver (aggregated) and Gold (enriched) tables.
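The Bronze streams above write to ADLS paths, while the Silver step below reads them by table name; one way to bridge the two (an assumption here, since the original excerpt does not show it) is to register each Delta location as a table. The Silver and Gold tables referenced later can be created the same way:
# Sketch (assumption): register the Bronze Delta locations as tables so downstream
# streams can reference them by name, e.g. spark.readStream.table('turbine_raw')
spark.sql(f"CREATE TABLE IF NOT EXISTS turbine_raw USING DELTA LOCATION '{ROOT_PATH}/bronze/data/turbine_raw'")
spark.sql(f"CREATE TABLE IF NOT EXISTS weather_raw USING DELTA LOCATION '{ROOT_PATH}/bronze/data/weather_raw'")  # Weather path assumed to mirror the turbine one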
Our pipeline from Bronze to Silver will simply aggregate our turbine sensor data to 1-hour intervals. We will run a MERGE inside a streaming foreachBatch sink to upsert the aggregated records into our Silver Delta tables.
# Create a function to merge turbine and weather micro-batches into their target Delta tables
def merge_records(incremental, target_table):
  incremental.createOrReplaceTempView("incremental")
  # MERGE consists of a target table, a source table (incremental),
  # a join key to identify matches (date, deviceid, time_interval), and operations to perform
  # (UPDATE, INSERT, DELETE) when a match occurs or not
  incremental._jdf.sparkSession().sql(f"""
    MERGE INTO {target_table} t
    USING incremental i
    ON i.date = t.date AND i.deviceid = t.deviceid AND i.time_interval = t.time_interval
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
# Perform the streaming merge into our Silver table
turbine_stream = (
  spark.readStream.format('delta').table('turbine_raw')                               # Read data as a stream from our source Delta table
    .groupBy('deviceId', 'date', F.window('timestamp', '1 hour'))                     # Aggregate readings to hourly intervals
    .agg(F.avg('rpm').alias('rpm'), F.avg('angle').alias('angle'))
    .selectExpr('date', 'deviceId', 'window.start as time_interval', 'rpm', 'angle')  # Expose the window start as the merge key
    .writeStream
    .foreachBatch(lambda df, batch_id: merge_records(df, 'turbine_hourly'))           # Merge each micro-batch into the Silver table
    .outputMode("update")                                                             # MERGE works with update mode
    .start()
)
Our pipeline from Silver to Gold will join the two streams together into a single table for hourly weather and turbine measurements.
# Read streams from Delta Silver tables
turbine_hourly = spark.readStream.format('delta').option("ignoreChanges", True).table("turbine_hourly")
weather_hourly = spark.readStream.format('delta').option("ignoreChanges", True).table("weather_hourly")
# Perform a streaming join to enrich the data
turbine_enriched = turbine_hourly.join(weather_hourly, ['date','time_interval'])
# Perform a streaming merge into our Gold Delta table
merge_gold_stream = (
  turbine_enriched.writeStream
    .foreachBatch(lambda df, batch_id: merge_records(df, 'turbine_enriched'))  # Merge each micro-batch into the Gold table
    .start()
)
We can query our Gold Delta table immediately.
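For example, a quick sanity check on the enriched data might look like the following (a sketch; it assumes the Gold table is named turbine_enriched and carries the averaged weather fields alongside rpm and angle):
# Sketch: spot-check the latest enriched readings for a single turbine
display(spark.sql("""
  SELECT date, deviceid, time_interval, rpm, angle, temperature, humidity
  FROM turbine_enriched
  WHERE deviceid = 'WindTurbine-1'
  ORDER BY time_interval DESC
  LIMIT 10
"""))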
The notebook also contains a cell that generates historical hourly power readings and daily maintenance logs for model training (a rough sketch of the backfill approach follows the list below). Running that cell will:
- Backfill historical readings for 1 year in the turbine_enriched table
- Generate historical power readings for each turbine in the turbine_power table
- Generate historical maintenance logs for each turbine in the turbine_maintenance table
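The actual generation logic lives in the notebook; purely as an illustration, backfilling synthetic hourly power readings (hypothetical device names, value ranges and distributions) might look like this:
from pyspark.sql import functions as F

# Sketch (assumption): one year of synthetic hourly power readings per turbine
hours = spark.sql("""
  SELECT explode(sequence(to_timestamp('2019-01-01'), to_timestamp('2019-12-31 23:00'),
                          interval 1 hour)) AS time_interval
""")
devices = spark.createDataFrame([(f"WindTurbine-{i}",) for i in range(1, 11)], ["deviceid"])
power = (hours.crossJoin(devices)
  .withColumn("date", F.to_date("time_interval"))
  .withColumn("power", F.round(F.rand() * 150 + 50, 2)))   # Hypothetical power output in kW
power.write.format("delta").partitionBy("date").saveAsTable("turbine_power")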
We now have enriched, artificial intelligence (AI)-ready data in a performant, reliable format on Azure Data Lake that can be fed into our data science modeling to optimize asset utilization.
%sql
-- Query all 3 tables together
CREATE OR REPLACE VIEW gold_readings AS
SELECT r.*,
p.power,
m.maintenance as maintenance
FROM turbine_enriched r
JOIN turbine_power p ON (r.date=p.date AND r.time_interval=p.time_interval AND r.deviceid=p.deviceid)
LEFT JOIN turbine_maintenance m ON (r.date=m.date AND r.deviceid=m.deviceid);
SELECT * FROM gold_readings
Our data engineering pipeline is complete! Data is now flowing from IoT Hubs to Bronze (raw) to Silver (aggregated) to Gold (enriched). It is time to perform some analytics on our data.
Summary
To summarize, we have successfully:
- Ingested real-time IIoT data from field devices into Azure
- Performed complex time-series processing directly on the data lake
The key technology that ties everything together is Delta Lake. Delta on ADLS provides reliable streaming data pipelines and highly performant data science and analytics queries on massive volumes of time-series data. Lastly, it enables organizations to truly adopt a Lakehouse pattern by bringing best-of-breed Azure tools to a write-once, access-often data store.
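As one optional follow-up (not part of the original walkthrough), compacting and co-locating the time-series tables with Delta's OPTIMIZE and ZORDER commands is a common way to keep those queries fast as the data grows:
# Optional (assumption): compact small files and co-locate frequently filtered columns
spark.sql("OPTIMIZE turbine_enriched ZORDER BY (deviceid, time_interval)")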
In the next post we will explore the use of machine learning to maximize the revenue of a wind turbine while minimizing the opportunity cost of downtime.
What’s Next?
Try out the notebook hosted here, learn more about Azure Databricks with this 3-part training series and see how to create modern data architectures on Azure by attending this webinar.