This is a cross-post from the blog of Olivier Girardot. Olivier is a software engineer and the co-founder of Lateral Thoughts, where he works on Machine Learning, Big Data, and DevOps solutions.
With the introduction of window operations in Apache Spark 1.4, you can finally port pretty much any relevant piece of Pandas’ DataFrame computation to Apache Spark’s parallel computation framework using Spark SQL’s DataFrame. If you’re not yet familiar with Spark’s DataFrame, don’t hesitate to check out "RDDs are the new bytecode of Apache Spark" and come back here afterwards.
I figured some feedback on how to port existing complex code might be useful, so the goal of this article is to take a few concepts from Pandas’ DataFrame and see how we can translate them to PySpark’s DataFrame using Spark 1.4.
Disclaimer: a few operations that you can do in Pandas don’t translate well to Spark. Please remember that DataFrames in Spark are like RDDs in the sense that they’re immutable data structures. Therefore things like:
df['three'] = df['one'] * df['two']
Can’t exist, simply because this kind of assignment goes against the principles of Spark. Another example would be trying to access a single element within a DataFrame by index. Don’t forget that you’re using a distributed data structure, not an in-memory random-access one.
To be clear, this doesn’t mean that you can’t do the same kind of thing (i.e. create a new column) with Spark; it means that you have to think immutable/distributed and rewrite parts of your code, mostly the parts that are not purely expressible as transformations on a stream of data.
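As a sketch of the idiomatic rewrite (assuming a Spark DataFrame df with columns one and two), the Pandas assignment above becomes a transformation that returns a brand new DataFrame:
# withColumn never mutates df; it returns a new DataFrame, so we simply rebind the name
df = df.withColumn('three', df['one'] * df['two'])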
So let’s dive in.
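All the snippets below assume a PySpark 1.4 shell where sqlCtx is already available; if you’d rather run them as a standalone script, a minimal setup could look like this (the application name is purely illustrative):
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext, functions as F

sc = SparkContext(appName="pandas-to-spark")  # hypothetical application name
sqlCtx = SQLContext(sc)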
This part is not that much different between Pandas and Spark, but you have to take the immutable character of your DataFrame into account. First, let’s create two DataFrames: one in Pandas, pdf, and one in Spark, df:
In [17]: pdf = pd.DataFrame.from_items([('A', [1, 2, 3]), ('B', [4, 5, 6])])
In [18]: pdf.A
Out[18]:
0 1
1 2
2 3
Name: A, dtype: int64
In [19]: df = sqlCtx.createDataFrame([(1, 4), (2, 5), (3, 6)], ["A", "B"])
In [20]: df
Out[20]: DataFrame[A: bigint, B: bigint]
In [21]: df.show()
+-+-+
|A|B|
+-+-+
|1|4|
|2|5|
|3|6|
+-+-+
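If you already have the Pandas DataFrame in memory, note that sqlCtx.createDataFrame also accepts a pandas.DataFrame directly, so you can derive the Spark one from it (a small sketch reusing pdf; the variable name is only for illustration):
# build a Spark DataFrame straight from the Pandas one, letting Spark infer the schema
df_from_pdf = sqlCtx.createDataFrame(pdf)
df_from_pdf.show()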
Now in Spark SQL or Pandas you use the same syntax to refer to a column:
In [27]: df.A
Out[27]: Column
In [28]: df['A']
Out[28]: Column
In [29]: pdf.A
Out[29]:
0 1
1 2
2 3
Name: A, dtype: int64
In [30]: pdf['A']
Out[30]:
0 1
1 2
2 3
Name: A, dtype: int64
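Note the difference though: unlike in Pandas, df.A in Spark is just a column expression and carries no data by itself; to actually see the values you have to use it in a query and trigger an action, for example:
# select() builds a projected DataFrame, show() is the action that materializes it
df.select(df.A).show()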
In Pandas, adding a column is a plain assignment (for example pdf['C'] = 0), and the result is immediately visible:
In [32]: pdf
Out[32]:
A B C
0 1 4 0
1 2 5 0
2 3 6 0
In Spark, you add columns with withColumn, but passing a bare Python literal fails because the second argument has to be a Column:
AttributeError Traceback (most recent call last)
----> 1 df.withColumn('C', 0)
AttributeError: 'int' object has no attribute 'alias'
The fix is to wrap the literal into a Column with F.lit:
In [37]: df.withColumn('C', F.lit(0)).show()
+-+-+-+
|A|B|C|
+-+-+-+
|1|4|0|
|2|5|0|
|3|6|0|
+-+-+-+
You can also compute the new column from existing ones:
In [39]: df.withColumn('C', df.A * 2)
Out[39]: DataFrame[A: bigint, B: bigint, C: bigint]
In [40]: df.withColumn('C', df.A * 2).show()
+-+-+-+
|A|B|C|
+-+-+-+
|1|4|2|
|2|5|4|
|3|6|6|
+-+-+-+
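Keep in mind that withColumn does not modify df in place; it returns a new DataFrame, which is easy to verify (a quick sketch, the name df2 is only for illustration):
df2 = df.withColumn('C', df.A * 2)
print(df.columns)   # ['A', 'B']  the original DataFrame is untouched
print(df2.columns)  # ['A', 'B', 'C']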
When you’re selecting columns to create another, projected DataFrame, you can also use expressions:
In [42]: df.select(df.B > 0)
Out[42]: DataFrame[(B > 0): boolean]
In [43]: df.select(df.B > 0).show()
+-------+
|(B > 0)|
+-------+
| true|
| true|
| true|
+-------+
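You can also mix plain columns with expressions and name the computed ones with alias; a small sketch (the column name double_B is just for illustration):
df.select(df.A, (df.B * 2).alias('double_B')).show()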
Filtering is pretty much straightforward too, you can use the RDD-like filter