Using the manifest committer

Define a factory for the ABFS or GCS schema, then bind to the manifest committer in Spark.

Using the committer

Define a factory for the ABFS schema in the option mapreduce.outputcommitter.factory.scheme.abfs, or in mapreduce.outputcommitter.factory.scheme.gs for GCS.

Some matching Spark configuration changes, especially for the Parquet binding, will be required. These can be done in core-site.xml, if it is not defined in the mapred-default.xml JAR.


<property>
  <name>mapreduce.outputcommitter.factory.scheme.abfs</name>
  <value>org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory</value>
</property>
<property>
  <name>mapreduce.outputcommitter.factory.scheme.gs</name>
  <value>org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory</value>
</property>

Binding to the manifest committer in Spark

In Apache Spark, set the configuration either with command-line options (after --conf) or by using the spark-defaults.conf file. The following is an example of using spark-defaults.conf, which also includes the configuration for Parquet with a subclass of the Parquet committer that uses the factory mechanism internally.

spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory
spark.hadoop.mapreduce.outputcommitter.factory.scheme.gs org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
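
The same bindings can instead be passed on the command line; a minimal spark-submit sketch, where the application JAR and its arguments are placeholders:

spark-submit \
  --conf spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs=org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory \
  --conf spark.sql.parquet.output.committer.class=org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter \
  --conf spark.sql.sources.commitProtocolClass=org.apache.spark.internal.io.cloud.PathOutputCommitProtocol \
  your-application.jar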

Using the Cloudstore committerinfo command to probe committer bindings

The Hadoop committer settings can be validated with a recent build of cloudstore and its committerinfo command. This command instantiates a committer for the given path through the same factory mechanism as MapReduce and Spark jobs use, then prints its toString value.

hadoop jar cloudstore-1.0.jar committerinfo abfs://testing@ukwest.dfs.core.windows.net/

2021-09-16 19:42:59,731 [main] INFO  commands.CommitterInfo (StoreDurationInfo.java:<init>(53)) - Starting: Create committer
Committer factory for path abfs://testing@ukwest.dfs.core.windows.net/ is
 org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory@3315d2d7
  (classname org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory)
2021-09-16 19:43:00,897 [main] INFO  manifest.ManifestCommitter (ManifestCommitter.java:<init>(144)) - Created ManifestCommitter with
   JobID job__0000, Task Attempt attempt__0000_r_000000_1 and destination abfs://testing@ukwest.dfs.core.windows.net/
Created committer of class org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitter:
 ManifestCommitter{ManifestCommitterConfig{destinationDir=abfs://testing@ukwest.dfs.core.windows.net/,
   role='task committer',
   taskAttemptDir=abfs://testing@ukwest.dfs.core.windows.net/_temporary/manifest_job__0000/0/_temporary/attempt__0000_r_000000_1,
   createJobMarker=true,
   jobUniqueId='job__0000',
   jobUniqueIdSource='JobID',
   jobAttemptNumber=0,
   jobAttemptId='job__0000_0',
   taskId='task__0000_r_000000',
   taskAttemptId='attempt__0000_r_000000_1'},
   iostatistics=counters=();

gauges=();

minimums=();

maximums=();

means=();
}

Verifying that the committer was used

The new committer will write a JSON summary of the operation, including statistics, in the _SUCCESS file.

If this file exists and is zero bytes long: the classic FileOutputCommitter was used.

If this file exists and is greater than zero bytes long, either the manifest committer was used, or in the case of S3A filesystems, one of the S3A committers. They all use the same JSON format.
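
A quick way to check is to look at the length and contents of that file from the command line; a minimal sketch, with the job output path purely illustrative:

# a zero-length file indicates the classic FileOutputCommitter;
# a JSON payload indicates the manifest committer (or an S3A committer)
hadoop fs -ls abfs://testing@ukwest.dfs.core.windows.net/output/_SUCCESS
hadoop fs -cat abfs://testing@ukwest.dfs.core.windows.net/output/_SUCCESS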

Configuration options

| Option | Meaning | Default value |
|--------|---------|---------------|
| mapreduce.manifest.committer.delete.target.files | Delete target files? | false |
| mapreduce.manifest.committer.io.threads | Thread count for parallel operations | 64 |
| mapreduce.manifest.committer.summary.report.directory | Directory to save reports | "" |
| mapreduce.manifest.committer.cleanup.parallel.delete | Delete temporary directories in parallel | true |
| mapreduce.fileoutputcommitter.cleanup.skipped | Skip cleanup of _temporary directory | false |
| mapreduce.fileoutputcommitter.cleanup-failures.ignored | Ignore errors during cleanup | false |
| mapreduce.fileoutputcommitter.marksuccessfuljobs | Create a _SUCCESS marker file on successful completion (and delete any existing one in job setup) | true |
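
For example, to have each job save its summary report, point the report directory at a filesystem location; the path below is purely an illustrative placeholder:

<property>
  <name>mapreduce.manifest.committer.summary.report.directory</name>
  <!-- illustrative path; any writable filesystem location can be used -->
  <value>hdfs://namenode/tmp/manifest-committer-reports</value>
</property>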

Scaling jobs mapreduce.manifest.committer.io.threads

The core reason that the manifest committer is faster than the classic FileOutputCommitter is that it tries to parallelize as much file IO as it can during job commit, specifically:

  • Task manifest loading
  • Deletion of files where directories will be created
  • Directory creation
  • File-by-file renaming
  • Deletion of task attempt directories in job cleanup

These operations are all performed in the same thread pool, whose size is set in the option mapreduce.manifest.committer.io.threads. Larger values may be used.

XML:


<property>
  <name>mapreduce.manifest.committer.io.threads</name>
  <value>200</value>
</property>

spark-defaults.conf:

spark.hadoop.mapreduce.manifest.committer.io.threads 200

A value larger than the number of cores allocated to the MapReduce AM or Spark Driver does not directly overload the CPUs, as the threads are normally waiting for (slow) IO against the object store/filesystem to complete.

Caveats:

  • In Spark, multiple jobs may be committed in the same process, each of which will create its own thread pool during job commit or cleanup.
  • Azure rate throttling may be triggered if too many IO requests are made against the store. The rate throttling option mapreduce.manifest.committer.io.rate can help avoid this.

Optional: deleting target files in Job Commit

The classic FileOutputCommitter deletes files at the destination paths before renaming the job's files into place.

This is optional in the manifest committer, set in the option mapreduce.manifest.committer.delete.target.files with a default value of false. Leaving deletion disabled increases performance and is safe to use when all files created by a job have unique filenames.

Apache Spark does generate unique filenames for ORC and Parquet since SPARK-8406, "Adding UUID to output file name to avoid accidental overwriting".
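
For illustration only, a Spark-generated Parquet part file typically embeds such a UUID, along the lines of the made-up name below; the exact pattern varies by Spark version:

part-00000-bbd1c8b0-6a0e-4bfe-8a3c-0d5f9b2f0c11-c000.snappy.parquet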

Avoiding checks for/deleting target files saves one delete call per file being committed, so it can save a significant amount of store IO.

When appending to existing tables with formats other than ORC and Parquet, enable deletion of the target files unless you are confident that unique identifiers are added to each filename.

spark.hadoop.mapreduce.manifest.committer.delete.target.files true
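
The equivalent Hadoop XML configuration, for jobs configured through core-site.xml or the job configuration rather than Spark options:

<property>
  <name>mapreduce.manifest.committer.delete.target.files</name>
  <value>true</value>
</property>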