Brad May is a Principal Systems Engineer with Starbucks Coffee Company with over twenty years of experience in data engineering and systems development, spanning point solutions, enterprise-scale data warehouses, and, most recently, big data solutions in the cloud. He is currently focused on establishing best practices for efficient and highly resilient data pipelines for the Starbucks Business Intelligence and Data Services team.
Running a global, world-class business with data-driven decision making requires ingesting and processing diverse sets of data at tremendous scale. How does a company achieve this while ensuring quality and honoring its commitment as a responsible steward of data? This session details how Starbucks has embraced big data, building robust, high-quality pipelines for faster insights to drive world-class customer experiences.
- Hi, everybody. Welcome to Operationalizing Big Data Pipelines at Scale with Starbucks BI and Data Services, with Brad May and Arjit Dhavale. My name is Danny Lee, and I'll be the host for the session. Let's start by having Brad and Arjit introduce themselves. Brad? - Yeah, hi. My name is Brad May. I'm a systems engineer principal here at Starbucks in the Business Intelligence and Data Services organization. My career path has taken me from desktop application development to web app development, to working in our enterprise data warehouse and, most recently, engineering big data pipelines in the cloud leveraging Spark and Databricks. - And hi, I'm Arjit. I'm an engineering manager at Starbucks with over 12 years of experience in developing enterprise-scale applications. At Starbucks my focus is on delivery of technology to enable large-scale data processing and machine learning on our enterprise data analytics platform. - So the first question, which I'm actually going to ask you, Arjit, is: how do data and AI relate to coffee and customer service? That seems like a bit of a non-sequitur. - Sure, Danny. As you know, coffee is at our core,
and if you see our mission, it is to inspire and nurture the human spirit: one person, one cup, and one neighborhood at a time. It started with one store at Pike Place in Seattle. And today we are 32,000-plus stores, 300,000-plus employees, and 20 million active customers with millions of daily transactions. We have a great innovative team and cross-functional partnerships to provide the Starbucks experience to our customers. One of the key enablers of this experience is our enterprise data analytics platform, EDAP. EDAP is used as the foundation for machine learning and AI analysis of transactions and behaviors so that we can serve more relevant and timely digital interactions and offers. As you can see in the EDAP stack, EDAP is an Azure data stack with a petabyte-scale Delta Lake, more than a thousand data pipelines across 13 domains and 20 subdomains, and a thousand-plus active users across the world. And this scale was achieved progressively with precise planning and execution, and Brad, who has been with us for more than 15 years, can give us a quick view of the journey and how we are now processing large-scale data pipelines.
- Yeah, thanks Arjit. Like you said, I've been with Starbucks for about 15 years now, and I've seen a lot of changes during that time. Back then most data analysis was done using local data marts and Excel. One of the first projects I worked on here was migrating a demand planning reporting system into SQL Server from a set of four Microsoft Access databases that had been linked together because they had exceeded their maximum size of one gigabyte each. It would take eight hours to process that data, leaving very little time to do any actual analysis. I call this our data-justified era. We had a lot of savvy business leaders with great intuition, but due to the challenges in collecting and processing data, it was mainly used to validate decisions after the fact. By the time I moved into the BI and Data Services organization, they had invested in a massively parallel data warehouse solution and things were starting to change. We were then able to store terabytes of data and load and process it much faster than before. I call this our data-informed period, because people now had a much greater ability to leverage data ahead of time in order to make their decisions. Then about four years ago, we started our migration into the cloud, and a whole new world was opened up to us. We're now in our data-driven phase, where we are able to store petabytes of data and process it in near real time. You may have heard about Deep Brew, the platform our data science team uses to power our customer order recommendation system, which combines machine learning algorithms with real-time data processing to increase sales and customer satisfaction. None of that would be possible without the robust data pipelines that our teams have created.
We're only able to do this by establishing robust, repeatable pipeline patterns. Like most organizations of our size, we have a large variety of data sources: databases, flat files, JSON, streaming data, et cetera. And we try to have patterns established to make ingestion and processing as streamlined as possible. We have a couple of examples that we'll show you in a minute. The first step in most of our data pipelines is what we call the integration zone. This can be a landing zone for trusted third parties or a location where we decrypt data before further processing. The data here is loaded as-is, segregated by source, and has a limited retention period of seven days or less before being deleted.
Data used to go directly into our raw zone, which many people call their bronze zone. But with data privacy becoming such a big issue, we no longer preserve data that we don't have a full schema for. Our raw data is segregated by domain and partitioned by the date that we received the data. We apply a schema to this data and save it in Delta Lake format so that it's easily updatable or deletable should it need to be due to our privacy guidelines. The next step is what we call our published zone, which is a combination of what many people would call their silver and gold zones. The data is still segregated by domain, and the processing can range from a simple deduplication and repartitioning based off of usage patterns to data products that have complex business logic applied and sometimes contain data from multiple other published datasets.
- Wow. Thanks so much for sharing the business journey that you went through, Brad; that was really enlightening. You discussed a lot about the structure, and I guess what I'd like to have us dive into is: could you provide some concrete examples, please? - Yeah. I've got some batch processing examples to show you, and then Arjit has something on our streaming pattern. One of our keys to success has been to establish easy-to-follow, repeatable processes for our data pipelines. Like a lot of companies our size, we had a large enterprise data warehouse with about 10 years of pipeline development that wasn't gonna be migrated to the cloud overnight. During our transition, we started by extracting and loading finished data products directly into our Delta Lake while we worked on bringing in component datasets and reconstructing the end-to-end pipelines using Spark.
At first, we issued a single extract statement using a JDBC connection, but that could take a really long time, and we would sometimes lose the connection midway through, requiring us to start over. It was clear this approach was not gonna work at scale. We solved this problem by breaking the extract up into pieces, and we developed a generic notebook that we could parametrize to start producing these pipelines in assembly-line fashion. The extract SQL statement, the name of the destination Databricks table, the number of concurrent extract partitions we want, and the name of the source table column to be used for the distribution value are all saved in a Delta table in our production workspace.
We pass parameters into the notebook through our orchestration tool, in this case Airflow, and securely retrieve the database connection details through an Azure Key Vault.
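As a rough sketch of that wiring, the orchestrator parameters and Key Vault secrets might be combined into JDBC reader options like this. All names here (the `get_secret` helper, the secret and parameter keys) are illustrative, not Starbucks' actual code:

```python
def build_jdbc_options(params, get_secret):
    """Combine orchestrator parameters with secrets from a vault.

    params     -- dict passed in by the orchestration tool (e.g. Airflow)
    get_secret -- callable that fetches a named secret, e.g. a thin
                  wrapper around dbutils.secrets.get on Databricks
    """
    return {
        "url": get_secret("oracle-jdbc-url"),
        "user": get_secret("oracle-user"),
        "password": get_secret("oracle-password"),
        # Wrap the extract SQL so it can be pushed down as a subquery.
        "dbtable": f"({params['extract_sql']}) src",
        "fetchsize": "10000",
    }

opts = build_jdbc_options(
    {"extract_sql": "SELECT * FROM sales"},
    get_secret=lambda name: f"<{name}>",  # stand-in for the Key Vault lookup
)
```

In a real notebook the options dict would be handed to `spark.read.format("jdbc").options(**opts)`.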
So why do we specify a distribution key value and the number of extract partitions?
As I mentioned earlier, we have a massively parallel data warehouse solution that works similarly to Spark, in that there is a parallel coordinator, which works like a driver node, and a set of parallel slaves that act like workers. A common mistake I would often see is that when people wanted to quickly extract data from our warehouse, they would issue a query with a high degree of parallelism on the database side. But when the data is returned to the parallel coordinator, they hit a bottleneck as the data is consumed over the network by the client application.
Instead of issuing a single parallel query against the database, we issue several non-parallel queries hashed into separate deterministic buckets.
Since this is Oracle, we apply the ORA_HASH function to the specified distribution column value and take the modulo of the result by the number of buckets, which breaks the extract up into multiple buckets.
To avoid having uneven bucket sizes due to skew within the data, we usually use the Oracle pseudocolumn ROWID, which guarantees us an even distribution. If we're extracting from a view instead of a table, we either include the ROWID from one of the component tables or else pick some other column that has a reasonably even distribution of values.
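The bucketing described above can be sketched as plain predicate generation. This is our own minimal illustration, not the actual Starbucks notebook:

```python
def bucket_predicates(dist_col, num_buckets):
    """One WHERE predicate per extract bucket. ORA_HASH gives a
    deterministic hash of the distribution column (e.g. Oracle's
    ROWID pseudocolumn), and the modulo assigns each row to exactly
    one bucket; each bucket is pulled by its own non-parallel query."""
    return [
        f"MOD(ORA_HASH({dist_col}), {num_buckets}) = {i}"
        for i in range(num_buckets)
    ]

preds = bucket_predicates("ROWID", 4)
# preds[0] == "MOD(ORA_HASH(ROWID), 4) = 0"
```

On the Spark side, predicates like these are exactly what `spark.read.jdbc(url, table, predicates=preds, properties=...)` accepts: Spark runs one concurrent query per predicate, giving one partition per bucket.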
Since this is coming from a database and we already know the schema, we land this directly into our raw zone in a Delta Lake format table.
Our notebook supports two modes, append and overwrite, in case we need to run the pipeline more than once.
So now we have data in our raw zone, but it's not quite ready for most use cases. I have here a sample pipeline to take some weather forecast data into our published zone. As I said earlier, we load data into our raw zone partitioned on load date, which is typically the date that we received it. Most of the time, though, this is not the best way to query the data, so we need to repartition it based off the most common query pattern. We will also be applying naming conventions and data types, deduplicating, and sometimes applying business logic to keep things standardized.
Commonly our developers will extract the source column, the destination column, and the data type directly from the mapping document they're working from into a list, and then they will iterate through that list to generate the schema.
We then remove any unnecessary columns and add any metadata that we want to save with the table.
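A minimal sketch of that mapping-driven approach might look like the following; the column names and types are made up, not from the actual weather pipeline:

```python
# Each tuple: (source column, destination column, destination type),
# as it might be lifted from a mapping document into a list.
mapping = [
    ("FCST_DT",   "forecast_date", "date"),
    ("STORE_NUM", "store_number",  "int"),
    ("TEMP_F",    "temperature_f", "double"),
]

# Iterate through the list to generate cast-and-rename expressions.
select_exprs = [
    f"CAST({src} AS {dtype}) AS {dst}" for src, dst, dtype in mapping
]
# select_exprs[0] == "CAST(FCST_DT AS date) AS forecast_date"
```

Applying these with `df.selectExpr(*select_exprs)` yields only the mapped columns, renamed and typed, and implicitly drops everything not in the mapping.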
In this case, our raw data was partitioned by load date, which is the date the weather forecast was created. But the access pattern for this data is usually by the date that the forecast was for. If we have forecasts for the next 10 days, we will need to repartition this data into those 10 partitions when we write to the published zone. We accomplish this by getting a list of the distinct forecast dates and specifying that in the replaceWhere option when we are writing in overwrite mode. There are multiple ways to accomplish this, but using the collect_set function and concatenating the results with a comma delimiter is one of the easiest.
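A sketch of building that predicate (the partition column and dates are illustrative; in the real pipeline the distinct dates would come from `collect_set` over the incoming DataFrame):

```python
def replace_where(partition_col, values):
    """Build a replaceWhere predicate covering exactly the partitions
    being rewritten, from a collected list of distinct values."""
    quoted = ", ".join(f"'{v}'" for v in sorted(values))
    return f"{partition_col} IN ({quoted})"

pred = replace_where("forecast_date", ["2020-06-26", "2020-06-25"])
# pred == "forecast_date IN ('2020-06-25', '2020-06-26')"
```

It would then be used roughly as `df.write.format("delta").mode("overwrite").option("replaceWhere", pred).save(path)`, which overwrites only the listed partitions and leaves the rest of the table untouched.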
If we weren't replacing the entire partition, we would instead merge the data in using standard Delta Lake merge syntax.
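For reference, that merge path might look like the following; the table and key names are hypothetical, but the `WHEN MATCHED` / `WHEN NOT MATCHED` clauses are standard Delta Lake MERGE syntax:

```python
# Standard Delta Lake upsert by natural key. Table and column names
# are placeholders, not the actual published schema.
merge_sql = """
MERGE INTO published.weather_forecast AS t
USING forecast_updates AS s
  ON  t.store_number  = s.store_number
  AND t.forecast_date = s.forecast_date
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
# Executed with spark.sql(merge_sql) once the incoming DataFrame is
# registered as the temporary view forecast_updates.
```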
One of the benefits we get from the Delta format is that the underlying data is always available while we're performing these transformations, due to the transaction isolation provided through the Delta log. - So, hey, thanks, Brad. You've provided a really awesome batch demo, but how about that structured streaming? Arjit, can you help us out with that one? - At Starbucks we use Spark Structured Streaming with Azure Event Hubs. So why did we pick Azure Event Hubs as the choice for our streaming platform and event ingestion service?
When we started selecting the streaming platform, our motto was cloud first, Azure first, platform-as-a-service first. Azure Event Hubs met all our requirements. Along with these, it enables us to process millions of events per second, is scalable up to terabytes of data, is reliable with zero data loss, and supports multiple protocols and SDKs, which helped us to establish cross-functional partnerships. A few scenarios where we use streaming patterns include transaction processing, device telemetry, live dashboarding, archiving data, and many more.
Before we dive into the code, it is important to plan each component of our pipeline. Creating a streaming pipeline is simple; the real effort in the streaming pattern lies in the configuration of the different components. So let's discuss a few of them.
When we configure an Event Hub, we are given options to select throughput units and the number of partitions. Let's talk about managing throughput. With one throughput unit you get up to one megabyte per second of ingress events and two megabytes per second of egress events. This means that our throughput units set an upper bound for the throughput of the streaming application. This upper bound needs to be tuned with the Spark consuming application as well. In Spark, this is done using the maxEventsPerTrigger option. Now, how does this tuning matter? Let's say we have configured the Event Hub for one throughput unit; this means that Spark can consume two megabytes per second from the Event Hub without being throttled. If maxEventsPerTrigger is configured such that Spark consumes less than two megabytes, then consumption will complete within a second. However, if Spark consumes greater than two megabytes, the micro-batch will always take more than a second to be created, since consuming from the Event Hub takes at least that second; this means we need to increase throughput units. Similarly, selecting the number of partitions is also important. This configuration directly relates to the number of concurrent readers. An important point to consider here is that once an Event Hub is configured with a specific number of partitions, which can be between 1 and 32, this configuration cannot be changed. We would need to drop and recreate the Event Hub with the required number of partitions, hence consider long-term scale when setting the partition count.
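The arithmetic behind that tuning can be made concrete. This sizing helper is our own sketch, not an official formula, capping the batch size so consumption fits the 2 MB/s-per-throughput-unit egress allowance:

```python
def max_events_per_trigger(throughput_units, avg_event_bytes,
                           trigger_seconds=1):
    """Rough upper bound on maxEventsPerTrigger so that one
    micro-batch stays within the Event Hub egress allowance of
    2 MB/s per throughput unit."""
    egress_bytes_per_sec = throughput_units * 2 * 1024 * 1024
    return (egress_bytes_per_sec * trigger_seconds) // avg_event_bytes

# With 1 throughput unit and ~1 KB events, a 1-second micro-batch
# should stay under about 2,048 events.
print(max_events_per_trigger(1, 1024))  # 2048
```

Going above this bound means each micro-batch takes longer than the trigger interval to fill, which is the throttling symptom described above.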
Now, assuming that a perfect tuning is achieved and we can get data at large scale, let's focus on the consuming side.
Processing a large stream of data at scale in quick time does not guarantee that the consuming applications will also be able to perform optimally. We still need to solve a common problem associated with streaming, and that is the number of small files generated. Hence, we now need to focus on the data sink and the capabilities it offers. We selected Databricks Delta as our sink. Delta offers many capabilities, for example faster query execution with indexing and auto-caching, data reliability and transaction guarantees, time travel, and many more. But the one capability which directly benefits a structured streaming scenario is Delta's ability to provide small-file efficiencies. To see this benefit, let's dive into our structured streaming example. In this example, we are using the Databricks IoT stream device data.
We start by getting the Event Hub details from Azure Key Vault and configuring the Event Hub reader.
Next we work on the schema we want to apply. There are two reasons why we are applying a schema: first, we are working with a structured streaming scenario; second, to meet our privacy requirements. Even in our raw zone, we want to have absolute control over what data is entering our environment.
In the next step, we get the JSON object from the Event Hub and convert it to the defined schema.
Now we start streaming and consume all events from the Event Hub.
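Put together, the read side might look like this sketch. It follows the option names of the open-source azure-event-hubs-spark connector, but the connection string, schema name, and tuning value are placeholders, and the Spark calls are shown as comments since they only run inside a Spark session:

```python
# Placeholder; in practice retrieved from Azure Key Vault.
conn = "<connection-string-from-key-vault>"

eh_conf = {
    # The connector expects the connection string in encrypted form,
    # e.g. sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(conn)
    # when using PySpark.
    "eventhubs.connectionString": conn,
    # Tuned against the Event Hub's throughput units, as discussed above.
    "maxEventsPerTrigger": "2048",
}

# stream = (spark.readStream.format("eventhubs").options(**eh_conf).load()
#             .selectExpr("CAST(body AS STRING) AS json")
#             .select(from_json("json", device_schema).alias("data"))
#             .select("data.*"))
```

The `body` column arrives as binary, so casting to string and applying `from_json` with the defined schema is what enforces the "absolute control" over incoming data mentioned above.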
Let's run a query to get the number of records received per device, and let's see the records for device ID seven. We have 4,558 records for device ID seven. I'm wondering how many files are required to store these records?
300 files. That's a lot of files to store 4,558 records. This means there is a lot of IO going on, and if we need data for this partition, Spark will have to read all 300 files. To me this seems an expensive operation, as it involves a lot of IO.
Let's run a query to fetch records for device ID seven.
It completed in 22.5 seconds. This may seem okay, but scale it up to millions and billions of records; that is the scale at which we operate at Starbucks. This seems to be a serious problem for my consuming queries.
This is the small-file problem that I described a few minutes back. How do we handle this? This is where Databricks Delta optimized writes come into play. The aim of optimized writes is to maximize the throughput of the data being written into the storage layer. It achieves this by reducing the number of files being written without sacrificing too much parallelism.
If we analyze the traditional writes in this image, you notice each executor writing data to its corresponding partition. This scenario will result in many small files being written into storage.
However, with optimized writes there is a shuffle of data according to the partition structure of the target table. This shuffle naturally incurs additional cost, but the throughput gains during the write may pay off the cost of the shuffle. Also, the throughput gains achieved while querying the data make this feature even more worth investing in.
To demonstrate this, I'll turn on the optimized write setting and run the same code again to consume data from the Event Hub.
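The setting being toggled here is, to our understanding, Delta's optimized-writes feature; on Databricks it can be enabled per session or on the table itself (the table name below is a placeholder):

```python
# Enable optimized writes for the current Spark session only.
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")

# Or set it as a table property so every writer benefits:
spark.sql("""
    ALTER TABLE iot_device_events
    SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)
""")
```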
Let's run the query to confirm we have received identical data. Here for device ID seven we do see the same count, that is, 4,558 records. Now let's check the number of files required to store these records. Almost 150. We have reduced the number of files required by half and thus reduced the input/output operations.
Next, let's run the query to see the performance gain. This query completed in 0.61 seconds: roughly 35 times better query performance.
We can further optimize file storage and query performance by running a maintenance cycle using OPTIMIZE and VACUUM. This is how we ensure consumers of the data using a streaming pattern get optimal performance. - Hey, now that you're talking about consumption, can you describe what your consumption pattern is? How are teams consuming your data, Brad? - Well, some of our pipelines produce specific products that are output to various reporting solutions or exported to third parties, both inside and outside of the organization. A sizable portion of our customer base, from data scientists to analysts, consumes data from within Databricks. We've implemented a multi-workspace model where we group consumers by persona, and then have a pub-sub model for keeping our metadata in sync. Every night we publish table and view definitions to a location accessible to all the consumer workspaces, and the subscriber workspaces compare this against their local metadata repository and add any new objects that are available. Since all of our published tables are in Delta format, all we really need to do is output the object name and location, which saves time over a full schema validation.
By grouping our consumers into shared workspaces, we're able to easily manage permissions, allocate costs, and enable collaboration within work groups with minimal overhead.
- And this consumption model helped us to unlock data democratization. Access to enterprise data is now just one approval away. Starbucks users use this data for operational and analytical reporting and for generating better, actionable insights. As we mentioned before, EDAP is used as the foundation for machine learning and AI analysis of transactions and behaviors. The thing we are most excited about is that now we can serve more relevant and timely digital interactions by unlocking the AI and ML capabilities using our platform and Delta Lake. This is how we relate data to the coffee and the Starbucks customer experience we serve our customers. - Well, thanks very much to Brad and Arjit for this awesome session. Do not forget to watch the Starbucks keynote as well as the other session, Operationalizing Machine Learning at Scale, for more information. Thanks very much for watching today's session, and do not forget to provide feedback.