Overview

To access all the code examples in this stage, please import the Streaming Wordcount notebook.

To help introduce Apache Spark Streaming, we will walk through the Streaming Wordcount example – the “Hello World” of Spark Streaming, which counts words over 1-second batches of streaming data. It uses an in-memory string generator as a dummy source for streaming data. Please refer to the Streaming Wordcount notebook to execute this streaming job; this guide focuses on the primary coding components.

Apache Spark Streaming Concepts

Apache Spark Streaming is a scalable, fault-tolerant stream processing system. As part of Apache Spark, it integrates with MLlib, SQL, DataFrames, and GraphX. As of Spark 2.0, Structured Streaming is also available, so you can work with streaming DataFrames.

Concepts behind Spark Streaming

Sensors, IoT devices, social networks, and online transactions are all generating data that needs to be monitored constantly and acted upon quickly. As a result, the need for large-scale, real-time stream processing is more evident than ever before. There are four broad ways Spark Streaming is being used today:

  • Streaming ETL – Data is continuously cleaned and aggregated before being pushed into data stores.
  • Triggers – Anomalous behavior is detected in real time and downstream actions are triggered accordingly, e.g. unusual behavior of sensor devices generating alerts or actions.
  • Data enrichment – Live data is enriched with more information by joining it with a static dataset, allowing for a more complete real-time analysis (see the sketch after this list).
  • Complex sessions and continuous learning – Events related to a live session (e.g. user activity after logging into a website or application) are grouped together and analyzed. In some cases, the session information is used to continuously update machine learning models.
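
As a minimal sketch of the data-enrichment pattern, each micro-batch of a stream can be joined against a static lookup RDD inside transform. The stream contents, lookup data, and field layout below are hypothetical and not part of the Streaming Wordcount notebook:

import org.apache.spark.SparkContext
import org.apache.spark.streaming.dstream.DStream

// Hypothetical example: lines is a DStream[String] of well-formed "countryCode,payload" records.
def enrich(lines: DStream[String], sc: SparkContext): DStream[(String, (String, String))] = {
  // Static lookup dataset, built once on the driver.
  val countryNames = sc.parallelize(Seq(("us", "United States"), ("de", "Germany")))

  lines
    .map { line => val Array(code, payload) = line.split(",", 2); (code, payload) }
    .transform { rdd => rdd.join(countryNames) }   // yields (code, (payload, countryName))
}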

Spark Streaming workflow diagram

In general, Spark Streaming works by having a set of receivers that receive data streams and chop them up into little batches. Spark then processes these batches and pushes out the results.
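
As a minimal sketch of that flow, a built-in socket receiver can be wired up as below. This assumes an existing SparkContext named sc and a text source on localhost port 9999 (both placeholders); the Wordcount example later in this guide uses a custom in-memory receiver instead:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Chop the incoming stream into 1-second micro-batches.
val ssc = new StreamingContext(sc, Seconds(1))

// A built-in receiver: each line arriving on the socket becomes one element of the stream.
val lines = ssc.socketTextStream("localhost", 9999)

// Each batch is processed as a small RDD; here we simply print the per-batch count.
lines.count().print()

ssc.start()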

StreamingContext

Define the function that sets up the StreamingContext

As noted in the previous section, Spark Streaming requires two components: a receiver and a function that creates and sets up the streaming computation. For the Streaming Word Count example in this guide, we will focus on the function, as it contains the primary logic. Please refer to the Streaming Word Count notebook to review the custom receiver used as the dummy source.

// This is the dummy source implemented as a custom receiver. No need to understand this.
import scala.util.Random
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver._

class DummySource(ratePerSec: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
...
}
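
The body of the receiver is elided above; the full implementation is in the notebook. As a rough, hypothetical sketch of what such a custom receiver typically looks like (reusing the imports above; the generated text here is made up), onStart() launches a background thread that pushes strings into Spark via store():

// Hypothetical sketch only – the real DummySource lives in the Streaming Wordcount notebook.
class SketchSource(ratePerSec: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Generate data on a background thread until the receiver is stopped.
    new Thread("Sketch Source") {
      override def run(): Unit = {
        while (!isStopped()) {
          // Push roughly ratePerSec random lines per second into Spark.
          (1 to ratePerSec).foreach { _ => store("dummy word " + Random.nextInt(10)) }
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to do: the isStopped() check lets the generating thread exit on its own.
  }
}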

//
// This is the function that creates and sets up the streaming computation
//
import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
// Note: rdd.toDF(...) below relies on the SQL implicits that Databricks notebooks import automatically.

var newContextCreated = false      // Flag to detect whether a new context was created or not

// Function to create a new StreamingContext and set it up
def creatingFunc(): StreamingContext = {
    
  // Create a StreamingContext
  val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))
  
  // Create a stream that generates 1000 lines per second
  val stream = ssc.receiverStream(new DummySource(eventsPerSecond))  
  
  // Split the lines into words, and then do word count
  val wordStream = stream.flatMap { _.split(" ")  }
  val wordCountStream = wordStream.map(word => (word, 1)).reduceByKey(_ + _)

  // Create a temp view at every batch interval
  //  (createOrReplaceTempView requires Apache Spark >= 2.0)
  wordCountStream.foreachRDD { rdd =>
    rdd.toDF("word", "count").createOrReplaceTempView("batch_word_count")
  }
  
  stream.foreachRDD { rdd =>
    System.out.println("# events = " + rdd.count())
    System.out.println("t " + rdd.take(10).mkString(", ") + ", ...")
  }
  
  ssc.remember(Minutes(1))  // To make sure data is not deleted by the time we query it interactively
  
  println("Creating function called to create new StreamingContext")
  newContextCreated = true  
  ssc
}

Start the Streaming Job: Stop any existing StreamingContext and start (or restart) a new one

Here we use the configurations at the top of the notebook to decide whether to stop any existing StreamingContext and start a new one, or to recover one from existing checkpoints.

// Stop any existing StreamingContext 
if (stopActiveContext) {	
  StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
} 

// Get or create a streaming context
val ssc = StreamingContext.getActiveOrCreate(creatingFunc)
if (newContextCreated) {
  println("New context created from currently defined creating function") 
} else {
  println("Existing context running or recovered from checkpoint, may not be running currently defined creating function")
}

// Start the streaming context in the background.
ssc.start()

// Wait so the background streaming job has time to process some data before this cell returns. This holds the cell for up to 5 times batchIntervalSeconds.
ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 5 * 1000)
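
For the checkpoint-recovery path mentioned above, getActiveOrCreate also has an overload that takes a checkpoint directory. A minimal sketch, assuming a hypothetical checkpoint path and that creatingFunc also calls ssc.checkpoint(checkpointDir) so there is something to recover from:

// Hypothetical checkpoint directory; in practice this would come from the notebook's configuration cell.
val checkpointDir = "dbfs:/tmp/streaming_wordcount_checkpoint"

// Return the active context if there is one, else recover from the checkpoint,
// else call creatingFunc() to build a fresh one.
val recoveredSsc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc _)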

Interactive Querying

As you can see from the example below, the query's result will change every time you execute it, reflecting the current word count based on the input stream of data.

Interactive querying in Databricks with Spark Streaming
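
A minimal sketch of such a query against the batch_word_count temp view created above, assuming the SparkSession that Databricks notebooks expose as spark:

// Query the temp view that the streaming job refreshes at every batch interval.
// Re-running this shows the counts from the most recent batch.
spark.sql("SELECT word, `count` FROM batch_word_count ORDER BY `count` DESC").show(10)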

Once you are done, just execute the statement below to stop the streaming context.

StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
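
If you want already-received data to finish processing before shutdown, stop can also be called with graceful shutdown enabled; a minimal variant:

// Stop the active StreamingContext (if any), keep the SparkContext alive,
// and let in-flight batches complete before shutting down.
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false, stopGracefully = true) }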

To access all the code examples in this stage, please import the Streaming Wordcount notebook.