The development of big data products and solutions – at scale – brings many challenges to teams of platform architects, data scientists, and data engineers. While it is easy to find ourselves working in silos, successful organizations collaborate intensively across disciplines so that problems can be understood and proposed models and solutions can be scaled and optimized on multi-terabyte datasets. In this session, the T-Mobile Marketing Solutions (TMS) Data Science team will present a platform architecture and production framework supporting TMS internal products and services. Powered by Apache Spark technologies, these services operate in a hybrid of on-premises and cloud environments. As a showcase example, we will discuss key lessons learned and best practices from our Advertising Fraud Detection service. An important focus is on how we scaled data science algorithms outside of the Spark MLlib framework. We will also demonstrate various Spark optimization tips to improve product performance, and our use of MLflow for tracking and reporting. We hope to share the best practices we’ve learned from our journey of building end-to-end Big Data products.
– Hello everyone and welcome to our presentation titled Advertising Fraud Detection at Scale at T-Mobile. My name is Eric and I’m a data scientist, and I’m joined by Phan, who is a data engineer. Both of us are on T-Mobile’s Marketing Solutions team, which is a new team within T-Mobile that provides services to the advertising industry.
We’ll be talking today about a research project we developed to identify potentially fraudulent ad activity using data gathered from T-Mobile’s network, while discussing some of the challenges we faced and tips we learned in the process of developing this tool. To give you an overview of the presentation: we will start with some background information about the ad tech industry. Then we’ll talk about Spark tuning and optimization. Then we’ll discuss writing Spark code for those with a Python background, and some specifics about one of the algorithms we are using to find fraud. And finally, we’ll finish by explaining our process of automating the project and our use of MLflow. Before we get into the details of the project, I want to give some background about the ad tech industry for those less familiar, because it will help explain why ad fraud is such a difficult and pervasive issue.
To give you an idea of the scale, it’s estimated that advertising fraud cost advertisers about 23 billion dollars in 2019, and that number is expected to grow in the coming years. We can say there are two main reasons why ad fraud is so prevalent. The first reason is a general lack of regulation around online advertising; although that changed a bit after the CCPA started being enforced earlier this year, federal and global regulations are still fairly lax.
The second reason is that the ad tech industry is fairly complex and not really understood by most people. With so many moving parts, it can be extremely difficult to monitor everything that goes on during an ad transaction, especially when the average American is exposed to about 5,000 ads every day.
To give you an example of the complexity of ad tech, here’s a diagram of the main components involved in an online ad campaign. You’re likely familiar with the idea of an advertiser and a publisher, but there are a few more components that control the movement of information and determine who sees what ad and when. For each impression, this entire process of determining which ad to show, how much the ad will cost, and who will see it takes less than a second. And each moving part presents an opportunity for fraudulent activity to occur. Since it is almost always the advertiser who loses the most money, the fraudulent activity is most likely to occur on the side of the publisher.
This is made more apparent when we dig into the two main types of fraud. The first is a bot farm. A bot farm is essentially a collection of devices with the sole purpose of browsing a particular website or app to generate ad impressions for that publisher, with the publisher benefiting from the ad impressions and the clicks. To be clear, everyone except the advertiser makes money in this scenario. The other main type of fraud is referred to as domain spoofing. This can take several forms, but simply put, it is a publisher sending a URL to the SSP that is not the publisher’s actual URL, to trick everyone upstream into thinking they are sending the ad to a legitimate website.
And this leads us to the question: how do we identify fraudulent activity? The first thing we need is data. We need to be able to see and track what is going on in the billions of online ad transactions that come through our network. Being a telecom company, we have the advantage of being able to utilize all of the network data used by T-Mobile devices. Second, we need to use this data to develop a model that can identify several different types of fraud. And lastly, we need to be able to scale this model to handle the four to 10 terabytes of data that T-Mobile collects every day.
There are a number of technologies that we use at T-Mobile, including both on-prem and cloud-based platforms, as you can see here. There are instances where we use a hybrid approach.
But the most important thing is to develop a working pipeline that can extract data from its source, so that the data can be used to train a model and generate the required outputs. The process always carries data from a traditional format, whether that be ORC, Parquet, or CSV to the desired output, whether that be a table of metrics, or a dashboard for visualizing data.
Many data scientists, like myself, are used to working with Python and SQL to accomplish some or all of these tasks. But when the scale of data becomes great enough, for example when you’re dealing with terabytes worth of data, Spark becomes a necessary tool.
Hive and SQL don’t really have the machine learning tools required to do most data science work, and plain Python just can’t handle such large amounts of data on a single machine.
So with that in mind, I’m going to pass it over to Phan, who is going to go into some more detail about Spark tuning. – Hey, everyone. My name is Phan, and I’m a data engineer on the T-Mobile Marketing Solutions team.
Working with Spark daily means you are going to deal a lot with resource allocation, as well as reading, writing, joins, and aggregations.
Which just happen to be our topics today.
When you start your Spark application, it uses static allocation by default. This means you have to tell the resource manager exactly how many executors you want to work with. The problem is, you usually don’t know how many executors you actually need before you run your job; it’s not obvious. You often allocate too few, which makes the job really slow or even fail, or you allocate too many and waste a lot of resources. In addition, those resources are occupied for the entire lifetime of your application, which also wastes resources unnecessarily. That’s why the flexible mode called dynamic allocation comes into play. With this mode, executors spin up and down depending on what you’re working with, as you can see in the left chart here. This mode, however, comes with a bit of setup. You might want to set the initial and minimum executors; as the names suggest, those are the minimum resources for your app. They take a bit longer to spin up front, but your jobs execute faster afterward. You may also want to set the maximum executors, because without any limit your app can take over the whole cluster, and yes, I’m talking about 100% of the cluster. The executor idle timeout config will keep your executors alive a bit longer than usual while waiting for a new job; otherwise they get killed for idling, and the next job has to request new executors, which need some time to start, causing delay. The cached executor idle timeout config will release executors even if they are holding cached data. Mind you, if you don’t set this and you cache your data fairly often, dynamic allocation won’t be much better than static allocation.
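The settings described here map onto standard Spark configuration keys. As a rough sketch (the numeric values are illustrative assumptions, not TMS production settings), the setup might look like this in PySpark:

```python
from pyspark.sql import SparkSession

# Dynamic allocation setup as described above; values are illustrative.
spark = (
    SparkSession.builder
    .appName("afd-dynamic-allocation-sketch")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.shuffle.service.enabled", "true")  # required for dynamic allocation on YARN
    .config("spark.dynamicAllocation.initialExecutors", "4")
    .config("spark.dynamicAllocation.minExecutors", "4")
    .config("spark.dynamicAllocation.maxExecutors", "100")  # cap so one app can't take the whole cluster
    .config("spark.dynamicAllocation.executorIdleTimeout", "120s")  # keep idle executors alive a bit longer
    .config("spark.dynamicAllocation.cachedExecutorIdleTimeout", "600s")  # release executors holding cached data
    .getOrCreate()
)
```

Without the cached-executor timeout, executors holding cached partitions are never released, which is the "not much better than static allocation" failure mode mentioned above.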
Spark doesn’t read files individually; it reads them in batches. It will try to merge files together until a partition is filled up. Think about this extreme case: you have one million files in a directory containing one gigabyte of data, so about one kilobyte per file. With a configured partition size of about eight megabytes here, you will end up with about 500,000 tasks, and the job will most likely fail. The bottom-left chart shows you the correlation between the number of tasks and the partition size, and it’s simple: the bigger the partitions you configure, the fewer tasks you will have. Too many tasks will make the job fail because of driver memory errors, AM memory errors, or connection errors. Too few tasks will not only slow down the job because of a lack of parallelization,
but will often lead to executor memory errors as well. Fortunately, we found a formula that can calculate the ideal number of tasks and partition size. It involves the data size, the number of files, the file open cost, the number of executors, and the number of cores. Applying all of those parameters from the example above to this formula, you get a partition size of about two gigs and 2,000 tasks, a much more reasonable number.
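The slide with the formula isn’t reproduced in this transcript, but based on the description (and the worked demo later in the talk), a sketch in plain Python might look like this; the executor and core counts below are illustrative assumptions:

```python
# Sketch of the partition-size formula: total bytes to scan (data plus a
# per-file "open cost") spread evenly across all available cores.
# The 4 MiB default mirrors Spark's spark.sql.files.openCostInBytes.
def ideal_partition_bytes(data_bytes, num_files, num_executors, num_cores,
                          open_cost_bytes=4 * 1024 * 1024):
    total_cost = data_bytes + num_files * open_cost_bytes
    return total_cost // (num_executors * num_cores)

# The one-million-file example: 1 GiB of data in 1,000,000 tiny files,
# read with 500 executors of 4 cores each (illustrative numbers).
size = ideal_partition_bytes(
    data_bytes=1 * 1024**3, num_files=1_000_000,
    num_executors=500, num_cores=4,
)
print(f"partition size ~ {size / 1024**3:.1f} GiB")  # prints "partition size ~ 2.0 GiB"
```

With these numbers the total scan cost divided by the partition size comes out to about 2,000 tasks, matching the example above; the resulting size would presumably be applied via `spark.sql.files.maxPartitionBytes` before reading.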
When you join your tables or do any aggregations, Spark will shuffle data. Shuffling is a very expensive operation: your entire dataset gets moved over network connections between executors. Tuning this shuffle behavior, however, is an art. Take a look at this example: there is a mismatch between the shuffle partitions config, which I have set to four here, and the number of currently available threads, eight in this case because I have two executors with four cores each, so four threads sit there doing nothing. The ideal shuffle partition count can be calculated by this formula: the number of executors times the number of cores. You can easily get the number of executors if you’re in static allocation mode. But what about dynamic allocation? You use the max executors setting if you have it. And if you don’t, I’ll tell you a secret: try using the number of subdirectories of the folder you’re reading from.
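As a tiny sketch of that rule (assuming the two-executor, four-core setup from the example), the ideal setting is simply the total available thread count:

```python
# Ideal shuffle partitions = total concurrently available threads.
def ideal_shuffle_partitions(num_executors, num_cores):
    return num_executors * num_cores

# The mismatch example: 2 executors * 4 cores = 8 threads, so a
# spark.sql.shuffle.partitions setting of 4 leaves half of them idle.
print(ideal_shuffle_partitions(2, 4))  # prints 8
```

The value would then be applied through the `spark.sql.shuffle.partitions` configuration before the join or aggregation runs.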
Writing in Spark is a bit interesting. See the live example here.
If you have 1,000 executors, you end up with 1,000 files. The point is, we can’t know whether that is too many files or too few until we know the data size. From our experience with huge data, keeping the file size under one gig and the number of files under 2,000 is good enough. In order to do that, you can use either coalesce or repartition. Coalesce is good enough in general if you just want fewer output files. Repartition, on the other hand, will shuffle the data and redistribute it evenly, which will speed up a job if your data is skewed, like the example on the right. Of course, this is more expensive.
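A sketch of that rule of thumb in plain Python (the one-gig and 2,000-file limits come straight from the talk; the helper function itself is ours):

```python
import math

# Target output file count: aim for files of at most ~1 GiB,
# but never more than 2,000 files in total.
def target_output_files(data_bytes, max_file_bytes=1 * 1024**3, max_files=2000):
    return min(max(1, math.ceil(data_bytes / max_file_bytes)), max_files)

print(target_output_files(500 * 1024**3))  # 500 GiB -> 500 files
print(target_output_files(10 * 1024**4))   # 10 TiB  -> capped at 2000 files
```

The result would be passed to `df.coalesce(n)`, or `df.repartition(n)` when the data is skewed, before writing.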
Now I’d like to quickly show you how effective this Spark tuning is in our case. Our demo today has two sections. First, I’d like to show you our customized kernel and its features. We call it MySpark. This kernel is our attempt to work with Spark through Jupyter in an on-prem environment. It allows us to set up Spark configurations for each notebook easily, without restarting Jupyter. It’s extremely useful for data engineers who have to work with different scales of data and a different set of configs for each dataset. It’s integrated with SQL, so instead of typing dataframe.show(20, False) all the time, you can simply use the SQL magic cell, type your query, and hit enter. It also applies a permanent fix for the broken formatting when displaying a Spark dataframe. And it connects to the Spark UI and gives you a quick view of current job progress.
Now let us present the optimization techniques that we mentioned. We apply them to two datasets. They’re almost exactly the same in size and structure: 74 directories and about a terabyte daily. The main difference is that the first one has about 250,000 files, and the second one has only 1,100.
If we read them both using the default configs, for the first one you get 16,000 partitions and therefore 16,000 tasks. When you perform an action, they go through the entire dataset.
It took about 20 minutes to finish. The second one has way fewer files; the number of partitions is about half that of the first one, and it took only five minutes.
Now let’s apply the formula we showed you earlier. It equals the data bytes plus the number of files times the open cost in bytes, which is four megabytes here, all divided by the number of executors, which under dynamic allocation is the number of directories, times the number of cores from our configuration. We print out some useful information so we can refer back to it later, then run the same code as before. You can see the number of partitions drops to less than 1,000, and our execution time is cut in half. What if we apply the fix to the second dataset? We end up with about 800 partitions and tasks, and less than two minutes to finish the job.
Now Eric, our data scientist, will go through another aspect: how a data scientist works with PySpark. – Many data scientists code in Python. This is the programming language I am most familiar with, and I know there are many other data scientists with similar backgrounds.
Making the transition from Python to PySpark can be challenging for anyone, so I’m going to talk through an example of some code I needed to write in PySpark and how I got there with the help of a UDF.
For a lot of what data scientists might do on a day-to-day basis, there are many similar functions and methods in Spark that would be familiar to someone used to working with the Pandas and scikit-learn libraries. For example, reading a CSV file, splitting your data into train and test sets, and fitting a model are all things that have similar syntax between the two. And if you are using only predefined methods and classes, writing code in Spark may not be very difficult.
There are a number of instances, though, where converting your Python knowledge into Spark code is not straightforward. Whether that’s because the code for a particular machine learning algorithm isn’t in Spark yet, or because the code you need is very specific to the problem you are trying to solve, sometimes you need to write your own code in Spark.
And with Spark’s cryptic error messages to guide you, that may seem like a daunting task. So I’m going to show you a piece of code that I wrote for this project, first as a UDF and then as native Spark code, then talk through how I got there.
As someone who is more comfortable writing in Python than Spark, I decided to first write out my code as a Python UDF, which you can see here. Once I did that, I broke my code down into logical snippets, lines of code that each performed a fairly basic task, which you can see outlined in the colored boxes. From there I was able to find a function, or more likely a group of functions, that accomplished what each snippet set out to do. After doing this for each of the logical snippets, I just had to make sure that the bits of code fit together and test that the end result was more or less the same. Then I was left with code that ran natively in Spark, rather than as a Python UDF.
And to justify this effort, here are the results showing the difference in performance between the UDF and the PySpark code. The PySpark code ran over twice as fast, which for a piece of code like this that runs every day means saving many hours of compute time over the course of a year.
So now I’m gonna take a minute to talk briefly about one of the metrics we use to identify potentially fraudulent activity in our network. The metric is called normalized entropy. For the data scientists watching who are familiar with decision trees, you may have used or know of Shannon entropy, which is a metric commonly used to calculate information gain. Normalized entropy in this case is the normalized form of Shannon entropy. For those unfamiliar, Shannon entropy is defined by the equation in the top right as H(X) = -Σ p(x_i) log p(x_i), where each x_i in this case is a T-Mobile customer, and p(x_i) is the number of times a given app shows up in customer x_i’s traffic, c(x_i), divided by the number of times the app shows up in the entire network, c(X). We get the normalized entropy by dividing Shannon entropy by the maximum entropy for a particular app. The equation for this is given just below the other one, and works out to NE = 1 - Σ c(x_i) log c(x_i) / (c(X) log c(X)). We can then multiply by 100 to get a value between zero and 100. The idea behind this metric is fairly simple. Since common apps tend to get intermediate amounts of traffic from a number of different people, we would expect safe apps to have a value of around 40 or so, give or take a couple of deviations. You can see this in the histogram in the bottom right corner, which shows the distribution of normalized entropy values for all the apps we are tracking. Values close to zero or close to 100, on the other hand, score higher in terms of their fraud potential. This is, of course, not a flawless metric, which is why we include several others in the final analysis. For example, people often only use their banking apps every few days, so apps like these can have a high normalized entropy despite being unlikely to be fraudulent.
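Since the slide equations aren’t reproduced in this transcript, here is a small plain-Python sketch of the metric as described, where `counts` holds the per-customer request counts c(x_i) for one app (a hypothetical helper, not the production PySpark code):

```python
import math

# Normalized entropy as described in the talk:
#   NE = 1 - sum(c_i * log(c_i)) / (C * log(C)), scaled to 0-100,
# where c_i is customer i's request count for an app and C = sum(c_i).
def normalized_entropy(counts):
    total = sum(counts)
    if total <= 1:
        return 0.0  # a single request carries no entropy information
    weighted = sum(c * math.log(c) for c in counts if c > 0)
    return 100.0 * (1.0 - weighted / (total * math.log(total)))

print(normalized_entropy([1] * 10))  # traffic spread evenly, one hit each -> 100.0
print(normalized_entropy([50]))      # all traffic from one customer -> 0.0
```

Legitimate apps tend to land somewhere in the middle of that range, around 40 in the histogram, while values pinned near 0 or 100 stand out as suspicious.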
Now, back to Spark.
The code you see in the top left is the code we are currently using to calculate normalized entropy. And what you see on the right is the time difference between running this code with a default configuration and with an optimized configuration. The optimized config sets the executors to 100, four cores per executor, two gigs of memory per executor, and shuffle partitions equal to the executors times cores, in this case 400. As you can see, the difference in compute time is significant, showing that even fairly simple code can benefit greatly from an optimized configuration, saving you a lot of waiting time. This is a great skill for any data scientist analyzing large amounts of data in Spark.
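For reference, the optimized static configuration described here corresponds to standard Spark properties; a sketch of how it might be set in PySpark (the app name is illustrative):

```python
from pyspark.sql import SparkSession

# Optimized static allocation from the talk:
# 100 executors x 4 cores, 2 GB each, shuffle partitions = executors * cores.
spark = (
    SparkSession.builder
    .appName("normalized-entropy-job")
    .config("spark.executor.instances", "100")
    .config("spark.executor.cores", "4")
    .config("spark.executor.memory", "2g")
    .config("spark.sql.shuffle.partitions", "400")  # 100 executors * 4 cores
    .getOrCreate()
)
```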
The last thing I wanted to bring up with regards to Spark configurations is the difference between static and dynamic allocation. The two previous configs were static. But when I used dynamic allocation with the dynamic equivalent of the static config shown in the top right, the dynamic config takes almost twice as long to compute the same data using the same code as the static config. The reason for that is that the Spark cluster will initially only spin up the minimum number of executors. It is only when you start running your code that the number of executors increases and only as needed.
This adds overhead, since executors do not spin up immediately, and can extend your compute time. This matters most when running shorter jobs and when executing jobs in a production environment.
If you are doing something like EDA or debugging, dynamic allocation is still probably the way to go. – Now that we’ve scaled our model to handle terabytes of data, we’re ready for the next step: productionizing it and turning it into a complete end-to-end product.
This is a quick overview of our AFD platform. The data pipeline at the bottom of this architecture, which is what we talked about in the last few minutes, is scaled and deployed to our production environment. The output of the pipeline, the aggregations as well as the other measurements you can see, is aggregated and transformed into an intermediate platform in the middle here, where web applications, analytics tools, and alerting systems can access it efficiently. As you can see, we also implemented our own anomaly detection model and tuned it with MLflow, which we’re going to talk about shortly. We have a complete workflow to automate, monitor, and operate this product.
In this project, MLflow plays an important role in the monitoring phase because of its simplicity and its built-in functions for tracking and visualizing KPIs. With a few lines of code, as on the right, we’re able to log hundreds of metrics in no time.
We tag them by date for visualization purposes. And with the anomaly detection model in place, we can observe our machine learning models’ outputs without constant monitoring.
The last thing I want to talk about is how our marketing team would use the data generated from this project. We created a UI to help navigate the information provided. Within the UI we can search for information on an individual app. For example, we search for an app developed by Kotech, a developer that was banned from the Google Play Store in July of last year. We can see quite a bit of data collected about this app, including basic app information, network traffic patterns, and it just (unintelligible).
The marketing team will then be able to make a decision, based on this information, about whether they are comfortable publishing ads on this app. When we search for the New York Times app, we can see all of the data we have gathered about this app, including its rating and GES score. Other features of the UI include information about the metrics used to generate the GES score, and the GES score itself, which helps marketers (unintelligible) what scores they’re comfortable with. A sortable view allows for sorting and filtering by different metrics. And a monitoring page shows a selection of KPIs and their values over time, including potential anomalies.
As some of you might know, validation is a very difficult process when it comes to something like fraud, due to the lack of a clear definition of fraud. It’s also very important, from both a legal and a moral perspective, to make sure it is done correctly. We are currently working with our marketing team to run A/B testing to validate our process and make improvements accordingly. And that is our presentation for today. I hope you enjoyed it.
Eric is a data scientist on the T-Mobile Marketing Solutions team. Eric has a background in mathematics and experience working on a diverse set of problems including computer vision, predictive maintenance, and marketing analytics.
Phan is a data engineer on the T-Mobile Marketing Solutions team. His current focus is scaling machine learning models on telecommunication network traffic using Apache Spark and supporting marketing decisions with data insights. With wide experience spanning data warehousing, data analysis, and software and product development, Phan builds strong connections between different teams and helps deliver complete machine learning products end to end.