This is the story of a great software war. Migrating Big Data legacy systems always involves great pain and sleepless nights. Migrating Big Data systems with multiple pipelines and machine learning models only adds to the existing complexity. What about migrating legacy systems that protect the Microsoft Azure cloud backbone from network cyber attacks? That adds pressure and immense responsibility. In this session, we will share our migration story: migrating a machine learning-based product with thousands of paying customers that processes petabytes of network events a day. We will talk about our migration strategy, how we broke the system down into migratable parts, tested every piece of every pipeline, validated results, and overcame challenges. Lastly, we share why we picked Azure Databricks as our new modern environment for both Data Engineering and Data Science workloads.
Speakers: Roy Levin and Tomer Koren
– So, hi everyone, welcome to our talk about migration to Spark. I'm Tomer Koren, a senior software engineer at Microsoft, and with me is Roy Levin, a senior researcher at Microsoft. Both of us work on a product called Azure Security Center, which is mainly focused on threat detection in the cloud. Today we have several goals. The first is to tell you about the migration we recently did for our production system: we migrated our big data platform from a legacy technology to Spark. The second is to present a methodological approach for accomplishing such a migration task. The last is to explain how we measured quality and how we optimized performance. First, a little background for context about Azure Security Center and threat detection in the cloud. As you may know, many organizations used to run their servers on-premises, and recently they started migrating these workloads into the cloud. As you probably also know, on the internet there are bad actors constantly attacking these resources: running brute-force attacks, trying to steal data, and so on. For that reason we want our customers to be safe. Azure Security Center protects workloads both in the cloud and in on-premises systems. One of the key ways the product protects customers is by notifying them about security threats. One of the components we are responsible for provides detection analytics: whenever there is an attack, we notify the customer with an alert. We can alert customers about incoming brute-force attacks, SQL brute-force attacks, and many other threats. The key engine behind all of these analytics is a big data platform.
In this slide you can see the detection pipeline's back-end architecture. On the left, you see that we process several data sources of various sizes; the biggest one contains around four terabytes of events per hour, which is a lot. We process all this information to create a second level of data that we call processed data, and then we have several detection pipelines. Some of our pipelines use supervised models. Those usually have a feature extraction component: we extract features, run classification, and produce alerts. Other pipelines run anomaly detection. In those pipelines the structure is more complicated: every hour we calculate a state and store it, eventually building a long time series from the previous states; we then run the anomaly detection engine and send alerts. Some of the components in this diagram are reusable. For example, the alert publisher is a component used by both the anomaly detection and the classification-based pipelines. Now that we know the general pipeline, let's talk about the challenges we faced during the migration from the legacy platform to Spark. You might expect a migration to be a straightforward task, but in reality things become complicated. What are the challenges? When you migrate between different technologies, in our case from Cosmos, which has some flavor of Hive, to Spark, first of all you have to preserve the semantics: for a given input, both systems must produce the same output. In addition, we need to meet real-time constraints. Some of our pipelines run on an hourly basis, some run more frequently, and every pipeline needs to finish in time.
We cannot let a pipeline run for more than 60 minutes if it is an hourly pipeline. The third challenge is cost: we cannot just spend as much as we want. So let's focus on these challenges. Starting with semantics: the first thing you do when you start to migrate the code is rewrite the UDFs. In our case, most of our UDFs were written in C# and we had to translate them to Python. In addition, we had to rewrite all the transformations. In our scenario the legacy code was written in U-SQL, and we had to convert it to PySpark. As we discovered, this can lead to a lot of bugs. Here, for example, are two versions of one transformation, one written in U-SQL and one in PySpark. You can see that the two languages are totally different, and some operations require more code in one than in the other. Another thing I want to mention here is the ORDER BY: as you can see, both versions use a window function, and the window function has an ORDER BY expression. Because these are different technologies, in scenarios where several records share the same timestamp, the selected order can differ; here we use row_number, and a given row may get a different row number on each platform. So U-SQL can produce one ordering and PySpark another. This can become a problem because some of our features select, say, the top five or top ten rows, and the rows that get selected end up different. This affects our semantics. Another point I want to mention is datatypes. In Spark we are of course using Python datatypes; in our previous system we used C#. In this table you see two basic operations between two numbers, number one and number two; in some cases a number was null, in other cases zero, and we tried all the combinations.
You can see that while in Spark you get a lot of nulls when a number is null or there is a division by zero, in Cosmos you get different values, notably Infinity and -Infinity. This difference matters because later in the pipeline we usually replace the Infinity symbol with a high number, and -Infinity with a low number; when we get null instead, we cannot decide what to do, and there is a chance of divergence. And again, because we changed platforms, the machine learning libraries also changed: where in Cosmos we used an internal library called TLC, in PySpark we use scikit-learn. So there are algorithmic differences as well. Another thing we had in our scenario was a change in the schema. In the legacy system, for example, we had records describing a network IP connection between a source and a destination. We also tried, whenever we could, to map a host name to a specific IP, so we had a field called host name and another field indicating whether the host name belonged to the source IP or to the destination IP. Because we only had one field for the host name, when both sides had a host name mapping, as in this example where both the source and the destination were mapped, we had to duplicate the row: one row with the value true and a second with false. In Spark we decided to allocate two fields for the host name, so for the scenario I just mentioned we have both mappings in a single row. Overall, instead of three rows in the legacy system, we have two rows in Spark. Another important thing to mention: we also had a field called host ID.
The host ID and the host name were related to each other, and we decided to drop the host ID in Spark, but later, when we investigated, we realized that about 1% of the time there is a mismatch between the host ID and the host name, which also leads to a semantics problem. Then again there are the real-time constraints: because several pipelines run on an hourly basis and some of them process a lot of data, we need to make sure that once we implement them, the overall run time is less than 60 minutes, in order to avoid accumulating latency. You may think that some of these challenges can be mitigated by adding more and more servers. That may be true, but when you add more and more servers, the cost increases dramatically, and then management is not very happy. So you need to stay under some predefined cost threshold, especially since we were migrating an existing system with a predefined budget that we could not simply exceed. Now Roy is going to introduce the formulation.
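To make the datatype difference concrete, here is a small hypothetical Python sketch (the sentinel values and helper name are ours, not from the actual pipeline) of the kind of normalization step that breaks when Infinity becomes null:

```python
import math

# Hypothetical sentinels standing in for the "high number" / "low number"
# the pipeline substitutes for Infinity / -Infinity.
HIGH_SENTINEL = 1e9
LOW_SENTINEL = -1e9


def normalize(value):
    """Map legacy C#/Cosmos special values onto plain numbers.

    C# float division yields Infinity / -Infinity, which this helper
    can translate deterministically. Spark instead yields null (None)
    for division by zero or null operands, so the intent of the
    original value is lost and no safe substitution exists.
    """
    if value is None:
        # Spark's answer for x / 0 or null operands: nothing to recover.
        raise ValueError("null reached normalization: ambiguous in Spark")
    if math.isinf(value):
        return HIGH_SENTINEL if value > 0 else LOW_SENTINEL
    return value


# Cosmos-style outputs translate cleanly:
assert normalize(float("inf")) == HIGH_SENTINEL
assert normalize(float("-inf")) == LOW_SENTINEL
assert normalize(2.5) == 2.5
```

A Spark-produced null reaching this point has no well-defined translation, which is exactly the semantic gap the table on the slide illustrates.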
– Thank you Tomer. I'd like to formalize the problem a little, and formalize our solution. If we go back to the detection pipeline Tomer presented earlier, we can see that we have many different detections. One way to go about it is to look at each detection as one big monolith and try to migrate that. But that's not really going to work, because each detection has a lot of code, we're talking thousands of lines of complex code. There is no realistic way to rewrite an entire detection, compare the results, and hope they will be the same. So instead, we look at each detection and the components it is built from. These components are the sub-parts the detection requires: feature engineering, classification, state management, and so on. We can take each of these components and migrate them separately. So if we take a Cosmos component, we want to create an equivalent Spark component. Then we need to thoroughly test these components. To make sure we don't have any problems, we feed the Cosmos data inputs into the Cosmos component to generate its outputs. We can then take the same inputs, feed them into the Spark component, in this case feature engineering, generate its outputs, and compare the two. Unfortunately, it turns out this is not that simple, because, as Tomer mentioned earlier, the schemas don't match. With a schema mismatch there is no way to feed those inputs directly into Spark. So we need a translator that translates the inputs for the Spark component, which then produces its output, and we compare the outputs using some kind of comparator function, which we also need to write correctly.
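As a sketch of this idea (the field names and schemas here are our illustration, not CyFlow's actual API), the translator plus comparator pattern looks roughly like this in Python:

```python
def translate(legacy_row):
    """Hypothetical schema translation: rename legacy Cosmos fields
    to the column names the Spark component expects."""
    return {"source_ip": legacy_row["SrcIP"],
            "dest_ip": legacy_row["DstIP"],
            "score": legacy_row["Score"]}


def compare_outputs(cosmos_rows, spark_rows, fields, tol=1e-6):
    """Order-insensitive comparator over the fields that matter.

    Numeric fields are compared with a tolerance, since the migrated
    models produce slightly different (but acceptable) scores.
    """
    def project(rows):
        return sorted(tuple(r[f] for f in fields) for r in rows)

    for a, b in zip(project(cosmos_rows), project(spark_rows)):
        for x, y in zip(a, b):
            if isinstance(x, float) and isinstance(y, float):
                if abs(x - y) > tol:
                    return False
            elif x != y:
                return False
    return len(cosmos_rows) == len(spark_rows)
```

In practice the comparator also has to cope with the non-deterministic cases discussed later (top-K ties, generated UUIDs), which is why writing it correctly is its own task.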
This way we can validate each of the components, making sure they pass our tests and actually produce the same semantics. So how do we manage all of this together? We have many, many components, each from a different detection. Some components are reused, and what about the connections between them? They are connected in some way; you can't just take each component separately and hope everything is going to work, because you need to connect everything together and then test it all once connected. That part requires validation, and it also requires us to specify how the different components connect. To manage all that, we introduced a framework called CyFlow, which I'll now explain. We begin by introducing the multi-transformer. A multi-transformer represents a single component. It receives as input a multi-data item, which is simply a set of data items; each data item can be either a data frame or a pre-trained model being fed into the multi-transformer. In turn, the multi-transformer generates a new multi-data item, which can then be tested to make sure it matches its Cosmos counterpart. We can connect the different multi-transformers with one another to create a DAG, a Directed Acyclic Graph, of multi-transformers. This can represent an entire detection. The detection itself also receives multi-data items and produces new multi-data items, so in fact, using composition, the detection itself is just another multi-transformer.
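CyFlow itself is internal, but a minimal sketch of the multi-transformer idea (all class and key names here are our illustration, not the real API) might look like:

```python
class MultiTransformer:
    """A named component that maps a multi-data item (a dict of named
    data items: data frames, pre-trained models, ...) to a new one."""

    def __init__(self, name):
        self.name = name

    def transform(self, multi_data: dict) -> dict:
        raise NotImplementedError


class Dag(MultiTransformer):
    """A DAG of multi-transformers; by composition it is itself a
    multi-transformer, just like a full detection."""

    def __init__(self, name, stages):
        super().__init__(name)
        self.stages = stages  # topologically ordered list

    def transform(self, multi_data):
        for stage in self.stages:
            # Each stage adds its outputs to the shared multi-data item,
            # so downstream stages can consume them by name.
            multi_data = {**multi_data, **stage.transform(multi_data)}
        return multi_data


class FeatureEngineering(MultiTransformer):
    def transform(self, multi_data):
        events = multi_data["events"]
        return {"features": [{"n": len(e)} for e in events]}


class Classifier(MultiTransformer):
    def transform(self, multi_data):
        return {"alerts": [f for f in multi_data["features"] if f["n"] > 3]}


detection = Dag("demo_detection",
                [FeatureEngineering("fe"), Classifier("clf")])
out = detection.transform({"events": ["abcd", "ab"]})
```

Because `Dag` is itself a `MultiTransformer`, whole detections compose the same way individual components do, which is the point of the abstraction.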
The idea of a DAG, a Directed Acyclic Graph, is not new; Spark uses it as well, but Spark's DAG is over very primitive operations such as select, join, filter, and so on. Here we use a DAG of higher-level objects, objects that make sense at a higher level of abstraction: feature engineering connected to model training or model prediction. That way, when we look at the DAG, we can make sense of it and understand what it does in a logical sense. Our multi-transformers can also be stateful, because some of our components are based on historical values, like the anomaly detection that Tomer mentioned: it needs to read data from a previous slice, then compute a new slice that it will read in the next iteration; in that way it is stateful. So, I've presented CyFlow; let's look at what would happen if we instead used the simple notebook approach: create many notebooks in Databricks, maybe one notebook per detection, and run those. First, in terms of deployment, how would you deploy that? You would need to deploy these notebooks somehow onto the cluster, maybe one cluster or multiple clusters; with CyFlow, all you need to do is take the DAG you created and deploy it onto the cluster, which is easier. Second, unit testing: with CyFlow you develop everything inside an IDE, so you can do unit testing in a rather simple way, the way unit testing is done in normal programming, you create a standalone Spark session and run your tests. Third, shared utility code is easier to manage in your repository, versus having to upload a wheel file and use it from notebooks. But I think the most important thing CyFlow provides is structure: the way the different components connect with one another.
In CyFlow this is very simple, you can even see it using the UI that Spark provides, whereas with notebooks you need some sort of framework to express how the different notebooks connect, or how the flow inside one large notebook actually works. It's difficult to understand code containing thousands of lines. Finally, in terms of typing: you don't really have any schema checks when you connect different notebooks in some custom way, whereas in CyFlow every edge is checked by the CyFlow engine to make sure the schemas match properly. Now, it turns out that when we want to validate components, it's not as simple as you might originally think. You have this output from Spark and this output from Cosmos and you want to compare them, so you want to write a comparator. But there are a few challenges; recall the ones Tomer presented. For example, some of the ML models are legacy models that are not available to us in PySpark; we can't run the C# code inside PySpark. So the models are going to generate slightly different results. That doesn't mean there is an error; it just means the results are not exactly the same. Furthermore, some of the semantics are not completely deterministic, like selecting the top K rows where some values are equal, as mentioned. Some schema translations can also cause results to differ, and randomly generated UUIDs are another problem when you just want to compare results syntactically. So how do we deal with all this, and how much should we really invest in this comparator logic to make sure parity is full and accurate? We try to achieve high parity, but if parity is not perfect, we can go to the final output, the alerts: the alerts generated by the legacy component, by Cosmos, and the alerts generated by Spark.
We can look at the resources we generate alerts on and compare them. If they are the same resources, it means that even though our intermediate results are not syntactically identical, in terms of the alerts we produce we're fine, and we can mark that component as done and move forward. When the results are not perfect, we can use measures like Jaccard similarity, precision, and recall, and decide based on those values. For example, for detections that are known to be a bit noisy, we can focus more on precision than on recall: we could set a higher threshold to ensure high precision, maybe at the expense of some recall. If the values are too low no matter what the threshold is, then we need to keep hunting for bugs that may still exist in the component.
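Concretely, comparing the sets of resources alerted on by each platform reduces to standard set metrics; a minimal sketch (resource names are made up):

```python
def jaccard(legacy, spark):
    """Jaccard similarity between the two alerted-resource sets."""
    if not legacy and not spark:
        return 1.0
    return len(legacy & spark) / len(legacy | spark)


def precision_recall(legacy, spark):
    """Treat legacy alerts as ground truth: precision is the share of
    Spark alerts that legacy also raised; recall is the share of
    legacy alerts that Spark reproduced."""
    tp = len(legacy & spark)
    precision = tp / len(spark) if spark else 1.0
    recall = tp / len(legacy) if legacy else 1.0
    return precision, recall


legacy = {"vm-1", "vm-2", "vm-3", "vm-4"}
spark = {"vm-1", "vm-2", "vm-3", "vm-9"}
# 3 shared resources out of 5 distinct ones:
assert jaccard(legacy, spark) == 0.6
assert precision_recall(legacy, spark) == (0.75, 0.75)
```

A noisy detection would then be accepted on a high-precision threshold even if recall dips, exactly the trade-off described above.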
– Thanks Roy. Now that we've talked about the metrics, let's talk about tuning performance. We checked and verified that our two data streams are equal and the correctness threshold was achieved, but what about the running time? In this step, we measure the running time on actual data: we feed the real data into the pipeline and start measuring. In the beginning, the first time we ran some pipelines, we suddenly saw that they took too much time; sometimes I had to go to sleep and wake up in the morning only to realize a pipeline's run time was 14 hours, way above our expected threshold. In that case, I had to start debugging the problem. Since it is very hard, just by looking at the physical execution graph, to understand exactly which component causes the high latency, and since I don't know of a tool that provides profiling for PySpark, we had to debug each component separately. For example, let's assume we want to start investigating the feature engineering part. What we do here is try to find the culprit: we first divide the feature engineering part into sub-transformations, select several transformations and group them into logical groups, and then output and materialize the result of each group to storage. Now let's assume the first group took 20 minutes, which sounds acceptable based on our expectations. We then continue to the second group of transformations, and suddenly see 13 hours. In this case, we have managed to find the problematic area.
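Lacking a profiler, timing each materialized group separately can be sketched like this (the stage functions are placeholders; in the real pipeline each group would write a data frame out to force execution):

```python
import time


def time_stages(stages, data):
    """Run each group of transformations in turn, materializing its
    output, and record how long each group took."""
    timings = {}
    for name, fn in stages:
        start = time.perf_counter()
        data = fn(data)  # in Spark: write the result out to force execution
        timings[name] = time.perf_counter() - start
    return data, timings


# Placeholder stages standing in for grouped transformations:
stages = [
    ("parse", lambda rows: [r.strip() for r in rows]),
    ("filter", lambda rows: [r for r in rows if r]),
]
result, timings = time_stages(stages, [" a ", "", "b"])
```

Note that because Spark is lazy, each group's output really must be materialized (e.g. written to disk) before the per-group timing means anything; otherwise all the work collapses into the last action.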
Now that we've found the problem, we need to solve it and work on optimizing this component to reduce the running time. Several things worked best for us when tuning. First, in the anomaly detection, for example, we had a pipeline that aggregates a lot of hourly slices and then reads them all at a resolution of one day or one month. Suppose we read 30 days of hourly slices: that is a huge number of files, because each hour has many partitions, so the total number of partition files is very large. So we created a daily process that aggregates the hourly slices into one daily stream; we produce the daily stream once a day, and this led to an improvement in some of our analytics. Another optimization that worked well was modifying the default number of partitions for the cluster. Suppose our cluster has 300 cores in total; we decided to set the partition count to the number of cores multiplied by three, so for the cluster I mentioned it would be around 1,000 partitions. We used this both for the shuffle partitions and for the general partitioning. Another thing we used was broadcasting, when a UDF had a large object in its signature and was used frequently; in some cases we had several such UDFs running one after another, and once we broadcast the shared data, we saw a significant improvement. Finally, caching was very good for us: for data frames that are used more than once, it is very wise to cache them. But be aware: in the beginning we cached a lot of data frames without freeing the memory, and at some point we got a lot of out-of-memory and spill errors that were very hard to debug.
So eventually we limited the number of cached data frames, and in addition we used unpersist: we removed data frames from the cache immediately after we were done with them, to free the memory and avoid those memory failures. That was tuning performance; now let's summarize the talk. In this talk we presented the general challenges you usually face when migrating a large-scale big data pipeline from a legacy technology to Spark. Remember the challenges: preserving the semantics, real-time constraints, and costs. In addition, we introduced CyFlow, a framework we use to increase usability and avoid connectivity bugs. It allowed us to break the problem into separate components and make good progress on the migration. We also talked about different validation strategies for comparing the data. And at the end, I talked about reducing the overall run time through optimization; once you optimize the code, you can use fewer or less powerful machines, which reduces the overall cost of the system. That was the lecture; I hope you learned a lot and that we managed to convey the message. We really enjoyed it, and thank you everyone for your time.
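[Editor's note] The tuning levers described above can be recapped in a short sketch; the Spark calls are shown as comments (the `spark` session variable is illustrative, since they need a live cluster), while the partition rule of thumb is computed directly:

```python
def partitions_for(total_cores, factor=3):
    """Partition-count rule of thumb from the talk: a few partitions
    per core keeps all cores busy without a flood of tiny tasks."""
    return total_cores * factor


n = partitions_for(300)  # the 300-core cluster from the talk -> 900
assert n == 900

# On a real cluster (hypothetical session variable `spark`):
# spark.conf.set("spark.sql.shuffle.partitions", str(n))
# df = expensive_transform(events).cache()   # reused more than once
# ... use df in several downstream branches ...
# df.unpersist()                             # free memory when done
```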
Roy Levin received his Ph.D. from the Department of Computer Science at the Technion Israel Institute of Technology in 2013. He is currently a Senior Researcher at Microsoft and part of Azure Security Center (ASC). Roy has over 15 years of academic and industry experience in Machine Learning, Data Management and Information Retrieval.
Tomer is an experienced Software Engineer at Microsoft and part of Azure Security Center (ASC). Tomer has over 7 years of experience in developing Big Data and Machine learning pipelines.