Simplify Streaming Stock Data Analysis Using Databricks Delta


Traditionally, real-time analysis of stock data was complicated: you had to maintain a streaming system while keeping legacy and streaming data transactionally consistent at the same time. Databricks Delta helps solve many of the pain points of building a streaming system to analyze stock data in real time.

In the following diagram, we provide a high-level architecture to simplify this problem. We start by ingesting two different sets of data into two Databricks Delta tables. The two datasets are stock prices and fundamentals. After ingesting the data into their respective tables, we join the data in an ETL process and write the result out to a third Databricks Delta table for downstream analysis.

In this blog post we will review:

  • The current problems of running such a system
  • How Databricks Delta addresses these problems
  • How to implement the system in Databricks

Databricks Delta helps solve these problems by combining the scalability, streaming, and access to advanced analytics of Apache Spark with the performance and ACID compliance of a data warehouse.

Traditional pain points prior to Databricks Delta

The pain points of a traditional streaming and data warehousing solution can be broken into two groups: data lake and data warehouse pains.

Data Lake Pain Points

While data lakes allow you to flexibly store an immense amount of data in a file system, there are many pain points including (but not limited to):

  • Consolidation of streaming data from many disparate systems is difficult.
  • Updating data in a Data Lake is nearly impossible and much of the streaming data needs to be updated as changes are made. This is especially important in scenarios involving financial reconciliation and subsequent adjustments.
  • Query speeds for a data lake are typically very slow.
  • Optimizing storage and file sizes is very difficult and often requires complicated logic.

Data Warehouse Pain Points

The power of a data warehouse is that you have a persistent, performant store of your data. But the pain points for building modern continuous applications include (but are not limited to):

  • Constrained to SQL queries; i.e. no machine learning or advanced analytics.
  • Accessing streaming data and stored data together is very difficult, if possible at all.
  • Data warehouses do not scale very well.
  • Tying compute and storage together makes using a warehouse very expensive.

How Databricks Delta Solves These Issues

Databricks Delta (Databricks Delta Guide) is a unified data management system that brings data reliability and performance optimizations to cloud data lakes. More succinctly, Databricks Delta combines the advantages of data lakes and data warehouses with Apache Spark to allow you to do incredible things!

  • Databricks Delta, along with Structured Streaming, makes it possible to analyze streaming and historical data together at data warehouse speeds.
  • Using Databricks Delta tables as sources and destinations of streaming big data makes it easy to consolidate disparate data sources.
  • Upserts are supported on Databricks Delta tables.
  • Your streaming/data lake/warehousing solution has ACID compliance.
  • Easily include machine learning scoring and advanced analytics into ETL and queries.
  • Decouples compute and storage for a completely scalable solution.
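
To make the upsert point in the list above concrete, here is a minimal sketch in plain Python of what upsert (update-or-insert) semantics mean for a keyed table. It is a toy in-memory model, not the Databricks Delta API: the `upsert` function, the `(ticker, price_date)` key, and all values are assumptions made up for illustration.

```python
def upsert(table, updates, key_fields=("ticker", "price_date")):
    """Toy upsert: a row in `updates` replaces the row with the same key,
    or is appended when no row with that key exists yet."""
    indexed = {tuple(row[k] for k in key_fields): row for row in table}
    for row in updates:
        indexed[tuple(row[k] for k in key_fields)] = row
    return list(indexed.values())


# Made-up rows, for illustration only
prices = [
    {"ticker": "AAPL", "price_date": "2018-01-02", "close": 100.0},
    {"ticker": "MSFT", "price_date": "2018-01-02", "close": 50.0},
]
adjustments = [
    {"ticker": "AAPL", "price_date": "2018-01-02", "close": 101.5},  # correction
    {"ticker": "AAPL", "price_date": "2018-01-03", "close": 102.0},  # new row
]

merged = upsert(prices, adjustments)
for row in merged:
    print(row)
```

The corrected AAPL row replaces the original and the new date is appended, so the result has three rows. This is the behavior that financial reconciliation and later adjustments depend on.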

Implement your streaming stock analysis solution with Databricks Delta

Databricks Delta and Apache Spark do most of the work for our solution; you can try out the full notebook and follow along with the code samples below. Let’s start by enabling Databricks Delta; as of this writing, Databricks Delta is in private preview, so sign up at https://databricks.com/product/databricks-delta.

As noted in the preceding diagram, we have two datasets to process, one for fundamentals and one for price data. To create our two Databricks Delta tables, we specify .format("delta") against our DBFS locations.

# Create Fundamental Data (Databricks Delta table)
dfBaseFund = spark \
    .read \
    .format('delta') \
    .load('/delta/stocksFundamentals')

# Create Price Data (Databricks Delta table)
dfBasePrice = spark \
    .read \
    .format('delta') \
    .load('/delta/stocksDailyPrices')

While we’re updating stocksFundamentals and stocksDailyPrices, we will combine this data through a series of ETL jobs into a consolidated view (stocksDailyPricesWFund). With the following code snippet, we can determine the start and end date of the available data and then combine the price and fundamentals data for that date range into DBFS.

import datetime
import pyspark.sql.functions as func

# Determine start and end date of available data
row = dfBasePrice.agg(
    func.max(dfBasePrice.price_date).alias("maxDate"),
    func.min(dfBasePrice.price_date).alias("minDate")
).collect()[0]
startDate = row["minDate"]
endDate = row["maxDate"]

# Define our date range function (end_date itself is not yielded)
def daterange(start_date, end_date):
    for n in range((end_date - start_date).days):
        yield start_date + datetime.timedelta(days=n)

# Join price and fundamentals data for a single date and append the result
def combinePriceAndFund(theDate):
    dfFund = dfBaseFund.where(dfBaseFund.price_date == theDate)
    dfPrice = dfBasePrice.where(
        dfBasePrice.price_date == theDate
    ).drop('price_date')
    # Drop the updated column
    dfPriceWFund = dfPrice.join(dfFund, ['ticker']).drop('updated')

    # Save data to DBFS
    dfPriceWFund \
        .write \
        .format('delta') \
        .mode('append') \
        .save('/delta/stocksDailyPricesWFund')

# Loop through dates to complete fundamentals + price ETL process
for single_date in daterange(
    startDate, endDate + datetime.timedelta(days=1)
):
    print('Starting ' + single_date.strftime('%Y-%m-%d'))
    start = datetime.datetime.now()
    combinePriceAndFund(single_date)
    end = datetime.datetime.now()
    print(end - start)
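
One detail in the loop above is easy to miss: the `daterange` helper stops before its end date, which is why the loop passes `endDate` plus one day. The following standalone sketch reproduces only that helper with made-up dates (no Spark required) to show the difference.

```python
import datetime


def daterange(start_date, end_date):
    # Yields start_date up to, but not including, end_date
    for n in range((end_date - start_date).days):
        yield start_date + datetime.timedelta(days=n)


# Made-up dates, for illustration only
start = datetime.date(2018, 1, 1)
end = datetime.date(2018, 1, 3)

exclusive = list(daterange(start, end))
inclusive = list(daterange(start, end + datetime.timedelta(days=1)))

print([d.isoformat() for d in exclusive])  # ['2018-01-01', '2018-01-02']
print([d.isoformat() for d in inclusive])  # ['2018-01-01', '2018-01-02', '2018-01-03']
```

Without the extra day, the most recent day of price data would never be joined and written to the consolidated table.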

Now we have a stream of consolidated fundamentals and price data that is being pushed into DBFS in the /delta/stocksDailyPricesWFund location. We can build a Databricks Delta table by specifying .format("delta") against that DBFS location.

dfPriceWithFundamentals = spark \
    .readStream \
    .format("delta") \
    .load("/delta/stocksDailyPricesWFund")

# Create temporary view of the data
dfPriceWithFundamentals.createOrReplaceTempView("priceWithFundamentals")

Now that we have created our initial Databricks Delta table, let’s create a view that will allow us to calculate the price/earnings ratio in real time (because of the underlying streaming data updating our Databricks Delta table).

%sql
CREATE OR REPLACE TEMPORARY VIEW viewPE AS 
select ticker, 
       price_date, 
       first(close) as price, 
       (close/eps_basic_net) as pe
  from priceWithFundamentals
 where eps_basic_net > 0
 group by ticker, price_date, pe
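
The logic of the view above can be sketched in plain Python as follows. This is only an illustration of the calculation, not how Spark evaluates the view: the `price_earnings` function and the sample rows are made up, and only the column names come from the query.

```python
def price_earnings(rows):
    """Mirror the viewPE logic: keep rows with positive basic EPS,
    then compute pe = close / eps_basic_net."""
    result = []
    for row in rows:
        eps = row["eps_basic_net"]
        # Like the SQL filter, this drops missing, zero, and negative EPS
        if eps is not None and eps > 0:
            result.append({
                "ticker": row["ticker"],
                "price_date": row["price_date"],
                "price": row["close"],
                "pe": row["close"] / eps,
            })
    return result


# Made-up rows, for illustration only
sample = [
    {"ticker": "AAA", "price_date": "2018-01-02", "close": 100.0, "eps_basic_net": 5.0},
    {"ticker": "BBB", "price_date": "2018-01-02", "close": 50.0, "eps_basic_net": -2.0},
    {"ticker": "CCC", "price_date": "2018-01-02", "close": 30.0, "eps_basic_net": None},
]

for row in price_earnings(sample):
    print(row["ticker"], row["pe"])  # AAA 20.0
```

Filtering on positive earnings avoids division by zero and keeps negative ratios, which are hard to interpret, out of the results.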

Analyze streaming stock data in real time

With our view in place, we can quickly analyze our data using Spark SQL.

%sql
select *
from viewPE
where ticker = 'AAPL'
order by price_date

Because the underlying source of this consolidated dataset is a Databricks Delta table, this view shows not only the batch data but also any new data streaming in, as seen in the following streaming dashboard.

Under the covers, Structured Streaming is not just writing the data to Databricks Delta tables but also keeping state for each of the distinct keys (in this case, ticker symbols) that need to be tracked.

Because you are using Spark SQL, you can execute aggregate queries at scale and in real time.

%sql
SELECT ticker, AVG(close) as Average_Close
FROM priceWithFundamentals
GROUP BY ticker
ORDER BY Average_Close
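
As a rough mental model of how a streaming aggregate such as the per-ticker average above can stay current, the sketch below keeps a small piece of state per key and updates it as each micro-batch arrives. It is a deliberately simplified toy in plain Python, not how Structured Streaming is implemented; the `RunningAverage` class and the batches are assumptions for illustration.

```python
from collections import defaultdict


class RunningAverage:
    """Toy stateful aggregation: one (sum, count) pair per ticker."""

    def __init__(self):
        self._state = defaultdict(lambda: [0.0, 0])

    def update(self, batch):
        # Fold a micro-batch of (ticker, close) pairs into the per-key state
        for ticker, close in batch:
            entry = self._state[ticker]
            entry[0] += close
            entry[1] += 1

    def averages(self):
        return {t: total / count for t, (total, count) in self._state.items()}

    def tracked_keys(self):
        return len(self._state)


# Made-up micro-batches, for illustration only
agg = RunningAverage()
agg.update([("AAPL", 100.0), ("MSFT", 50.0)])
agg.update([("AAPL", 110.0)])

print(agg.averages())      # {'AAPL': 105.0, 'MSFT': 50.0}
print(agg.tracked_keys())  # 2
```

Only the per-key state is kept between batches, so earlier rows do not need to be re-read to refresh the average; the amount of state grows with the number of distinct tickers rather than with the number of rows.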

Summary

In closing, we demonstrated how to simplify streaming stock data analysis using Databricks Delta. By combining Spark Structured Streaming and Databricks Delta, we can use the Databricks integrated workspace to create a performant, scalable solution that has the advantages of both data lakes and data warehouses. The Databricks Unified Analytics Platform removes the data engineering complexities commonly associated with streaming and transactional consistency, enabling data engineering and data science teams to focus on understanding the trends in their stock data.
