How The Weather Company Uses Apache Spark to Serve Weather Data Fast at Low Cost


The Weather Company (TWC) collects weather data across the globe at the rate of 34 million records per hour, and the TWC History on Demand application serves that historical weather data to users via an API, averaging 600,000 requests per day. Users are increasingly consuming large quantities of historical data to train analytics models, and require efficient asynchronous APIs in addition to existing synchronous ones which use Elasticsearch. We present our architecture for asynchronous data retrieval and explain how we use Spark together with leading edge technologies to achieve an order of magnitude cost reduction while at the same time boosting performance by several orders of magnitude and tripling weather data coverage from land only to global.

We use IBM Cloud SQL Query, a serverless SQL service based on Spark, which supports a library of built-in geospatial analytics functions, as well as (geospatial) data skipping, which uses metadata to skip over data irrelevant to queries using those functions. We cover best practices we applied including adoption of the Parquet format, use of a multi-tenant Hive Metastore in SQL Query, continuous ingestion pipelines and periodic geospatial data layout and indexing, and explain the relative importance of each one. We also explain how we implemented geospatial data skipping for Spark, cover additional performance optimizations that were important in practice, and analyze the performance acceleration and cost reductions we achieved.

Speakers: Paula Ta-Shma and Erik Goepfert

Transcript

– Hi, I’m Erik Goepfert from The Weather Company, and I’m presenting with Paula Ta-Shma from IBM Research. My team at The Weather Company is currently working on a re-implementation of our historical weather data API, called History on Demand. Throughout the development process we’ve been working closely with Paula and her team at IBM Research so that we can take full advantage of the tools provided by IBM Cloud SQL Query, Cloud Object Storage and Spark. Today I’ll be discussing History on Demand as a use case for those tools and what my team has learned throughout the development process. Paula will focus on the details of SQL Query, its use of Spark, and Spark best practices. This is how we use Spark to serve weather data faster and cheaper. At The Weather Company we started with a mission to keep people safe and informed about the weather. Today The Weather Company has evolved into a decisions and solutions company with data at its core, providing people with the information and tools they need to make intelligent decisions about the weather. One example of that is History on Demand, our historical weather data API providing access to a global, gridded, historical weather dataset. It provides 34 weather properties, with 34 million records added every hour, and supports geospatial and temporal searching. We see on average 600,000 requests per day, used by clients primarily for machine learning and data analytics, supporting research in domains such as climate science, energy and utilities, agriculture, transportation, insurance and retail. So, as I said, we’re re-implementing History on Demand. Here are some of the problems we set out to solve. First, it was a synchronous API, and that synchronous data access layer was expensive. Additionally, it had limited storage capacity per cluster. With those two restrictions we had to reduce the data we provide to land only, and to only 20 of the 34 available weather properties. Also, because of that synchronous interaction, clients were required to limit the scope of their queries, so they would get small amounts of data for each individual query. With those small queries, it took many, many queries and quite a lot of time to retrieve large amounts of data. Our new solution, with the help of IBM Cloud SQL Query and Cloud Object Storage, has resulted in an order of magnitude reduction in cost. With Cloud Object Storage as the storage solution, we now have unlimited storage capacity. And with those two limitations out of the way, we now have global weather data coverage and we make all 34 weather properties available. We also switched to an asynchronous interaction, so clients can submit a single query for a large amount of data, and we can process that relatively quickly with the help of SQL Query and Spark behind the scenes. With that I’ll hand it over to Paula to talk a little bit more about the details of SQL Query and Spark.
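
To make the synchronous-versus-asynchronous distinction concrete, here is a minimal sketch of the submit-then-poll client interaction described above. The endpoint paths, parameter names, and job states are hypothetical illustrations, not the actual History on Demand API.

```python
import time
import requests

BASE_URL = "https://api.example.com/history-on-demand"  # hypothetical endpoint

def submit_query(bbox, start, end, api_key):
    """Submit one asynchronous query covering a large slice of historical weather data."""
    resp = requests.post(
        f"{BASE_URL}/queries",
        headers={"Authorization": f"Bearer {api_key}"},
        json={"boundingBox": bbox, "startTime": start, "endTime": end},
    )
    resp.raise_for_status()
    return resp.json()["jobId"]

def wait_for_results(job_id, api_key, poll_seconds=30):
    """Poll until the asynchronous job finishes, then return where the results were written."""
    while True:
        resp = requests.get(
            f"{BASE_URL}/queries/{job_id}",
            headers={"Authorization": f"Bearer {api_key}"},
        )
        resp.raise_for_status()
        status = resp.json()
        if status["state"] == "completed":
            return status["resultLocation"]
        if status["state"] == "failed":
            raise RuntimeError(status.get("error", "query failed"))
        time.sleep(poll_seconds)
```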

– Here’s an overview of our solution. We achieved low cost using a serverless approach. The key point is that compute and storage are separately managed services, which means we don’t need to pay for compute when there are no queries. For compute we use Cloud SQL Query, which is powered by Spark. For storage we use Cloud Object Storage, and the key advantage of Object Storage is unlimited capacity at low cost. In addition, we applied best practices such as using Parquet and applying a geospatial layout. Cloud SQL Query provides serverless SQL. It has a UI, and applications can access it from notebooks using a REST API; we’ll show this in our demo. Good performance is challenging when compute and storage are separately managed. To achieve it we use data skipping as well as a Hive Metastore. This allows us to skip over irrelevant data, providing high performance and lowering cost.

We’ll now cover three features of SQL Query that are important for our solution. First, the Catalog, which is a fully integrated multi-tenant Hive Metastore. This is a best practice whereby a relational database is used to store partition-level metadata. Use of a Hive Metastore is critical for performance for three main reasons. Firstly, you can use it to skip irrelevant partitions; this is known as partition pruning. Secondly, it enables listing partitions in parallel. And thirdly, it avoids costly schema inference and enables cost-based optimization. The Catalog is also useful for consistently replacing entire partitions, for example to change their data layout, and we use this capability for History on Demand.

Now I’ll talk about the second feature, geospatial integration. SQL Query has a fully integrated geospatial library which can be used in queries. This library provides geodetic full-earth support, which essentially treats the earth as round instead of approximating it as flat. This enables more accurate calculations, and the difference can be significant, for example close to the poles. We’ve integrated the library with our data skipping capabilities; we’ll talk about this more soon.

Now for the third SQL Query feature, data skipping. SQL Query has a very general data skipping capability which avoids reading irrelevant objects based on data skipping indexes, which contain aggregate metadata for each object. These indexes are created by users and stored alongside the data in Cloud Object Storage. This complements partition pruning, but at the object level. We support multiple index types, including the standard MinMax, but also others such as ValueList, BloomFilter and Geospatial index types. To our knowledge we are the only framework which enables skipping for queries with user defined functions. These UDFs could, for example, be functions from the geospatial library, and we have an extensible way to map UDFs to index types which is picked up by Spark’s Catalyst optimizer. This goes beyond alternative approaches to data skipping, which are typically based only on MinMax and sometimes BloomFilter, and do not support UDFs. How does data skipping work? Our code is an add-on library, which enables the modified query execution flow you see here. On the left is a typical Spark SQL query flow; there’s a partition pruning step before reading the data from Object Storage. We introduced an additional pruning phase which consults object-level metadata. This is done using the Catalyst optimizer and the session extensions API. Here’s a classic example of data skipping using a MinMax index on the temperature column.
For each object we store the minimum and maximum values of that column. When evaluating a query looking for temperatures over 40 degrees Celsius, Spark with our library will prune the list of relevant objects based on this metadata. In our example the red objects are not relevant to the query and therefore don’t need to be scanned. To build the index we scan the temperature column once and store the index; from then on, only the index needs to be accessed to make skipping decisions. Note that in the case of Parquet we also have an optimization which reads MinMax metadata from object footers instead of needing to scan the whole column. Since we skip entire objects based on aggregate metadata, it’s important to group similar rows together from the point of view of the query workload. We call this data layout. One interesting question is what takes precedence: should we organize data primarily by time or by geospatial location in the case of our weather dataset? This depends, of course, on the workload. For History on Demand, since we expect users to be interested in relatively small geospatial regions and large time windows, we lay out the data accordingly.
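
To illustrate the MinMax skipping decision just described, here is a minimal, self-contained sketch (not the actual SQL Query or data skipping library code) that keeps per-object min/max metadata for the temperature column and prunes objects that a `temperature > 40` predicate cannot possibly match.

```python
from dataclasses import dataclass
from typing import List

@dataclass
class ObjectStats:
    """Aggregate metadata kept in the index for one object (e.g. one Parquet file)."""
    name: str
    min_temp: float
    max_temp: float

# Hypothetical index contents: min/max of the temperature column per object.
index: List[ObjectStats] = [
    ObjectStats("part-0000.parquet", -12.0, 18.5),
    ObjectStats("part-0001.parquet", 25.0, 47.3),
    ObjectStats("part-0002.parquet", 30.1, 55.0),
]

def relevant_objects(stats: List[ObjectStats], threshold: float) -> List[str]:
    """Return only the objects that could contain rows with temperature > threshold.

    An object is skipped when its maximum does not exceed the threshold,
    because no row inside it can satisfy the predicate.
    """
    return [s.name for s in stats if s.max_temp > threshold]

print(relevant_objects(index, 40.0))  # ['part-0001.parquet', 'part-0002.parquet']
```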

– So this is how we’ve applied those best practices to our production data layout. Notice first the two bold lines there, beginning with hourly and monthly. These represent the catalog partitions Paula mentioned earlier. How this works, first for the hourly data, in this case for February: every hour we ingest a full global dataset for the previous hour, and then we geospatially partition it into nine Parquet files. We repeat this process once per hour for the month, essentially creating one global layer of data per hour. Then at the end of the month, after all of that month’s hourly data has been ingested, we have a process to re-lay out the whole month as a single layer of much smaller, geospatially partitioned Parquet files, so each one covers much less area on a map. And here’s what that looks like. On the left you can see the hourly nine geospatially partitioned Parquet files, duplicated 744 times in the case of a 31-day month. On the right you see our monthly layout; we’ve lost the duplication, so now we have a single file at any given location. Also notice that we’ve zoomed in here, so each of those tiles represents a much smaller geospatial area. Back to Paula here.
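
A rough sketch of this kind of hourly ingestion and end-of-month re-layout with plain Spark follows. The column names, tile function, tile sizes and paths are hypothetical; the sketch only illustrates the idea of coarse, geospatially partitioned hourly writes that are later consolidated into a monthly layout with much finer tiles.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("hod-layout-sketch").getOrCreate()

def tile_id(lat_col, lon_col, deg):
    """Assign each row to a square geospatial tile `deg` degrees on a side."""
    return F.concat_ws("_",
                       F.floor(lat_col / deg).cast("string"),
                       F.floor(lon_col / deg).cast("string"))

# Hourly ingestion: one global layer per hour, split into a handful of coarse
# tiles (nine large Parquet files per hour in the layout described above).
hourly = (spark.read.parquet("cos://bucket/incoming/2020-02-01T00/")  # hypothetical path
          .withColumn("year", F.year("timestamp"))
          .withColumn("month", F.month("timestamp"))
          .withColumn("tile", tile_id(F.col("latitude"), F.col("longitude"), 60.0)))
(hourly.repartition("tile")
       .write.mode("append")
       .partitionBy("year", "month")
       .parquet("cos://bucket/hourly/"))

# End-of-month consolidation: rewrite the whole month as a single layer of
# much smaller tiles, removing the 744x per-hour duplication.
month = (spark.read.parquet("cos://bucket/hourly/year=2020/month=2/")
         .withColumn("tile", tile_id(F.col("latitude"), F.col("longitude"), 10.0)))
(month.repartition("tile")
      .sortWithinPartitions("latitude", "longitude", "timestamp")
      .write.mode("overwrite")
      .parquet("cos://bucket/monthly/year=2020/month=2/"))
```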

– Now we assume a geospatial layout, as Erik just described. Here’s an example of a query looking for weather data in the Research Triangle around Raleigh. Without any optimization this query would need to scan the entire dataset, even when using Parquet or ORC. As you see on the right, we built MinMax indexes on the latitude and longitude columns. To enable geospatial data skipping for queries like this, we mapped the ST_Contains UDF to MinMax indexes on these columns, and we did the same for the rest of the functions in the geospatial library. An alternative approach, which we also implemented, rewrites geospatial queries by adding additional predicates on the latitude and longitude columns. Once this is done, Parquet and ORC MinMax metadata can be exploited. However, not only does this require complex rewrite logic, it still requires reading every object footer for every query. This adds significant overhead because small reads in object storage have relatively high latency. Our approach stores all the metadata together, which reduces the number of reads and increases overall throughput.

To show the benefits of consolidated metadata, this is an experiment comparing the data skipping and rewrite approaches for a query like the one in the previous slide. On the x-axis you see the time window size for the query. As the window size increases, the benefit of having consolidated metadata also increases, until we reach an improvement of 3.6 times faster for a 60-month window. Note that the baseline here already uses the query rewrite optimization; if we didn’t use any optimizations we would need a full scan of the entire dataset, since the query involves a UDF. The improvement comes from avoiding reading Parquet footers, and also from better resource allocation, because we avoid the situation where Spark allocates a task just to read an object footer and then skips the rest of the object. The next experiment is similar but also takes the catalog into account. Again, our baseline already uses the rewrite optimization, and this is the yellow curve. The blue curve uses both the catalog and data skipping, and we get an average tenfold speedup across all points in this graph. Again, the dataset is in Parquet format; for CSV, JSON or Avro the speedup would be much larger.

And now for a demo. In this demo we’re going to use Watson Studio notebooks to run SQL Query APIs against The Weather Company data stored in IBM Cloud Object Storage and do some geospatial analytics. First I log in, passing my API key. The next step is to create a table. Here we use SQL Query’s built-in catalog to create a table called TWC demo. This table is backed by Parquet data in Cloud Object Storage. We partition it by year and month, and it has around 40 columns including latitude and longitude. Then we create the partitions of this table, and we also create data skipping indexes: MinMax indexes on both latitude and longitude. I’m going to skip over those steps because I’ve done them ahead of time, and go ahead and run the query. This query gets weather data within a radius of a hundred kilometers from New York City for the month of October last year, and calculates the average daily temperature for each location on the grid. Here are our results: we have over 50,000 rows. Note that temperatures are in Kelvin. So now I’m kind of curious about the data skipping: how did that work out? We actually skipped 99.95% of the objects, which gives a significant boost to performance and a reduction in cost.
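
Here is a rough sketch of what the notebook steps just described might look like. The `ibmcloudsql` client calls, the table and column names, and the geospatial function names and units (ST_Contains, ST_Buffer in meters, ST_Point taking longitude then latitude) are written from the description above and should be treated as assumptions rather than the actual demo code.

```python
import ibmcloudsql  # IBM Cloud SQL Query Python client (pip install ibmcloudsql)

api_key = "..."                                   # IBM Cloud API key
instance_crn = "crn:v1:bluemix:..."               # CRN of the SQL Query instance
results_cos_url = "cos://us-geo/results-bucket/"  # where query results are written

sql_client = ibmcloudsql.SQLQuery(api_key, instance_crn, results_cos_url)
sql_client.logon()

# Table creation, partition registration and the MinMax data skipping indexes
# on latitude and longitude are set up once, ahead of time, as in the demo.

# Geospatial query: all grid points within roughly 100 km of New York City for
# October 2019, aggregated to an average daily temperature (Kelvin) per point.
query = """
SELECT latitude, longitude, to_date(ts) AS day, avg(temperature) AS avg_temp
FROM TWC_DEMO
WHERE year = 2019 AND month = 10
  AND ST_Contains(ST_Buffer(ST_Point(-74.0060, 40.7128), 100000.0),
                  ST_Point(longitude, latitude))
GROUP BY latitude, longitude, to_date(ts)
"""
df = sql_client.run_sql(query)  # result comes back as a pandas DataFrame
print(len(df))                  # over 50,000 rows in the demo above
```
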
So now you can do whatever you like with this weather data. For example, you can use it to build machine learning models and join it with your other datasets. For this demo, we’re going to show an animated heat map of the temperature data for that month. What this heat map does is cycle through each day of the month and show how the temperature in that region evolved over time. As you can see, here in the New York area the temperature fluctuates on a day-to-day basis. A few other statistics covering what you just saw in the demo: creating the data skipping indexes took just over 12 minutes, and running our geospatial query took just over a minute. Thanks to the use of Parquet, the Catalog and data skipping, the query scanned only 20 megabytes out of the 6.4 terabyte dataset.
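
An animated daily heat map like the one in the demo could be produced in the notebook roughly as follows, assuming the `df` result from the earlier sketch with latitude, longitude, day and avg_temp columns. Folium and its HeatMapWithTime plugin are one common choice for this, not necessarily what the demo itself used.

```python
import folium
from folium.plugins import HeatMapWithTime

# Normalize temperatures (Kelvin) to 0..1 weights for the heat map.
t_min, t_max = df["avg_temp"].min(), df["avg_temp"].max()
df["weight"] = (df["avg_temp"] - t_min) / (t_max - t_min)

# One animation frame per day: a list of [lat, lon, weight] triples.
days = sorted(df["day"].unique())
frames = [
    df[df["day"] == d][["latitude", "longitude", "weight"]].values.tolist()
    for d in days
]

m = folium.Map(location=[40.7128, -74.0060], zoom_start=7)
HeatMapWithTime(frames, index=[str(d) for d in days]).add_to(m)
m.save("nyc_october_temperatures.html")
```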

– So here’s an example query that we’ll be using in production against our data layout. How our API works is that we take a request from a user specifying a location, in this case a bounding box, and a range in time. We then translate that request into a SQL query that runs behind the scenes. In the query here, notice first the first half of the WHERE clause, where we reference the year and month values from the catalog partitions mentioned earlier. This allows SQL Query to very quickly prune, or filter out, whole partitions, in our case months of data. After that round of pruning, we move on to the second half of the WHERE clause, where we reference the specific timestamps from the request and the location. This first filters against the metadata that Paula described, and then finally against individual records from the Parquet files. We did some query performance analysis against our layout, querying a geospatial area that’s 40 grid points by 40 grid points, or roughly 25,000 square kilometers, over a range of time up to, on the high end, one year. In the charts below you can see hours queried on the x-axis, and notice that both the bounding box query on the left and the radius query on the right finish in right around 200 seconds. So how does that compare to our previous synchronous solution? With the synchronous solution, querying that same bounding box over one year took 8,000 queries and two hours and 15 minutes; with our new asynchronous solution, we’re down to a single query and only three minutes and 20 seconds. A few more comparisons: we now have unlimited storage capacity, global weather coverage instead of land only, all 34 weather properties, unlimited query result size, and all of that with an order of magnitude reduction in cost.
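
A sketch of what such a generated query might look like follows. The table and column names are the hypothetical ones used in the earlier sketches, and the predicates simply illustrate the two-stage pruning Erik describes (catalog partition columns first, then timestamp and location), not the production SQL.

```python
# Hypothetical request to the asynchronous API: a bounding box and a time range.
request = {
    "min_lat": 35.0, "max_lat": 36.5,
    "min_lon": -79.5, "max_lon": -78.0,
    "start": "2019-06-01 00:00:00",
    "end":   "2020-05-31 23:00:00",
}

# Translate the request into SQL. The first half of the WHERE clause references
# the catalog partition columns (year, month), so whole months are pruned via
# the Hive Metastore; the second half filters on the exact timestamps and the
# bounding box, where the MinMax indexes on latitude/longitude let entire
# objects be skipped before individual Parquet records are filtered.
sql = f"""
SELECT *
FROM TWC_DEMO
WHERE ((year = 2019 AND month >= 6) OR (year = 2020 AND month <= 5))
  AND ts BETWEEN to_timestamp('{request['start']}') AND to_timestamp('{request['end']}')
  AND latitude  BETWEEN {request['min_lat']} AND {request['max_lat']}
  AND longitude BETWEEN {request['min_lon']} AND {request['max_lon']}
"""
print(sql)
```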

– So to conclude: not only did we get order-of-magnitude cost reductions, we also got performance improvements and enhanced functionality at the same time. Key factors were adopting a serverless approach with Cloud SQL Query and Cloud Object Storage. SQL Query has a powerful geospatial library, a fully integrated Catalog and, last but not least, geospatial data skipping. Finally, I want to point out that our data skipping work is extensible; it can be applied to domains other than geospatial, for example log analytics or genomic data analysis. So thanks very much for listening, and a big thanks to all the team. And don’t forget to rate and review this session, and please take a look at our latest blog.


About Paula Ta-Shma

IBM

Dr. Paula Ta-Shma is a Research Staff Member in the Cloud & Data Technologies group at IBM Research - Haifa and is responsible for a group of research efforts in the area of Hybrid Data, with a particular focus on high performance, secure and cost efficient data stores and processing engines. She is particularly interested in performant SQL analytics over Object Storage and leads work on Data Skipping, which is now integrated into multiple IBM products and services. Previously she led projects in areas such as cloud storage infrastructure for IoT and Continuous Data Protection. Prior to working at IBM, Dr. Ta-Shma worked at several companies on Database Management Systems, including Informix Software Inc., where she worked on Apache Derby. She holds M.Sc. and Ph.D. degrees in computer science from the Hebrew University of Jerusalem.

About Erik Goepfert

The Weather Company, an IBM Business

Erik Goepfert is a Senior Software Engineer for IBM, focusing primarily on historical weather solutions as part of The Weather Company, an IBM Business. At The Weather Company, he works on the History on Demand application, which ingests large amounts of weather data and serves it to users via an API. The data is then used by clients primarily for machine learning and data analytics. Previously he worked in the transportation industry at Mandli Communications, writing software to integrate mobile LiDAR, 3D pavement technology, imaging, and geospatial data collection equipment, as well as data processing software. Erik received his Bachelor’s degree in Computer Science from the University of Wisconsin–Milwaukee.