Overview

To access all the code examples in this stage, please import the Examining IoT Devices Using Datasets notebook.

The Apache Spark Dataset API provides a type-safe, object-oriented programming interface. In Spark 2.0, DataFrames and Datasets are unified, as explained in Quick Start > RDDs, DataFrames, and Datasets, and DataFrame is an alias for an untyped Dataset[Row]. Like DataFrames, Datasets take advantage of Spark’s Catalyst optimizer by exposing expressions and data fields to the query planner. Beyond Catalyst, Datasets also leverage Tungsten’s fast in-memory encoding. They extend these benefits with compile-time type safety, meaning production applications can be checked for errors before they are run, and they also allow direct operations over user-defined classes, as you will see in a couple of simple examples below. Lastly, the Dataset API offers high-level domain-specific language operations such as sum(), avg(), join(), select(), and groupBy(), making the code much easier to express, read, and write.
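
As a quick, hedged sketch of that DSL (not taken from the accompanying notebook; the SensorReading case class and values here are purely illustrative):

// hypothetical case class, used only for this sketch
case class SensorReading(device: String, temp: Long)

// in a Databricks notebook, spark is predefined; the implicits enable toDS()
import spark.implicits._

// build a small typed Dataset and query it with the high-level DSL
val readings = Seq(SensorReading("meter-1", 20), SensorReading("meter-2", 31), SensorReading("meter-1", 28)).toDS()
readings.filter(r => r.temp > 25).groupBy("device").avg("temp").show()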

In this section, you will learn two ways to create Datasets: dynamically creating the data and reading from a JSON file using SparkSession. Additionally, through simple and short examples, you will learn about Dataset API operations on the Dataset, how to issue SQL queries, and how to visualize data. For learning purposes, we use a small IoT device dataset; however, there is no reason why you can’t use a large dataset.

We have made a number of datasets available in the /databricks-datasets folder which is accessible within the Databricks platform.

Creating or Loading Sample Data

There are two easy ways to have your structured data accessible and process it using Dataset APIs within a notebook. First, for primitive types in examples or demos, you can create them within a Scala or Python notebook or in your sample Spark application. For example, here’s a way to create a Dataset of 100 integers in a notebook.

Note that in Spark 2.0, the SparkContext is subsumed by SparkSession, a single point of entry called spark. Going forward, you can use this handle in your driver program or notebook cell, as shown below, where we create 100 integers as a Dataset[Long].

// range of 100 numbers to create a Dataset.
val range100 = spark.range(100)
range100.collect()

Output of a simple Dataset

Second, the more common way is to read a data file from an external data source, such as HDFS, S3, NoSQL, RDBMS, or the local filesystem. Spark supports multiple formats: JSON, CSV, text, Parquet, ORC, and so on. To read a JSON file, you can simply use the SparkSession handle spark.

// read a JSON file from a location mounted on a DBFS mount point
// Note that we are using the new entry point in Spark 2.0 called spark
val jsonData = spark.read.json("/databricks-datasets/data/people/person.json")

When reading the JSON file, Spark does not know the structure of your data, that is, how you want to organize your data into a type-specific JVM object. It attempts to infer the schema from the JSON file and creates a DataFrame = Dataset[Row] of generic Row objects.
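
To see what Spark inferred, you can print the schema of the resulting DataFrame; a small sketch using the jsonData value read above:

// inspect the schema Spark inferred from the JSON file
jsonData.printSchema()

// the inferred schema is also available programmatically as a StructType
jsonData.schema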

Alternatively, to convert your DataFrame into a Dataset reflecting a Scala class object, you define a domain-specific Scala case class and then explicitly convert the DataFrame into that type, as shown below.

// First, define a case class that represents our type-specific Scala JVM Object
case class Person (email: String, iq: Long, name: String)

// Read the JSON file and convert the DataFrame into a type-specific JVM Scala
// object, Person. Note that at this stage Spark, upon reading the JSON, creates a
// generic DataFrame = Dataset[Row]. Explicitly converting the DataFrame into a
// Dataset results in a type-specific collection of objects of type Person
val ds = spark.read.json("/databricks-datasets/data/people/person.json").as[Person]

In a second example, we do something similar with IoT device state information captured in a JSON file: define a case class, read the JSON file from the FileStore, and convert the resulting DataFrame into a Dataset[DeviceIoTData].

There are a couple of reasons why you might want to convert a DataFrame into type-specific JVM objects. First, after an explicit conversion, all relational and query expressions using the Dataset API get compile-time type safety. For example, if you use a filter operation with the wrong data type, Spark detects the mismatch and issues a compile error rather than a runtime error, so you catch errors earlier. Second, the Dataset API provides higher-order methods, making code much easier to read and develop.
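
To make the first point concrete, here is a hedged sketch using the Person Dataset from above, alongside a plain DataFrame read from the same file (the df value is introduced here only for illustration):

// an untyped DataFrame over the same data, for comparison
val df = spark.read.json("/databricks-datasets/data/people/person.json")

// typed Dataset: a misspelled field is rejected by the compiler
// ds.filter(p => p.iqq > 100)    // does not compile: value iqq is not a member of Person

// untyped DataFrame: the same mistake compiles and only fails with an
// AnalysisException when the query is analyzed at runtime
// df.filter($"iqq" > 100)

// the correct, typed version compiles and runs
ds.filter(p => p.iq > 100).show()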

In the following section, Processing and Visualizing a Dataset, you will notice how the use of Dataset typed objects makes the code much easier to express and read.

As with the Person example above, here we create a case class that encapsulates our Scala object.

// define a case class that represents our Device data.
case class DeviceIoTData (
  battery_level: Long,
  c02_level: Long,
  cca2: String,
  cca3: String,
  cn: String,
  device_id: Long,
  device_name: String,
  humidity: Long,
  ip: String,
  latitude: Double,
  longitude: Double,
  scale: String,
  temp: Long,
  timestamp: Long
)

// fetch the JSON device information uploaded into the Filestore
val jsonFile = "/databricks-datasets/data/iot/iot_devices.json"

// read the json file and create the dataset from the case class DeviceIoTData
// ds is now a collection of JVM Scala objects DeviceIoTData
val ds = spark.read.json(jsonFile).as[DeviceIoTData]

Viewing a Dataset

To view this data in a tabular format, instead of exporting it to a third-party tool, you can use the Databricks display() command. That is, once you have loaded the JSON data and converted it into a Dataset for your type-specific collection of JVM objects, you can view it as you would a DataFrame, using either display() or standard Spark commands such as take(), foreach(), and println().

// display the dataset table just read in from the JSON file
display(ds)

Example of the available visualizations in Databricks from the display() command

Screenshot of a Dataset being displayed in tabular format

// Using the standard Spark commands, take() and foreach(), print the first 
// 10 rows of the Dataset.
ds.take(10).foreach(println(_))

Print first 10 rows of a dataset
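
As a small aside (not part of the original example), the Dataset method show() renders the first rows as a formatted text table directly in the cell output:

// print the first 10 rows as a formatted text table (long values are truncated)
ds.show(10)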

Processing and Visualizing a Dataset

An additional benefit of using the Databricks display() command is that you can quickly view this data with a number of embedded visualizations. For example, in a new cell, you can issue SQL queries and click on the map to see the data. But first, you must save your dataset, ds, as a temporary table.

// register your Dataset as a temporary table to which you can issue SQL queries
ds.createOrReplaceTempView("iot_device_data")

Screenshot of a Dataset being displayed in a map chart

Like RDDs, Datasets have transformation and action methods. Most important are the high-level domain-specific operations such as sum(), select(), avg(), join(), and union(), which are absent in RDDs. For more information, see the Scala Dataset API.
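
As a brief, hedged sketch of two of those operations that the examples below do not cover (the value names here are ours, not from the notebook):

// union() appends the rows of another Dataset of the same type, yielding Dataset[DeviceIoTData]
val allReadings = ds.union(ds)

// a column-based join() returns an untyped DataFrame; here we join the readings
// with a small lookup table of country names derived from the same Dataset
val countries   = ds.select($"cca3", $"cn".as("country_name")).distinct()
val withCountry = ds.join(countries, Seq("cca3"))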

Let’s look at a few handy ones in action. In the example below, we use filter(), map(), groupBy(), and avg(), all higher-level methods, to create another Dataset with only the fields we wish to view. What’s noteworthy is that we access the attributes we want to filter on by the names defined in the case class, that is, we use dot notation to access individual fields, which makes the code easy to read and write.

// filter out all devices whose temperature exceeds 25 degrees and generate
// another Dataset with the three fields of interest, then display
// the mapped Dataset
val dsTemp = ds.filter(d => d.temp > 25).map(d => (d.temp, d.device_name, d.cca3))
display(dsTemp)

The display() output of a filtered dataset

// Apply higher-level Dataset API methods such as groupBy() and avg().
// Filter temperatures > 25, along with their corresponding
// devices' humidity, compute averages, groupBy cca3 country codes,
// and display the results, using table and bar charts
val dsAvgTmp = ds.filter(d => d.temp > 25).map(d => (d.temp, d.humidity, d.cca3)).groupBy($"_3").avg()

// display averages as a table, grouped by the country
display(dsAvgTmp)

Display dataset averages as a table

// display the averages as bar graphs, grouped by the country
display(dsAvgTmp)

Display the averages as bar graphs, grouped by the country

// Select individual fields using the Dataset method select()
// where battery_level is greater than 6. Note this high-level
// domain specific language API reads like a SQL query
display(ds.select($"battery_level", $"c02_level", $"device_name").where($"battery_level" > 6).sort($"c02_level"))

Select individual fields using the Dataset method select()

Below is an animated GIF showing how quickly you can go from table to map to charts using Datasets and the Databricks display() command.

Having saved the Dataset of DeviceIoTData as a temporary table, you can issue SQL queries to it.

%sql select cca3, count (distinct device_id) as device_id from iot_device_data group by cca3 order by device_id desc limit 100

Example of the available visualizations in Databricks from the display() command
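
The same query can also be issued programmatically from a Scala cell via spark.sql(), which returns a DataFrame that you can pass to display(); a small sketch:

// issue the same SQL query from Scala; spark.sql() returns a DataFrame
val deviceCounts = spark.sql("""
  select cca3, count(distinct device_id) as device_id
  from iot_device_data
  group by cca3
  order by device_id desc
  limit 100
""")

display(deviceCounts)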

To access all the code examples in this stage, please import the Examining IoT Devices Using Datasets notebook.