Enabling the Directory Committer in Spark
Spark has its own internal output committer which needs to be switched to the new committer mechanism, and, when using Apache Parquet-formatted output, Spark expects the Parquet committer to be a subclass of ParquetOutputCommitter.
As a result, three lines need to be added to spark-defaults.conf to switch to the new committers:

spark.hadoop.fs.s3a.committer.name directory
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
This is all that is needed. Note that the S3A committers are only used for Spark SQL, Datasets and DataFrames; some simple examples, such as the wordcount examples, do not use these APIs, so do not use the new committers.
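To make that distinction concrete, here is a minimal sketch (the bucket path and application name are hypothetical) contrasting a DataFrame write, which goes through the new commit protocol, with a plain RDD saveAsTextFile(), which does not use these APIs and so is not switched to the new committers by the settings above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("committer-scope").getOrCreate()

# DataFrame/Dataset writes go through spark.sql.sources.commitProtocolClass,
# so they pick up the new committer:
spark.range(0, 1000).write.format("parquet").save("s3a://guarded-bucket/scope/parquet")

# A plain RDD action does not use the Spark SQL APIs, so the settings above
# do not route it through the new committer:
spark.sparkContext.parallelize(["a", "b", "c"]).saveAsTextFile("s3a://guarded-bucket/scope/text")

spark.stop()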
Here is an example pyspark application using the committer. There is no difference between this and other applications:
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext, SparkSession
from pyspark.sql.functions import *

# Select the directory committer and bind Spark's commit protocol
# and Parquet committer classes to it.
sconf = SparkConf()
sconf.set("spark.hadoop.fs.s3a.committer.name", "directory")
sconf.set("spark.sql.sources.commitProtocolClass",
          "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
sconf.set("spark.sql.parquet.output.committer.class",
          "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")

sc = SparkContext(appName="s3acommitter", conf=sconf)
spark = SparkSession(sc)

# Write the same dataset in three formats under the destination bucket.
sourceDF = spark.range(0, 10000)
datasets = "s3a://guarded-bucket/datasets/"

sourceDF.write.format("orc").save(datasets + "orc")
sourceDF.write.format("parquet").save(datasets + "parquet")
sourceDF.write.format("csv").save(datasets + "csv")
sc.stop()
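As an optional sanity check (a sketch reusing the s3a://guarded-bucket path from the example above), the written data can be read back and counted once the job completes:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("s3acommitter-check").getOrCreate()

# Read back the Parquet dataset written above; the count should match
# the 10000 rows produced by spark.range(0, 10000).
df = spark.read.parquet("s3a://guarded-bucket/datasets/parquet")
print(df.count())

spark.stop()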