Nowadays, Spark is widely adopted by big enterprises for handling large volumes of data. At PayPal, more and more complex data processing applications are running on top of Spark for its better performance and ease of use. Graph analytics is among the emerging trends for different business use cases, e.g., risk control, compliance, etc. In this talk, we would like to share our practice of building large-scale graph applications on top of Spark. How to achieve 4-5x performance improvements while handling billions of nodes/edges? How to balance performance and resources efficiently? What are the key learnings from running enterprise production-level pipelines on Spark?
– Hello everyone, my name is Fuwang, and I am an engineer at PayPal. As we all know, Spark is a powerful engine for large-scale data processing, which has already been adopted by a lot of companies. But in realistic production environments, there might be a lot of challenges we face to run a Spark application smoothly and efficiently. Today, I am here to share my learnings of using Spark inside PayPal, and I'm also here to discuss with you all, and learn from you all. This is today's agenda; we have a very simple agenda. Firstly, I will introduce the challenges we are facing and how we resolved the problems in the production environment, and also share our learnings from practice. Let's go to the challenges first. Basically, we have two challenges. The first one is a large graph with data skew in nature. We have a huge graph in our production: it has more than two billion vertices and more than 100 billion edges, and the degrees of the nodes in the graph are very different. The maximum degree of a node can reach 2,000,000, while the average degree is just 100, as you can see there. This skew exists by nature. The other challenge we are facing is the strict SLA requirement, and we have various limitations in production: we are given limited resources. I think some of you may have a similar experience; sometimes we ask for resources from the platform team, and they usually push back on our initial request and reduce the amount we asked for. I can fully understand this from the platform team's perspective; but from our perspective, we need to balance performance and the resources we have to meet our strict SLA, and that is the work we need to do.
Basically, we have two parts: one is about how to improve our scalability, and the other is about performance optimization and enhancement. Let's first go to the scalability part. Optimization is always a hot topic; we always like to talk about how to optimize our application, optimize our performance, and reduce our execution time, but in many scenarios, actually the basic and fundamental requirement is to finish your job and get results. In many cases, our application cannot finish when the data volume increases and becomes huge: it may crash at run time, or it may run into out-of-memory issues. In other words, the scalability of the application is not good. Here, I would like to share one case in which we improved the scalability of our application. In our graph product, as you can see in the picture at the right side, we provide users with the ability to detect communities and dig out community patterns.
And this ability can help our users do a lot of analysis work, such as risk control and compliance management.
For community detection, there are lots of algorithms we can use, and connected components is one of them. It is a very simple one, and also a very commonly used one; I think some of you may have experience using it in your graph applications as well. Here, I would like to take an example to illustrate how the connected component is generated. By the way, we refer to this paper for the implementation algorithm of the connected components; if you are interested, you can take a look. Here, you can see a simple undirected graph. All the nodes in the graph are connected with each other, directly or indirectly, so they belong to a single connected component. We can find one connected component, and we will pick the smallest node, which is node 1 here, as the representative node of this graph. Then we can get one community: we will get new pairs for the community, and all the nodes belong to community 1.
OK, here is a simple illustration. This is the input pairs. The next step is to make it directed, and then we will get new pairs.
The next step is to group it by starting node,
and then we will get new pairs, a new grouping.
As you can see, we have different groups; in each group, we have a list of values corresponding to the key. Then we will find the smallest node in each group, and link all the nodes to the smallest node in each group. Then we will generate new pairs, and then we will do the deduplication, the dedup, to get new pairs.
After this iteration, we will get our intermediate graph. Then we will go through the next iteration, taking the output of the previous iteration as the input of the new iteration, and go through all the same steps, just like I introduced before. Just group by starting node again,
aggregate the groups, find the smallest node in each group, and generate new pairs, and then we will get another intermediate graph.
Then, again, we will go through the next iteration.
Iteration 2, Iteration 3.
And we will go through all the same steps,
generate new pairs, and finally we will find one community here: the representative node of this community is node 1, and the members are 1, 2, 3, 4, 5 and 6. Then the algorithm is finished; we have found one connected component. But where does the problem happen? In our production case, in one single connected component, there might be more than 50 million nodes. It means in the group-by step, for a single key, there will be millions of values, and since the original implementation of our algorithm keeps all the values in memory, it results in out-of-memory errors. We can say that the skew happens in the group-by step, and the out of memory happens there as well.
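The iterative steps described above can be sketched in plain Python (an illustrative simulation of the logic only, not our actual Spark implementation; in the real pipeline these steps run as distributed group-by and join operations):

```python
from collections import defaultdict

def cc_step(pairs):
    """One iteration: emit each pair in both directions ("make it directed"),
    group by starting node, then link every member of a group (the key
    included) to the smallest node of that group."""
    groups = defaultdict(set)
    for a, b in pairs:
        groups[a].add(b)
        groups[b].add(a)
    new_pairs = set()  # a set, so the dedup step happens automatically
    for key, values in groups.items():
        members = values | {key}
        smallest = min(members)
        new_pairs |= {(smallest, n) for n in members if n != smallest}
    return new_pairs

def connected_components(pairs):
    """Iterate until the pair set stops changing; every surviving pair
    (r, n) links node n to its component representative r."""
    pairs = set(pairs)
    while True:
        new_pairs = cc_step(pairs)
        if new_pairs == pairs:
            return new_pairs
        pairs = new_pairs

# The 6-node example from the slides: one community, represented by node 1.
print(sorted(connected_components([(1, 2), (2, 3), (3, 4), (4, 5), (5, 6)])))
# → [(1, 2), (1, 3), (1, 4), (1, 5), (1, 6)]
```

Running this on the chain 1-2-3-4-5-6 converges in a few iterations to a star rooted at node 1, matching the slides.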
Okay, next let's introduce our approach to resolve this problem. Basically, we did three things. The first one is to separate the keys into normal keys and huge keys; we have set a threshold for this. The second one is splitting the huge keys into multiple parts by adding a random number as a prefix of the key; by doing so, we can increase the parallelism. And the third one is to spill to disk on the reducer side when necessary, to avoid the out-of-memory issue. By doing these things, we resolved the out-of-memory and skew problems in our production, and finished the algorithm smoothly. Okay, here are some lessons learned from the scalability case. Just two points I want to highlight. The first one is: do not blame Spark when you see out of memory, because from my experience,
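The key-splitting idea can be illustrated with a small, self-contained sketch (the threshold, the number of splits, and the function names here are hypothetical, chosen just for illustration):

```python
import random
from collections import defaultdict

HUGE_KEY_THRESHOLD = 3  # hypothetical; the production threshold is far larger
NUM_SPLITS = 4          # number of parts a huge key is split into

def salt_pairs(pairs, huge_keys):
    """Prefix a random split number to each huge key so its values are
    spread over several smaller groups; normal keys all go to split 0."""
    return [((k, random.randrange(NUM_SPLITS)) if k in huge_keys else (k, 0), v)
            for k, v in pairs]

def two_stage_min(pairs):
    """Find the smallest value per key with skew handling: huge keys are
    salted first, aggregated per split, then combined per original key."""
    counts = defaultdict(int)
    for k, _ in pairs:
        counts[k] += 1
    huge_keys = {k for k, c in counts.items() if c > HUGE_KEY_THRESHOLD}
    # Stage 1: partial min per (key, split) — each group stays small in memory.
    partial = {}
    for salted_key, v in salt_pairs(pairs, huge_keys):
        partial[salted_key] = min(v, partial.get(salted_key, v))
    # Stage 2: final min per original key, combining the splits.
    final = {}
    for (k, _), v in partial.items():
        final[k] = min(v, final.get(k, v))
    return final

print(two_stage_min([("a", 5), ("a", 3), ("a", 9), ("a", 7), ("b", 2)]))
```

The result is the same as a plain group-by-and-min, but no single group ever holds all the values of a huge key at once.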
I have never seen an out-of-memory issue which was caused by Spark's internal mechanism. Spark internally will spill the data to disk to avoid out-of-memory issues. So usually, our out-of-memory issue is due to inappropriate memory usage in our own user code, and making our memory usage in user code adequate is the key to solving the out-of-memory issue. The second point I want to make is that data skew obviously exists, especially in graph data, where it is there by nature. We cannot avoid it, but scalability can still be achieved by different solutions; for example, what we did here is split the huge keys and spill to disk when necessary. Scalability can still be achieved by various kinds of solutions. Okay, let's go to the optimization and enhancement part.
Before I go to the optimization part, I want to say there are a lot of common optimization tips which have already been shared in the Spark community. For example, we can tune different kinds of configurations to make our application run faster. But I will not include those details here today; I just want to share one case to illustrate how we optimized our application with the help of understanding Spark internals deeply. The first case is related to the join selection in the Spark planner. As we all know, Spark has different join solutions for executing a join.
Please allow me to quickly recap the three join solutions provided by Spark. The first one is broadcast join, which avoids the whole shuffle process. The second one is the shuffled hash join: the shuffle is necessary, but it builds a local hash map for the smaller side on the reducer side. And the third one is the sort merge join. It also needs a shuffle, and it will sort both partitions on the reducer side,
and then do the merge to get the results. On the right side is the workflow that Spark will follow to decide which join solution to use. First, it checks whether there is a hint in the SQL statement that indicates a broadcast join. If there is, it will go to broadcast join. Otherwise, it will check the data size of both tables to see whether it is suitable for broadcast join. If it is, it will go to broadcast join. Otherwise, it will check further conditions to choose between the hash join and the sort merge join.
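A simplified model of this decision workflow looks roughly like the following (the function and parameter names are made up for illustration; the real logic lives in Spark's planner, and the 10 MB figure is the default value of `spark.sql.autoBroadcastJoinThreshold`):

```python
BROADCAST_THRESHOLD = 10 * 1024 * 1024  # Spark's default: 10 MB

def choose_join(hint_broadcast, small_side_bytes, can_build_hash_map=False):
    """Simplified model of the workflow: hint first, then the size check,
    then further conditions for hash join, else sort merge join."""
    if hint_broadcast:
        return "broadcast"
    if small_side_bytes <= BROADCAST_THRESHOLD:
        return "broadcast"
    if can_build_hash_map:
        return "shuffled hash"
    return "sort merge"

print(choose_join(False, 1 * 1024 ** 3))  # 1 GB statistics → sort merge
print(choose_join(False, 1 * 1024 ** 2))  # 1 MB statistics → broadcast
```

This toy model is enough to see why the statistics fed into the planner matter so much in the case that follows.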
In our production environment, we have some join SQL statements like this.
For example, we have Table A and Table B, both of which are extra-large tables. Table B has a lot of partitions, and we just use one specific partition here, and this partition has a small data size, maybe just one megabyte. So actually, this is a join between an extra-large table and a small partition. But what join solution will Spark choose for execution?
What's your answer? (laughs) I think maybe many of you will choose the broadcast join, just as I did. The broadcast join is expected, but the sort merge join is selected for execution. I think all of you want to know why. Firstly, we have a SQL statement. Spark will use the parser to parse the string to get the unresolved logical plan. Then it will use the analyzer to resolve the logical plan to get a resolved logical plan; as you can see, the unresolved relations A and B are resolved to Hive table relations A and B. Then the Spark optimizer
will be applied to the logical plan tree, and the filter will be pushed down closer to Hive table B. As you can see, both Hive table relations A and B have a statistics item which is called sizeInBytes, and both of them are one gigabyte. After that, we will go through the Spark strategies, including the join selection, and the sort merge join will be selected, because one gigabyte is not small enough for hash join or broadcast join. So, the sort merge join is chosen for execution. Here is our approach to enable the broadcast join; let's see what we changed. Here is the SQL statement that goes to the parser, and next, we will get the unresolved logical plan,
and next, these steps are the same: we will go through the analyzer and the optimizer.
Okay, please stop here. What we changed is to add a rule, which is called PruneHiveTablePartitions, in the Spark optimizer. What this rule does is to prune the partitions and update the sizeInBytes statistics item in the optimization phase. Then we will get the optimized logical plan tree. As you can see here, the statistics of Hive table relation B are updated to one megabyte: one gigabyte is the whole data size of table B, while the partition we are using is just one megabyte, so our rule will update the statistics to one megabyte. Then we will go through the Spark strategies, and in the join selection, the broadcast join is selected for execution, because one megabyte is small enough for broadcast join. This is our change and the solution to enable the broadcast join. By the way, we raised a pull request
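The effect of the rule can be illustrated with a small sketch (a hypothetical helper, for illustration only): once the unused partitions are pruned during optimization, the sizeInBytes statistic the planner sees shrinks to just the partitions actually read:

```python
BROADCAST_THRESHOLD = 10 * 1024 * 1024  # default autoBroadcastJoinThreshold

def size_in_bytes(partition_sizes, pruned_partitions=None):
    """Sum the sizes of the partitions that survive pruning; with no
    pruning applied, the whole table size is reported to the planner."""
    if pruned_partitions is None:
        return sum(partition_sizes.values())
    return sum(partition_sizes[p] for p in pruned_partitions)

# Table B: 1024 partitions of 1 MB each (~1 GB total), but the query
# touches only one partition.
parts = {f"dt={d}": 1024 * 1024 for d in range(1024)}
whole = size_in_bytes(parts)              # planner sees 1 GB → sort merge join
pruned = size_in_bytes(parts, ["dt=42"])  # planner sees 1 MB → broadcast join
print(whole > BROADCAST_THRESHOLD, pruned <= BROADCAST_THRESHOLD)
```

The partition key `dt` and the sizes here are invented for the example; the point is only that pruning before join selection changes which branch of the workflow is taken.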
for this change, which was merged in Spark 3.0. If you are interested, please take a look, thank you. Okay, let's go to another case; this is about an enhancement in our production. This case is related to inserting a DataFrame into a Hive table. As you can see at the left side of this slide, here is the DDL of the Hive table: we have 7 columns and 2 partition columns. At the right side is the code in our Spark application that uses the DataFrame. Firstly, we create a DataFrame, df1, from other logic, which may be complicated. Then we register this DataFrame as a temp table. After that, we run a SQL select query against this temp table; here you can see, we rename the columns in the DataFrame to the corresponding column names of the Hive table, to make sure all the column names match. And then we use DataFrameWriter.insertInto
to insert the data into the Hive table. However, after this code ran in production, an alert was generated, and the platform team came to us and asked us to stop our application. This is very serious; it's not a good thing. What is the reason? In this case, the adjust column of the DataFrame has been mismatched to the counter column.
As you can see, they are in a different order, so this mismatch happened when the data was inserted into the Hive table. The counter column in the Hive table is a partition column. The counter column has just around 200 distinct values, so just around 200 partitions should be created, but the adjust column has more than 10,000,000 distinct values, so tons of new folders and files were generated in HDFS, which overloaded our namenode. This impacted some other applications, and so the platform team came to us to stop our application.
Well, I'm not sure whether some of you have a similar experience: when the platform team comes to you, it's not a good thing, and we need to keep our reputation. If such problems happen often, then maybe when we ask for resources next time, the platform team will just reject us. So, to avoid such problems going forward, we did some refinement in the Spark API to help our developers avoid the misusage. As you can see, the DDL is the same, and in the code, one parameter is added in the DataFrameWriter.insertInto API. We refined the insertInto API a little bit by adding a byName parameter. If the parameter is true, Spark will match the columns between the DataFrame and the target table by name, instead of by order, which can resolve the problem we mentioned here. And if we get some column name wrong in our code, it will also throw an exception to remind our developers.
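The difference between positional and by-name matching can be simulated with a short sketch (the column names and the helper function are hypothetical, not our actual API change; only three of the table's columns are shown):

```python
def insert_rows(df_columns, table_columns, rows, by_name=False):
    """Simulate inserting a row set into a table: positionally by default
    (how Spark's insertInto matches columns), or reordered by column name
    when by_name is set, raising on any missing name."""
    if not by_name:
        return rows  # columns are matched purely by position
    missing = [c for c in table_columns if c not in df_columns]
    if missing:
        raise ValueError(f"columns missing from the DataFrame: {missing}")
    idx = [df_columns.index(c) for c in table_columns]
    return [tuple(row[i] for i in idx) for row in rows]

df_cols = ["amount", "adjust", "counter"]    # order produced by the query
table_cols = ["amount", "counter", "adjust"] # partition column order differs
rows = [(10, "a1", "c1")]
print(insert_rows(df_cols, table_cols, rows))                # adjust lands in counter!
print(insert_rows(df_cols, table_cols, rows, by_name=True))  # reordered correctly
```

With positional matching, the high-cardinality adjust value silently lands in the counter partition column; with by-name matching, the row is reordered to fit the table.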
So, by doing this, our developers can pay attention to the potential mismatching and correct their code if necessary. We have never seen a similar issue after we added this enhancement in our own Spark version, which is running in our production environment. Here are our lessons learned from the optimization and enhancement part. The first thing I want to say is that we need to meet our strict SLA requirements, so we are willing to do anything to optimize our application. During this process, I feel that it is really helpful for Spark application developers to understand Spark internals deeply; it is very helpful for the optimization, and also for the enhancement. Just a tiny misusage may lead to a very serious impact on some shared service, like the namenode we mentioned here. So, we should be careful, and sometimes an explicit interface can help us avoid such misusage, just like we did here. All in all, with continuous performance optimization and enhancement, the performance of our application has been improved by 4 to 5 times. Finally, here is the learning summary from our practice, from our real cases in production.
In conclusion, we want to share two points here. The first one is to use memory adequately: as I mentioned, most out-of-memory issues are due to inappropriate memory usage in our own code. It's not the fault of Spark, so we need to use memory in our user code adequately, to avoid out of memory and to improve the scalability of our application. The second one is that it is really helpful to understand Spark internals, to understand how it behaves underneath; it is really helpful for avoiding issues, and also for the optimization of our application. And finally, I want to say that after continuous tuning, we achieved a performance improvement from 2 days to around 10 hours. But what I want to say is that the number is not important here.
The most valuable thing is the learning from our practice. I would like to share the learnings from my experience, and I would also like to discuss with you all, and learn from you all.
Fuwang Hu is currently an MTS-1 data engineer in PayPal Global Data Governance and Regulation Technology, focusing on developing data applications to fulfill the requirements of various business scenarios, including risk management and enterprise compliance. Fuwang has 5+ years' experience in building data applications by leveraging various big data technologies, e.g., Spark, Hadoop, HBase, etc., after obtaining a master's degree from Tongji University.
Grace Huang is currently an engineering director in PayPal Global Data Governance and Regulation Technology, responsible for analytical data solutions. By leveraging advanced big data and AI technology, her teams enable international e-payment services through various data-driven business domains, including risk management and enterprise compliance. Grace has 10+ years' engineering experience in the big data industry after obtaining a master's degree from Shanghai Jiao Tong University, where she focused on pattern recognition and image processing.