
At Virgin Hyperloop One, we work on making Hyperloop a reality, so we can move passengers and cargo at airline speeds but at a fraction of the cost of air travel. In order to build a commercially viable system, we collect and analyze a large, diverse quantity of data, including Devloop Test Track runs, numerous test rigs, and various simulation, infrastructure, and socio-economic data. Most of our scripts handling that data are written in Python, with pandas as the main data processing tool that glues everything together. In this blog post, we want to share with you our experience of scaling our data analytics with Koalas, achieving massive speedups with minor code changes.

As we continue to grow and build new systems, so do our data processing needs. Due to the increasing scale and complexity of our data operations, our pandas-based Python scripts were too slow to meet our business needs. This led us to Spark, with the hope of faster processing times, flexible data storage, and on-demand scalability. We were, however, struggling with the "Spark switch" - we would have to make a lot of custom changes to migrate our pandas-based codebase to PySpark. We needed a solution that was not only much faster, but would ideally also not require rewriting our code. These challenges drove us to research other options, and we were very happy to discover an easy way to skip that tedious step: the Koalas package, recently open-sourced by Databricks.

As described in the Koalas Readme,

The Koalas project makes data scientists more productive when interacting with big data, by implementing the pandas DataFrame API on top of Apache Spark. (...) Be immediately productive with Spark, with no learning curve, if you are already familiar with pandas. Have a single codebase that works both with pandas (tests, smaller datasets) and with Spark (distributed datasets).

In this article, I will try to show that this is (mostly) true and why Koalas is worth trying out. By changing less than 1% of our pandas lines, we were able to run our code with Koalas and Spark, reducing execution times by more than 10x, from a few hours to just a few minutes. And since the environment scales horizontally, we're prepared for even more data.

Quick Start

Before installing Koalas, make sure that you have your Spark cluster set up and can use it with PySpark. Then, simply run:

pip install koalas

or, for conda users:

conda install koalas -c conda-forge

Refer to the Koalas README for more details.

A quick sanity check after installation:

import databricks.koalas as ks
kdf = ks.DataFrame({'column1': [4.0, 8.0], 'column2': [1.0, 2.0]})
kdf

As you can see, Koalas can render pandas-like interactive tables. How convenient!

Example with basic operations

For the sake of this article, we generated test data consisting of 4 columns and a parameterized number of rows.

import pandas as pd
# generate 1M rows of test data
pdf = generate_pd_test_data(1e6)
pdf.head(3)
>>>    timestamp  pod_id  trip_id   speed_mph
0       7.522523  pod_13   trip_6   79.340006
1      22.029855   pod_5  trip_22   65.202122
2      21.473178  pod_20  trip_10  669.901507
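
The exact generator is available in the gist linked at the end of this post; a minimal sketch of what such a helper might look like (the column ranges and distributions here are assumptions for illustration, not the real Devloop data) is:

import numpy as np
import pandas as pd

def generate_pd_test_data(n_rows):
    """Hypothetical test-data generator: random timestamps, pod/trip labels and speeds."""
    n = int(n_rows)
    rng = np.random.default_rng(42)
    return pd.DataFrame({
        'timestamp': rng.uniform(0, 3600, n),                              # seconds (assumed)
        'pod_id': 'pod_' + pd.Series(rng.integers(0, 25, n)).astype(str),
        'trip_id': 'trip_' + pd.Series(rng.integers(0, 25, n)).astype(str),
        'speed_mph': rng.uniform(0, 700, n),                               # assumed speed range
    })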

We'd like to assess some key descriptive analytics across all pod-trips, for example: What is the trip time per pod-trip?

Operations needed:

  1. Group by ['pod_id', 'trip_id']
  2. For every trip, calculate the trip_time as last timestamp - first timestamp.
  3. Calculate distribution of the pod-trip times (mean, stddev)

The short & slow (pandas) way:

(snippet #1)

import pandas as pd
# aggregate the first (min) and last (max) timestamp for every pod-trip
gdf = pdf.groupby(['pod_id','trip_id']).agg({'timestamp': ['min','max']})
gdf.columns = ['timestamp_first','timestamp_last']
gdf['trip_time_sec'] = gdf['timestamp_last'] - gdf['timestamp_first']
gdf['trip_time_hours'] = gdf['trip_time_sec'] / 3600.0
# calculate the statistics on trip times
pd_result = gdf.describe()

The long & fast (PySpark) way:

(snippet #2)

from pyspark.sql import functions as F
# import pandas df to Spark (this line is not used for profiling);
# `spark` is an existing SparkSession
sdf = spark.createDataFrame(pdf)
# sort by timestamp and group by pod-trip
sdf = sdf.sort(F.desc('timestamp'))
sdf = sdf.groupBy("pod_id", "trip_id").agg(F.max('timestamp').alias('timestamp_last'), F.min('timestamp').alias('timestamp_first'))
# add trip_time_sec as the difference between the last and first timestamp
sdf = sdf.withColumn('trip_time_sec', sdf['timestamp_last'] - sdf['timestamp_first'])
sdf = sdf.withColumn('trip_time_hours', sdf['trip_time_sec'] / 3600.0)
# calculate the statistics on trip times
sdf.select(F.col('timestamp_last'), F.col('timestamp_first'), F.col('trip_time_sec'), F.col('trip_time_hours')).summary().toPandas()

The short & fast (Koalas) way:

(snippet #3)

import databricks.koalas as ks
# import pandas df to koalas (and so also spark) (this line is not used for profiling)
kdf = ks.from_pandas(pdf)
# the code below is the same as the pandas version
gdf = kdf.groupby(['pod_id','trip_id']).agg({'timestamp': ['min','max']})
gdf.columns = ['timestamp_first','timestamp_last']
gdf['trip_time_sec'] = gdf['timestamp_last'] - gdf['timestamp_first']
gdf['trip_time_hours'] = gdf['trip_time_sec'] / 3600.0
ks_result = gdf.describe().to_pandas()

Note that for snippets #1 and #3 the code is exactly the same, so the "Spark switch" is seamless! For many pandas scripts, you can even try changing import pandas as pd to import databricks.koalas as pd, and they will run fine with only minor adjustments, with some limitations explained below.
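
For example, the drop-in swap is just the import line (whether the rest of a script runs unchanged depends on which pandas features it uses):

# before: import pandas as pd
import databricks.koalas as pd  # the rest of the script keeps using the familiar `pd` alias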

Results

All the snippets have been verified to return the same pod-trip-time results. The describe and summary methods for pandas and Spark are slightly different, as explained here, but this should not affect the performance comparison too much.
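
For reference, pandas' describe() reports count, mean, std, min, the quartiles and max, while PySpark's describe() omits the quartiles; summary() adds approximate percentiles and also accepts an explicit list of statistics, so the outputs can be aligned roughly like this (a sketch, reusing sdf from snippet #2):

# request describe()-like statistics from Spark (percentiles are approximate)
sdf.select('trip_time_sec', 'trip_time_hours').summary(
    'count', 'mean', 'stddev', 'min', '25%', '50%', '75%', 'max'
).toPandas()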

Sample results:

Advanced Example: UDFs and complicated operations

We're now going to try to solve a more complex problem with the same dataframe, and see how pandas and Koalas implementations differ.

Goal: Analyze the average speed per pod-trip:

  1. Group by ['pod_id', 'trip_id']
  2. For every pod-trip, calculate the total distance travelled by finding the area under the velocity-time curve (method explained here):
  3. Sort the grouped df by timestamp column.
  4. Calculate diffs of timestamps.
  5. Multiply the diffs with the speed - this will result in the distance traveled in that time diff.
  6. Sum the distance_travelled column - this will give us total distance travelled per pod-trip.
  7. Calculate the trip time as timestamp.last - timestamp.first (as in the previous paragraph).
  8. Calculate the average_speed as distance_travelled / trip time.
  9. Calculate distribution of the pod-trip times (mean, stddev).

We decided to implement this task using a custom apply function and UDFs (user-defined functions).

The pandas way:

(snippet #4)

import pandas as pd

def calc_distance_from_speed(gdf):
    gdf = gdf.sort_values('timestamp')
    gdf['time_diff'] = gdf['timestamp'].diff()
    return pd.DataFrame({
        'distance_miles': [(gdf['time_diff'] * gdf['speed_mph']).sum()],
        'travel_time_sec': [gdf['timestamp'].iloc[-1] - gdf['timestamp'].iloc[0]]
    })

results = pdf.groupby(['pod_id','trip_id']).apply(calc_distance_from_speed)
results['distance_km'] = results['distance_miles'] * 1.609
results['avg_speed_mph'] = results['distance_miles'] / results['travel_time_sec'] / 60.0
results['avg_speed_kph'] = results['avg_speed_mph'] * 1.609
results.describe()

The PySpark way:

(snippet #5)

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import pyspark.sql.functions as F

schema = StructType([
    StructField("pod_id", StringType()),
    StructField("trip_id", StringType()),
    StructField("distance_miles", DoubleType()),
    StructField("travel_time_sec", DoubleType())
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def calculate_distance_from_speed(gdf):
    gdf = gdf.sort_values('timestamp')
    gdf['time_diff'] = gdf['timestamp'].diff()
    return pd.DataFrame({
        'pod_id': [gdf['pod_id'].iloc[0]],
        'trip_id': [gdf['trip_id'].iloc[0]],
        'distance_miles': [(gdf['time_diff'] * gdf['speed_mph']).sum()],
        'travel_time_sec': [gdf['timestamp'].iloc[-1] - gdf['timestamp'].iloc[0]]
    })

sdf = spark.createDataFrame(pdf)  # `spark` is an existing SparkSession
sdf = sdf.groupby("pod_id", "trip_id").apply(calculate_distance_from_speed)
sdf = sdf.withColumn('distance_km', F.col('distance_miles') * 1.609)
sdf = sdf.withColumn('avg_speed_mph', F.col('distance_miles') / F.col('travel_time_sec') / 60.0)
sdf = sdf.withColumn('avg_speed_kph', F.col('avg_speed_mph') * 1.609)
sdf = sdf.orderBy(sdf.pod_id, sdf.trip_id)
sdf.summary().toPandas()  # summary calculates almost the same statistics as describe

The Koalas way:

(snippet #6)

import databricks.koalas as ks

def calc_distance_from_speed_ks(gdf) -> ks.DataFrame[str, str, float, float]:
    gdf = gdf.sort_values('timestamp')
    gdf['meanspeed'] = (gdf['timestamp'].diff() * gdf['speed_mph']).sum()
    gdf['triptime'] = (gdf['timestamp'].iloc[-1] - gdf['timestamp'].iloc[0])
    return gdf[['pod_id','trip_id','meanspeed','triptime']].iloc[0:1]

kdf = ks.from_pandas(pdf)
results = kdf.groupby(['pod_id','trip_id']).apply(calc_distance_from_speed_ks)
# due to current limitations of the package, groupby.apply() returns c0 .. c3 column names
results.columns = ['pod_id', 'trip_id', 'distance_miles', 'travel_time_sec']
# Spark's groupby does not set the groupby cols as index and does not sort them
results = results.set_index(['pod_id','trip_id']).sort_index()
results['distance_km'] = results['distance_miles'] * 1.609
results['avg_speed_mph'] = results['distance_miles'] / results['travel_time_sec'] / 60.0
results['avg_speed_kph'] = results['avg_speed_mph'] * 1.609
results.describe()

Koalas' implementation of apply is based on PySpark's pandas_udf, which requires schema information, and this is why the function definition has to include a type hint. The authors of the package introduced new custom type hints, ks.DataFrame and ks.Series. Unfortunately, the current implementation of the apply method is quite cumbersome, and it took a bit of effort to arrive at the same result (column names change, and the groupby keys are not returned). However, all these behaviors are appropriately explained in the package documentation.

Performance

To assess the performance of Koalas, we profiled the code snippets for different numbers of rows.

The profiling experiment was done on Databricks platform, using the following cluster configurations:

  • Spark driver node (also used to execute the pandas scripts): 8 CPU cores, 61 GB RAM.
  • 15 Spark worker nodes: 4 CPU cores, 30.5 GB RAM each (in total: 60 CPU cores / 457.5 GB RAM).

Every experiment was repeated 10 times, and the ranges shown below indicate the minimum and maximum execution times.

Basic ops

When the data is small, the initialization and data-transfer overheads are large compared to the computation itself, so pandas is much faster (marker a). For larger amounts of data, pandas' processing times exceed those of the distributed solutions (marker b). We can then observe some performance overhead for Koalas, but it gets closer to PySpark as the data size increases (marker c).

UDFs

For the UDF profiling, performance decreases dramatically, as noted in both the PySpark and Koalas documentation. This is why we decreased the number of rows tested by 100x compared to the basic-ops case. For each test case, Koalas and PySpark show strikingly similar performance, indicating a consistent underlying implementation. During experimentation, we discovered that there is a much faster way of executing this set of operations using PySpark window functions; however, this is not currently implemented in Koalas, so we decided to compare only the UDF versions (a rough sketch of the window-based alternative is shown below).
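
For reference, a window-based PySpark version might look roughly like the sketch below. This is an illustration only, not the exact code we benchmarked; it assumes an existing SparkSession named spark and mirrors the same distance and average-speed formulas used in the snippets above.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# per pod-trip, order rows by timestamp so we can take row-to-row time differences
w = Window.partitionBy('pod_id', 'trip_id').orderBy('timestamp')

sdf = spark.createDataFrame(pdf)
sdf = sdf.withColumn('time_diff', F.col('timestamp') - F.lag('timestamp').over(w))

# integrate speed over time to get distance, and take the trip duration per pod-trip
result = (sdf.groupBy('pod_id', 'trip_id')
             .agg(F.sum(F.col('time_diff') * F.col('speed_mph')).alias('distance_miles'),
                  (F.max('timestamp') - F.min('timestamp')).alias('travel_time_sec')))
result = result.withColumn('avg_speed_mph',
                           F.col('distance_miles') / F.col('travel_time_sec') / 60.0)
result.summary().toPandas()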

Discussion

Koalas seems to be the right choice if you want to make your pandas code immediately scalable and executable on bigger datasets that cannot be processed on a single node. After the quick swap to Koalas, just by scaling up your Spark cluster, you can handle bigger datasets and improve processing times significantly. Performance should be comparable to PySpark's (though 5 to 50% lower, depending on dataset size and cluster).

On the other hand, the Koalas API layer does cause a visible performance hit, especially in comparison to native Spark. At the end of the day, if computational performance is your key priority, you should consider switching from Python to Scala.

Limitations and differences

During your first few hours with Koalas, you might wonder, "Why is this not implemented?!" Currently, the package is still under development and is missing some pandas API functionality, but much of it should be implemented in the next few months (for example groupby.diff() or kdf.rename()).
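
Until those methods land, simple workarounds often exist. For instance, column renaming can be emulated by assigning to columns, exactly as we did in snippet #6 (shown here on the kdf from the earlier snippets):

# workaround while kdf.rename() is not available: assign the full column list
kdf.columns = ['timestamp', 'pod_id', 'trip_id', 'speed']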

Also, from my experience as a contributor to the project, some features are either too complicated to implement with the Spark API or were skipped due to a significant performance hit. For example, DataFrame.values requires materializing the entire working set in a single node's memory, and so is suboptimal and sometimes not even possible. Instead, if you need to retrieve some final results on the driver, you can call DataFrame.to_pandas() or DataFrame.to_numpy().
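
For example (using the kdf from the earlier snippets; an illustration rather than a recommendation):

# pull a small, aggregated result back to the driver
mean_speeds = kdf.groupby('pod_id')['speed_mph'].mean().to_pandas()

# this materializes the whole column on the driver -- use with care on large datasets
speeds = kdf[['speed_mph']].to_numpy()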

Another important thing to mention is that Koalas' execution chain is different from pandas': when you apply operations to the dataframe, they are added to a plan of operations but not executed immediately. Only when the results are needed, e.g., when calling kdf.head() or kdf.to_pandas(), are the operations executed. That can be misleading for somebody who has never worked with Spark, since pandas executes everything eagerly, line by line.
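
A small illustration (reusing the pdf test DataFrame from earlier):

import databricks.koalas as ks

kdf = ks.from_pandas(pdf)                      # wraps the data; no Spark job runs yet
kdf['speed_kph'] = kdf['speed_mph'] * 1.609    # only extends the underlying Spark plan
kdf.head()                                     # triggers the actual computation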

Conclusions

Koalas helped us reduce the burden of "Spark-ifying" our pandas code. If you're also struggling with scaling your pandas code, you should try it too! If you are desperately missing any behavior or find inconsistencies with pandas, please open an issue so that as a community we can ensure the package is actively and continually improved. Also, feel free to contribute!

Resources

  1. Koalas github: https://github.com/databricks/koalas
  2. Koalas documentation: https://koalas.readthedocs.io
  3. Code snippets from this article: https://gist.github.com/patryk-oleniuk/043f97ae9c405cbd13b6977e7e6d6fbc