This talk will focus on Journey of technical challenges, trade offs and ground-breaking achievements for building performant and scalable pipelines from the experience working with our customers. The problems encountered are shared by many organizations and so the lessons learned and best practices are widely applicable.
Attendees will come out of the session with Best Practices and Strategies that can be applied to their Big Data architecture, such as:
Audience: The attendees should have some knowledge of setting up the Big Data Pipelines and Apache Spark.
– Welcome to the Spark Summit.
I’m Vini Jaiswal working as a Customer Success Engineer at Databricks. I work closely with data engineers, data scientists and executives across various industry. Including finance, geospatial, claiming, retail, healthcare. I help their cloud and their data implementation and strategizing AI and machine learning, as well as analytics use cases.
So I’m here to share some of the lessons that I learned with our customers and best practices they implemented.
I want to take you through the data engineering journey as 80% of the business problems are associated with that. We will cover data strategies, building cost effective and performant data pipelines, securing it for productionization. And I will conclude it with customer reference architecture.
At Databricks, we think that data has the potential to solve the world’s toughest problems. How do you do that?
You have data, and you have business problems to solve. All you need to do is run data science machine learning, alright? If you have traditional stack, but unsolved AI reporting on it, if you can just do that, our jobs will be much more easier. Unfortunately, the reality is complicated. We no longer are talking about business intelligence or business analytics. Organizations are realizing the value of having all the data and now they want to do much more than that. They want to be data driven. To leverage machine learning and AI effectively, you need a unified faster and scalable solution to support growing volume and diverse data including open format.
Data Lake is more like a body of water in its natural state. data flows from source systems to the lake. It collects everything eliminates data silos and facilitates a central location. Data Lake serves as a single source of truth, where you can source all the data, decouple compute and store it and use it to drive insights, analytics, data science and machine learning. But this is how the usual Data Lake looks like, it’s garbage in and garbage out.
This is what the data scientists dream of.
Let’s summarize these problems. No atomicity means failed production jobs. Leaving data in corrupt state. There’s a huge data quality and reliability issue with this architecture. For each copy, it’s hard to keep it consistent. So it’s hard to do anything with it. Data scientists spend 80% of their time data munging and 20% relaying. How do you want to solve it?
You need a unified service to remove all the data silos and ensure that you have reliability. And the great thing is you already have a lot of data in your Data Lake. All you need to do is make sure it’s reliable. Spark is one component of that, and most people have already standardized on Spark as their data processing engine. Second really important component that a lot of users find value is in Delta Lake. It creates a transactional layer on top of your existing Data Lake. And once you have asset transactions on top of it, you can make sure that you have reliability and high quality data.
And you can do all kinds of computation on it. So in fact if you have patterns, streaming both with Delta, it’s okay to have streaming in data while someone in Batch is reading it. That’s the unified service. Now we got our data fixed. Whenever you are starting a data pipeline, think about what is the use case or business outcomes you want to achieve.
Usually like to start a machine learning use case, or even a project if I don’t even get the data right. It’s always a good practice to ask yourself if the data that you are provisioning helps a use case and if your data is ready for analytics or machine learning. So think about your audience for whom you are building the data for create tables that are purpose towards use cases and audience. Think about file format does file format lend itself to a faster application. Whilst yes we had some challenges like why transformation, etc parquets is more optimized format. You need to think about table segmentation as well. So instead of one size fits all approach, my recommendation is having different tiers based on the user persona and use cases. In Delta Lake we use the terminology of bronze, silver and gold. Bronze meaning in that tier, you can try to preserve all your raw data, keep it in bronze layer and use lifecycle policies with minimum transformation. This is mainly used for data scientists. Second is silver tier, combine all your logic partitions to keep it in optimized format with some transformation. So this serves some sophisticated advanced users like product managers. Gold tables, so this is where you want to keep rollups and aggregations for business users and specific use teams. This serves BI domain level teams. Now you are all set with data strategy. To consume data downstream, you need to think about efficient ways to run queries and use compute. Today there are two major factors which drive the need of cost optimization. The evolving global financial conditions in response to COVID-19 pandemic and the accelerated adoption of cloud. Since I’ve worked with customers directly I was in a well, I was in a position well placed to assist with effective use of existing tools and best practices on how to optimize cost.
There is no silver bullet or drop down and guard down where you can plug in parameters and get a perfectly architected cluster.
If your objective is to allocate the right amount of cluster resources for a job. Then we will need to understand how different types of jobs demand different types of cluster resources.
If our goal is to allocate just the right amount of cluster size for the application, it’s important to know how your data is going to be used. Each of the use cases require different thinking.
For example, if you have a use case, which requires memory optimized instances. These are good for applications, where you want to run machine learning workloads, so most everything is iterative, meaning you will need to work on the same data again and again. So caching becomes a really important feature to run machine learning algorithms in more performant manner. Compute optimized instances are great for streaming and ETL use cases. Traditionally when I’m doing streaming on the whole, I am performing operations on a row basis as opposed to a large data set. And I might be adding timestamps or removing null value. But I’m not trying to join these elements together. So I don’t need that much memory. With ETL, you need to do full file scans, but there is no data read. And most of the workloads with CPU bound, so you can use C class for this option.
For storage optimized workloads, this is an option which has a locally mounted disk as well. If you’re going to be handling images repeatedly, that will be a good one here. For analytics use case or ad hoc queries, we recommend i3 or storage optimized clusters. We also have Delta IO cache option, which caches data locally and is good for repeated read of data, so we don’t need to read and read from s3 again and again.
Next question you should ask, how fast the job needs to be run? We can throw in more cores at the job, but it may not be worth the consumption cost.
You should always remember that experimentation is cheap. So getting a rough order of magnitude is important but spending time trying to perfect the architect or cluster might be delaying your development time as you move along. You should use the proper size of node, I usually try to keep a good happy medium. So when you have a very large workload, for example you choose a node size of 256 gigabytes of memory. That means you could end up with a large JVM hip and as a result garbage collection is going to take a very long time. When you have a small node, then you need to scale it out as more nodes have less memory it means more spills. So I choose 64 or 128 gigs memory give or take using that range. So what memory size you should choose really depends on workload. The ultimate decision over there is a benchmark, so you should start with that. Secondly if you need to think about your cluster size, you should think about how many workers you need to complete the job. And it depends on data volume, complexity of transformation, and eventually your application requirements. You can start with how many task you can create initially, which is correlated to the input data size.
Once you start running some workload, observe the behaviors from the Spark UI, For example, storage tab in Spark UI gives you information on caching, to indicate if you have enough memory or not.
How much of data set is cached in memory and how much of data set is cached on this. This is the information that can help you as well. For example, if your data set is fully cached with some room to spare, you can find that from the storage tab of your application. There is a chance for you to reduce the number of workers. If you don’t need that much memory. Let’s say if you are almost completely cached, then you might need to increase the cluster size so that you have enough memory to run the jobs. Third one is not even close to cache. So if your data set is super big that instead of caching in memory, you may want to choose cache on disk, but not on the EBS. You should choose the instance type that support large number of SLT. So we can cache it to the disk. And when we retrieve data, it is much faster.
Next is observing Ganglia Metrics and tweaking the workloads. Ganglia Metrics UI is the integrated metrics tool that comes with Databricks. So with it you can track important cluster metrics such as CPU, memory and network utilization over time. For analytical workload, you want to make sure that you have enough parallelism and partitions in your data frame. The more cores you give to workloads the better. If your Spark application is well written, then there is no bottleneck. And you should be able to linearly scale it. So if you double your cluster size, then your runtime should be cut in half. CPU usage is aggregated over all machines in a cluster. For this reason, you never want your usage to near 100 percent. This would mean that extra work is being put on your driver. And you’re really stressing the driver in this case. For example, the good amount of maximum usage should be 80% on a cluster with five machines four being workers and one being driver. Let’s say if your workload is network bound, then you need to think about where are the big spikes, so you can use ganglia UI for big spikes. Green indicates that data is being read and blue indicates data being written out. For this use case, you can use big machines with better network throughput and i3 type with SSD backed instances so that you don’t have to do repeated remote puts. The memory pane on the top right describes the objects stored in memory for a given Spark application. So if you spill a ton, you can use more memory and i3 instances.
Now we have data, storage and compute all figured out. Next thing which comes up is can I make my application run faster?
So look for these four symptoms when running your workloads. Shuffle, spill, skew, small files. And Spark UI is where you can do this.
First shuffle, so let’s look at shuffle. I have two tables here flights and carriers. Where my flights is large data set and carriers is small data set, and this is how I’m joining my tables. When I run query, it took 28 minutes to complete. Let’s look at this SQL logical plan to see what happened during query execution. So I’m going to locate my job and observe the query execution.
I’m gonna scroll down to the sort, merge, join, you can see that it outputs a very large number of rows. And this might be because the way we are joining a large table to small table. And if I scroll a little more down, I can also observe that there is a spill size, this is a huge spill size. So to improve that, you can take advantage of joint optimization. So how you do that is optimizing, broadcasting the smaller number to all the executors that way our shuffle doesn’t have to incur during the joining process. And then you want to review your join order. It’s ideal to join the smallest tables first, and work up to the big one. Now once we factor this element to the query took one point eight minutes. And looking at SQL plan in Spark UI, we can see that broadcast improved the shuffle, and now it was completed in much more faster time. Now we’re gonna look at what is the result of sort, merge, join. It’s around 1000 rows output, which is a significant improvement from the previous value.
Next is spill. So when the data cannot fit in memory, disk spill may dominate overall performance. Since the default configuration allocates one CPU core per Spark task. This can lead to situations where Spark jobs are not using all the CPU resources allocated to them. And as a result, the overall cluster CPU utilization can remain low, even under peak scheduling conditions. Similarly for memory depending on the input data and shuffle sizes the job might not fully utilize its reserved memory. This is not obvious to find, but you can leverage SQL plan to in the most informative manner. Here it shows a large amount of spill, when my shuffle partitions were set to 16.
So approach I took is increase the partitions to 48 and reduce the number of cores the executor can use.
You can see on the right my spell size decreased from these kind of tweaking and it was almost to zero. So the more spill you can remove larger the impact.
Now you might think, for your application, if you find a skew now what. So data skew is a condition in which a table’s data is unevenly distributed among partitions in the cluster. Data skew can severely downgrade performance of queries, especially those with joins. So if you’re joining big tables that would require shuffling of data, and skew can lead to an extreme imbalance of what imbalanced cluster. So what you can do is observe your CPU usage using Ganglia UI. So in my graph, you can see that usage becomes low after an initial spike. Another metric that can be helpful is looking at the stages in the Spark UI, which I’m showing to the right side. So look at the duration of task, there is a significant difference in the value of max from the 25 percentile and 75 percentile values. Another indicator is input size, and records are also significantly different. To help with this, you can use broadcasts joins. Sparks SQL accepts cue joins in queries. So with the information from these hints, Spark can construct a better query plan one that does not suffer from data skew. You can always play around with other settings which are already partitioning or your salt keys.
So over the years, there has been an extensive and continuous effort to improve Spark SQL’s query optimizer. And one of the various improvement is the cost optimization framework. Adaptive query execution looks to tackle such issues by optimizing and adjusting query plans based on runtime statistics. It reduces manual effort of tuning Sparks shuffle partitions, and it dynamically changes sort, merge, join into broadcast-hash join, and also it is able to handle skew joins. This Spark query execution is available in our newest version which is seven dot X and Spark three dot O. There is also a different talk in the Spark Summit that you might wanna watch if you’re interested in learning more.
So to resolve small files problem, let’s look at scenario and the solution we took. So I’m gonna talk about an example of a media customer I was working closely with. They were getting close to 500,000 real time data requests per second on their cluster. So getting a lot of URLs every second which is dumped into s3 for certain time interval creates a large number of small files problem. So we took two step approach to solve it. We managed to process these large number of small files instead of a cluster of bitters writing directly to s3, they sent URLs to kinesis stream instead. Which eliminated the generation of small files. All the data is in the streams now, which would lead to utilizing Spark resources more efficiently. Instead of caching or persisting data on the cluster, they were writing directly to the Spark parquet table. However, this resulted in another issue that is with the parquet tables, they ended up writing so many files to the tables. The problem with it is continuous append on the table is too slow. And if one job is updating the table, another one cannot query the table. So second stop was solving this challenge using Delta Table. Delta supports asset transactions which basically means they were able to concurrently read and write this table. It is also very efficient with continuous append. And the table and Data Lake serve both as a batch table as well as streaming source and sink. While we are at it, let me cover a few more performance options with data.
First is compaction, so if you continuously write data to a Delta table over a period of time, it accumulates a large number of files. Especially if you add data in small batches, this can have an adverse effect on the efficiency of table reads, and it can also affect the performance of your file system. Ideally, a large number of small files should be rewritten into a smaller number of larger files, on a regular basis. This is known as compaction, you can compact a table by repartitioning it to a smaller number of files.
Second is auto optimize. This consists of two complimentary features optimized writes and auto compaction. It is available in Delta if you run data on Databricks. With optimized writes, Databricks dynamically optimizes Spark partition sizes based on the actual data and it maximizes the throughput of the data being returned. So in terms of auto compaction after an individual write, Databricks checks if files can be further compacted, and it will run a quick optimize job to further compact files for partitions where small files are still existing.
Z-ordering is a technique to co-locate related information in the same set of files. This co-locality is automatically used by Databricks, Delta, data skipping algorithms. So this time it dramatically reduces the amount of data that needs to be read. To Z-order data you need to specify the columns to order in the Z-order by query. One of the use case of this is time sensitive requests emerging from GDPR and CCPA Laws. And Z-ordering can be helpful because the data on the keys that need to be deleted helps with speeding up locality of the requested records and efficiently rewriting their parquet files.
So you got data, storage, compute and operational efficiency figured out. For every company and especially the data companies, it’s really important to have governance to scale the solutions and operate in a secure and reliable manner. One of the use cases which has emerged recently for organizations is the GDPR and CCPA compliance. Let’s talk about how Delta helps with it. With Delta and performance features we talked about in the previous slide. You can dramatically simplify and speed up your ability to locate and remove personal information in response to consumer GDPR and CCPA request. By default, Delta Lake retains table history for 30 days and makes it available for other options like time travel and rollbacks. But if you didn’t mind that GDPR and compliance requires still records to be made unavailable. Before the default retention period is up. You can perform deletes and use vacuum function to remove the files. Once you have removed the table history using the vacuum command, all users will lose the ability to view that history and rollback.
Another best practice in general especially for GDPR is if your original table contains personal identifiable information about the customers. Separate out the columns into PII and non PII tables. You can also pseudonymous the records by using SHA algorithms for linking up to a lookup table. So restrict that PII table access and use non PII tables across your downstream and other upward stream of tables. Apart from applying these best practices to Delta Lake, it is recommended to set a retention policy with your blob storage for 30 days or less, so that you don’t have to end up looking for data in the last minute. So if you want to learn about this use case in more detail. We also have our tech talk available and the link is in the reference section.
Other best practices for auditing and monitoring are using cluster tags so you can work towards chargebacks for different users and departments. You can use audit logs generated by Databricks. Monitor your DBU usage regularly for awareness and spikes. And just to be aware of how much you are using what. Delta transaction logs should be leveraged since it captures all the information about actions performed timestamp, person who performed the transaction, etc. So you can leverage that for audit as well.
Data block pipeline is consumed by different layers in your organization, and managing access roles and responsibilities as well as managing usage is a must. You should implement fine grained enforcing access and controls for your users. Think about the controls to give on the storage layer as well. You can manage inscription applied ie and rule policies to the data being stored as well. Always it’s a good practice to have a minimum level permission required for each user.
Bringing it together. So this is a reference pipeline from one of my customers. They started from very small step and efficiently utilize each step in their data journey to build a scalable pipeline. This is how each component fits together, in their whole data architecture.
Having a good data strategy in place.
Leveraging design patterns that we discussed earlier, to automate, optimize and build performant execution pipelines. You can then top it off with security and governance controls, and tie it all together for business value. Hopefully the lessons and best practices that I shared can be helpful for your Databricks and data journey.
Want to mention that your feedback is really important, so please rate our session.
Vini Jaiswal is a Customer Success Engineer at Databricks, working with our customers on their AI, Machine Learning and Cloud strategy and implementation and has a track record of driving growth and long term successes for her clients. Vini's 7+ years of work experience, includes working as a Lead Data Science Engineer at Citi and Data Analyst at Southwest Airlines.