Although Python is a popular choice for data scientists, it is not straightforward to
    make a Python library available on a distributed PySpark cluster. To determine which
    dependencies are required on the cluster, you must understand that Spark application code runs
    in Spark executor processes distributed throughout the cluster. If the Python code you are
    running uses any third-party libraries, those libraries must be available to the executors on
    every host where they run.
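    For instance, in the sketch below (which assumes a running SparkSession and the nltk library
    used in the rest of this example), the import inside the mapped function is resolved on each
    executor at run time; if the library is only installed on the driver, the job fails with an
    ImportError on the cluster.

# Minimal sketch: the import inside the mapped function runs on each executor,
# so nltk must be installed on (or shipped to) every executor host.
# Note that nltk.word_tokenize also needs the NLTK 'punkt' data, which the
# steps below distribute alongside the environment.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("import-on-executor").getOrCreate()

def tokenize(line):
    import nltk                      # resolved on the executor, not the driver
    return nltk.word_tokenize(line)

print(spark.sparkContext.parallelize(["A short test sentence."])
           .flatMap(tokenize)
           .collect())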
    This example demonstrates a way to run the following Python
      code (nltk_sample.py), which uses pure Python libraries (nltk), on a distributed
      PySpark cluster.
    - (Prerequisites) Make sure the Anaconda parcel has been distributed and activated on
      your cluster.
    - Create a new project in Cloudera Data Science Workbench. In that project, create a
      new file called nltk_sample.py with the following sample script.
      nltk_sample.py
# This code uses NLTK, a Python natural language processing library.
# NLTK is not installed with conda by default.
# You can use 'import' within your Python UDFs to use Python libraries.
# The goal here is to distribute NLTK with the conda environment.
import os
import sys
from pyspark.sql import SparkSession
 
spark = SparkSession.builder \
          .appName("spark-nltk") \
          .getOrCreate()
 
data = spark.sparkContext.textFile('1970-Nixon.txt')
 
def word_tokenize(x):
    import nltk
    return nltk.word_tokenize(x)
 
def pos_tag(x):
    import nltk
    return nltk.pos_tag([x])
 
words = data.flatMap(word_tokenize)
words.saveAsTextFile('nixon_tokens')
 
pos_word = words.map(pos_tag)
pos_word.saveAsTextFile('nixon_token_pos')
             
           
        
    - Go to the project you created and launch a new PySpark session.
    - Click Terminal Access and run the following command to pack the required packages and
      their dependencies into a conda environment.
        
          conda create -n nltk_env --copy -y -q python=2.7.11 nltk numpy
         
        The --copy option installs all packages as full copies instead of links, so the
          environment and its dependencies are self-contained in a single directory. If you want
          to add extra pip packages that are not managed by conda, copy them into the environment
          manually after running pip install. In Cloudera Data Science Workbench, pip installs
          packages into the ~/.local directory of a project.
        
          pip install some-awesome-package
cp -r ~/.local/lib ~/.conda/envs/nltk_env/
         
        Zip the conda environment so it can be shipped to the PySpark cluster. An optional
          check of the archive is sketched below.
        
          cd ~/.conda/envs
zip -r ../../nltk_env.zip nltk_env
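
        Optionally, you can sanity-check the archive before moving on; the following is just a
          sketch, assuming the zip file was created in your home directory as shown above.

# Optional sanity check (sketch): confirm the archive contains the interpreter
# that PYSPARK_PYTHON will later point at (nltk_env/bin/python).
import os
import zipfile

with zipfile.ZipFile(os.path.expanduser('~/nltk_env.zip')) as archive:
    print([name for name in archive.namelist() if name.endswith('bin/python')])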
         
    - (Specific to NLTK) For this example, use NLTK data as the job input. Download the
      NLTK data, copy the sample text to HDFS, and archive the tokenizer and tagger data
      for distribution.
        
          cd ~/
source activate nltk_env
# download nltk data
(nltk_env)$ python -m nltk.downloader -d nltk_data all
(nltk_env)$ hdfs dfs -put nltk_data/corpora/state_union/1970-Nixon.txt ./
  
# archive nltk data for distribution
cd ~/nltk_data/tokenizers/
zip -r ../../tokenizers.zip *
cd ~/nltk_data/taggers/
zip -r ../../taggers.zip *
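
        Optionally, from a PySpark session you can confirm that the sample text is readable from
          HDFS before running the full job; this sketch assumes a running SparkSession named
          spark.

# Quick check (sketch): the sample text should now be in your HDFS home directory.
print(spark.sparkContext.textFile('1970-Nixon.txt').take(3))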
         
    - Set the following spark-submit options in spark-defaults.conf.
        
          spark.yarn.appMasterEnv.PYSPARK_PYTHON=./NLTK/nltk_env/bin/python
spark.yarn.appMasterEnv.NLTK_DATA=./
spark.executorEnv.NLTK_DATA=./
spark.yarn.dist.archives=nltk_env.zip#NLTK,tokenizers.zip#tokenizers,taggers.zip#taggers
         
        With these settings, nltk_env.zip is unpacked into a directory named NLTK in each
          container's working directory (the #NLTK suffix sets the directory name). NLTK_DATA is
          the environment variable that tells NLTK where its data is stored. If you prefer to
          configure the session programmatically, see the sketch after this step.
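
        If you do not want to edit spark-defaults.conf, the same properties can be supplied
          programmatically when the session is built; the following is a sketch, and the settings
          must be applied before the SparkSession (and its YARN application) is created.

# Sketch: the same properties passed through SparkSession.builder instead of
# spark-defaults.conf. Property names are identical.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("spark-nltk") \
    .config("spark.yarn.appMasterEnv.PYSPARK_PYTHON", "./NLTK/nltk_env/bin/python") \
    .config("spark.yarn.appMasterEnv.NLTK_DATA", "./") \
    .config("spark.executorEnv.NLTK_DATA", "./") \
    .config("spark.yarn.dist.archives",
            "nltk_env.zip#NLTK,tokenizers.zip#tokenizers,taggers.zip#taggers") \
    .getOrCreate()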
    - In Cloudera Data Science Workbench, set the PYSPARK_PYTHON environment variable to
      the newly created environment.
      - To do this, navigate back to the Project Overview page and open the project
        settings.
      - Set PYSPARK_PYTHON to ./NLTK/nltk_env/bin/python and click Add.
      - Then click Save Environment.
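
        Optionally, after launching a new PySpark session you can confirm that the executors are
          running the interpreter shipped in nltk_env.zip; this is a sketch, and the exact
          container paths will vary.

# Sanity check (sketch): each executor reports the Python interpreter it runs.
# With the settings above, the paths should end in NLTK/nltk_env/bin/python.
import sys
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("env-check").getOrCreate()
print(spark.sparkContext.parallelize(range(2), 2)
           .map(lambda _: sys.executable)
           .collect())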
          
 
        
    - Launch a new PySpark session and run the nltk_sample.py script in the workbench. You
      can test whether the script ran successfully using the following commands:
        
          !hdfs dfs -cat ./nixon_tokens/* | head -n 20
         
        
          Annual
Message
to
the
Congress
on
the
State
of
the
Union
.
January
22
,
1970
Mr.
Speaker
,
Mr.
  
!hdfs dfs -cat nixon_token_pos/* | head -n 20
[(u'Annual', 'JJ')]
[(u'Message', 'NN')]
[(u'to', 'TO')]
[(u'the', 'DT')]
[(u'Congress', 'NNP')]
[(u'on', 'IN')]
[(u'the', 'DT')]
[(u'State', 'NNP')]
[(u'of', 'IN')]
[(u'the', 'DT')]
[(u'Union', 'NN')]
[(u'.', '.')]
[(u'January', 'NNP')]
[(u'22', 'CD')]
[(u',', ',')]
[(u'1970', 'CD')]
[(u'Mr.', 'NNP')]
[(u'Speaker', 'NN')]
[(u',', ',')]
[(u'Mr.', 'NNP')]
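
        Alternatively, you can inspect the output from inside the PySpark session itself rather
          than shelling out to hdfs dfs; a short sketch:

# Sketch: read the tokenized output back from HDFS in the same session.
for token in spark.sparkContext.textFile('nixon_tokens').take(20):
    print(token)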