The Bosch Center for Artificial Intelligence provides AI services to Bosch’s business units and manufacturing plants. We strive to generate value for our customers by deploying machine learning in their products, services, and processes across different domains such as Manufacturing, Engineering, Supply Chain Management, and Intelligent Services. In our presentation we will discuss how we utilize Spark:
We will also share some of the lessons we have learned as we have scaled up our use cases, and explain the important role Spark has played in our internal operations and our data collection, storage, and processing strategy.
– Hi there, and welcome to this talk. We will be presenting how we have leveraged Spark to develop products and services at Bosch.
In this presentation we will cover two AI applications that we have built at Bosch using the Spark API. I am Prasanth, a senior data mining engineer at Bosch, and I will be presenting the first use case, called the Manufacturing Analytics Solution. The second use case, financial forecasting, will be presented by my colleague Goktug, a senior data scientist at Bosch. We both work for the Bosch Center for AI here in Sunnyvale, California.
Bosch is a world-class manufacturing company with hundreds of manufacturing sites, thousands of assembly lines, and production of billions of parts every year. Therefore we have some of the biggest data sets in the world, comprising process, machine, and software data, just to name a few. Bosch operates in four main business domains: mobility, industrial technology, energy and building technology, and consumer goods. We have around half a million employees spread across all regions of the world. AI is a key enabler for us in the transformation towards an AI-driven IoT company. By the middle of this decade, we want all our products to either be equipped with AI or to have had AI play some part in their development. The Bosch Center for AI has been established to help Bosch achieve this goal.
Since Bosch has many manufacturing plants across the globe, there is a need to use this production data to improve process efficiency and increase product quality. So we developed the Manufacturing Analytics Solution, which aims to solve these problems.
To facilitate this, we built data pipelines and automated data preparation. We created a centralized storage for all assembly lines. We also set up compute resources for in-house data analysts and engineers to analyze the data directly, using self-serve analytics and standardized dashboards. And we developed advanced analytics tools, like root cause analysis, that help us identify the root causes of failures in a plant. Below is the architecture of a pipeline. On the left, we can see the data coming in from the assembly lines into our Kafka messaging system. From there, the data is pushed into the Hadoop Distributed File System, where we have several Spark jobs running to perform data transformation, cleaning, and augmentation. Many of these jobs are implemented using the Scala Spark API. We have also developed Spark jobs that perform advanced analytics, like root cause analysis, which are implemented in Python. The output of these jobs is pushed into Tableau, our front-end application. The data is published to Tableau both as static extracts and via a live connection between Tableau and Hadoop using Apache Impala.
In the rest of my presentation, I will focus on the root cause analysis, where we are trying to answer the question, “Why are parts failing quality checks in an assembly line?” Let me illustrate this with an example. What you see here is a typical assembly line in a manufacturing plant. The raw material comes in from the left to Process One, and then goes through Process Two, where there could be multiple machines working in parallel. The parts then move through Processes Three and Four, and finally reach Process Five, where the end-of-line tests are performed. All the parts that pass these final tests are then shipped to the consumers as sensors, auto parts, or home appliances. But there are also certain parts that fail the quality checks, and we want to find the root causes of these failures. We need to do that because we have lost both time and raw material producing these parts, and that costs a lot. So the target of interest here is the failed test from Process Five, and the potential root causes would be the measured process parameters, the tools used, and the machine configurations from Process One through Process Four.
We have implemented this root cause analysis as four different modules. The first one is part graph generation, where all the process data for a unique part is stored as a unique part graph. These part graphs are then passed on to the feature extraction module, where we extract self-defined features that could help us identify the failures or their potential root causes. Both these modules have been implemented using the Spark API and the NetworkX API in Python. The next module is feature matrix generation, where we prepare the data so that different machine learning models and statistical algorithms can be applied: we create a mapping between the target variables, that is, the dependent variables, and the independent variables. In the final module, we apply these machine learning models on independent partitions of well-defined data sets. For this we have used the scikit-learn, SciPy, and statsmodels APIs on top of the Spark API.
In these few slides, you can have a peek into our code and implementation. Here, you can see how we have implemented the feature extraction module. As mentioned earlier, we generate a part graph to encapsulate all the information of a single part. On the left, you can see the Spark data frame with two columns. The first one is the unique identifier for a part, and the second column is the corresponding part graph. Examining the part graph, you can see that it is an acyclic graph where each node represents a location that the part visited and stores all the information collected at that location. Each part graph is then passed to the feature extractor, which extracts the features out of it, and the final output is another Spark data frame where the first column is the part identifier and the second column is a list of features that have been extracted.
Here is a small code snippet from the feature extraction module. In line four, you can see the definition of a typical feature extractor that takes a part graph as input, processes it, and returns a list of features that could be either categorical or continuous. In line nine, we have wrapped the feature extractor with a UDF, depending on the return type of the extractor. And in line 12, we apply this UDF to the entire part graph data frame to extract the features for all parts. So in this code, there is a clear separation of expertise: the engineers who implement the feature extractors do not have to know anything about the underlying Spark API and are free to define the logic according to the customer's needs, while the application of these extractors is exclusively hand-built by the Spark developers. So there is a smooth handshake and separation. Extracting the root causes for the innumerable tests done across plants within Bosch involves huge computational complexity. As an example, we have tens of thousands of assembly lines, and on average, each line produces around two million parts per month, which amounts to around 30 billion data points per line. So it is essential that we are able to scale these modules.
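The separation of concerns described here can be sketched in plain Python. In the real module the wrapper is a `pyspark.sql.functions.udf` applied to the part-graph data frame; here a simple list comprehension stands in for that step, and the extractor logic is hypothetical.

```python
# Sketch of the pattern: domain engineers write feature extractors as
# ordinary functions; Spark developers wrap and apply them. The "wrap"
# step below is plain Python standing in for a Spark UDF application.

def cycle_time_feature(part_graph):
    """Hypothetical extractor: total cycle time across all stations."""
    return [sum(n.get("cycle_time", 0.0) for n in part_graph["nodes"].values())]

def apply_extractor(extractor, part_graphs):
    """Stand-in for df.withColumn("features", udf(extractor)(col("graph")))."""
    return [(part_id, extractor(graph)) for part_id, graph in part_graphs]

graphs = [
    ("part-001", {"nodes": {"p1": {"cycle_time": 1.5}, "p2": {"cycle_time": 2.0}}}),
    ("part-002", {"nodes": {"p1": {"cycle_time": 1.2}}}),
]
features = apply_extractor(cycle_time_feature, graphs)
# features: [("part-001", [3.5]), ("part-002", [1.2])]
```

The key point is that `cycle_time_feature` knows nothing about Spark; only `apply_extractor` would change when moving between local and distributed execution.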
Speaking of scaling our code, I would like to bring to your notice one of the many challenges we faced: the feature matrix generation. As explained earlier, we want to create a mapping between the target variables, that is, the dependent variables, and the independent variables. You can see two matrices on the left, where each represents a particular group of features. The goal is to create a certain Cartesian product, so that we can create the data frame shown on the right. Joins are typically very hard in Spark, especially when we are talking about millions of rows per data frame, where each row contains lists of thousands of features. So we implemented the required logic using UDFs and smart partitioning, of course. It was taking nearly seven hours for a typical assembly line, and that was concerning, because we need to deliver results very frequently, and also for much bigger lines. Then we made an optimization to our code where we replaced all the loops and conditions with Python functional constructs like map, filter, and reduce. This increased the speed threefold, and we were able to execute the code within two hours. So using functional constructs with the Spark API is a very good programming approach, at least in our experience. Due to the constraint of time, I could not delve much deeper, but please feel free to ask questions or reach out to me. I will now hand over the presentation to Goktug. Thank you. – The next use case we will be covering today is the financial forecasting use case that we developed at the Bosch Center for Artificial Intelligence.
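The loop-to-functional rewrite mentioned above can be illustrated with a toy version of the pairing step. The pairing rule (match dependent and independent features from the same line) and all names are hypothetical; the point is only the map/filter/reduce style.

```python
from functools import reduce

# Toy version of the functional rewrite: pair each target (dependent)
# feature with every candidate (independent) feature, keeping only pairs
# from the same assembly line. The data and rule are illustrative only.

targets = [("line1", "eol_test_fail"), ("line2", "leak_test_fail")]
candidates = [("line1", "press_force"), ("line1", "torque"), ("line2", "press_force")]

def pairs_for(target):
    line, dep = target
    same_line = filter(lambda c: c[0] == line, candidates)
    return list(map(lambda c: (dep, c[1]), same_line))

# reduce concatenates the per-target pair lists into one flat mapping.
feature_pairs = reduce(lambda acc, t: acc + pairs_for(t), targets, [])
```

Inside a Spark UDF, this style avoids Python-level loops and conditionals, which is where the speed-up reported in the talk came from.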
First, let me set up the environment this solution was developed in. We were a team comprised of controllers, who are the end-users of the solution, and, on the development side, software engineers, data scientists, and data engineers. We wanted to achieve automatically generated sales forecasts at large scale, so that we can improve financial decision making at Bosch and take it to a new level that combines the talents of AI and human intelligence and reaches better decisions for the future of the company.
In short, what we want to achieve is a forecast of several key performance indicators every month. Currently we are forecasting about 300,000 time series; at full scale, we expect this number to be about three to four million. To forecast these, we are using about 15 different statistical and machine learning models, together with several data transformations.
Bosch comprises about 15 different companies internally, and each company has a specific business structure. Therefore, the solution that we developed has to take these structures into consideration and has to be able to break down a given KPI in terms of customers, products, regions, business divisions, or any other custom need. Given this constraint, we chose our pilot application to be revenue forecasting for Bosch. The forecast for revenue needs to be generated every month, immediately after the month-closing calculations. There is a very intuitive reason why this needs to happen very quickly: the point of these forecasts is that the company can see developments and take measures to react to them. If we take, let's say, five or ten days to generate these forecasts, we are removing the opportunity for action and giving the organization less time to react to these changes. Therefore, our target is to create these forecasts within hours of data availability. Let's see what that means in terms of data complexity. If we assume we have one million time series, and we apply five models per time series at five seconds per model, we are looking at about 25 million seconds of computation that needs to happen every month. If we want to complete this computation within a couple of hours, we need to use thousands of cores, and this is why we went towards using Spark.
Here is the technical architecture. Our data is stored in a SQL database; we ingest this data using R and reconstruct the business structure of the data, which in the forecasting domain is called a hierarchical time series. Once we create our hierarchical time series data structure, we use an in-house-built Python module to automatically select which statistical or machine learning models should be applied to each time series. We then take these time series and the models and distribute them using Spark and R, collect all the results back on the driver, consolidate the hierarchical time series, and push the results back to the SQL database. The model universe comprises well-known traditional models such as ARIMA, state space models such as exponential trend smoothing, and machine learning models such as neural networks.
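One simple consolidation scheme for hierarchical time series is bottom-up aggregation: forecast the leaf series and sum them up the hierarchy. A plain-Python sketch follows; the hierarchy, series names, and numbers are made up, and the actual consolidation method used at Bosch may differ.

```python
# Bottom-up consolidation sketch: forecasts exist for the leaf series
# (e.g., products per division) and each parent-level forecast is the
# element-wise sum of its children. All names and values are illustrative.

hierarchy = {"total": ["div_A", "div_B"], "div_A": ["prod_1", "prod_2"]}
leaf_forecasts = {"prod_1": [10.0, 11.0], "prod_2": [5.0, 6.0], "div_B": [20.0, 22.0]}

def consolidate(node):
    """Return the forecast for `node`, summing children element-wise."""
    if node in leaf_forecasts:
        return leaf_forecasts[node]
    child_forecasts = [consolidate(child) for child in hierarchy[node]]
    return [sum(values) for values in zip(*child_forecasts)]

# consolidate("div_A") -> [15.0, 17.0]; consolidate("total") -> [35.0, 39.0]
```

This is the step that runs on the driver after all distributed per-series forecasts are collected, since it is the only step that needs results from multiple series.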
If you think about the task that I just explained on the previous slide, it is embarrassingly parallel. At no point in this pipeline does a single time series need any information from the other time series. So we can take a single series and the models that need to be applied to it, send them over to a core, get the results back, and then use all the results together for the consolidation step. This makes Spark a really good candidate for us. If you have a smaller scale to handle, you can, of course, distribute this computation on a single machine using packages such as parallel in R.
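Because each series is independent, the per-series fit can be fanned out to any pool of workers. Here is a single-machine Python sketch of that idea; the "model" is a trivial mean forecaster standing in for ARIMA, ETS, or a neural network, and at the scale described in the talk the fan-out happens across a cluster via Spark rather than local threads.

```python
from concurrent.futures import ThreadPoolExecutor

# Each time series is forecast independently, so the work fans out to
# workers with no cross-series communication. The forecaster below just
# predicts the historical mean; it is a placeholder for the real models.

series = {"kpi_a": [1.0, 2.0, 3.0], "kpi_b": [10.0, 10.0, 10.0]}

def fit_and_forecast(item):
    name, values = item
    return name, sum(values) / len(values)  # next-step forecast = mean

with ThreadPoolExecutor(max_workers=4) as pool:
    forecasts = dict(pool.map(fit_and_forecast, series.items()))
# forecasts: {"kpi_a": 2.0, "kpi_b": 10.0}
```

Swapping `ThreadPoolExecutor` for a cluster scheduler changes nothing in `fit_and_forecast`, which is exactly what makes the problem embarrassingly parallel.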
But at the scale that I have described, at hundreds of thousands or millions of time series, you actually need to do this across several servers, and you need a technology like Spark. The next question I would like to answer is, “Why did we use R?” The latest and greatest forecasting algorithms are available in R. We don't want to reinvent the wheel, but rather take what's available out there and quickly put it into production for our company's use.
Given Spark and R, a user has two APIs to choose from to utilize these two technologies together: sparklyr and SparkR. sparklyr provides spark_apply, an API that accepts Spark data frames and also returns Spark data frames. SparkR, on the other hand, offers the same functionality with dapply and gapply, but can additionally distribute UDFs over lists using its spark.lapply function.
The spark.lapply functionality therefore brings a new mode of distribution, and it enables use cases unlike the one Prasanth presented earlier, where the computation was distributed over the partitions of a data frame; here we can be a little more loose in our definition of data structures and distribute over lists. Given this advantage of flexibility, we decided to use user-defined functions, similar to the root cause analysis module that Prasanth (laughs) shared with us, and to distribute UDFs over lists using spark.lapply.
The second reason we chose lists, as I explained, is the flexibility of changing the data as we move forward with our application and as our requirements develop. We can think of lists as folders: instead of changing a data frame, when I distribute over a list, I can just put one more item into my folder and distribute it. I will not change my architecture or my schema; I only have to make changes to my UDF. So my data science team can make data science changes without interfering with the data engineering or Spark development team, and without modifying the whole architecture of the solution. That is why we have chosen the list path. We also use spark.addFile to make a number of files readily available on the nodes. Finally, I want to speak about a bug that was a very crucial issue for us at the beginning of our development. In Spark 2.3.0, you could not actually distribute UDFs over a list larger than roughly 46,000 elements. This was due to an integer overflow error, and it has since been fixed; I have listed the Jira issue here for those who are interested. But until we were able to get the fix into our software, we had to create a workaround: if our lists were larger than 40,000 elements, we broke them into chunks of 40,000 and distributed 40,000 tasks with Spark at a given moment. You can see the example code for this chunking logic on the top right. Once we moved to Spark 2.4.0, we were able to simplify the code to what you see on the bottom right.
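The chunking workaround can be sketched in plain Python. In the real code this wraps SparkR's spark.lapply; here a hypothetical `distribute` function stands in for that distributed call.

```python
# Sketch of the chunking workaround: lists longer than the safe limit
# are split into chunks and each chunk is distributed in turn.
# `distribute` stands in for SparkR's spark.lapply(tasks, fn).

CHUNK_SIZE = 40_000

def distribute(tasks, fn):
    """Stand-in for the distributed spark.lapply call."""
    return [fn(t) for t in tasks]

def chunked_lapply(tasks, fn, chunk_size=CHUNK_SIZE):
    """Apply fn to all tasks, distributing at most chunk_size at a time."""
    results = []
    for start in range(0, len(tasks), chunk_size):
        results.extend(distribute(tasks[start:start + chunk_size], fn))
    return results

out = chunked_lapply(list(range(100_000)), lambda x: x * 2, chunk_size=40_000)
# 100,000 tasks -> chunks of 40,000 + 40,000 + 20,000
```

Once the overflow fix landed in Spark 2.4.0, the chunking layer could be removed and the full list passed to a single spark.lapply call.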
To conclude, here is a graph that shows the speed-up we have achieved. In this experiment, we used about 2,000 time series and scaled the number of cores from eight to 64. We have seen that we can get a speed-up of roughly 7.5x when we increase the number of cores by about 8x. That, again, reminds us of the well-known idea that there is no free lunch, but this is a really good speed-up factor, and it currently enables us to create hundreds of thousands of forecasts within hours at Bosch. An incredible amount of collaboration and work was put in by many of our colleagues during the development of these two applications, so we also want to send our thanks to them and recognize their work here.
So we really appreciate that you attended our session. Please send us your questions, and we will answer as many as we can. And please also rate and review our session.
Prasanth Lade is a Sr. Data Mining Engineer at the Bosch Center for Artificial Intelligence in Sunnyvale, California. He holds a PhD in Computer Science, specializing in machine learning, from Arizona State University, where he focused his research on applying machine learning in technologies aimed towards individuals with visual impairment. At BCAI, Prasanth works on leveraging Big Data from manufacturing plants and applying AI at scale across Bosch's products and services. His areas of interest include manufacturing analytics, natural language processing, and time series analysis.
Goktug Cinar is a Senior Data Mining Engineer at Bosch Center for Artificial Intelligence in Sunnyvale, California. He completed his PhD in Electrical and Computer Engineering at the Computational NeuroEngineering Laboratory at the University of Florida in 2015. At BCAI, Dr. Cinar leads the development of a financial forecasting tool for Bosch Business Units. Goktug's main research interests involve machine learning and information theoretic learning, and their application in musical signals.