Building a Streaming Data Pipeline for Train Delay Processing


A major cause of dissatisfaction among passengers is the irregularity of train schedules.

SNCF (French National Railway Company) has distributed a network of beacons over its 32,000 km of train tracks, triggering a flow of events at each train passage. In this talk, we will present how we built real-time processing on this data to monitor traffic and map the propagation of train delays.

During the presentation we will demonstrate how to build an end-to-end solution, from ingestion to exposure.

The presentation will take place as follows:
-Data Pipeline: how we set up a data transformation pipeline using Spark 3 and Delta with Azure Databricks, and how Delta Lake makes dynamically updated data reliable.
-Exposure: how we expose our output in the best way depending on the consumer: Power BI or REST API.
-Production-ready: finally, we will demonstrate how we have structured our development process to make it reliable and aligned with SNCF best practices.

Speakers: Alexandre Bergere and Kaoula Ghribi

Transcript

– Hi everyone. We are glad to be here today to talk about one of our projects: how we built a streaming data pipeline for train delay processing. My name is Alexandre Bergere. I’m a data architect at ITNOVEM, one of the IT branches of SNCF, working on the digital transformation of the group. I’m accompanied today by Kaoula Ghribi, a data engineer working with me on several projects. We had been looking for a streaming use case in our team for a long time, and today we are glad to share with you how we provide quality data to all the entities of our company using Spark Streaming and Delta, and how we built our pipeline in a few weeks with a small team.
SNCF is a French train company, one of the biggest in the world. More than 270,000 employees make it possible for 14 million people to travel, on 15,000 trains every day. We work at the Data & AI Factory, whose main goal is to accelerate innovation at SNCF. For that, we centralize the projects of the different entities and help them extract value from their data. We support teams from modeling and design through to the industrialization of their solutions. One of our missions is also to provide qualified data to the different entities of the group, and Brehat is one of these data sources.
So today we’re going to talk about one of our data sources, called Brehat. Beacons are installed all along the tracks and give full coverage of train passages across our rail network. Brehat contains two kinds of information. The first is observations: events are triggered at each train passage, carrying the real and theoretical times of passage. They can be generated by the automatic tracking system or entered by a traffic officer if there are issues. The second is incidents: an incident occurs when an abnormal situation disrupts rail operations. It could be a wildfire, an accident, or a strike. Brehat information is received and sent to Azure.
Then our team cleans and enriches the data in order to make unified, qualified data available to all the entities of the group. In this project, we had two main goals. The first was to build real-time data processing to monitor traffic and map the propagation of train delays. The second was to expose our output data in the best way depending on the consumer: Power BI, a REST API, or directly as Delta files through the Delta Lake.
Just to give you some examples of projects that use Brehat data: the first one is Denfert, which models driving behavior between two train stations in order to simulate traffic and schedule work on the rails. The second one, provision water, uses all the Brehat data in order to predict train delays.
Let’s now talk about our architecture, how we ingest the data, and all the data processing we do. In our architecture pattern, we retrieve the data through MQ Series and Event Hubs, which is the source of our Spark Streaming jobs in Databricks. Then we run our different ingestion processes, writing the data directly to the Delta Lake in Delta format. One of the pipelines feeds Power BI directly from the stream, and users can visualize the data through a dashboard. Other users and projects can use the data we provide through an API, or directly from the Delta Lake in Delta format.
We decided to use the Delta format to store the data for two main reasons. The first is ACID transactions and full DML support: because the data we receive needs to be updated and deleted, Delta gives us more control to manage our big datasets. The second is the unified batch and streaming source and sink, which lets us simplify the whole architecture: depending on the project, consumers can use the results in batch or streaming mode. And now I’m going to let Kaoula talk about our data processing.

– Hi everybody. So let’s talk about our data pipeline. The input of the pipeline is the observation data, which is a flow of events received when a train passes a beacon, a type of sensor fixed to the rails. This data, as Alexandre explained, comes mainly from the beacons, an automated system, but it can be modified by operating agents, and it can also be deleted or self-corrected by the sensors.
The first stream we set up is used to make the delay information reliable. For each train passage, we want to keep only the last complete information. This is the blue part of the diagram. It is composed of three processes: P1, P2 and P3. P1 is a data quality management process: we apply data quality rules and reject the rows that do not bring value, like rows with an empty circulation time. P2 is an enrichment process that locates the event, adding GPS coordinates, because events are initially located by landmarks in the SNCF network. P3 is the process that keeps only the last complete version of each event. When we receive new rows in a micro-batch, we apply an aggregation to group events with the same ID and produce a single consolidated event from each group, and then we merge the result of the micro-batch processing with the stream output, which is a Delta Lake table. We needed storage with full DML support, which is why we use Delta Lake as the output format for this first stream.
The second stream is the orange part of the diagram. It consumes data from the output of the first stream, so Delta here is both a sink and a source. The second stream is used to group train traffic in terms of circulations, a circulation being a departure and terminal pair on a given date. There are many observations during a circulation, and many versions of each observation. In terms of Spark processing, the second stream is a stream-to-stream join between events tagged as departure and events tagged as terminal.
It also includes a join with reference data to identify the type of train (regional train, TGV) and to identify the railway station.
As you can see, we split our processing into two streams and we added the silver table. This comes from two limitations that we hit with Spark Streaming. The first is that chaining aggregations is not supported in Structured Streaming, and our code was designed from the beginning to respect this constraint. That’s why we have two separate processing blocks. The second limitation is that some write modes are not supported after a stream-to-stream join: writing in update mode after a stream-to-stream join is not supported by Spark Structured Streaming. That’s why we use an intermediate table, the silver one. We write to this intermediate table in append mode, then load it and save the rows in update mode.
So let’s do a quick demonstration. In the development phase, we use notebooks to implement and run our data processing. We use one notebook per stream, and each stream has a dedicated cluster. This is the first stream, observation, with the observation cluster. The second one is the circulation silver stream, with a dedicated cluster as well, and the same for the third one. These clusters have different configurations. The streams that I’m showing run 24 hours a day, consuming from Event Hubs and writing to the gold Delta table. The stateful transformations, which require the use of the state store, are bounded with watermarks and temporal join constraints. The watermark values are defined using a representative data sample. As you can see here, we used a watermark of 15 seconds, allowing late data to be integrated into the aggregation result, and we added a max buffer time of six hours in the stream-to-stream join.
Notebook mode helped us to produce results quickly. But once the code was stable and the compute resources were tuned, we switched to the SSA framework, a project template that allows pipelining data transformations.
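The data quality filter (P1) and the keep-the-last-version merge (P3) described for the first stream can be illustrated with a minimal sketch. It is written in plain Python rather than Spark so that it runs without a cluster, and it is not the speakers' code: the field names (event_id, version, circulation_time, delay_seconds) and the dictionary standing in for the Delta table are assumptions made for illustration, since the talk does not show the actual schema.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Optional


@dataclass(frozen=True)
class Observation:
    event_id: str                     # hypothetical key for one train passage
    version: int                      # hypothetical counter: higher is more recent
    circulation_time: Optional[str]   # rows with an empty value are rejected (P1)
    delay_seconds: int


def consolidate(micro_batch: Iterable[Observation]) -> Dict[str, Observation]:
    """Apply P1 and P3 to one micro-batch: drop invalid rows, then keep
    a single row (the most recent version) per event_id."""
    latest: Dict[str, Observation] = {}
    for row in micro_batch:
        if not row.circulation_time:  # P1: reject rows that bring no value
            continue
        current = latest.get(row.event_id)
        if current is None or row.version > current.version:
            latest[row.event_id] = row
    return latest


def merge_into(table: Dict[str, Observation], batch: Dict[str, Observation]) -> None:
    """Upsert the consolidated batch into the output table.
    The dict is a stand-in for a table that supports updates, such as Delta."""
    for event_id, row in batch.items():
        existing = table.get(event_id)
        if existing is None or row.version >= existing.version:
            table[event_id] = row


# Two micro-batches: event A is corrected twice, event B is rejected by P1.
table: Dict[str, Observation] = {}
merge_into(table, consolidate([
    Observation("A", 1, "2020-11-17T08:00:00", 120),
    Observation("A", 2, "2020-11-17T08:00:00", 90),
    Observation("B", 1, None, 0),
]))
merge_into(table, consolidate([
    Observation("A", 3, "2020-11-17T08:00:00", 60),
]))
```

After both micro-batches, the table holds one row for event A, carrying the values of its latest version, which is why the output storage has to support updates and not only appends.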
So this is our framework. As you can see here, in the whole project the main function is a pipeline of a set of transformations that are described in the transformer package. This framework lets us focus on functional needs by describing the transformations without worrying about pipelining, monitoring and logging, which are integrated into the framework. The framework also makes it possible to push certain best practices, such as static type safety with the use of Datasets. It also integrates wrappers over different formats of readers and writers.
One of the goals of this use case is to share visualizations showing the propagation of train delays. The visualization tool that we use is Power BI, and our gold table is in Delta format, so we needed to connect Power BI to Delta through Databricks. How do we do it? There were two existing solutions: the Spark connector and the Simba connector. The Spark connector is natively integrated into Power BI, but it suffers from high latency. The Simba connector is not native and suffers from poor integration with Power BI. Databricks introduced a new connector last September, the Azure Databricks connector, and this is the solution that we use in this project. It’s native, and we did not notice any high latency that an end user could easily see.
What we want to give to end users are dashboards of this type, showing the state of the trains when they pass over the Brehat sensors. The purple color indicates that the train is late, blue is for trains on time, and green is for trains running ahead of the theoretical passage time. This dashboard gives a snapshot of the network that is updated in real time.
Let’s move to the next part: what we learned from this mini use case. The first point is that it is more efficient to separate the compute resources by stream: one stream equals one cluster. This way, we make the processing less error-prone and we avoid memory issues.
The second point is that it is recommended to dedicate a small cluster to the BI connection, to avoid undesired causes of latency. Databricks will soon introduce clusters optimized for this use case.
The next important point is to add time constraints to stream-to-stream joins. The challenge of generating join results between two data streams is that, at any time, the view of the dataset is incomplete on both sides of the join. This makes matching more difficult, because any row received can match with any future row yet to be received. Therefore, we need to buffer the two stream inputs so that we can match future inputs with past inputs. But if we don’t set a time limit, we risk running out of memory. This is why it is important to specify how long the data needs to be buffered. For example, in our case, we know that the duration of a circulation does not exceed six hours, so we set the max time between the reception of the event tagged as departure and the event tagged as terminal to six hours.
The last point is watermarks for aggregations and some types of operations like drop duplicates. We should limit how long the state is kept in the state store with watermarks, and the watermark value depends on the data.
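The reasoning about bounding the join buffer can be made concrete with a small sketch. It is plain Python, not Spark, and the class and method names are hypothetical; only the six-hour limit comes from the talk. It assumes event times are given in seconds and that the latest event time seen plays the role of the watermark. In Spark Structured Streaming, the same effect is normally obtained by declaring watermarks on both inputs and a time-range condition in the join, rather than by hand-written eviction.

```python
from typing import Dict, Optional, Tuple

MAX_BUFFER_SECONDS = 6 * 3600  # six hours, the limit mentioned in the talk


class DepartureTerminalJoiner:
    """Hypothetical illustration of a time-bounded stream-to-stream join."""

    def __init__(self, max_buffer_seconds: int = MAX_BUFFER_SECONDS) -> None:
        self.max_buffer_seconds = max_buffer_seconds
        self.departures: Dict[str, int] = {}  # train_id -> departure event time
        self.latest_event_time = 0            # simplified stand-in for a watermark

    def _evict(self) -> None:
        # Drop buffered departures that are too old to ever match again.
        limit = self.latest_event_time - self.max_buffer_seconds
        self.departures = {
            train_id: t for train_id, t in self.departures.items() if t >= limit
        }

    def on_departure(self, train_id: str, event_time: int) -> None:
        self.latest_event_time = max(self.latest_event_time, event_time)
        self.departures[train_id] = event_time
        self._evict()

    def on_terminal(self, train_id: str, event_time: int) -> Optional[Tuple[str, int, int]]:
        self.latest_event_time = max(self.latest_event_time, event_time)
        self._evict()
        departure_time = self.departures.get(train_id)
        if departure_time is None or event_time < departure_time:
            return None  # no buffered departure within the allowed window
        del self.departures[train_id]
        return (train_id, departure_time, event_time)


joiner = DepartureTerminalJoiner()
joiner.on_departure("T1", 0)
joiner.on_departure("T2", 3600)
matched = joiner.on_terminal("T1", 5 * 3600)   # arrives within six hours
expired = joiner.on_terminal("T2", 10 * 3600)  # departure was already evicted
```

Without the eviction step, the departures dictionary would grow for as long as the stream runs, which is the out-of-memory risk described above. The price of the bound is that a terminal event arriving after the window is never matched.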

– So what’s next? In a global manner, we have been working with Databricks for a long time now, and we are going to continue this collaboration to optimize our usage through workshops and support. We’re going to continue making patterns and architectures usable by the whole company, in order to mitigate risk and increase productivity. We’re probably going to work on Spark Streaming processes over the next couple of workshops. The next step of our project is to improve our API exposure: we are working on making all our different data available in a synchronized way. More globally, we are going to increase our project development and continue to create value from all the data we have. Thank you for your attention, and have a wonderful Data + AI Summit.


 
About Alexandre Bergere

SNCF

Alexandre is an independent Data Analyst & Solution Architect (MCSE) and a Cosmos DB & Delta lover, working for different clients, including SNCF. He developed his skills through various client projects, teaching at the university, and personal proofs of concept. He's also the co-founder of DataRedkite, a product that quickly gives its users good management of their data in Microsoft Azure Data Lake.

About Kaoula Ghribi

SNCF

Kaoula is a Data/Cloud Engineer who has been working at the French railway company (SNCF) since 2017. She holds an engineering degree in computer science and is a certified Spark developer and GCP Associate Cloud Engineer. After engineering school, she joined CEA (French Alternative Energies and Atomic Energy Commission) as an R&D engineer and spent a few years applying distributed constraint optimization to household electricity consumption prediction and optimization. In 2017 she decided to specialize in big data processing and to join SNCF, where she works on leveraging big data technologies to improve passenger security and train punctuality.