Data Microservices in Apache Spark using Apache Arrow Flight


Machine learning pipelines are a hot topic at the moment. Moving data through the pipeline in an efficient and predictable way is one of the most important aspects of running machine learning models in production. In this talk, we’ll break down the modern machine learning pipeline and demonstrate how it can be improved with a modern transport mechanism. First, we will introduce Apache Arrow and Arrow Flight. We will review the motivation, architecture and key features of the Arrow Flight protocol with an example of a simple Flight server and client. Second, we’ll introduce an Arrow Flight Spark datasource. We will examine the key features of this datasource and show how one can build microservices for and with Spark. We will look at the benchmarks and benefits of Flight versus other common transport protocols. Finally, we’ll show a demo of a toy machine learning pipeline running in Spark with data microservices powered by Arrow Flight. We will highlight how much faster and simpler the Flight interface makes this example pipeline. The audience will leave this session with an understanding of how Apache Arrow Flight can enable more efficient machine learning pipelines in Spark.


Video Transcript

– Hi everyone. Thanks for coming to the webinar. My name is Ryan Murray. I’ve been working at Dremio for about a year as a developer, doing some open source projects, some research and that kind of stuff. So today, I wanted to share with you one of the cooler things that we’ve been doing lately, which is our work on Apache Arrow Flight, and in particular for this talk, some of the stuff with the Spark connector, and how that can apply to building data microservices.

So for those of you who aren’t aware, I just thought I’d level set on what Apache Arrow is and get everyone on the same page before we go any further. So Arrow has really become sort of the industry standard for in-memory data.

Arrow has become the industry standard for in-memory data

I’m not sure how many people have heard about it, but as you can see, it’s in tons of different applications. It’s used in Spark and in Dremio, and NVIDIA is doing some interesting stuff with it on GPUs. It’s kind of spreading all over the place, and that’s really by design. One of the goals that we had when we first formed Arrow as a community was to make it a lingua franca for data: the idea that if you store data in Arrow format, which we’ll get to in a few minutes, there are all kinds of tools which you can leverage to do calculations on that data, and there are understood, clear and standardized ways to move data between processes, between applications, between machines and whatnot.

So since the first release of Arrow back in December or so of 2016, it’s been growing exponentially. Every month, there are even more downloads.

Part of that is the broad language support. You can see there are over half a dozen languages now that it’s implemented in; some of these are sort of wrapping the C++ libraries and others are native implementations. That really helps with the lingua franca part of it, because every programming language is speaking the same language when it comes to data.

And as I said, the community is very active. There are over 300 developers doing a lot of interesting stuff, making Arrow work on CPUs, GPUs, and, more recently, FPGAs.

So, what is Arrow? Well, simply, it’s an in-memory specification for data. It tells you how to lay out your data in memory, in a binary format that makes it extremely efficient for large analytical workloads, and that’s irrespective of whether you’re on CPUs or GPUs, or more exotic things. Aside from that, it’s a set of tools. So you have the standard, and then we’ve built a lot of tools in the community to help you manipulate data in that standard. You can think of those as sort of Lego bricks for building novel data applications. Some examples of this are IO, getting data into and out of Arrow from various formats, whether it’s Avro or Parquet or something like that. Other things are compute kernels or engines, which help you do calculations on Arrow, and even things like Flight, which is an RPC mechanism, or other ways of trading data with other applications or processes.

It’s important to say what Arrow isn’t. It isn’t an installable system as such; you can’t go and download a copy of Arrow like you would Spark and run it. Rather, it’s a library: Spark, for example, uses Arrow to be efficient with columnar data. Nor is it a memory grid or an in-memory database or something like that. While you can build these sorts of things with Arrow, it doesn’t have any of them in it on its own. It’s more a set of primitives.

And finally, it’s important to mention it’s not really designed for streaming individual records. So you’re not sending single-row Arrow batches around; you tend to have dozens or hundreds or thousands or millions of rows in a record batch. And that’s mostly to be efficient. The columnar data structure, which we’ll describe in a second, is designed for larger datasets. The overhead for small, single-record operations is too high to make it useful for record-at-a-time streaming.
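As a rough sketch of that paradigm in Python with pyarrow (the column names and row count here are purely illustrative), a record batch typically carries many rows at once:

```python
import pyarrow as pa

# One record batch holding many rows -- not one row per batch.
batch = pa.RecordBatch.from_pydict({
    "session_id": list(range(100_000)),
    "duration_ms": [n % 500 for n in range(100_000)],
})
print(batch.num_rows)  # 100000 rows travel together as a single columnar batch
```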

So that’s what it is.

Arrow In Memory Columnar Format

What does the format look like? As I mentioned, the important thing here is that it’s a columnar data format.

So compare it to the traditional memory format, down in the bottom right there. In your traditional format, every row in your table is going to be a block of contiguous memory. Say you wanted to take the average of session ID, or the maximum session ID, or something like that. To do that, you’re going to have to read row one, and then you’re gonna have to skip ahead to row two to read the next value of session ID. And because of the interleaved fields, which might be variable length, they could be strings or whatever, you can’t really hop to the next row; you have to read all those bytes. So you end up reading the entire data set to pull out a single column of that data set. And that’s really expensive for the CPU: you have to load a lot of stuff into your cache, you’re constantly breaking your pipelines, there are lots of branches and all that kind of stuff, and it’s not really efficient.

Arrow does something that will look familiar to people who are familiar with pandas: it stores everything in a columnar layout. So there you have all your session IDs in one contiguous block of memory. Now when you want to do calculations on that, you can load a good chunk of that block directly into the CPU cache and do the calculation in one go, sometimes leveraging SIMD, Single Instruction Multiple Data operations, where you’re actually doing these calculations in single CPU cycles. So you get a huge efficiency gain by having this locality of data.
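To make that concrete, here is a minimal sketch using pyarrow’s compute kernels (the column name and values are illustrative); the point is that the aggregation runs over one contiguous column buffer:

```python
import pyarrow as pa
import pyarrow.compute as pc

table = pa.table({"session_id": [101, 102, 103, 104]})

# The whole column is a single contiguous buffer, so the kernel can scan it
# cache-efficiently and use SIMD where the hardware supports it.
print(pc.mean(table["session_id"]))  # -> 102.5
print(pc.max(table["session_id"]))   # -> 104
```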

And this allows you to do all kinds of interesting things. Like I said, you can leverage some of the interesting architectural aspects of GPUs. There are lots of these scatter-gather I/O type operations that you can do to really perform these calculations efficiently.

Example Arrow Building Blocks

So that’s the format; what are some of the building blocks? These are the Arrow building blocks I mentioned before. Here’s a quick survey of four of the most interesting ones. First off are our Parquet readers and writers. What these are designed to do is get data from Parquet to Arrow, or from Arrow to Parquet, very fast.

This is done natively at the C++ level, so it’s an extremely fast operation. It’s particularly useful because the Parquet format is so similar to the Arrow format; you can sort of think of them as cousins. So it becomes very efficient to stream Parquet into Arrow. And then when the rest of your data pipeline is Arrow, you’re really not having to constantly change data structures and data formats, and you’re never going to be marshaling data once it’s out of Parquet and into Arrow.
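A minimal sketch of that round trip with pyarrow (file name and columns are illustrative):

```python
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"session_id": [1, 2, 3], "duration_ms": [120, 340, 95]})

pq.write_table(table, "sessions.parquet")       # Arrow -> Parquet, via the native C++ writer
table_back = pq.read_table("sessions.parquet")  # Parquet -> Arrow, via the native C++ reader
```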

Similarly, a lighter-weight format is Feather. If you’re familiar with the work of one of the Arrow founders, Wes McKinney, you might have seen Feather. It’s a good way to get data between R and Python very fast. It’s meant to be ephemeral; it’s not durable storage like you’d expect from Parquet, but it’s extremely fast.
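A quick sketch of the Feather path (the file name is illustrative); reading it back through pyarrow’s feather module yields a pandas DataFrame, and the same file can be read from R:

```python
import pyarrow as pa
import pyarrow.feather as feather

table = pa.table({"x": [1.0, 2.0, 3.0]})

feather.write_feather(table, "scratch.feather")  # fast, ephemeral interchange file
df = feather.read_feather("scratch.feather")     # comes back as a pandas DataFrame
```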

And then on the compute kernel side, you have something called Gandiva. In that case, say you have some arbitrary expression, maybe some filters from a SQL statement, or you’re multiplying some columns together and dividing by another column, something like that. You can take that expression, feed it into Gandiva, and Gandiva will spit out LLVM bytecode for that expression. Then LLVM will just-in-time compile that into machine code for your native platform.

So here, you’re able to evaluate arbitrary expressions on your Arrow data, and you really get the speed of machine code regardless of your starting language. It doesn’t matter if you’re in Java, JavaScript or Ruby or whatever else, you’re going to get full machine-level speed on these compute kernels.

And finally there’s Arrow Flight. That’s really why we’re here today. It’s the newest member of these building blocks, and it’s our RPC mechanism.

So what is Arrow Flight?

Arrow Flight

Simply put, it’s a protocol: a high-performance protocol that defines how to move data between two systems.

And the key here is, it’s a bulk transfer system.

One of the reasons that it’s so much faster than other implementations is that you take an Arrow buffer and write it directly into your network buffer. So you don’t have to translate your data before writing it onto the network. Similarly, the client is going to receive the Arrow data from the network and materialize it directly into an Arrow buffer. So you’re not dealing with all the marshaling and the expensive CPU calculations of having to constantly change the format that your data is in.

So that’s where the speed comes from. This is also sort of the last piece of our interoperability puzzle. As I said, one of the founding points of Arrow is to make a lingua franca for data.

And what Arrow Flight does is allow any system, on any operating system, in most any programming language, to talk to any other.

It’s an understood, known format, so we never have to marshal data, change data, transform data. And it’s built from the ground up to support parallel streams, which I’ll get to in a few minutes, and security. So out of the box, in maybe a dozen lines of Python, you can write a Flight server that can be SSL-encrypted and have security attached to it. Some of the features that you expect are built in right out of the box.
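As a rough illustration of how small that can be, here is a toy in-memory Flight server sketched with pyarrow.flight; the class name, port and storage scheme are made up for this example, and TLS/auth are left out to keep it short:

```python
import pyarrow as pa
import pyarrow.flight as flight

class TinyFlightServer(flight.FlightServerBase):
    """Toy in-memory Flight server: clients PUT named tables and GET them back."""

    def __init__(self, location="grpc://0.0.0.0:8815"):
        super().__init__(location)
        self._tables = {}

    def do_put(self, context, descriptor, reader, writer):
        # Materialize the incoming Arrow stream and store it under its descriptor.
        self._tables[descriptor.command] = reader.read_all()

    def do_get(self, context, ticket):
        # Stream the requested table back to the client as Arrow record batches.
        return flight.RecordBatchStream(self._tables[ticket.ticket])

if __name__ == "__main__":
    TinyFlightServer().serve()
```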

Arrow Data Paradigm: Streams of Batches

So what does this protocol actually look like? Well, it’s built primarily on gRPC; the underlying transport is gRPC. If you’re not familiar with gRPC, it’s Google’s RPC mechanism.

The idea there is that you define a concrete set of operations that you know how to do, and then clients come in and request you to do those operations, by supplying you with data or requesting something from you. One of the core concepts in gRPC is the concept of streams. So rather than “I give you some data, you give me back some data,” I can open up a stream and continuously feed you data or receive data.

So a client-server interaction looks something like: I’m going to send you data. Here’s a batch, here’s a batch, here’s a batch, here’s a batch, I’m done.
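In pyarrow.flight terms, that PUT-side conversation looks roughly like this (it assumes a server such as the toy one sketched earlier is listening on localhost:8815):

```python
import pyarrow as pa
import pyarrow.flight as flight

client = flight.connect("grpc://localhost:8815")
table = pa.table({"session_id": list(range(1_000_000))})

# Open a PUT stream and send the data as a sequence of record batches:
# "here's a batch, here's a batch, ... I'm done."
descriptor = flight.FlightDescriptor.for_command(b"sessions")
writer, _ = client.do_put(descriptor, table.schema)
for batch in table.to_batches(max_chunksize=100_000):
    writer.write_batch(batch)
writer.close()
```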

Some of the efficiency and ease of use of gRPC helps us do that, and it’s going to be really fast. We also interact with a relatively low layer of gRPC, which is how we’re able to get the zero copy between the network buffer and the in-memory process buffers.

Some of the things that Flight can support right now: you can do PUTs and GETs, so you can give a server data, you can get data from a server, and a recent addition is bidirectional streams, which is a constant interchange of data.
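The GET side is the mirror image; a minimal sketch, again assuming the toy server and the “sessions” ticket from above:

```python
import pyarrow.flight as flight

client = flight.connect("grpc://localhost:8815")

# Redeem a ticket and read back the stream of Arrow record batches.
reader = client.do_get(flight.Ticket(b"sessions"))
table = reader.read_all()   # or iterate chunk by chunk: `for chunk in reader: ...`
print(table.num_rows)
```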

And everything is initiated by the client here. The client is in control of how these transactions happen; you’re never standing around waiting for a server to contact you.

So this is what got us thinking originally about the concept of microservices. Say you have a Spark cluster and a Dremio cluster, and you want to get a very large data set from Dremio into Spark for some data science application. You use the Flight connector, and you can stream in parallel and bring back a very large data set very quickly. Then you train your model and you expose that model as a FlightEndpoint. Now a further downstream consumer is able to send you a matrix and get back a prediction vector, something like that. So you start getting to the concept of having a bunch of small servers doing really concrete things.

Something more esoteric might be a machine learning model-mixing setup: you ask the system for a prediction, and it federates out to a bunch of different machine learning models. And since they all know how to talk Flight and are using Arrow under the hood, they can be TensorFlow or Spark or plain Python, it doesn’t really matter. You ask all of those to run the data through their individual models, bring all that back, and mix it together in the upstream microservice before sending it back to the client. So you can start breaking down your data pipeline into a bunch of distinct components that can be reused, deployed and developed individually. You can start seeing the sort of microservices architecture come out of this data world.

And for me, at least, I think this makes a data or machine learning pipeline much more palatable to manage and control and build and maintain.

Big Datasets Require Parallel Streams

So that’s kind of the underlying protocol. How do the parallel streams work?

Well, when you ask a server for a data set, you’re going to ask for FlightInfo, and you’re going to get back a set of FlightEndpoints, and those FlightEndpoints represent your streams. If you collect data from all those FlightEndpoints, you’ll have your entire data set. A FlightEndpoint is simply a Flight ticket, which is a token to say you’re allowed to get that data, and it identifies a very specific portion of a specific data set, plus an endpoint so you know where to get the data from.

So when you get back this set of FlightEndpoints, you’re then able to break those up any way you want. If you get ten back from your Flight server, you can redeem the Flight tickets one at a time, serially, if you want. Or you can do them all at once: you can spread them across a parallel system, either in memory for a single process, or you can send them across a bunch of Spark executors. And you’re really just linearly multiplying your ability to move data. Right now, the way you ask for data from Flight is either sort of a dotted namespace, so you can say ListFlights and that’ll list all of the namespace-separated data sets the Flight server knows about, or you can even send a SQL query. So you can send a full SQL query, the downstream server will execute that SQL query and send you the results back.
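A hedged sketch of that flow with pyarrow.flight (the coordinator address, port and SQL text are hypothetical, and whether SQL commands are accepted depends on the server):

```python
import pyarrow.flight as flight

client = flight.connect("grpc://dremio-coordinator:32010")  # hypothetical coordinator address

# Ask for FlightInfo describing the data set -- here via a SQL command, if the server supports it.
descriptor = flight.FlightDescriptor.for_command(b"SELECT * FROM sales.transactions")
info = client.get_flight_info(descriptor)

# Each FlightEndpoint carries a ticket plus the location(s) to redeem it at.
# Redeeming every endpoint, in any order or in parallel, yields the full data set.
pieces = []
for endpoint in info.endpoints:
    data_client = flight.connect(endpoint.locations[0])      # e.g. one executor node
    pieces.append(data_client.do_get(endpoint.ticket).read_all())
```

Here the tickets are redeemed serially in one process; in the Spark case, each endpoint instead becomes its own partition handled by a separate executor.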

More concretely, the parallel system looks like this. If you have a Dremio cluster and, say, a Python client, the Python client will issue a GetFlightInfo to the Dremio coordinator, which will return a set of endpoints, and then the client knows to go to each of those endpoints with their specific tickets to get the data. In this case, it will go to a bunch of Dremio executors to get the data. And if the client was a Spark client, you can have your Spark executors facing off against your Dremio executors, each just getting individual pieces of your data back into your Spark executors.

So, that’s Flight. Now let’s talk about what the Spark Source looks like.

Spark DataSource V2

First, our Spark source is built off of the relatively new DataSource V2, which came out, I guess, a couple of Spark versions ago. It was a complete rewrite of the data source API, and it has a lot of really interesting, really exciting features for people building data sources. And as big a change as that was, there is an even bigger change going into Spark 3, so there’s a lot of really cool stuff happening for these data sources. For us, some of the most important parts are the columnar support, so you can do stuff with Arrow batches, and, for example, transactions, which aren’t very important for this example.

But it’s really powerful, because you can actually perform ACID transactions on your underlying data source. And if that’s a Flight data source talking to a large-scale Dremio cluster or something like that, then you’re able to do some pretty powerful transactions. It’s also easier to control partitions and how to map data source partitions into Spark partitions, and there’s better support for push downs, which is really important for sources like this.

Flight Spark Source

So what does the Spark source for Flight look like? It leverages the columnar batch, as I said before, so you’re actually pulling Arrow buffers directly from the Flight source into Spark’s internal Arrow representation; the columnar batch is actually using Arrow under the hood. It also fully supports push down, so if your Flight server is able to understand SQL, then you can write generic queries against your Spark data frame, and the source will translate them down into SQL before sending them off to the Flight source. Currently, DataSource V2 is only capable of doing push downs for filters and predicates, so it doesn’t support things like joins or aggregates. That does limit how much heavy computation you’re able to push down to your downstream sources, but hopefully that’ll be added in the near future, and then you’d really be able to send arbitrary SQL down to your Flight sources from your Spark data frames.

And finally, for our use case, we’re gonna partition by Arrow Flight ticket. So the Flight server is going to generate a bunch of Flight tickets, and then those tickets are going to be federated out as partitions in Spark, so that the downstream system is actually able to control how to best partition the data given the query that’s being sent.
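A hypothetical PySpark usage sketch of such a Flight source; the format name and option keys below are placeholders made up for illustration, so check the connector’s README in the GitHub repo for the real ones:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = (spark.read
      .format("flight")                                       # assumed short name for the source
      .option("host", "dremio-coordinator")                   # assumed option: Flight endpoint host
      .option("port", "32010")                                # assumed option: Flight endpoint port
      .option("query", "SELECT * FROM sales.transactions")    # query handed to the Flight server
      .load())

# Filters and predicates can be pushed down to the Flight server;
# joins and aggregates cannot (a DataSource V2 limitation today).
df.filter(df.amount > 100).count()
```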

So that’s it for the Spark Source. It’s relatively simple. There’s a link to the GitHub repo at the end of the talk.

I encourage everyone who’s interested to go and give it a try. For now let’s talk about benchmarks. The fun stuff.

So for this benchmark, we worked on AWS.

We took the most recent version of EMR and faced that off against Dremio AWS Edition. In both cases, we have four nodes, four relatively large nodes, so it’s a relatively equal comparison of compute power.

And then we ran a bunch of different data sizes, and for each data size, we perform some non-trivial calculation on the Spark side. That’s to make sure that we use every row and every column, and to make sure that Spark and Dremio aren’t playing games on this. When I was first working on this, Spark and Dremio, which are both relatively smart, were dropping columns that weren’t being calculated on. What that gives us is two times: the first time is the time for the entire calculation, the Spark, Dremio and on-the-wire work, and the time in brackets is just the on-the-wire time.

Everything’s measured in seconds.

So just focusing on JDBC versus serial Flight, you can see significant performance improvements immediately, even in the single, serial use case of Flight.

So there’s already a good argument for why Flight can be a good replacement for JDBC or ODBC in the future.

And then when we start talking about the parallel case, we can really see massive performance improvements. We are seeing orders of magnitude improvement over JDBC, multiple orders of magnitude in some cases.

For a point of reference, when we were using eight nodes of both EMR and Dremio, we were moving a billion records that total something like 80 gigabytes of data. That boiled down to something like four gigabits per second of bandwidth on each one of those nodes, which is actually a significant portion of the total EC2 bandwidth available. So at this point, we are moving data pretty much as fast as we can move it on these modern cloud networks.

Which is really exciting. And I think this is what really makes the concept of these microservices, distinct, isolated components all talking to each other, a reality for me. So thanks everyone for watching and listening, I’m happy to take any questions, and here are some links to get you started with Flight.

About Ryan Murray

Dremio

Ryan Murray has been a principal consulting engineer at Dremio, in the professional services organization, since July 2019; previously he worked in the financial services industry in roles ranging from bond trader to data engineering lead. Ryan holds a PhD in theoretical physics and is an active open source contributor who dislikes it when data isn't accessible in an organisation. He is passionate about making customers successful and self-sufficient, and still dreams of one day winning the Stanley Cup.