Chapter 10. Using Spark with HDFS
Specifying Compression
To add a compression library to Spark, use the --jars
option. The following
example adds the LZO compression library:
spark-submit --driver-memory 1G --executor-memory 1G --master yarn-client --jars /usr/hdp/2.3.0.0-2557/hadoop/lib/hadoop-lzo-0.6.0.2.3.0.0-2557.jar test_read_write.py
To specify compression in Spark-shell when writing to HDFS, use code similar to:
rdd.saveAsHadoopFile("/tmp/spark_compressed", "org.apache.hadoop.mapred.TextOutputFormat", compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")
For more information about supported compression algorithms, see Configuring HDFS Compression in the HDFS Reference Guide.
Accessing HDFS from PySpark: Setting HADOOP_CONF_DIR
If PySpark is accessing an HDFS file, HADOOP_CONF_DIR
needs to be set in an
environment variable. For example:
export HADOOP_CONF_DIR=/etc/hadoop/conf [hrt_qa@ip-172-31-42-188 spark]$ pyspark [hrt_qa@ip-172-31-42-188 spark]$ >>>lines = sc.textFile("hdfs://ip-172-31-42-188.ec2.internal:8020/tmp/PySparkTest/file-01") .......
If HADOOP_CONF_DIR
is not set properly, you might see the following
error:
Error from secure
cluster
2016-01-20 00:27:06,046|t1.machine|INFO|1580|140672245782272|MainThread|Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 2016-01-20 00:27:06,047|t1.machine|INFO|1580|140672245782272|MainThread|: org.apache.hadoop.security.AccessControlException: SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS] 2016-01-20 00:27:06,047|t1.machine|INFO|1580|140672245782272|MainThread|at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 2016-01-20 00:27:06,047|t1.machine|INFO|1580|140672245782272|MainThread|at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) 2016-01-20 00:27:06,048|t1.machine|INFO|1580|140672245782272|MainThread|at {code}