Using Spark 2 from Python
Cloudera Data Science Workbench supports using Spark 2 from Python via PySpark.
Setting Up a PySpark Project
PySpark Environment Variables
The default Cloudera Data Science Workbench engine currently includes Python 2.7.11 and Python 3.6.1. To use PySpark with lambda functions that run within the CDH cluster, the Spark executors must have access to a matching version of Python. On many common operating systems, the default system Python does not match the minor release of Python included in Cloudera Data Science Workbench.
To ensure that the Python versions match, Python can either be installed on every CDH node or made available per job run using Spark’s ability to distribute dependencies. Given the size of a typical isolated Python environment and the desire to avoid repeated uploads from gateway hosts, Cloudera recommends installing Python 2.7 and 3.6 on the cluster if you are using PySpark with lambda functions.
You can install Python 2.7 and 3.6 on the cluster using any method and set the corresponding PYSPARK_PYTHON environment variable in your project. Cloudera Data Science Workbench 1.3 (and higher) includes a separate environment variable for Python 3 sessions called PYSPARK3_PYTHON. Python 2 sessions continue to use the default PYSPARK_PYTHON variable. This allows you to run Python 2 and Python 3 sessions in parallel without either variable being overridden by the other.
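PySpark will not run when the driver and the executors use different minor versions of Python, so it can help to check which interpreter variable a session reads and which Python the executors actually start. The following is a minimal sketch, assuming an active SparkSession named `spark` for the last helper; `pyspark_python_var`, `configured_executor_python`, and `executor_python_version` are hypothetical names, not part of Cloudera Data Science Workbench or PySpark.

```python
import os
import sys


def pyspark_python_var(major=None):
    """Name of the variable a CDSW 1.3+ session reads for this Python major version.

    Python 3 sessions read PYSPARK3_PYTHON; Python 2 sessions read PYSPARK_PYTHON.
    """
    if major is None:
        major = sys.version_info[0]
    return "PYSPARK3_PYTHON" if major == 3 else "PYSPARK_PYTHON"


def configured_executor_python(environ=None, major=None):
    """Return the interpreter path configured for this session, or None if unset."""
    if environ is None:
        environ = os.environ
    return environ.get(pyspark_python_var(major))


def executor_python_version(spark):
    """Run one small task and return the executor's Python version as (major, minor, micro).

    Assumes `spark` is an active SparkSession. If the executor's minor version
    differs from the driver's, PySpark normally fails this job with a
    version-mismatch error instead of returning a value.
    """
    def probe(_):
        import sys
        return tuple(sys.version_info[:3])

    return spark.sparkContext.parallelize([0], 1).map(probe).first()
```

In a session, comparing `executor_python_version(spark)[:2]` with `tuple(sys.version_info[:2])` confirms that the cluster-side Python matches the engine's.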
Creating and Running a PySpark Project
To get started quickly, use the PySpark template project to create a new project. For instructions, see Creating a Project.
To run a PySpark project, navigate to the project's overview page, open the workbench console and launch a Python session. For detailed instructions, see Using the Workbench.
Testing a PySpark Project in Spark Local Mode
Spark's local mode is often useful for testing and debugging purposes. Use the following sample code snippet to start a PySpark session in local mode.
```python
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder \
    .appName("LocalSparkSession") \
    .master("local") \
    .getOrCreate()
```
For more details, refer to the Spark documentation: Running Spark Applications.
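One common way to use local mode for testing is to keep per-record logic in plain Python functions that can be checked without Spark, and then run the same functions through a local SparkSession. The sketch below assumes PySpark is installed in the session; the `key,value` record format and the helpers `parse_line`, `sum_by_key`, and `run_local` are hypothetical.

```python
def parse_line(line):
    """Hypothetical record parser: turn 'key,value' into (key, int(value))."""
    key, value = line.split(",", 1)
    return key.strip(), int(value)


def sum_by_key(pairs):
    """Pure-Python reference result, used to check the Spark output."""
    totals = {}
    for key, value in pairs:
        totals[key] = totals.get(key, 0) + value
    return totals


def run_local(lines):
    """Apply parse_line with Spark in local mode and return {key: total}.

    getOrCreate() reuses an existing session, so call this before any
    other SparkSession has been created in the same Python process.
    """
    from operator import add
    from pyspark.sql import SparkSession

    spark = SparkSession\
        .builder \
        .appName("LocalSparkSession") \
        .master("local") \
        .getOrCreate()
    try:
        rdd = spark.sparkContext.parallelize(lines).map(parse_line)
        return dict(rdd.reduceByKey(add).collect())
    finally:
        spark.stop()
```

A local-mode check then compares the two paths, for example `run_local(lines) == sum_by_key(map(parse_line, lines))`.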
Example: Monte Carlo Estimation
Within the template PySpark project, pi.py is a classic example that estimates Pi using the Monte Carlo method.
What follows is the full, annotated code sample that can be saved to the pi.py file.
```python
# # Estimating $\pi$
#
# This PySpark example shows you how to estimate $\pi$ in parallel
# using Monte Carlo integration.

from __future__ import print_function
import sys
from random import random
from operator import add

# Connect to Spark by creating a Spark session
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("PythonPi")\
    .getOrCreate()

partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
n = 100000 * partitions

def f(_):
    # Draw a random point in the square [-1, 1] x [-1, 1] and
    # return 1 if it falls inside the unit circle, 0 otherwise.
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 < 1 else 0

# To access the associated SparkContext
count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))

spark.stop()
```
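The value printed by pi.py follows from a short calculation. Points are drawn uniformly from the square $[-1, 1]^2$, which has area 4, and the unit disc inside it has area $\pi$, so each point lands inside the disc with probability $p = \pi/4$. Since `count` is the number of hits among $n$ points, the estimator and its standard error are:

```latex
\[
p = \Pr\!\left[x^2 + y^2 < 1\right] = \frac{\pi}{4},
\qquad
\hat{\pi} = 4\,\frac{\mathrm{count}}{n},
\qquad
\mathrm{SE}(\hat{\pi}) = 4\sqrt{\frac{p(1-p)}{n}}.
\]
% With the default partitions = 2, n = 200000:
\[
\mathrm{SE}(\hat{\pi}) \approx 4\sqrt{\frac{0.7854 \times 0.2146}{200000}} \approx 0.0037.
\]
```

So with the defaults the printed value is typically within about 0.007 (two standard errors) of $\pi$. Because the error shrinks as $1/\sqrt{n}$, passing four times as many partitions roughly halves it.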
Example: Locating and Adding JARs to Spark 2 Configuration
This example shows how to discover the location of JAR files installed with Spark 2, and add them to the Spark 2 configuration.
```python
# # Using Avro data
#
# This example shows how to use a JAR file on the local filesystem on
# Spark on YARN.

from __future__ import print_function
import os
import sys
import os.path
from functools import reduce
from pyspark.sql import SparkSession
from pyspark.files import SparkFiles

# Add the data file to HDFS for consumption by the Spark executors.
# (The leading ! runs a shell command from the Python session.)
!hdfs dfs -put resources/users.avro /tmp

# Find the example JARs provided by the Spark parcel. This parcel
# is available on both the driver, which runs in Cloudera Data Science Workbench, and the
# executors, which run on YARN.
exampleDir = os.path.join(os.environ["SPARK_HOME"], "examples/jars")
exampleJars = [os.path.join(exampleDir, x) for x in os.listdir(exampleDir)]

# Add the Spark JARs to the Spark configuration to make them available for use.
spark = SparkSession\
    .builder\
    .config("spark.jars", ",".join(exampleJars))\
    .appName("AvroKeyInputFormat")\
    .getOrCreate()
sc = spark.sparkContext

# Read the schema.
schema = open("resources/user.avsc").read()
conf = {"avro.schema.input.key": schema}

avro_rdd = sc.newAPIHadoopFile(
    "/tmp/users.avro",  # This is an HDFS path!
    "org.apache.avro.mapreduce.AvroKeyInputFormat",
    "org.apache.avro.mapred.AvroKey",
    "org.apache.hadoop.io.NullWritable",
    keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
    conf=conf)
output = avro_rdd.map(lambda x: x[0]).collect()
for k in output:
    print(k)

spark.stop()
```
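`os.listdir` returns every entry in the examples directory, not only JAR files. A slightly more defensive way to build the `spark.jars` value, sketched here with the hypothetical helpers `select_jars` and `spark_jars_value`, keeps only entries ending in `.jar` and sorts them so the resulting configuration is reproducible.

```python
import os


def select_jars(directory, names):
    """Return full paths for the entries in `names` that end in .jar, in sorted order."""
    return [os.path.join(directory, name) for name in sorted(names) if name.endswith(".jar")]


def spark_jars_value(directory):
    """Build a comma-separated value suitable for the spark.jars setting."""
    return ",".join(select_jars(directory, os.listdir(directory)))
```

In the example above, this would replace the list comprehension: `.config("spark.jars", spark_jars_value(exampleDir))`.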
Example: Distributing Dependencies on a PySpark Cluster
Although Python is a popular choice for data scientists, making a Python library available on a distributed PySpark cluster is not straightforward. To determine which dependencies the cluster needs, keep in mind that Spark applications run their code in executor processes distributed throughout the cluster. If the Python code you are running uses any third-party libraries, those executors need access to the libraries on the hosts where they run.
This example demonstrates one way to run the following Python script, nltk_sample.py, which uses a pure-Python library (NLTK), on a distributed PySpark cluster.
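Before shipping an environment, it can help to confirm whether executors can already import a library. The sketch below assumes an active SparkSession named `spark`; `importable` and `executor_import_report` are hypothetical helpers, and because the report runs one small task per partition, it is not guaranteed to reach every executor host.

```python
import importlib
import socket


def importable(module_name):
    """Return True if module_name can be imported by the current Python interpreter."""
    try:
        importlib.import_module(module_name)
        return True
    except ImportError:
        return False


def executor_import_report(spark, module_name, num_tasks=8):
    """Run num_tasks small tasks and return {hostname: importable?} as seen by executors."""
    def probe(_):
        return socket.gethostname(), importable(module_name)

    rdd = spark.sparkContext.parallelize(range(num_tasks), num_tasks)
    return dict(rdd.map(probe).collect())
```

For this example, `executor_import_report(spark, "nltk")` returning `False` for any host indicates that NLTK has to be shipped with the job.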
- Prerequisites: Make sure the Anaconda parcel has been distributed and activated on your cluster.
- Create a new project in Cloudera Data Science Workbench. In that project, create a new file called nltk_sample.py with the following sample script.
nltk_sample.py
```python
# This code uses NLTK, a Python natural language processing library.
# NLTK is not installed with conda by default.
# Import libraries inside the functions that run on the executors (such as UDFs),
# so the import happens where the shipped environment is available.
# The goal here is to distribute NLTK with the conda environment.

import os
import sys
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("spark-nltk") \
    .getOrCreate()

# Relative path: resolved against your HDFS home directory.
data = spark.sparkContext.textFile('1970-Nixon.txt')

def word_tokenize(x):
    import nltk
    return nltk.word_tokenize(x)

def pos_tag(x):
    import nltk
    return nltk.pos_tag([x])

words = data.flatMap(word_tokenize)
words.saveAsTextFile('nixon_tokens')

pos_word = words.map(pos_tag)
pos_word.saveAsTextFile('nixon_token_pos')
```
- Go to the project you created and launch a new PySpark session.
- Click Terminal Access and run the following command to create a conda environment, nltk_env, that contains Python 2.7.11, NLTK, and NumPy.
```
conda create -n nltk_env --copy -y -q python=2.7.11 nltk numpy
```
The --copy option installs every package into the conda environment's directory as a full copy instead of a hard or soft link, so the environment is self-contained once it is zipped. If you want to add extra packages with pip rather than conda, copy them into the environment manually after running pip install. In Cloudera Data Science Workbench, pip installs packages into the project's ~/.local directory:
```
pip install some-awesome-package
cp -r ~/.local/lib ~/.conda/envs/nltk_env/
```
- Zip the conda environment for shipping to the PySpark cluster:
```
cd ~/.conda/envs
zip -r ../../nltk_env.zip nltk_env
```
- (Specific to NLTK) For this example, you can use NLTK data as input.
```
cd ~/
source activate nltk_env

# Download NLTK data (run inside the activated nltk_env environment).
python -m nltk.downloader -d nltk_data all
hdfs dfs -put nltk_data/corpora/state_union/1970-Nixon.txt ./

# Archive NLTK data for distribution.
cd ~/nltk_data/tokenizers/
zip -r ../../tokenizers.zip *
cd ~/nltk_data/taggers/
zip -r ../../taggers.zip *
```
- Set spark-submit options in spark-defaults.conf.
```
spark.yarn.appMasterEnv.PYSPARK_PYTHON=./NLTK/nltk_env/bin/python
spark.yarn.appMasterEnv.NLTK_DATA=./
spark.executorEnv.NLTK_DATA=./
spark.yarn.dist.archives=nltk_env.zip#NLTK,tokenizers.zip#tokenizers,taggers.zip#taggers
```
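With these settings, nltk_env.zip is unpacked into a directory named NLTK, and tokenizers.zip and taggers.zip into directories named tokenizers and taggers, in each container's working directory. NLTK_DATA is the environment variable that tells NLTK where its data is stored; setting it to ./ lets NLTK find the unpacked tokenizers and taggers directories.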
- In Cloudera Data Science Workbench, set the PYSPARK_PYTHON environment variable to the newly created environment. To do this, navigate back to the Project Overview page and open the project's environment variable settings. Set PYSPARK_PYTHON to ./NLTK/nltk_env/bin/python, click Add, and then click Save Environment.
- Launch a new PySpark session and run the nltk_sample.py script in the workbench. You can test whether the script ran successfully using the following command:
```
!hdfs dfs -cat ./nixon_tokens/* | head -n 20
```
which prints:
```
Annual
Message
to
the
Congress
on
the
State
of
the
Union
.
January
22
,
1970
Mr.
Speaker
,
Mr.
```
To view the part-of-speech tags, run:
```
!hdfs dfs -cat nixon_token_pos/* | head -n 20
```
which prints:
```
[(u'Annual', 'JJ')]
[(u'Message', 'NN')]
[(u'to', 'TO')]
[(u'the', 'DT')]
[(u'Congress', 'NNP')]
[(u'on', 'IN')]
[(u'the', 'DT')]
[(u'State', 'NNP')]
[(u'of', 'IN')]
[(u'the', 'DT')]
[(u'Union', 'NN')]
[(u'.', '.')]
[(u'January', 'NNP')]
[(u'22', 'CD')]
[(u',', ',')]
[(u'1970', 'CD')]
[(u'Mr.', 'NNP')]
[(u'Speaker', 'NN')]
[(u',', ',')]
[(u'Mr.', 'NNP')]
```
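The relative paths used in these steps, such as ./NLTK/nltk_env/bin/python and NLTK_DATA=./, work because the text after each `#` in spark.yarn.dist.archives names the directory that archive is unpacked into within the YARN container's working directory. The following minimal sketch parses that setting and builds the matching interpreter path; `archive_aliases` and `archived_python` are hypothetical helpers, and the fallback for entries without `#` (using the archive's file name) is an assumption about YARN localization rather than something stated in this guide.

```python
import posixpath


def archive_aliases(value):
    """Map each unpack directory (the text after '#') to its archive path.

    Assumption: an entry without '#' is unpacked under the archive's file name.
    """
    aliases = {}
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        path, _, alias = entry.partition("#")
        aliases[alias or posixpath.basename(path)] = path
    return aliases


def archived_python(alias, env_name):
    """Relative path of the interpreter inside an unpacked conda environment archive."""
    return "./" + posixpath.join(alias, env_name, "bin", "python")


if __name__ == "__main__":
    conf = "nltk_env.zip#NLTK,tokenizers.zip#tokenizers,taggers.zip#taggers"
    for alias, path in sorted(archive_aliases(conf).items()):
        print(alias, "<-", path)
    print(archived_python("NLTK", "nltk_env"))
```

The path built for the NLTK alias matches the PYSPARK_PYTHON value set in the project, which is why the environment must be zipped with the nltk_env directory at the top level of the archive.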