# Using the manifest committer

Define a factory for the ABFS or GCS schema, then bind to the manifest committer in Spark.
## Using the committer

Define a factory for the ABFS schema in
`mapreduce.outputcommitter.factory.scheme.abfs`,
or, for GCS, in
`mapreduce.outputcommitter.factory.scheme.gs`.

Some matching Spark configuration changes, especially for the Parquet binding, will
be required. The factory options can be set in `core-site.xml`, if they are not defined in
the `mapred-default.xml` JAR.
```xml
<property>
  <name>mapreduce.outputcommitter.factory.scheme.abfs</name>
  <value>org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory</value>
</property>
<property>
  <name>mapreduce.outputcommitter.factory.scheme.gs</name>
  <value>org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory</value>
</property>
```
## Binding to the manifest committer in Spark

In Apache Spark, set the configuration either with command-line options (after
`--conf`) or in the `spark-defaults.conf` file. The following is an
example of using `spark-defaults.conf`, also including the configuration
for Parquet with a subclass of the Parquet committer which uses the factory mechanism
internally.
```
spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory
spark.hadoop.mapreduce.outputcommitter.factory.scheme.gs org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
```
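The same settings can equally be passed on the command line with `--conf`; a sketch of a `spark-submit` invocation for an ABFS destination, where the application JAR name and its arguments are placeholders:

```
spark-submit \
  --conf spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs=org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory \
  --conf spark.sql.parquet.output.committer.class=org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter \
  --conf spark.sql.sources.commitProtocolClass=org.apache.spark.internal.io.cloud.PathOutputCommitProtocol \
  my-application.jar
```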
## Using the Cloudstore `committerinfo` command to probe committer bindings

The Hadoop committer settings can be validated with a recent build of Cloudstore
and its `committerinfo` command. This command
instantiates a committer for the given path through the same factory mechanism as MR and Spark
jobs use, then prints its `toString` value.
```
hadoop jar cloudstore-1.0.jar committerinfo abfs://testing@ukwest.dfs.core.windows.net/

2021-09-16 19:42:59,731 [main] INFO commands.CommitterInfo (StoreDurationInfo.java:<init>(53)) - Starting: Create committer
Committer factory for path abfs://testing@ukwest.dfs.core.windows.net/ is
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory@3315d2d7
(classname org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory)
2021-09-16 19:43:00,897 [main] INFO manifest.ManifestCommitter (ManifestCommitter.java:<init>(144)) - Created ManifestCommitter with
JobID job__0000, Task Attempt attempt__0000_r_000000_1 and destination abfs://testing@ukwest.dfs.core.windows.net/
Created committer of class org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitter:
ManifestCommitter{ManifestCommitterConfig{destinationDir=abfs://testing@ukwest.dfs.core.windows.net/,
  role='task committer',
  taskAttemptDir=abfs://testing@ukwest.dfs.core.windows.net/_temporary/manifest_job__0000/0/_temporary/attempt__0000_r_000000_1,
  createJobMarker=true,
  jobUniqueId='job__0000',
  jobUniqueIdSource='JobID',
  jobAttemptNumber=0,
  jobAttemptId='job__0000_0',
  taskId='task__0000_r_000000',
  taskAttemptId='attempt__0000_r_000000_1'},
  iostatistics=counters=();
  gauges=();
  minimums=();
  maximums=();
  means=();
}
```
## Verifying that the committer was used

The new committer will write a JSON summary of the operation, including
statistics, in the `_SUCCESS` file.

If this file exists and is zero bytes long: the classic `FileOutputCommitter`
was used.

If this file exists and is greater than zero bytes long, either the manifest
committer was used, or, in the case of the S3A filesystems, one of the S3A
committers. They all use the same JSON format.
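The check above can be scripted for local inspection of a downloaded marker; `committer_from_success_marker` is a hypothetical helper sketched here, not part of any Hadoop API:

```python
import json
import os


def committer_from_success_marker(dest_dir):
    """Classify which committer wrote the _SUCCESS marker in dest_dir.

    Returns None if no marker exists, "classic" for the zero-byte file
    written by FileOutputCommitter, and "manifest-or-s3a" when the marker
    holds a JSON summary (manifest and S3A committers share the format).
    """
    marker = os.path.join(dest_dir, "_SUCCESS")
    if not os.path.exists(marker):
        return None
    if os.path.getsize(marker) == 0:
        return "classic"
    with open(marker) as f:
        json.load(f)  # raises if the non-empty marker is not valid JSON
    return "manifest-or-s3a"
```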
## Configuration options

| Option | Meaning | Default value |
|--------|---------|---------------|
| `mapreduce.manifest.committer.delete.target.files` | Delete target files? | `false` |
| `mapreduce.manifest.committer.io.threads` | Thread count for parallel operations | `64` |
| `mapreduce.manifest.committer.summary.report.directory` | Directory to save reports | `""` |
| `mapreduce.manifest.committer.cleanup.parallel.delete` | Delete temporary directories in parallel | `true` |
| `mapreduce.fileoutputcommitter.cleanup.skipped` | Skip cleanup of `_temporary` directory | `false` |
| `mapreduce.fileoutputcommitter.cleanup-failures.ignored` | Ignore errors during cleanup | `false` |
| `mapreduce.fileoutputcommitter.marksuccessfuljobs` | Create a `_SUCCESS` marker file on successful completion (and delete any existing one in job setup) | `true` |
## Scaling jobs: `mapreduce.manifest.committer.io.threads`

The core reason that the manifest committer is faster than the classic
`FileOutputCommitter` is that it tries to parallelize as much file IO as it
can during job commit, specifically:

- Task manifest loading
- Deletion of files where directories will be created
- Directory creation
- File-by-file renaming
- Deletion of task attempt directories in job cleanup

These operations are all performed in the same thread pool, whose size is set in
the option `mapreduce.manifest.committer.io.threads`.

Larger values may be used.
XML:

```xml
<property>
  <name>mapreduce.manifest.committer.io.threads</name>
  <value>200</value>
</property>
```

`spark-defaults.conf`:

```
spark.hadoop.mapreduce.manifest.committer.io.threads 200
```
A value larger than the number of cores allocated to the MapReduce AM or Spark
driver does not directly overload the CPUs, as the threads are normally waiting
for (slow) IO against the object store/filesystem to complete.

Caveats:

- In Spark, multiple jobs may be committed in the same process, each of which
  will create its own thread pool during job commit or cleanup.
- Azure rate throttling may be triggered if too many IO requests are made
  against the store. The rate throttling option
  `mapreduce.manifest.committer.io.rate` can help avoid this.
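The shared-thread-pool pattern used for these commit-phase operations can be sketched as follows; `parallel_rename` is a hypothetical illustration of the file-by-file rename phase, not Hadoop code:

```python
import os
from concurrent.futures import ThreadPoolExecutor


def parallel_rename(renames, io_threads=64):
    """Apply (source, dest) renames across a bounded thread pool.

    Mirrors the committer's approach: many slow, independent store
    operations are issued concurrently, with io_threads capping the
    number in flight. pool.map blocks until every rename completes,
    just as job commit must wait for all renames before finishing.
    """
    with ThreadPoolExecutor(max_workers=io_threads) as pool:
        list(pool.map(lambda pair: os.rename(pair[0], pair[1]), renames))
```

Against a local filesystem the parallelism gains little, but against an object store where each rename is a round trip, the wall-clock time of the phase drops roughly with the thread count until throttling kicks in.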
## Optional: deleting target files in Job Commit

The classic `FileOutputCommitter` deletes files at the
destination paths before renaming the job's files into place.

This is optional in the manifest committers, set in the option
`mapreduce.manifest.committer.delete.target.files` with a default value of
`false`. Leaving it disabled increases performance and is safe to use when all
files created by a job have unique filenames.

Apache Spark has generated unique filenames for ORC and Parquet since
SPARK-8406, "Adding UUID to output file name to avoid accidental overwriting".

Avoiding the checks for/deletion of target files saves one delete call per file
being committed, so can save a significant amount of store IO.

When appending to existing tables with formats other than ORC and Parquet,
unless confident that unique identifiers are added to each filename, enable
deletion of the target files:

```
spark.hadoop.mapreduce.manifest.committer.delete.target.files true
```
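The UUID-based naming that makes target deletion unnecessary can be illustrated with a short sketch; `task_output_filename` is a hypothetical helper, and the exact filename format Spark emits may differ:

```python
import uuid


def task_output_filename(task_id, extension="parquet"):
    """Build a part-file name with an embedded random UUID, in the spirit
    of SPARK-8406. Two invocations never collide, so renaming such a file
    into a destination directory cannot silently need to replace an
    existing file, and the pre-rename delete can be skipped.
    """
    return f"part-{task_id:05d}-{uuid.uuid4()}.{extension}"
```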