At Microsoft, we store datasets (both from internal teams and external customers) ranging from a few GBs to 100s of PBs in our data lake. The scope of analytics on these datasets ranges from traditional batch-style queries (e.g., OLAP) to explorative, ‘finding needle in a haystack’ type of queries (e.g., point-lookups, summarization etc.). Resorting to linear scans of these large datasets with huge clusters for every simple query is prohibitively expensive and not the top choice for many of our customers, who are constantly exploring (and demanding!) ways to reduce their operational costs; incurring unchecked expenses is their worst nightmare. Over the years, we have seen a huge demand for bringing ‘indexing’ capabilities that come de facto in the traditional database systems world into Apache Spark.
Among many ways to improve query performance and lower resource consumption in database systems, indexes are particularly efficient in providing tremendous acceleration for certain workloads since they can reduce the amount of data scanned for a given query and thus also lower resource costs. In this talk, we present our experiences in designing, implementing and operationalizing Hyperspace, an indexing subsystem for Apache Spark that introduces the ability for users to build, maintain (through a multi-user concurrency model) and leverage indexes (automatically, without any changes to their existing code) on their data (e.g., CSV, JSON, Parquet etc.) for query/workload acceleration. We will cover the necessary foundations behind our indexing infrastructure including the API design, how we leveraged Spark’s Catalyst optimizer to provide a transparent user experience and also discuss our development roadmap as we work towards open sourcing our work for the benefit of the broader community. Through presentation, benchmarks, code examples and notebooks, this will be one fun session, so come join us as we get started on this journey.
– Hello, everyone, I hope you’re having a wonderful day so far. Today, we’re gonna be talking about Hyperspace: An Indexing Subsystem for Apache Spark.
Before I begin talking about the details about the project, let us introduce ourselves. My name is Rahul Potharaju and I am joined here by my awesome colleague, Terry Kim. Both of us are from Microsoft. We are a part of the product that Microsoft has launched recently called Azure Synapse Analytics.
We are part of the Spark team at Microsoft and it’s probably obvious by now, but I’ll just say it. We work on everything Spark. We offer Spark as a service to Microsoft customers, which includes both internal customers, like Office and Bing, and also external customers. Where possible, we contribute back to Apache Spark and we open source the majority of our work.
Today, I will be covering the background of the project, the vision, some of the concepts that help you understand what’s happening inside this project, and why it is something that you have to care about. And, my colleague here will showcase some real world examples to hopefully convince you by the end of this talk that Hyperspace is an awesome tech.
So, before we dive into anything technical, let’s just start with the most foundational question, which is, what is an index?
I can give you the most obvious answer from the textbook. In fact, I just lifted this out of a textbook. In databases, an index is a data structure that improves the speed of data retrieval, which, in other words, is query acceleration, at the cost of additional writes and storage space. But if you are anything like me, then a real world analogy would be tremendously useful in just understanding what an index really is. So, let’s take another crack at it.
Ever since we were kids, you might remember going to the back of your textbook, trying to figure out where exactly a particular key phrase appears in the textbook and, at that point in time, you might have come across an index at the back of your textbook. For instance, if I wanted to find where in the textbook the phrase nested-loop join appears, I would quickly go to the section which has every word starting with the letter N and I would look up nested-loop join and I would immediately see that it’s there between pages 718 and 722 in the textbook. And, that’s an index for you. In short, one can imagine an index as a shortcut to accelerating some of the queries. So, let’s begin with the Overview of Hyperspace.
We have some pretty broad goals in Hyperspace. To begin with, our first and primary goal here is to be agnostic to data format. Our second goal is to offer a path towards low-cost index metadata management. What do I mean by this? Well, we wanna make sure that all of the index contents, as well as the associated metadata, are stored on the lake. And, this does not assume any other service to operate correctly. Our third goal is to enable multi-engine interoperability.
Four, we wanna enable extensible indexing infrastructure. In other words, what you see here today is more of a beginning than an end.
And, finally, we wanna make sure that we satisfy all of security, privacy, and compliance requirements.
At the bottom most, as you can see here, there is a data lake and, on the data lake, there are datasets that you already have, potentially structured, like Parquet, or even unstructured, like CSV or TSV.
The biggest assumption, as I’ve laid out in my earlier slide, is that we assume that the index is also living on the data lake, both the contents and the metadata. Our first area of investment is the indexing infrastructure. This is pretty important and you can think of this as the layer that has no intelligence, but it is providing you raw APIs in order to be able to do everything that you can with Hyperspace. To begin with, one of the first pieces of work that we’re investing in is index creation and maintenance APIs. To name a few, things like Log Management API. So, how do you ensure that you’re able to provide concurrency across multiple engines on the same index? Once we build all of the index creation and maintenance APIs, next comes the complementary dual of the indexing infrastructure, which is the query infrastructure. This whole work deals with, how do you ensure that an optimizer is index-aware? Today’s optimizer in Spark is not really aware of what an index is, for instance. So, as part of this work, what we’re trying to enable is a transparent query rewrite in the background, which is to say, if there is an index and we figure out that the index will potentially provide any kind of acceleration, then we transparently rewrite the user’s query. And, one of the most elegant benefits that you end up getting with this is users do not have to change a single line of code, other than setting a configuration in Spark.
And, finally, do we want all of this in a manual way? Obviously, not. I think the Holy Grail of all of the world of indexing is index recommendation, which takes in a user’s query workload and provides them with a recommendation of top-K indexes that they can build in order to expect some kind of acceleration in their workloads. Let me take a moment to just show you how simple using Hyperspace is going to be.
Today, we offer Hyperspace in three languages. One is Scala, second, Python, and third is .NET. This slide, though, shows you the APIs in Scala. The first set of APIs we provide is the Usage APIs and, as expected, using Usage APIs, you would be able to create, delete, restore, vacuum, and rebuild indices. The second set of APIs that we offer is the Smarts. If you have used Spark, then you will probably already know about df.explain, or DataFrame.explain. What it does is it takes your query, it produces the logical plan, the optimized plan, the analyzed plan, the physical plan. This Hyperspace.explain is built with a similar philosophy. What do I mean by that? It can take your query and it’ll give you a very cool diff showing exactly which indexes were used and how the plan ended up changing.
The two other APIs that we plan on introducing in a couple of weeks are, first, the whatIf API, which allows you to hypothetically test whether an index will benefit your query or not. And, the final Holy Grail of the Smarts, which is the recommend API, which will take a workload from you and provide you with a list of top-K index recommendations that you can then go ahead and build in the background. One question that might be lingering in your mind at this stage is as I’m using all of these indices, where exactly are the indices getting created?
Now, the cool thing about what I mentioned and laid out in the goals and vision of the project is the indexes live on the lake. Why is this important? It’s very interesting because now if you start looking at your file system, like HDFS or maybe ABFS or even AWS S3, for instance, imagine you have your data laid out as I have shown in the bottom that’s basically just some path to data files. Now, the moment you start creating an index, what you will observe is the indexing API, createIndex, takes a name for the index, and once you provide the name of the index, it creates a folder and the folder contains all of the information in a self-describing way to point back into the original dataset that you have created. Why is this important? You need to capture pretty much every detail of what exactly you have considered on the original dataset at the time of indexing because you wanna be able to detect if the dataset changed from the time you created the index to ensure that you’re not providing any incorrect results. And, I’m sure you must have noticed at this stage, the presence of something called a Hyperspace log. Obviously, it’s in a different color, so it’s probably something of interest. Hyperspace log is a progress log, or an operation log, which is the heart of our entire design. When you initially create an index, it captures the operation create. If, let’s say the underlying data ended up changing, then perhaps you might want to run a refresh. Once you do a refresh and the refresh completes, what ends up happening is the index has moved back into an active state. And, there is a pointer that gets laid out into, in this case, index-directory-2 and index-directory-3. What you will notice is, hey, wait a second. Why are we seeing index-directory-1, 2, and 3? That’s because we maintain different snapshots and different versions of the index to enable multi-reader and multi-writer concurrency.
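To make the point about detecting a changed dataset more concrete, here is a minimal Python sketch. It is not Hyperspace’s actual implementation or log format: the `FileInfo` fields, the `fingerprint` function, and the file paths are all assumptions made up for illustration. The idea it shows is simply that if you record a signature of the source files at indexing time, you can later tell whether the index still matches the data.

```python
import hashlib
from dataclasses import dataclass


@dataclass(frozen=True)
class FileInfo:
    """Hypothetical per-file metadata recorded at index-creation time."""
    path: str
    size: int
    modified_time: int


def fingerprint(files):
    """Hash the sorted file listing so any added, removed, or rewritten file changes the result."""
    digest = hashlib.sha256()
    for f in sorted(files, key=lambda f: f.path):
        digest.update(f"{f.path}|{f.size}|{f.modified_time}\n".encode("utf-8"))
    return digest.hexdigest()


def index_is_usable(recorded_fingerprint, current_files):
    """An index is only safe to use if the source data still matches what was indexed."""
    return recorded_fingerprint == fingerprint(current_files)


# Listing captured when the index was created (made-up paths and values).
at_index_time = [
    FileInfo("/data/employees/part-0000.parquet", 1024, 1_590_000_000),
    FileInfo("/data/employees/part-0001.parquet", 2048, 1_590_000_000),
]
recorded = fingerprint(at_index_time)

# Later, a new file lands in the same folder.
now = at_index_time + [FileInfo("/data/employees/part-0002.parquet", 512, 1_590_100_000)]

print(index_is_usable(recorded, at_index_time))  # True
print(index_is_usable(recorded, now))            # False
```

In this toy version, a refresh would amount to rebuilding the index into a new directory and recording the new fingerprint alongside it.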
And, one of the very interesting aspects of having the index on the lake is it provides several benefits. Just to name a few, one, now index scans scale. The second biggest benefit is our index today is laid out in an open format and that open format is Parquet. And, one of the major advantages you get, if you have been lingering around in the Spark community, then you already probably know that there’s probably close to a decade worth of investment in making sure Parquet, as a format, is highly optimized. And, you don’t need to worry about it anymore. Whatever optimizations are present for Parquet, you automatically end up kinda leveraging all of those optimizations as part of the index itself. And, finally, as you can tell, we have enabled a serverless access protocol. What do I mean by this? It’s simple. Hyperspace does not really depend on any external servers.
So, before we move onto my colleague doing a demo, I’d like to introduce Azure Synapse Analytics, which is a product that Microsoft has released recently and Hyperspace comes built in, out of the box, inside Azure Synapse Analytics. So, now I’ll switch over to my colleague, Terry Kim, who is going to show you the end-to-end experience of what Hyperspace looks like and how you can use it in production.
– [Terry] Okay, in this demo, I’ll go over Hyperspace APIs for creating and using indexes. I will be using Azure Synapse Analytics, which comes with Hyperspace out of the box. And, we’ll share this notebook after the session. Okay, let’s get started.
First, I’m going to create two DataFrames. One for employees and one for departments. And, here are the contents of the DataFrames. Employees DataFrame has employee ID, employee name, and department ID. Departments DataFrame has department ID, department name, and location. And, these are the two DataFrames I’ll be using throughout this demo.
Now that we have DataFrames to work with, let me create a Hyperspace object, which has methods for creating and managing indexes. Next, I’m going to create a few indexes. To create an index, you need to have an IndexConfig object and specify name, indexedColumns, and includedColumns.
IndexedColumns are the columns used for joins or filters. IncludedColumns are the ones used for select and project. Once you have this IndexConfig object, you can just say hyperspace.createIndex and pass the DataFrame which you want to build the index from along with the IndexConfig object. In this example, I am going to create one index for employees DataFrame and another one for departments DataFrame. And, the reason I chose department ID as an indexedColumn is that I’ll be joining employees DataFrame with departments DataFrame on this column.
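The split between indexedColumns and includedColumns can be illustrated with a small Python sketch. This is a toy model, not the Hyperspace `IndexConfig` class or the real optimizer rules: the `IndexConfigSketch` class, the `can_serve` rule, and the column and index names (`deptId`, `empName`, `deptName`, `empIndex`, `deptIndex`) are assumptions chosen to mirror the demo. The simplified rule assumed here is that the join or filter columns must be among the indexed columns, and every column the query touches must be stored in the index.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IndexConfigSketch:
    """Toy stand-in for an index definition; not the Hyperspace IndexConfig class."""
    name: str
    indexed_columns: tuple
    included_columns: tuple


def can_serve(index, lookup_columns, output_columns):
    """Simplified applicability rule (an assumption, not the real optimizer logic)."""
    indexed = set(index.indexed_columns)
    available = indexed | set(index.included_columns)
    # Join/filter columns must be indexed so the sorted layout can be exploited.
    lookups_ok = set(lookup_columns) <= indexed
    # Every referenced column must live in the index, otherwise the source data is still needed.
    covered = (set(lookup_columns) | set(output_columns)) <= available
    return lookups_ok and covered


emp_index = IndexConfigSketch("empIndex", ("deptId",), ("empName",))
dept_index = IndexConfigSketch("deptIndex", ("deptId",), ("deptName",))

# Join on deptId, selecting empName from employees and deptName from departments.
print(can_serve(emp_index, ["deptId"], ["empName"]))    # True
print(can_serve(dept_index, ["deptId"], ["deptName"]))  # True
# Selecting a column that the index does not carry means the index cannot cover the query.
print(can_serve(dept_index, ["deptId"], ["location"]))  # False
```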
We also have an API for listing indexes we created.
To do that, you will do hyperspace.indexes.show and this will display all the indexes that are created. These are the two indexes we just created. And, this also has more information about the indexes, such as indexedColumns, includedColumns, indexLocation, and so on. The object returned by indexes is a Spark DataFrame, so, if you want, you can perform any DataFrame operations on it. Now, let’s start utilizing the indexes we just created. Before doing that, I’m going to disable broadcast join because the indexes we created are applicable to SortMergeJoin. This is the join query we will be executing.
I’m joining employees DataFrame with departments DataFrame on department ID and selecting employee name and department name. And, I will be showing physical plan, and then the results of the join.
And, this is the output. So, this is the physical plan of joining two DataFrames. So, here we see SortMergeJoin and ShuffleExchange. And, these are the results of the join. Now, finally, let’s enable Hyperspace. How do we do it? We just say spark.enableHyperspace. And, what it does is it’s going to inject the Hyperspace optimizer rules and start utilizing the indexes available. And, we will be executing exactly the same join query. And, this is the output.
However, I think it’s a little hard to see what really changes after enabling the Hyperspace. And, that’s where the Explain API comes into play. So, here we do hyperspace.explain and pass the DataFrame. And, it will give you an overview of what changes if Hyperspace is enabled. And, this is the output.
So, this is the output from the join query that we just executed. And, this is the physical plan with Hyperspace on. And, this is the physical plan with Hyperspace off. And, the highlighted portion is the difference. And, you will see that Sort and Exchange have been removed from the physical plan with Hyperspace on. And, this also lists the indexes that are used and these are the two indexes we created in this demo. And, finally, it also shows you the physical operator stats. For example, what this is saying is with Hyperspace disabled, there were two ShuffleExchange physical nodes. But with Hyperspace enabled, the number of this node goes to zero. And, the same goes for the Sort physical operator.
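The physical operator stats described here boil down to counting operators in the two plans and reporting the ones whose counts differ. The following Python sketch shows that idea only; it is not how Hyperspace computes its explain output, and the two operator lists are simplified, hand-written stand-ins for real Spark physical plans (the `FileScan` and `Project` entries in particular are assumptions).

```python
from collections import Counter


def operator_stats(plan_off, plan_on):
    """Return {operator: (count without indexes, count with indexes)} for operators that differ."""
    off, on = Counter(plan_off), Counter(plan_on)
    return {
        op: (off[op], on[op])
        for op in sorted(set(off) | set(on))
        if off[op] != on[op]
    }


# Hand-written operator lists standing in for the two physical plans of the join.
plan_hyperspace_off = [
    "Project", "SortMergeJoin",
    "Sort", "ShuffleExchange", "FileScan",
    "Sort", "ShuffleExchange", "FileScan",
]
plan_hyperspace_on = ["Project", "SortMergeJoin", "FileScan", "FileScan"]

print(operator_stats(plan_hyperspace_off, plan_hyperspace_on))
# {'ShuffleExchange': (2, 0), 'Sort': (2, 0)}
```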
Hyperspace also works with SQL commands. And, this is equivalent to the join, using the DataFrame APIs. So, basically, this is joining these two tables on the department ID. And, this should give you the same result, as expected.
Okay, let me switch gears a little bit and talk about some of the APIs for managing indexes. First is Delete Index. If you wanna delete your index, you can do hyperspace.deleteIndex and pass in the name of the index you want to delete. This will be a soft-delete, meaning that it’s not going to remove the index from the file system. If I look at the output here, you will see that the state has changed from active to deleted. And, once the index goes to a deleted state, it’s not going to be picked up by Hyperspace. And, this API is useful if you want to disable some of the indexes temporarily. Next is Restore Index. So, this API is used if you want to bring back the deleted index. Let me show you the output. So, if you perform restoreIndex, it will change the state of this index from deleted to active. And, once the index goes to active state, it will be used by Hyperspace once again.
Next is Vacuum. Vacuum is a hard-delete operation, meaning that it will physically remove the index from the file system. The prerequisite is that you have to first delete the index. So, this is the sequence. So, you first deleteIndex and you will vacuum the index. And, here is the output. The index goes from active state to deleted state and when you do vacuumIndex, it will be removed from the file system.
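The delete, restore, and vacuum rules just described form a small state machine, which the Python sketch below models in memory. It is only an illustration of the sequencing: `ToyIndexCatalog` and `IndexLifecycleError` are hypothetical names, the real Hyperspace keeps this state in its operation log on the lake, and the index names are made up.

```python
class IndexLifecycleError(Exception):
    """Raised when an operation is not allowed in the index's current state."""


class ToyIndexCatalog:
    """In-memory model of the delete/restore/vacuum rules described in the demo."""

    def __init__(self):
        self.states = {}  # index name -> "ACTIVE" or "DELETED"
        self.log = []     # append-only list of (operation, index name)

    def _require(self, name, expected):
        actual = self.states.get(name)
        if actual != expected:
            raise IndexLifecycleError(f"{name} is {actual}, expected {expected}")

    def create(self, name):
        if name in self.states:
            raise IndexLifecycleError(f"{name} already exists")
        self.states[name] = "ACTIVE"
        self.log.append(("create", name))

    def delete(self, name):
        """Soft delete: the index stays on disk but is no longer picked up."""
        self._require(name, "ACTIVE")
        self.states[name] = "DELETED"
        self.log.append(("delete", name))

    def restore(self, name):
        """Bring a soft-deleted index back into use."""
        self._require(name, "DELETED")
        self.states[name] = "ACTIVE"
        self.log.append(("restore", name))

    def vacuum(self, name):
        """Hard delete: only allowed after a soft delete."""
        self._require(name, "DELETED")
        del self.states[name]
        self.log.append(("vacuum", name))

    def usable_indexes(self):
        return sorted(n for n, s in self.states.items() if s == "ACTIVE")


catalog = ToyIndexCatalog()
catalog.create("empIndex")
catalog.create("deptIndex")
catalog.delete("deptIndex")
print(catalog.usable_indexes())  # ['empIndex']
catalog.restore("deptIndex")
print(catalog.usable_indexes())  # ['deptIndex', 'empIndex']
try:
    catalog.vacuum("empIndex")   # still ACTIVE, so the hard delete is refused
except IndexLifecycleError as err:
    print("refused:", err)
catalog.delete("empIndex")
catalog.vacuum("empIndex")
print(catalog.usable_indexes())  # ['deptIndex']
```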
And, last, but not least, we have a Refresh Index API. Suppose new employees were added to our original data. Now that the index and data are out of sync, Hyperspace is smart enough not to use the index because it would give you a wrong result. However, you can use the Refresh Index API to rebuild the index and once the data and index are in sync, Hyperspace will use the index. And, that’s the end of the demo and you can switch back to the slide.
– Thank you so much, Terry. Wasn’t that demo exciting? I’m itching to actually try this out. In fact, I’ll provide you with pointers in the latter half of my presentation on how you can try this Hyperspace thing on your boxes today.
The covering index. The covering index, in other words, creates a copy of the original data in a different sort order. So, there’s no black magic. Like I mentioned at the beginning of my talk, an index provides query acceleration at the cost of additional writes and storage. So, let’s take one example just to make sure the concept is clear. Imagine a table having three columns and you are trying to execute a query. This query is super simple, it’s a SELECT b WHERE a equals Red. The only way to execute this query would be to do a full scan on top of this table and this full scan would end up taking linear time. Obviously, I’m not considering optimizations, like min-max, but I’ve tried to dumb it down and simplify just to get the concept across. Now, you can technically build a covering index on column a and include column b. Now, when you re-execute this query that I’ve shown you here, SELECT b WHERE a equals Red, now you can resort to doing a binary search and this takes logarithmic time. Well, the concept is pretty simple. It aids tremendously in doing things, like filter acceleration. Not convinced that it works? Well, let’s go into something more complex now. Imagine now you’re joining two different tables, Table A and Table B, on column A. And, assume that you are running this query, SELECT b, c where you’re joining on the column A from both the tables. Without an index, Spark will decide to shuffle both sides of the table into the same number of partitions and remember that the data is not sorted, so, as part of Step 2, Spark ends up going and sorting both the sides before it decides to go ahead and do a merge in Step 3. With an index, with the right index as well, the Hyperspace extensions that we have built enable the Spark optimizer to pick the right index. And, remember, the index is already pre-shuffled and pre-sorted, so Spark can happily move into Step 2, which is the merge step.
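Both examples above can be sketched in a few lines of plain Python. This is an in-memory toy, not Spark or Hyperspace code: the `CoveringIndex` class, the helper functions, and the sample rows are assumptions for illustration, and a real index is a set of pre-shuffled, pre-sorted Parquet files rather than two Python lists. The sketch shows the filter case (binary search over a sorted copy instead of a full scan) and the join case (merging two inputs that are already sorted on the join key, with no sort step beforehand).

```python
from bisect import bisect_left, bisect_right


class CoveringIndex:
    """Sorted copy of (indexed column, included column): extra writes and storage, faster reads."""

    def __init__(self, rows, indexed, included):
        pairs = sorted(((row[indexed], row[included]) for row in rows), key=lambda p: p[0])
        self.keys = [key for key, _ in pairs]
        self.values = [value for _, value in pairs]

    def lookup(self, key):
        """Binary search for the run of matching keys: logarithmic time plus the result size."""
        lo = bisect_left(self.keys, key)
        hi = bisect_right(self.keys, key)
        return self.values[lo:hi]


def full_scan(rows, filter_column, value, output_column):
    """Baseline without an index: look at every row, linear time."""
    return [row[output_column] for row in rows if row[filter_column] == value]


def merge_join(left, right):
    """Join two indexes already sorted on the join key; no shuffle or sort is needed first."""
    out = []
    i = j = 0
    while i < len(left.keys) and j < len(right.keys):
        if left.keys[i] < right.keys[j]:
            i += 1
        elif left.keys[i] > right.keys[j]:
            j += 1
        else:
            key = left.keys[i]
            i_end = bisect_right(left.keys, key, i)   # end of the run of equal keys on the left
            j_end = bisect_right(right.keys, key, j)  # end of the run of equal keys on the right
            for left_value in left.values[i:i_end]:
                for right_value in right.values[j:j_end]:
                    out.append((left_value, right_value))
            i, j = i_end, j_end
    return out


table_a = [
    {"a": "Green", "b": 10, "d": "x"},
    {"a": "Red", "b": 20, "d": "y"},
    {"a": "Blue", "b": 30, "d": "z"},
    {"a": "Red", "b": 40, "d": "w"},
]
table_b = [
    {"a": "Red", "c": "apple"},
    {"a": "Blue", "c": "sky"},
    {"a": "Yellow", "c": "sun"},
]

index_a = CoveringIndex(table_a, indexed="a", included="b")
index_b = CoveringIndex(table_b, indexed="a", included="c")

# SELECT b WHERE a = 'Red'
print(full_scan(table_a, "a", "Red", "b"))  # [20, 40]
print(index_a.lookup("Red"))                # [20, 40]

# SELECT b, c joining Table A and Table B on column a
print(merge_join(index_a, index_b))         # [(30, 'sky'), (20, 'apple'), (40, 'apple')]
```

The cost shows up in the constructor: every index is another sorted copy of part of the data that has to be written, stored, and kept in sync with the source.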
And, I’m pretty sure you must have noticed by now that the shuffle got eliminated. And, since shuffle is the most expensive step, potentially, in a query, now this query can scale to much larger datasets and potentially run even faster. Now, having armed you with these two particular concepts, let me switch back to my colleague to show you some of the deep-dives into real world complex workloads and hopefully convince you that we can really get some nice acceleration.
– [Terry] Okay, now I’ll do a deep-dive into queries in TPC-DS, which is one of the well-known big data benchmarks. And, I’ll show you how much Hyperspace can speed up those queries. First, this is Query 93, which is a fairly simple query. You are joining two fact tables and a dimension table, followed by aggregation. Now, let’s take a look at how the execution plan looks in the Spark UI.
This is the plan executed by vanilla Spark and, as you can see, this is a join of two fact tables, followed by a join with a dimension table, followed by an aggregation. For the first join, we see that there were about 140 gigabytes of data being shuffled and sorted. This is a good opportunity to utilize the Hyperspace covering indexes to eliminate the shuffle. Now, the window on your right is the plan executed by Spark with the right set of indexes created. As you can see, the shuffle and sort have been removed, but the rest of the plan remains the same. Let’s compare the execution time. This benchmark was run with scale factor 1,000 and the data is stored in Parquet. The execution time has significantly gone down from six minutes to 32 seconds, giving you about 10X speedup.
Next, let’s take a look at Query 64. This is a fairly complex query with multiple joins and aggregations across about 15 different tables. So, let’s take a look at the Spark UI for the executed plan. Let me zoom out a bit to show you the overall plan. And, sure enough, it’s a complex one.
If I scroll down a bit, I see shuffle and sort of about 110 gigabytes of data.
50 gigabytes of shuffle.
110 gigabytes of shuffle.
50 gigabytes of shuffle and so on. After creating and applying the indexes, this is the new plan on your right. I will make it a full screen. The big shuffle and sort have been removed here, here, here, and here.
This is an interesting scenario where we create an index on only one side. If you cannot create a covering index on one side of the join because it involves aggregations, joins, and so on, you can just have the covering index on the other side if applicable. And, you can still eliminate the shuffle for that side.
Finally, let’s compare the execution time. It went down from 10 minutes to 3.4 minutes to give you about 3X speedup. So, we just took a brief look at the speedups in two of the TPC-DS queries. Now, Rahul will talk about the improvements we observed across all the TPC-DS queries.
– Thank you so much, Terry. As you’ve seen, Terry has just demonstrated Hyperspace acceleration on a couple of non-trivial queries. I’m pretty sure the moment he actually showed you this whole plan zoomed out, at least I zoomed out. I zoned out in fact.
Thanks a lot, Terry, now you’ve made me even more curious to try this out. Now, as he has shown you a deep-dive into two queries, let me talk about Hyperspace performance in general.
The Compute Configuration that we’ve used to test this out is on the left for anybody who is interested in improving some of our benchmarks. At the top is a graph that shows a workload evaluation derived from TPC Benchmark H, or, in short, TPC-H. The scale factor that we’ve used is 1,000. And, the evaluation was performed using Apache Spark 2.4. We will end up getting support for Apache Spark 3.0 subsequently. And, remember one very interesting thing. All of these base tables of TPC-H are laid out in Parquet. And, as you already know, Parquet is a highly efficient columnar format and all of the acceleration that I’m about to show you in this workload, as well as the next one, is on top of Parquet data. In the graph here, the x-axis shows the TPC-H query number. TPC-H has 22 queries and that’s what you see here. And, the y-axis shows the duration that it took to execute that query. And, the dotted line that you see here indicates the gain that we achieved by using the indices in this case.
I want you to observe this and just digest how much gain we were able to get. Obviously, you can see a spectrum of gains. You see gains as low as about 1.1X, but you can also see gains as high as 9X. Now, let’s look at the second workload, derived from TPC Benchmark DS, or, in short, TPC-DS. For brevity, we just show the top 20 queries. TPC-DS in our workload consists of a total of 103 queries. And, what you can notice here is a similar pattern. Again, there are no regressions and, in fact, up to 11X gains to be had. The order of benchmark is really key because it conveys one very important aspect, which is we don’t regress in any of the queries. So, if you look at both TPC-H and TPC-DS, one thing that’ll immediately stand out is we’re getting a pretty high gain of 2X and 1.8X, respectively. I’m also pretty thrilled to announce that we are open-sourcing Hyperspace as of today.
I agree, it’s version 0.1, it’s a start. To recap, Hyperspace is an extensible indexing subsystem for Apache Spark. It’s a simple add-on. We have not changed anything in Spark core. And, rest assured, you can feel thrilled that it’s the same technology that powers the indexing engine inside Azure Synapse Analytics. And, it works out of the box with Apache Spark 2.4 and the support for Apache Spark 3.0 is going to come in the next few weeks. And, we offer Hyperspace in three languages. No compromises, you can choose whatever language you feel comfortable with. Give us feedback, what would you like to see in the upcoming versions? And, we’ll build it for you. And, today, you can access, you can download, you can build. You can also use Hyperspace from one of these two URLs. They both will take you to the same place. Hyperspace is currently on GitHub. You can feel free to explore the codebase. Give us suggestions in terms of what you’d like to see.
Having said that, I would also like to take a brief moment to thank everybody who helped us reach this point in time.
In this slide, I’m going to discuss a couple of areas that we are planning on investing in and probably also areas of opportunity where we could collaborate. The first area is metadata management and lifecycle. Do you like what you’re seeing? Did we capture enough metadata, for instance? The second area of investment is indexing enhancements. Well, what we demonstrated today is indexing on immutable datasets, or static data. How would we do the same thing for incremental indexing? What are the things that will change? What are the performance implications? How would you add support for updatable structures, like Delta Lake or Uber’s Hudi, for instance?
The third area of investment is optimizer enhancements. Like I mentioned, this is a very humble beginning in this space. We are by no means experts at optimizers and this is where we could definitely use your help in making sure that we are taking the right decisions while optimizing queries. And, one of the areas of active investment that we’re exploring is query explainability. Why did you use a particular index? Why did you not use a particular index? The fourth area of investment is more index types. More on this in the next slide.
The fifth one is documentation and tutorials. Well, indexing is not a silver bullet and, in all honesty, we would like to tell you that indexes are always going to work, but I can’t really tell you that. You should take care while using indices, making sure that they really accelerate your workload. There are lots and lots of questions you need to ask yourself before you go ahead and decide to build an index. This is what we want to be able to capture as part of our best practices, gotchas, and we wanna be able to run more reproducible experiments for our community to try out.
And, finally, the Holy Grail.
When my awesome colleague, Terry, was showing you some of these cool demos and I was showing you some of the performance evaluation across all of these complex queries, I’m sure one thought must have crossed your mind, which is, wait a second, how did you decide what indices to even build in the first place? That was done through a prototype that we have implemented over the last few months in the form of an index recommendation engine. It’s quite simple today. What it does is it takes your workload and recommends the top-K indices that you can go ahead and build. It’s not perfect, but we plan on open-sourcing that over the next few weeks. Let me answer the final question, which is what type of hyperspaces can we build together? Today, in our talk, we covered one type of a Hyperspace, which is the covering index.
But, in Hyperspace, it is very important to remember that the index term is used extremely broadly to refer to a more generic class called a derived dataset. In short, a derived dataset, well, as the name states, was derived from the original dataset, but it aids in providing some hints to the optimizer to accelerate the query. That’s the simple definition. So, what are some other types of hyperspaces we can build together? Today, we only have covering indexes, but we plan on going and implementing more types, like the chunk-elimination index, which is something you can use to do point lookups, or the materialized views, which is an entirely complex area, but it’s also very, very useful, especially when you have canned queries that you’re running and you have multiple users who are doing this. And, finally, also provide potentially statistics and maintain them so that our cost-based optimizer can utilize them to provide you with high query acceleration. With that, I’d like to conclude my talk by mentioning a couple of key things. Number one, today, I’m super thrilled to announce the open-sourcing of Hyperspace, a tech that we have built as part of Azure Synapse Analytics. It’s, albeit, version 0.1. It’s obviously not ready for you to use in production. It’s just our humble attempt at giving back something to the community. And, point number two, the tech was built without really modifying a single line of Spark core, which gives you some tremendous powers in terms of putting it on your own Spark clusters without having to worry about deploying Spark itself. And, because the tech is also deployed as part of Azure Synapse Analytics, rest assured, you can feel safe that there is a tremendous amount of product investment that’s going on in it. And, point number three, we’ve demonstrated a very preliminary acceleration on some of the key workloads. And, of course, I’d like to emphasize, this is more of a beginning. We might be making some mistakes.
There is definite scope for improvement. It’s not perfect, but that’s why we really need your guidance in making sure that we evolve Hyperspace into something that our users would truly love and use in their day-to-day workloads. And, last, but not least, I do have the URL right here at the bottom right. The whole Hyperspace project, today, is on GitHub. You can try it out, you can let us know. If you run into any issues, please feel free to open an issue on the GitHub thing. So, I, on behalf of my entire team, look forward to seeing you folks on our GitHub page and look forward to collaborating with all of you. Thank you so much for your time.
Rahul Potharaju is a Principal Engineering Manager at Microsoft's Azure Data group working on Azure Synapse Analytics. Previously, he worked as a Researcher in the Cloud and Information Services Lab (CISL) at Microsoft. He earned his Computer Science PhD degree from Purdue University in a joint industrial collaboration with Microsoft Research and Computer Science Master's degree from Northwestern University. He is a recipient of the Motorola Engineering Excellence award and the Purdue Diamond Award. Rahul's work has been adopted by several business groups inside Microsoft and has won the Microsoft Trustworthy Reliability Computing Award.
Terry Kim is a Principal Software Engineer at Microsoft’s Azure Data group, focusing on scalability, performance, and query optimization. His current work involves enabling Apache Spark for .NET developers.