Spark Guide
Also available as:
PDF
loading table of contents...

Chapter 10. Using Spark with HDFS

Specifying Compression

To add a compression library to Spark, use the --jars option. As mentioned in "Adding Libraries to Spark," the following example adds the LZO compression library:

spark-submit --driver-memory 1G --executor-memory 1G --master yarn-client --jars /usr/hdp/2.3.0.0-2557/hadoop/lib/hadoop-lzo-0.6.0.2.3.0.0-2557.jar test_read_write.py

To specify compression in Spark-shell when writing to HDFS, use code similar to:

rdd.saveAsHadoopFile("/tmp/spark_compressed", 
"org.apache.hadoop.mapred.TextOutputFormat", 
compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")

For more information about supported compression algorithms, see Configuring HDFS Compression in the HDFS Reference Guide.

Accessing HDFS from PySpark: Setting HADOOP_CONF_DIR

If PySpark is accessing an HDFS file, HADOOP_CONF_DIR needs to be set in an environment variable. For example:

export HADOOP_CONF_DIR=/etc/hadoop/conf
[hrt_qa@ip-172-31-42-188 spark]$ pyspark
[hrt_qa@ip-172-31-42-188 spark]$ >>>lines = sc.textFile("hdfs://ip-172-31-42-188.ec2.internal:8020/tmp/PySparkTest/file-01")
.......

If HADOOP_CONF_DIR is not set properly, you might see the following error:

Error from secure cluster

2016-04-10 00:27:06,046|t1.machine|INFO|1580|140672245782272|MainThread|Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
2016-04-10 00:27:06,047|t1.machine|INFO|1580|140672245782272|MainThread|: org.apache.hadoop.security.AccessControlException: SIMPLE authentication is not enabled.  Available:[TOKEN, KERBEROS]
2016-04-10 00:27:06,047|t1.machine|INFO|1580|140672245782272|MainThread|at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
2016-04-10 00:27:06,047|t1.machine|INFO|1580|140672245782272|MainThread|at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
2016-04-10 00:27:06,048|t1.machine|INFO|1580|140672245782272|MainThread|at 
{code}