Checkpointing
Checkpointing is triggered by an internal checkpoint spout at the interval specified by topology.state.checkpoint.interval.ms. If there is at least one IStatefulBolt in the topology, the checkpoint spout is automatically added by the topology builder.
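For example, a topology might enable checkpointing along these lines (SentenceSpout and WordCountBolt are placeholder component names; WordCountBolt is assumed to implement IStatefulBolt, as sketched further below):

```java
import org.apache.storm.Config;
import org.apache.storm.topology.TopologyBuilder;

Config conf = new Config();
// Checkpoint every second (the interval is given in milliseconds).
conf.put("topology.state.checkpoint.interval.ms", 1000);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new SentenceSpout(), 1);
// Because WordCountBolt is an IStatefulBolt, the builder adds the checkpoint
// spout and wraps the bolt automatically; no extra wiring is required here.
builder.setBolt("counts", new WordCountBolt(), 1).shuffleGrouping("sentences");
```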
For stateful topologies, the topology builder wraps the IStatefulBolt in a StatefulBoltExecutor, which handles the state commits on receiving the checkpoint tuples. Non-stateful bolts are wrapped in a CheckpointTupleForwarder, which simply forwards the checkpoint tuples so that they can flow through the topology directed acyclic graph (DAG).
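To show what gets wrapped, here is a sketch of a stateful bolt (names are illustrative and method signatures follow recent Storm releases): the bolt only implements initState and its normal execute logic, while the wrapping StatefulBoltExecutor intercepts the checkpoint tuples and commits the state.

```java
import java.util.Map;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseStatefulBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Illustrative stateful bolt: counts words and keeps the counts in managed state.
public class WordCountBolt extends BaseStatefulBolt<KeyValueState<String, Long>> {
    private KeyValueState<String, Long> counts;
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void initState(KeyValueState<String, Long> state) {
        // Invoked with the last committed state before any tuples are processed.
        this.counts = state;
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        long count = counts.get(word, 0L) + 1;
        counts.put(word, count);
        collector.emit(tuple, new Values(word, count));
        collector.ack(tuple);
        // No explicit save here: the StatefulBoltExecutor commits the state
        // when checkpoint tuples arrive.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
```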
Checkpoint tuples flow through a separate internal stream called $checkpoint. The topology builder wires the checkpoint stream across the whole topology, with the checkpoint spout at the root.
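Continuing the builder sketch above, the wiring is conceptually similar to subscribing each bolt to the $checkpoint stream of its upstream components with an all grouping, so that every task sees every checkpoint tuple. The snippet below only illustrates that idea; the actual wiring is done internally by the topology builder and is not something users write:

```java
builder.setBolt("counts", new WordCountBolt(), 1)
       .shuffleGrouping("sentences")               // normal data stream
       .allGrouping("sentences", "$checkpoint");   // checkpoint stream from upstream
```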
At the specified checkpoint interval, the checkpoint spout emits checkpoint tuples. When a bolt receives a checkpoint tuple, it saves its state and forwards the checkpoint tuple to the next component. Each bolt waits for the checkpoint to arrive on all of its input streams before saving its state, so the saved state is consistent across the topology. Once the checkpoint spout receives acks from all bolts, the state commit is complete and the transaction is recorded as committed by the checkpoint spout.
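The alignment logic can be pictured with a small, framework-free sketch (this is not the StatefulBoltExecutor code, just the idea behind it):

```java
import java.util.HashSet;
import java.util.Set;

// Simplified illustration of checkpoint alignment in a bolt: state is saved
// only after the checkpoint for a given transaction id has arrived on every
// input stream, and the checkpoint is then forwarded and acked.
public class CheckpointAlignmentSketch {
    private final Set<String> inputStreams;                       // streams this bolt consumes
    private final Set<String> seenForCurrentTxid = new HashSet<>();
    private long currentTxid = -1;

    public CheckpointAlignmentSketch(Set<String> inputStreams) {
        this.inputStreams = inputStreams;
    }

    /** Called when a checkpoint tuple with transaction id txid arrives on a stream. */
    public void onCheckpointTuple(String stream, long txid) {
        if (txid != currentTxid) {
            currentTxid = txid;
            seenForCurrentTxid.clear();
        }
        seenForCurrentTxid.add(stream);
        if (seenForCurrentTxid.containsAll(inputStreams)) {
            saveState(txid);           // commit this bolt's state for txid
            forwardCheckpoint(txid);   // let downstream bolts do the same
            ackCheckpoint(txid);       // ack so the spout can record the commit
        }
    }

    private void saveState(long txid)          { System.out.println("state saved for txid " + txid); }
    private void forwardCheckpoint(long txid)  { System.out.println("checkpoint forwarded for txid " + txid); }
    private void ackCheckpoint(long txid)      { System.out.println("checkpoint acked for txid " + txid); }
}
```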
This checkpoint mechanism builds on Storm's existing acking mechanism to replay the tuples. It uses concepts from the asynchronous snapshot algorithm used by Flink and from the Chandy-Lamport algorithm for distributed snapshots. Internally, checkpointing uses a three-phase commit protocol with prepare and commit phases, so that the state across the topology is saved in a consistent and atomic manner.
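The prepare/commit split can be illustrated with a toy state backend (the real contract is defined by Storm's State interface; the method names below mirror that contract as an assumption rather than being copied from it). Pending changes are staged during the prepare phase, promoted to committed state during the commit phase, and discarded on rollback:

```java
import java.util.HashMap;
import java.util.Map;

// Toy key-value state backend showing the prepare/commit/rollback pattern
// used during checkpointing. Not Storm's implementation.
public class PreparedKeyValueStateSketch {
    private final Map<String, Long> committed = new HashMap<>();
    private final Map<String, Long> pending = new HashMap<>();

    public void put(String key, Long value) {
        pending.put(key, value);
    }

    public Long get(String key, Long defaultValue) {
        if (pending.containsKey(key)) {
            return pending.get(key);
        }
        return committed.getOrDefault(key, defaultValue);
    }

    /** Prepare phase: durably stage the pending changes under txid. */
    public void prepareCommit(long txid) {
        // A real backend would write a prepared snapshot keyed by txid here.
        System.out.println("prepared txid " + txid + " with " + pending.size() + " pending entries");
    }

    /** Commit phase: promote the prepared changes to the committed state. */
    public void commit(long txid) {
        committed.putAll(pending);
        pending.clear();
        System.out.println("committed txid " + txid);
    }

    /** Rollback: discard anything prepared but never committed. */
    public void rollback() {
        pending.clear();
        System.out.println("rolled back to the last committed state");
    }
}
```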