Skip to main content

Controlling the environment of an application is often challenging in a distributed computing environment - it is difficult to ensure all nodes have the desired environment to execute, it may be tricky to know where the user’s code is actually running, and so on.

Apache Spark™ provides several standard ways to manage dependencies across the nodes in a cluster via script options such as --jars, --packages, and configurations such as spark.jars.* to make users seamlessly manage the dependencies in their clusters.

In contrast, PySpark users often ask how to do it with Python dependencies - there have been multiple issues filed such as SPARK-13587, SPARK-16367, SPARK-20001 and SPARK-25433. One simple example that illustrates the dependency management scenario is when users run pandas UDFs.

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('double')
def pandas_plus_one(v: pd.Series) -> pd.Series:
    return v + 1

spark.range(10).select(pandas_plus_one("id")).show()

If they do not have required dependencies installed in all other nodes, it fails and complains that PyArrow and pandas have to be installed.

Traceback (most recent call last):
  ...
ModuleNotFoundError: No module named 'pyarrow'

One straightforward method is to use script options such as --py-files or the spark.submit.pyFilesconfiguration, but this functionality cannot cover many cases, such as installing wheel files or when the Python libraries are dependent on C and C++ libraries such as pyarrow and NumPy.

This blog post introduces how to control Python dependencies in Apache Spark comprehensively. Most of the content will be also documented in the upcoming Apache Spark 3.1 as part of Project Zen. Please refer to An Update on Project Zen: Improving Apache Spark for Python Users for more details.

Using Conda

Conda is one of the most widely-used Python package management systems. PySpark users can directly use a Conda environment to ship their third-party Python packages by leveraging conda-pack which is a command line tool creating relocatable Conda environments. It is supported in all types of clusters in the upcoming Apache Spark 3.1. In Apache Spark 3.0 or lower versions, it can be used only with YARN.

The example below creates a Conda environment to use on both the driver and executor and packs it into an archive file. This archive file captures the Conda environment for Python and stores both Python interpreter and all its relevant dependencies.

conda create -y -n pyspark_conda_env -c conda-forge pyarrow pandas conda-pack
conda activate pyspark_conda_env
conda pack -f -o pyspark_conda_env.tar.gz

After that, you can ship it together with scripts or in the code by using the --archives option or spark.archives configuration (spark.yarn.dist.archives in YARN). It automatically unpacks the archive on executors.

In the case of a spark-submit script, you can use it as follows:

export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./environment/bin/python
spark-submit --archives pyspark_conda_env.tar.gz#environment app.py

Note that PYSPARK_DRIVER_PYTHON above should not be set for cluster modes in YARN or Kubernetes.

For a pyspark shell:

export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
pyspark --archives pyspark_conda_env.tar.gz#environment

If you’re on a regular Python shell or notebook, you can try it as shown below:

import os
from pyspark.sql import SparkSession

os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
spark = SparkSession.builder.config(
    "spark.archives",  # 'spark.yarn.dist.archives' in YARN.
    "pyspark_conda_env.tar.gz#environment").getOrCreate()

Using Virtualenv

Virtualenv is a Python tool to create isolated Python environments. Since Python 3.3, a subset of its features has been integrated into Python as a standard library under the venv module. In the upcoming Apache Spark 3.1, PySpark users can use virtualenv to manage Python dependencies in their clusters by using venv-pack in a similar way as conda-pack. In the case of Apache Spark 3.0 and lower versions, it can be used only with YARN.

A virtual environment to use on both driver and executor can be created as demonstrated below. It packs the current virtual environment to an archive file, and it contains both Python interpreter and the dependencies. However, it requires all nodes in a cluster to have the same Python interpreter installed because venv-pack packs Python interpreter as a symbolic link.

python -m venv pyspark_venv
source pyspark_venv/bin/activate
pip install pyarrow pandas venv-pack
venv-pack -o pyspark_venv.tar.gz

You can directly pass/unpack the archive file and enable the environment on executors by leveraging the --archives option or spark.archives configuration (spark.yarn.dist.archives in YARN).

For spark-submit, you can use it by running the command as follows. Also, notice that PYSPARK_DRIVER_PYTHON has to be unset in Kubernetes or YARN cluster modes.

export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./environment/bin/python
spark-submit --archives pyspark_venv.tar.gz#environment app.py

In the case of a pyspark shell:

export PYSPARK_DRIVER_PYTHON=python 
export PYSPARK_PYTHON=./environment/bin/python
pyspark --archives pyspark_venv.tar.gz#environment 

For regular Python shells or notebooks:

import os
from pyspark.sql import SparkSession

os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
spark = SparkSession.builder.config(
    "spark.archives",  # 'spark.yarn.dist.archives' in YARN.
    "pyspark_venv.tar.gz#environment").getOrCreate()

Using PEX

PySpark can also use PEX to ship the Python packages together. PEX is a tool that creates a self-contained Python environment. This is similar to Conda or virtualenv, but a .pex file is executable by itself.

The following example creates a .pex file for the driver and executor to use. The file contains the Python dependencies specified with the pex command.

pip install pyarrow pandas pex
pex pyspark pyarrow pandas -o pyspark_pex_env.pex

This file behaves similarly with a regular Python interpreter.

./pyspark_pex_env.pex -c "import pandas; print(pandas.__version__)"
1.1.5

However, .pex file does not include a Python interpreter itself under the hood so all nodes in a cluster should have the same Python interpreter installed.

In order to transfer and use the .pex file in a cluster, you should ship it via the spark.files configuration (spark.yarn.dist.files in YARN) or --files option because they are regular files instead of directories or archive files.

For application submission, you run the commands as shown below. PYSPARK_DRIVER_PYTHON should not be set for cluster modes in YARN or Kubernetes.

export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./pyspark_pex_env.pex
spark-submit --files pyspark_pex_env.pex app.py

For the interactive pyspark shell, the commands are almost the same:

export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./pyspark_pex_env.pex
pyspark --files pyspark_pex_env.pex

For regular Python shells or notebooks:

import os
from pyspark.sql import SparkSession

os.environ['PYSPARK_PYTHON'] = "./pyspark_pex_env.pex"
spark = SparkSession.builder.config(
    "spark.files",  # 'spark.yarn.dist.files' in YARN.
    "pyspark_pex_env.pex").getOrCreate()

Conclusion

In Apache Spark, Conda, virtualenv and PEX can be leveraged to ship and manage Python dependencies.

  • Conda: this is one of the most commonly used package management systems. In Apache Spark 3.0 and lower versions, Conda can be supported with YARN cluster only, and it works with all other cluster types in the upcoming Apache Spark 3.1.
  • Virtualenv: users can do it without an extra installation because it is a built-in library in Python but it should have the same Python installed in all nodes whereas Conda does not require it. Virtualenv works only with YARN cluster in Apache Spark 3.0 and lower versions, and all other cluster types support it in the upcoming Apache Spark 3.1.
  • PEX: it can be used with any type of cluster in any version of Apache Spark although it is arguably less widely used and requires to have the same Python installed in all nodes whereas Conda does not require it.

These package management systems can handle any Python packages that --py-files or spark.submit.pyFiles configuration cannot cover. Users can seamlessly ship not only pandas and PyArrow but also other dependencies to interact together when they work with PySpark.

In the case of Databricks notebooks, we not only provide an elegant mechanism by having a well-designed UI but also allow users to directly use pip and Conda in order to address this Python dependency management. Try out these today for free on Databricks.

Try Databricks for free

Related posts

See all Engineering Blog posts