Setting parallelism and max parallelism
The max parallelism is the most essential part of resource configuration for Flink applications as it defines the maximum jobs that are executed at the same time in parallel instances. However, you can optimize max parallelism in case your production goals differ from the default settings.
In a Flink application, the different tasks are split into several parallel instances for execution. The number of parallel instances for a task is called parallelism. Parallelism can be defined at the operator, client, execution environment and system level. Cloudera recommends setting parallelism to a lower value at first use, and increasing it over time if the job cannot keep up with the input rate.
To configure the max parallelism, setMaxParallelism
is called as it controls
the number of key-groups created by the state backends. A key-group is a partition of an operator
state. The number of key-groups determines how data is going to be distributed among the parallel
operators. If the key-groups are not distributed evenly, the data distribution is also
uneven.
- The number should be large enough to accommodate expected future load increases as this setting cannot be changed without starting from an empty state.
- If
P
is the selected parallelism for the job, the max parallelism should be divisible byP
to get even state distribution. - Please note that larger max parallelism settings have greater cost on the state backend side, for large scale production jobs benchmarking the size of the state based on the maximum parallelism is useful before changing this parameter.
Based on these criteria, Cloudera recommends setting the max paralellism to factorials or other numbers with a large number of divisors (120, 180, 240, 360, 720, 840, 1260), which will make parallelism tuning easier.
Stateless | In-memory state | RocksDB state |
---|---|---|
1 million record / sec / core | 100 000 records / sec / core | 10 000 records / sec / core |