Configuring state backend for SSB
The state backend configuration that is set for Flink jobs also applies to SQL jobs in SQL Stream Builder (SSB). The Hashmap state backend stores data as objects on the Java heap, while RocksDB can store larger state that does not fit easily in memory. The RocksDB state backend uses a combination of a fast in-memory cache and optimized disk-based lookups to manage state.
- RocksDB
- By default, RocksDB is configured as the state backend for Flink. The RocksDB state backend holds in-flight data in a RocksDB database that is stored in the TaskManager local data directories, and it performs asynchronous snapshots. The data is stored as serialized byte arrays, as defined by the type serializer. As a result, keys are compared byte-wise instead of with the hashCode() and equals() Java methods used by the Hashmap state backend. The RocksDB state backend is recommended for large state, large key/value states, long windows, and high-availability setups. The amount of state is limited only by the available disk space. However, the maximum achievable throughput is lower than with the Hashmap state backend, because all reads and writes through the RocksDB backend must go through serialization and deserialization.
- Hashmap
- The Hashmap state backend holds data internally as objects on the Java heap. Key/value state and window operators hold hash tables that store the values, triggers, and so on. The Hashmap state backend is recommended for moderate state, key/value states, long windows, and high-availability setups.
You can choose between Hashmap and RocksDB as the state backend based on your performance and scalability requirements. With Hashmap, state on the Java heap can be accessed and updated faster, but its size is limited by the memory available in the cluster. RocksDB scales with the available disk space and is the only state backend that supports incremental snapshots, but accessing and updating state requires more resources, which can reduce performance compared to keeping state in memory. For more information about state backends in Flink, see the Apache Flink documentation.
You can configure the state backend for your streaming application in Cloudera Manager, or you can specify it in the SQL editor of the Streaming SQL Console.
- Open your cluster in Cloudera Manager.
- Select Flink from the list of services.
- Click Configuration.
- Search for state.backend and select HASHMAP or ROCKSDB based on your requirements. ROCKSDB is set by default.
- Click Save changes.
- Alternatively, to configure the state backend for a single job, set state.backend.type to hashmap or rocksdb in the Streaming SQL Console when executing your SQL job in per-job mode, as shown in the following examples:
  SET 'execution.target' = 'yarn-per-job'; SET 'state.backend.type' = 'rocksdb';
  SET 'execution.target' = 'yarn-per-job'; SET 'state.backend.type' = 'hashmap';
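Because RocksDB is the only state backend that supports incremental snapshots, a per-job RocksDB setup may also turn on incremental checkpointing. The following is a minimal sketch, assuming your Flink version accepts the `state.backend.incremental` key (check the Flink configuration reference for your version); the 60-second checkpoint interval is an arbitrary illustrative value, not a recommendation.

```sql
-- Run the job on its own YARN cluster so the settings below apply to this job only.
SET 'execution.target' = 'yarn-per-job';
-- Keep state in RocksDB on the TaskManager local disks.
SET 'state.backend.type' = 'rocksdb';
-- Persist only the changes since the previous checkpoint (assumed key, RocksDB only).
SET 'state.backend.incremental' = 'true';
-- Take a checkpoint every 60 seconds (illustrative value).
SET 'execution.checkpointing.interval' = '60s';
```

With Hashmap, the incremental setting has no effect, so it only makes sense alongside rocksdb.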
Configuring RocksDB
When you choose RocksDB as the state backend, you can also adjust how much memory RocksDB uses as a cache to improve lookup performance by setting the managed memory fraction of the TaskManagers. This fraction is the share of the total Flink memory of a TaskManager that is reserved as managed memory.
The default fraction is 0.4. If you need a larger cache, increase this value together with the total memory size. For more information, see the Set up TaskManager Memory and Memory Tuning Guide pages in the Apache Flink documentation.
- Open your cluster in Cloudera Manager.
- Select Flink from the list of services.
- Click Configuration.
- Search for taskmanager.memory.managed.fraction and change the default value as needed.
- Click Save changes.
- Alternatively, you can configure taskmanager.memory.managed.fraction in the Streaming SQL Console by executing a SET statement when submitting your SQL job in per-job mode, as shown in the following example:
  SET 'execution.target' = 'yarn-per-job'; SET 'taskmanager.memory.managed.fraction' = '0.4';
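Because managed memory is calculated as the fraction multiplied by the total Flink memory of a TaskManager, raising only the fraction leaves less memory for the other components, such as the JVM heap. The following sketch raises both values for a per-job cluster. It assumes that `taskmanager.memory.flink.size` can be set per job in the same way as the fraction; the 4096m and 0.6 values are illustrative only.

```sql
-- Per-job cluster so the memory settings apply only to this job.
SET 'execution.target' = 'yarn-per-job';
-- Total Flink memory of each TaskManager (illustrative value).
SET 'taskmanager.memory.flink.size' = '4096m';
-- Managed memory = 0.6 * 4096 MB = 2457.6 MB available to RocksDB
-- (with the default fraction of 0.4 it would be 0.4 * 4096 MB = 1638.4 MB).
SET 'taskmanager.memory.managed.fraction' = '0.6';
```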
You can also tune RocksDB by selecting one of the predefined option sets with the state.backend.rocksdb.predefined-options parameter. The following values are available:
- DEFAULT: the option set used by default, which keeps the default values for all settings.
- SPINNING_DISK_OPTIMIZED: optimized for better performance on regular spinning hard disks. It sets the following values:
  - CompactionStyle=LEVEL
  - LevelCompactionDynamicLevelBytes=true
  - MaxBackgroundJobs=4
  - MaxOpenFiles=-1
- SPINNING_DISK_OPTIMIZED_HIGH_MEM: optimized for better performance on regular spinning hard disks, at the cost of higher memory consumption. It sets the following values:
  - BlockCacheSize=256 MBytes
  - BlockSize=128 KBytes
  - FilterPolicy=BloomFilter
  - LevelCompactionDynamicLevelBytes=true
  - MaxBackgroundJobs=4
  - MaxBytesForLevelBase=1 GByte
  - MaxOpenFiles=-1
  - MaxWriteBufferNumber=4
  - MinWriteBufferNumberToMerge=3
  - TargetFileSizeBase=256 MBytes
  - WriteBufferSize=64 MBytes
- FLASH_SSD_OPTIMIZED: optimized for better performance on flash SSDs. It sets the following values:
  - MaxBackgroundJobs=4
  - MaxOpenFiles=-1
- Open your cluster in Cloudera Manager.
- Select Flink from the list of services.
- Click Configuration.
- Search for state.backend.rocksdb.predefined-options and change the default value as needed.
- Click Save changes.
- Alternatively, you can configure state.backend.rocksdb.predefined-options in the Streaming SQL Console by executing a SET statement when submitting your SQL job in per-job mode. Set only one of the values, as shown in the following examples:
  SET 'execution.target' = 'yarn-per-job'; SET 'state.backend.rocksdb.predefined-options' = 'SPINNING_DISK_OPTIMIZED';
  SET 'execution.target' = 'yarn-per-job'; SET 'state.backend.rocksdb.predefined-options' = 'SPINNING_DISK_OPTIMIZED_HIGH_MEM';
  SET 'execution.target' = 'yarn-per-job'; SET 'state.backend.rocksdb.predefined-options' = 'FLASH_SSD_OPTIMIZED';
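The per-job settings in this section can be combined in a single session. The following is a sketch only: it assumes that the SSB SQL editor accepts Flink DDL and that the built-in `datagen` connector is available. The `orders` table, its columns, and the chosen option values are hypothetical and only illustrate a keyed aggregation whose per-key state RocksDB would hold.

```sql
-- Per-job cluster so the settings apply only to this job.
SET 'execution.target' = 'yarn-per-job';
SET 'state.backend.type' = 'rocksdb';
-- Tuning profile for TaskManagers with local flash SSDs.
SET 'state.backend.rocksdb.predefined-options' = 'FLASH_SSD_OPTIMIZED';
SET 'taskmanager.memory.managed.fraction' = '0.4';

-- Hypothetical source table filled with random rows by the datagen connector.
CREATE TABLE orders (
  customer_id INT,
  amount DOUBLE
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.customer_id.min' = '1',
  'fields.customer_id.max' = '100'
);

-- Running aggregate per customer; the per-key counts and sums are kept as state in RocksDB.
SELECT customer_id, COUNT(*) AS order_count, SUM(amount) AS total_amount
FROM orders
GROUP BY customer_id;
```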