Upserting millions of events per day into Delta Lake using Spark Streaming and Autoloader.
Gousto is the leading recipe box company in the UK. Every day we have to keep track of a huge amount of ingredients flowing through our warehouse until they are shipped to customers. In this talk, Gousto’s Data Engineers will describe the challenges faced and the solutions found to merge the incoming stream of inventory events into Delta Tables. Come to hear about the bumps along the way, and to discover the tweaks implemented to improve merge performance. Today Gousto has real-time insight into the flow of ingredients through its supply chain, enabling a smarter, more optimised measure of its inventory performance.
Speakers: Eoin O’Flanagan and Andre Sionek
– Hello everyone. My name is Andre Sionek. I'm a data engineer at Gousto, and I'm here with Eoin, lead data engineer here at Gousto. We are going to talk about how we are building a real-time supply chain at Gousto. This involves how we deal with a large amount of data to perform merges and upserts into our Delta Lake, to track ingredients through our supply chain, and the benefits the business gets from this.
– So just to tell you about Gousto: Gousto is the leading recipe box subscription company in the UK, and we have about 15 new recipes per week. The way it works is you go in, you pick the recipes you want, and you get a box every week as part of the subscription. We describe ourselves as a data company that loves food. Everything we do has data at the heart of it; we drive all our decisions through data and it's a key part of our business. The box gives you everything you need for the recipe. All the ingredients are pre-prepared and pre-sorted, and you just get to focus on the cooking. To make this happen, we have a fulfilment centre, where the ingredients are literally picked and put into each box. And that is a key part of our business: if that works well, it ensures we are operating well and our customers get a great product. We're looking to move to a multi-site model, which means opening new factories, and because of that, the supply chain that drives the creation of these boxes is super critical to our company. So we had a recent initiative internally to improve the performance of our supply chain by creating a whole set of key KPIs that help us look at our operational performance and how our supply chain is processing. So, what does that mean? Well, some of the metrics behind this focus on line throughput: literally how many boxes are we processing through our factory per hour, per 15 minutes? We can identify bottlenecks through that. We also look at our picking stations. Do we have the right ingredients available there, put on the shelf as they're picked from the shelf into the box? Are the stations performing efficiently? Is there some process or performance issue happening there that we need to address?
Obviously we need to be able to understand what we're going to need to order for our boxes in the coming weeks and months, so we need to do forecasting. Our forecast accuracy is key, and the data from these systems drives that. And lastly, one of the last areas is our actual stock performance. That is the kind of upstream of our picking stations: how does stock get into our factory? How does it move through our factory? How does it get into the boxes, and what tasks do we need to create to drive that? This talk will mainly be focusing on the stock performance piece. It's been a key technical challenge for us, and that's what we're going to talk about. So, what does stock performance actually mean? As you can imagine, our ingredients arrive on a lorry in a bunch of pallets. Those pallets are received, they're quality checked, they're checked for food safety, and then they're stored or staged, or both: stored, then staged. They're broken down further: pallets become boxes, and boxes are broken down further into individual ingredients. And then those individual ingredients go into the box. That generates a lot of events for our systems to track. Each time we break something down from one to many, we do all the various quality control checks: a box goes from one box to many ingredients, packets of green beans or individual packets of chicken breasts, or whatever that means. So it generates a huge amount of data for us to keep track of. We had a technical problem to solve. We had a requirement from our business to help them track this data, and the key requirement was to generate a real-time dashboard from this data and keep KPIs on it. That dashboard had to have a lag of 15 minutes or less. So pretty close to real-time reporting.
We had to capture all of the ingredient events through that line I just described, and that equates to more than 8 million events a day. Not that big in big data terms, but quite big to do in that 15-minute time window. And a key thing: we're still a startup, we worry about our costs, so this thing needed to be cost effective to run. Thinking about the solutions, we knew we needed to build a real-time change data capture pipeline. We have Redshift in the business, we have dbt, so we're very familiar with RDBMS systems and SQL, and in the kind of old world you would do these kinds of upserts and change data capture processes there. They are very easy to code, very easy to write the SQL behind, but they proved very expensive at scale, and for that real-time use case, realistically too slow. In previous roles, or a couple of years ago, you might have looked at using Spark and Parquet, but as we'll describe later in this talk, doing a change data capture pipeline in Spark Streaming, without some other technologies in the loop, is pretty tricky. So what we really zeroed in on is: would Delta Lake work for this use case?
– Okay. So I'm going to describe to you how our current data pipeline works and what we already do with Databricks. We have data being ingested from different sources. Take employee management, where we control shifts, which employees are linked to which stations, and everyone working in the factory. We also have data that describes how boxes are moving through the lines, which stations they are going through, and which items are being put into each of those boxes. All this data is already being ingested with different methods, Lambda functions, DMS; we have different approaches for this, and it all lands in our raw S3 bucket. Those files are either CSV, Parquet or JSON; it doesn't really matter, it's just the raw layer. Then we have a Databricks job, running on a Databricks cluster, that loads this data and transforms it into a Delta table in the cooked layer. You might find the names here a little bit funny, but we're a company that works with food, so our data lake layers are called raw, cooked, and served. It's a bit different from the conventional naming, but the idea is the same. This data is ingested from raw to cooked using Autoloader. What happens is, when a new file arrives in the raw bucket, it sends a notification to Autoloader; actually, Autoloader does all this work for us, it does the wiring between SNS, SQS, and the bucket notifications. It receives the notification and tells our streaming job, hey, there is a new file here to be processed. Then we process the file, we apply the transformations we need, and we save it as Delta in the cooked layer. And this process is append only. If you think about this job in terms of code, it's quite simple: read from the stream, apply some transformations, and save, and that's it. We don't need anything else. And it works like a charm. It works really well.
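The raw-to-cooked append-only job described above boils down to a single readStream/writeStream pair in PySpark. A minimal sketch, assuming made-up bucket paths and a JSON source format (not Gousto's real configuration), might look like this:

```python
# Hypothetical sketch of the raw -> cooked append-only job using Databricks
# Autoloader (the "cloudFiles" source). Paths, format, and checkpoint
# location are illustrative assumptions.

def start_raw_to_cooked_stream(
    spark,
    raw_path="s3://example-raw/warehouse-events/",
    cooked_path="s3://example-cooked/warehouse_events",
    checkpoint="s3://example-cooked/_checkpoints/warehouse_events",
):
    """Read new files as they land in the raw bucket and append them to Delta."""
    df = (
        spark.readStream
        .format("cloudFiles")                           # Autoloader source
        .option("cloudFiles.format", "json")            # raw files: CSV/Parquet/JSON
        .option("cloudFiles.useNotifications", "true")  # Autoloader wires up SNS/SQS
        .load(raw_path)
    )

    # ...apply the transformations the job needs here...

    return (
        df.writeStream
        .format("delta")
        .outputMode("append")                           # append-only, as in the talk
        .option("checkpointLocation", checkpoint)
        .start(cooked_path)
    )
```

Calling `start_raw_to_cooked_stream(spark)` inside a Databricks notebook would start the continuous stream; the function is just a sketch of the wiring, with the transformation step left as a placeholder.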
This data is then modelled, of course, to define all the schemas and support the tables we serve from. And the last bit is we serve this data to analysts and data scientists using Redshift Spectrum. Recently AWS released a new integration between Redshift and Delta Lake, so this part is much easier to do now, but anyway, we are doing it using Spectrum. Then we have dbt running SQL in Redshift. We take all this raw data that is stored in the cooked layer in Delta tables, we apply the business logic, we apply all the transformations we need, and we have a modelled layer. This layer is what is actually exposed to the business users, and this is where we get all the value. Here we do all the joins, we do all the filters we need, and we expose this. So our greatest challenge was: okay, how do we move from this append-only architecture to introducing change data capture into the stream? Now we are not just receiving new events; we are also receiving updates, inserts, and deletes. How do we apply this? This is what I want to discuss with you: the drawbacks we handled along the way and the solutions we found. If you think about this before Delta, it was really painful to do. We basically had to load the target table in memory, load the change data we were receiving also in memory, do the merge between the two tables, and then overwrite the target table. This means a lot of overhead in processing power, and it's very slow: we need a lot of memory, and a lot of time doing IO, reading files and saving files back, because we're saving the whole table. This approach works for small files, but it doesn't work at scale. Then with Delta, we have a similar approach, but much easier in terms of code: we receive change data, we apply a Delta merge, and we write it to the Delta table. This was our first attempt to do CDC in Spark Streaming. And it works.
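The "receive change data, apply a Delta merge" step maps onto Structured Streaming's `foreachBatch` plus the Delta `merge` API. A hedged sketch, where the target path, the `event_id` key, and the `op` change-type column are all assumptions for illustration:

```python
# Hypothetical sketch of applying CDC micro-batches to a Delta table with a
# single merge statement. Target path and column names are made-up examples.

def make_upsert_fn(spark, target_path="s3://example-cooked/stock_events"):
    """Build a foreachBatch callback that merges each micro-batch into Delta."""

    def upsert_to_delta(micro_batch_df, batch_id):
        # Import inside the callback so this sketch can be defined even
        # where the delta-spark package / Spark runtime is not installed.
        from delta.tables import DeltaTable

        target = DeltaTable.forPath(spark, target_path)
        (
            target.alias("t")
            .merge(micro_batch_df.alias("s"), "t.event_id = s.event_id")
            .whenMatchedDelete(condition="s.op = 'delete'")  # CDC deletes
            .whenMatchedUpdateAll()                          # CDC updates
            .whenNotMatchedInsertAll()                       # CDC inserts
            .execute()
        )

    return upsert_to_delta

# Wired into the stream with something like:
#   df.writeStream.foreachBatch(make_upsert_fn(spark)) \
#     .option("checkpointLocation", checkpoint).start()
```

The appeal, as the talk says, is that the bulk of the CDC logic is this one merge statement; everything else is the same streaming scaffolding as the append-only jobs.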
However, when we tried to merge millions of rows, this was really slow. It was impractical; we couldn't go to production. Actually, we did deploy this to production, and we had to roll back because it was really slow. We increased the cluster size, but it didn't really help much. So we rolled back and went back to the drawing board to understand how we could improve this. Going back to the drawing board, we also went back to the documentation to see what else we could do, and we found out about doing partition pruning when we do the merge. The idea is that we want to minimise the scan of the target Delta table, so we don't have to scan the whole table every time we're doing a merge or an update. The real challenge was to find the right partitions for each table we're creating, because we have to think about partitions in a way that minimises the scan, so we don't scan the whole table every time new events come in. After that, the approach is basically the same: we use the Delta API to perform a merge, but the difference is that we include the partition keys in the merge condition. This means we receive change data that applies changes to just a few partitions, and then we only have to read and apply the changes to those same few partitions in the target table. The idea is that we don't have to rewrite the whole table; we just rewrite the partitions that changed. So it works, but it was still using a lot of compute resource compared to our current append-only pipeline. We still had to increase our cluster size, so it wasn't really good; we are concerned about cost as well. After that, we started investigating a little more how we could improve, and the solution was actually quite simple. The only thing we had to do was upgrade Databricks to Runtime 7, and with this we got Spark 3.
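Pinning the merge to the affected partitions means putting an explicit partition predicate into the merge condition, built from the partition values present in the incoming batch. A small helper sketch, assuming hypothetical `delivery_date` partition and `event_id` key columns:

```python
# Hypothetical helper: build a merge condition that restricts the scan of the
# target table to only the partitions present in the incoming change-data
# batch. Column names ("delivery_date", "event_id") are made-up examples.

def pruned_merge_condition(partition_col, partition_values, key_col="event_id"):
    """Return a SQL predicate like:
    t.delivery_date IN ('2020-06-01', '2020-06-02') AND t.event_id = s.event_id
    """
    quoted = ", ".join("'{}'".format(v) for v in sorted(partition_values))
    return "t.{p} IN ({vals}) AND t.{k} = s.{k}".format(
        p=partition_col, vals=quoted, k=key_col
    )

# In the streaming job one would first collect the batch's distinct partition
# values, then merge (PySpark, not run here):
#   values = [r[0] for r in batch_df.select("delivery_date").distinct().collect()]
#   condition = pruned_merge_condition("delivery_date", values)
#   target.alias("t").merge(batch_df.alias("s"), condition) \
#         .whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
```

Because the extra `IN (...)` clause only names partition values, Delta can skip every file outside those partitions instead of scanning the whole target table.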
And the main benefits here were the dynamic partition pruning that we got with Spark 3, and also a lot of improvements in the Delta cache. Those two things combined helped us deliver this data in real time with much less overhead in terms of resources. So here's our new architecture. If you look, it's quite similar to the previous one that I showed you; the big difference is that now we have warehouse management data being ingested, and this data is not append only. This is merge, this is CDC, change data capture. Here we are doing updates, inserts, and deletes. And the cool thing is that our architecture is exactly the same: by using Databricks Autoloader and Spark Streaming, we could use the same architecture, the same technologies, and just extend the functionality to add CDC and add merge. This was really nice for the data engineering team, which didn't have to write a new process or refactor code. It was really simple and straightforward to implement, and there were no CI/CD changes either; everything could be used as it was. So this was a really huge win for us as well.
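The two Spark 3 / Databricks features mentioned above are configuration-level rather than code changes. A sketch of the relevant settings (both are typically enabled by default on recent runtimes; shown explicitly here only for illustration):

```python
# Illustrative Spark configuration touching the two features from the talk.
# On Databricks Runtime 7+ both usually default to enabled; this is a sketch,
# not a required setup step.

spark_conf = {
    # Dynamic partition pruning, new in Spark 3
    "spark.sql.optimizer.dynamicPartitionPruning.enabled": "true",
    # Databricks Delta cache (available on i3-family "cache enabled" nodes)
    "spark.databricks.io.cache.enabled": "true",
}

# Applied in a notebook with:
#   for key, value in spark_conf.items():
#       spark.conf.set(key, value)
```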
– So what are the key benefits of enabling this? Well, the main thing, or a big thing, is this is hugely resource and cost effective. If you think about previous roles, having to implement streaming jobs at this scale, you would have had a fairly meaty cluster trying to run it. Right now we have 18 of these streaming jobs on a single cluster, a very small cluster of two to four of the smallest Delta-cache-enabled nodes in AWS, the i3.xlarge. And you can see it scales up and down only when needed, which is another amazing benefit of working with Databricks. In that sidebar, you can see we only really need three nodes; very rarely do we have to go up to four. The other real benefit for us as engineers: it's fast. It meets our other requirement of getting the data through within 15 minutes; in fact it gets it through much faster, which then gives us plenty of time to run our dbt modelling, to make sure our metrics are performing well on those dashboards. This happens in a matter of minutes, which gives us that 15- or 10-minute window to get the data properly modelled in Redshift. It's very few lines of code compared to similar systems I've worked on before, so we like that. It's very maintainable; a single merge statement is kind of the bulk of it. So really easy to support, really easy for us to run. And lastly, it's really stable. We've deployed this, it's been running for hundreds of hours already, and we've barely needed to touch it. You can see on that batch throughput graph there, it just stays flat and runs itself. Really nice for our engineering team, really low support. And obviously we met the business requirement. The other thing, though, the bigger thing, the real business benefit to Gousto, is that this has been kind of the missing piece in our KPIs around our supply chain data.
So, joining that with the line throughput, with the picking performance and forecasting, and now having real insight into our stock system, even down to how an individual ingredient moves through the line, gives us amazing insight into our supply chain, which means better fulfilment and a better customer experience. It also really sets us up well for the future. We're opening new factories, which is very exciting; it's great to be part of a company that's growing, and every solution we're building now, we want to keep that in mind. So this thing, we know we can scale it. We know it will scale; we're using so little resource now, we know it will scale fairly linearly, and that's really going to set us up well for the future. And that's it. That's all we wanted to talk about. Thank you very much.
Eoin O'Flanagan has more than 12 years experience in data and has held a variety of data roles across multiple industries including financial services, social media analytics and retail. He is currently Lead Data Engineer at Gousto, where he is enjoying finally bringing some of his passion for food to his place of work.
Andre Sionek worked for different companies in Brazil before moving to London to work as a Data Engineer at Gousto. His past experiences include financial services, e-commerce and real estate. He regularly teaches data engineering courses and also writes about data for his blog.