From Hadoop to Delta Lake and Glue for Streaming and Batch

The modern data customer wants data now. Batch workloads are not going anywhere, but at Scribd the future of our data platform requires more and more streaming data sets. As such our new data platform built around AWS, Delta Lake, and Databricks must simultaneously support hundreds of batch workloads, in addition to dozens of new data streams, stream processing, and stream/ad-hoc workloads.

In this session we will share the progress of our transition into a streaming cloud-based data platform, and how some key technology decisions like adopting Delta Lake have unlocked previously unknown capabilities our internal customers enjoy. In the process, we’ll share some of the pitfalls and caveats from what we have learned along the way, which will help your organization adopt more data streams in the future.

Speakers: R Tyler Croy and Brian Dirking

Transcript

– Welcome to my session. I’m Tyler Croy. I’m here to share some of the lessons that we’ve learned at Scribd as we’ve built out our new data platform that’s been very heavily oriented around streaming. I wanna make sure that before we get into this talk, I’m assuming that you understand a little bit of what Delta Lake is at maybe a high level, what Apache Spark is obviously, and a little bit about what Databricks as a platform offers. I should also mention that this talk is based on the experiences that we’ve had at Scribd in building out this platform. I don’t necessarily think that this is the perfect way to do it. I’m not saying that this is the only and the right way to do it. Most of what I’m trying to convey during the session is that this is how we’ve built things out at Scribd. So I am the director of Platform Engineering here at Scribd. I lead two teams, the Data Engineering Team, and a team called Core Platform. And a lot of what we do is build out infrastructure to support our data platform and the tools that sit on top of it. Part of the reason why I wanted to give this talk is my perspective is a little bit different than I think a lot of people that come into, I would say data platform infrastructure, data engineering teams in that this is not where most of my professional background is. Most of my professional background is actually in developing backend web services and data services that are serving production end users, think high throughput API services. And so I bring I’d say a fairly production-oriented mindset to the work that we do on the data platform here at Scribd. As a company, Scribd is arguably one of the largest digital libraries on the planet. We have been around for 12 or 13 years. 
And in that time we’ve accumulated a lot of really great books and audio books, and users have uploaded a tremendous amount of content, whether that’s scientific papers, manuals for toasters and things like that, or other legal documents, and all sorts of other interesting things that you might find in a sort of PDF or presentation type form. And what that means for us as a company is that we have a really rich data set that we’re hoping to use to achieve our underlying mission in all of this which is to change the way that the world reads. And for us, that means bringing the best content that we can possibly find that’s gonna be the most interesting, the most relevant to our end users. And in order to power that, we really need a very strong, very robust data platform underpinning that. And that’s where the Platform Engineering Team comes in which is the organization that I lead. At a high level we’re not actually delivering services that are servicing scribd.com users. What we’re doing is really delivering internal data services for other teams like the Recommendations Team or the Applied Research Team or Business Analytics, Research, and building tools for them so that they can build out more infrastructure. I wanted to make sure that I also highlighted a number of the individuals in the Platform Engineering organization whose work I’m really presenting here. Very little of what’s in this presentation are my ideas per se or definitely not my implementations in a lot of cases. So the list of people that you see here on the screen, those are the individuals that helped make the real-time data platform a reality here at Scribd. So in order to talk about the real-time data platform, I really should start by explaining where we began with our data platform. My history at Scribd, I’ve only been here since January of 2019, which honestly it’s felt a lot longer than that. But when I joined Scribd, we had a very batch-oriented data platform.
And what that meant for us is that we have, and frankly this is still a system that’s in place in a lot of ways, very, very different tools that have been adopted organically over the years running on this one big batch data platform. We have two versions of Spark. So we still have Spark 1 running around which I’m really looking forward to getting rid of. We use Hive very heavily. We have some internal customers that are relying on Impala. And probably the worst offender is we have some Ruby scripts that are running around that are querying Hive and HDFS directly. And all of this really comes together to make for a very difficult to manage and difficult to scale up data platform. But the tools themselves are not the biggest problem with our data platform. When we run our batches, we’re really only running them nightly. So that means for our data customers that they’re getting their data tomorrow in the best case. If there’s been any issues, say, one of those sort of poorly developed Ruby scripts fails and causes a lot of downstream jobs to fail, then we might have to run a recovery and then they might get their data 24 or 48 hours up to 72 hours in some cases where we’ve had significant problems from now. I should also mention that we’re also on top of an on-premise data center for this data platform. We’ve been moving things to AWS over the last six to eight months, but the legacy, I’ll call it, batch data platform is still on-premise which means we’re also running a lot of infrastructure that we’re not very good at running. For our users, this also means that there’s a lot of contention for the resources that we have in that data platform as well. So while we have the nightly running, nightly is what we call our batch jobs, while we have the nightly running, that means that ad hoc users or development users are competing with cluster resources at the same time, and that causes a lot of problems for us. 
And when I joined Scribd, I decided I didn’t wanna ask sort of what could we improve with this infrastructure, partially because when all you’ve seen is batch, what’s improvement to you might be faster batch, and I didn’t think that that was the direction we needed to go in. So I started to ask people what was the, like what can’t you do based on the performance or the tooling that we have. And what came back in a lot of cases was we can’t work with data or I can’t get answers in minutes or hours based on questions that I’m posing of our data which is a big problem. Underneath all of this data platform, what I consider the most important part of any data platform is the storage layer. And in our case, that meant HDFS. And our HDFS infrastructure, we had, like last time I counted, almost 60 to 70% of our files in HDFS were small files, so that means the files were significantly smaller than the HDFS block size. And if you’re not familiar with the small files problem in HDFS, basically it totally kills any performance that you could possibly get out of the cluster because for all of your reads that jobs have to do, they have to collect a tremendous amount of data. Our infrastructure that’s deployed in this on-premise environment, in some machines we will have 10 gigabit NICs, so 10 gigabit network interfaces, and in others we’ll have 25 gigabit network interfaces. During the nightly, we are saturating those interfaces between some of these nodes. When we started to actually survey some of the data last year that we had in HDFS, we also found that it was a collection of very, very different file types. So in some cases we were actually writing Parquet, and in some cases we were compressing files very well. But for the most part, we had RC file, formatted files, and even plain text files floating around in unoptimized forms. And all of this came together to really, I would say, adversely affect the performance of the data platform. 
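To make the small files problem concrete, here is a minimal Python sketch of how one might measure it. It assumes a 128 MiB block size (a common HDFS default, not a figure from the talk), a made-up list of file sizes, and a hypothetical helper named `small_file_fraction`; none of these come from Scribd's tooling.

```python
BLOCK_SIZE = 128 * 1024 * 1024  # assumed HDFS block size (128 MiB); not from the talk


def small_file_fraction(file_sizes, block_size=BLOCK_SIZE, threshold=0.5):
    """Return the fraction of files smaller than threshold * block_size."""
    if not file_sizes:
        return 0.0
    cutoff = block_size * threshold
    small = sum(1 for size in file_sizes if size < cutoff)
    return small / len(file_sizes)


# Made-up listing: seven tiny files and three files of at least one block.
sizes = [4_000, 12_000, 90_000, 1_000_000, 2_500_000, 300, 65_000,
         BLOCK_SIZE, BLOCK_SIZE, 2 * BLOCK_SIZE]
fraction = small_file_fraction(sizes)
print(f"{fraction:.0%} of files are small")
```

The `threshold` of half a block is an arbitrary cutoff for "significantly smaller"; a real survey would pick its own.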
And when we started to talk more about streaming and streaming workloads, it was just inconceivable that we could develop a streaming platform on top of this underlying storage layer. So last year we started to work on a prototype of what a new streaming infrastructure would look like, what that would look like in AWS. And we’ve started to look at a lot of different pieces of infrastructure that we hadn’t worked with before but we had a feeling that they might solve our problems. Central to I think any streaming platform is going to be Apache Kafka, and if you’re not familiar with Apache Kafka, go check it out. It is by far one of the most useful pieces of technology in any streaming infrastructure from my perspective. But we were also uncertain on how much Spark was gonna be part of our world versus Kafka streams or any other stream processing tool. We knew that we were gonna be putting this on top of S3 because we were deploying into AWS and S3 is certainly a great storage layer underneath that, but we also wanted to adopt AWS Glue Catalog, which is basically in Hive you’ve got HCatalog, AWS Glue Catalog is basically the same thing except it’s run by AWS. Part of the reason we wanted to do this is by running Glue Catalog, we were confident we would be able to integrate with AWS Athena or Presto or any other tools that sort of live in the AWS Glue or EMR environment. So the team spent some time working on a prototype that pulled some of these pieces of technology together. And this is a diagram that I shared with some of our internal customers around I would say June-July of last year for our prototype implementation.
Now if you watch or if you look at the, not sure top left or top right for you, but for me it’s top left, if you look at the top left, we actually tried to use one data stream, so in our case, mobile analytics, and sort of float, take that and make that go all the way through a streaming data platform where we would do inline processing, this batch and validation, or debatching, excuse me, and validation workloads, and then bring that back into Kafka. So we had done our initial processing on it and then implemented a number of archival tasks to get that into S3. And our expectation at the time was that ad hoc workloads and Spark streaming workloads that needed to get streaming data were going to be hitting Kafka, and anything that needed to hit, like to work in a batch mode would hit S3 directly. And so there were some complications here I would say. And it really like when you squinted at it, it looked pretty good, like it showed promise. There was a lot of problems that we had with the prototype though. Probably the biggest one is we had a lot of questions and unanswered problems around how we would deal with the consistency model in S3. S3 is eventually consistent which is very important to know if you’re building on top of S3. What that means is if you have one job writing objects into a bucket, if it writes an object, it can read that object out immediately, but if another object is, or another job, excuse me, is doing a list operation or trying to query that bucket, it may not see writes as they happen. So when you’re having a series of jobs that are depending on data being written consistently to the bucket at a certain time and then the next job takes over, there can be problems there. S3Guard is a common tool that people will use to solve this. We didn’t end up deploying that because we basically found a better solution which I’ll talk about in a second. But we also had a couple of problems with the difference between stream data and batch data. 
So this model of all streaming consumers will just have to go to Kafka to get their data meant that we would have maybe about a seven-day window of streaming data that’s available, and if you don’t read it from there, you have to go to the batch archival data store which was S3. And that sort of like split our work or split our implementation to where we’d have to have any streaming application would have to be able to work with both. So sometime after we had finished our prototype, we discovered Delta Lake. And if you’re not familiar with Delta Lake, it’s actually pretty simple conceptually. It’s Parquet files plus a transaction log. And for us, both of those are sitting on top of S3. And when we look at it for our uses, it really actually simplified a lot of things. And because storage for us is the foundation of our data platform, what that meant was that we had a very, very strong place to start from with our streaming data platform. The number one thing that we get with Delta Lake which I think is a fantastic feature of Delta Lake are the transactions. That transaction log really solves some of your data consistency problems. And what that allows us to do is we can have one job writing completed transaction, and then another job can come along and read the transaction log and see when data has been written in order to pick up after that, after that data has been written. And when we were looking at solving this for ourself, we were sort of looking up at this big giant problem and thinking, oh crap, we’re gonna have to spend a lot of time figuring this out. Delta Lake gives us that for free which is pretty fantastic. There’s also this optimize command which I think is one of like the hidden bonus features of Delta Lake that really changes the game when you adopt it. 
When you think about a streaming application, a streaming application is gonna be bringing data sets in, and we’ll go from a Spark streaming perspective, we could take one batch of data and then we write it into S3. So we’ve got one little Parquet file, say our batches are every minute, so every minute we’re writing another little Parquet file. When another job comes in and reads from that Parquet or from that S3 bucket, it’s going to see hundreds or thousands of little Parquet files. The optimize command allows us to basically start a transaction, take a bunch of those files, smash them together, and create a new file that is optimized, joined together, and it basically removes any small files problem that you might have which when you’re building on top of S3 and when you’re streaming data into those buckets, that’s actually very, very important. For us, what that has meant is that the small files problem disappeared, and we also didn’t have to think about it at all. So just by adopting Delta Lake as the foundation for our platform, we had one of our biggest problems with our existing data platform just go away entirely. The performance of Delta Lake is also something that should be mentioned. Because Delta Lake is responsible for writing every file, they are always compressed Parquet, which for us where we had this cacophony of different files floating around in HDFS, that’s a huge benefit for us, but it also means that you’re getting probably the most bang for your buck out of the storage and the performance that you get out of that storage because it’s optimized all the time or your optimized writes.
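The idea of "data files plus a transaction log", and what compaction does to it, can be sketched with a toy model in Python. This is an illustration only: `ToyTable` and its methods are hypothetical names, lists of strings stand in for Parquet files, and nothing here reflects Delta Lake's actual on-disk format or its optimize implementation.

```python
class ToyTable:
    """Toy model of 'data files plus a transaction log'; not Delta Lake's real format."""

    def __init__(self):
        self.files = {}  # path -> list of rows (stands in for Parquet files)
        self.log = []    # each commit is a list of ("add" | "remove", path) actions

    def _new_path(self):
        # Files are never deleted in this toy, so the count gives a unique name.
        return f"part-{len(self.files):05d}"

    def append(self, rows):
        """Write one new file and commit it; existing files are never modified."""
        path = self._new_path()
        self.files[path] = list(rows)
        self.log.append([("add", path)])
        return len(self.log) - 1  # version number of this commit

    def live_files(self, version=None):
        """Replay the log up to `version` to find the files a reader should see."""
        last = len(self.log) - 1 if version is None else version
        live = []
        for commit in self.log[: last + 1]:
            for action, path in commit:
                if action == "add":
                    live.append(path)
                else:
                    live.remove(path)
        return live

    def read(self, version=None):
        return [row for path in self.live_files(version) for row in self.files[path]]

    def compact(self):
        """Merge all live files into one, in a single commit."""
        old = self.live_files()
        path = self._new_path()
        self.files[path] = [row for p in old for row in self.files[p]]
        self.log.append([("remove", p) for p in old] + [("add", path)])
        return len(self.log) - 1


table = ToyTable()
for minute in range(5):           # five one-row "micro-batches"
    table.append([f"event-{minute}"])
before = table.read()
table.compact()
after = table.read()
```

Because the compaction is a single commit, a reader replaying the log sees either the five small files or the one merged file, never a mixture, and older versions remain readable by replaying fewer commits.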
It’s also important to mention that every write is a new addition, or every time you do a mutation, it’s a write, it’s not an update, so you’re not updating files in place, which means that we can just stream data into Delta Lake as fast as we possibly can, and I’ll show you an example in a little while, and it all just works fine, there’s no problem writing a lot of data into S3 for us which is fantastic from a performance standpoint. One of the features we didn’t know about or we didn’t fully appreciate I would say when we started with Delta Lake was being a streaming sink and a streaming source. What this means for us is that we can use the same table to stream data into and have all of the downstream jobs that might also need to access that stream data feed directly off of that Delta table. Rather than having data come into a Spark streaming job and then go back out to Kafka and then come back to another Spark streaming job and then go out to Kafka, we’re basically cascading data through a series of Delta Lake tables and our n plus one streaming jobs are just feeding off of Delta tables as opposed to everybody going to Kafka which is pretty fantastic. And what this means for us in a very real practical sense is that we have one table, one table to rule them all, that sort of a thing, but we have one table that all of our streaming consumers can feed off of and all of our batch workloads. So this is an example of a data stream that I’ll demonstrate a little bit more in detail later, but this is me running a query to grab data, grab today’s data, so data that’s been streaming in. And I’m querying logs out of Fastly in this case. And when I look at that table, I can get the latest information, but I can also go back in time. So this is another query where I’m just going back to January and grabbing data that has been written back in January. And for our users, that means they’re always going to one consistent place for their data.
They don’t have to figure out if I’m going to one place for stream versus a batch or a historical data lookup, which for me that’s incredible. But I should mention that Delta Lake is not perfect. It’s certainly got some caveats. And a lot of the caveats that we’ve encountered come into play when you’re working with Delta Lake as a streaming source. Compaction, for example, can cause downstream consumers to receive events again. There’s also some cases where if you optimize on a merged target table, you might have transaction conflicts going on. There’s some of these examples, I’ll make sure that we post on the Scribd Tech Blog to give you a little bit more detail. But suffice it to say that if you’re using streaming sources and sinks as part of your streaming architecture, definitely read the documentation thoroughly and pay attention to some of the caveats and be very aware of where you might have concurrent writes between different jobs to the same Delta Lake table. There’s some caveats that you need to be aware of there. But with all that said, I’m still a huge fan of Delta Lake. It’s by far been a tremendous piece of technology for us to build our streaming data platform on top of. The other half of the equation for us, however, is Databricks, which we’re a Databricks customer. We evaluated EMR as part of our work last year and then we discovered Databricks. And there’s a lot of great things that I can say about Databricks. It has definitely solved a lot of the problems that we both knew we had and that we didn’t know we had. When we were originally thinking about what our data platform was going to look like, we were thinking about a production data platform, so something where jobs were running that developers weren’t accessing. We weren’t really thinking about the development story.
And the notebooks and some of the collaboration that we can do inside of Databricks, excuse me, that actually accelerated some of our development as we began to launch streaming workloads into Databricks, which was an unexpected win which I’m pretty happy with. I also wanna call out Delta caching because Delta caching was not a feature that we knew about until we really started to get invested in the Databricks platform and Delta Lake. When we were imagining our EMR environment, we knew that we had poorly performing jobs, we knew that we had a lot of jobs that frankly were not doing optimized reads from their storage, I mean obviously with our HDFS storage, but they weren’t doing optimized reads. And so we were considering some approaches where we would launch EMR clusters with a local HDFS installation inside that EMR cluster to perform basically local caching for the cluster while it was doing its work. So we would load data in from S3 to HDFS, do a lot of the work inside of the cluster and then report our results back out into S3. Delta caching is effectively that, but it’s a check box that you check when you launch a cluster which is fantastic. And it actually makes not only some poorly performing jobs run faster, but it also helps a tremendous amount with high concurrency clusters where you might have a lot of different jobs running at the same time, but also ad hoc workloads. We have some clusters that ad hoc users are basically sharing, and the Delta caching ensures that if you have let’s say 10 data scientists that are all pulling a couple of common tables from Delta Lake, that we’re caching that in the cluster and it gives them faster results while they’re doing their work which is pretty phenomenal. 
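Conceptually, this kind of cluster-local caching behaves like a read-through cache in front of remote storage. The Python sketch below shows only that general pattern; it is not how Databricks implements Delta caching, and `ReadThroughCache`, `fake_s3_read`, and the table paths are hypothetical names made up for illustration.

```python
class ReadThroughCache:
    """Serve repeated reads locally; go to remote storage only on a miss."""

    def __init__(self, fetch_remote):
        self._fetch_remote = fetch_remote  # callable: path -> data (stands in for an S3 read)
        self._local = {}
        self.hits = 0
        self.misses = 0

    def read(self, path):
        if path in self._local:
            self.hits += 1
        else:
            self.misses += 1
            self._local[path] = self._fetch_remote(path)
        return self._local[path]


remote_reads = []  # records every trip to "remote storage"


def fake_s3_read(path):
    remote_reads.append(path)
    return f"contents of {path}"


cache = ReadThroughCache(fake_s3_read)
# Ten users on a shared cluster each read the same two common tables.
for _ in range(10):
    cache.read("tables/users")
    cache.read("tables/documents")

print(f"remote reads: {len(remote_reads)}, cache hits: {cache.hits}")
```

A real cache also has to handle eviction and data that changes underneath it; the sketch leaves both out.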
The challenges with Databricks though, and I don’t wanna spend too much time talking about every specific one, because some of these are on roadmaps with the Product Team at Databricks, and we’ve had a very good relationship in giving them feedback and them giving us some help on how we structure our streaming platform. But the big one that I wanna talk about is really monitoring of production workloads which is where if you’re deploying a streaming platform and streaming infrastructure on top of Databricks, that’s probably where you’re gonna have the most work to do that you might not be anticipating. Out of the box, Databricks doesn’t really support you. There’s no turnkey integrations for metrics getting out of Databricks or logs getting out of Databricks or sending alerts. So in our world, we think about Datadog for metrics and logs, we think about Sentry for exceptions and things like that, and we think about PagerDuty for alerting. In order for us to get what we need out of the platform and get what we need for these production streaming jobs, we have to implement all of that ourselves which was not something we were hoping to do but that’s where we are. I know that Databricks is going to be addressing this in the future, but right now just about the extent of what you get is this, it’s send an email alert when something goes wrong, which is not nothing but it’s not great either. If you come from the perspective of production readiness or if you talk with your infrastructure team about what they consider requirements for a production service, there’s just a lot of things that you have to go build yourself if you’re deploying streaming workloads on top of the Databricks platform. In all fairness, we were gonna have to build those ourselves if we went over to EMR or any other streaming infrastructure that we might deploy, but these are things that you should be aware of if you’re gonna be building out infrastructure on top of Databricks. 
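As a rough idea of what "implementing it yourself" can look like, the Python sketch below wraps a job so that metrics, exceptions, and alerts flow through pluggable callbacks. Everything here is hypothetical: `run_monitored` and the callback signatures are invented for illustration, and the lists stand in for real services. An actual integration with Datadog, Sentry, or PagerDuty would replace the callbacks with calls to those services' own client libraries.

```python
import time


def run_monitored(job, emit_metric, report_error, send_alert, name="job"):
    """Run `job()` once, reporting duration, failures, and alerts via callbacks."""
    started = time.monotonic()
    try:
        result = job()
    except Exception as exc:
        report_error(name, exc)              # e.g. forward to an exception tracker
        send_alert(f"{name} failed: {exc}")  # e.g. notify whoever is on call
        emit_metric(f"{name}.failure", 1)
        raise                                # still fail loudly after reporting
    emit_metric(f"{name}.success", 1)
    emit_metric(f"{name}.duration_seconds", time.monotonic() - started)
    return result


# In-memory stand-ins for the metrics, exception, and alerting services.
metrics, errors, alerts = [], [], []


def record_metric(key, value):
    metrics.append((key, value))


def record_error(job_name, exc):
    errors.append((job_name, repr(exc)))


def failing_batch():
    raise ValueError("schema mismatch")


ok_result = run_monitored(lambda: 42, record_metric, record_error, alerts.append, name="ok_job")
try:
    run_monitored(failing_batch, record_metric, record_error, alerts.append, name="bad_job")
except ValueError:
    pass  # the wrapper re-raises after reporting
```

Keeping the reporting behind callbacks means the same wrapper can be exercised in tests without any external service.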
All of that said, however, still a great, great piece of technology that we’ve built on top of. The combination of Delta Lake and Databricks has actually unlocked a lot of workloads, and a lot of features that we weren’t originally planning that have already accelerated the work that some of our internal data customers have been doing. And that is a fantastic win for us. So I wanna give you an example of one of the streaming pipelines that we have up in production right now, and this is using Delta Lake and running in Databricks literally right now. So this streaming pipeline, it’s different from the example that I showed you before when we did our prototype which was using mobile analytics events. This example is a production pipeline where we were pulling in logs from Fastly which is fronting a lot of our scribd.com traffic, and getting that in to do analytics work and store logs to make those available to internal customers that need those logs and compute ratings and some other stuff that isn’t really relevant to this slide. But suffice it to say it’s a very important data source, but because it is access logs from our production web endpoint, the rate of data that comes through this is maybe I’m gonna guesstimate between 4,000 and 6,000 logs per second. So it’s a very high throughput pipeline. We bring that data into Kafka directly. For us, any streaming data source goes to Kafka first and then we work with it from there. Our first hop after that is a streaming job that is basically taking data from Kafka and mapping that into a Delta Lake table. This is a fairly simple thing to do but it does require some mapping of what one schema has to the Delta table’s schema, which is important. When we write that data into this Fastly table, this is the logs Fastly table in blue, we actually decided to set up additional streaming jobs off of the Delta Lake table directly.
So this view logs table is reading from the Fastly, or view logs jobs, excuse me, is reading from the Fastly table, and then that’s doing some computation and persisting those results into a view logs table. And there’s another streaming job that is waiting for results, computed results on that view logs table and so on and so forth. All the while these two tables are accessible to ad hoc workloads that can basically query information as it’s coming in. But one thing I should mention and I think this is an important thing to consider for any streaming workload going into Delta Lake is you need to run optimize. At some point you have to run optimize. And in our case we have a batch job that is triggered by Airflow which is periodically optimizing some of these tables. This is something where you sort of have to play with it to see what the right cadence is for optimizing depending on your own query times or query needs, but you have to come in and optimize to make sure that you’re getting the best performance from your reads as you’re streaming data into those tables. I wanna call back to the queries I was showing you earlier, I didn’t point this out at the time because I wanted to explain the data pipeline. If you’ll note the time difference between when the data was queried and when it was actually persisted into Delta Lake, that is a nine-second difference. And so a log generated in Fastly is showing up for our customers in a Delta table in nine seconds which is phenomenal which means if you think about a customer that was here 12 months ago, they were waiting 24 plus hours to get results, we can start to give them results within seconds. For me, it’s still sort of mind-blowing, but for the users of this data, they never anticipated this, they’re over the moon happy. I can’t tell you how happy some of these users are.
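The cascade described above, where one streaming job feeds off the Fastly logs table and writes into a view logs table while ad hoc queries read either one, can be sketched in plain Python. This is a toy model under stated assumptions: `AppendOnlyTable`, `StreamingJob`, and the `only_views` transform are hypothetical, the row fields are made up, and the talk does not say what the real view logs job computes.

```python
class AppendOnlyTable:
    """Stand-in for a table that is both a streaming sink and a streaming source."""

    def __init__(self):
        self.batches = []  # each committed batch is a list of rows

    def write(self, rows):
        if rows:  # skip empty commits
            self.batches.append(list(rows))

    def read_since(self, checkpoint):
        """Return rows committed after `checkpoint`, plus the new checkpoint."""
        new = self.batches[checkpoint:]
        return [row for batch in new for row in batch], len(self.batches)

    def read_all(self):
        """What an ad hoc or batch query would see."""
        return [row for batch in self.batches for row in batch]


class StreamingJob:
    """Reads new rows from a source table, transforms them, writes to a sink table."""

    def __init__(self, source, sink, transform):
        self.source, self.sink, self.transform = source, sink, transform
        self.checkpoint = 0  # how many source batches have been consumed so far

    def run_once(self):
        rows, self.checkpoint = self.source.read_since(self.checkpoint)
        self.sink.write(self.transform(rows))


def only_views(rows):
    # Hypothetical transform: keep successful requests for document pages.
    return [r for r in rows if r["status"] == 200 and r["path"].startswith("/doc/")]


fastly_logs, view_logs = AppendOnlyTable(), AppendOnlyTable()
view_job = StreamingJob(fastly_logs, view_logs, only_views)

fastly_logs.write([{"path": "/doc/1", "status": 200}, {"path": "/doc/2", "status": 404}])
view_job.run_once()
fastly_logs.write([{"path": "/doc/3", "status": 200}, {"path": "/about", "status": 200}])
view_job.run_once()
view_job.run_once()  # nothing new upstream, so nothing is written
```

A further job could use `view_logs` as its own source in the same way, which is the "n plus one" cascade without a round trip through Kafka between hops.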
And one of the things that is kind of funny about this query to me, and I wanted to point this out as well, is if you look at the bottom left of the cell in the notebook, the actual query took 23 seconds. The time it takes for us to run the query against the Delta table is actually longer than it takes us to get this data pipeline running to bring the data in because the writes are really, really fast. The reads aren’t as fast because this table has not yet been optimized. And so the batch workload that I, or not the batch workload, excuse me, the historical workload that I showed you earlier where we were looking at data from January, that got data in 20 seconds instead of 23 or 30 seconds which is about the norm for accessing this live data here. You might see a difference in how fast your data shows up in Delta Lake depending on the trigger interval in the Spark streaming job. So you can configure this if you have more real time requirements. You can set no trigger interval which is what we have here or if every minute is fine or whatever is appropriate for your organization, you can certainly change that yourself, but for us, it was no trigger interval. And that’s all running in production right now which I cannot tell you how proud I am of the work that we’ve done to get from you’ll get your data tomorrow to 10 seconds from the data being generated, you have it. So with that said, it’s super awesome. I can’t tell you enough good things about Delta Lake and Databricks, but I wanted to talk a little bit about where we’re going next because that’s just one data stream and we have so much more work to do in our streaming platform. We actually have a lot of different streams in our infrastructure, and we actually are bringing a lot into Kafka, but we’re pulling them out of Kafka in this nightly batch process.
And so over the next few months, we’re gonna be moving all of those into a very similar looking streaming pipeline going into Delta tables with Spark streaming jobs, which we’re fairly confident is gonna change the equation for a lot of our data customers on how they access the data and how they work with it and what they can do. When we start to bring a lot more of these streaming workloads into Spark streaming and into Delta Lake, we also have to do a lot more tuning. When you’re running an optimize, you’re basically, in our case, we’re spinning up a Spark cluster that is running the optimize command on the Delta table, and that is costing money. So we have to figure out what’s the right balance of running optimize too frequently versus not frequently enough to give us the performance that we want out of this. And we also don’t have a good sense right now on how to structure our streaming jobs themselves, whether a single topic in Kafka should map to a single streaming job or whether we should co-locate a number of different data streams into one Spark streaming job that’s running in one Spark cluster inside of Databricks. But probably most important is there’s more production work or productionization if you will. And this is what I was alluding to earlier, the stuff that you kind of have to do yourself. I am not pleased with the amount of production readiness that we have in our current Spark streaming pipeline, so a lot of the work that we have lined up over the next few months is basically hardening that and making it more resistant to failures and more resistant to data problems or anything like that. Overall I think Delta Lake and Databricks are a phenomenal choice if you’re building out a streaming data platform or if you’re building out batch workloads. Both of them together allow us to have the flexibility to do both.
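The optimize cadence trade-off mentioned above comes down to simple arithmetic, sketched here in Python. The inputs are made up: it assumes one new file per micro-batch (a simplification, since real jobs may write several files per batch) and a flat, hypothetical cost per optimize run. `optimize_tradeoff` is an illustrative helper, not part of any library.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def optimize_tradeoff(batch_seconds, optimize_every_seconds, cost_per_run):
    """Estimate small-file buildup and daily cost for a given optimize cadence."""
    runs_per_day = SECONDS_PER_DAY / optimize_every_seconds
    return {
        # Files that pile up between two optimize runs, at one file per batch.
        "max_small_files": optimize_every_seconds // batch_seconds,
        "runs_per_day": runs_per_day,
        "daily_cost": runs_per_day * cost_per_run,
    }


# Made-up inputs: one file per minute, 0.50 (any currency) of cluster time per run.
estimates = {
    hours: optimize_tradeoff(60, hours * 3600, cost_per_run=0.50)
    for hours in (1, 6, 24)
}
for hours, estimate in estimates.items():
    print(f"every {hours:>2}h: {estimate}")
```

Optimizing more often keeps the number of unmerged files low, which helps reads, but multiplies the cluster cost; the right point depends on how slow queries over unoptimized data are allowed to be.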
And that’s probably the most important takeaway is the future for us is very, very streaming-oriented, but our batch or periodic workloads, they don’t go away. There’s minor things that you’re gonna have to keep in mind and maybe do some manual implementations or do some things yourself that you’re not going to get for free from either Delta Lake or Spark running on top of Databricks. But when you take the whole picture together, some of the wins that you get from adopting this platform are just insane. They’re crazy big wins. I can’t even tell you how transformative Delta Lake and Databricks alone have been for the way we view data internally. All of a sudden we have people who are very aggressively getting into our data and starting to think, well, what other questions can I answer? I used to be able to get results in two days so I didn’t ask too many questions, and now they can get results in seconds or minutes. And so we have already started to see usage of our data infrastructure go up and up and up over time because people can, all of a sudden, ask one question, get an answer, ask another question, get an answer, and go through a really tight development cycle as they’re developing machine learning models, A/B tests, or anything that’s involving the data that we’re working with here at Scribd. That said, if you go to tech.scribd.com, you’ll find a little bit more that we’ve written up about our streaming data platform and about the work that we do here in Platform Engineering. And we are also hiring. So if you’re curious about joining the Scribd team, you can find out more on the tech blog as well. But with that said, thank you for your time.


 
About R Tyler Croy

Scribd

R. Tyler Croy leads the Platform Engineering organization at Scribd and has been an open source developer for over 14 years. His open source work has been in the FreeBSD, Python, Ruby, Puppet, Jenkins, and now Delta Lake communities. The Platform Engineering team at Scribd has invested heavily in Delta and has been building new open source projects to expand the reach of Delta Lake across the organization.

About Brian Dirking

Databricks

Brian Dirking has been in the technology industry for over 20 years, in roles including product management, product marketing, business development and partner marketing. He regularly speaks at conferences and public events, and writes blog posts and whitepapers on analytics. He has previously worked at Stellent, Oracle, Box, and Alteryx.