

A couple of weeks ago, we published a short blog and an accompanying tutorial notebook that demonstrated how to use five Spark SQL utility functions to explore and extract structured and nested data from IoT devices. Keeping with the same theme, I want to show how you can make wide use of the four higher-order functions introduced in SQL as part of Databricks Runtime Beta 3.0.

Knowing why offers insight but doesn’t make you productive. Knowing how does. Whereas our in-depth blog explains the concepts and motivations behind why handling complex data types such as arrays with higher-order functions is important in SQL, this blog is a preamble to the how: a notebook tutorial on using higher-order functions in SQL to process structured data and arrays in IoT device events.

You can put them to particularly good use if you enjoy functional programming. More importantly, these higher-order functions offer three benefits. You don’t need to:

  • unpack arrays into individual rows and apply your function and repack them;
  • depend on limited built-in functions; and
  • write UDFs in Scala or Python.

All of this can be done in SQL. For this tutorial, we will explore four SQL functions and how you can use them to process array types:

As in the previous tutorial, the takeaway is simple: there are myriad ways to slice and dice nested JSON structures with Spark SQL utility functions, namely the aforementioned list. These dedicated higher-order functions are primarily suited to manipulating arrays in SQL, making the code easier to write and more concise when processing table values that contain arrays or nested arrays.
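To make the idea concrete outside of Spark, here is a minimal plain-Python sketch of what higher-order array operations do. It assumes the four functions meant here are Spark SQL's `transform`, `filter`, `exists`, and `aggregate`. The Python helpers below are hypothetical stand-ins named after them, not Spark APIs, and the readings are one sensor's `temp` array from the sample data in this post.

```python
from functools import reduce

# One sensor's Celsius readings, copied from the sample event in this post.
temps = [35, 35, 35, 36, 35, 35, 32, 35, 30, 35, 32, 35]

# Hypothetical stand-ins for the SQL functions (assumption: not Spark APIs).
def transform(values, fn):
    """Apply fn to every element and return a new list."""
    return [fn(v) for v in values]

def filter_values(values, pred):
    """Keep only the elements for which pred is true."""
    return [v for v in values if pred(v)]

def exists(values, pred):
    """Return True if pred holds for at least one element."""
    return any(pred(v) for v in values)

def aggregate(values, start, merge, finish=lambda acc: acc):
    """Fold the elements into one value, then post-process it with finish."""
    return finish(reduce(merge, values, start))

# Each call takes the whole array plus a lambda; no unpacking into rows.
fahrenheit = transform(temps, lambda t: t * 9 / 5 + 32)
hot = filter_values(temps, lambda t: t > 35)
any_cold = exists(temps, lambda t: t < 32)
avg = aggregate(temps, 0, lambda acc, t: acc + t, lambda acc: acc / len(temps))

print(hot, any_cold, round(avg, 2))
```

The point of the analogy is the shape of each call: an array and a lambda go in, and a new array or a single value comes out, with no intermediate step that explodes the array into rows and regroups them.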

To give you a glimpse, consider this nested schema that defines what your IoT events may look like coming down an Apache Kafka stream or deposited in a data source of your choice.

from pyspark.sql.functions import *
from pyspark.sql.types import *
schema = StructType() \
          .add("dc_id", StringType()) \
          .add("source", MapType(StringType(), StructType() \
                        .add("description", StringType()) \
                        .add("ip", StringType()) \
                        .add("id", IntegerType()) \
                        .add("temp", ArrayType(IntegerType())) \
                        .add("c02_level", ArrayType(IntegerType())) \
                        .add("geo", StructType() \
                              .add("lat", DoubleType()) \
                              .add("long", DoubleType()))))

And its corresponding sample DataFrame/Dataset data may look as follows:

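# jsonToDataFrame is not a PySpark built-in; it is assumed to be a helper
# defined in the accompanying notebook that parses a JSON string into a
# DataFrame using the given schema.
dataDF = jsonToDataFrame( """{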
    "dc_id": "dc-101",
    "source": {
      "sensor-igauge": {
        "id": 10,
        "ip": "68.28.91.22",
        "description": "Sensor attached to the container ceilings",
        "temp": [35,35,35,36,35,35,32,35,30,35,32,35],
        "c02_level": [1475,1475,1473],
        "geo": {"lat":38.00, "long":97.00}
      },
      "sensor-ipad": {
        "id": 13,
        "ip": "67.185.72.1",
        "description": "Sensor ipad attached to carbon cylinders",
        "temp": [45,45,45,46,45,45,42,35,40,45,42,45],
        "c02_level": [1370,1370,1372],
        "geo": {"lat":47.41, "long":-122.00}
      },
      "sensor-inest": {
        "id": 8,
        "ip": "208.109.163.218",
        "description": "Sensor attached to the factory ceilings",
        "temp": [40,40,40,40,40,43,42,40,40,45,42,45],
        "c02_level": [1346,1346, 1343],
        "geo": {"lat":33.61, "long":-111.89}
      },
      "sensor-istick": {
        "id": 5,
        "ip": "204.116.105.67",
        "description": "Sensor embedded in exhaust pipes in the ceilings",
        "temp":[30,30,30,30,40,43,42,40,40,35,42,35],
        "c02_level": [1574,1570, 1576],
        "geo": {"lat":35.93, "long":-85.46}
      }
    }
  }""", schema)

If you examine the corresponding schema in our Python notebook, you will see the nested structures, including arrays of integers for temp and c02_level.

root
 |-- dc_id: string (nullable = true)
 |-- source: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- ip: string (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- temp: array (nullable = true)
 |    |    |    |-- element: integer (containsNull = true)
 |    |    |-- c02_level: array (nullable = true)
 |    |    |    |-- element: integer (containsNull = true)
 |    |    |-- geo: struct (nullable = true)
 |    |    |    |-- lat: double (nullable = true)
 |    |    |    |-- long: double (nullable = true)
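The schema above can also be read as plain nested data: a map from sensor name to a struct that holds arrays. As a Spark-free sketch, the snippet below parses a trimmed copy of the sample event with Python's standard `json` module and summarizes each sensor's arrays. The trimmed payload (two sensors, with only `id`, `temp`, and `c02_level`) and the function name `summarize_sensors` are illustrative assumptions, not part of the notebook.

```python
import json

# Trimmed copy of the sample event: two sensors, array fields only.
event_json = """{
  "dc_id": "dc-101",
  "source": {
    "sensor-igauge": {
      "id": 10,
      "temp": [35,35,35,36,35,35,32,35,30,35,32,35],
      "c02_level": [1475,1475,1473]
    },
    "sensor-ipad": {
      "id": 13,
      "temp": [45,45,45,46,45,45,42,35,40,45,42,45],
      "c02_level": [1370,1370,1372]
    }
  }
}"""

def summarize_sensors(event):
    """Return {sensor_name: (max_temp, mean_c02)} for every sensor in the event."""
    summary = {}
    # "source" is a map: each key is a sensor name, each value a struct.
    for name, sensor in event["source"].items():
        max_temp = max(sensor["temp"])
        mean_c02 = sum(sensor["c02_level"]) / len(sensor["c02_level"])
        summary[name] = (max_temp, mean_c02)
    return summary

event = json.loads(event_json)
summary = summarize_sensors(event)
for name, (max_temp, mean_c02) in summary.items():
    print(f"{event['dc_id']} {name}: max temp {max_temp}, mean c02 {mean_c02:.1f}")
```

Walking the structure this way mirrors the schema tree level by level: the map key, then the struct fields, then the array elements.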

I use a sample of this JSON event data from IoT devices to illustrate how to use these SQL functions. Rather than repeat here what I already demonstrated in the notebook, I encourage you to explore the accompanying notebook, import it into your Databricks workspace, and have a go at it.

What’s Next

Try the accompanying tutorial on Databricks. If you have not read our previous related blog and its tutorial on Spark SQL utility functions, do read them. Also, if you don’t have a Databricks account, get one today.