Configuring state backend for SSB

SQL jobs in SQL Stream Builder (SSB) use the same state backend configuration as Flink jobs. While Hashmap stores data as objects on the Java heap, RocksDB can be used to store larger state that does not fit easily in memory. The RocksDB state backend uses a combination of a fast in-memory cache and optimized disk-based lookups to manage state.

The following state backend options are available for your Flink application:
RocksDB
By default, RocksDB is configured as the state backend for Flink. The RocksDB state backend holds in-flight data in a RocksDB database that is stored in the TaskManager local data directories, and performs asynchronous snapshots. The data is stored as serialized byte arrays, which are defined by the type serializer. As a result, keys are compared byte-wise instead of with the hashCode() and equals() Java methods used by the Hashmap state backend. The RocksDB state backend is recommended for large state and key/value states, long windows and high-availability setups. The amount of state is limited by the amount of disk space available. The trade-off is a lower maximum throughput, because all reads and writes through the RocksDB backend have to go through deserialization and serialization.
Hashmap
The Hashmap state backend holds data internally as objects on the Java heap. The key/value state and window operators hold hash tables that store the values, triggers and so on. The Hashmap state backend is recommended for moderate state and key/value states, long windows and high-availability setups.

You can choose between Hashmap and RocksDB as the state backend based on your performance and scalability requirements. State can be accessed and updated faster with Hashmap on the Java heap, but its size is limited by the available memory in the cluster. RocksDB can scale with the available disk space and is the only state backend that supports incremental snapshots, but accessing and updating the state requires more resources, which can reduce performance compared to storing the state in memory. For more information about the state backends in Flink, see the Apache Flink documentation.

You can configure the state backend for your streaming application using Cloudera Manager, or you can specify the state backend in the SQL editor of Streaming SQL Console.

  1. Open your cluster in Cloudera Manager.
  2. Select Flink from the list of services.
  3. Click Configuration.
  4. Search for state.backend and select HASHMAP or ROCKSDB based on your requirements. ROCKSDB is set by default.
  5. Click Save changes.
When configuring the state backend in Cloudera Manager, the configuration serves as a default global setting and is applied for every SSB job. You can override the default set in Cloudera Manager by specifying the state backend directly when submitting your SSB job.
The per-job state backend can be configured by setting state.backend.type to hashmap or rocksdb on the Streaming SQL Console when executing your SQL job in per-job mode, as shown in the following examples, one for each state backend:
SET 'execution.target' = 'yarn-per-job';
SET 'state.backend.type' = 'rocksdb';
SET 'execution.target' = 'yarn-per-job';
SET 'state.backend.type' = 'hashmap';
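
Because RocksDB is the only state backend that supports incremental snapshots, a per-job setup that uses them might look like the sketch below. The state.backend.incremental key is a standard Apache Flink option that this page does not cover, and key names can differ between Flink releases. Treat its use through SET in your SSB version as an assumption to verify, since it may already be enabled or managed in Cloudera Manager.

```sql
-- Sketch only: RocksDB with incremental snapshots for a single per-job run.
SET 'execution.target' = 'yarn-per-job';
SET 'state.backend.type' = 'rocksdb';
-- Assumption: Flink's incremental checkpoint switch, not described on this page.
SET 'state.backend.incremental' = 'true';
```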

Configuring RocksDB

When choosing RocksDB as the state backend, you can also adjust how much memory RocksDB uses as a cache to increase lookup performance by setting the managed memory fraction of the TaskManagers.

The default fraction value is 0.4. For larger cache requirements, increase this value together with the total memory size. For more information, see the Set up TaskManager Memory and the Memory Tuning Guide documentation. To change the value in Cloudera Manager:

  1. Open your cluster in Cloudera Manager.
  2. Select Flink from the list of services.
  3. Click Configuration.
  4. Search for taskmanager.memory.managed.fraction and change the default value as needed.
  5. Click Save changes.
You can configure taskmanager.memory.managed.fraction on Streaming SQL Console by executing a SET statement when submitting your SQL job in per-job mode, as shown in the following example:
SET 'execution.target' = 'yarn-per-job';
SET 'taskmanager.memory.managed.fraction' = '0.4';
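
Managed memory is carved out of the TaskManager's memory budget, so raising the fraction without raising the total leaves less for the heap and other components. A minimal sketch of increasing both together follows. The taskmanager.memory.process.size key is a standard Apache Flink option that is not mentioned above, and the values 8g and 0.5 are purely illustrative, not recommendations.

```sql
-- Sketch only: enlarge the RocksDB cache budget by raising the managed
-- fraction together with the total TaskManager memory.
SET 'execution.target' = 'yarn-per-job';
-- Assumption: illustrative total process memory per TaskManager.
SET 'taskmanager.memory.process.size' = '8g';
-- Share of the TaskManager's Flink memory reserved as managed memory.
SET 'taskmanager.memory.managed.fraction' = '0.5';
```

If the total memory size is controlled in Cloudera Manager for your cluster, change it there instead and keep only the fraction in the SET statement.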
RocksDB also has predefined options that can be changed based on your system configurations:
  • DEFAULT: option set by default and applicable for all settings.
  • SPINNING_DISK_OPTIMIZED: option for regular spinning disks configured for better performance. The option is configured with the following values:
    • CompactionStyle=LEVEL
    • LevelCompactionDynamicLevelBytes=true
    • MaxBackgroundJobs=4
    • MaxOpenFiles=-1
  • SPINNING_DISK_OPTIMIZED_HIGH_MEM: option for regular spinning hard disks configured for better performance, but with higher memory consumption. The option is configured with the following values:
    • BlockCacheSize=256 MBytes
    • BlockSize=128 KBytes
    • FilterPolicy=BloomFilter
    • LevelCompactionDynamicLevelBytes=true
    • MaxBackgroundJobs=4
    • MaxBytesForLevelBase=1 GByte
    • MaxOpenFiles=-1
    • MaxWriteBufferNumber=4
    • MinWriteBufferNumberToMerge=3
    • TargetFileSizeBase=256 MBytes
    • WriteBufferSize=64 MBytes
  • FLASH_SSD_OPTIMIZED: option for flash SSDs configured for better performance. The following additional options are set:
    • MaxBackgroundJobs=4
    • MaxOpenFiles=-1
You can set the RocksDB predefined options in the following ways:
  1. Open your cluster in Cloudera Manager.
  2. Select Flink from the list of services.
  3. Click Configuration.
  4. Search for state.backend.rocksdb.predefined-options and change the default value as needed.
  5. Click Save changes.
You can configure state.backend.rocksdb.predefined-options on Streaming SQL Console by executing a SET statement when submitting your SQL job in per-job mode. The following example shows one statement for each non-default option; use only the one that matches your storage:
SET 'execution.target' = 'yarn-per-job';
SET 'state.backend.rocksdb.predefined-options' = 'SPINNING_DISK_OPTIMIZED';
SET 'state.backend.rocksdb.predefined-options' = 'SPINNING_DISK_OPTIMIZED_HIGH_MEM';
SET 'state.backend.rocksdb.predefined-options' = 'FLASH_SSD_OPTIMIZED';
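
Putting the settings from this page together, a complete per-job configuration could look like the following sketch. It assumes a cluster whose TaskManager local data directories are on SSDs; the choice of FLASH_SSD_OPTIMIZED and the fraction value are illustrative only.

```sql
-- Sketch only: one per-job configuration combining the options discussed here.
SET 'execution.target' = 'yarn-per-job';
SET 'state.backend.type' = 'rocksdb';
SET 'taskmanager.memory.managed.fraction' = '0.4';
-- Set a single predefined option; a later SET for the same key overrides an earlier one.
SET 'state.backend.rocksdb.predefined-options' = 'FLASH_SSD_OPTIMIZED';
```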