Resource configuration for Kudu replication

Learn how to size resources for Kudu replication across three independent dimensions: the Kudu source cluster, the Kudu Java client, and the Flink cluster.

Sizing the Kudu source cluster

A longer --tablet_history_max_age_sec window means Kudu tablets accumulate more UNDO delta data before it becomes eligible for garbage collection. You must tune the following tablet server flag to ensure background maintenance keeps up:

Flag: --maintenance_manager_num_threads
Default: 1
Description: Specifies the number of threads for all background maintenance operations, such as flushes, compactions, and UNDO garbage collection. With a large history window, the single default thread can fall behind. The recommended ratio is 1 thread per 3 data directories.
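The recommended ratio translates into a quick calculation. The following is a minimal sketch, assuming a hypothetical tablet server with 9 data directories:

```shell
# Recommended ratio: 1 maintenance thread per 3 data directories.
data_dirs=9                       # hypothetical tablet server layout
threads=$(( data_dirs / 3 ))
echo "--maintenance_manager_num_threads=${threads}"
```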

Tuning the replication job's Kudu Java client

The reader.* and writer.* parameters are the primary interface for tuning the Kudu Java client embedded in the replication job. The following parameters have the most significant impact on throughput and memory use:

Parameter: reader.batchSizeBytes
Default: 20971520 (20 MiB)
Technical impact: Controls the amount of data each scan RPC fetches. You can increase this value for large tables on fast networks, or decrease it to reduce per-reader heap usage.

Parameter: reader.splitSizeBytes
Default: Kudu default
Technical impact: Smaller values produce finer-grained splits, which increases parallelism but requires more scan RPCs.

Parameter: writer.maxBufferSize
Default: 1000 operations
Technical impact: The number of operations buffered client-side before a flush occurs. Increase this value for higher write throughput; note that each buffered operation consumes heap memory.

Parameter: writer.flushInterval
Default: 1000 ms
Technical impact: Specifies how often buffered writes are flushed. Lower values reduce latency, while higher values improve batching efficiency.
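As an illustration, these parameters can be supplied alongside the other program arguments at submission time. The option syntax below is an assumption that mirrors the --job.* arguments, and the values are examples rather than recommendations:

```shell
# Hedged sketch: larger scan batches and a bigger write buffer for a wide
# table on a fast network. Option syntax mirrors the --job.* arguments
# (assumption); values are illustrative only.
flink run-application -t yarn-application \
  -c org.apache.kudu.replication.ReplicationJob \
  kudu-replication-<version>.jar \
  --reader.batchSizeBytes 33554432 \
  --writer.maxBufferSize 2000 \
  --writer.flushInterval 500
```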

Sizing the Flink cluster

The replication job is a two-operator pipeline (source to sink) with no intermediate transformations. Resource requirements are proportional to the configured parallelism.
Deployment modes
The job runs on YARN in one of two modes:
  • Application mode: This is the recommended mode for Flink 1.15 and later.
  • Per-job mode: This mode is deprecated since Flink 1.15 but remains supported.

In both modes, YARN spawns dedicated containers for the JobManager and TaskManagers. You must ensure that sufficient YARN resources, such as memory and vcores, are available if the cluster is shared with other workloads like Spark or Impala.
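On a shared cluster, remaining capacity can be checked with the standard YARN CLI before submitting. The queue name "default" below is an assumption; substitute the queue you submit to:

```shell
# Inspect remaining capacity in the target queue ("default" is an assumption)
# and list the NodeManagers that would host the containers.
yarn queue -status default
yarn node -list
```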

Parallelism and memory
You set parallelism at submission time by using the -p flag. Each parallel instance runs one Kudu reader and one Kudu writer.
  • Parallelism limits: The effective upper bound on useful parallelism is the number of scan tokens Kudu produces for the table. Setting -p higher than the tablet count does not improve performance because extra reader instances remain idle. To raise this ceiling, you can reduce the reader.splitSizeBytes property to divide large tablets into multiple tokens.
  • TaskManager memory: Sizing depends on row width, parallelism, and slot count. The main memory drivers are the scan batch buffer and the write buffer. If YARN kills TaskManagers due to out-of-memory (OOM) errors, you must increase the taskmanager.memory.process.size property or reduce the reader.batchSizeBytes property.
  • JobManager memory: The footprint for the enumerator is small regardless of table size. The Flink default is sufficient for most scenarios.
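The two memory drivers named above lend themselves to a back-of-the-envelope estimate. The following sketch assumes a hypothetical 512-byte average row, the default reader and writer settings, and 2 slots per TaskManager; it covers only the scan and write buffers, not Flink's framework overhead:

```shell
# Rough per-TaskManager buffer footprint (assumptions noted inline).
slots=2                                   # taskmanager.numberOfTaskSlots
batch_bytes=$(( 20 * 1024 * 1024 ))       # reader.batchSizeBytes default (20 MiB)
row_bytes=512                             # hypothetical average row width
buffered_ops=1000                         # writer.maxBufferSize default
per_slot=$(( batch_bytes + buffered_ops * row_bytes ))
echo "rough buffer footprint: $(( slots * per_slot / 1024 / 1024 )) MiB (plus Flink framework overhead)"
```

An estimate like this only bounds the buffer contribution; taskmanager.memory.process.size must also cover JVM metaspace, network buffers, and framework heap.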
Example submission command

To apply these settings, pass them as -D flags during job submission:

flink run-application -t yarn-application \
  -p 4 \
  -Dtaskmanager.numberOfTaskSlots=2 \
  -Dclassloader.parent-first-patterns.additional=org.apache.kudu \
  -c org.apache.kudu.replication.ReplicationJob \
  kudu-replication-<version>.jar \
  --job.sourceMasterAddresses source-master1:7051 \
  --job.sinkMasterAddresses   sink-master1:7051 \
  --job.tableName              my_table \
  --job.checkpointsDirectory  hdfs:///kudu-replication/checkpoints/my_table