PySpark has always provided wonderful SQL and Python APIs for querying data. As of Databricks Runtime 12.1 and Apache Spark 3.4, parameterized queries support safe and expressive ways to query data with SQL using Pythonic programming paradigms.
This post explains how to make parameterized queries with PySpark and when this is a good design pattern for your code.
Parameters are helpful for making your Spark code easier to reuse and test. They also encourage good coding practices. This post will demonstrate the two different ways to parameterize PySpark queries:
- PySpark custom string formatting
- Parameter markers
Let's look at how to use both types of PySpark parameterized queries and explore why the built-in functionality is better than other alternatives.
Benefits of parameterized queries
Parameterized queries encourage the "don't repeat yourself" (DRY) pattern, make unit testing easier, and make SQL easier to reuse. They also help prevent SQL injection attacks, which are a serious security vulnerability.
It can be tempting to copy and paste large chunks of SQL when writing similar queries. Parameterized queries encourage abstracting patterns and writing code with the DRY pattern.
Parameterized queries are also easier to test: you can write a query once and easily run it against both production and test datasets.
On the other hand, manually parameterizing SQL queries with Python f-strings is a poor alternative. Consider the following disadvantages:
- Python f-strings do not protect against SQL injection attacks (see the sketch after this list).
- Python f-strings do not understand Python-native objects such as DataFrames and Columns, nor do they handle special characters for you.
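To make the first point concrete, here is a minimal sketch of what can go wrong. The purchases table name and the malicious value are hypothetical; the point is that whatever string you interpolate becomes part of the SQL text itself.
user_input = "nothing' OR 1=1 --"  # hypothetical malicious value
query = f"SELECT * FROM purchases WHERE item = '{user_input}'"
print(query)
# SELECT * FROM purchases WHERE item = 'nothing' OR 1=1 --'
# The injected OR 1=1 turns a lookup for a single item into a query that returns every row.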
Let's look at how to parameterize queries with parameter markers, which protect your code from SQL injection vulnerabilities and automatically convert common Python types to SQL values.
Parameterized queries with PySpark custom string formatting
Suppose you have the following data table called h20_1e9 with nine columns:
+-----+-----+------------+---+---+-----+---+---+---------+
|  id1|  id2|         id3|id4|id5|  id6| v1| v2|       v3|
+-----+-----+------------+---+---+-----+---+---+---------+
|id008|id052|id0000073659| 84| 89|82005|  5| 11|64.785802|
|id079|id037|id0000041462|  4| 35|28153|  1|  1|28.732545|
|id098|id031|id0000027269| 27| 38|13508|  5|  2|59.867875|
+-----+-----+------------+---+---+-----+---+---+---------+
You would like to parameterize the following SQL query:
SELECT id1, SUM(v1) AS v1
FROM h20_1e9
WHERE id1 = "id089"
GROUP BY id1
You'd like to make it easy to run this query with different values of id1. Here's how to parameterize the query and run it:
query = """SELECT id1, SUM(v1) AS v1
FROM h20_1e9
WHERE id1 = {id1_val}
GROUP BY id1"""
spark.sql(query, id1_val="id016").show()
+-----+------+
|  id1|    v1|
+-----+------+
|id016|298268|
+-----+------+
Now rerun the query with another argument:
spark.sql(query, id1_val="id089").show()
+-----+------+
|  id1|    v1|
+-----+------+
|id089|300446|
+-----+------+
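Because the query text is now a single template, you can wrap it in a small helper and reuse it for any id1 value, which is the DRY and testability benefit mentioned earlier. This is just a sketch; v1_total is a hypothetical name, not part of the PySpark API.
def v1_total(id1_val):
    # Reuse the parameterized query template defined above
    return spark.sql(query, id1_val=id1_val)

v1_total("id016").show()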
The PySpark string formatter also lets you execute SQL queries directly on a DataFrame without explicitly defining temporary views.
Suppose you have the following DataFrame called person_df:
+---------+--------+
|firstname| country|
+---------+--------+
|    frank|     usa|
|   sourav|   india|
|    rahul|   india|
|      sim|bulgaria|
+---------+--------+
Here's how to query the DataFrame with SQL.
spark.sql(
    "select country, count(*) as num_ppl from {person_df} group by country",
    person_df=person_df,
).show()
+--------+-------+
| country|num_ppl|
+--------+-------+
|     usa|      1|
|   india|      2|
|bulgaria|      1|
+--------+-------+
Running queries on a DataFrame using SQL syntax without having to manually register a temporary view is very nice!
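The substitution isn't limited to a single DataFrame either. Here's a rough sketch that joins person_df with a second, hypothetical DataFrame called orders_df that also has a firstname column:
spark.sql(
    """SELECT p.firstname, COUNT(*) AS num_orders
       FROM {people} p
       JOIN {orders} o ON p.firstname = o.firstname
       GROUP BY p.firstname""",
    people=person_df,
    orders=orders_df,  # hypothetical DataFrame of orders
).show()
Each DataFrame passed this way is exposed to the query as a temporary view behind the scenes, so the SQL can reference both without any manual setup.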
Let's now see how to parameterize queries with parameter markers.
Parameterized queries with parameter markers
You can also use a dictionary of arguments to formulate a parameterized SQL query with parameter markers.
Suppose you have the following view named some_purchases:
+-------+------+-------------+
|   item|amount|purchase_date|
+-------+------+-------------+
|  socks|  7.55|   2022-05-15|
|handbag| 49.99|   2022-05-16|
| shorts|  25.0|   2023-01-05|
+-------+------+-------------+
Here's how to make a parameterized query with named parameter markers to calculate the total amount spent on a given item.
query = "SELECT item, sum(amount) FROM some_purchases GROUP BY item HAVING item = :item"
Compute the total amount spent on socks.
spark.sql(
    query,
    args={"item": "socks"},
).show()
+-----+-----------+
| item|sum(amount)|
+-----+-----------+
|socks|      32.55|
+-----+-----------+
You can also parameterize queries with unnamed parameter markers; see the documentation for more information and the sketch below for a quick example.
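As a rough sketch (assuming you're on a Spark version that supports positional parameters), unnamed parameter markers use ? placeholders and take their values in order from a list passed to args:
spark.sql(
    "SELECT item, sum(amount) FROM some_purchases GROUP BY item HAVING item = ?",
    args=["socks"],  # positional values, matched to the ? markers in order
).show()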
Apache Spark sanitizes parameter markers, so this parameterization approach also protects you from SQL injection attacks.
How PySpark sanitizes parameterized queries
Here's a high-level description of how Spark sanitizes named parameterized queries:
- The SQL query arrives with an optional key/value parameters list.
- Apache Spark parses the SQL query and replaces the parameter references with corresponding parse tree nodes.
- During analysis, a Catalyst rule replaces these references with the provided parameter values.
- This approach protects against SQL injection attacks because it only supports literal values. Regular string interpolation, by contrast, substitutes values directly into the SQL text, which leaves it vulnerable if a value contains SQL syntax rather than the intended literal. The sketch after this list shows how an injection attempt is neutralized.
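To illustrate, here's a rough sketch that reuses the some_purchases view: an injection payload passed through args is bound as a single literal value, so it simply matches no rows instead of rewriting the query.
spark.sql(
    "SELECT * FROM some_purchases WHERE item = :item",
    args={"item": "socks' OR 1=1 --"},  # the whole string is treated as one literal value
).show()  # returns no rows; the payload never becomes SQL syntax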
As previously mentioned, there are two types of parameterized queries supported in PySpark:
- Client-side parameterization using the {} syntax based on PEP 3101 (we've been referring to this as custom string formatting).
- Server-side parameterization using either named parameter markers or unnamed parameter markers.
The {} syntax performs string substitution on the SQL query on the client side for ease of use and better programmability. However, it does not protect against SQL injection attacks, since the query text is substituted before being sent to the Spark server.
Server-side parameterization uses the args argument of the sql() API and passes the SQL text and parameters to the server separately. The SQL text is parsed with the parameter placeholders left in place, and the values supplied in args are then substituted into the analyzed query tree.
There are two flavors of server-side parameterized queries: named parameter markers and unnamed parameter markers. Named parameter markers use the :<param_name> syntax for placeholders. See the documentation for more information on how to use unnamed parameter markers.
Parameterized queries vs. string interpolation
You can also use regular Python string interpolation to parameterize queries, but it's not as convenient.
Here's how we'd have to parameterize a similar query with Python f-strings:
some_df.createOrReplaceTempView("whatever")
the_date = "2021-01-01"
min_value = "4.0"
table_name = "whatever"
query = f"""SELECT * from {table_name}
WHERE the_date > '{the_date}' AND number > {min_value}"""
spark.sql(query).show()
This isn't as nice for the following reasons:
- It requires creating a temporary view.
- We need to represent the date as a string, not a Python date.
- We need to wrap the date in single quotes in the query to format the SQL string properly.
- This doesn't protect against SQL injection attacks.
In sum, built-in query parameterization capabilities are safer and more effective than string interpolation.
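For comparison, here's a rough sketch of the same kind of query using the built-in parameterization, mixing the {} DataFrame substitution with named parameter markers; it assumes some_df has the_date and number columns. No temporary view is needed, and a real Python date is passed directly.
import datetime

spark.sql(
    "SELECT * FROM {some_df} WHERE the_date > :the_date AND number > :min_value",
    some_df=some_df,  # no temporary view required
    args={
        "the_date": datetime.date(2021, 1, 1),  # a real date, no string formatting or quoting
        "min_value": 4.0,
    },
).show()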
Conclusion
PySpark parameterized queries give you new capabilities to write clean code with familiar SQL syntax. They're convenient when you want to query a Spark DataFrame with SQL. They let you use common Python data types like floating point values, strings, dates, and datetimes, which automatically convert to SQL values under the hood. In this manner, you can now leverage common Python idioms and write beautiful code.
Start leveraging PySpark parameterized queries today, and you will immediately enjoy the benefits of a higher quality codebase.