Checkpointing
Checkpointing is triggered by an internal checkpoint spout at the interval specified by topology.state.checkpoint.interval.ms. If there is at least one IStatefulBolt in the topology, the checkpoint spout is automatically added by the topology builder.
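For example, a stateful topology might be assembled as in the minimal sketch below. The WordCountBolt class, the WordSpout spout, the component ids, and the topology name are illustrative assumptions; BaseStatefulBolt, KeyValueState, and the configuration key are the Storm API and setting described above.

```java
import java.util.Map;

import org.apache.storm.state.KeyValueState;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseStatefulBolt;
import org.apache.storm.tuple.Tuple;

// A stateful bolt that counts words; the counts live in framework-managed state
// and are committed when checkpoint tuples arrive.
public class WordCountBolt extends BaseStatefulBolt<KeyValueState<String, Long>> {
    private KeyValueState<String, Long> counts;
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void initState(KeyValueState<String, Long> state) {
        // Invoked after prepare() with the last committed state, if any.
        this.counts = state;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        counts.put(word, counts.get(word, 0L) + 1);
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no downstream output in this sketch
    }
}
```

Submitting such a bolt is enough to enable checkpointing; the checkpoint spout does not need to be added by hand:

```java
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class StatefulWordCountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("words", new WordSpout());               // hypothetical spout emitting a "word" field
        builder.setBolt("counts", new WordCountBolt(), 2)
               .fieldsGrouping("words", new Fields("word"));

        Config conf = new Config();
        conf.put("topology.state.checkpoint.interval.ms", 5000);  // checkpoint every 5 seconds

        // The builder adds the checkpoint spout because the topology contains an IStatefulBolt.
        StormSubmitter.submitTopology("stateful-word-count", conf, builder.createTopology());
    }
}
```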
For stateful topologies, the topology builder wraps the IStatefulBolt in a StatefulBoltExecutor, which handles the state commits on receiving the checkpoint tuples. Non-stateful bolts are wrapped in a CheckpointTupleForwarder, which simply forwards the checkpoint tuples so that they can flow through the topology directed acyclic graph (DAG).
Checkpoint tuples flow through a separate internal stream called $checkpoint. The topology builder wires the checkpoint stream across the whole topology, with the checkpoint spout at the root.
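Spelled out by hand, that wiring is roughly equivalent to the sketch below, reusing the hypothetical WordCountBolt and WordSpout from the earlier example. The "$checkpointspout" component id and the explicit allGrouping are assumptions for illustration; in practice the builder performs this wiring automatically.

```java
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.CheckpointSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

// Illustrative only: the topology builder normally performs this wiring itself.
final class CheckpointWiringSketch {
    static StormTopology build() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("words", new WordSpout());                    // hypothetical data spout
        builder.setSpout("$checkpointspout", new CheckpointSpout());   // root of the checkpoint stream
        builder.setBolt("counts", new WordCountBolt(), 2)
               .fieldsGrouping("words", new Fields("word"))
               // the bolt subscribes to the $checkpoint stream in addition to its data stream
               .allGrouping("$checkpointspout", CheckpointSpout.CHECKPOINT_STREAM_ID);
        return builder.createTopology();
    }
}
```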
At the specified checkpoint interval, the checkpoint spout emits checkpoint tuples. On receiving a checkpoint tuple, a bolt saves its state and forwards the tuple to the next component. Each bolt waits for the checkpoint tuple to arrive on all of its input streams before saving its state, so the state is consistent across the topology. Once the checkpoint spout receives acks from all the bolts, the state commit is complete and the spout records the transaction as committed.
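The "wait for all input streams" behavior is essentially a barrier per checkpoint transaction. The hypothetical helper below (not Storm source) sketches that bookkeeping: a bolt saves its state and forwards the checkpoint tuple only after seeing it from every input task.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the per-checkpoint barrier described above.
final class CheckpointBarrier {
    private final Set<Integer> inputTasks;                            // all upstream task ids
    private final Map<Long, Set<Integer>> pending = new HashMap<>();  // txid -> tasks still awaited

    CheckpointBarrier(Set<Integer> inputTasks) {
        this.inputTasks = inputTasks;
    }

    /**
     * Record a checkpoint tuple for the given transaction from one input task.
     * Returns true once tuples for that txid have arrived from every input task,
     * at which point the bolt saves its state and forwards the checkpoint tuple.
     */
    boolean onCheckpointTuple(long txid, int sourceTaskId) {
        Set<Integer> awaited = pending.computeIfAbsent(txid, t -> new HashSet<>(inputTasks));
        awaited.remove(sourceTaskId);
        if (awaited.isEmpty()) {
            pending.remove(txid);
            return true;
        }
        return false;
    }
}
```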
This checkpoint mechanism builds on Storm's existing acking mechanism to replay the tuples. It uses concepts from the asynchronous snapshot algorithm used by Flink, and from the Chandy-Lamport algorithm for distributed snapshots. Internally, checkpointing uses a three-phase commit protocol with a prepare and commit phase, so that the state across the topology is saved in a consistent and atomic manner.
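The prepare and commit phases map onto Storm's State interface: a prepare checkpoint tuple stages the state changes for the transaction, and a commit tuple makes them durable. The sketch below assumes the State methods prepareCommit, commit, and rollback; the action enum here is illustrative, and the real bookkeeping is internal to the stateful bolt executor.

```java
import org.apache.storm.state.State;

// Illustrative sketch of prepare/commit handling on checkpoint tuples.
final class CommitSketch {
    // Action names are illustrative, not Storm's internal enum.
    enum CheckpointAction { PREPARE, COMMIT, ROLLBACK }

    static void handleCheckpoint(State state, long txid, CheckpointAction action) {
        switch (action) {
            case PREPARE:
                state.prepareCommit(txid);   // stage the state changes for this transaction
                break;
            case COMMIT:
                state.commit(txid);          // make the staged state durable
                break;
            case ROLLBACK:
                state.rollback();            // discard changes from an uncommitted transaction
                break;
        }
    }
}
```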