Nested data types offer Databricks customers and Apache Spark users powerful ways to manipulate structured data. In particular, they allow you to put complex objects like arrays, maps and structures inside of columns. This can help you model your data in a more natural way. While this feature is certainly useful, it can be a bit cumbersome to manipulate data inside of the complex objects because SQL (and Spark) do not have primitives for working with such data. In addition, it is time-consuming, non-performant, and non-trivial.
For these reasons, we are excited to offer higher order functions in SQL in the Databricks Runtime 3.0 Release, allowing users to efficiently create functions, in SQL, to manipulate array based data. Higher-order functions are a simple extension to SQL to manipulate nested data such as arrays. For example, the
TRANSFORM expression below shows how we can add a number to every element in an array:
In this post, we’ll cover previous approaches to nested data manipulation in SQL, followed by the higher-order function syntax we have introduced in Databricks.
Before we introduce the new syntax for array manipulation, let’s first discuss the current approaches to manipulating this sort of data in SQL:
- built-in functions (limited functionality)
- unpack the array into individual rows, apply your function, then repack them into an array (many steps, hence inefficient)
- UDFs (not generic or efficient)
We’ll explore each of these independently so that you can understand why array manipulation is difficult. Let’s start off with a table with the schema below (see the included notebook for code that’s easy to run).
root |-- key: long (nullable = false) |-- values: array (nullable = false) | |-- element: integer (containsNull = true) |-- nested_values: array (nullable = false) | |-- element: array (containsNull = false) | | |-- element: integer (containsNull = true)
Spark SQL does have some built-in functions for manipulating arrays. For example, you can create an array, get its size, get specific elements, check if the array contains an object, and sort the array. Spark SQL also supports generators (
inline) that allow you to combine the input row with the array elements, and the
collect_list aggregate. This functionality may meet your needs for certain tasks, but it is complex to do anything non-trivial, such as computing a custom expression of each array element.
Unpack and Repack
The common approach for non-trivial manipulations is the “unpack and repack” method. This is a “Spark SQL native” way of solving the problem because you don’t have to write any custom code; you simply write SQL code. The unpack and repack approach works by applying the following steps:
LATERAL VIEW explodeto flatten the array, and combine the input row with each element in the array;
- Apply a given transformation, in this example
value + 1, to each element in the exploded array; and
collect_setto create a new array.
We can see an example of this in the SQL code below:
SELECT key, values, collect_list(value + 1) AS values_plus_one FROM nested_data LATERAL VIEW explode(values) T AS value GROUP BY key, values
While this approach certainly works, it has a few problems. First, you must be absolutely sure that the key you are used for grouping is unique, otherwise the end result will be incorrect. Second, there is no guaranteed ordering of arrays in Spark SQL. Specifying an operation that requires a specific ordering nearly guarantees incorrect results. Finally, the generated Spark SQL plan will likely be very expensive.
User-Defined Functions (UDFs)
Lastly, we can write custom UDFs to manipulate array data. Our UDFs must define how we traverse an array and how we process the individual elements. Let’s see some basic examples in Python and Scala.
from pyspark.sql.types import IntegerType from pyspark.sql.types import ArrayType def add_one_to_els(elements): return [el + 1 for el in elements] spark.udf.register("plusOneIntPython", add_one_to_els, ArrayType(IntegerType()))
def addOneToElements(elements: Seq[Int]) = elements.map(element => element + 1) spark.udf.register("plusOneInt", addOneToElements(_:Seq[Int]):Seq[Int])
Once registered, we can use those functions to manipulate our data in Spark SQL.
SELECT key, values, plusOneInt(values) AS values_plus_one, plusOneIntPython(values) AS values_plus_one_py FROM nested_data
This approach has some advantages over the previous version: for example, it maintains element order, unlike the pack and repack method. However, it has two key disadvantages. First, you have to write functions in other languages than SQL and register them before running. Second, data serialization into Scala and Python can be very expensive, slowing down UDFs over Spark’s SQL optimized built-in processing.
Our Approach: Higher Order Functions
As observed from the examples above, the traditional ways to manipulate nested data in SQL are cumbersome. To that end, we have built a simple solution in Databricks: higher order functions in SQL.
Our solution introduces two functional programming constructions to SQL: higher order functions and anonymous (lambda) functions. These work together to allow you to define functions that manipulate arrays in SQL. The higher order function, such as
TRANSFORM, takes an array and a lambda function from the user to run on it. It then calls this lambda function on each element in the array.
A Simple Example:
Let’s illustrate the previous concepts with the transformation from our previous example. In this case, the higher order function,
TRANSFORM, will iterate over the array, apply the associated lambda function to each element, and create a new array. The lambda function,
element + 1, specifies how each element is manipulated.
SELECT key, values, TRANSFORM(values, value -> value + 1) AS values_plus_one FROM nested_data
To be abundantly clear, the transformation
TRANSFORM(values, value -> value + 1) has two components:
TRANSFORM(values..)is the higher order function. This takes an array and an anonymous function as its input. Internally transform will take care of setting up a new array, applying the anonymous function to each element, and assigning the result to the output array.
value -> value + 1is an anonymous function. The function is divided into two components separated by a
a. The argument list. In this case, we only have one argument: value. We also support multiple arguments by creating a comma separated list of arguments enclosed by parenthesis, for example:
(x, y) -> x + y.
b. The body. This is an expression that can use the arguments and outer variables to calculate the new value. In this case, we add 1 to the value argument.
We can also use other variables than the arguments in a lambda function; this is called capture. We can use variables defined on the top level, or variables defined in intermediate lambda functions. For example, the following transform adds the key (top level) variable to each element in the values array:
SELECT key, values, TRANSFORM(values, value -> value + key) AS values_plus_key FROM nested_data
Sometimes data is deeply nested. If you want to transform such data, you can can use nested lambda functions. The following example transforms an array of integer arrays, and adds the key (top level) column and the size of the intermediate array to each element in the nested array.
SELECT key, nested_values, TRANSFORM(nested_values, values -> TRANSFORM(values, value -> value + key + SIZE(values))) AS new_nested_values FROM nested_data
We have added the following higher order functions to the 3.0 version of the Databricks Runtime.
transform(array<T>, function<T, U>): array<U>
This produces an
array<u> by applying a
function<T, U> to each element of an input
Note that the functional programming equivalent operation is map. This has been named transform in order to prevent confusion with the map expression (that creates a map from a key value expression).
The following query transforms the values array by adding the key value to each element:
SELECT key, values, TRANSFORM(values, value -> value + key) transformed_values FROM nested_data
exists(array<T>, function<T, V, Boolean>): Boolean
Return true if predicate
function<T, Boolean> holds for any element in input
The following examples checks if the values array contains an elements for which the modulo 10 is equal to 1:
SELECT key, values, EXISTS(values, value -> value % 10 == 1) filtered_values FROM nested_data
filter(array<T>, function<T, Boolean>): array<T>
Produce an output
array from an input
array by only only adding elements for which predicate
function<T, Boolean> holds.
The following examples filters the values array only elements with a value > 50 are allowed:
SELECT key, values, FILTER(values, value -> value > 50) filtered_values FROM nested_data
aggregate(array<T>, B, function<B, T, B>, function<B, R>): R
Reduce the elements of
array into a single value
R by merging the elements into a buffer
function<B, T, B> and by applying a finish
function<B, R> on the final buffer. The initial value
B is determined by a zero expression. The finalize function is optional, if you do not specify the function the finalize function the identity function
(id -> id) is used.
This is the only higher order function that takes two lambda functions.
The following example sums (aggregates) the values array into a single (sum) value. Both a version with a finalize function (
summed_values) and one without a finalize function
summed_values_simple is shown:
SELECT key, values, REDUCE(values, 0, (value, acc) -> value + acc, acc -> acc) summed_values, REDUCE(values, 0, (value, acc) -> value + acc) summed_values_simple FROM nested_data
You can also compute more complex aggregates. The code below shows the computation of the geometric mean of the array elements.
SELECT key, values, AGGREGATE(values, (1.0 AS product, 0 AS N), (buffer, value) -> (value * buffer.product, buffer.N + 1), buffer -> Power(buffer.product, 1.0 / buffer.N)) geomean FROM nested_data
Higher order functions will available in Databricks Runtime 3.0. If you have any nested data, be sure to try them!
This work adds initial support for using higher order functions with nested array data. Additional functions and support for map data are on their way. Be sure to check out the Databricks blog and documentation.