
Try this notebook on Databricks

For developers, the how is often as important as the why. While our in-depth blog post explains the concepts and motivation behind handling complex data types and formats, and their utility in processing complex data structures, this post is a preamble to the how, in the form of a notebook tutorial.

In this tutorial, I show ways in which you can explore and employ five Spark SQL utility functions and APIs. Introduced in Apache Spark 2.x as part of org.apache.spark.sql.functions, they enable developers to work easily with complex or nested data types.

In particular, they come in handy in Streaming ETL, where the data are JSON objects with complex, nested structures: Maps and Structs embedded as JSON. This notebook tutorial focuses on the following Spark SQL functions:

- get_json_object()
- from_json()
- to_json()
- explode()
- selectExpr()
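
As a quick taste before the IoT example below, here is a minimal sketch of two of these functions at work on a JSON string column. (The events DataFrame, its event_json column, and the tiny schema are made up for illustration; a SparkSession named spark is assumed, as in any Databricks notebook.)

[code_tabs]

from pyspark.sql.functions import from_json, get_json_object
from pyspark.sql.types import StructType, StringType, LongType

# A tiny, hypothetical DataFrame with a single JSON string column.
events = spark.createDataFrame(
    [('{"device": "sensor-igauge", "temp": 35}',)], ["event_json"])

# get_json_object(): pull out individual fields by JSON path, returned as strings.
events.select(
    get_json_object("event_json", "$.device").alias("device"),
    get_json_object("event_json", "$.temp").alias("temp")).show()

# from_json(): parse the whole string into a typed struct using a schema.
event_schema = StructType().add("device", StringType()).add("temp", LongType())
events.select(from_json("event_json", event_schema).alias("event")) \
      .select("event.device", "event.temp").show()

[/code_tabs]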

To give you a glimpse, consider this nested schema, which defines what your IoT events might look like as they come down an Apache Kafka stream or land in a data source of your choice.

[code_tabs]

import org.apache.spark.sql.types._
val schema = new StructType()
  .add("dc_id", StringType)                               // data center where data was posted to Kafka cluster
  .add("source",                                          // info about the source of alarm
    MapType(                                              // define this as a Map(Key->value)
      StringType,
      new StructType()
      .add("description", StringType)
      .add("ip", StringType)
      .add("id", LongType)
      .add("temp", LongType)
      .add("c02_level", LongType)
      .add("geo", 
         new StructType()
          .add("lat", DoubleType)
          .add("long", DoubleType)
        )
      )
    )
from pyspark.sql.functions import *
from pyspark.sql.types import *

# dc_id: data center where data was posted to Kafka cluster
# source: info about the source of alarm, defined as a Map(key -> value)
schema = StructType() \
          .add("dc_id", StringType()) \
          .add("source", MapType(StringType(), StructType() \
                        .add("description", StringType()) \
                        .add("ip", StringType()) \
                        .add("id", LongType()) \
                        .add("temp", LongType()) \
                        .add("c02_level", LongType()) \
                        .add("geo", StructType() \
                              .add("lat", DoubleType()) \
                              .add("long", DoubleType()))))

[/code_tabs]
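
Since such events typically arrive as JSON strings in the value column of a Kafka topic, here is a hedged sketch of how this schema might be applied in a Streaming ETL job. (The broker address and topic name are placeholders, and the spark-sql-kafka connector is assumed to be available.)

[code_tabs]

from pyspark.sql.functions import col, from_json

# Read raw Kafka records; each value holds a JSON string like the sample below.
raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "kafka:9092")   # placeholder broker
       .option("subscribe", "iot-events")                 # placeholder topic
       .load())

# from_json() applies the schema, yielding typed, queryable columns.
parsed = (raw
          .select(from_json(col("value").cast("string"), schema).alias("event"))
          .select("event.dc_id", "event.source"))

[/code_tabs]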

And its corresponding sample DataFrame/Dataset data may look as follows:

[code_tabs]

val dataDS = Seq("""
{
"dc_id": "dc-101",
"source": {
    "sensor-igauge": {
      "id": 10,
      "ip": "68.28.91.22",
      "description": "Sensor attached to the container ceilings",
      "temp":35,
      "c02_level": 1475,
      "geo": {"lat":38.00, "long":97.00}                        
    },
    "sensor-ipad": {
      "id": 13,
      "ip": "67.185.72.1",
      "description": "Sensor ipad attached to carbon cylinders",
      "temp": 34,
      "c02_level": 1370,
      "geo": {"lat":47.41, "long":-122.00}
    },
    "sensor-inest": {
      "id": 8,
      "ip": "208.109.163.218",
      "description": "Sensor attached to the factory ceilings",
      "temp": 40,
      "c02_level": 1346,
      "geo": {"lat":33.61, "long":-111.89}
    },
    "sensor-istick": {
      "id": 5,
      "ip": "204.116.105.67",
      "description": "Sensor embedded in exhaust pipes in the ceilings",
      "temp": 40,
      "c02_level": 1574,
      "geo": {"lat":35.93, "long":-85.46}
    }
  }
}""").toDS()
# Convenience function for turning JSON strings into DataFrames.
def jsonToDataFrame(json, schema=None):
  # SparkSessions are available with Spark 2.0+
  reader = spark.read
  if schema:
    reader.schema(schema)
  return reader.json(sc.parallelize([json]))

dataDF = jsonToDataFrame( """{

    "dc_id": "dc-101",
    "source": {
        "sensor-igauge": {
        "id": 10,
        "ip": "68.28.91.22",
        "description": "Sensor attached to the container ceilings",
        "temp":35,
        "c02_level": 1475,
        "geo": {"lat":38.00, "long":97.00}                        
      },
      "sensor-ipad": {
        "id": 13,
        "ip": "67.185.72.1",
        "description": "Sensor ipad attached to carbon cylinders",
        "temp": 34,
        "c02_level": 1370,
        "geo": {"lat":47.41, "long":-122.00}
      },
      "sensor-inest": {
        "id": 8,
        "ip": "208.109.163.218",
        "description": "Sensor attached to the factory ceilings",
        "temp": 40,
        "c02_level": 1346,
        "geo": {"lat":33.61, "long":-111.89}
      },
      "sensor-istick": {
        "id": 5,
        "ip": "204.116.105.67",
        "description": "Sensor embedded in exhaust pipes in the ceilings",
        "temp": 40,
        "c02_level": 1574,
        "geo": {"lat":35.93, "long":-85.46}
      }
    }
  }""", schema)

[/code_tabs]

If you examine the respective schemas in the Scala or Python notebook, you can see the nested structures:

[code_tabs]

df.printSchema
root
 |-- dc_id: string (nullable = true)
 |-- source: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- ip: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- temp: long (nullable = true)
 |    |    |-- c02_level: long (nullable = true)
 |    |    |-- geo: struct (nullable = true)
 |    |    |    |-- lat: double (nullable = true)
 |    |    |    |-- long: double (nullable = true)
dataDF.printSchema()

root
 |-- dc_id: string (nullable = true)
 |-- source: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- ip: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- temp: long (nullable = true)
 |    |    |-- c02_level: long (nullable = true)
 |    |    |-- geo: struct (nullable = true)
 |    |    |    |-- lat: double (nullable = true)
 |    |    |    |-- long: double (nullable = true)

[/code_tabs]

I use a sample of this JSON event data from IoT and Nest devices to illustrate how to use these functions. The takeaway from this tutorial is that there are myriad ways to slice and dice nested JSON structures with the Spark SQL utility functions listed above.
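
As one small illustration of that slicing and dicing, the sketch below (a single query against the dataDF defined above, not the notebook's full walkthrough) uses explode() to turn the source map into rows and then reaches into the nested struct with dot notation:

[code_tabs]

from pyspark.sql.functions import explode

# explode() emits one row per key/value pair of the source map.
exploded = dataDF.select("dc_id", explode("source").alias("device", "detail"))

# Dot notation drills into the nested struct and its geo sub-struct.
exploded.select(
    "dc_id", "device",
    "detail.ip", "detail.temp", "detail.c02_level",
    "detail.geo.lat", "detail.geo.long").show(truncate=False)

[/code_tabs]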



Since I would be repeating here what I have already demonstrated in the notebook, I encourage you to explore the accompanying notebook, import it into your Databricks workspace, and have a go at it.

What's Next?

In a follow-up tutorial on Higher Order Functions, I'll explore how to use these powerful SQL functions to manipulate structured data.

If you don’t have a Databricks account, get one today.
