Spark has become synonymous with big data processing, however the majority of data scientists still build models using single machine libraries. This talk will explore the multitude of ways Spark can be used to scale machine learning applications. In particular, we will guide you through distributed solutions for training and inference, distributed hyperparameter search, deployment issues, and new features for Machine Learning in Apache Spark 3.0. Niall Turbitt and Holly Smith combine their years of experience working with Spark to summarize best practices for scaling ML solutions.
Speakers: Holly Smith and Niall Turbitt
– Hello and welcome to the Scaling Machine Learning with Apache Spark talk at Data and AI Summit. I’m Holly Smith, and today with me I have Niall Turbitt, and we’re going to talk about machine learning workloads and how you can scale them with Spark. So before we go into the topic for today, we are going to introduce ourselves. My name is Holly Smith. I am a senior consultant at Databricks. What does this mean? This means I help our customers with their Databricks projects. So there’s a lot of Spark work, either on their pipelines or sometimes on their machine learning work as well, to hopefully deliver a project. I’m part of the professional services and training team, and we do more than just consulting work. It’s also advisory work that we might do on architecture, but also training, like some of the training that went on earlier in the week. My experience is primarily from a banking background. I started off in credit risk and decisioning, so a lot of that big scale machine learning there. From there I went on to mobile banking and how we use the data there: the data collection strategy, the data engineering, and then finally surfacing it to a bunch of different BI tools. And from there, I went on to forecasting and optimization, primarily around the operational side of the bank. So how many people do you need to work in your bank branches? How many shifts do you need to have working in your call center?
– My name is Niall Turbitt. I’m a senior data scientist, also at Databricks. And like Holly, I also work on our professional services and training team. So with this, my time is split pretty much between working with our customers to build and deploy scalable machine learning solutions, and delivering training classes which focus on data science and machine learning with Spark. Prior to my time at Databricks, my background has involved building scalable, data driven machine learning solutions across domains such as supply chain forecasting, logistics optimization, and also recommender systems.
– Okay, so what is it that we’re going to talk through today? So the first thing that we’re going to go through is why we think this was a topic that was worthy of your time, that we thought people at Data and AI Summit would want to listen to. We’ll then go over a quick recap of our Spark architecture. A lot of people use it day to day, but they might not necessarily understand what works under the hood, and that’s quite pertinent to what we’re going to talk about afterwards, which is the actual paradigms of machine learning on Spark, particularly around our training and tuning and our inference sections too. Now we have a bit of a confession to make. This is a recording by past Niall and past Holly. Future Niall and future Holly should hopefully be in the chat right now. So if you have any questions or any comments, please put them in the chat today and we’ll be there to answer them for you. Please don’t feel bad that you might be distracting us or taking us away from the presentation, because it’s already been done. So why is this something that we wanted to talk to you about today? Why is it worth your time and a talk at Data and AI Summit? So in our day jobs, we see that there is sometimes some confusion about where and when you can use Spark for machine learning. The classic tool is Spark MLlib, but that’s not necessarily the only tool you can use. And there are also more places across the ML life cycle where you can use Spark to your advantage. The second thing that we want to get across is that Spark is incredibly powerful. We can see workloads that take days, sometimes even weeks, to run being reduced to something significantly more manageable. We also see people being able to take advantage of all of their data, reducing the need to sample down. And so that’s why we really think that you should consider using Spark for the workflows that you’re creating.
And then finally, it’s a very fast moving environment. Things change all the time. We’re going to be taking you through some new topics today, and then you get to say that you were using it before it was cool, like some kind of Spark hipster. So let’s have a quick refresher of our Spark architecture. This’ll be relevant later on, when we’re talking about why things are good for certain jobs, but not necessarily others. So here we have a diagram of what our Spark cluster looks like. A cluster is ultimately what’s going to be running everything that we submit to it. The dark blue boxes are ultimately just Java virtual machines under the hood, but we’ve assigned them different roles, a driver and a worker. Within the workers, we also have cores, and they are what ultimately process the individual partitions of data. So a worker’s main responsibility is to take the instructions from the driver, process that data, and then send the results back to the driver. Our driver is the one that’s coordinating all of the work: where the work goes, what should be returned, and how it’s ultimately distributed across all of these workers. The number of workers that you have can grow and can shrink. How do you adapt to that? Ultimately, this is part of why Spark makes it so much easier for you, because it does everything under the hood. You don’t have to program this all manually. When we talk about loading our data into Spark, what we’re talking about is loading that data into a data frame. It’s a similar concept to the data frames you may have used elsewhere, and it means that the data is ready to be broken up into partitions, to be processed by all of those cores, or dark gray boxes. Sometimes you’ll hear us refer to a worker as a node. So when we talk about distributing across our nodes, what we mean is distributing across our workers.
Now there are a couple of drawbacks to this kind of architecture. One is that when you want to take data and shuffle it across our workers, it can be quite an expensive operation, because data has to travel across the network. So it’s ideal to minimize the amount of shuffling that’s happening here. And so what we’re going to talk through a lot today is this concept of distributed libraries, or Spark-aware libraries as they’re sometimes referred to, but also single node libraries as well. If a library is Spark-aware, it means that it knows about those workers and how to break down particular tasks across them. Compare that to a single node library where, if you’re lucky, it’ll just use one worker node, and if you’re unlucky, it will be processed in the driver, meaning you’re going to have a very slow and expensive way of running your code.
– So in order for us to consider how we can apply machine learning in a Spark context, we first need to consider the machine learning life cycle and think about the various points at which we can utilize Spark’s parallelism. So in any ML scenario, we necessarily start with a training phase, either training a single model or testing multiple algorithms, or various versions of them, all the while attempting to find the best performing option. So in this training phase, we are gonna be fitting a model to a predefined training data set and then evaluating how that model performs on some validation set or test set. Any model that we select will have a default set of parameters associated with it, known as its hyper-parameters, and setting these hyper-parameters to different values will ultimately affect the model performance. And we may choose to undertake some form of model tuning, where we adjust these hyper-parameters in order to improve the performance of our final model. Once we have selected the best performing model, we now have a model artefact to apply to new unseen data. It’s then in the inference phase of our life cycle where we load this model artefact and use our ML model to make predictions. Our aim then is to drill down into each of these different stages in building an ML application, and ultimately identify where we can take advantage of Spark’s parallelism to efficiently scale both training and inference. So with that in mind, let’s examine at a high level the different approaches that we could take to utilizing Spark’s parallelism in, firstly, the training phase of our machine learning life cycle. So the first approach that we could take is training a single algorithm across a distributed dataset, an approach we’re gonna refer to as data parallel training. In this scenario, we will use a distributed machine learning library to coordinate the training process across multiple nodes, as Holly mentioned.
Our algorithm then will train on a single training dataset, partitioned across workers, and this results in a single model artefact being created. With this approach, we have the ability to scale up to very large datasets, and we’re not gonna be restricted to the amount of data that we can fit on a single machine. So this then gives us the ability to learn a much more representative model, given that we’re not forced to train on a down-sampled dataset. One disadvantage of this approach, however, is that not all algorithms scale efficiently in a distributed setting. One such example of this is K nearest neighbors, where the model itself is the entire dataset. Data parallel training may not always be necessary. However, this doesn’t mean that you can’t still take advantage of distributing certain aspects of the training process with Spark. One approach that we can take with single node libraries, or in other words, packages where a model is trained on data on a single machine, is to train multiple models at the same time in parallel. So take for example a use case where I might have many IoT devices and I want to train an individual model per device. One way that we can use Spark to parallelize this training process is to train one model per device, training many unique models in parallel. The big advantage of this approach is that we can massively scale out our model training, in that we’re not going to be restricted to a sequential training process on a single machine. An important caveat to call out with this approach, however, is that we need to ensure that all the data for each group can fit on a single worker node. So another way that we can think about using Spark’s parallelism is in our tuning step, where we can make use of parallel hyper parameter optimization.
And this approach can be used for both the distributed training case and the single node training case, where we’re going to use that distribution to evaluate multiple hyper parameter configurations asynchronously. And this can be particularly powerful whenever we have a single node machine learning library and we want to optimize hyper-parameters over a complex search space. Say for example, a TensorFlow or PyTorch model with many different hyper parameter settings. In this instance, what we do is fit multiple models to the same training set in parallel. However, each of these models has a different set of hyper-parameters configured. This can allow us to dramatically scale up the tuning process, in addition to taking less time to find an optimal set of hyper parameters. So we’ve just taken a high level look at the different ways in which we can take advantage of Spark’s parallelism in the training phase of our ML life cycle. Let’s now take a look at the inference side of things. So once we have a trained model, be that either a single node model or a distributed library model, we can also utilize the scalability of Spark in our inference pipeline. And whether it’s the distributed or the single node model use case, we conceptually take similar approaches. So we’re gonna take a trained model and then apply it to new unseen data, which is partitioned across a cluster. This works in a highly scalable manner in a batch setting, where we might be making predictions every hour, down to even minutes, where we might have a streaming use case. However, there are also certain considerations that we have to bear in mind, such as the type of deployment, which I’ll now pass over to Holly to explain.
– So when would you use Spark for machine learning? There’s a couple of things that we need to keep in mind before starting. First of all, let’s take a look at our data. How big is this data? What kind of data do we have? Do we think this is going to be something that’ll fit within a single node in our Spark cluster? Or is this something that’s going to be so big, it’s going to need to be broken up? The next area that you want to think about is the compute resources that you have available. Do you have a cluster that is going to scale out endlessly to 200 nodes? Have you got something that’s GPU accelerated? Which is great if you do; if you don’t, you might be a little bit more restricted in what it is that you want to do. Or if you’re training the same model but in parallel, what are the things that you might want to cut back on if you don’t have a massive cluster at your disposal? Again, the other area that you want to consider is whether you want a single machine versus something that’s distributed computing. And then finally, the last thing that you need to consider is inference, otherwise known as deployment requirements. Coming from someone who has a lot of experience in the field, this is normally where I see a lot of projects trip up. I’d really encourage you to think about how you are going to deploy your model before you even start training. There’s a couple of different options that you have available to you: batch, streaming and real time, and they all do different things. It’s not that one is inherently better than the other. If you’ve got a lot of data, and your latency, or the time that you have to perform this inference, is in the hours to days area, then I’d really recommend batch. So an example might be customer churn prediction. There’s very little that you can do to impact the way a customer is going to churn minute by minute. It’s normally an email that’s sent out.
I don’t know, in an hour, or a notification that you might push. It doesn’t necessarily have to depend on something that’s minute by minute. Or whenever you’re looking at something that’s in the seconds to minutes area, streaming might be a good option for you. In terms of the throughput, it’s not as big as batch, it’s a good bit more medium sized. A lot of this is a general rule and there are exceptions to it, but in general streaming doesn’t have as big a throughput as batch. An example might be predictive maintenance, when you want to know when something has maybe broken and you want to be able to repair it as quickly as possible. And then finally, real time, where the throughput is significantly lower. It’s normally one record at a time, but the latency is in milliseconds. So you can do it incredibly quickly. An example might be fraud detection, where someone is trying to make a transaction or payment or log on, and you need to be able to give that response within milliseconds. Something that’s not on here is edge deployments and embedded devices. That could be a whole talk in itself, so we’re not going to talk through those; we’ll stick to batch, streaming and real time. Batch and streaming you can very easily do with Spark. Real time does require a bit of extra work to make work with Spark too, and so most of the focus is on batch and streaming today. So the first thing that we’re going to talk about is the very well known Spark MLlib. It’s Spark’s built in machine learning library, which aims to make practical machine learning scalable and easy. The library consists of a range of machine learning algorithm implementations, various featurization techniques, and the ability to chain these operations into a single workflow via its standardized pipeline API.
It does all that hard work for you of distributing your training, instead of you having to implement this yourself, which would be a lot, a lot of hard work. Most people kind of compare this to sklearn. We’ll look at some code later, and you’ll see where the familiarity comes from. But there is often some confusion about where to use Spark MLlib versus other single node equivalents, such as sklearn. Ultimately the choice of package is determined by the size of the dataset that you’d like to train on. If the training set that you’re looking at can fit onto a single machine, then the likes of sklearn and other single node libraries can be a valid choice for you. However, when it comes to much larger data sets, where data has to be distributed across a cluster, MLlib is by far the more viable choice. I’d really encourage you to use this when you have large data, as down sampling leaves you with less representative data, and ultimately a less representative model, where valuable information could have been lost in the process. Due to differences in implementation, often you won’t get exactly the same result when building a model with sklearn versus MLlib. If you’re porting existing workloads from sklearn to MLlib, I’d strongly advise you to look at the documentation to evaluate where the parameters differ, and not just where they differ, but what the defaults are, and whether that’s what you wanted when you had it in sklearn. Once you’ve tested both on the same sample of data, and you can validate that you’re getting results that are similar enough, that’s when you can start to scale up. One caveat to call out with regards to MLlib is real time inference settings. Although it’s great for large-scale model training, and batch and streaming use cases, it won’t beat a single node model for real time inference on small data sets. And if you think about it, that’s because there’s a lot of data that’s going across a network.
Because it’s going across all of our workers, there’s a lot of network travel that’s happening there. And so it’s not going to be as fast as something that’s on a single node. Just a note on terminology: Spark has two machine learning packages, Spark MLlib and Spark ML. Spark MLlib was the original machine learning API, based on the RDD API, which people don’t use as much anymore. It has been in maintenance mode since Spark 2.0, which means that no new features are going to be added. Whereas Spark ML is the newer API, based on data frames instead of RDDs. The term MLlib is used ubiquitously as an umbrella term to refer to both of these ML packages in Spark. So if you are looking through the documentation and getting a bit confused, this is what is meant by MLlib. So here we have a code snippet. It’s pseudocode, it’s not designed to run. However, for those of you familiar with sklearn, this will look incredibly familiar. You’ve got your standard import statement at the top, and you have your train and test data frames. Remember that it’s spark.read, which is going to make it a Spark data frame instead of a Pandas data frame. And then you have the .fit and the .transform, which again will be very familiar to those who know sklearn.
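The slide code is not reproduced in the transcript. As a rough illustration only, a snippet of the kind described might look like the sketch below. It assumes Parquet input and uses linear regression as a stand-in estimator; the function name, paths and column names are hypothetical and do not come from the talk.

```python
def train_and_score(spark, train_path, test_path, feature_cols, label_col):
    """Fit an MLlib pipeline on one DataFrame and score another.

    All arguments are illustrative: `spark` is an active SparkSession, the
    paths point at Parquet data, and the column names are hypothetical.
    """
    # Imported here so the sketch can be loaded without a Spark installation.
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.regression import LinearRegression

    # spark.read returns Spark DataFrames, not pandas DataFrames.
    train_df = spark.read.parquet(train_path)
    test_df = spark.read.parquet(test_path)

    # MLlib estimators expect the features packed into one vector column.
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    lr = LinearRegression(featuresCol="features", labelCol=label_col)
    pipeline = Pipeline(stages=[assembler, lr])

    # .fit trains across the cluster; .transform appends a "prediction" column.
    model = pipeline.fit(train_df)
    return model.transform(test_df)
```

With an active session this could be called as `train_and_score(spark, "/data/train", "/data/test", ["feature_1", "feature_2"], "label")`, where every argument value is a placeholder. The main visible difference from sklearn is the assembler stage that gathers feature columns into a single vector column.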
– So as Holly just mentioned, MLlib isn’t necessarily the best solution for all machine learning needs. It might not meet certain SLAs with regards to low latency serving requirements, or there may not be built in support for the model that you want to train on your dataset. So when Spark MLlib doesn’t meet these requirements, there are still a large number of ways that you can leverage the parallelism of Spark. The first such approach we’ll look at is how to use Spark’s Pandas function API to create a custom function, and then train many models in parallel with that function. So introduced in Spark 3.0, under the new Pandas function API, is the applyInPandas method. With this, what we do is take a Spark data frame and apply a native Python function on different subsets of our data. When applying this function, we treat each group as if it were a Pandas data frame, and as such can make use of all the functionality that you would get with Pandas, albeit returning a Spark data frame. So this groupby then apply approach follows the split-apply-combine pattern, which a lot of people might be familiar with from using Pandas. In this manner, what we do is group by a certain column, splitting our data into groups, apply a defined function on each group, and then combine the results into a new Spark data frame. This approach is really powerful in cases where we might want to train many independent models. So just to make that more concrete, again think about a use case where we have many different IoT devices and I want to train a separate model for each device. If we have a look at the little code snippet to the side, what we can do is define a Python function which will train a single node ML model, such as a scikit-learn model, taking as its input a Pandas data frame. Once defined, we can group our Spark data frame by the device ID, and apply the function above to the data from each device in parallel.
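The snippet shown on the slide is not part of the transcript. A minimal sketch of the pattern, assuming a Spark data frame with hypothetical columns `device_id`, `feature_1`, `feature_2` and `label`, and using scikit-learn linear regression as a stand-in model, might look like this:

```python
# Output schema of the per-device results, written as a Spark DDL string.
RESULT_SCHEMA = "device_id string, n_rows long, mse double"


def train_device_model(pdf):
    """Train one scikit-learn model on a single device's rows.

    `pdf` is a pandas DataFrame holding every row for one device_id.
    The column names are hypothetical.
    """
    import pandas as pd
    from sklearn.linear_model import LinearRegression
    from sklearn.metrics import mean_squared_error

    X = pdf[["feature_1", "feature_2"]]
    y = pdf["label"]
    model = LinearRegression().fit(X, y)
    mse = mean_squared_error(y, model.predict(X))  # in-sample, for brevity

    # Return one summary row per device; Spark combines the rows afterwards.
    return pd.DataFrame(
        {
            "device_id": [str(pdf["device_id"].iloc[0])],
            "n_rows": [len(pdf)],
            "mse": [float(mse)],
        }
    )


def train_all_devices(sdf):
    """Group a Spark DataFrame by device and train the models in parallel."""
    return sdf.groupBy("device_id").applyInPandas(
        train_device_model, schema=RESULT_SCHEMA
    )
```

The sketch only returns a metric per device. In practice the function would usually also persist each fitted model somewhere, which is left out here because the talk does not describe how that was done.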
This enables massive scaling of the training process. So where before we might have had to train these models in a serial manner, we can now train many models asynchronously in parallel. A caveat to raise here is that, when calling the groupby prior to the applyInPandas method, we necessarily trigger that shuffle of our dataset that Holly was referring to, thus moving data between our workers. So something to be aware of here is that we need to ensure that both our model and the data for each group can fit onto a single machine, or a single worker. Some people might also recognize this API as being very similar to Spark’s existing groupby then apply, which was introduced in Spark 2. Under the hood these two APIs trigger identical operations. However, this newer API, applyInPandas, is recommended, as the older API, the groupby .apply method, will be deprecated in future releases of Spark. Another approach that we can take to harness the parallelism of Spark in our ML life cycle is during the tuning phase. So tuning, as mentioned previously, is a crucial step in the machine learning workflow. It allows us to optimize the performance of our algorithms, and essentially involves tweaking the many different levers of our algorithms in order to find a best performing model. This process can involve training hundreds or even thousands of different models, all with different permutations of hyper parameter configurations. So take, for example, tuning a neural network, such as a Keras model, where there can be many different hyper-parameters to set and where it might be very computationally expensive to train. One approach that we could take to tuning is a grid search, where we exhaustively inspect every combination of these values.
An alternative approach, however, could be to utilize any information from previous models that we have trained to make more informed decisions about which hyper-parameters we should try out next. And one package that allows us to do this in a super efficient way is hyperopt. So hyperopt is an open source Python library. Given a search space, hyperopt allows us to train many models, either in a serial or a parallel manner, to find a best set of hyper parameters. We can apply this to both single node and distributed ML libraries. However, one thing to call out here is that there’s not the ability to train multiple MLlib models in parallel with hyperopt. So we can either make use of the parallelism of our cluster to train many single node models with hyperopt, or alternatively train one single MLlib model at a time across the entire cluster. And under the hood, what hyperopt uses is a Bayesian based approach to adaptively select the next best set of hyper parameters at each step in the tuning process, based on what we’ve already seen. So rather than exhaustively searching all the different permutations of our hyper parameter values, we can be much more efficient and economical in how we search and how we decide on which configurations to try out next. So not only does this mean that we can explore the hyper parameter space in a much more intelligent way, but it also allows us to sample from a much wider space. So there are two ways in which we can scale hyperopt with Spark. The first is just using standard hyperopt, and under the hood then using a distributed library such as MLlib. When we use it in this manner, there’s nothing that you need to specifically configure with hyperopt versus any other library. However, where we can get massive scaling is an approach where we use distributed hyperopt with a single node library, using hyperopt’s SparkTrials class.
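The talk does not show code for this, so the following is only a hedged sketch of how distributed hyperopt with a single node library might be wired up. The dataset, the random forest model, the search ranges and the function names are all illustrative assumptions. `fmin`, `hp`, `tpe` and `SparkTrials` are hyperopt’s own names, and `tpe.suggest` is hyperopt’s Tree of Parzen Estimators search algorithm.

```python
def objective(params):
    """Train one single-node model and return the loss to minimise."""
    from sklearn.datasets import load_diabetes
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.model_selection import cross_val_score

    X, y = load_diabetes(return_X_y=True)  # small bundled dataset, as a stand-in
    model = RandomForestRegressor(
        n_estimators=int(params["n_estimators"]),  # quniform yields floats
        max_depth=int(params["max_depth"]),
        random_state=0,
    )
    # cross_val_score returns negative MSE; hyperopt minimises, so flip the sign.
    scores = cross_val_score(model, X, y, scoring="neg_mean_squared_error", cv=3)
    return -scores.mean()


def tune(parallelism=4, max_evals=16):
    """Run the search with trials evaluated in parallel on Spark workers."""
    # Imported here so the sketch loads without hyperopt or Spark installed.
    from hyperopt import SparkTrials, fmin, hp, tpe

    space = {
        "n_estimators": hp.quniform("n_estimators", 10, 100, 10),
        "max_depth": hp.quniform("max_depth", 2, 10, 1),
    }
    # SparkTrials sends each trial to a worker; the driver proposes new settings.
    trials = SparkTrials(parallelism=parallelism)
    return fmin(
        fn=objective,
        space=space,
        algo=tpe.suggest,
        max_evals=max_evals,
        trials=trials,
    )
```

Calling `tune()` on a cluster returns the best hyper parameter values found. The `parallelism` setting is a trade-off: higher values finish sooner, while lower values give the adaptive search more completed results to learn from before it proposes each new configuration.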
So with SparkTrials what we can do is combine distributed hyperopt with single node models. And as such, we can make use of the likes of scikit-learn, TensorFlow, Keras and XGBoost, and can easily conduct a parallel hyper parameter search using these single node models. And it works as such: in each iteration of the tuning process, hyperopt coordinates the training of many independent models in parallel, each model with a different set of hyper-parameters. This then means that the data is replicated around our cluster, with different models being trained at the same time across our workers. Once a model has been trained, its result is reported back to the driver of our cluster, and hyperopt will then generate a new best set of hyper parameter configurations to try next. So whilst this approach is necessarily sequential in nature, in that it’s dependent on what we have previously seen, at each step in this tuning process we’re providing more information about the state of the hyper parameter search space to hyperopt than if we were training one model at a time on a single node. So there are full dedicated talks that get into a lot of the details and do much deeper dives than this. However, while we’re not covering explicit code here, we will have a full coded notebook of this and all the other techniques that we’re gonna cover, and we’ll share that in our resources section at the end. So lastly we’ll take a look at how to enable distributed inference. So we have discussed using Spark’s built in machine learning library for the training and tuning side of things, and explored different ways in which we can utilize the parallelism of Spark for both training multiple models in parallel and doing parallel hyper parameter search. So let’s round off the full life cycle by exploring how we can utilize Spark’s parallelism at inference time.
That is, once we have that trained model artefact. So a common workflow that we see being used to increasingly powerful effect is taking a single node model and using Spark to distribute inference with that model across a much larger dataset than would be possible on just a single machine. So this might mean taking the likes of a scikit-learn model or a TensorFlow model that we’ve trained on a single machine, or perhaps even on a subset of the data, and then performing distributed inference with that same model with Spark. In instances where a model is very large or very costly to load into memory, a very efficient way of doing this is to utilize Spark’s Pandas Scalar Iterator UDF, UDF here standing for user defined function. So again a recent addition in Spark 3.0, these user defined functions can accept an iterator of Pandas series or data frames, and then apply a defined method in batches. So we’re gonna start with a Spark data frame, and then under the hood this data frame will be split into batches and our defined function called on each batch. For larger models, as I mentioned, there can be significant overhead in repeatedly loading that model into memory on each batch in the same Python worker process. So through using this Python iterator, what we can do is load the model once and keep it cached in memory, instead of repeatedly loading it over and over again.
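As an illustration of the load-once idea, here is a minimal sketch, not the speakers’ notebook code. It assumes a regression model saved with joblib at a path readable from every worker, and a single numeric input column. `predict_in_batches`, `make_predict_udf` and `model_path` are hypothetical names; `pandas_udf` with `Iterator[pd.Series]` type hints is the Spark 3.0 iterator form described above.

```python
from typing import Callable, Iterable, Iterator


def predict_in_batches(load_model: Callable, batches: Iterable) -> Iterator:
    """Load the model once, then reuse it for every batch in the iterator."""
    model = load_model()  # expensive step: runs once per call, not once per batch
    for batch in batches:
        yield model.predict(batch)


def make_predict_udf(model_path: str):
    """Wrap the generator above as a Pandas scalar iterator UDF."""
    # Imported here so the sketch can be loaded without a Spark installation.
    import joblib
    import pandas as pd
    from pyspark.sql.functions import pandas_udf

    @pandas_udf("double")
    def predict(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
        # Each incoming Series is one batch of a partition's rows.
        frames = (series.to_frame() for series in batches)
        for preds in predict_in_batches(lambda: joblib.load(model_path), frames):
            yield pd.Series(preds)

    return predict
```

A call such as `df.withColumn("prediction", make_predict_udf("/models/model.joblib")("feature_1"))` would then score the column batch by batch, with the path and column name again being placeholders. Keeping the load-once logic in a plain generator also makes it easy to check without a cluster.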
– So that was scaling machine learning with Apache Spark. So we found out that it’s not just for training; it’s about applying Spark to different areas of the life cycle. We spoke about Spark MLlib and parallelizing that single model training, and also about parallelizing the training of independent models using something like the Pandas function API. We can also use things like hyperopt for our tuning, but it’s not just for that training phase. It’s also for inference too, just as Niall mentioned with the Pandas Scalar Iterator UDF for distributing your inference. One final, final, final reminder from us: please decide how you’re going to deploy your model before you start training. Now we are aware that we spoke about quite a lot of things here. I’m gonna stall for time whilst you take a screenshot of this, but this is a link to all of the resources that we have available to you. Niall has written up a wonderful notebook that I really recommend you go and check out. There’s the Pandas UDF blog post, and also the links to the docs too. And finally, thank you so much for coming to this talk today. Niall and I had a great time preparing it. We really hope that you enjoyed it. If you did, I’m Holly Smith, and this is Niall Turbitt, and if you didn’t enjoy it, I’m Ali Ghodsi, CEO of Databricks. Thank you.
Holly Smith has over a decade of experience working with Data & AI teams in a variety of capacities from individual contributors all the way up to leadership. She has spent the last two years at Databricks working with many multi national companies as they embark on their journey to develop their data maturity. She also works with the non profits Datakind UK and Tech Talent Charter to advise on data strategy and operations.
Niall Turbitt is a Senior Data Scientist on the Machine Learning Practice team at Databricks. Working with Databricks customers, he builds and deploys machine learning solutions, as well as delivers training classes focused on machine learning with Spark. He received his MS in Statistics from University College Dublin and has previous experience building scalable data science solutions across a range of domains, from e-commerce to supply chain and logistics.