Accelerating Apache Spark Shuffle for Data Analytics on the Cloud with Remote Persistent Memory Pools


The increasing challenge of serving ever-growing data driven by AI and analytics workloads makes disaggregated storage and compute more attractive, as it enables companies to scale their storage and compute capacity independently to match data and compute growth rates. Cloud-based big data services are gaining momentum because they provide simplified management, elasticity, and a pay-as-you-go model. However, Spark shuffle brings performance, scalability and reliability issues in the disaggregated architecture. Shuffle is an I/O-intensive operation, which leads to performance issues when a typical cloud-provisioned volume is used as shuffle media. Meanwhile, the shuffle operations of different tasks may interfere with each other, limiting Spark’s scalability. Moreover, shuffle re-computation after a compute node failure poses significant overhead for long-running jobs. Disaggregating shuffle from compute nodes is becoming more and more important for cloud-native Spark applications.

In this session, we propose a new fully disaggregated shuffle solution that leverages state-of-the-art hardware technologies including persistent memory and RDMA. It includes a new pluggable shuffle manager, a persistent memory based distributed storage system, an RDMA-powered network library, and an innovative approach that uses persistent memory as both the shuffle media and the RDMA memory region to reduce additional memory copies and context switches. This new remote shuffle solution improves Spark scalability by disaggregating Spark shuffle from compute nodes to high-performance distributed storage, improves Spark shuffle performance with high-speed persistent memory and a low-latency RDMA network, and improves reliability by providing shuffle data replication and fault-tolerance optimizations. Experimental performance numbers will also be presented, demonstrating up to 10x speedup over the traditional shuffle solution and a three-orders-of-magnitude reduction in shuffle read block time.


Video Transcript

– Thanks for joining the session today. I’m Jian Zhang from Intel, a software engineering manager leading a team that works on optimizing big data software stacks on Intel platforms.

Today I’m going to introduce our topic: accelerating Spark for data analytics on the cloud with remote persistent memory pools.

So, here is the brief agenda. We will start with the motivation and background: why we are doing this work. Then we will give a brief introduction to remote persistent memory: what persistent memory is, how people typically use it, and how it can be used in different scenarios. After that, we will deep dive into remote persistent memory for Spark shuffle. We have two implementations here. The first places persistent memory on the local compute node to accelerate Spark shuffle. The second extends persistent memory to a fully disaggregated mode by building a remote persistent memory pool. Then we will wrap up with a summary.

Let’s take a look at the motivation.

Motivation – Challenges of Spark shuffle

So, based on our experience with customer engagements, we observed several key changes in data center infrastructure. The first and foremost is the trend toward compute and storage disaggregation, because it lets people scale compute and storage independently. That reduces cost and provides better utilization of hardware resources. In such an environment the compute nodes are typically diskless, and this is becoming more and more common. The second trend is that the hardware deployed in data centers is becoming more and more advanced. For example, people already have 25 gigabit, and even 40 and 100 gigabit, networks in their data centers, which makes local storage less attractive. The third trend is that new storage products are emerging: storage class memory, such as Intel Optane persistent memory. On the other hand, Spark shuffle poses new challenges in large-scale data clusters. First and foremost, people usually complain that resource utilization is not well balanced, and jobs may run out of memory or slow down due to garbage collection. If you use traditional hard drives for shuffle, disk I/O is too slow. And because Spark shuffle can spill heavily, that further degrades performance. As you get more data, shuffle volume increases dramatically and hurts performance. Even if you use SSDs as local shuffle media, they wear out, because their write endurance is not very good. And some jobs run so long that you cannot afford the cost of recomputation. Those are the typical challenges Spark clusters face today.

Re-cap of Shuffle

Let’s take a look at what shuffle is. Basically, it is the process of exchanging data from the mappers to the reducers. Each mapper reads its input split, applies the user-defined function, partitions the output and writes it to the local drives. The reducers then fetch that data and sort and aggregate it before producing the final output. From this process we can see that shuffle involves many point-to-point network connections, a lot of disk I/O and a lot of network I/O, which can become a real problem in a large-scale production cluster.
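To make the mapper-to-reducer exchange concrete, here is a minimal single-process sketch in Python. It is not Spark code. It assumes hash partitioning by key, as Spark’s default HashPartitioner does, but uses crc32 so the results are reproducible. The function names are hypothetical. Each reducer pulls its bucket from every mapper, so the number of fetches grows as mappers times reducers.

```python
from collections import defaultdict
from zlib import crc32


def partition_for(key: str, num_reducers: int) -> int:
    # crc32 gives a hash that is stable across runs; Spark's HashPartitioner
    # uses the key's hashCode() modulo the number of partitions instead.
    return crc32(key.encode("utf-8")) % num_reducers


def map_side(records, num_reducers):
    """Shuffle write: one mapper buckets its output records by reducer partition."""
    buckets = defaultdict(list)
    for key, value in records:
        buckets[partition_for(key, num_reducers)].append((key, value))
    return buckets


def reduce_side(reducer_id, all_map_outputs):
    """Shuffle read: a reducer fetches its bucket from every mapper, then sorts
    the fetched records and sums the values per key."""
    fetched, fetches = [], 0
    for buckets in all_map_outputs:
        fetches += 1  # one point-to-point fetch per (mapper, reducer) pair
        fetched.extend(buckets.get(reducer_id, []))
    totals = {}
    for key, value in sorted(fetched):
        totals[key] = totals.get(key, 0) + value
    return totals, fetches


# Word-count style usage: 3 mappers, 2 reducers.
splits = [[("a", 1), ("b", 1)], [("a", 1), ("c", 1)], [("b", 1), ("a", 1)]]
map_outputs = [map_side(split, 2) for split in splits]
for r in range(2):
    print(r, reduce_side(r, map_outputs))
```

Every key lands on exactly one reducer. The fetch count per reducer equals the number of mappers, which is the all-to-all pattern that stresses disks and the network at scale.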

Spark Shuffle Bottlenecks

For the bottlenecks, we have run a lot of benchmarks. Let’s take two examples. The first is nWeight, a graph computing workload. During the shuffle phase, the local hard drives easily became the bottleneck: the disks were fully utilized, and their latency and IOPS could not satisfy the shuffle requirements. The other workload is TeraSort, a common workload that many big data engineers use to benchmark cluster performance. For TeraSort, we can clearly see that during the I/O phase the CPU was barely used because disk I/O was too slow, while during the sort phase the CPU was almost fully saturated. So we can conclude that shuffle has both CPU-intensive and I/O-intensive phases that need to be optimized.

So let’s take a look at what remote persistent memory is.

PMem – A New Memory Tier

IDC reported that data is growing very fast: the global datasphere is growing at a 27% compound annual growth rate. But DRAM density does not grow correspondingly, and its growth is slowing down. Today DRAM density only doubles about every four years. So there is a gap between the memory subsystem and the data growth rate, and we need a new memory tier to close it. To address this, the industry is developing a new kind of hardware called persistent memory. It is a new tier that sits between the DRAM layer and the traditional storage layer. It offers higher capacity than DRAM at somewhat higher latency, but it is much faster than the storage layer. So it tries to bridge the performance, operational and cost gap between the DRAM layer and the storage layer. Take Intel persistent memory as an example. It supports two operating modes. One is Memory Mode, where persistent memory is used as main memory and the original DRAM acts as a cache for the PMem, so from the operating system’s point of view you have a much larger memory capacity. The other is App Direct Mode, where the system exposes two different memory resources: the original DRAM and the PMem. You have full control over how to use the persistent memory devices, which gives your applications a lot of flexibility.
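The two growth figures quoted above can be compared directly to show the size of the gap. This is back-of-the-envelope arithmetic only. It assumes a constant 27% CAGR for data and a steady doubling of DRAM density every four years.

```python
def annual_rate_from_doubling(years_to_double: float) -> float:
    """Convert 'doubles every N years' into an equivalent annual growth rate."""
    return 2 ** (1 / years_to_double) - 1


def growth_factor(annual_rate: float, years: int) -> float:
    """Compound growth over the given number of years."""
    return (1 + annual_rate) ** years


DATA_CAGR = 0.27          # IDC figure quoted in the talk
DRAM_DOUBLING_YEARS = 4   # "every four years the DRAM density will grow two times"

dram_rate = annual_rate_from_doubling(DRAM_DOUBLING_YEARS)  # about 0.189 per year
for years in (4, 8):
    data = growth_factor(DATA_CAGR, years)
    dram = growth_factor(dram_rate, years)
    print(f"{years} years: data x{data:.2f}, DRAM density x{dram:.2f}, gap x{data / dram:.2f}")
```

Under these assumptions DRAM density grows about 18.9% per year. After four years, data has grown about 2.6x while DRAM density has grown 2x, and the gap keeps compounding from there.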

Remote Persistent Memory Usage

The community is exploring how persistent memory can be used. Typically, in a big data cluster, we can use it in the following three scenarios. The first is high availability: we plug persistent memory into each physical node, and the application replicates the data in its local persistent memory across the fabric to another node. This is essentially a backup for high availability. The second scenario uses remote persistent memory to enlarge a compute node’s memory capacity, so applications get a much larger memory capacity on the remote node. The third scenario is actually the most appealing one: using pooled PMem resources as a shared memory pool, so applications can share their data across the distributed pool. This is the most interesting scenario, and it is the one the remote shuffle service is built around.

Access Remote Persistent Memory over RDMA

As exciting as it looks, remote persistent memory is also the most challenging scenario. Persistent memory offers persistence without losing any of the characteristics of memory. It is really fast, which means you need ultra-low-latency networking and very high bandwidth: roughly 2 GB/s for very small random reads and 6 GB/s for large sequential reads. So we need an ultra-efficient protocol. Remote access must not add significant latency; otherwise it erodes the bandwidth and latency benefits you get from the memory. As the picture on the right shows, RDMA moves data between two systems without involving the CPU. It offloads the operations to the network adapter, so latency is extremely low and bandwidth is very high. Today’s most advanced networks, at 200 and 400 gigabits, are RDMA capable. They offer zero copy, kernel bypass, and hardware-offloaded one-sided memory-to-remote-memory operations. They even offer reliable, credit-based data and control delivery in hardware, which makes the network resilient at scale. All of this makes RDMA an ideal technology for remote memory access. But it does add some complexity, because traditional RDMA does not take data durability into consideration. The CPU caches need to be bypassed or flushed to get the data into the ADR domain. When writing to persistent memory, the writer needs a synchronous acknowledgement once the write has reached that durability domain, but RDMA write currently lacks such an acknowledgement.

RPMem Durability

To make writes durable today, the solution is based on software. Look at the upper-right picture: when the application writes to the peer RDMA NIC, it needs to follow up with a small RDMA read to force the writes into persistent memory. This makes remote access less efficient. The industry is developing a new RDMA op-code called RDMA Flush. RDMA Flush flushes all previous writes to a specified region and provides a memory placement guarantee to upper-level software. So we can use RDMA Flush to force previous RDMA writes to persistence without another RDMA read, which makes remote persistent memory operations much more efficient.
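The following toy model in Python shows why a write alone is not enough, and what the read-after-write workaround and the proposed flush each achieve. It is purely a simulation under simplifying assumptions. The class and method names are hypothetical, it uses no real verbs or libfabric API, and it treats all outstanding writes as one ordered stream.

```python
class ToyRemotePmemNode:
    """Toy model of a remote PMem target (hypothetical, not a real RDMA API).

    RDMA writes land in a volatile staging area (NIC buffers / CPU caches) and
    only count as durable once something forces them into the persistence domain.
    """

    def __init__(self):
        self.staged = {}    # written, but not yet guaranteed durable
        self.durable = {}   # inside the persistence (ADR) domain

    def rdma_write(self, addr, data):
        self.staged[addr] = data

    def _drain(self):
        # Move every outstanding write into the persistence domain.
        self.durable.update(self.staged)
        self.staged.clear()

    def rdma_read(self, addr, length):
        # Read-after-write workaround: the read completes only after earlier
        # writes have been placed, so its completion doubles as a persistence ack.
        self._drain()
        return self.durable.get(addr, b"")[:length]

    def rdma_flush(self):
        # Stand-in for the proposed RDMA Flush op-code: make previous writes
        # durable and acknowledge that, with no data payload returned.
        self._drain()

    def power_fail(self):
        # Anything not yet in the persistence domain is lost.
        self.staged.clear()


node = ToyRemotePmemNode()
node.rdma_write(0, b"block-0")
node.power_fail()                 # the write alone did not survive
node.rdma_write(0, b"block-0")
node.rdma_read(0, 1)              # small read forces the write to persist
node.rdma_write(8, b"block-1")
node.rdma_flush()                 # explicit flush, no read needed
node.power_fail()
print(node.durable)
```

In this model both the small read and the flush cost one extra operation. The difference the talk points to is that Flush gives an explicit durability guarantee, so the initiator no longer has to issue, and pay for, a data-carrying read.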

Okay, now let’s look at the remote persistent memory pool for Spark shuffle.

First, let’s recap the first phase of this work.

The left picture shows the original Spark shuffle, where a job serializes objects to off-heap memory and then writes them to a local shuffle directory through the file system.

Re-cap: Remote Persistent Memory Extension for Spark shuffle Design

After that, the shuffle reader reads the data from the local shuffle directories through the file system and sends it out through the TCP/IP stack. During this process we see a lot of user/kernel context switches. We also see file system overhead, which keeps us from getting the full benefit of persistent memory. And the TCP/IP stack causes problems and consumes a lot of CPU cycles, especially on high-speed networks. To address those performance challenges, we developed the Remote Persistent Memory Extension for Spark shuffle. As the picture shows, we serialize objects to off-heap memory directly and then persist them to persistent memory using the user-space PMDK library. The data is then read from remote persistent memory through RDMA read, with the RDMA memory region registered on the persistent memory address. Because we use the user-space PMDK library, there are no user/kernel context switches. Because there is no file system (we use direct access mode to write to the PMem directly), there is no file system overhead. More importantly, we use RDMA to read the data from the shuffle media, with the RDMA memory region combined with the PMem address. This gives a much faster implementation of remote shuffle.
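The central idea of this design is that the buffer the writer serializes into is the same memory the network reads from. A Python memoryview can mimic that property in a few lines. This is a hypothetical sketch: a bytearray stands in for a DAX-mapped PMem range registered as an RDMA memory region, and no real PMDK or RDMA call is made.

```python
import struct


class PmemRegionSketch:
    """Hypothetical stand-in for a PMem range that is both the shuffle write
    target and the registered RDMA memory region (here just a bytearray)."""

    HEADER = struct.Struct("<II")  # key length, value length (little-endian)

    def __init__(self, size: int):
        self.buf = bytearray(size)
        self.tail = 0  # append-only write position

    def append_record(self, key: bytes, value: bytes):
        """Serialize a length-prefixed record directly into the region."""
        start = self.tail
        end = start + self.HEADER.size + len(key) + len(value)
        if end > len(self.buf):
            raise MemoryError("region full")
        self.HEADER.pack_into(self.buf, start, len(key), len(value))
        pos = start + self.HEADER.size
        self.buf[pos:pos + len(key)] = key
        self.buf[pos + len(key):end] = value
        self.tail = end
        return start, end - start

    def expose(self, offset: int, length: int) -> memoryview:
        """What a remote reader would RDMA-read: a view of the same bytes,
        with no intermediate copy into a page-cache or socket buffer."""
        return memoryview(self.buf)[offset:offset + length]


region = PmemRegionSketch(64)
offset, length = region.append_record(b"k", b"vv")
view = region.expose(offset, length)
print(offset, length, bytes(view))
```

The view shares storage with the region, so nothing is copied between "write" and "send". That is the property the PMem-as-RDMA-memory-region design relies on to avoid the extra copies of the file system and TCP/IP path.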

The results are quite impressive. We tested performance with an end-to-end TeraSort workload and a decision support workload. For TeraSort, compared with vanilla Spark, meaning hard-drive-based shuffle, the performance improvement is significant. Even compared with shuffle on four NVMe drives, we still see a 1.29x speedup.

On the other hand, if we look at the shuffle read block time in the Spark UI, we see a significant tail latency reduction between the solutions. For example, the tail latency of hard-drive-based shuffle is 8.3 minutes, and for NVMe-based shuffle it is 11 seconds, but for PMem-based shuffle it is only 7 ms. So the tail latency has been reduced dramatically. This is because persistent memory provides higher write and read bandwidth per node, has much higher endurance, and has excellent latency because it sits on the DIMM channel, whereas NVMe is attached to the PCIe bus. The decision support workload is much less I/O intensive, so we didn’t see such a big performance improvement there, but we still see a 3.2x speedup compared with hard-drive-based shuffle.
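To put the tail-latency numbers on a common scale, the quoted read block times can be converted to seconds and compared. This is simple arithmetic on the figures above, with no additional data.

```python
import math

# Shuffle read block (tail) times quoted above for TeraSort, in seconds.
block_time_s = {"HDD": 8.3 * 60, "NVMe": 11.0, "PMem": 0.007}


def speedup_vs_pmem(media: str) -> float:
    """How many times longer the given media's tail read block time is than PMem's."""
    return block_time_s[media] / block_time_s["PMem"]


for media in ("HDD", "NVMe"):
    ratio = speedup_vs_pmem(media)
    print(f"{media} vs PMem: {ratio:,.0f}x (~{math.log10(ratio):.1f} orders of magnitude)")
```

The HDD comparison works out to roughly 71,000x, close to five orders of magnitude. The NVMe comparison is roughly 1,570x, a little over three orders of magnitude, which is consistent with the three-orders-of-magnitude reduction quoted in the abstract.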

In the bottom picture, we can see that the I/O-intensive queries were accelerated dramatically, while the CPU-intensive queries were not affected much by our optimization.

Extending to fully Disaggregated Shuffle solution

So we have already seen these promising results. What do we do next? We observed some new challenges in the latest clusters, which are disaggregated, diskless environments where people want to scale shuffle independently of compute. In these clusters, CPU and memory usage was quite unbalanced. And some jobs run for a very long time, for example 24 hours or even longer. If the shuffle phase fails, the job has to be recomputed from scratch, and the cost is almost intolerable for users. So users are moving to elastic deployments with compute and storage disaggregation, and shuffle is the most important component in this new setup. This creates a requirement to decouple shuffle I/O from the compute nodes onto dedicated storage that can deliver a dedicated service level agreement to different types of applications. It needs to be fault tolerant in case of a shuffle failure, so that there is no need to recompute. It should also offload spill, which reduces the compute node’s memory footprint, improves memory utilization, and brings more balanced resource utilization across compute and storage nodes. Because traditional hard drives, and even SSDs, may have endurance issues under frequent writes, users want to leverage state-of-the-art hardware as the storage media to provide a high-performance, high-endurance storage back end. Those are the motivations that drove us to build a remote persistent memory based fully disaggregated shuffle solution.

So let’s take a look at the remote persistent memory pool (RPMP) architecture. Basically, it is a new fully disaggregated shuffle solution that leverages persistent memory and RDMA technology.

RPMP Architecture

It includes three components. The first is the new shuffle manager. The second is the persistent memory based distributed storage system. The last is an efficient RDMA-powered network library that bridges the Spark shuffle manager and the PMem distributed storage system. RPMP provides an allocator-free read/write API on pooled PMem resources, which makes it easy to use and access. The data is replicated to multiple nodes. For example, when a shuffle writes to node one, the data is replicated to another node over RDMA for high availability. And because RPMP provides a memory-like API, it can easily be extended to other use cases, such as a data store or cache store. We can even use it as a cache layer for Spark. This fully disaggregated RPMP architecture improves Spark scalability by disaggregating shuffle from compute to an independent cluster, improves Spark shuffle performance by leveraging persistent memory as the shuffle media, and improves reliability in case of a shuffle failure, because the data is replicated to another node in the RPMP cluster.

Remote Persistent Memory Pool Overview

If we take a closer look at the architecture, RPMP has multiple layers. The first is a proxy layer, which sits in front of the RPMP storage nodes and chooses which node to use. It uses a consistent hash algorithm to avoid a single point of failure. It maintains an active node map and uses heartbeats to keep track of which nodes are live and which are down. The RPMP core uses a three-layer design: the network layer, the control layer and the storage layer. It is in charge of replicating data from one node to another over RDMA. Under the hood, the persistent memory devices of the different RPMP nodes form a global memory address space for applications. When one node goes down, the data is still writable and readable because of the distributed design.
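The proxy layer’s job is to map each key to a live node and keep working when nodes disappear. That can be sketched with a standard consistent-hash ring plus heartbeat expiry. This is a minimal illustration, not RPMP’s code. The virtual-node count, MD5 as the hash, the 3-second timeout, the node names, and the choice of the next distinct node as the replica are all assumptions.

```python
import bisect
import hashlib


class ConsistentHashRing:
    """Minimal consistent-hash ring with virtual nodes and heartbeat expiry
    (hypothetical sketch in the spirit of the RPMP proxy layer)."""

    def __init__(self, vnodes=64, heartbeat_timeout_s=3.0):
        self.vnodes = vnodes
        self.timeout = heartbeat_timeout_s
        self.last_seen = {}   # active node map: node -> last heartbeat time
        self.ring = []        # sorted list of (hash, node)

    @staticmethod
    def _hash(s: str) -> int:
        return int.from_bytes(hashlib.md5(s.encode()).digest()[:8], "big")

    def _rebuild(self):
        self.ring = sorted((self._hash(f"{n}#{i}"), n)
                           for n in self.last_seen for i in range(self.vnodes))

    def heartbeat(self, node, now):
        is_new = node not in self.last_seen
        self.last_seen[node] = now
        if is_new:
            self._rebuild()

    def expire(self, now):
        """Drop nodes whose last heartbeat is older than the timeout."""
        dead = [n for n, t in self.last_seen.items() if now - t > self.timeout]
        for n in dead:
            del self.last_seen[n]
        if dead:
            self._rebuild()
        return dead

    def nodes_for(self, key, replicas=2):
        """Primary plus replicas: walk clockwise from the key's hash position."""
        if not self.ring:
            raise RuntimeError("no live nodes")
        start = bisect.bisect(self.ring, (self._hash(key), ""))
        chosen = []
        for i in range(len(self.ring)):
            node = self.ring[(start + i) % len(self.ring)][1]
            if node not in chosen:
                chosen.append(node)
                if len(chosen) == replicas:
                    break
        return chosen


ring = ConsistentHashRing()
for name in ("rpmp-1", "rpmp-2", "rpmp-3"):
    ring.heartbeat(name, now=0.0)
print(ring.nodes_for("shuffle_0_1_2"))
```

Only the failed node’s virtual nodes leave the ring. Keys it owned therefore move to the next node clockwise, which in this sketch is the node already holding their replica. Keys owned by surviving nodes keep their primary.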

The RPMP client provides a transactional, allocator-free read/write API and a put/get interface to users. The network layer is built on a library called HPNL (High Performance Network Library), which is based on libfabric, so it provides protocol-agnostic support: it can work with RoCE RDMA, iWARP RDMA or even InfiniBand. The RPMP server uses a three-layer design.

The network layer runs an HPNL server, which talks to the HPNL client on the compute side. The control layer is responsible for global address management and transaction processing. It also provides a set of accelerators that can sort, merge, and so on. The storage layer is responsible for allocating space and writing the data to the persistent memory devices directly. It uses the user-space PMDK library, so it bypasses the file system entirely.

Spark RPMP optimization features

There are multiple optimization features in RPMP. The first is optimized RDMA communication. As mentioned above, we use the High Performance Network Library, which supports multiple protocols.

In the design, we let the server handle all the RDMA operations.

On the read path, the server performs an RDMA write to the client, which saves one round of RDMA communication. In the control (acceleration) layer, we have partition merge, which aggregates small partitions into larger ones to accelerate the reduce phase and reduce the number of reducer connections. On the server side, we can even sort the shuffle data on the fly, so no sorting is needed in the reduce phase. This reduces resource utilization on the compute nodes and provides fine-grained control over resource utilization when compute node CPU resources are limited. In the storage layer, we have a global address space accessible through a memory-like API. Under the hood, it is actually a key-value store based on libpmemobj. It is transactional, which means you do not need to worry about the atomicity of your data. The allocator manager in the storage proxy directs each request to a different allocator, and each allocator writes the data to an individual persistent memory device.
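The partition-merge idea in the control layer can be illustrated with a greedy packing of adjacent partitions up to a target size. This is a hypothetical sketch. The function name and the greedy policy are assumptions, since the talk does not describe RPMP’s actual merge policy. It is similar in spirit to how Spark’s adaptive query execution coalesces small shuffle partitions.

```python
def merge_small_partitions(sizes, target_bytes):
    """Greedily pack consecutive partitions into groups of at most about
    target_bytes (a single oversized partition forms its own group).

    Returns a list of (first_partition, last_partition, total_bytes).
    """
    groups = []
    start, total = 0, 0
    for pid, size in enumerate(sizes):
        # Close the current group if adding this partition would overflow it.
        if total > 0 and total + size > target_bytes:
            groups.append((start, pid - 1, total))
            start, total = pid, 0
        total += size
    if sizes:
        groups.append((start, len(sizes) - 1, total))
    return groups


partition_sizes = [10, 20, 5, 40, 1, 1]   # bytes per reduce partition (illustrative)
print(merge_small_partitions(partition_sizes, target_bytes=32))
```

Fewer, larger groups mean fewer reducer fetches and connections. Keeping each group contiguous preserves partition order, which matters if the data is also sorted on the server.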

For the data workflow, there is a single-node write and read flow and a distributed-mode write and read flow. In single-node mode, the client, which is used by the shuffle manager plugin, writes data to a specific address. The server then issues an RDMA read from the client DRAM to the server DRAM, flushes the data from DRAM to persistent memory, and sends an ACK to the client. On the read path, the client reads from a specific address, and the server issues an RDMA write from the server persistent memory to the client DRAM directly. Note that no server DRAM is involved on the read path. The data is fetched from the persistent memory device directly, because the PMem provides a memory-like API and can be used directly as an RDMA memory region. Distributed mode is a little different. When data is written to node one, node one issues an RDMA write to the second node to replicate the data, and the second node flushes the data from its DRAM to its PMem. On the read path in distributed mode, the second node is not involved as long as the primary node is still alive.
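This hypothetical Python sketch walks through the single-node and distributed flows just described. DRAM and PMem are modeled as dicts keyed by address, and each "RDMA" operation is a plain dict copy. One assumption is worth flagging: the ACK here is sent only after the replica has the data, but the talk does not say when RPMP acknowledges relative to replication.

```python
class RpmpNodeSketch:
    """Hypothetical model of one RPMP server node (not the real RPMP code)."""

    def __init__(self, name, replica=None):
        self.name = name
        self.replica = replica   # secondary node in distributed mode
        self.dram = {}           # transient staging buffers
        self.pmem = {}           # persistent shuffle media
        self.alive = True

    def handle_write(self, client_dram, addr):
        # 1. Server-initiated RDMA read: client DRAM -> server DRAM.
        self.dram[addr] = client_dram[addr]
        # 2. Distributed mode: RDMA write of the payload to the replica.
        if self.replica is not None:
            self.replica.receive_replica(addr, self.dram[addr])
        # 3. Flush server DRAM -> PMem, then acknowledge the client.
        self.pmem[addr] = self.dram.pop(addr)
        return "ACK"

    def receive_replica(self, addr, data):
        # The replica lands the data in DRAM and flushes it to its own PMem.
        self.dram[addr] = data
        self.pmem[addr] = self.dram.pop(addr)

    def handle_read(self, client_dram, addr):
        # Server-initiated RDMA write straight from PMem to client DRAM;
        # no server DRAM staging on the read path.
        client_dram[addr] = self.pmem[addr]


def read(addr, client_dram, primary, secondary):
    """Reads go to the primary; the secondary serves them only if it is down."""
    node = primary if primary.alive else secondary
    node.handle_read(client_dram, addr)
    return node.name


secondary = RpmpNodeSketch("node2")
primary = RpmpNodeSketch("node1", replica=secondary)
print(primary.handle_write({0x100: b"partition-3"}, 0x100))
```

Because the server initiates both directions (an RDMA read on writes, an RDMA write on reads), the client only posts a request and waits for the result, which matches the "server handles all RDMA operations" design above.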

PMEM Based Shuffle Write optimization

On the write path, we implemented an optimized PMem shuffle writer based on libpmemobj. In the map phase, we provision the PMem namespace in advance. We currently use a circular buffer to build a unidirectional channel for RDMA primitives. We serialize data into our buffer, and once it hits a predefined threshold, we create a block through libpmemobj and use memcpy to write the data to persistent memory. This implementation is append-only and writes each piece of data only once. There is no need for index files, because we save that information in the libpmemobj metadata. Since it is based on libpmemobj, it bypasses the kernel and is very efficient. On the reduce side, we use memcpy to read the data directly, and remote reducers read the data from PMem directly through RDMA.
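The append-only, threshold-triggered write path can be sketched as follows. This is a hypothetical Python model, not the libpmemobj-based implementation. Several details are assumptions: a per-partition staging buffer, a bytearray standing in for the pre-provisioned PMem namespace, and a dict standing in for the block metadata. The circular-buffer RDMA channel is not modeled.

```python
class PmemShuffleWriterSketch:
    """Append-only shuffle writer sketch (hypothetical names throughout).

    Records are serialized into a per-partition staging buffer. When the buffer
    crosses a threshold, it is copied once into a new block in a simulated PMem
    log, and the block's location is recorded in metadata instead of a separate
    index file. The real writer allocates blocks through libpmemobj.
    """

    def __init__(self, threshold_bytes):
        self.threshold = threshold_bytes
        self.pmem_log = bytearray()   # stands in for the pre-provisioned PMem namespace
        self.staging = {}             # partition -> bytearray of serialized records
        self.metadata = {}            # partition -> list of (offset, length) blocks

    def write(self, partition, record: bytes):
        buf = self.staging.setdefault(partition, bytearray())
        buf += record
        if len(buf) >= self.threshold:
            self._flush_partition(partition)

    def _flush_partition(self, partition):
        buf = self.staging.pop(partition, None)
        if not buf:
            return
        offset = len(self.pmem_log)
        self.pmem_log += buf          # one copy, appended, never rewritten
        self.metadata.setdefault(partition, []).append((offset, len(buf)))

    def close(self):
        """End of the map task: flush whatever is still staged."""
        for partition in list(self.staging):
            self._flush_partition(partition)

    def read_partition(self, partition) -> bytes:
        """Reduce side: locate blocks via metadata and copy them out directly."""
        return b"".join(bytes(self.pmem_log[o:o + n])
                        for o, n in self.metadata.get(partition, []))


writer = PmemShuffleWriterSketch(threshold_bytes=8)
for pid, rec in [(0, b"aaaa"), (1, b"bb"), (0, b"cccc"), (1, b"dd")]:
    writer.write(pid, rec)
writer.close()
print(writer.metadata)
```

Because blocks are only ever appended and their locations live in the metadata, the reducer can find its data without a separate index file, mirroring the design described above.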

RPMP integration to Spark Shuffle

This is a brief high-level summary of what the shuffle manager looks like. We implemented a PMem shuffle writer and a PMem shuffle reader, hooked in much like the Sort Shuffle Manager. Beneath that, we support a block object writer and an external sorter writer, which handle shuffle writes and external sorting directly on the write path. The two colors represent two different implementation layers: the upper level is Java based, and the lower level is native C++ code, so we need a thin wrapper on top of the native code to call the libpmemobj APIs.

On the read path, the PMem input stream reads the data from RPMP directly into a PMem buffer and passes it to the PMem shuffle reader.

We did some performance evaluation of this RPMP solution. The left part is a micro-benchmark. We simulate a client and server environment and use allocate, remote write and remote read micro-tests to measure the maximum bandwidth of the RPMP pool. We saw very promising results, especially for remote read, where we were able to hit the theoretical throughput of the 40 gigabit network.
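For reference, here is what "theoretical throughput" means in bytes for the link speeds mentioned in this talk. It is a trivial conversion that ignores all protocol and encoding overhead, so real RDMA goodput will be somewhat lower.

```python
def link_ceiling_gbytes_per_s(gbit_per_s: float) -> float:
    """Upper bound on payload rate in GB/s (decimal units) for a link rated
    in Gbit/s, ignoring protocol and encoding overhead."""
    return gbit_per_s / 8


for link_gbit in (25, 40, 100):
    print(f"{link_gbit} Gb/s link: at most {link_ceiling_gbytes_per_s(link_gbit)} GB/s")
```

So saturating the 40 Gb network in the remote-read test corresponds to roughly 5 GB/s of shuffle data per client.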

The right picture shows the results of integrating RPMP with Spark shuffle.

So far we have only evaluated shuffle on two nodes, comparing against vanilla Spark shuffle. In the baseline configuration we have two nodes, each with HDD-based shuffle and HDFS. In the second, RPMP configuration, we have two persistent memory devices on the second node, which plays the remote PMem shuffle role. In this prototype, we saw a 1.9x speedup with our RPMP-based, fully disaggregated remote shuffle solution.

Okay, to wrap it up: Spark shuffle poses a lot of challenges in large-scale production environments. Remote persistent memory extends persistent memory to new usage models and scenarios, with RDMA being the most advanced technology for remote persistent memory access. The RPMP pool for Spark shuffle enables a fully disaggregated, high-performance, low-latency shuffle solution. It improves Spark scalability by disaggregating shuffle from compute nodes. It improves Spark shuffle performance with high-speed persistent memory and a low-latency RDMA network. It also improves reliability by providing a manageable, highly available shuffle service that supports shuffle data replication. Now, a call to action.

Accelerate Your Data Analytics & AI Journey with Intel

At Intel, we are committed to helping you accelerate your data analytics and AI journey by removing technical barriers and inefficiencies.

We don’t work on just a single part.

Instead, we start by enabling and building a modern, cost-efficient, scalable and intelligent foundation, not only at the hardware level but also at the software level. As you can see here, we have holistic end-to-end software solutions. We have libraries optimized for the hardware, and we have optimized machine learning and deep learning tools. We also collaborate closely with partners, such as cloud platform providers, to provide a holistic end-to-end data analytics and AI pipeline to end users.

Remote persistent memory is actually part of a bigger project called the Optimized Analytics Package (OAP) for Spark, which includes multiple Spark optimizations based on Intel architecture and the most advanced new hardware. We have the native SQL engine, which accelerates your SQL queries by leveraging a native implementation of the SQL engine and AVX SIMD instructions for more efficient data processing. It is a column-based native processing engine. We also have a remote shuffle plugin and RPMP, a fully disaggregated shuffle solution. If you’re interested, feel free to go to this link, download it and give it a try.

Thanks for attending the session.

About Jian Zhang

Intel Corporation

Jian Zhang is a senior software engineering manager at Intel, where he and his team primarily focus on open source big data analytics software development and optimization, and build reference solutions for customers. He has over 10 years of experience in performance analysis and optimization for many open source projects such as Xen, KVM, Swift, Ceph, Spark, and Hadoop, and with benchmarking workloads such as those from SPEC or TPC. He earned a master's degree in Computer Science and Engineering at Shanghai Jiaotong University.

About Chendi Xue


Chendi Xue is a software engineer on Intel’s data analytics team. She has six years’ experience in big data and cloud system optimization, focusing on performance analysis and optimization of the computation, storage and network software stacks. She has contributed to development work including Spark SQL, Spark shuffle optimization, and cache implementation. Before that, she worked on Linux Device-Mapper optimization and iSCSI optimization during her master's degree study.