Spark Dynamic Partition overwriting

Spark has a feature called “Dynamic Partition Overwrites” that can be initiated in SQL:


INSERT OVERWRITE TABLE ...

Or through DataSet writes where the mode is overwrite and the partitioning matches that of the existing table:


sparkConf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
// followed by an overwrite of a Dataset into an existing partitioned table.
eventData2
  .write
  .mode("overwrite")
  .partitionBy("year", "month")
  .format("parquet")
  .save(existingDir)
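
The overwrite mode can also be supplied as a per-write option on the DataFrameWriter, which takes precedence over the session-level setting. A minimal sketch, reusing the Dataset and path names from the example above:

// Same write, but enabling dynamic overwrite per query via the
// DataFrameWriter option instead of the session configuration.
eventData2
  .write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .partitionBy("year", "month")
  .format("parquet")
  .save(existingDir)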

This Spark feature:

  • Directs the job to write its new data to a temporary directory
  • After job commit completes, scans the output to identify the leaf directories (“partitions”) into which data was written
  • Deletes the content of those directories in the destination table
  • Renames the new files into the partitions.

This is all done in Spark, which takes over the tasks of scanning the intermediate output tree, deleting the target partitions and renaming the new files.
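
The sequence can be illustrated with the Hadoop FileSystem API. This is only a sketch of the operations involved, not Spark’s actual implementation; the table, staging and partition paths are hypothetical.

// Sketch of the dynamic-partition-overwrite commit sequence; all paths are
// hypothetical and partition discovery is shown as a hard-coded list.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val destTable = new Path("abfs://container@account.dfs.core.windows.net/tables/events")
val stagingDir = new Path(destTable, ".spark-staging-0")   // temporary job output
val fs: FileSystem = destTable.getFileSystem(new Configuration())

// 1. treewalk the staging directory to find the leaf "partition" directories
val writtenPartitions = Seq("year=2024/month=05", "year=2024/month=06")

writtenPartitions.foreach { partition =>
  // 2. delete the existing content of that partition in the destination table
  fs.delete(new Path(destTable, partition), true)
  // 3. rename the newly written data into the partition
  fs.rename(new Path(stagingDir, partition), new Path(destTable, partition))
}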

This feature also adds the ability for a job to write data entirely outside the destination table: new files are written into the working directory, then moved by Spark to the final destination during job commit.

The manifest committer is compatible with dynamic partition overwrites on Azure and Google Cloud Storage as together they meet the core requirements of the extension:

  • The working directory returned in getWorkPath() is in the same filesystem as the final output.
  • rename() is an O(1) operation which is safe and fast to use when committing a job.

None of the S3A committers support this. The first condition is not met by the staging committers, while the second condition is not met by S3 itself.
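
The first condition amounts to a simple check: the committer’s work path and the job destination must resolve to the same filesystem URI. A minimal sketch of such a check; the helper itself is hypothetical and not part of either Spark or Hadoop:

// Hypothetical helper: true if a committer work path and the job destination
// live on the same filesystem, which dynamic partition overwrite requires.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

def onSameFileSystem(workPath: Path, destination: Path, conf: Configuration): Boolean =
  workPath.getFileSystem(conf).getUri == destination.getFileSystem(conf).getUri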

To use the manifest committer with dynamic partition overwrites, the Spark version must contain SPARK-40034, “PathOutputCommitters to work with dynamic partition overwrite”.
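
As an illustration, a Spark configuration for ABFS might look like the sketch below. It assumes a Spark build containing SPARK-40034, with the spark-hadoop-cloud module (for PathOutputCommitProtocol) and the hadoop-azure manifest committer binding on the classpath.

// Illustrative settings only; the class names assume the hadoop-azure and
// spark-hadoop-cloud modules are present.
sparkConf.set("spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs",
  "org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory")
sparkConf.set("spark.sql.sources.commitProtocolClass",
  "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
sparkConf.set("spark.sql.parquet.output.committer.class",
  "org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter")
sparkConf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")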

Be aware that the rename phase of the operation will be slow if many files are renamed, as this is done sequentially. Parallel renaming would speed this up, but could trigger the ABFS overload problems the manifest committer is designed to both minimize the risk of and support recovery from.

The Spark side of the commit operation will be listing/treewalking the temporary output directory (some overhead), followed by the file promotion, done with a classic filesystem rename() call. There will be no explicit rate limiting here.

This means that dynamic partitioning should not be used on Azure Storage for SQL queries/Spark DataSet operations where many thousands of files are created. The fact that these operations will suffer from performance problems before throttling-related scale issues surface should be considered a warning.