Migrating Your Data Platform At a High Growth Startup

May 27, 2021 04:25 PM (PT)


At Abnormal Security, Spark has played a fundamental role in helping us create an ML system that detects thousands of sophisticated email threats every day. Initially, we set up our Spark infrastructure using YARN on EMR because we had previous experience with it. But after growing very quickly in a short amount of time, we found ourselves spending too much time solving problems with our Spark infrastructure and too little time solving problems for our customers. Given we’re in a high growth environment where the only constant is change, we asked ourselves: aren’t these problems only going to get worse as we add more employees, more products, and more data?

 

Over the past few months, Abnormal Security executed a full migration of our Spark infrastructure to Databricks, not only reducing cost and operational overhead and improving developer productivity, but simultaneously laying the foundation for a modern Data Platform via the Lakehouse architecture.

 

In this talk, we’ll cover how we executed the migration in a few months’ time, from pre-Databricks-POC, through the POC, through the migration itself. We’ll talk about how to really figure out exactly what it is that you care about when evaluating Databricks, splitting out the must-haves from the nice-to-haves, so that you can best utilize the Databricks trial and have concrete and measurable results. We’ll talk about the work we did to execute the migration and the tooling we created to minimize downtime and properly measure performance and cost. Finally, we’ll talk about how the move to Databricks not only solved problems with our legacy Spark infrastructure, but will save us huge amounts of time in the long-run from adopting a Lakehouse architecture. 

In this session watch:
Carlos Gasperi, Developer, Abnormal Security

Transcript

Carlos Gasperi: Hey everyone. I’m Carlos Gasperi. I’m a software engineer at Abnormal Security. And today I wanted to tell you guys about how to migrate your data platform at a high growth startup, or how we did that over the past few months at Abnormal Security. So first, a quick agenda. First I’ll tell you guys what Abnormal Security is and what we do. Then I want to go in roughly chronological order, starting in Q4 of 2020: the state of our data platform, the issues and challenges that we had, how we set out to try something new with Databricks, how we went through the actual Databricks POC, how we executed the migration to Databricks over the last few months, and what that sets up for the future. So, first a little bit about Abnormal. We’re a next generation enterprise email security platform.
What does that mean? So we’ve built a machine learning system that can detect and remediate sophisticated email threats. A little bit about our history, to illustrate our growth: in 2018, we were in stealth mode for a while; in 2019, we launched with 30 employees; in 2020, we were around 90 employees; and in 2021, we’re expecting to be somewhere between 150 and 200 employees. And I tell you guys this to illustrate that we’re in a very high growth environment, and what the implications are for our data platform. So first I want to talk about where we were in Q4 of 2020 with regards to our data platform. To split our data platform into three main pillars: for our data pipelines, we were scheduling our production Spark jobs via Airflow. We’re a Python shop, so they were all PySpark jobs with dependencies managed via Conda.
For our cluster technology, we were running on AWS on EMR, and for blob storage we were using S3, kind of a typical setup; for storage formats, we were using Thrift and some Parquet. For our data science pillar, our ML engineers and data scientists use Jupyter notebooks, basically live in Jupyter notebooks. And our third pillar, analytics, was pretty nascent. We had set up Athena to query our S3 data sets, any ad hoc querying would go directly to the production database that held that data, and we had no data warehouse at all.
So first, a little bit about our cluster infrastructure setup. Like I mentioned, we were running on EMR. We had several long-lived YARN clusters. Jobs were submitted via SSH from the command line or from Airflow. We had monitoring via Ganglia. And again, our third-party dependencies were managed via Conda. So what were some of the challenges with this setup? The core one is multitenancy. We started out with one big EMR cluster, but then, as we started having different applications that were basically different workloads on different schedules, we ran into problems of resource starvation, where really resource-intensive jobs would stop other jobs from getting on the cluster or getting executed. And because we had no in-house expert on YARN, didn’t necessarily want to build that expertise, and used Terraform to set up our clusters,
we would basically just take the band-aid approach of creating more clusters. And at that moment in time, we had ended up with seven different clusters. So that was the multi-tenancy issue. Then there were the auto-scaling problems. With long-lived clusters, wanting to operate them efficiently, and workloads that don’t need a really big cluster on all the time, we had to figure out auto-scaling to be cost-effective. There were a lot of issues around this: we had to make sure we had the right metric to auto-scale on, because different workloads were running on different clusters and on different schedules; we had different auto-scaling policies per cluster; and it just took a lot of time to get this right, and we hadn’t gotten it fully right. For dependency management, like I mentioned, we used Conda to manage our third-party dependencies.
And the way that we would do that is that we would install the Conda environment on the master, and then we would have Spark distribute that environment to the workers as a tarball. The problem with doing it that way is that the Conda dependencies are the same across the whole cluster, so all the applications share that environment. Any sort of problem with that environment, like pushing a bad dependency that broke the environment somehow, could actually break all the applications running on the cluster, and we ran into this quite a bit. And because our scripting tried to keep all the Conda environments up to date, we had a few situations where we brought down all the EMR clusters at the same time. Another issue with having these long-lived clusters is that they need ongoing maintenance, right?
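For readers less familiar with this pattern, here is a minimal sketch of the shared-Conda-environment-per-cluster approach described above, using the standard conda-pack route rather than Abnormal's actual scripts; the bucket, environment name, and paths are placeholders.

```python
# Sketch of the "one shared Conda env per cluster" pattern (hypothetical paths).
# The environment is packed once on the master, e.g.:
#     conda pack -n prod-env -o prod-env.tar.gz
# uploaded to S3, and then shipped to every YARN worker by Spark itself.
import os

from pyspark.sql import SparkSession

# Executors run Python out of the unpacked archive (./environment/bin/python).
os.environ["PYSPARK_PYTHON"] = "./environment/bin/python"

spark = (
    SparkSession.builder
    .appName("conda-shared-env-example")
    # YARN downloads the tarball and unpacks it as ./environment on each node.
    .config("spark.yarn.dist.archives", "s3://example-bucket/conda/prod-env.tar.gz#environment")
    .getOrCreate()
)
```

The key downside, as described above, is that every application on the cluster inherits this one environment, so a bad dependency push affects all of them at once.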
If a job that’s using local storage on the driver doesn’t clean up after itself, for example, then over time the disk on the master can fill up and HDFS would hit safe mode, and I’d have to go in and clean up the cluster. So we were spending a lot of time maintaining these clusters, taking away time from actually writing the Spark jobs that run on them. As for our notebook infrastructure, I mentioned that our data scientists and ML engineers basically lived in Jupyter notebooks. For this, we set up two big EC2 machines that would allow our ML engineers to run big local Spark jobs, train models, and things like that, and to do it in a quick way.
The problem with this setup is that, again, it’s the same multi-tenancy issue. It’s a shared machine, meaning that if you run some big job that consumes a lot of disk and you don’t clean it up, somebody comes in later and can’t use the notebook, because one engineer consumed all the disk on the machine. Similarly with memory: if somebody is running a really memory-intensive job, it stops other people from running memory-intensive jobs on the same machine. Another issue, related to cost, is that these notebooks are on 24/7, even though they’re only used during business hours. So they were actually quite expensive: just these two big EC2 machines were something like 5 or 6% of our total infrastructure bill, which is quite a lot considering it’s only two machines.
The same problems with the Conda environments that we had on the clusters also exist on the notebooks, because you have a single Conda environment that’s being shared by a lot of users and applications. It actually happened that somebody upgraded the version of TensorFlow, somebody else then trained a model not knowing that the version had been upgraded, and it turned out we pushed out a backwards-incompatible model. That’s a problem that actually happened.
So this is an example: we have a notebook-users Slack channel, and this is just a search for people making requests to get resources on the notebooks or to have people clean up space. If you search for “memory”, there are probably a lot of these too, where people tried to use a notebook, couldn’t, and went to Slack, basically blocked until somebody let them run stuff on the notebook. Again, analytics was quite nascent. We had Athena, and then any sort of query that you wanted to execute went directly to the source data. So dashboards would go directly to MySQL, directly to Postgres. These databases are shared with production, meaning that if you executed a really intensive analytics query, it had the potential to cause an issue in production.
Another problem with the setup is that we had no way of joining across different data sources; it was actually a big, custom process to do so. So we were starting to see the need for a classic data warehouse environment here. And we’re in a high-growth startup, right? We’re adding more engineers all the time. We have more data, more products, more customers. Over time, these problems are only going to get more difficult, and we hadn’t actually revamped the core way our data platform works since 2018, basically since the first time it was set up. So at that moment we’re thinking: it’s 2020, there probably has to be a better way. That’s what led us to looking into Databricks and trying it out. We had been thinking about it for a while, and then in Q4 of 2020, we actually did it.
So pre-POC, we had these three main use cases that I talked about before that Databricks could potentially help us with. We wanted to potentially replace our EMR environment, potentially replace our notebook environment, and potentially implement a data warehouse. But of these three use cases, we only had a fixed amount of time for the POC. What was the necessary and sufficient condition to actually go through with buying Databricks? We decided that was going to be the EMR replacement use case, so we mostly focused on that. Something that was important, which might be obvious, was trying to be as explicit as possible about our success criteria for EMR replacement. We had a few different categories. We wanted at least basic feature parity, right? We wanted a way to install our Conda-managed dependencies.
We wanted the easy ability to troubleshoot: view logs, Spark UIs, et cetera, the same things that we had [inaudible]; that was table stakes for Databricks. We wanted cluster management to work really well. In Databricks, because we were going to start using job clusters, which are clusters that you bring up, run the job on, and tear down, there’s no big shared cluster, so we wanted to make sure that this flow of bringing up a cluster, running the job, and tearing it down was quite quick. So we said, let’s make sure it’s not long, I mean, that it doesn’t take any longer than 15 minutes. And it has to be reliable: we want to be able to do this a lot of times and have it be a reliable process. And the final two were cost and support. Cost was not a huge concern, but we were saying, well, we’re willing to pay a 10% premium if it means we can solve a lot of these problems that we’re running into.
And finally, another table stakes thing was technical support. We need to be able to get somebody on the phone within 15 or 30 minutes if we’re having a production issue that is due to Databricks. Now a little bit on the mechanics of the POC sprint. We had two weeks to set up the experiments and gather all the data. We had check-ins twice a week with our Databricks contacts, which ended up being a little more than that, honestly. And then we had a daily standup for the two engineers that were working on this, and we had a Slack channel that was super helpful for checking in with them. During the first week, we spent probably the whole week just doing setup. We wrote tooling to start clusters and tooling to launch jobs from the CLI and from Airflow.
We figured out how to install our Conda dependencies. We figured out how to package and install a repo. And then we navigated a set of issues that weren’t terrible, but were probably expected. Like, using Conda, we had to make sure that all the right Conda dependencies were installed on the cluster for the experiments we wanted to run. We ran into a few IAM issues with S3, just making sure that Databricks had the right access to our buckets and our data, and then figuring out connectivity to RDS from the Databricks VPC, and how to translate Spark configurations that we had set up on EMR to use them on Databricks.
During the second week, we actually got to running the actual experiments. We tested our Airflow integration. We ran a notoriously unstable pipeline that had historically caused us a lot of problems on EMR. And we did other things like bring up a very large cluster, which is something we don’t do often but do occasionally need to do; we wanted to make sure that if you want to bring up a 500-node cluster, Databricks can do that pretty easily. So now, some tips and gotchas, starting with tooling and dependencies. For writing a client to interact with the Databricks API, we looked for open source solutions but didn’t find anything that worked really well out of the box, so we actually just wrote our own. In retrospect, that was a good decision, because it only took a few hours to do and it gives us all the control that we need around the API calls we make, secret management, and stuff like that.
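As a reference point, here is a stripped-down sketch of the kind of thin client being described. The REST endpoints used (/api/2.0/jobs/runs/submit and /api/2.0/jobs/runs/get) are real Databricks Jobs API endpoints, but the class itself and its error handling are illustrative, not Abnormal's production code.

```python
import requests


class DatabricksClient:
    """Thin wrapper over the Databricks Jobs REST API (illustrative sketch)."""

    def __init__(self, host: str, token: str):
        self.host = host.rstrip("/")
        self.session = requests.Session()
        self.session.headers["Authorization"] = f"Bearer {token}"

    def submit_run(self, payload: dict) -> int:
        """Submit a one-time run (runs/submit) and return its run_id."""
        resp = self.session.post(f"{self.host}/api/2.0/jobs/runs/submit", json=payload)
        resp.raise_for_status()
        return resp.json()["run_id"]

    def get_run(self, run_id: int) -> dict:
        """Fetch run state, e.g. to poll for completion from Airflow or the CLI."""
        resp = self.session.get(f"{self.host}/api/2.0/jobs/runs/get", params={"run_id": run_id})
        resp.raise_for_status()
        return resp.json()
```

Owning a small wrapper like this is what makes it easy to keep a single scheduling code path shared by Airflow and the command line, which comes up again below.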
The Databricks Runtime for ML is what we ended up using for managing our Conda dependencies. And this is important, because if you are a Python shop and you use Conda, the Databricks Runtime for ML is probably the closest thing to your setup, in the sense that it can set up a Conda environment that you can install dependencies into, and it has a bunch of pre-packaged dependencies, so you can get it pretty close to what your running environment is. Again, if you’re a Python shop, or you have a monorepo, one single Git repo, it works really well to just package it as a wheel and have Databricks distribute that to the cluster for you.
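To illustrate, here is roughly what a runs/submit payload with an ML runtime and a wheel can look like. The runtime version string, instance type, bucket paths, wheel name, and entry point are all placeholders rather than the values used in the talk.

```python
# Hypothetical runs/submit payload: ML runtime plus the monorepo packaged as a wheel.
job_payload = {
    "run_name": "example-pipeline",
    "new_cluster": {
        "spark_version": "7.3.x-cpu-ml-scala2.12",  # an ML runtime; pick one matching your deps
        "node_type_id": "i3.xlarge",                 # placeholder instance type
        "num_workers": 8,
    },
    # Wheel built from the repo and uploaded ahead of time; Databricks installs it on the cluster.
    "libraries": [{"whl": "s3://example-bucket/wheels/example_repo-0.1.0-py3-none-any.whl"}],
    "spark_python_task": {
        "python_file": "s3://example-bucket/jobs/run_pipeline.py",
        "parameters": ["--date", "2021-01-01"],
    },
}
```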
We tried a few different paths but ended up getting to that one. If I could do it again, we’d do that right away: just use a wheel. On the Airflow side, Airflow has a whole suite of third-party open source operators that you can use, including a Databricks operator. But the problem with using the Databricks operator is that if you want to run a job outside of Airflow, now you have to build a second code path that executes that job. So we ended up not using the Databricks operator at all, and just writing a small wrapper over a normal Python operator that can schedule the jobs on Databricks. And that’s the advantage: you’ve got a single code path for actually scheduling the jobs, whether that’s coming from Airflow or from the CLI.
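A hedged sketch of what that wrapper can look like in Airflow 2, assuming a hypothetical shared helper (submit_job_and_wait) that both Airflow and the CLI call into; the DAG name, schedule, and module path are made up.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def run_scoring_pipeline(ds: str, **_):
    # Same code path a developer hits from their laptop: Airflow only supplies the date.
    # Hypothetical shared helper; under the hood it would use the thin REST client
    # sketched earlier and poll runs/get until the run finishes.
    from jobs.scheduling import submit_job_and_wait

    submit_job_and_wait(job_name="scoring_pipeline", date=ds)


with DAG(
    dag_id="scoring_pipeline",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    PythonOperator(task_id="run_scoring_pipeline", python_callable=run_scoring_pipeline)
```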
A third one that I would say, and it’s a little bit vague, is to embrace best practices. One example: for this notoriously difficult pipeline to run, what the pipeline does is schedule one job per customer, which you could argue is a good or bad practice, and we’ll probably get away from that. But because of that, it was running dozens of jobs all at once. So our first approach was going to be: let’s bring up a cluster, run all the jobs for the pipeline, and then tear down the cluster. It turns out that’s not the Databricks way. The Databricks way is for each customer-specific job to just run on its own job cluster. It turns out it’s a lot cheaper too, because you’re creating a cheaper type of cluster, the job cluster, instead of an all-purpose cluster.
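Continuing the illustrative sketches above, the per-customer fan-out looks roughly like this; client stands for the DatabricksClient sketched earlier, base_payload for a runs/submit payload like the one above, and the customer list is a placeholder.

```python
import copy


def submit_per_customer_runs(client, base_payload: dict, customers: list) -> list:
    """One short-lived job cluster per customer, instead of one big shared cluster.

    `client` and `base_payload` refer to the earlier illustrative sketches.
    """
    run_ids = []
    for customer in customers:
        payload = copy.deepcopy(base_payload)
        payload["run_name"] = f"scoring-{customer}"
        payload["spark_python_task"]["parameters"] += ["--customer", customer]
        # Each submitted run gets its own ephemeral job cluster, then gives it back.
        run_ids.append(client.submit_run(payload))
    return run_ids
```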
But that’s just an example of the sort of question where you should lean on your Databricks reps early, because they’ll just tell you what the best practice is, and you can just do that unless it doesn’t work. Another one that I would say is to take migration time into account post-POC. So when figuring out the contract, instead of making it January to January, we probably should have made it start in February or March, to give ourselves a little bit more wiggle room for actually executing the migration.
So these are the wins that we either expected from the POC sprint or that we observed. From the operational overhead perspective, from all the time savings around not maintaining all these clusters, we’re probably going to save at least half an engineer-day a week on operations. On influencing the roadmap: because we got a lot of things for free when moving to Databricks, we removed several engineer-weeks, if not months, from the roadmap. One example is that we upgraded to Spark 3.0 when moving to Databricks, and that gave us a bunch of things for free that would probably have been a whole project to do on EMR. Another example is that we had a project planned to figure out how to use YARN queues correctly so we could have one big cluster instead of seven.
That’s a project we won’t have to do now that we’ve moved to Databricks. Then there’s scale. I mentioned that we had this pattern of having per-customer jobs in a pipeline. Historically, we saw problems scheduling all those jobs on the same YARN cluster because we’d have scheduling bottlenecks; there’s only a certain number of jobs you can run concurrently on a YARN cluster, so some of these jobs actually failed because they couldn’t get any resources. That problem goes away because each job is running on its own cluster. Similarly, resource-intensive jobs run completely separate from any other jobs, so they can’t impact those other jobs. As long as you can request that many instances from AWS, you’ll be able to run those jobs.
For cost, I mentioned this wasn’t a super high priority for us, but it actually ended up being a really big win. That notoriously difficult pipeline saw a 50% cost reduction from moving it over to Databricks. Some of the reasons we think that happened: with long-lived clusters, you’re spending money on on-demand instances that you don’t necessarily need; with job clusters, because they’re very short-lived, basically everything except the driver can run on spot. We saw fewer inefficiencies from auto-scaling, which I mentioned was a problem we needed to solve early on, and you don’t need to worry about scaling down when the job is done, for example, because it just throws the cluster away and gives all those instances back to AWS right away.
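Concretely, the driver-on-demand, workers-on-spot setup comes from the cluster's AWS attributes in the Databricks cluster spec; the field names below are real, but the values are examples rather than the settings used in the talk.

```python
# Hedged example: keep the driver on an on-demand instance, run all workers on spot,
# and fall back to on-demand if spot capacity isn't available.
new_cluster = {
    "spark_version": "7.3.x-scala2.12",   # placeholder runtime
    "node_type_id": "i3.xlarge",          # placeholder instance type
    "num_workers": 16,
    "aws_attributes": {
        "first_on_demand": 1,                  # only the first node (the driver) is on-demand
        "availability": "SPOT_WITH_FALLBACK",  # workers on spot, fall back if unavailable
        "spot_bid_price_percent": 100,
    },
}
```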
And finally, we think another driver of the cost reduction was the fact that we upgraded Spark and were using the Databricks Runtime version of Spark, which we believe has some efficiencies that just made these jobs perform better in general. The final win we saw was around usability and troubleshooting. One classic example: on YARN on EMR, sometimes a scaling policy would just freeze because it couldn’t get spot instances, and it was notoriously difficult to understand why this was happening; you couldn’t actually find a spot error anywhere. In the case of Databricks, when it can’t get instances, it just bubbles up an error coming directly from the AWS API saying that it couldn’t get more spot instances because of the spot market. Another thing on usability is management of jobs. YARN was preemptively killing jobs when they would breach memory limits; that does not happen on Databricks at all.
So if you had the occasional task that used a little bit more memory, breaching limits and causing the job to die on YARN, that won’t happen on Databricks. The trade-off is that you do need to monitor the metrics of the job a little more closely, but if what you care about is just job success, you get that more often with Databricks jobs. And finally, there’s no more of this maintenance burden of needing to go in and clean up disk on the master, because all these clusters just go away when they’re done being used.
Okay. So now on to the final, actually hard part, which is executing the migration post-POC. We had to do a few different things. We had to do a production Databricks deployment to actually set up a production VPC. We had to build a lot of tooling and measurement, around cost specifically. We had to train our engineers to get used to this new Spark environment and get comfortable with it. And finally, we had to actually migrate all these data pipelines over to Databricks. For the Databricks deployment, this is quick: we ended up going with a customer-managed VPC, because it gives us a lot more control and grants Databricks only the least possible permissions it needs to operate in your account. We set up a few IAM roles, pretty standard, and then set up VPC peering as well.
For our tooling, we had a few different requirements. We wanted easy troubleshooting: an easy way to access jobs, the Spark UI, and logs for both your scheduled jobs and your ad hoc jobs. We wanted zero-deploy migrations. The way we were scheduling jobs from Airflow for EMR, both the job parameters and the Spark configurations lived in code in Airflow, which means that under that setup we would have had to make a code change and a deploy for every job we moved to Databricks; so we ended up implementing some tooling for zero-deploy migrations. And finally, we wanted some tooling for cost comparisons: we wanted to make sure that as we did the migration, we were seeing the same benefits that we saw during the POC.
For these zero-deploy migrations, we ended up writing a configuration framework, which made it way easier to roll out the actual migration. The way this framework works is that all your job parameters, basically anything that is needed to execute a job on Databricks or on EMR, are stored in a YAML file, which I’ll show on the next slide. That YAML file is stored in the repo and uploaded to S3, and then there’s an execution path where Airflow will download that file and, depending on the parameters, schedule a job either on Databricks or on EMR with the right Spark configurations, the right job parameters, et cetera. And you can plug the same tooling into another flow for doing it from your laptop. So if I want to rerun a job that normally gets executed from Airflow, I can rerun that job without needing to remember all the parameters that Airflow is using, with some ability to override some of those parameters as well.
So what does it actually look like? On the left is an example YAML file for a job running on Databricks, and on the right is the same job but running on EMR. As you can see, there are some common parameters, and then we have these cost-tracking tags, which basically influence the way that Databricks tags the instances so we can do proper cost measurement and cost tracking later. Not shown here are the actual Spark conf parameters, which you can also change; if you wanted to change spark.executor.cores, for example, you could do that via the framework as well.
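Since the slides aren't reproduced here, the snippet below is a simplified, hypothetical version of what such a YAML config and the dispatch logic around it could look like, not the exact framework from the talk; the field names and helper functions are illustrative.

```python
import boto3
import yaml

EXAMPLE_CONFIG = """\
job_name: scoring_pipeline
platform: databricks        # flip to `emr` to roll back: just an S3 push, no deploy
cost_tracking_tags:
  team: detection
  pipeline: scoring
spark_conf:
  spark.executor.cores: "4"
"""


def load_job_config(bucket: str, key: str) -> dict:
    """Airflow (or the laptop/CLI flow) downloads the job's YAML config from S3 at run time."""
    body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"].read()
    return yaml.safe_load(body)


def schedule(config: dict) -> None:
    """Dispatch to the right backend; the same config drives both."""
    if config["platform"] == "databricks":
        submit_to_databricks(config)
    else:
        submit_to_emr(config)


def submit_to_databricks(config: dict) -> None:
    ...  # would build a runs/submit payload and use the thin client sketched earlier


def submit_to_emr(config: dict) -> None:
    ...  # would submit to the legacy EMR cluster as before


# Example: schedule(yaml.safe_load(EXAMPLE_CONFIG))
```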
For cost measurement, on the EC2 side we use Databricks tagging quite heavily. This is a really nice advantage of having job clusters that spin up for the lifetime of the job and then die: if you tag that job with exactly what it’s doing, you can track costs to the dollar for a pipeline or a job, and slice and dice the EC2 costs that way, even in Cost Explorer in the AWS console. We also integrated the EC2 costs into our homegrown cost frameworks, to compare against the EMR costs a little more easily. And then for DBU costs, we just did what’s available out of the box: we enabled billable usage logs, which means Databricks will drop your DBU usage data into an S3 bucket of your choosing, and then you can use a prebuilt notebook to slice and dice the DBU costs.
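A rough sketch of both sides of that. The custom_tags field on a job cluster is a real part of the Databricks cluster spec and is what propagates down to the EC2 instances; the tag names here are arbitrary. The billable usage logs land in S3 as CSVs, but the column names below are from memory and worth double-checking against the current schema.

```python
import pandas as pd

# EC2 cost attribution: these tags end up on the underlying EC2 instances, so
# Cost Explorer can slice spend per pipeline or per job.
new_cluster_with_tags = {
    # ... spark_version, node_type_id, aws_attributes, etc. as in the earlier examples ...
    "custom_tags": {"pipeline": "scoring", "job": "scoring-customer-a"},
}

# DBU cost attribution from the billable usage CSVs Databricks drops into S3.
# Column names ("dbus", "clusterCustomTags") reflect the schema as remembered and
# may differ; reading s3:// paths with pandas assumes s3fs is installed.
usage = pd.read_csv("s3://example-billing-bucket/billable-usage/2021-02.csv")
dbus_per_tag = usage.groupby("clusterCustomTags")["dbus"].sum().sort_values(ascending=False)
print(dbus_per_tag.head())
```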
For training, we did a few internal presentations to show everyone why we were doing this and what the benefits were going to be. We did an interactive demo post-POC where everyone actually got set up on Databricks, which was quite an easy way to onboard a large batch of engineers all at once. And then, obviously, we wrote a lot of documentation to make it easy for people to understand the new flows. As far as the execution, our goal was to migrate 90% of costs in Q1. Note that that’s different from 90% of pipelines in our case: we actually only had something like a dozen or maybe 20 jobs that accounted for more than 90% of costs, with a long tail that we think is okay to keep running on a single EMR cluster for now; as we move that final long tail, we’ll fully deprecate that last cluster.
And yeah, as far as how the migration mechanically worked: we would migrate a job to the configuration framework first and deploy that. At that point, it’s reading from configuration but still scheduling jobs on EMR. Then you can move the job to Databricks via config, so it’s just an S3 push. And if you want to roll back, you just do it again; you don’t have to deploy, and you can minimize downtime that way. That’s basically the approach we took for every single pipeline that we wanted to move.
So some tips and gotchas around the migration. Around Conda dependencies, something that ended up being a week or two of work that we were not expecting: we were using very old versions of some ML packages that the Databricks runtime upgrades and maintains for you, which means we were trying to use a very new Databricks runtime with very old ML packages, and that caused some grief. At some point we thought we were going to need to downgrade the Databricks runtime. Luckily, we were able to just upgrade some of our packages and make them work with the later runtime, which was the desired approach. But I would recommend trying to de-risk this very early; if I could go back to the POC, I would say, try and test jobs that really strain various ML packages and use a lot of those packages. For Spark job tuning: most of the custom memory tuning that we needed to do on EMR, you don’t need to move over to Databricks.
Because YARN preemptively kills all these containers like I mentioned before, and that won’t happen on Databricks, the jobs actually won’t get killed for memory issues. You’ll just see a badly performing job, which in our case was a little bit better: it’s okay if a job performs a little worse, as long as we don’t have to spend the operational overhead of adjusting all these parameters all the time. So far, to increase memory for Spark jobs, we’ve only had to tweak spark.executor.cores, and that’s worked out just fine. Something around scheduling jobs: there are two ways you can schedule job runs in Databricks. One is the run submit API and one is the run now API. The trade-off is that run submit incurs less overhead, because you don’t need to pre-create a job before you use it. The bad thing is it doesn’t tie into the permissioning model, so if somebody submits a job via run submit, it’s hard for other people to be able to see that job. With the run now API, it does tie into the permission model, but you do incur the overhead of having to pre-create a job. The hybrid model we ended up with, which I think works well, is that all of our ad hoc jobs are scheduled via run submit and all of our scheduled jobs via run now. And that’s worked pretty well.
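Continuing the earlier sketches (client is the illustrative DatabricksClient, job_payload the runs/submit payload shown above, and 1234 a placeholder job ID), the two paths look roughly like this:

```python
# Ad hoc path: runs/submit -- no pre-created job, one ephemeral cluster per run.
adhoc_run_id = client.submit_run(job_payload)

# Scheduled path: run-now against a pre-created job, which ties into the Jobs
# permission model so the whole team can see and manage its runs.
resp = client.session.post(f"{client.host}/api/2.0/jobs/run-now", json={"job_id": 1234})
resp.raise_for_status()
scheduled_run_id = resp.json()["run_id"]
```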
Something that was very helpful was cluster policies, specifically around recreating our notebooks. We created a cluster policy that creates a single-node machine, which is very similar to our legacy setup, and we can put in certain restrictions, like making sure it auto-terminates after two hours, et cetera (see the policy sketch after this paragraph). It makes it really easy to enforce that people are doing the right thing, while keeping things easy for people as well. And finally, we had some back and forth around how to actually connect to RDS from our VPC, and specifically it’s not just connecting to RDS, it’s connecting to an RDS instance in a different VPC. If you look through the documentation, there are a few different ways they recommend doing this. Just do VPC peering; that’s really the only way that fully works well, and it took us a while to figure that out. There’s a lot of detail around that, but that’s actually the best way to go about it.
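For the single-node notebook policy mentioned above, here is a hedged sketch of what the policy definition can look like; the keys follow the cluster-policy definition format as generally documented, while the allowed instance types, termination window, and policy name are examples rather than the actual policy from the talk.

```python
import json

# Pin clusters created under this policy to a single node and force a two-hour
# auto-termination, roughly mirroring the legacy notebook machines described above.
single_node_notebook_policy = {
    "num_workers": {"type": "fixed", "value": 0, "hidden": True},
    "spark_conf.spark.master": {"type": "fixed", "value": "local[*]", "hidden": True},
    "custom_tags.ResourceClass": {"type": "fixed", "value": "SingleNode", "hidden": True},
    "autotermination_minutes": {"type": "fixed", "value": 120},
    "node_type_id": {"type": "allowlist", "values": ["i3.xlarge", "i3.2xlarge"]},  # example types
}

# The Cluster Policies API expects the definition serialized as a JSON string.
policy_payload = {
    "name": "single-node-notebook",
    "definition": json.dumps(single_node_notebook_policy),
}
```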
So yeah. Now I just want to talk a little bit about why this is important for us and what this sets up for the future. We think that moving to Databricks is killing ten birds with one stone. Not only does it let us move off of EMR and replace our legacy notebook setup, and that’s just the start, but we now think we’re setting ourselves up for a modern data platform via the Lakehouse architecture. And being in a high-growth environment, saving a lot of time is the best thing you could possibly do, because time is the most constrained resource at a startup. So we’re aiming to have a single source of data in common formats, represented in Parquet and Delta on S3. We have storage completely decoupled from compute, with different compute environments depending on the use case, right?
So data engineers run data pipelines on workspace clusters, and ML engineers and data scientists use notebooks and SQL Analytics. And we’re starting to build out our actual data warehouse on Databricks using DBT and SQL Analytics, which is something we’re really excited about, and something that moving to Databricks made much faster to do. So yeah, that’s it. If any of this was exciting to you, we’re definitely hiring across all different roles. Or if you just want to talk about the presentation or about similar problems, we’d love to. But yeah, thank you so much.

Carlos Gasperi

Carlos is a staff software engineer and leads Data Platform at Abnormal Security, a cybersecurity company using ML to stop sophisticated email threats. Prior to joining Abnormal, Carlos worked on high...