Running Spark Python applications
Accessing Spark with Java and Scala offers many advantages: platform independence by running inside the JVM, self-contained packaging of code and its dependencies into JAR files, and higher performance because Spark itself runs in the JVM. You lose these advantages when using the Spark Python API.
Managing dependencies and making them available for Python jobs on a cluster can be difficult. To determine which dependencies are required on the cluster, you must understand that Spark application code runs in Spark executor processes distributed throughout the cluster. If the Python transformations you define use any third-party libraries, such as NumPy or nltk, the executors need access to those libraries on the remote hosts where they run.
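One way to find out which libraries a given interpreter can see is to ask it directly. The following is a minimal sketch, not part of Spark: the helper name missing_modules is hypothetical, and it relies only on the standard-library importlib.util.find_spec. It accepts top-level module names and reports the ones the current interpreter cannot locate. Run from inside a function passed to map(), the same check would describe the executor's host rather than the driver's.

```python
import importlib.util

def missing_modules(names):
    """Return the top-level module names that this interpreter cannot find."""
    return [name for name in names if importlib.util.find_spec(name) is None]

# "json" ships with Python; the second name is a made-up placeholder.
print(missing_modules(["json", "no_such_module_for_demo"]))
```

An empty list means every requested module can be located on that host.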
Setting the Python Path
After the Python packages you want to use are in a consistent location on your cluster, set the appropriate environment variables to the path to your Python executables as follows:
- Specify the Python binary to be used by the Spark driver and executors by setting the PYSPARK_PYTHON environment variable in spark-env.sh. You can also override the driver Python binary path individually using the PYSPARK_DRIVER_PYTHON environment variable. These settings apply regardless of whether you are using yarn-client or yarn-cluster mode.
  Make sure to set the variables using the export statement. For example:
  export PYSPARK_PYTHON=${PYSPARK_PYTHON:-<path_to_python_executable>}
  This statement uses shell parameter expansion to set the PYSPARK_PYTHON environment variable to <path_to_python_executable> if it is unset or empty. If it is already set to something else, the existing value is preserved.
  Here are some example Python binary paths:
  - Anaconda parcel: /opt/cloudera/parcels/Anaconda/bin/python
  - Virtual environment: /path/to/virtualenv/bin/python
- If you are using yarn-cluster mode, in addition to the above, also set spark.yarn.appMasterEnv.PYSPARK_PYTHON and spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON in spark-defaults.conf (using the safety valve) to the same paths.
In Cloudera Manager, set environment variables in spark-env.sh and spark-defaults.conf as follows:
Minimum Required Role: Configurator (also provided by Cluster Administrator, Full Administrator)
- Go to the Spark service.
- Click the Configuration tab.
- Search for Spark Service Advanced Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh.
- Add the spark-env.sh variables to the property.
- Search for Spark Client Advanced Configuration Snippet (Safety Valve) for spark-conf/spark-defaults.conf.
- Add the spark-defaults.conf variables to the property.
- Enter a Reason for change, and then click Save Changes to commit the changes.
- Restart the service.
- Deploy the client configuration.
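The settings in this section can be sketched in Python to make the logic explicit. This is an illustration only: the function names resolve_pyspark_python and yarn_cluster_properties are hypothetical, and the path is one of the example paths listed above. The first function mirrors the ${PYSPARK_PYTHON:-default} shell expansion, and the second builds the two yarn-cluster properties that should point to the same interpreter.

```python
import os

def resolve_pyspark_python(default_path, environ=None):
    """Mimic ${PYSPARK_PYTHON:-default}: keep a non-empty existing value, else use the default."""
    environ = os.environ if environ is None else environ
    return environ.get("PYSPARK_PYTHON") or default_path

def yarn_cluster_properties(python_path):
    """Build the spark-defaults.conf entries needed in yarn-cluster mode."""
    return {
        "spark.yarn.appMasterEnv.PYSPARK_PYTHON": python_path,
        "spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON": python_path,
    }

# An empty mapping stands in for an environment where PYSPARK_PYTHON is not set.
python_path = resolve_pyspark_python("/path/to/virtualenv/bin/python", environ={})
for key, value in yarn_cluster_properties(python_path).items():
    # spark-defaults.conf uses whitespace-separated "key value" lines.
    print(key, value)
```

The printed lines have the shape expected in the spark-defaults.conf safety valve; adjust the path to match your own cluster.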
Self-Contained Dependencies
In a common situation, a custom Python package contains functionality you want to apply to each element of an RDD. You can use a map() function call to make sure that each Spark executor imports the required package before calling any of the functions inside that package. The following shows a simple example:
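# sc is an existing SparkContext, such as the one provided by the pyspark shell.
def import_my_special_package(x):
    # Importing inside the function makes the import run on the executor.
    import my.special.package
    return x

int_rdd = sc.parallelize([1, 2, 3, 4])
# map() is lazy and returns a new RDD; calling collect() on that RDD runs the function.
imported_rdd = int_rdd.map(import_my_special_package)
imported_rdd.collect()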
You create a simple RDD of four elements and call it int_rdd. Then you apply the function import_my_special_package to every element of int_rdd. This function imports my.special.package and then returns the original argument passed to it. Because map() is lazy, the function runs only when an action such as collect() is called on the mapped RDD. Calling the function as part of a map() operation ensures that each Spark executor that processes a partition imports my.special.package when needed.
If you only need a single file inside my.special.package, you can direct Spark to make this available to all executors by using the --py-files option in your spark-submit command and specifying the local path to the file. You can also specify this programmatically by using the sc.addPyFile() function. If you use functionality from a package that spans multiple files, you can make an egg for the package, because the --py-files flag also accepts a path to an egg file.
If you have a self-contained dependency, you can make the required Python dependency available to your executors in two ways:
- If you depend on only a single file, you can use the --py-files command-line option, or programmatically add the file to the SparkContext with sc.addPyFile(path) and specify the local path to that Python file.
- If you have a dependency on a self-contained module (a module with no other dependencies), you can create an egg or zip file of that module and use either the --py-files command-line option or programmatically add the module to the SparkContext with sc.addPyFile(path) and specify the local path to that egg or zip file.
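Creating the zip file for a self-contained, pure-Python package can be sketched with the standard library alone. The example below is an illustration under stated assumptions: the helper zip_package and the package name demo_pkg_for_zip are hypothetical, and a throwaway package is generated in a temporary directory so that the sketch runs anywhere. The archive keeps the package directory as its top-level entry, which is what allows Python to import the package once the zip file is on the module search path.

```python
import os
import sys
import tempfile
import zipfile

def zip_package(package_dir, zip_path):
    """Zip the .py files of a package, keeping the package name as the top-level entry."""
    parent = os.path.dirname(os.path.abspath(package_dir))
    with zipfile.ZipFile(zip_path, "w") as archive:
        for root, _dirs, files in os.walk(package_dir):
            for filename in files:
                if filename.endswith(".py"):
                    full_path = os.path.join(root, filename)
                    archive.write(full_path, os.path.relpath(full_path, parent))
    return zip_path

# Build a throwaway package (hypothetical name) to demonstrate.
work_dir = tempfile.mkdtemp()
package_dir = os.path.join(work_dir, "demo_pkg_for_zip")
os.makedirs(package_dir)
with open(os.path.join(package_dir, "__init__.py"), "w") as handle:
    handle.write("def double(x):\n    return 2 * x\n")

archive_path = zip_package(package_dir, os.path.join(work_dir, "demo_pkg_for_zip.zip"))

# Python can import directly from a zip file that is on sys.path.
sys.path.insert(0, archive_path)
import demo_pkg_for_zip
print(demo_pkg_for_zip.double(21))
```

The resulting archive path is what you would pass to --py-files or sc.addPyFile(path).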
Complex Dependencies
Some operations rely on complex packages that also have many dependencies. Although such a package is too complex to distribute as a *.py file, you can create an egg for it and all of its dependencies, and send the egg file to executors using the --py-files option.
Limitations of Distributing Egg Files on Heterogeneous Clusters
If you are running a heterogeneous cluster, with machines of different CPU architectures, sending egg files is impractical because packages that contain native code must be compiled for a single specific CPU architecture. Therefore, distributing an egg for complex, compiled packages like NumPy, SciPy, and pandas often fails. Instead of distributing egg files, install the required Python packages on each host of the cluster and specify the path to the Python binaries for the worker hosts to use.
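Before deciding whether a package can be shipped as an archive, it can help to check whether it contains compiled code. The sketch below is a rough heuristic, not a Spark feature: the helper native_extension_files is hypothetical, and the extra suffixes added to importlib.machinery.EXTENSION_SUFFIXES are an assumption meant to catch binaries built for other platforms. The demonstration uses a temporary directory with empty placeholder files.

```python
import importlib.machinery
import os
import tempfile

def native_extension_files(package_dir):
    """List files under package_dir that look like compiled extension modules."""
    # EXTENSION_SUFFIXES holds this interpreter's own suffixes; the generic
    # suffixes appended here are an assumption to cover other platforms.
    suffixes = tuple(importlib.machinery.EXTENSION_SUFFIXES) + (".so", ".pyd", ".dll", ".dylib")
    found = []
    for root, _dirs, files in os.walk(package_dir):
        for filename in files:
            if filename.endswith(suffixes):
                found.append(os.path.join(root, filename))
    return sorted(found)

# Demonstrate with a fake package: one pure-Python file, one placeholder binary.
demo_dir = tempfile.mkdtemp()
for filename in ("pure.py", "_speedups.so"):
    open(os.path.join(demo_dir, filename), "w").close()

native_files = native_extension_files(demo_dir)
print([os.path.basename(path) for path in native_files])
```

A non-empty result suggests the package is a candidate for installation on each host rather than distribution as an egg or zip file.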
Installing and Maintaining Python Environments
Installing and maintaining Python environments can be complex but allows you to use the full Python package ecosystem. Ideally, a sysadmin installs the Anaconda distribution or sets up a virtual environment on every host of your cluster with your required dependencies.
If you are using Cloudera Manager, you can deploy the Anaconda distribution as a parcel as follows:
Minimum Required Role: Cluster Administrator (also provided by Full Administrator)
- Add the following URL https://repo.anaconda.com/pkgs/misc/parcels/ to the Remote Parcel Repository URLs as described in "Parcel Configuration Settings."
- Download, distribute, and activate the parcel as described in "Managing Parcels."
Anaconda is installed in <parcel directory>/Anaconda, where <parcel directory> is /opt/cloudera/parcels by default, but can be changed in parcel configuration settings. The Anaconda parcel is supported by Continuum Analytics.
If you are not using Cloudera Manager, you can set up a virtual environment on your cluster by running commands on each host using Cluster SSH, Parallel SSH, or Fabric. Assuming each host has Python and pip installed, use the following commands to set up the standard data stack (NumPy, SciPy, scikit-learn, and pandas) in a virtual environment on a RHEL 6-compatible system:
# Install python-devel:
yum install python-devel
# Install non-Python dependencies required by SciPy that are not installed by default:
yum install atlas atlas-devel lapack-devel blas-devel
# Install virtualenv:
pip install virtualenv
# Create a new virtualenv:
virtualenv mynewenv
# Activate the virtualenv:
source mynewenv/bin/activate
# Install packages in mynewenv:
pip install numpy
pip install scipy
pip install scikit-learn
pip install pandas
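The environment creation step can also be scripted from Python. The sketch below swaps in the standard-library venv module (available in Python 3) as an alternative to the virtualenv tool used in the commands above; it is not the same tool and is shown only as an assumption-laden illustration. The helper name create_env is hypothetical, and pip is deliberately left out so that the sketch runs without network access. The returned interpreter path is the kind of value that PYSPARK_PYTHON would point to.

```python
import os
import tempfile
import venv

def create_env(env_dir):
    """Create a virtual environment and return the path to its Python interpreter."""
    # with_pip=False keeps the sketch fast and offline; a real setup would need pip.
    venv.create(env_dir, with_pip=False, symlinks=(os.name != "nt"))
    bin_dir = "Scripts" if os.name == "nt" else "bin"
    executable = "python.exe" if os.name == "nt" else "python"
    return os.path.join(env_dir, bin_dir, executable)

# A temporary location stands in for the real environment directory.
env_python = create_env(os.path.join(tempfile.mkdtemp(), "mynewenv"))
print(env_python)
```

For the path to be usable as PYSPARK_PYTHON, the environment must exist at the same location on every host.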