Building End-to-End Delta Pipelines on GCP

May 26, 2021 03:15 PM (PT)


Delta has been powering many production pipelines at scale in the Data and AI space since it was introduced a few years ago.

Built on open standards, Delta provides data reliability and enhances storage and query performance to support big data use cases (both batch and streaming), fast interactive queries for BI, and machine learning. Delta has matured over the past couple of years on both AWS and Azure and has become the de facto standard for organizations building their Data and AI pipelines.

In today's talk, we will explore building end-to-end pipelines on the Google Cloud Platform (GCP). Through a presentation, code examples, and notebooks, we will build a Delta pipeline from ingest to consumption using the Delta Bronze-Silver-Gold architecture pattern and show examples of consuming the Delta tables using the BigQuery connector.

In this session watch:
Himanish Kushary, Architect, Databricks
Molly Nagamuthu, Architect, Databricks

 

Transcript

Himanish Kushary: Hello, everyone. Welcome to our talk. We'll be speaking about architecting an end-to-end Delta pipeline on Google Cloud Platform and its integration with other GCP services. We will walk you through a brief Delta Lake overview, talk about implementing the Delta Lake architecture on Google Cloud Platform, go through our reference architecture for implementing a modern data lake architecture using Delta and GCP, and also show you a working demo of the architecture. But before we begin, let me introduce ourselves. I am Himanish Kushary. I'm a Practice Leader with the Resident Solutions Architect team at Databricks. My co-presenter today is Molly Nagamuthu. She's a Senior Solutions Architect at Databricks. So let's get started.
Before we dive into building pipelines using Delta Lake on GCP, let's talk about some of the challenges we face while building a traditional data lake ETL pipeline. One of the challenges that we see pretty often is around reliability. For example, when you have newly-arriving data from your retail pipeline and you are reading the data at the same time, you could end up with incorrect reads, so it's very hard to append data without creating some inconsistencies. Also, a lot of times we see that jobs keep failing midway, and when an ETL job fails midway, you could end up with corrupt or incomplete data. Fixing this is often complicated and expensive. We also see that modification of existing data is very difficult. For example, if you need fine-grained updates for requirements like GDPR or CCPA, it can be very costly to make those changes to your records on a traditional data lake. A lot of times in the enterprise, use cases demand mixing streaming and batch workloads to get the best possible insights. Getting real-time operations to work on a traditional data lake is very complicated; it can end up with data inconsistency and make the process very expensive. Also, for auditing and governance requirements, you need to keep copies of the data for historical purposes. This also becomes a challenge when you're using a traditional data lake.
Now let's move on to the next challenge: performance issues that can arise on a traditional data lake. For example, in an enterprise data lake where you're dealing with thousands of partitions and tables, it may be very difficult to handle the large volume of metadata. We are all too familiar with the small files problem on traditional data lakes, which causes performance issues as well as expensive operations. Also, partitioning and bucketing have usually been our go-to choices on traditional data lakes for performance tuning, but they are error-prone, and once you have established a partitioning scheme, it's very difficult to change. For example, if you want tuning done on multiple dimensions, on high-cardinality columns, or on floating-point values, the traditional partitioning process can be cumbersome.
Moving on, we have also seen in traditional data lake environments that there are a lot of challenges revolving around data governance and quality. For example, if you want to track what changes were made to your dataset, it may be very difficult to set up and manage because of the lack of transactional traceability on a traditional data lake. A lot of times we see that the data that comes into a traditional data lake has a lot of data quality issues, and if not handled properly, this bad data could end up propagating downstream into your enterprise environment. It is very hard to make sure that the data quality is correct and that it doesn't affect your enterprise insights. Lastly, fine-grained access control on a traditional data lake has always been a challenge. Delta Lake actually helps address all these issues.
Delta Lake provides a fully Spark-API-compatible open format that provides reliability with ACID transactions, so reads and writes happen simultaneously without inconsistencies. It unifies batch and streaming use cases to create a much simpler architecture. It provides scalable metadata handling for enterprises to support thousands of tables and partitions. Delta Lake also provides the ability to time travel and look at data changes for traceability. It has built-in support for schema enforcement and data quality checks, which makes sure that bad data doesn't end up in your downstream ETL process. Delta Lake also supports upserts and deletes, which can be used to simplify fine-grained data updates, for example for GDPR and CCPA requirements. Lastly, Delta Lake also has compaction and data skipping built in, which allows for much better performance.
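To make these features concrete, here is a minimal PySpark sketch of an upsert, a GDPR-style delete, and a time-travel read against a Delta table. The GCS path, the customer_id column, and the updates_df DataFrame are hypothetical placeholders, and the snippet assumes a Databricks or Spark environment where spark and the Delta Lake library are already available.

```python
from delta.tables import DeltaTable

# Hypothetical Delta table of customer records stored on GCS
customers = DeltaTable.forPath(spark, "gs://my-bucket/delta/customers")

# Upsert: merge newly arrived records (updates_df) into the existing table
(customers.alias("t")
    .merge(updates_df.alias("u"), "t.customer_id = u.customer_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())

# Fine-grained delete, e.g. for a GDPR/CCPA erasure request
customers.delete("customer_id = 'cust-123'")

# Time travel: read the table as it looked at an earlier version
v0 = (spark.read.format("delta")
      .option("versionAsOf", 0)
      .load("gs://my-bucket/delta/customers"))
```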
So what we see here is that Delta Lake helps us move toward a more ideal data lake architecture by providing features like ACID transactions, efficient upserts, schema enforcement, scalable metadata handling and batch and streaming unification [inaudible]. But our work is not done. Building an enterprise-scale data lake, especially one that is integrated with your data warehouse environment and other applications, is still hard. That is why we have been noticing a new paradigm that has evolved to unify all your data analytics and AI requirements, called the lakehouse architecture. The lakehouse architecture takes the best of both worlds: things like open formats and support for machine learning from the data lake world, and things like SQL and BI support from the data warehouse world, and brings them together into a single architecture platform. The key enabler behind this lakehouse architecture is Delta Lake.
So let's look at the Delta Lake platform and what the architecture would look like. The lakehouse architecture that you see here is unique in three ways. The data only exists once, to support all of your data workloads on one common platform. On top of the open data lake, you have the governance and data management layer provided by Delta Lake, which is based on open source and open standards, so it's easy to work with existing tools and avoid proprietary formats. What this enables is your data engineers, data analysts, and data scientists working together collaboratively much more easily. Now let's look at how a modern data architecture can be implemented using Delta on Google Cloud Platform. A modern data lake architecture would have different data sources, like unstructured data, semi-structured data, and structured data, coming into the raw data [inaudible]. Once it's there, that raw data will be processed, refined, and curated into a curated location where it will be consumed by different groups within your organization, like data engineers, data scientists, data analysts, and so on. Now, how do you implement that on GCP using Delta Lake? Let's look at that.
So, on GCP, when you are trying to implement a modern data architecture, Delta Lake can be the central piece in your architecture. Being open, reliable, and performant, it will bring reliability to the disparate datasets coming into your data lake. It will speed up ingestion and data availability at scale. It will allow for joining batch and streaming architectures and reduce the barrier to entry for all your data personas and data assets. And ultimately, it helps break the silos between your engineering and data science teams and allows them to work in a more collaborative fashion. Once Delta Lake is the central piece of the architecture, it will allow you to use a wide range of tools in the Google ecosystem, like Google BigQuery, Google Cloud Storage, and Google ML and AI products, as well as other platforms like Apache Spark [inaudible] or Databricks, to run your analysis.
So the key part here is the open, reliable, and performant Delta Lake platform that's been implemented here. Now, once the centerpiece is in place and your curated data is in the centralized Delta Lake format, you can select the right tool for the job you are doing. For example, you could have Dataproc for processing your data using Apache Spark. You could have Databricks, which is a unified lakehouse platform experience, and use it to process your data. You can use Google Dataflow for streaming analytics. You can use Google Datalab and machine learning for exploring the data and running machine-learning algorithms. You could gain valuable insights using BigQuery, and you also have the option of dashboarding and visualization using a BI tool like Looker. So different use cases within your ecosystem will be able to leverage the central curated data through the Delta data lake management and governance layer.
Now, once you have selected the right tools, let's see if we can simplify this architecture. If we take this further, this architecture becomes a lot simpler if we settle on Apache Spark or a platform like Databricks for the ETL, SQL analytics, streaming, and ML parts; use Google BigQuery for near real-time, on-demand data analytics and insights; and use a BI tool like Looker for dashboarding and visualizations. That way, Delta Lake is the central piece of this architecture, and on top of it you can use Apache Spark, BigQuery, and Looker depending on your needs.
So let's look at a reference architecture using this technology. What we are looking at is a typical reference architecture for implementing an ETL pipeline using Delta on GCP. Enterprises usually have a variety of data getting into the system at different velocities; maybe files are getting dropped as raw data on GCS, or maybe the data is coming in a streaming fashion through Google Pub/Sub. Once this raw data lands on Google Cloud Storage, you can use something like a Spark engine, which is shown here in the reference architecture, to bring it into the Delta Lake. Once it is in the Delta Lake, depending on your business needs, this raw data can be cleaned, validated, processed, and refined into curated datasets, and all of this will be done in a reliable and performant way because of the Delta Lake technology. This curated dataset can then be used for machine learning, using something like the Cloud AI Platform. It could be further analyzed using something like Google BigQuery. It could be visualized through Looker, and so on and so forth. So as you can see, the Delta lakehouse architecture helps streamline collaboration across your multiple teams by keeping the data in one centralized location, and it allows your teams to scale around the entire data ecosystem.
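As a rough illustration of the bronze-silver-gold flow described here (not the demo's actual code), the sketch below lands raw JSON from GCS in a bronze Delta table, refines it into a silver table, and aggregates a gold table. The bucket paths, column names, and transformations are hypothetical.

```python
from pyspark.sql import functions as F

RAW    = "gs://my-bucket/raw/events/"             # files dropped by batch loads or a Pub/Sub sink
BRONZE = "gs://my-bucket/delta/bronze/events"
SILVER = "gs://my-bucket/delta/silver/events"
GOLD   = "gs://my-bucket/delta/gold/daily_counts"

# Bronze: land the raw data as-is in Delta
spark.read.json(RAW).write.format("delta").mode("append").save(BRONZE)

# Silver: clean and validate the raw records
(spark.read.format("delta").load(BRONZE)
    .dropDuplicates(["event_id"])
    .filter(F.col("event_ts").isNotNull())
    .write.format("delta").mode("overwrite").save(SILVER))

# Gold: business-level aggregate ready for BigQuery / Looker
(spark.read.format("delta").load(SILVER)
    .groupBy(F.to_date("event_ts").alias("event_date"))
    .count()
    .write.format("delta").mode("overwrite").save(GOLD))
```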
Now, I’ll hand it over to Molly. She is going to walk us through a demo of this architecture in action. Molly, take it away, please.

Molly Nagamuthu: Thanks, Himanish. Hey, everyone. Thank you for being here. For our demo today, we are going to showcase an end-to-end pipeline on the Google Cloud Platform. The focus is to highlight seamless integration with Google Cloud services. If you have built Spark and Delta applications before on AWS or Azure, they will work exactly the same way on GCP for the features that have been GA-ed so far. Now we are going to build out the reference architecture that Himanish laid out, using all the best practices he just covered. For our use case today, we are going to look at tweets in real time. The pipeline we are building will ingest the real-time Twitter stream onto Google Cloud Storage. This will be our raw data and, conceptually, our Delta bronze layer. To show our integration with BigQuery, we will have some reference data in BigQuery, use that reference data to enrich our raw data, and write out our processed Delta table. Once we have our processed Delta table, we'll compute the analytics and aggregations that the business is interested in, or the questions that need to be answered, and write out our curated table. We will push this final table to BigQuery, and we will look at the visualizations using Looker.
Let's dive into our demo next. Let's just make sure our cluster is up and running. Now, for the demo setup, we have separated the four stages of our pipeline into four different notebooks, so it's easy to see the flow between the various stages of the pipeline. So first let's look at our streaming notebook. We go through the Twitter API authorization. Let's just run the cell here. I have to… okay, so let's just go ahead and run this.
That looks good, so let's go ahead and run the regular cells in the notebook. This is our familiar Twitter API for streaming. We are interested in just a few topics and not really looking at all the tweets we're getting; that would be too much. So let's just go ahead, run this, and then start our stream. Okay, while this is going on, let's look at our ingest process. Let's just go through and set up our containers here.
And let's double-check that they look good on our Google Cloud Platform. We have the two folders, 'raw_tweets_Delta' and 'streaming_tweets'. Let's get back to the ingest process; today we are going to be ingesting using Databricks Auto Loader. Now, structured streaming can also be used for ingest, but Auto Loader provides an easy way to ingest data: it provides a structured streaming source called cloudFiles that, given an input directory path on cloud file storage, automatically processes new files as they arrive. Let me just start this. It uses a simple directory listing for file discovery. Now, as the stream may take some time to start, let's just run this to make sure the stream is starting. It's initializing at this point. While the stream is initializing, we can go ahead and look at the next step in our pipeline, which is processing the tweets.
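The notebook's exact Auto Loader cell isn't reproduced here, but a minimal sketch of the cloudFiles source looks roughly like this; the GCS paths, checkpoint location, and tweet_schema are assumptions, not the demo's actual values.

```python
# Auto Loader: incrementally pick up new JSON files from GCS as they land
raw_stream = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(tweet_schema)                      # hypothetical schema for the raw tweets
    .load("gs://my-bucket/streaming_tweets/"))

# Write the stream into the bronze (raw) Delta location
(raw_stream.writeStream
    .format("delta")
    .option("checkpointLocation", "gs://my-bucket/checkpoints/raw_tweets_delta")
    .start("gs://my-bucket/raw_tweets_delta"))
```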
Now, the first step in the pipeline is to understand what the data is, what we are interested in, and how to clean it up, so first let's explore our raw tweets from Twitter. We are interested in only three attributes here: the language, the place, and the source the tweets are coming from. Just to avoid any privacy issues, I haven't listed any other data besides these three. So here you see the language is a two-letter code, and for the place, of course, most of the time the location is not filled in, but for the cases where it is, the country code is also a two-letter code. And then we have the source, which has all this encoding around it; we will try to parse that. So here, let's look at the reference data from BigQuery. We have two different tables that we are loading. Let's go ahead and look them up in BigQuery to see how they look.
So the ISO 3 codes, these are public datasets that are available, by the way, and for the ISO 3 codes, we take the two-letter code and convert it into a three-letter country code. We need this information for visualization, and that's why we are using this reference data. For the language codes, again, we're just spelling them out with their English names, and that's what we are doing here. These two reference tables are already loaded in our BigQuery dataset. Now, let's go ahead and check how our stream is doing at this point. Yes, it looks like our stream is up and running and there is data coming in. Let's just look at the number of these tweets. It's still initializing at this point. Let's come back to it a little bit later. So meanwhile, let's load the reference data into our notebook, into Spark data frames. Perfect. Once we have the reference data loaded…
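Loading the two reference tables into Spark DataFrames can be done with the Spark BigQuery connector (available on Databricks on GCP). A minimal sketch, with placeholder project, dataset, and table names:

```python
# Read the two reference tables from BigQuery into Spark DataFrames
iso_codes_df = (spark.read.format("bigquery")
    .option("table", "my-project.reference.iso_country_codes")   # 2-letter -> 3-letter mapping
    .load())

language_codes_df = (spark.read.format("bigquery")
    .option("table", "my-project.reference.language_codes")      # 2-letter code -> English name
    .load())
```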
Yep, now we have the tweets flowing in. The count is increasing slowly, and you see the difference here. Yes, now that we have our tweets, let's go ahead and process the raw tweets that we have, and then let's do our enrichment on our… That looks good. So here we are just joining our language table to our raw tweets. Let's just run this and see whether we have enriched the language. Yes, it looks good. And now here we are getting our three-letter country codes, and here we are parsing the Twitter data source. Once we have enriched all our raw data, let's just write out our processed tweets Delta table. Sorry about that. And once we have that, it is created in the internal metastore here.
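The enrichment step amounts to joining the raw tweets with the two reference DataFrames from the previous sketch and writing the result out as the processed (silver) Delta table. A sketch under assumed column names (lang, country_code, alpha2_code, source) and paths; the actual notebook may differ:

```python
from pyspark.sql import functions as F

raw_tweets_df = spark.read.format("delta").load("gs://my-bucket/raw_tweets_delta")

processed_df = (raw_tweets_df
    .join(language_codes_df, on="lang", how="left")               # spell out the language name
    .join(iso_codes_df,
          raw_tweets_df["country_code"] == iso_codes_df["alpha2_code"],
          "left")                                                 # add the 3-letter country code
    .withColumn("source_parsed",
                F.regexp_extract("source", r">(.*?)</a>", 1)))    # strip the <a ...> HTML wrapper

(processed_df.write
    .format("delta")
    .mode("overwrite")
    .save("gs://my-bucket/processed_tweets_delta"))
```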
We have our temporary table from where we are actually running this so… As the silver table is being written, let's go ahead and see what we are doing in the next process. So, we'll just go through what we're interested in here. Once we have our silver table, we are interested in knowing the distribution of tweets based on language, country, and source, so let's go ahead and see if our table has been written. Yep, that looks good now, and we have our enriched data: the language is fully spelled out, the country code is three letters, and the parsed source looks clean. So once we go here, we are interested in the distribution, so let's just do some analytics on our silver table. We're just doing some aggregations, just finding the count of tweets. That looks good. That's what we got, at least in the last five minutes. Let's just try to visualize it. Okay, so it looks like English is the predominant language of the tweets, at least since we started streaming. Now let's look at the distribution of tweets based on the country codes. Remember, we enriched the country code to the three-letter one, so let's try to map this and… Interesting. So we see tweets pretty much all over the world. That's interesting. And now let's do our parsed data frame.
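The analytics here are simple group-by counts over the silver table. A minimal sketch, again with hypothetical column names, using the Databricks display() helper for the built-in visualizations:

```python
from pyspark.sql import functions as F

silver_df = spark.read.format("delta").load("gs://my-bucket/processed_tweets_delta")

# Distribution of tweets by language, country, and client source
language_counts = silver_df.groupBy("language_name").count().orderBy(F.desc("count"))
country_counts  = silver_df.groupBy("alpha3_code").count()
source_counts   = silver_df.groupBy("source_parsed").count()

display(language_counts)   # bar chart of tweets per language
display(country_counts)    # map visualization of tweets per country
```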
We're just taking a slightly different approach here. We are trying to see how we get the parsed data into a data frame, because we'll write that data frame as-is to BigQuery; that's why we are doing it this way. Let's see how this looks. Okay. Awesome. So we have the tweet count based on the source, and now let's create our gold table. We are pushing the language counts into BigQuery. So let's just look. Okay, that looks good. Now, let's write this data back to BigQuery. So here we are writing the data frame to BigQuery, and we're saving it in a table called 'parsed_gold_tweets_Delta'.
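Writing a DataFrame back to BigQuery uses the same connector in the other direction; by default it stages data through a GCS bucket, so a temporary bucket is supplied. The project, dataset, table, and bucket names below are placeholders:

```python
# Push the gold aggregate to BigQuery via the Spark BigQuery connector
(language_counts.write
    .format("bigquery")
    .option("table", "my-project.tweets.parsed_gold_tweets_delta")
    .option("temporaryGcsBucket", "my-temp-bucket")   # staging bucket for the indirect write path
    .mode("overwrite")
    .save())
```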
And then let's write the table itself, the language table that we created. Let's go ahead and look at these. Let's first look at the tables that are being written: that's our gold tweets table, the language count that we just processed and saved, and this is our parsed source count. And now let's explore our data in Looker. So, I just ran the query, but I can run it again, reading from that table, and here again we have English as the most used tweet language. And then let's look at this: if we don't want to visualize, we can also query the results from here. And there you have it. So, when we talk about tweets, you probably have more advanced Twitter analytics use cases in mind, like topic modeling or sentiment analysis, but we are showing this here just to showcase the end-to-end pipeline on GCP, and that's what our focus was. The motivation for our talk showcasing an end-to-end Google Cloud Platform pipeline is the fact that Databricks went GA on the Google Cloud Platform earlier this month. We are really excited that we can use Databricks on GCP and that it is now truly a cloud-agnostic platform. Thank you.

Himanish Kushary

Himanish Kushary is a Practice Leader with the Resident Solutions Architect team at Databricks. He helps customers across multiple domains with building scalable big data analytics solutions and produ...

Molly Nagamuthu

Molly Nagamuthu is a Senior Resident Solutions Architect at Databricks. She has been working with their top-tier strategic customers, solving some of the toughest Big Data problems at scale in both Healthca...