Creating an 86,000 Hour Speech Dataset with Apache Spark and TPUs

May 26, 2021 11:30 AM (PT)

Download Slides

As part of its machine learning benchmarking efforts, MLCommons (mlcommons.org) has built an 86,000 hour open supervised speech recognition dataset with a commercial-use license known as The People’s Speech, incorporating subtitled videos and audio in the public domain scraped from the Internet. Creating a speech recognition dataset requires running inference on a pre-trained neural network speech recognition model to “force align” audio against a transcript (in this case, subtitles). In order to improve upon an initial CPU-based pipeline that took approximately 3,500 CPU days to one that takes 24 hours end-to-end, we created a hybrid data pipeline that used Apache Spark for general data processing and Google Cloud Tensor Processing Units (TPUs) for running the neural network speech recognition model.

 

I will describe in-the-weeds learnings on how to (1) use a non-GPU accelerator with Spark for inference, (2) share physical memory fairly between the pyspark UDF worker.py process and JVM process in the same executor, and (3) implement efficient joins of data that has been reordered relative to its source dataframe by batching by sequence length (tf.data.experimental.bucket_by_sequence_length).

 

If you do offline inference on sequence data with deep learning models, this session is for you. Our entire pipeline is open source under an Apache 2 license at https://github.com/mlcommons/peoples-speech.

 

In this session watch:
Daniel Galvez, Software Engineer, MLCommons

 

Transcript

Speaker 1: Hi there. I'm Daniel Galvez. I'm an engineer who's been working with MLCommons for a little over a year now. What I want to share with you are some in-the-weeds learnings about how you can effectively use Apache Spark with, in this case, TPUs, but in general, machine learning accelerators, especially for deep learning, for offline inference use cases, in particular on sequence data. So, if you're interested in sequence data and offline inference with accelerators in your data engineering pipelines, I think you'll find at least some bits of this talk interesting.
This is my first time giving a talk to such a wide audience, so if you have any feedback, I would really appreciate it; please do pass it along if you ever get a chance.
We have a few topics, but let's just dive right in. The data set I'm going to discuss, which is a speech recognition data set, was built through MLCommons. MLCommons is perhaps unfamiliar to some of you, but MLPerf is probably better known. MLPerf is essentially, today, probably the best set of deep-learning training and inference benchmarks for various hardware and software systems, in the sense that it's the most complete and least biased. You can see results on various setups of TPUs, GPUs, and other accelerator systems and hardware configurations. There's essentially an Excel spreadsheet of results, and as a lot of you may know, it's very hard to get concrete numbers on these sorts of things. But I won't talk too much about MLCommons.
If you want to know more about it, the executive director, David Kanter, is actually going to be speaking, at least according to the schedule when I made these slides, on Thursday at 4:25 PM, so if you want to know more, I suggest you go to his talk.
But the reason it changed its name from MLPerf to MLCommons is that they recognized there's a need beyond just benchmarking. Benchmarking is difficult because there are a lot of moving variables among different setups. One way to make benchmarking easier is to expand into what they call machine learning best practices, which people vaguely know as MLOps, as well as the creation of data sets, which is what I'll be talking about today, or at least one specific instance of data set development.
I won't talk too much about what exactly the People's Speech data set is, because this is really an engineering talk, meant to help data engineers understand some of the issues I faced and hopefully help them with their own work. But just to give some context, the People's Speech data set is a speech-to-text data set. It's designed to train things like Siri and the Google Assistant and so on, or just automatic transcription services.
One of the things that MLCommons, in general, has found is that it's actually quite difficult to get a good benchmark data set, for a few reasons, best summarized as the following. First, a data set needs to be challenging, so it needs to push the state of the art. If a data set is too easy, then you can't really have a good training benchmark: if everyone's getting below 2% word error rates on a data set, keep in mind that humans have a word error rate, meaning how many mistakes we make while listening to people speak, of roughly five to six percent, right?
Once you go beyond superhuman speech recognition on a particular data set, it's probably too easy. I'd say the most common data set today, at least in speech, is LibriSpeech, which does have the next property, which is that it's freely available: you can just download it and you have it. That's very nice if, say, you're not a speech-to-text expert, say you're just a machine learning person, but you develop a new model or a new training mechanism, and you say, "I want to see how this goes on speech data."
If you have to buy a speech data set, those often run on the order of 10 to 20 thousand dollars at the high end. And if you're a student, or even a professor, or a post-doc, or whatever, it can be quite hard to justify those sorts of expenses. You want a data set to be free for the sake of our academic friends, so that they can just use it easily. And finally, you want a commercial-usage license, and that's for our friends on the industry side.
On the industry side, it's usually acceptable, under American copyright, to use any data you want for what are called research purposes, but in industry, it's very hard to distinguish between research and development and business, especially if you're a research and development company, which honestly applies to a lot of companies doing machine learning and data science today. So you really want a commercial-usage license. I don't want to talk too much about this, but the way you get that for data which is scraped off the internet, and scrape is perhaps the incorrect word in this case, downloaded off the internet, is that you essentially need to check that it's either public domain or covered under the so-called CC-BY, or Creative Commons Attribution, license. Works under these two categories are generally accepted as okay to use in machine learning and data science, for reasons I won't get into, and you can actually automatically detect those sorts of things in the HTML tags of webpages. That's how we got the underlying data of the People's Speech data set.
The pie chart on the right basically shows that even in industry, most research papers actually use public data sets. That's because, even if you have a big group inside your company doing, say, speech recognition research, if you want to communicate outside your company, you can't use your internal data sets. So even within industry, there's a big bias towards open public data sets.
Anyway, let's get to the meat of the talk: I want to discuss what this offline inference workload is, and all the mistakes and problems I ran into. Hopefully, that's what you'll find interesting.
First of all, you can't just download a bunch of data off the internet and say, "This is my training data set." You need audio and text: you need, of course, the audio for the speech-to-text task, but if you want supervised training, which is what we're doing, you also need the associated text. And even that's not enough. You need to discover when each word in the text was said, and the jargon for this is forced alignment, or segmentation.
If you Google forced alignment for speech, you'll find a lot of material, in case anyone's interested in the exact details of how that works. We need to do it because we need to split audio files, which may be an hour long, or half an hour long, or even more, into segments of about 15 seconds of audio. This requirement is a little bit odd at first, because most people are used to end-to-end deep learning: you put your data in, you get a trained model on the other side. That doesn't work here because, due to how deep-learning training works, you get linear growth in memory usage as your time sequence gets longer, so anything over about a minute is going to use too much memory. At training time, you'd be getting out-of-memory errors on your machine learning systems.
This forced alignment process is essentially a necessary evil to produce a usable training data set. And I'll mention that this does use a pre-trained speech recognition model, so some of you may say, "Hey, is this kind of cheating? You're using a pre-trained model to label all your data?" You don't actually label the data with it; you're just figuring out when your human-provided labels were said in the audio. But it is a little bit confusing because it's, in some sense, a semi-supervised method.
I'll be showing the SQL statement throughout the slide deck. The idea is that there's a lot to cover, and the SQL statement essentially says everything that you need to know. I'll be highlighting particular sections on particular slides to elucidate which part of the workload I'm talking about.
The one thing I want to mention is the motivation for using a TPU, or any accelerator, for our task. Essentially, the ASR neural network, when run on the CPU, takes 99% of the runtime. For our case, that was about 20 CPU years, and for perspective, we had, I think, only 32 CPUs available to us for various reasons. That would take a little less than a year to finish. And with these sorts of data engineering workloads, you're not done after the first run; you want to iterate and keep trying new things, so if it takes close to a year to finish one job, you need to find a new solution. In our case, that's using an accelerator system.
So, the first thing is that, as some of you may know, starting with Spark 3.0 there's accelerator-aware scheduling. And you might just say, "Hey, I can just schedule my TPU in Spark, right?" The answer is no. Unfortunately, there are a couple of reasons behind this. The first is that the Google TPU is essentially software as a service, or you could say hardware as a service, over the network. If you're used to using AWS or Google Cloud Platform, when you allocate a GPU instance, that GPU is part of the compute instance that you create.
It's local; it's attached to the PCI Express bus. For the TPU, that's not the case. The TPU instance is a totally separate computer that you cannot SSH into. And because you can't SSH in, you cannot start a Spark worker to run jobs on that TPU machine. So effectively, it doesn't really play that well with Spark. But there are more reasons than just that, even if you're using GPUs or something else which does support accelerator-aware scheduling. What you will probably find, especially if you use a lot of user-defined functions, which is what we do, is that the number of CPUs you need greatly exceeds the number of accelerators that you need.
So, if you just want one single workflow in Spark, like one single Spark job, streaming each stage into the next, that doesn't usually work too well, because if you need 100 CPUs for your CPU stages, you're probably not going to need 100 GPUs for your GPU stages. There are things you can do: you can do fractional assignment of a GPU to a task, which can somewhat alleviate that issue, as shown in the sketch below. But if you wanted to use something like GPUs fully in Apache Spark, you'd have to avoid user-defined functions, essentially, because those run only on the CPU, and we have a lot of user-defined functions in this case, because we're using an existing, not quite legacy, forced alignment code base. Otherwise you're on your own, and writing everything in Spark SQL is not the most fun. But accelerator-aware scheduling does have its uses.
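As a concrete illustration of that fractional assignment, here is a minimal sketch, not from the talk's code base, of how Spark 3.0's accelerator-aware scheduling can be configured so several tasks share one GPU. The resource amounts and the discovery-script path are assumptions for illustration.

```python
from pyspark.sql import SparkSession

# Minimal sketch of Spark 3.0+ accelerator-aware scheduling with fractional
# task assignment. The numbers and paths below are illustrative assumptions.
spark = (
    SparkSession.builder
    .appName("fractional-gpu-scheduling-sketch")
    # One physical GPU advertised per executor.
    .config("spark.executor.resource.gpu.amount", "1")
    # Each task asks for a quarter of a GPU, so four tasks share one device.
    .config("spark.task.resource.gpu.amount", "0.25")
    # Script that reports the GPU addresses available on each worker
    # (Spark ships an example script; this exact path is an assumption).
    .config("spark.executor.resource.gpu.discoveryScript",
            "/opt/spark/examples/src/main/scripts/getGpusResources.sh")
    .getOrCreate()
)
```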
One thing I will say is, if you're doing data-parallel training and you want to do that on an existing Spark cluster, go for it. That's really the original use case they had in mind, and it's a great use case. But for offline inference, it's not so great.
The next thing I want to mention is Arrow UDFs in PySpark. There were so many gotchas and surprises on this side that I think it will help a lot of you. Writing Python UDFs in PySpark used to be a big no-no because the serialization overheads were too high. So, PySpark introduced Arrow UDFs a few years ago, and that solves a lot of the issues, but Arrow UDFs were, at least for me, not quite what I initially expected, and there is a reason why. Essentially, if you look at Arrow, the fundamental thesis of Arrow is that it's an in-memory columnar data format that allows separate processes to share memory.
Because both sides know how that memory is laid out, they can both operate on it. So the ideal would be that you have the JVM executor and the Python process both acting on the same shared memory: the JVM does your Spark, your Tungsten workload, and Python does your user-defined function.
That's not what happens. In reality, you end up doing copies between the JVM and the Python process, with serialization and deserialization. There are good reasons for this: basically, Spark doesn't know the size of your data beforehand, except in a few rare cases. It doesn't have static sizing information, so it doesn't actually know how much memory to allocate, which is the reason it actually does this serialization-deserialization process.
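For readers who haven't used them, here is a minimal, hypothetical pandas (Arrow) UDF; the column name and the audio-format assumption are invented and this is not one of the pipeline's actual UDFs. Even with Arrow, each batch of the column still gets serialized from the JVM to the Python worker and the result copied back.

```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.getOrCreate()

# Hypothetical example: convert a raw byte count into an audio duration,
# assuming 16 kHz, 16-bit mono PCM (2 bytes per sample). Not the real pipeline.
@pandas_udf(DoubleType())
def duration_seconds(num_bytes: pd.Series) -> pd.Series:
    return num_bytes / (16_000 * 2)

df = spark.createDataFrame([(320_000,), (960_000,)], ["num_bytes"])
df.select(duration_seconds("num_bytes").alias("seconds")).show()
```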
The implication is that memory usage doubles, because you have two copies: one on the Java side, one on the Python side. That's not a lot of fun, especially since there are subtle issues with the Java virtual machine garbage collector which I didn't know about until I hit them, and I hope this helps at least some of you. The JVM garbage collector, at least as of Java 8, does not return physical memory back to the operating system. What that means is that, at the very least, you want to add swap space on your systems. I know some of you may not have that option, because it requires sudo access and you might be using a shared cluster, and maybe you can't convince your dev-ops team to do it. But you're going to want swap space to deal with possible memory over-allocation between the Python and Java sides.
Also, usually you set the Spark executor memory so that, multiplied by the number of executors on a machine, it fills almost the entire physical memory, maybe 90% of it. You can't do that in this case, because that setting does not cover the memory of the Python UDF worker, which uses its own separate memory space, so that one setting cannot put a ceiling on both of them.
So effectively, what that means is that you need to be very sparing with memory usage if you're going to use Arrow UDFs. That's the short of it. Swap space helps, and watch out for garbage collection: Java does not return physical pages to the operating system. So don't be surprised if, the first time you build a pipeline like this, memory blows up on both the Java and the Python sides. At least you will know what the problem is.
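As a rough sketch, and not the talk's actual settings, this is how the JVM and Python memory caps can be configured separately in Spark; the sizes are placeholder assumptions and together should leave headroom below physical RAM.

```python
from pyspark.sql import SparkSession

# Sketch of splitting a machine's RAM between the JVM executor and the
# Python UDF workers. The sizes are placeholder assumptions, not a recipe.
spark = (
    SparkSession.builder
    .appName("jvm-python-memory-split-sketch")
    # JVM heap for the executor (Tungsten, shuffle buffers, cached blocks).
    .config("spark.executor.memory", "16g")
    # Extra non-heap/native room for the JVM process.
    .config("spark.executor.memoryOverhead", "4g")
    # Separate cap applied to the Python worker processes running the UDFs.
    .config("spark.executor.pyspark.memory", "8g")
    .getOrCreate()
)
```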
The last thing is that Java can't handle byte arrays larger than two gigabytes, and this showed up for me. We have some MP3 files that are close to two gigabytes in size. What that means is that we actually had to reduce batching on the Python side, effectively to a batch size of one, because if you have MP3 files approaching two gigabytes and you try to send a batch of them over the wire, Java is going to blow up because it cannot allocate a buffer that big.
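The talk doesn't name the exact knob used, but one plausible way to force tiny Arrow batches in PySpark is the configuration below; treat it as an assumption about how "a batch size of one" could be implemented, not as what the pipeline necessarily did.

```python
from pyspark.sql import SparkSession

# One way (an assumption, not necessarily what the pipeline did) to keep each
# Arrow batch down to a single record, so a near-2 GB audio blob is never
# grouped with others into a buffer the JVM cannot allocate.
spark = (
    SparkSession.builder
    .appName("tiny-arrow-batches-sketch")
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "1")
    .getOrCreate()
)
```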
But this is probably what everyone's interested in: what's it like using TPUs in a data engineering pipeline? It's a lot of work. That is the way to put it. I'm not a Google spokesperson, but I would say that overall they got the job done; it's just a lot more manual work. It feels like being back in the MapReduce days, as opposed to the Apache Spark days, of machine learning plus data engineering.
Essentially, we used a TPU v3 pod slice of eight cores, which is the minimum size. We used Google's Lingvo code base, which is a machine learning library they have with good TPU support. We used a fairly basic automatic speech recognition model; don't worry too much about that. If you know the area, you can see it's very vanilla. You don't want to take a lot of risks when you're using new software and hardware systems, so we just went with a basic thing.
TPUs require you to use Google Cloud Storage as the file system. That's because the TPU is on a separate physical node from the CPU system you're running on, so the TPU doesn't have access to the local file system. So we had to use Google Cloud Storage for everything, and that causes some issues later on. Also, probably the biggest challenge I had is that TPUs just seem to be prone to crashes; the mean time between failures is not the best. I don't want to quote a number because I haven't done exhaustive tests, but my own observation has been that, when you're trying to work on 86,000 hours, which is effectively terabytes of data, one TPU job may not execute fully over all of it because it may crash in the middle. So you need to provide your own restart machinery. I did add this, but it's challenging for reasons I'll get into later, having to do with batching.
It's very hard to make ACID-like guarantees, things like not writing out the same record twice when you have to restart a job from the middle, as opposed to restarting it from the beginning. As data engineers, we're very used to our software infrastructure taking care of restartability. I would say probably the biggest problem I had is that failures would just happen. These failures weren't programming errors, and they were hard to diagnose, because you're doing remote procedure calls and you get a kind of opaque string error message at the end when something goes wrong, and you can't really diagnose from that. You can kind of Google it, but unless you're internal at Google, you're not going to get a better sense than that one string error message.
I have heard that sometimes the TPUs crash when they fall off the desk, but I really don't know. I'll finally note that TPU code can't use the string data type. Surprise. This actually makes a lot of sense because the TPU needs static size information, and it means you must use integer primary keys if you want keyed prediction. Keyed prediction is the final thing I'm going to get into.
Keyed prediction is a machine learning design pattern which basically acknowledges that, when you do machine learning in production, it's not just the machine learning model that has to execute. Usually you have to join its output to other data at the end. Data doesn't just sit alone. In our case, we need to join the output of the speech recognition model to the text that we want to align it against.
And this is actually surprisingly hard to do, because if you want to fully utilize a TPU, a pod slice of size 8 has a batch size of 1024, and to fully utilize that you essentially have to do this thing called bucketing, bucketing by sequence length. This effectively does a bucket sort, such that even if your data was sorted on the input side by your key, it's going to come out sorted by length, because you want every sample in a batch to be about the same length for full utilization of your processor, for microarchitectural reasons I won't get into.
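To make the reordering concrete, here is a small, self-contained sketch using tf.data.experimental.bucket_by_sequence_length, which the abstract mentions; the keys, lengths, bucket boundaries, and batch sizes are toy values, not the pipeline's real configuration.

```python
import tensorflow as tf

# Toy dataset of (integer key, variable-length "audio feature" vector) pairs.
# Keys and lengths are invented; the real pipeline reads features from GCS.
def gen():
    for key, length in [(0, 5), (1, 50), (2, 7), (3, 48), (4, 6)]:
        yield key, tf.ones([length], dtype=tf.float32)

ds = tf.data.Dataset.from_generator(
    gen,
    output_signature=(
        tf.TensorSpec([], tf.int64),        # integer primary key (strings not allowed on TPU)
        tf.TensorSpec([None], tf.float32),  # variable-length features
    ),
)

# Group examples of similar length into the same padded batch.
ds = ds.apply(
    tf.data.experimental.bucket_by_sequence_length(
        element_length_func=lambda key, feats: tf.shape(feats)[0],
        bucket_boundaries=[10, 100],    # short vs. long utterances
        bucket_batch_sizes=[2, 2, 2],   # one more entry than bucket_boundaries
    )
)

for keys, feats in ds:
    # Keys come out grouped by length, not in the original key order, which is
    # why the outputs must be re-joined against the transcripts afterwards.
    print(keys.numpy(), feats.shape)
```

Running this, the short utterances (keys 0, 2, 4) and the long ones (keys 1, 3) land in separate batches, so the original key order is lost, which is exactly the problem the rest of this section deals with.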
And that actually makes things a very big pain, because with a batch size of 1024, even if your data is sorted on the input, in our case by the identifier key you can see highlighted at the top right, with both the audio and the transcripts sorted by that key, when the audio comes out of the neural network it's no longer going to be sorted by that key. It's going to be sorted by its sequence length. So you actually need to sort, or join, again to line the outputs back up with the transcripts.
I haven't heard people talk about this before. I think the reason is probably that the output of most deep-learning models is actually very small. Take a cat detector: that's theoretically just one bit of information, so it's very small. In that case, maybe you can just do a hash join, and if you can do a hash join, that's probably the best thing to do. But in our case, we essentially output logits every 30 milliseconds over about 40 tokens, and in some cases people use 256 tokens or many, many more.
For us, that ended up being about 1.5 terabytes uncompressed. So there are two solutions I came up with, other than the hash join. One is what I call a map-side join: join what you need before using the accelerator, but that reduces your input bandwidth to the accelerator. The other is sorting, which you can do through Spark's partitioning functions, so you only need to sort each shard; but if each shard is too small, that can actually reduce efficiency. A sketch of both options follows.
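Here is a minimal sketch of those two options under assumed schemas; the column names, bucket paths, and partition count are invented for illustration, not taken from the pipeline.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Assumed inputs; paths and column names are hypothetical.
audio = spark.read.parquet("gs://example-bucket/audio")              # (id, audio_bytes)
transcripts = spark.read.parquet("gs://example-bucket/transcripts")  # (id, transcript)
logits = spark.read.parquet("gs://example-bucket/tpu_logits")        # (id, logits), ordered by length

# Option A ("map-side" style): attach the transcript to each example *before*
# inference, so the accelerator output already carries it, at the cost of
# pushing extra bytes through the accelerator's input pipeline.
pre_joined = audio.join(transcripts, on="id")

# Option B: after inference, reshard both sides identically and sort only
# within each shard, avoiding one big global sort of ~1.5 TB of logits.
num_shards = 256  # invented; choose based on target file sizes
logits_by_id = logits.repartition(num_shards, "id").sortWithinPartitions("id")
transcripts_by_id = transcripts.repartition(num_shards, "id").sortWithinPartitions("id")

logits_by_id.write.mode("overwrite").parquet("gs://example-bucket/logits_by_id")
transcripts_by_id.write.mode("overwrite").parquet("gs://example-bucket/transcripts_by_id")
```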
And one of the challenges with Google Cloud Storage is that it's actually not well documented how big your files need to be for full utilization. If you use a distributed file system like HDFS, you know that you usually aim for 512 megabyte, maybe one gigabyte, file sizes. Google Cloud Storage doesn't document that. So there's probably a 2x speedup, or even more, somewhere in here that would be quite hard for me to get. Not a lot of people talk about this, and I do think that if a deep-learning software system wanted to be complete, it would probably need to think about this, in my opinion, since it seems solvable in software; it's just not straightforward to solve.
This is the last slide. What I would say is that, first of all, there's a lot of content here and I couldn't hope to show it all. All the code is available under an Apache license; if you want it, there's the link, go for it. My contact info is at the bottom, no promises. If you have something interesting, or you want to ask me something, or if you've listened to this talk and thought, "Oh, this guy totally messed something up, he had a misunderstanding," let me know. I'm very interested. We're all learning new things.
And I guess my closing thought is that the ideal for sequence-based deep-learning inference, and this is for sequence-based models specifically, maybe it matters less for non-sequence-based models, is not necessarily to add accelerator-aware scheduling for your accelerators in Spark. I think the ideal is for them to act as an asynchronous queue, receiving input until a batch is large enough to run efficiently. That's the ideal setup, and I don't see why you couldn't do this today in Spark: essentially, add a Spark Streaming sink which does this, and I don't see any reason why we couldn't do that. A rough sketch of the idea follows.
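To make the suggestion concrete, here is a rough, hypothetical sketch of that idea using Structured Streaming's foreachBatch; the schema, paths, and the send_to_accelerator client are all invented, and this is a sketch of the proposal, not something the talk actually built.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("accelerator-sink-sketch").getOrCreate()

# Hypothetical stream of pre-processed examples landing on GCS.
examples = (
    spark.readStream
    .schema("id LONG, features ARRAY<FLOAT>")
    .parquet("gs://example-bucket/features")
)

def enqueue_for_inference(batch_df, batch_id):
    # In the setup suggested in the talk, rows would be handed to an
    # asynchronous queue that feeds the accelerator once a large-enough
    # batch has accumulated. Here we only collect and report.
    rows = batch_df.collect()
    # send_to_accelerator(rows)  # hypothetical client for a TPU/GPU service
    print(f"micro-batch {batch_id}: queued {len(rows)} examples")

query = (
    examples.writeStream
    .foreachBatch(enqueue_for_inference)
    .option("checkpointLocation", "gs://example-bucket/checkpoints/inference")
    .start()
)
```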
If anyone has thoughts on this, please let me know, because then you could, say, avoid writing a bunch of data to disk, MapReduce style. That's perhaps not the biggest issue, but terabytes of disk do kind of add up.
And that’s the talk. Thank you very much.

Daniel Galvez

Daniel Galvez works on MLCommons's datasets working group. Previously, he worked on machine learning and content recommendation at LinkedIn. He has a BS in computer science from Cornell University, wh...