ClouderaAdaptive-specific resources

You can use ClouderaAdaptive resources to create your policy. The resources include the ClouderaAdaptive.jexl script and the ClouderaAdaptive.json file.

ClouderaAdaptive.jexl

The Lakehouse Optimizer Data Hub contains the default ClouderaAdaptive.jexl file, which is a policy script.

You can add or modify the script elements as necessary. Each action builder statement generates a maintenance action.

Curl command to fetch the default ClouderaAdaptive.jexl file contents

curl --location 'https://[***LAKEHOUSE OPTIMIZER ENDPOINT***]/api/v1/policies/resource?uri=dlm%3A%2F%2Ftps%3Adefault%2FClouderaAdaptive' \
--header 'Authorization: Bearer ey'

Default JEXL file contents

The following sample script shows the default ClouderaAdaptive.jexl file contents:
/*
 * Description : Policy that evaluates if a certain maintenance action is to be scheduled based on iceberg stats.
 *
 * Scheduling
 *
 * Cron expressions are used to define the schedule for policy evaluation. This follows the Quartz cron syntax.
 * 0 0/10 * ? * * will trigger the policy evaluation every 10 minutes.
 * Refer this for more details : https://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html
 *
 * Iceberg statistics
 *
 * #pragma dlm.statistics : If this is set to true, stats will be pre-computed before the evaluation.
 * statistics(Table, force) : Method defined in the jexl policy context to retrieve the table stats from cache.
 *
 * Arguments
 *
 * $table : Iceberg table object. This is used to retrieve the table properties.
 * $constants : Policy constants applicable on this table.
 * stats : Iceberg table stats used for decision making.
 * dlm : Builder instance used to create ActionBuilder objects.
 *
 * Evaluation of actions
 *
 * Expire Snapshot : Expires the iceberg table snapshots based on the criteria, trigger this action if number of snapshots is greater than the
 * dlm.expireSnapshot.retainLast defined in the policy constants or if the duration between current and oldest snapshot is exceeding the dlm.expireSnapshot.snapshotsDurationDeltaMax (milliseconds).
 *
 * Data file Compaction/Rewrite data file : Compacts small data files and removes delete files.
 * This action is triggered if there is an expected significant drop in the number of files.
 * The compaction operation is guided by statistics and the target file size and small files to determine whether compaction should be performed.
 * The threshold for small files can be stored in table properties as (write.bin-packing.min-file-count) or minInputFiles from json file.
 * The threshold for the reduction or drop can be either stored as an Iceberg table property (dlm.rewriteDataFiles.filesCountDrop)
 * or as a policy property ($constants.rewriteDataFiles.filesCountDrop). If none of these are specified, the
 * value 0.15 is used; if a reduction in the number of files greater than 15% is expected, compact the table.
 * Note that the ratio is calculated based on the target file size write.target-file-size-bytes.
 *
 * Rewrite manifest/Compact manifests : Compacts manifest files. A manifest smaller than table('write.manifest.target-size-bytes') / json (manifestFileSize) counts as small. Trigger this action if
 * 1) the manifest file count reaches the table property dlm.rewriteManifest.fileCountMax / json (fileCountMax), and
 * 2) the ratio of small manifests reaches the table property dlm.rewriteManifest.smallFileRatioMax / json (smallFileRatioMax).
 *
 * The dlm.rewriteManifest.smallFileRatioMax ratio is 0.5 by default. A lower value makes rewrite manifest act on the table more aggressively; a
 * higher value makes it act on the table more lazily.
 *
 * Delete orphan files : Delete the files ( manifest, metadata.json, data/delete files ) which are not tracked by any snapshot, trigger this action weekly or monthly.
 *
*/
// Set to true if iceberg table stats pre-computation is required.
#pragma dlm.statistics true
// Stores the list of generated actions
const actions = [...]
// Fetch iceberg stats
let stats = statistics($table);
// Evaluate compaction
if ($constants.rewriteDataFiles.enabled) {
    let filesCountDrop = $table['dlm.rewriteDataFiles.filesCountDrop'] ?? $constants.rewriteDataFiles.filesCountDrop ?? 0.15;
    // Target file size used to compute the reduction is defined in the stat computation module.
    // use specified target file size or default if no policy specific
    let targetFileSize = $table['write.target-file-size-bytes'] ?? $constants.rewriteDataFiles.targetFileSize ?? 512*1024*1024;
    let minInputFiles = $table['write.bin-packing.min-file-count'] ?? $constants.rewriteDataFiles.minInputFiles ?? 5;
    stats = stats.compactionInfo.fetchCompactionInfo($table, stats, targetFileSize, filesCountDrop, minInputFiles)
    if (stats.compactionInfo.eligiblePartitionsCount > 0) {
        let partialProgressEnabled = $table['dlm.rewriteDataFiles.partialProgressEnabled'] ?? $constants.rewriteDataFiles.partialProgressEnabled ?? false;
        let partialProgressMaxCommits = $table['dlm.rewriteDataFiles.partialProgressMaxCommits'] ?? $constants.rewriteDataFiles.partialProgressMaxCommits ?? 10;
        let useStartingSequenceNumber = $table['dlm.rewriteDataFiles.useStartingSequenceNumber'] ?? $constants.rewriteDataFiles.useStartingSequenceNumber ?? false;
        let rewriteAll = $table['dlm.rewriteDataFiles.rewriteAll'] ?? $constants.rewriteDataFiles.rewriteAll ?? false;
        let maxConcurrentRewriteFileGroups = $table['dlm.rewriteDataFiles.maxConcurrentRewriteFileGroups'] ?? $constants.rewriteDataFiles.maxConcurrentRewriteFileGroups ?? 5;
        let deleteFileThreshold = $table['dlm.rewriteDataFiles.deleteFileThreshold'] ?? $constants.rewriteDataFiles.deleteFileThreshold ?? 2000000;
        const compaction = dlm:rewriteDataFiles($table)
            .targetFileSize(targetFileSize)
            .partialProgressEnabled(partialProgressEnabled)
            .partialProgressMaxCommits(partialProgressMaxCommits)
            .useStartingSequenceNumber(useStartingSequenceNumber)
            .rewriteAll(rewriteAll)
            .maxConcurrentRewriteFileGroups(maxConcurrentRewriteFileGroups)
            .minInputFiles(minInputFiles)
            .deleteFileThreshold(deleteFileThreshold);
            
        // Comma separated column names , Eg : a,b,c
        let zOrderColumns = $table['dlm.rewriteDataFiles.zOrderColumns'] ?? $constants.rewriteDataFiles.zOrderColumns;
        // default strategy is binpack and default sort order is table's sort order
        if (zOrderColumns != null) {
            compaction.zOrder(zOrderColumns.split(","));
        }
        let where  = $table['dlm.rewriteDataFiles.where'] ?? $constants.rewriteDataFiles.where;
        if (where != null) {
            compaction.where(where);
        }
        // Sort items (columnName, orderAsc, nullFirst) , Eg : (a false true),(b true false)
        let sort = $table['dlm.rewriteDataFiles.sort'] ?? $constants.rewriteDataFiles.sort;
        if (sort != null) {
            let sortItemArray = sort.split(",")
            for (let item : sortItemArray) {
                let sortItem  = item.substring(1, size(item) - 1);
                let arr = sortItem.split(" ");
                compaction.sort(arr[0], arr[1] == 'true', arr[2] == 'true');
            }
        }
        actions.add(compaction);
    }
}
// Evaluate rewrite manifest
if ($constants.rewriteManifest.enabled) {
    let manifestMax = $table['dlm.rewriteManifest.fileCountMax'] ?? $constants.rewriteManifest.fileCountMax ?? 1000;
    let manifestFileSize = $table['write.manifest.target-size-bytes'] ?? $constants.rewriteManifest.manifestFileSize ?? 8388608;
    stats = stats.manifestInfo.fetchManifestInfo($table, stats, manifestFileSize)
    let manifestCount = stats.manifestInfo.manifestFilesCount;
    let smallManifestRatio = stats.manifestInfo.smallManifestRatio;
    let smallRatioThreshold =
        $table['dlm.rewriteManifest.smallFileRatioMax']
        ?? $constants.rewriteManifest.smallFileRatioMax
        ?? 0.5;
    if (manifestCount >= manifestMax && smallManifestRatio >= smallRatioThreshold) {
        actions.add(dlm:rewriteManifests($table, $constants));
    }
}
// Delete orphan files
if ($constants.deleteOrphanFiles.enabled) {
    actions.add(dlm:deleteOrphanFiles($table, $constants));
}
// Rewrite positional delete files
if ($constants.rewritePositionDelete.enabled) {
    actions.add(dlm:rewritePositionDeletes($table, $constants));
}
// Evaluate expire snapshot
if ($constants.expireSnapshot.enabled) {
    let min = $table['dlm.expireSnapshot.retainLast'] ?? $constants.expireSnapshot.retainLast ?? 2; // defaults to 2 if unset
    let snapshots = stats.numberOfSnapshots;
    let snapshotTimestampDelta = stats.snapshotTimestampDelta ?? 3600000; // defaults to 3600000 ms (1 hour) if null
    let deltaDurationThreshold = $table['dlm.expireSnapshot.snapshotsDurationDeltaMax'] ?? $constants.expireSnapshot.snapshotsDurationDeltaMax ?? 0;
    if (snapshots > min || snapshotTimestampDelta > deltaDurationThreshold) {
        // create an expire snapshot action
        actions.add(dlm:expireSnapshots($table, $constants));
    }
}
// return the list of actions
actions

Table statistics

  • The #pragma dlm.statistics true statement ensures the pre-computation of statistics before the evaluation phase.
  • The let stats = statistics($table); statement retrieves the table statistics.
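
To illustrate how the retrieved statistics drive a decision, the following Python sketch mirrors the expire-snapshot check from the default script. It is not part of the product: the function name should_expire_snapshots and the sample numbers are assumptions made for illustration, while the two statistics (numberOfSnapshots, snapshotTimestampDelta) and the two thresholds (retainLast, snapshotsDurationDeltaMax) are taken from the script and the default JSON file.

```python
def should_expire_snapshots(number_of_snapshots: int,
                            snapshot_timestamp_delta_ms: int,
                            retain_last: int = 2,
                            duration_delta_max_ms: int = 0) -> bool:
    """Hypothetical mirror of the script's expire-snapshot condition.

    The action is scheduled when the table holds more snapshots than
    retain_last, OR when the time between the oldest and the current
    snapshot exceeds duration_delta_max_ms.
    """
    return (number_of_snapshots > retain_last
            or snapshot_timestamp_delta_ms > duration_delta_max_ms)


DAY_MS = 24 * 60 * 60 * 1000  # one day in milliseconds

# Thresholds as listed in the default ClouderaAdaptive.json file.
RETAIN_LAST = 50
DELTA_MAX_MS = 432000000  # 5 days

# 10 snapshots spanning 1 day: neither threshold is exceeded.
print(should_expire_snapshots(10, 1 * DAY_MS, RETAIN_LAST, DELTA_MAX_MS))
# 60 snapshots spanning 1 day: the snapshot count exceeds retainLast.
print(should_expire_snapshots(60, 1 * DAY_MS, RETAIN_LAST, DELTA_MAX_MS))
# 10 snapshots spanning 6 days: the duration exceeds snapshotsDurationDeltaMax.
print(should_expire_snapshots(10, 6 * DAY_MS, RETAIN_LAST, DELTA_MAX_MS))
```

Because the two checks are combined with a logical OR, a table with few snapshots can still qualify if its oldest snapshot is old enough.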

Using the policy constants to specify the maintenance type and the default action in the default JEXL file

Policy constants are maintenance operation types with their default action argument values.

Action builders can be used to generate the required action with the arguments defined in the policy constants. For example, dlm:expireSnapshots($table, $constants).

The policy script supports retrieving the values from the table property or the policy constants. For example, let filesCountDrop = $table['dlm.rewriteDataFiles.filesCountDrop'] ?? $constants.rewriteDataFiles.filesCountDrop ?? 0.15;
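
The lookup order expressed by the ?? chain (table property first, then policy constant, then the hard-coded default) can be sketched in Python as follows. This is only an illustration of the precedence: the resolve helper and the sample dictionaries are hypothetical, and the sketch ignores any type conversion that table property values might need.

```python
from typing import Any, Mapping, Optional


def resolve(table_props: Mapping[str, Any],
            constants: Mapping[str, Mapping[str, Any]],
            action: str,
            name: str,
            default: Optional[Any] = None) -> Any:
    """Hypothetical mirror of
    $table['dlm.<action>.<name>'] ?? $constants.<action>.<name> ?? default
    """
    # 1. A table property wins when it is set.
    value = table_props.get(f"dlm.{action}.{name}")
    if value is not None:
        return value
    # 2. Otherwise fall back to the policy constant from the JSON file.
    value = constants.get(action, {}).get(name)
    if value is not None:
        return value
    # 3. Finally use the default that is hard-coded in the script.
    return default


constants = {"rewriteDataFiles": {"filesCountDrop": 0.15}}

# The table property overrides the policy constant.
print(resolve({"dlm.rewriteDataFiles.filesCountDrop": 0.3}, constants,
              "rewriteDataFiles", "filesCountDrop", 0.15))
# No table property is set: the policy constant is used.
print(resolve({}, constants, "rewriteDataFiles", "filesCountDrop", 0.15))
# Neither is set: the script default is used.
print(resolve({}, {}, "rewriteDataFiles", "filesCountDrop", 0.15))
```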

For more information about modifying the JEXL script contents, see JEXL syntax.

Viewing the table properties in the default JEXL file

If you have configured the Iceberg table properties using DDL commands, you can use the table property values in the script.

ClouderaAdaptive.json

Curl command to fetch the contents of the default ClouderaAdaptive.json file

curl --location 'https://[***LAKEHOUSE OPTIMIZER ENDPOINT***]/api/v1/policies/resource?uri=dlm%3A%2F%2Ftpp%3Adefault%2FClouderaAdaptive' \
--header 'Authorization: Bearer ey'

You can use the default action arguments and values as is. If your use case requires more arguments in addition to the default action arguments or you want to tune the existing default arguments, then use the default JSON file as a template to add or modify the arguments, upload the modified file as a new JSON file, and reschedule the namespace.
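
As a rough sketch of the template approach, the following Python snippet starts from a trimmed copy of the default constants, overrides a few arguments, and prints the JSON that you would save and upload as a new file. The customize helper and the override values (0.25, "a,b", 20) are assumptions chosen for illustration only; the argument names come from the default JSON file and the default script.

```python
import json

# Trimmed copy of the default ClouderaAdaptive.json contents.
default_policy = json.loads("""
{
  "expireSnapshot": {"expireOlderThan": 432000000, "retainLast": 50,
                     "snapshotsDurationDeltaMax": 432000000},
  "rewriteDataFiles": {"targetFileSize": 536870912, "filesCountDrop": 0.15,
                       "minInputFiles": 5, "partialProgressEnabled": false}
}
""")


def customize(policy: dict, overrides: dict) -> dict:
    """Return a copy of policy with per-action overrides merged in."""
    # Copy each action section so that the template itself stays untouched.
    result = {k: dict(v) if isinstance(v, dict) else v
              for k, v in policy.items()}
    for action, args in overrides.items():
        result.setdefault(action, {}).update(args)
    return result


custom = customize(default_policy, {
    # Tune an existing argument and add one that the default file omits.
    "rewriteDataFiles": {"filesCountDrop": 0.25, "zOrderColumns": "a,b"},
    "expireSnapshot": {"retainLast": 20},
})

# Save this output as the new JSON file to upload.
print(json.dumps(custom, indent=2))
```

Arguments that are not overridden, such as targetFileSize, keep the values from the template.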

ClouderaAdaptive.json file contents

The following snippet shows the default ClouderaAdaptive.json file contents:
{
  "expireSnapshot": {
    "expireOlderThan": 432000000,
    "retainLast": 50,
    "snapshotsDurationDeltaMax": 432000000
  },
  "rewriteManifest" : {
    "useCaching": true,
    "fileCountMax": 100,
    "manifestFileSize": 8388608,
    "smallFileRatioMax": 0.5
  },
  "rewriteDataFiles" : {
    "targetFileSize": 536870912,
    "filesCountDrop": 0.15,
    "minInputFiles": 5,
    "partialProgressEnabled": false
  },
  "rewritePositionDelete" : {
    "enabled" : true,
    "targetFileSize": 67108864,
    "maxConcurrentGroupRewrite": 10,
    "minInputFiles": 6
  },
  "deleteOrphanFiles" : {
    "olderThan": 259200000
  },
  "label": "Cloudera Adaptive policy"
}

Action arguments in the default JSON file

The following table lists the action arguments that are specific to the default ClouderaAdaptive JSON file, their default values and ranges, and their description:
Table 1. Action arguments specific to default JSON file

Action argument | Value | Can be overwritten by specified table properties | Description
----------------|-------|--------------------------------------------------|------------
expireSnapshot
snapshotsDurationDeltaMax | Default is 600000 ms | snapshotsDurationDeltaMax | Defines the maximum duration, in milliseconds, between the latest and the oldest snapshot.
rewriteManifest
fileCountMax | Default is 5 | fileCountMax | Defines the maximum manifest file count to trigger the rewrite action.
manifestFileSize | Default is 8388608 | write.manifest.target-size-bytes | Defines the file size of a manifest file. Any manifest file that is smaller than the defined size is considered to be a small manifest file.
smallFileRatioMax | Default is 0.5 | smallFileRatioMax | Determines the threshold for the ratio of small manifest files. A manifest file is considered to be small if it is smaller than the manifestFileSize value. The ratio is calculated as smallManifestFileCount / totalManifestCount.
rewriteDataFiles
filesCountDrop | Default is 0.15 | filesCountDrop | Determines the threshold for the file count reduction ratio per partition. A partition is eligible for compaction when the file count drop ratio is greater than the specified value. The ratio is calculated as (totalFiles - expectedFileCount) / totalFiles. For example, run the compaction task only when more than 15% reduction is possible.
zOrderColumns | - | zOrderColumns | Specifies the Z-order columns to be used during the data file compaction process.
where | - | where | Specifies the filter criteria to rewrite the data files during the compaction process.
sort | - | sort | Determines how to sort the data files.
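
The two ratios in the table can be worked through with a short Python sketch. The function names and the sample file counts are hypothetical; the expected file count is passed in as an input because this topic does not describe how the optimizer estimates it. The manifest check follows the condition in the default script, which requires both the file count threshold and the small-file ratio threshold to be reached.

```python
def files_count_drop_ratio(total_files: int, expected_file_count: int) -> float:
    """(totalFiles - expectedFileCount) / totalFiles for one partition."""
    if total_files == 0:
        return 0.0
    return (total_files - expected_file_count) / total_files


def partition_is_eligible(total_files: int, expected_file_count: int,
                          files_count_drop: float = 0.15) -> bool:
    # Eligible only when the expected reduction is greater than the threshold.
    return files_count_drop_ratio(total_files, expected_file_count) > files_count_drop


def should_rewrite_manifests(manifest_count: int, small_manifest_count: int,
                             file_count_max: int, small_file_ratio_max: float = 0.5) -> bool:
    # smallManifestFileCount / totalManifestCount
    ratio = small_manifest_count / manifest_count if manifest_count else 0.0
    return manifest_count >= file_count_max and ratio >= small_file_ratio_max


# 40 files expected to compact into 30: (40 - 30) / 40 = 0.25, above 0.15.
print(files_count_drop_ratio(40, 30), partition_is_eligible(40, 30))
# 100 files expected to compact into 90: (100 - 90) / 100 = 0.1, below 0.15.
print(files_count_drop_ratio(100, 90), partition_is_eligible(100, 90))
# 120 manifests, 80 of them small, fileCountMax 100: both thresholds are reached.
print(should_rewrite_manifests(120, 80, file_count_max=100))
# 60 manifests, 50 of them small: the ratio is high but the count is below fileCountMax.
print(should_rewrite_manifests(60, 50, file_count_max=100))
```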

CRON expression to schedule the table maintenance

The default ClouderaAdaptive.json file does not have a schedule. You must upload a JSON file with the required CRON expression to define the schedule of the evaluation phase. Alternatively, you can perform manual table maintenance.

For example, the "cron": "0 0 * * * ?" expression runs the evaluation phase every hour. For more information about using CRON expressions, see CRON expression generator.

The following sample curl command uploads a policy constant for ClouderaAdaptive at the catalog level:
curl --location --request PUT 'https://[***LAKEHOUSE OPTIMIZER ENDPOINT***]/api/v1/policies/resource?uri=dlm%3A%2F%2Ftpp%2Fhive%2FClouderaAdaptive' \
--header 'Authorization: Bearer ey' \
--form 'resource=@"/Users/test/Desktop/ClouderaAdaptive.json"'
In this scenario, the ClouderaAdaptive.json file only contains the following CRON expression:
{
  "cron": "0 2 * * *"
}