Checkpoint

To make your Flink application fault tolerant, you need to enable checkpointing in your application design. If an error occurs, you can restore your application with its state from the automatically saved snapshot of your data stream.

Checkpointing improves Flink fault tolerance by enabling applications to recover their state and stream positions to the same point in time as the latest available checkpoint in case of failure. Checkpointing is not enabled by default.

Before enabling this feature, make sure that:
  • The source can replay records for a certain amount of time (such as Kafka or HDFS)
  • The storage for the state is persistent (such as HDFS)
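
The two requirements above can be illustrated with a toy model in plain Java. This is only a sketch of the idea and not how Flink implements checkpointing: the class name CheckpointReplaySketch, the method runWithFailure, and its parameters are hypothetical, the list stands in for a replayable source, and two local variables stand in for persistent state storage.

```java
import java.util.List;

/** Toy model of checkpoint-and-replay recovery; not Flink's implementation. */
final class CheckpointReplaySketch {

    /**
     * Sums the records, saving a snapshot every checkpointEvery records and
     * simulating one crash just before the record at index failAt is processed.
     * Pass a negative failAt to run without a failure.
     */
    static long runWithFailure(List<Integer> records, int checkpointEvery, int failAt) {
        // These two values stand in for the persistent state storage:
        // the state itself and the source position that the state reflects.
        int checkpointOffset = 0;
        long checkpointSum = 0L;

        int offset = 0;   // current read position in the source
        long sum = 0L;    // in-memory operator state
        boolean crashed = false;

        while (offset < records.size()) {
            if (!crashed && offset == failAt) {
                // Failure: in-memory state is lost, so restore the last snapshot
                // and have the source replay from the recorded position.
                crashed = true;
                offset = checkpointOffset;
                sum = checkpointSum;
                continue;
            }
            sum += records.get(offset);
            offset++;
            if (offset % checkpointEvery == 0) {
                // Snapshot the state together with the source position.
                checkpointOffset = offset;
                checkpointSum = sum;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> records = List.of(1, 2, 3, 4, 5, 6, 7);
        long withFailure = runWithFailure(records, 3, 5);
        long withoutFailure = runWithFailure(records, 3, -1);
        System.out.println(withFailure == withoutFailure);
    }
}
```

In this sketch the result is the same with and without the simulated crash, because the records processed after the last snapshot are read again from the source. If the source could not replay them, or if the snapshot did not survive the failure, those records would be lost.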

Enable checkpointing in Flink with env.enableCheckpointing(num), where env is an instance of StreamExecutionEnvironment and num refers to the checkpoint interval in milliseconds. The env.getCheckpointConfig() configuration object is used to configure the checkpointing settings.

See the Apache Flink documentation for the available settings.

The following is an example of a checkpoint configuration.
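// Imports required by this fragment
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;

// env and params are assumed to be defined earlier in the application.
// Configure checkpointing if interval is set (defaults to one minute)
long cpInterval = params.getLong("checkpoint.interval.millis", TimeUnit.MINUTES.toMillis(1));
if (cpInterval > 0) {
    CheckpointConfig checkpointConf = env.getCheckpointConfig();
    // Take a checkpoint every cpInterval milliseconds
    checkpointConf.setCheckpointInterval(cpInterval);
    checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // Abort a checkpoint that takes longer than one hour
    checkpointConf.setCheckpointTimeout(TimeUnit.HOURS.toMillis(1));
    // Keep the externalized checkpoint when the job is cancelled
    checkpointConf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    // Compress state snapshots
    env.getConfig().setUseSnapshotCompression(true);
}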