Configuring state backend for Flink
You can choose between RocksDB and Hashmap as the state backend for your Flink streaming application. While Hashmap stores data as objects on the Java heap, RocksDB can store larger state that does not fit easily in memory. The RocksDB state backend uses a combination of a fast in-memory cache and optimized disk-based lookups to manage state.
- RocksDB
- By default, RocksDB is configured as the state backend for Flink. The RocksDB state backend holds in-flight data in a RocksDB database that is stored in the TaskManager local data directories, and it performs asynchronous snapshots. The data is stored as serialized byte arrays, which are defined by the type serializer. As a result, key comparisons are byte-wise instead of using the hashCode() and equals() Java methods as in the Hashmap state backend. The RocksDB state backend is recommended for large state and key/value states, long windows, and high-availability setups. The amount of state is limited by the available disk space. However, the maximum achievable throughput is lower, because every read and write through the RocksDB backend has to go through serialization and deserialization.
- Hashmap
- The Hashmap state backend holds data internally as objects on the Java heap. Key/value state and window operators hold hash tables that store the values, triggers, and so on. The Hashmap state backend is recommended for jobs with moderate state and key/value states, long windows, and high-availability setups.
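To illustrate why byte-wise key comparison matters, the following plain-Java sketch (no Flink dependency) defines a hypothetical key class whose equals() and hashCode() ignore case, and uses UTF-8 encoding as a stand-in for a Flink type serializer. A heap hash table, as used by the Hashmap backend, treats the two keys as the same key, while a comparison of the serialized bytes, as RocksDB performs, treats them as different keys. The class, the serializer, and the helper methods are assumptions for illustration only.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class KeyComparisonDemo {

    // Hypothetical key type: equals() and hashCode() ignore case.
    static final class CaseInsensitiveKey {
        final String value;

        CaseInsensitiveKey(String value) {
            this.value = value;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof CaseInsensitiveKey
                    && ((CaseInsensitiveKey) o).value.equalsIgnoreCase(value);
        }

        @Override
        public int hashCode() {
            return value.toLowerCase(Locale.ROOT).hashCode();
        }
    }

    // Stand-in for a type serializer: writes the raw string as UTF-8 bytes.
    static byte[] serialize(CaseInsensitiveKey key) {
        return key.value.getBytes(StandardCharsets.UTF_8);
    }

    // Heap-style lookup: relies on hashCode() and equals(), like a Java HashMap.
    static boolean sameKeyOnHeap(CaseInsensitiveKey a, CaseInsensitiveKey b) {
        Map<CaseInsensitiveKey, Integer> table = new HashMap<>();
        table.put(a, 1);
        return table.containsKey(b);
    }

    // RocksDB-style lookup: compares the serialized byte arrays.
    static boolean sameKeyByBytes(CaseInsensitiveKey a, CaseInsensitiveKey b) {
        return Arrays.equals(serialize(a), serialize(b));
    }

    public static void main(String[] args) {
        CaseInsensitiveKey k1 = new CaseInsensitiveKey("User42");
        CaseInsensitiveKey k2 = new CaseInsensitiveKey("user42");
        System.out.println("heap (equals/hashCode): " + sameKeyOnHeap(k1, k2)); // true
        System.out.println("byte-wise: " + sameKeyByBytes(k1, k2));             // false
    }
}
```

Because RocksDB compares serialized bytes, key types should serialize equal keys to identical bytes; otherwise the same job can group state differently depending on the state backend.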
Choose between Hashmap and RocksDB as the state backend based on your performance and scalability requirements. With Hashmap, state is accessed and updated faster because it lives on the Java heap, but its size is limited by the available memory in the cluster. RocksDB can scale with the available disk space and is the only state backend that supports incremental snapshots, but accessing and updating state requires serialization and disk access, which uses more resources and can reduce performance compared to keeping state in memory. For more information about state backends in Flink, see the Apache Flink documentation.
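The trade-off described above can be summarized as a rough decision rule. The following sketch is a hypothetical rule of thumb, not a Flink or Cloudera API: it assumes you can estimate the state size per TaskManager and decide what share of the heap you are willing to spend on state (the maxHeapShare parameter is an assumption of this sketch).

```java
public class StateBackendAdvisor {

    enum Backend { HASHMAP, ROCKSDB }

    // Hypothetical rule of thumb, not a Flink or Cloudera API:
    // - incremental snapshots are only available with RocksDB (per the text above);
    // - otherwise keep state on the heap only if the estimated state size fits
    //   within a chosen share of the heap, because Hashmap is memory-bound.
    static Backend recommend(long estimatedStateBytes, long heapBytes,
                             double maxHeapShare, boolean needIncrementalSnapshots) {
        if (maxHeapShare <= 0.0 || maxHeapShare > 1.0) {
            throw new IllegalArgumentException("maxHeapShare must be in (0, 1]");
        }
        if (needIncrementalSnapshots) {
            return Backend.ROCKSDB;
        }
        long heapBudget = (long) (heapBytes * maxHeapShare);
        return estimatedStateBytes <= heapBudget ? Backend.HASHMAP : Backend.ROCKSDB;
    }

    public static void main(String[] args) {
        long gib = 1L << 30;
        // 1 GiB of state fits in half of a 4 GiB heap.
        System.out.println(recommend(gib, 4 * gib, 0.5, false));      // HASHMAP
        // 10 GiB of state does not fit, so it goes to disk-backed RocksDB.
        System.out.println(recommend(10 * gib, 4 * gib, 0.5, false)); // ROCKSDB
        // Incremental snapshots require RocksDB regardless of size.
        System.out.println(recommend(gib, 4 * gib, 0.5, true));       // ROCKSDB
    }
}
```

The result maps directly to the HASHMAP or ROCKSDB value of the state.backend parameter, or to the HashMapStateBackend or EmbeddedRocksDBStateBackend class in job code.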
You can configure the state backend for your streaming application using Cloudera Manager, or you can specify it in your Flink application.
- Open your cluster in Cloudera Manager.
- Select Flink from the list of services.
- Click Configuration.
- Search for state.backend and select HASHMAP or ROCKSDB based on your requirements. ROCKSDB is set by default.
- Click Save changes.
- To configure the state backend per job, call env.setStateBackend() on the StreamExecutionEnvironment of the job with an EmbeddedRocksDBStateBackend or a HashMapStateBackend instance, as shown in the following examples:
RocksDB:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
Hashmap:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
Configuring RocksDB
When you choose RocksDB as the state backend, you can also adjust how much memory RocksDB uses as a cache to improve lookup performance by setting the managed memory fraction of the TaskManagers.
The default fraction value is 0.4. For larger cache requirements, increase this value together with the total memory size. For more information, see the Set up TaskManager Memory and the Memory Tuning Guide documentation.
- Open your cluster in Cloudera Manager.
- Select Flink from the list of services.
- Click Configuration.
- Search for taskmanager.memory.managed.fraction and change the default value as needed.
- Click Save changes.
- You can also configure the TaskManager memory when submitting the Flink job from the CLI, as shown in the following examples:
- Per-job mode:
flink run -yD taskmanager.memory.managed.fraction=<fraction_size> -t <job_jar_file>
- Session mode:
flink-yarn-session -yD taskmanager.memory.managed.fraction=<fraction_size>
- Application mode:
flink run-application -t yarn-application -Dtaskmanager.memory.managed.fraction=<fraction_size> <job_jar_file>
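When taskmanager.memory.managed.size is not set explicitly, Flink derives managed memory as the configured fraction of the total Flink memory (the TaskManager process memory minus JVM metaspace and JVM overhead). The following sketch only does that multiplication, to show why raising the fraction alone may not be enough; the 4096 MiB and 8192 MiB totals are example values, and the range check is an assumption of this sketch rather than Flink's own validation.

```java
public class ManagedMemoryEstimate {

    // Managed memory = total Flink memory x managed fraction, which applies when
    // taskmanager.memory.managed.size is not set explicitly.
    // The [0, 1) range check is this sketch's own assumption.
    static double managedMemoryMiB(double totalFlinkMemoryMiB, double managedFraction) {
        if (managedFraction < 0.0 || managedFraction >= 1.0) {
            throw new IllegalArgumentException("fraction must be in [0, 1)");
        }
        return totalFlinkMemoryMiB * managedFraction;
    }

    public static void main(String[] args) {
        // Default fraction 0.4 with 4096 MiB of total Flink memory.
        System.out.printf("%.1f MiB%n", managedMemoryMiB(4096, 0.4)); // 1638.4 MiB
        // Larger cache: raise both the fraction and the total memory.
        System.out.printf("%.1f MiB%n", managedMemoryMiB(8192, 0.6)); // 4915.2 MiB
    }
}
```

Raising the fraction without raising the total memory shifts memory away from the JVM heap and other components, which is why the fraction and the total memory size are usually increased together.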
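You can also optimize RocksDB for your hardware by setting state.backend.rocksdb.predefined-options to one of the following predefined options: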
- DEFAULT: option set by default and applicable for all settings.
- SPINNING_DISK_OPTIMIZED: option for regular spinning disks, configured for better performance. The option is configured with the following values:
  CompactionStyle=LEVEL
  LevelCompactionDynamicLevelBytes=true
  MaxBackgroundJobs=4
  MaxOpenFiles=-1
- SPINNING_DISK_OPTIMIZED_HIGH_MEM: option for regular spinning hard disks, configured for better performance but with higher memory consumption. The option is configured with the following values:
  BlockCacheSize=256 MBytes
  BlockSize=128 KBytes
  FilterPolicy=BloomFilter
  LevelCompactionDynamicLevelBytes=true
  MaxBackgroundJobs=4
  MaxBytesForLevelBase=1 GByte
  MaxOpenFiles=-1
  MaxWriteBufferNumber=4
  MinWriteBufferNumberToMerge=3
  TargetFileSizeBase=256 MBytes
  WriteBufferSize=64 MBytes
- FLASH_SSD_OPTIMIZED: option for flash SSDs, configured for better performance. The following additional options are set:
  MaxBackgroundJobs=4
  MaxOpenFiles=-1
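To get a feel for the "higher memory consumption" of SPINNING_DISK_OPTIMIZED_HIGH_MEM, the following sketch works through the listed values. In RocksDB, the memtables of one column family can grow up to roughly WriteBufferSize x MaxWriteBufferNumber, and in Flink each registered state is stored in its own column family. The size parser and the per-column-family framing are assumptions of this sketch; when RocksDB memory is managed by Flink, the managed memory budget may cap these values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PredefinedOptionsMath {

    // Converts the size notation used in the option list ("64 MBytes",
    // "128 KBytes", "1 GByte") into bytes, using 1024-based units.
    // Hypothetical helper for this sketch only.
    static long parseSize(String text) {
        String[] parts = text.trim().split("\\s+");
        long amount = Long.parseLong(parts[0]);
        String unit = parts.length > 1 ? parts[1] : "Bytes";
        switch (unit) {
            case "KBytes": return amount << 10;
            case "MBytes": return amount << 20;
            case "GByte":
            case "GBytes": return amount << 30;
            case "Bytes": return amount;
            default: throw new IllegalArgumentException("unknown unit: " + unit);
        }
    }

    // Upper bound on memtable memory for one column family:
    // WriteBufferSize x MaxWriteBufferNumber.
    static long maxMemtableBytes(Map<String, String> options) {
        long writeBufferSize = parseSize(options.get("WriteBufferSize"));
        long maxWriteBufferNumber = Long.parseLong(options.get("MaxWriteBufferNumber"));
        return writeBufferSize * maxWriteBufferNumber;
    }

    public static void main(String[] args) {
        // Values copied from the SPINNING_DISK_OPTIMIZED_HIGH_MEM list.
        Map<String, String> highMem = new LinkedHashMap<>();
        highMem.put("BlockCacheSize", "256 MBytes");
        highMem.put("WriteBufferSize", "64 MBytes");
        highMem.put("MaxWriteBufferNumber", "4");

        long memtables = maxMemtableBytes(highMem);
        long blockCache = parseSize(highMem.get("BlockCacheSize"));
        System.out.println("memtables per column family: " + (memtables >> 20) + " MiB"); // 256 MiB
        System.out.println("block cache: " + (blockCache >> 20) + " MiB");                 // 256 MiB
    }
}
```

With several states in one job, the memtable bound applies per state, which is one reason this option pairs well with a larger managed memory fraction.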
- Open your cluster in Cloudera Manager.
- Select Flink from the list of services.
- Click Configuration.
- Search for state.backend.rocksdb.predefined-options and change the default value as needed.
- Click Save changes.
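- Alternatively, set the predefined options on the EmbeddedRocksDBStateBackend instance that you pass to the StreamExecutionEnvironment of the Flink job, as shown in the following example:
EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend();
rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
env.setStateBackend(rocksDbBackend);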