Implementing State Management
This subsection describes state management APIs and architecture for core Storm.
Stateful abstractions allow Storm bolts to store and retrieve the state of their computations. The state management framework automatically, periodically snapshots the state of bolts across a topology. There is a default in-memory-based state implementation, as well as a Redis-backed implementation that provides state persistence.
Bolts that require state to be managed and persisted by the framework should implement
the IStatefulBolt
interface or extend BaseStatefulBolt
, and
implement the void initState(T state)
method. The initState
method is invoked by the framework during bolt initialization. It contains the
previously saved state of the bolt. Invoke initState
after
prepare
, but before the bolt starts processing any tuples.
Currently the only supported State implementation is KeyValueState
, which
provides key-value mapping.
The following example describes how to implement a word count bolt that uses the key-value state abstraction for word counts:
public class WordCountBolt extends BaseStatefulBolt<KeyValueState<String, Integer>> { private KeyValueState<String,Integer> wordCounts; ... @Override public void initState(KeyValueState<String,Integer> state) { wordCounts = state; } @Override public void execute(Tuple tuple) { String word = tuple.getString(0); Integer count = wordCounts.get(word, 0); count++; wordCounts.put(word, count); collector.emit(tuple, new Values(word, count)); collector.ack(tuple); } ... }
Extend the
BaseStatefulBolt
and type parameterize it withKeyValueState
, to store the mapping of word to count.In the
init
method, initialize the bolt with its previously saved state: the word count last committed by the framework during the previous run.In the
execute
method, update the word count.
The framework periodically checkpoints the state of the bolt (default every
second). The frequency can be changed by setting the storm config
topology.state.checkpoint.interval.ms
.
For state persistence, use a state provider that supports persistence by
setting the topology.state.provider in the storm config. For example, for Redis
based key-value state implementation, you can set
topology.state.provider
to
org.apache.storm.redis.state.RedisKeyValueStateProvider
in
storm.yaml
. The provider implementation .jar should be in the
class path, which in this case means placing the storm-redis-*
.jar
in the extlib
directory.
You can override state provider properties by setting
topology.state.provider.config
. For Redis state this is a JSON
configuration with the following properties:
{ "keyClass": "Optional fully qualified class name of the Key type.", "valueClass": "Optional fully qualified class name of the Value type.", "keySerializerClass": "Optional Key serializer implementation class.", "valueSerializerClass": "Optional Value Serializer implementation class.", "jedisPoolConfig": { "host": "localhost", "port": 6379, "timeout": 2000, "database": 0, "password": "xyz" } }