Checkpoint
To make your Flink application fault tolerant, enable checkpointing in your application design. If an error occurs, your application can be restarted with its state restored from the automatically saved snapshot of your data stream.
Checkpointing improves the fault tolerance of Flink by enabling applications to recover their state and stream positions to the point in time of the latest available checkpoint in case of failure. Checkpointing is not enabled by default. It requires the following:
- The source can replay records for a certain amount of time (such as Kafka or HDFS).
- The state storage is persistent (such as HDFS).
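The interplay of these two requirements can be illustrated with a small, self-contained Java model. This is a conceptual sketch only, not the Flink API or Flink's actual checkpointing algorithm. The hypothetical CheckpointReplayDemo class sums the records of a replayable in-memory source. Every few records, it saves a copy of the source offset and the running sum as its "checkpoint". After a simulated crash, it restores that copy and rewinds the source to the saved offset.

```java
import java.util.List;

/** Conceptual model (not the Flink API) of checkpoint-based recovery. */
public class CheckpointReplayDemo {

    /** Snapshot of the source position and the operator state, kept in "persistent" storage. */
    static final class Checkpoint {
        final int sourceOffset;
        final long runningSum;

        Checkpoint(int sourceOffset, long runningSum) {
            this.sourceOffset = sourceOffset;
            this.runningSum = runningSum;
        }
    }

    /**
     * Sums a replayable source, taking a checkpoint every `interval` records
     * and simulating one crash when the source offset first reaches `failAt`.
     */
    static long run(List<Integer> source, int interval, int failAt) {
        Checkpoint latest = new Checkpoint(0, 0L); // initial (empty) snapshot
        int offset = 0;   // position in the replayable source
        long sum = 0;     // in-memory operator state
        boolean failed = false;

        while (offset < source.size()) {
            if (!failed && offset == failAt) {
                failed = true;
                // Crash: in-memory state is lost, so restore the snapshot
                // and rewind the source to the checkpointed offset.
                offset = latest.sourceOffset;
                sum = latest.runningSum;
                continue;
            }
            sum += source.get(offset);
            offset++;
            if (offset % interval == 0) {
                latest = new Checkpoint(offset, sum); // take a checkpoint
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // The crash at offset 7 rolls back to the checkpoint at offset 6,
        // yet the result equals a failure-free run.
        System.out.println(run(source, 3, 7)); // prints 55
    }
}
```

After the restore, the model reads records processed since the last checkpoint again. Because the state is rolled back to the same point, the final result matches a failure-free run. In this model, the guarantee would not hold if the source could not rewind or if the snapshot were lost together with the in-memory state.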
Enable checkpointing in Flink with env.enableCheckpointing(num), where env is an instance of StreamExecutionEnvironment and num is the checkpoint interval in milliseconds. Use the env.getCheckpointConfig() configuration object to configure the checkpointing settings. See the Apache Flink documentation for the available settings.
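The following example reads the checkpoint interval from the job parameters and, if the interval is positive, enables exactly-once checkpointing:

```java
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;

// Assumes env (StreamExecutionEnvironment) and params (ParameterTool) are already defined.
// Configure checkpointing if the interval is set (default: 1 minute)
long cpInterval = params.getLong("checkpoint.interval.millis", TimeUnit.MINUTES.toMillis(1));
if (cpInterval > 0) {
    CheckpointConfig checkpointConf = env.getCheckpointConfig();
    // A positive interval enables checkpointing
    checkpointConf.setCheckpointInterval(cpInterval);
    checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // Discard a checkpoint that does not complete within one hour
    checkpointConf.setCheckpointTimeout(TimeUnit.HOURS.toMillis(1));
    // Retain the latest checkpoint when the job is cancelled (deprecated in newer Flink versions; check your version's CheckpointConfig API)
    checkpointConf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    // Compress keyed state snapshots
    env.getConfig().setUseSnapshotCompression(true);
}
```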
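The checkpointing snippet turns checkpointing on or off through a single job parameter. The sketch below isolates that decision logic in plain Java so that it can be unit tested. The CheckpointIntervalResolver class is hypothetical, and it uses a Map in place of Flink's parameter object. It only mirrors the checkpoint.interval.millis key and the one-minute default from that snippet.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: decides whether, and how often, to checkpoint. */
public class CheckpointIntervalResolver {

    // Key and default mirror the checkpointing snippet in this section
    static final String KEY = "checkpoint.interval.millis";
    static final long DEFAULT_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1);

    /**
     * Returns the checkpoint interval in milliseconds, or an empty OptionalLong
     * when checkpointing should stay disabled (an interval of 0 or less).
     */
    static OptionalLong resolveInterval(Map<String, String> params) {
        String raw = params.get(KEY);
        long interval = (raw == null) ? DEFAULT_INTERVAL_MS : Long.parseLong(raw);
        return interval > 0 ? OptionalLong.of(interval) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        System.out.println(resolveInterval(Map.of()));             // OptionalLong[60000]
        System.out.println(resolveInterval(Map.of(KEY, "30000"))); // OptionalLong[30000]
        System.out.println(resolveInterval(Map.of(KEY, "0")));     // OptionalLong.empty
    }
}
```

Keeping the interval lookup separate from the Flink calls makes it straightforward to disable checkpointing for a test run, for example by setting the parameter to 0, without changing the job code.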