Configuring Flink application resources

Generally, Flink automatically identifies the resources that an application needs. You can configure the TaskManager memory, the buffer timeout, and high availability for a Flink application to use resources more efficiently.

YARN automatically kills application containers that use more memory than their allocated limit. To prevent YARN from killing Flink TaskManagers, use the following calculation to set the container size: TM total size = TM heap + heap cutoff. The default heap cutoff ratio is 25% (0.25), and it is configurable.
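The sizing rule above can be sketched in a few lines of Java. This is only an illustration, not part of Flink: the class and method names are hypothetical, and it assumes that the cutoff is taken as a fraction of the total container size and that a minimum cutoff applies (600 MB is used here as an assumed value for containerized.heap-cutoff-min; check the value in your Flink version).

```java
// Hypothetical helper, not a Flink API: estimates the YARN container size
// needed so that a desired TaskManager heap remains after the heap cutoff.
public final class ContainerMemory {

    // Assumption: cutoff = max(cutoffMinMb, cutoffRatio * containerSize),
    // and TM heap = containerSize - cutoff.
    public static long containerSizeMb(long heapMb, double cutoffRatio, long cutoffMinMb) {
        if (heapMb <= 0 || cutoffRatio <= 0.0 || cutoffRatio >= 1.0 || cutoffMinMb < 0) {
            throw new IllegalArgumentException("invalid heap size or cutoff settings");
        }
        // Size needed when the ratio-based cutoff dominates.
        long byRatio = (long) Math.ceil(heapMb / (1.0 - cutoffRatio));
        // Size needed when the minimum cutoff dominates.
        long byMinimum = heapMb + cutoffMinMb;
        return Math.max(byRatio, byMinimum);
    }

    public static void main(String[] args) {
        // 3072 MB heap, 25% cutoff: 4096 MB container (1024 MB cutoff).
        System.out.println(containerSizeMb(3072, 0.25, 600)); // prints 4096
        // 1024 MB heap: the assumed 600 MB minimum cutoff dominates.
        System.out.println(containerSizeMb(1024, 0.25, 600)); // prints 1624
    }
}
```

The resulting value is what you would pass as the TM container memory, for example with -ytm.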

Furthermore, use a YARN queue with preemption disabled so that long-running jobs are not affected when the cluster reaches its capacity limit.

Flink uses network buffers to transfer data from one operator to another. A buffer is sent downstream when it is full or when the buffer timeout expires, whichever happens first. At high data rates, buffers usually fill up before the timeout is reached. In such cases, you can increase throughput further by intentionally setting the buffer timeout to a higher value, due to the characteristics of the TCP channel. However, this in turn increases the latency of the pipeline.

Flink jobs on YARN are configured to tolerate a maximum number of failed containers before they terminate. Set the YARN maximum failed containers value in proportion to the total parallelism and the expected lifetime of the job.
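As a rough illustration of scaling this limit with parallelism, the following Java sketch applies the reference value of 3*num_containers. The class and method names are hypothetical, and it assumes that the number of TaskManager containers is the total parallelism divided by the number of task slots per TaskManager, rounded up. The expected lifetime of the job is not modeled here.

```java
// Hypothetical helper, not a Flink API: derives a value for
// yarn.maximum-failed-containers from the job parallelism.
public final class FailedContainerBudget {

    // Assumption: each TaskManager container provides slotsPerTaskManager slots,
    // so the container count is ceil(parallelism / slotsPerTaskManager).
    public static int taskManagerContainers(int parallelism, int slotsPerTaskManager) {
        if (parallelism <= 0 || slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("parallelism and slots must be positive");
        }
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    // Applies the reference value of three times the number of containers.
    public static int maxFailedContainers(int parallelism, int slotsPerTaskManager) {
        return 3 * taskManagerContainers(parallelism, slotsPerTaskManager);
    }

    public static void main(String[] args) {
        // Parallelism 10 with 4 slots per TaskManager needs 3 containers.
        System.out.println(maxFailedContainers(10, 4)); // prints 9
    }
}
```

For a job that is expected to run for a long time, you may want to raise the result further.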

High availability is enabled by default in CSA. This eliminates the JobManager as a single point of failure. You can also tune the resilience of the application by setting the YARN maximum application attempts, which determines how many times the application is retried in case of failures.

Table 1. Reference values

Configuration               Parameter                        Recommended value
TM container memory         -ytm / taskmanager.heap.size     TM heap + heap cutoff
Max parallelism             env.setMaxParallelism(num)       120, 720, 1260, 5040
Container heap cutoff       containerized.heap-cutoff-ratio  0.25-0.75
Buffer timeout              env.setBufferTimeout(millis)     1-100 ms
YARN queue                  -yqu                             A queue with no preemption
YARN max failed containers  yarn.maximum-failed-containers   3*num_containers
YARN max AM failures        yarn.application-attempts        3-5