Configuring Flink application resources
Flink generally determines the resources for an application automatically. You can, however, configure the TaskManager memory, the buffer timeout, and the high-availability settings of a Flink application to use resources more efficiently.
YARN automatically kills application containers that use more memory than their allocated limit. To prevent YARN from killing Flink TaskManagers, size the TaskManager container with the following calculation: TM total size = TM heap + heap cutoff, where the heap cutoff is memory reserved for non-heap JVM usage. The default heap cutoff is 25%, and it is configurable.
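The sizing rule can be turned into a small calculation. The sketch below assumes that the cutoff is taken as a fraction of the total container size (cutoff = total * ratio), so the container must be at least heap / (1 - ratio). The class and method names are illustrative only, and the minimum cutoff that Flink 1.x also applies (containerized.heap-cutoff-min) is ignored.

```java
// Sketch only: size a TaskManager container so that heap + heap cutoff fit
// inside the YARN allocation (TM total size = TM heap + heap cutoff).
// ASSUMPTION: the cutoff is a fraction of the total container size
// (cutoff = total * ratio), so total = heap / (1 - ratio). The minimum
// cutoff that Flink 1.x also enforces is ignored here.
final class TaskManagerSizing {

    /** Smallest container size (MB) that still leaves heapMb after the cutoff. */
    static long containerSizeMb(long heapMb, double cutoffRatio) {
        if (cutoffRatio < 0.0 || cutoffRatio >= 1.0) {
            throw new IllegalArgumentException("cutoff ratio must be in [0, 1)");
        }
        // Round up so the resulting heap is never smaller than requested.
        return (long) Math.ceil(heapMb / (1.0 - cutoffRatio));
    }

    /** Heap cutoff (MB) reserved for non-heap JVM memory in a container. */
    static long cutoffMb(long containerMb, double cutoffRatio) {
        return (long) Math.ceil(containerMb * cutoffRatio);
    }

    public static void main(String[] args) {
        long heapMb = 3072;   // desired TaskManager heap (example value)
        double ratio = 0.25;  // default heap cutoff of 25%
        long totalMb = containerSizeMb(heapMb, ratio);
        long cutMb = cutoffMb(totalMb, ratio);
        System.out.println("container = " + totalMb + " MB (heap "
                + (totalMb - cutMb) + " MB + cutoff " + cutMb + " MB)");
    }
}
```

Under these assumptions, a 3072 MB heap with the default 25% cutoff calls for a 4096 MB container, of which 1024 MB is the cutoff.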
In addition, use a YARN queue with preemption disabled so that long-running jobs are not affected when the cluster reaches its capacity limit.
Flink uses network buffers to transfer data from one operator to another. A buffer is sent when it is full or when the buffer timeout expires, whichever comes first. At high data rates, buffers usually fill up before the timeout is reached. When the data rate is high, throughput can be increased further by intentionally setting a higher buffer timeout, because sending fewer, fuller buffers makes better use of the TCP channel. However, a higher timeout also increases the latency of the pipeline.
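The throughput and latency trade-off can be illustrated with a deliberately simplified model. It is not Flink's implementation: it assumes a constant arrival rate and measures buffer capacity in records rather than bytes, and all names are hypothetical. A timeout of `Double.POSITIVE_INFINITY` stands in for Flink's `setBufferTimeout(-1)`, which sends buffers only when they are full.

```java
// Simplified model of buffer flushing, not Flink's implementation: an output
// buffer is sent as soon as it is full or when the buffer timeout expires.
// ASSUMPTIONS: constant arrival rate; capacity measured in records, not bytes.
final class BufferTimeoutModel {

    static final class Flush {
        final double intervalMs;       // time between sends; also the longest a record waits
        final double fillFraction;     // how full each buffer is when sent, in (0, 1]
        final double flushesPerSecond; // network sends per second

        Flush(double intervalMs, double fillFraction, double flushesPerSecond) {
            this.intervalMs = intervalMs;
            this.fillFraction = fillFraction;
            this.flushesPerSecond = flushesPerSecond;
        }
    }

    static Flush model(double recordsPerMs, int capacityRecords, double timeoutMs) {
        if (recordsPerMs <= 0 || capacityRecords <= 0 || timeoutMs <= 0) {
            // Use Double.POSITIVE_INFINITY to model "no timeout".
            throw new IllegalArgumentException("rate, capacity and timeout must be > 0");
        }
        double fillTimeMs = capacityRecords / recordsPerMs;  // time to fill one buffer
        // The buffer is sent at whichever comes first: full buffer or timeout.
        double intervalMs = Math.min(fillTimeMs, timeoutMs);
        double fill = recordsPerMs * intervalMs / capacityRecords;
        return new Flush(intervalMs, fill, 1000.0 / intervalMs);
    }

    public static void main(String[] args) {
        double rate = 10.0;  // records per ms (example)
        int capacity = 100;  // records per buffer (example)
        for (double timeoutMs : new double[] {1, 100}) {
            Flush f = model(rate, capacity, timeoutMs);
            System.out.printf("timeout %.0f ms: send every %.1f ms, %.0f%% full, %.0f sends/s%n",
                    timeoutMs, f.intervalMs, f.fillFraction * 100, f.flushesPerSecond);
        }
    }
}
```

In this model, a 1 ms timeout sends buffers that are only 10% full, while a 100 ms timeout lets each buffer fill completely: ten times fewer sends, at the cost of records waiting up to 10 ms instead of 1 ms.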
Flink on YARN jobs are configured to tolerate a maximum number of failed containers before they terminate. Configure the YARN maximum failed containers setting in proportion to the total parallelism and the expected lifetime of the job.
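One way to apply the recommended value of 3 * num_containers from the table is to derive the container count from the parallelism. The sketch assumes default slot sharing, so the job needs as many slots as its highest operator parallelism, and that every TaskManager container offers the same number of slots (taskmanager.numberOfTaskSlots). The factor is a parameter because a longer expected job lifetime may justify a larger value. The class and method names are hypothetical.

```java
// Sketch: estimate yarn.maximum-failed-containers from the job parallelism.
// ASSUMPTIONS: default slot sharing, so the job needs as many slots as its
// highest operator parallelism; every TaskManager container offers
// slotsPerTaskManager slots (taskmanager.numberOfTaskSlots).
final class FailedContainerBudget {

    /** Number of TaskManager containers needed for the given parallelism. */
    static int numContainers(int parallelism, int slotsPerTaskManager) {
        if (parallelism < 1 || slotsPerTaskManager < 1) {
            throw new IllegalArgumentException("parallelism and slots must be >= 1");
        }
        // Ceiling division: a partly used TaskManager still occupies a container.
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    /** failuresPerContainer = 3 matches the recommended 3 * num_containers. */
    static int maxFailedContainers(int parallelism, int slotsPerTaskManager,
                                   int failuresPerContainer) {
        return failuresPerContainer * numContainers(parallelism, slotsPerTaskManager);
    }

    public static void main(String[] args) {
        int parallelism = 40;  // highest operator parallelism (example)
        int slots = 4;         // slots per TaskManager (example)
        System.out.println("yarn.maximum-failed-containers = "
                + maxFailedContainers(parallelism, slots, 3));
    }
}
```

With a parallelism of 40 and 4 slots per TaskManager, the job runs in 10 containers, which gives a budget of 30 failed containers.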
Configuration | Parameter | Recommended value |
---|---|---|
TM container memory | -ytm / taskmanager.heap.size | TM heap + heap cutoff |
Max parallelism | env.setMaxParallelism(num) | 120, 720, 1260, 5040 |
Container heap cutoff | containerized.heap-cutoff-ratio | 0.25-0.75 |
Buffer timeout | env.setBufferTimeout(millis) | 1-100 ms |
YARN queue | -yqu | A queue with no preemption |
YARN max failed containers | yarn.maximum-failed-containers | 3 * num_containers |
YARN max AM failures | yarn.application-attempts | 3-5 |
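The recommended max parallelism values all have many divisors. A plausible reason, offered as an interpretation rather than something stated here: max parallelism sets the number of key groups that keyed state is split into, and each parallel subtask receives a contiguous range of key groups, so any parallelism that divides the max parallelism gives every subtask an equal share. The sketch models only that division, not Flink's assignment code. The class name and the comparison value 128 are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: which parallelism values split a max parallelism (number of key
// groups) evenly across subtasks? Models only integer division, not Flink code.
final class KeyGroupBalance {

    /** Parallelism values in [1, limit] that divide maxParallelism exactly. */
    static List<Integer> evenParallelisms(int maxParallelism, int limit) {
        List<Integer> result = new ArrayList<>();
        for (int p = 1; p <= Math.min(limit, maxParallelism); p++) {
            if (maxParallelism % p == 0) {
                result.add(p);
            }
        }
        return result;
    }

    /** Largest key-group share divided by the smallest; 1.0 means perfectly even. */
    static double imbalance(int maxParallelism, int parallelism) {
        if (parallelism < 1 || parallelism > maxParallelism) {
            // Parallelism cannot exceed max parallelism.
            throw new IllegalArgumentException("need 1 <= parallelism <= maxParallelism");
        }
        int min = maxParallelism / parallelism;                      // floor
        int max = (maxParallelism + parallelism - 1) / parallelism;  // ceiling
        return (double) max / min;
    }

    public static void main(String[] args) {
        for (int maxP : new int[] {120, 128, 720}) {
            System.out.println(maxP + " splits evenly for " + evenParallelisms(maxP, 20));
        }
        System.out.println("imbalance for 128 key groups on 3 subtasks: " + imbalance(128, 3));
    }
}
```

For parallelism values up to 20, 720 splits evenly for 14 of them, whereas a power of two such as 128 splits evenly only for 1, 2, 4, 8 and 16.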