Saigopal Thota is a Principal Data Scientist leading the Customer Identity team at Walmart Labs. His areas of work include graph optimization algorithms, developing ML algorithms for data quality, and scalable real-time and batch systems. Saigopal has a PhD in Computer Science from the University of California, Davis.
In today's world, customers and service providers (e.g., social networks, ad targeting, retail, etc.) interact in a variety of modes and channels such as browsers, apps, devices, etc. In each such interaction, users are identified using a token (possibly a different token for each mode/channel). Examples of such identity tokens include cookies, app IDs, etc. As the user engages more with these services, linkages are generated between tokens belonging to the same user; linkages connect multiple identity tokens together. A challenging problem is to unify the identities of a user into a single connected component, to provide a unified identity view. This capability needs to extend beyond channels and create true unification of identity. Since every interaction or transaction event contains some form of identity, a highly scalable platform is required to identify and link the identities belonging to a user as a connected component. Therefore, we built the Identity Graph platform using the Spark processing engine, with a distributed version of the Union-Find algorithm with path compression.
We would like to present the following:
- Hello, everyone. My name is Sudha Viswanathan, and today I will be presenting this topic with my colleague Saigopal Thota. We are from Walmart Labs.
We will be talking about "Building Identity Graphs over Heterogeneous Data." Before I begin, I want to take a moment and introduce my team.
We've been working on this project for the last two years across multiple locations. So kudos to my team. Today we're going to be talking about identities and the problems that we're trying to solve around them, and then why we chose the graph data structure to solve this problem. Then we're going to walk you through how we build and scale this system and tell you why we chose to do this in house. Then we're going to talk about some of the challenges we faced, and we're going to wrap it up with a brief introduction to real-time graphs. So let's dive in.
Identities at Scale. What do we mean by identities? Anything that helps us uniquely identify who you are is an identity, or we can call it a token that helps trace your identity. When you log into a website, your login ID can be an identity. The ID of the device from which you log in can be an identity; even your browser cookie can be an identity. So when a business offers a website and multiple apps, and allows users to interact with those apps freely using multiple devices throughout the day, it is very easy for an individual to create a bunch of identities for themselves. Now, when we think about the number of identities generated by each one of us, across the entire population of the country, that can be a huge number. The identities that belong to the same customer are scattered across applications and channels, so we cannot have a unified view of a customer. Now, why is this even important? Why do we need the unified view of a customer? Because for any business that is customer centric, if they want to personalize the experience of the customer and create meaningful interactions for them, it is very important to understand the entire persona of the customer. They cannot afford to work on bits and pieces of information. For this, identity resolution is very important. The idea is to collect the identities of a customer that may exist on several platforms and stitch them together, so that we get a unified view of that customer.
We can also do it at the household level, because for certain businesses like Netflix, accounts are shared across family members. In those cases, it makes sense to collect these identities and stitch them together at the household level. Now, having seen the importance of identity resolution, let's move on and see a little bit about the graph data structure. A graph is nothing but a combination of nodes and edges. In the case of Facebook, we can think of the users as nodes.
And if a user has a friend, then there's an edge running between the two users. Here in the figure shown, we're considering the customer identities as nodes, the relationship between two nodes is represented in the form of a linkage, and any additional attributes of a node or a linkage are represented in the form of metadata. So the graph data structure is going to hold all the identities that belong to a customer in the form of a single connected component, along with the relationship information and metadata.
Now, when identities are scattered across tables,
and we want to link them, the very first thing that comes to mind is to use table joins, because they are very popular and easy to implement. But when we think about performance, coverage, scalability, and flexibility, the graph data structure does very well. This is because in a graph data structure, once we know the starting point, we just have to walk through the network and hop along the edges to get all the related nodes. There's no lookup here; there's no extensive search-and-match computation. That is why the graph data structure is highly scalable. And thinking about coverage, let's talk about an example customer. Let's say a customer has six identities as shown in the figure, and they are linked only in the manner that is indicated. So we have A1, A2, B1, B2, C1, C2. Now, suppose we pose a question like: get me all identities of type C that are linked to identities of type A. Because A1 is linked to B1, and B1 is not directly linked to C1 or C2, if we were to use the traditional join approach, for A1 we wouldn't have anything to show. But this is important because, if we consider node type A as user identifiers and node type C as device identifiers, and we want to target A1 for some marketing purpose, we wouldn't have anything to show as device nodes for A1. So this is a miss, and this kind of miss cannot happen if you use a graph data structure.
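The coverage example above can be sketched in a few lines of Python. The node names follow the slide, but the exact edge list is an illustrative assumption: a walk over the connected component finds the type-C nodes that a direct A-to-C join would miss.

```python
from collections import deque

# Hypothetical linkages for the example customer (A1, A2, B1, B2, C1, C2);
# note there is no direct edge between any A node and any C node.
edges = [("A1", "B1"), ("A2", "B1"), ("B1", "B2"), ("B2", "C1"), ("C1", "C2")]

adj = {}
for u, v in edges:
    adj.setdefault(u, set()).add(v)
    adj.setdefault(v, set()).add(u)

def linked_nodes_of_type(start, node_type):
    """Walk the connected component from `start`, collecting nodes of a type."""
    seen, out, queue = {start}, [], deque([start])
    while queue:
        node = queue.popleft()
        if node.startswith(node_type) and node != start:
            out.append(node)
        for nbr in adj.get(node, ()):
            if nbr not in seen:
                seen.add(nbr)
                queue.append(nbr)
    return sorted(out)

# A direct A-to-C join finds nothing (no A-C edge exists), but the graph
# walk reaches C1 and C2 through B1 and B2.
direct_join = sorted(v for u, v in edges if u == "A1" and v.startswith("C"))
print(direct_join)                       # []
print(linked_nodes_of_type("A1", "C"))   # ['C1', 'C2']
```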
Another thing I like about the graph data structure is that once we build the graph, we can view and query the data in any imaginable way, and it offers a wide range of traversal possibilities. For the list of traversal criteria that I've mentioned in this slide, if we were to use a traditional join approach, we would end up writing a SQL query for each one of them, and it could involve multiple joins and multiple filter conditions. So now that we have seen why identity resolution is important, and the power of the graph data structure, let's dive in and see how we built the system using the graph data structure to stitch together the identities that belong to the same customer and create a unified view of that customer. Before we started, we had certain performance objectives.
We definitely want the system to handle a very large number of identities and linkages. Identities and linkages are going to be created almost every minute, and in most cases the metadata surrounding them is going to get updated. We want our graph to be updated at least once every day to start with, and we wanted to leverage our existing Hadoop infrastructure.
So let's dive in and understand the components that make up this system. The entire system is built using Spark. We start with analyzing the data, and then we extract the data and feed it into the graph pipeline. Then we do a series of processing steps to form the connected components and make them ready for traversal. Let's see each one of them in detail. The first step is data analysis.
This is a very critical step, because this is where we control the quality of the linkages that get into the graph pipeline. For example, let's consider a linkage between a user ID and a cookie ID. Generally, a cookie ID connects to one individual, or to multiple individuals within the same household in case they happen to share devices. But we have seen instances where a cookie ID connects to thousands and thousands of customers. Maybe it's a shared or borrowed computer; we don't know exactly what it is. But if we allow that anomalous cookie into the graph pipeline, it is going to attract all the identities that belong to those thousands and thousands of customers and create a very huge connected component that is not only difficult to build and traverse from a computing perspective, but is also not going to add any business value. So this is the step where we actually study the data, understand the data distribution, and decide a reasonable threshold for each linkage. As an example of what we just discussed, we might come up with a threshold of, say, 30, and say that if we see a cookie that connects to more than 30 customers, we don't even want to allow it into the graph pipeline. Such decisions are made in this stage. The next step is getting the data into the graph pipeline.
Initially, the data sources are few and they are manageable. But later on, the number of data sources, the variety of data sources, and the volume of data are going to increase exponentially, and they become magnificent and intimidating like the Niagara Falls shown in the picture. So it's very important for us to have a dedicated framework to ingest data from these heterogeneous sources in parallel, so that we extract exactly what we want, in the format and schema we want, to pass on to the subsequent stages. Moving on, the next stage is called core graph processing.
This processing involves three stages. The aim of the first stage is to feed only good-quality linkages into the processing. Any source will have a combination of good and bad linkages. From past processing, we will already have a list of known bad linkages, so we compare that list with the incoming data, and if the incoming data has any of those known bad linkages, we remove them. Now we are left with good linkages and some unknown bad linkages. This is where we use the thresholds that we designed in the data analysis stage. We apply the threshold and say, for example, if we see a linkage with an anomalous cookie that connects to more than 30 customers, we just want to filter it out; we don't want to allow it to go into the graph pipeline. Now that we have applied the thresholds and removed the unknown bad linkages, we are still left with some bad linkages that lie within the threshold and escape into the graph pipeline, and they pose their own challenges; we're going to talk more about them in the coming slides. Apart from this, we also eliminate duplicate linkages in this stage, and we add the edge metadata. This helps us to traverse the graph in the later stages. Moving on to the next stage.
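A minimal sketch of this first stage might look like the following. The linkage tuples, the known-bad list, and the threshold are all made up for the demo (the talk's example threshold is 30 customers per cookie; here it is shrunk to 3), and the real pipeline runs these steps at scale on Spark:

```python
from collections import defaultdict

THRESHOLD = 3                           # toy stand-in for "30 customers per cookie"
known_bad = {("cookie_X", "user_9")}    # known bad linkages from past processing

incoming = [
    ("cookie_A", "user_1"), ("cookie_A", "user_1"),   # exact duplicate
    ("cookie_A", "user_2"),
    ("cookie_B", "user_1"),
    ("cookie_X", "user_9"),                            # known bad linkage
    ("cookie_Z", "user_3"), ("cookie_Z", "user_4"),
    ("cookie_Z", "user_5"), ("cookie_Z", "user_6"),    # anomalous: 4 users > threshold
]

# Step 1: drop known bad linkages and exact duplicates.
linkages = set(incoming) - known_bad

# Step 2: apply the degree threshold - drop any cookie linked to too many users.
users_per_cookie = defaultdict(set)
for cookie, user in linkages:
    users_per_cookie[cookie].add(user)
clean = sorted(l for l in linkages if len(users_per_cookie[l[0]]) <= THRESHOLD)
print(clean)
```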
This is the stage where we merge all the related linkages of a customer and create a connected component. The algorithm used to do this is going to be discussed in detail in the coming slides. Moving on to the next stage. After the second stage, we have the connected component, but it doesn't have any relationship information or metadata, so it kind of looks like this. It is in this stage that we actually enrich the connected component by adding the relationship information and metadata to make it ready for traversal. Now that we have seen all three stages, let's sum it up. We have three stages. In the first stage, as the linkages come in, we eliminate the duplicates, we filter out the bad linkages, and we add edge metadata. Then we move on to the second stage, where we merge all the related linkages and create the connected component. Then, in the third stage, we enrich this connected component by adding the relationship information and metadata to make it ready for traversal. Now let's talk in detail about the algorithm that is used in the second stage to create the connected components. This algorithm is called Union Find Shuffle.
It is built on top of the standard Union Find algorithm. Let's first understand how the Union Find algorithm works.
Let's say we are first getting a linkage between two and nine. We are seeing this linkage for the first time, so either we can make two a child of nine, or we can make nine a child of two. Now let's say we are getting another linkage, two and five. The very first thing that happens in the Union Find algorithm is the Find operation, where we find the topmost parent of every node in that linkage. If the topmost parents of both nodes are the same, they belong to the same connected component, so we don't have to do anything. But if the parents are different, that means we need to merge those two connected components; this operation is called Union. Here in this example, for two the topmost parent is two, and for five the topmost parent is five. Now there are two ways of unifying them: either we can make five a child of two, or we can make two a child of five. If we make five the child of two, we can see that the height of the connected component stays shallow. This is very important, because keeping the height shallow improves the time complexity of future Find operations. So as a rule, we will always make the parent of the smaller component point to the parent of the larger component. Let's see another example.
Let's say we are getting another linkage, seven and eight. We're seeing this for the first time. Now we're getting another linkage where five is connected to seven. For five, we already know that it is a part of this cluster and the topmost parent is two. For seven, it is the parent of itself. Now, we will follow the weighted union and make seven point to two. So now the cluster looks something like this. Next, let's say we are getting another linkage, eight and one. One we are seeing for the first time, and eight we know is a part of this big cluster whose topmost parent is two.
Now we will follow weighted union and make one a child of two. We will also do one more piece of additional work here: we will make the node eight point directly to two. This operation is called path compression, where we take the leaf node and all the other nodes between the leaf node and the topmost parent, and make all of them point to the topmost parent. This effectively compresses the path between the parent and the leaf node, hence the name path compression. And this further improves the time complexity of future Find operations. So now the cluster looks something like this.
Yeah, so it is path compressed and the tree looks shallow. So weighted union, along with path compression, helps us to create connected components in a very efficient way from linkages that arrive either in a streaming or a batch context.
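The weighted union and path compression just described fit in a short, self-contained Python sketch. The linkages replay the walkthrough above (2-9, 2-5, 7-8, 5-7, 8-1); this is a plain single-machine version, not the distributed one:

```python
class UnionFind:
    """Union by size (weighted union) with path compression."""
    def __init__(self):
        self.parent = {}
        self.size = {}

    def find(self, x):
        # Register unseen nodes as their own parent.
        if x not in self.parent:
            self.parent[x], self.size[x] = x, 1
        # Walk up to the topmost parent.
        root = x
        while self.parent[root] != root:
            root = self.parent[root]
        # Path compression: point every node on the walk at the topmost parent.
        while self.parent[x] != root:
            self.parent[x], x = root, self.parent[x]
        return root

    def union(self, a, b):
        ra, rb = self.find(a), self.find(b)
        if ra == rb:
            return            # already in the same connected component
        # Weighted union: the smaller component's parent points to the larger's.
        if self.size[ra] < self.size[rb]:
            ra, rb = rb, ra
        self.parent[rb] = ra
        self.size[ra] += self.size[rb]

uf = UnionFind()
for a, b in [(2, 9), (2, 5), (7, 8), (5, 7), (8, 1)]:
    uf.union(a, b)
print(sorted({uf.find(n) for n in [1, 2, 5, 7, 8, 9]}))  # [2] - one component
```

After processing all five linkages, every node reports the same topmost parent (node 2), matching the final cluster in the slides.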
So now I would like to hand it over to my colleague, Saigopal Thota, who will walk you through the rest of the presentation. Thank you so much. - Thank you, Sudha. Hello, everyone. Today, we are going to talk about the distributed version of the Union Find algorithm. Even though, as mentioned, Union Find is a very efficient algorithm for building connected components, we needed it to scale up to 30 billion nodes, as mentioned in our performance goals, and to run in a distributed as well as parallel fashion.
We have built a distributed variant of Union Find called the Union Find Shuffle algorithm. I will describe the algorithm in the next few slides. The algorithm is divided into four parts. In the first part, we divide all the linkage data that is available for creating connected components into multiple partitions, to be able to run in parallel, and on each of these partitions we run the Union Find algorithm locally, with path compression. What happens at this stage is that if a node has linkages that fall into multiple partitions, each of these linkages becomes part of a different connected component, creating fragmentation. I will explain that in this demonstration here. We start with a set of linkages which we divide into multiple partitions, as mentioned earlier. When we run a local Union Find, what we see is the connected components that are created, as shown below. Here I would like to highlight a couple of nodes: node number nine is present both in the first partition and in the second partition, and node number seven is present in the second partition as well as the third partition. What happened here is that, because of the localized knowledge, they got fragmented into different connected components. That is where we introduce the third step, shuffle, which is not present in the traditional Union Find algorithm. What happens in shuffle is that we do a series of iterations to bring nodes that are part of different partitions together, and we make a decision based on a termination condition. Here I'm highlighting two examples, node number nine and node number eight. Node number nine, we know, is a part of two different connected components, with parent node four and parent node five, whereas node eight is unambiguously connected to only one component, with node three. This is what we call the termination condition.
So, in this case, node eight has reached the termination condition, whereas node nine still has to be processed in further iterations to find its final parent node, where it unambiguously belongs to one connected component. That's why it proceeds to the next iteration. Here are a couple of examples showing nodes which have more than one parent and need to proceed to further iterations, whereas node number six meets its termination condition. The beauty of this shuffle algorithm is that at every iteration, a significant number of nodes reach their termination condition and don't need to proceed into further iterations. So what happens is, irrespective of how much data we start with in terms of number of linkages, with each successive iteration the volume reduces significantly, almost at a logarithmic rate. That's what makes this algorithm scale to billions of nodes. The combination of shuffle and the termination condition is followed by path compression, as described earlier.
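A toy, single-machine simulation of these steps, partitioning, local Union Find, then shuffle iterations with a termination condition, might look like the following. The partitioned linkages are invented so that node nine fragments across partitions; the real system runs this on Spark over billions of nodes, so this is only a sketch of the idea:

```python
def local_union_find(linkages):
    """Plain union-find with path halving over one partition's linkages."""
    parent = {}
    def find(x):
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]   # compress as we walk
            x = parent[x]
        return x
    for a, b in linkages:
        ra, rb = find(a), find(b)
        if ra != rb:
            parent[max(ra, rb)] = min(ra, rb)
    return {n: find(n) for n in parent}     # node -> local topmost parent

# Steps 1-2: partition the linkages and run Union Find locally on each.
partitions = [
    [(2, 9), (4, 9)],        # node 9 appears in this partition...
    [(5, 9), (5, 7)],        # ...and this one, so its component fragments
    [(7, 6), (3, 8)],
]
mappings = [local_union_find(p) for p in partitions]

# Step 3: shuffle - group each node's candidate parents across partitions.
# A node with a single candidate parent has met its termination condition;
# nodes with several candidates proceed to another iteration.
def shuffle(mappings):
    merged = {}
    for m in mappings:
        for node, par in m.items():
            merged.setdefault(node, set()).add(par)
    while any(len(parents) > 1 for parents in merged.values()):
        links = [(n, p) for n, ps in merged.items() for p in ps if p != n]
        resolved = local_union_find(links)
        merged = {n: {resolved.get(n, n)} for n in merged}
    return {n: ps.pop() for n, ps in merged.items()}

final = shuffle(mappings)
# Nodes 2, 4, 5, 6, 7, 9 all converge to a single connected component.
print(len({final[n] for n in (2, 4, 5, 6, 7, 9)}))  # 1
```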
This is what makes up the entire Union Find Shuffle algorithm. In this slide, we will talk a little bit about how we implemented Union Find Shuffle using features of Spark. As you've seen, all the way from creating the multiple partitions through the shuffle iterations, there is a significant amount of data processing and data transfer that happens. Spark has provided a great abstraction for us to do this with bulk synchronous processing, and to scale to the 25-30 billion range. The second part is the shuffle and termination condition steps. These steps are resource intensive in terms of I/O, because at the end of each iteration, a significant amount of data needs to be persisted to disk and does not proceed to further iterations. To make that part efficient, we have taken advantage of features such as caching and intermediate checkpointing available with Spark. This has helped us improve the runtime significantly, and it has also helped us overcome some of the limitations that we have seen with other alternatives. To talk a little further about how we implemented the algorithm: we divided all the linkages of the 30 billion nodes into a thousand part files, and we created 10 partitions to begin with, each containing a hundred part files of approximately equal size. We run Union Find in parallel for all the parts; at any point of time, we'll be running five instances of the local Union Find, and then we run the shuffle iterations until all the nodes in the graph reach the termination condition of belonging to one connected component, with path compression after that. As of this point, we have dealt with the scale aspects of our algorithm and were able to reach that goal. Then, interestingly, we have seen a myriad of data quality challenges.
For any internet-based application or business, when data is collected, you would see some noise inherently creeping into the data. And interestingly, the noise gets exposed when you build a graph data structure, as opposed to when you have the data in the form of a relational database. I would like to explain this "noise in the data" with an example.
The scenario that I would like to talk about: let's take an example where a user creates an account on an internet application.
And over time, a significant number of cookies get associated with this account. This is the picture that you see on the left side, where for one customer account, you see cookies going from one to 100. There are multiple scenarios because of which this could be happening: the customer is using multiple devices or multiple browsers, or clearing their cookies once in a while; this could lead to the account getting associated with multiple cookies. The opposite scenario, which you see on the right side, could also happen, where one cookie gets associated with tens or hundreds of accounts. When does this happen? This could happen in the case of shared or public computers, in a hotel lobby, an airport, or a public library, where if multiple people log in from the same browser at different points in time, they all get connected to the same token. To go further on this argument, let me show you a distribution of the data in the form of a histogram.
So here what we are showing is the distribution of how many IDs a single ID is linked to. In most of the cases, somewhere in the range of three to nine million instances, one ID is linked to one to five IDs, or six to eight IDs, and in some cases nine to 12. Pretty much reiterating the point that my colleague mentioned earlier: if you take up to 30, 40, or 60 as the acceptable range, most data falls into that, but what you see towards the bottom is the noisy data. It's a long tail, where one ID is linked to 100,000 IDs, or even a million IDs. These are what we call anomalous linkages. In the next slide, we see a histogram bar chart showing the magnitude of the acceptable range of IDs, what most of the data looks like, and what the long tail of data looks like. So, the point here is: why do we need to care about these anomalous linkages? If we do not take care of them at the data quality vetting level, this noise will creep into the graph and the connected components that we build, and they would not give an accurate picture of what the identity graph of a customer is.
So how do we tackle these anomalous linkages? We have built a Gaussian anomaly detection model, which helps us see what the normal distribution of data is for any linkage type that we are receiving, and what constitutes an outlier or an anomaly. This helped us set thresholds as guidelines to eliminate some of these anomalous linkages before they even go into the graph processing. But at what cost? If we remove anomalous linkages, that means we are losing some data, some information about the customer. So there is a trade-off between coverage and precision, which we will talk about a little in the next slide. Here, what we're demonstrating is that for the same amount of data, if we keep a smaller threshold, where we are stringent and remove any IDs that have connections to more than 10 IDs, what you see is the picture on the left: there are multiple small, precise connected components belonging to individuals, but there is one connected component which is huge. Whereas if you keep the threshold at a thousand, by keeping more linkages in the system, what you see is that the number of large connected components has increased from one to four, and the number of precise connected components has reduced. So this is a good segue into introducing the concept of large connected components.
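One simple way to realize this thresholding idea, assuming a Gaussian model of the per-ID degree, is to flag anything beyond a few standard deviations of the mean. The degree data and the choice of k below are made up for illustration, and the production model may well differ from this sketch:

```python
import statistics

# Hypothetical per-cookie degrees (how many accounts each cookie links to);
# one cookie is wildly anomalous.
degrees = [2, 3, 4, 3, 5, 2, 4, 3, 6, 4, 3, 5, 250]

# Fit a Gaussian to the observed degrees and flag points beyond k sigma.
mu = statistics.mean(degrees)
sigma = statistics.stdev(degrees)
k = 3                              # illustrative choice of cutoff
threshold = mu + k * sigma

anomalies = [d for d in degrees if d > threshold]
print(round(threshold, 1), anomalies)   # only the 250-degree cookie is flagged
```

In practice the threshold derived this way becomes the guideline applied in the filtering stage, with the coverage-versus-precision trade-off deciding how strict k should be.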
So irrespective of what threshold you pick, there is still a possibility that large connected components get formed. Depending on the threshold, the size ranges anywhere from 10,000 to 100 million nodes. They are generally a combination of hubs and long chains of nodes, which is what you see in the picture on the right side. The really small red dots are the individual nodes of the graph and their linkages, and when they are all processed through the connected component algorithm, they form these big, large connected components. As discussed earlier, the reasons for that are token collisions, noise in the data, bot traffic, et cetera. The important point, and why we should even care about large connected components, is that legitimate users could also belong to a large connected component. Let's say there is a customer who has a few IDs and is a user of the system; once that user shares a computer or a device, their connected component would get pulled into this large connected component. That is the reason why we need to build and traverse these large connected components. As you've seen, the Union Find Shuffle algorithm has to go through multiple iterations for large connected components of this magnitude to converge, so that all the nodes have the same parent node and belong to the same connected component, but the Union Find Shuffle algorithm, with our implementation on Spark, takes care of this problem and is successfully able to create the connected component.
The second part of the problem is traversing the large connected component. As businesses and use cases demand different precision and coverage, whether the customer is in a small, precise connected component or has become part of a large connected component, we should still be able to traverse it and get the data. For that, we have implemented a breadth-first search using Spark, and we are able to traverse the large connected components as well. With this implementation, we go only to a certain predefined depth in the large connected component, because we want to restrict how far down we go as part of the BFS. Putting it together, the entire traversal on the graphs created from 30 billion nodes finishes within 20 to 30 minutes overall. So, putting this all together to give a picture of the entire graph pipeline: all the way from handling heterogeneous linkages, where in stage one we clean up anomalous linkages based on the thresholds, through building the connected components.
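A depth-limited BFS of the kind described might look like this sketch. The adjacency map is toy data (user and cookie nodes), and the production traversal runs as a distributed Spark job rather than a single-machine loop:

```python
from collections import deque

# Toy identity graph: u* are user IDs, c* are cookies.
graph = {
    "u1": ["c1", "c2"], "c1": ["u1", "u2"], "c2": ["u1"],
    "u2": ["c1", "c3"], "c3": ["u2", "u3"], "u3": ["c3"],
}

def bfs_limited(start, max_depth):
    """Collect all nodes reachable from `start` within `max_depth` hops."""
    seen = {start}
    frontier = deque([(start, 0)])
    while frontier:
        node, depth = frontier.popleft()
        if depth == max_depth:
            continue              # stop expanding beyond the depth cutoff
        for nbr in graph.get(node, ()):
            if nbr not in seen:
                seen.add(nbr)
                frontier.append((nbr, depth + 1))
    return sorted(seen)

print(bfs_limited("u1", 2))   # ['c1', 'c2', 'u1', 'u2']
print(bfs_limited("u1", 4))   # reaches the whole component
```

The depth cutoff is what keeps traversal of a huge, hub-and-chain component bounded: nodes beyond the predefined depth are simply never expanded.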
At this stage, we know which connected components are small and precise, and which of them are large connected components, and accordingly we process the data.
So the pipeline that you see at the bottom of the screen is the one dedicated to processing the large connected components, both for creating them and for traversing them.
Before we conclude the presentation, I would like to give a sneak peek into how we would solve this problem, or how we are currently working on solving it, in the case of a real-time graph. What do we mean by a real-time graph? The linkages are coming in a streaming fashion, as opposed to a batch, and we need the graph to get updated for the different connected components in real time. So what kind of challenges lie ahead? If linkages are coming in a streaming fashion and we want the graph to be updated in real time, one of the challenges that we see is concurrency.
That means: what if there are two linkages coming for the same graph at the same time? How do we handle them and make the connected component have a consistent and correct output, without leading to deadlock or starvation while creating the connected components? That's an important challenge. The second one is scale. When we're talking about the scale of 30 billion nodes and linkages, with data coming in, say, click-stream data of browsers, people browsing on a web page, et cetera, the data needs to be sharded and separated into multiple partitions, which makes replication an important aspect for fault tolerance. And once we have sharded and replicated the data, data consistency becomes an important challenge: keeping the data consistent across all partitions as they're getting updated. The third part is that, as we have a real-time system, we need to provide high-throughput APIs for users who want to do real-time querying and traversals. Being able to provide high-throughput querying and traversals while the graph is updating in real time, and keeping the data consistent, these are the intertwined challenges that we are currently working on solving with respect to building real-time graphs. With this, I would like to conclude the presentation.