Savepoint management

Learn more about Flink savepoint management.

Savepoints are triggered automatically by the system during the upgrade process, as described in the previous section. You can also trigger savepoints manually or periodically, but user-created savepoints will not be used during the restoration process after the upgrade, and are not required for correct operation.

For savepoints to work, Flink requires durable storage for its data. You can use any type of mounted volume (local or networked) or object storage, for example S3, Longhorn, or NFS. This documentation uses an NFS volume.

To enable and use savepoints, you need to update the following properties (compared to the previous specifications):
  • Define a new volume to store the savepoint and mount it to the flink-main-container container.
  • Enable savepoints by adding the savepoint directory to spec.flinkConfiguration.
  • Enable checkpoints by adding the checkpoint directory to spec.flinkConfiguration.
  • Enable periodic savepoints triggered by the Flink Operator by adding kubernetes.operator.periodic.savepoint.interval: 2h.
  • Set upgradeMode to savepoint so that a savepoint is created before each restart and the job resumes from it afterwards.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-kubernetes-tutorial
spec:
  image: [***REGISTRY HOST***]:[***PORT***]/[***PROJECT***]/flink-kubernetes-tutorial:latest
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
    state.savepoints.dir: file:///opt/flink/durable/savepoints
    state.checkpoints.dir: file:///opt/flink/durable/checkpoints
    high-availability.storageDir: file:///opt/flink/durable/ha
    kubernetes.operator.periodic.savepoint.interval: 2h
  serviceAccount: flink
  mode: native
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /opt/flink/durable
              name: flink-volume
      volumes:
        - name: flink-volume
          nfs:
            server: my-nfs-server.example.com
            path: /data/flink/
  job:
    args: ["--rowsPerSec", "10", "--outputPath", "/opt/flink/durable"]
    jarURI: local:///opt/flink/usrlib/flink-kubernetes-tutorial.jar
    parallelism: 4
    state: running
    upgradeMode: savepoint
You can use the following commands to create the new deployment:
kubectl -n flink delete FlinkDeployment flink-kubernetes-tutorial
kubectl -n flink apply -f flink-deployment.yaml
After the application is running, you can trigger a savepoint using the following command:
kubectl -n flink patch FlinkDeployment flink-kubernetes-tutorial \
    --type=merge \
    --patch='{"spec":{"job":{"savepointTriggerNonce":1234}}}'
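To check whether the savepoint was taken, you can read the location of the most recent savepoint from the resource status. The following is a minimal sketch, assuming the Flink Operator reports savepoints under status.jobStatus.savepointInfo of the v1beta1 FlinkDeployment resource (other operator versions may expose this information differently). The function name last_savepoint_location is only an illustrative helper and is not part of any tool.

```shell
# Hypothetical helper: print the location of the last recorded savepoint.
# Assumes the status field layout status.jobStatus.savepointInfo.lastSavepoint.
# $1 = namespace, $2 = FlinkDeployment name
last_savepoint_location() {
    kubectl -n "$1" get FlinkDeployment "$2" \
        -o jsonpath='{.status.jobStatus.savepointInfo.lastSavepoint.location}'
}

# Usage:
#   last_savepoint_location flink flink-kubernetes-tutorial
```

With the configuration above, the reported location is expected to be a path under file:///opt/flink/durable/savepoints.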

When the application is suspended, the Flink Operator automatically creates a savepoint and resumes the application from that savepoint when it is restarted.

The Flink Operator automatically keeps track of the savepoint history, whether a savepoint is triggered automatically by an upgrade or manually (ad hoc or by a periodic task). You can configure automatic removal of older savepoints by changing the cleanup behavior, as shown in the following example:
kubernetes.operator.savepoint.history.max.age: 24 h
kubernetes.operator.savepoint.history.max.count: 5
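One way to apply these properties to a running deployment is a merge patch, in the same style as the savepoint trigger command. The following is a sketch, assuming the cleanup properties are set per deployment under spec.flinkConfiguration, like the periodic savepoint interval in the deployment above. The function name set_savepoint_retention is a hypothetical helper.

```shell
# Hypothetical helper: set savepoint history limits on one FlinkDeployment.
# Assumes the properties are read from spec.flinkConfiguration.
# $1 = namespace, $2 = FlinkDeployment name, $3 = max age, $4 = max count
set_savepoint_retention() {
    # flinkConfiguration values are strings, so both values are quoted in the JSON.
    kubectl -n "$1" patch FlinkDeployment "$2" \
        --type=merge \
        --patch="{\"spec\":{\"flinkConfiguration\":{\"kubernetes.operator.savepoint.history.max.age\":\"$3\",\"kubernetes.operator.savepoint.history.max.count\":\"$4\"}}}"
}

# Usage:
#   set_savepoint_retention flink flink-kubernetes-tutorial "24 h" 5
```

The same two keys can instead be added directly to the flinkConfiguration section of flink-deployment.yaml and applied with kubectl apply.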

You can disable the savepoint cleanup completely by setting the kubernetes.operator.savepoint.cleanup.enabled property to false. In this case, the Flink Operator still collects and saves the savepoint history, but does not perform any cleanup operations.

Additional savepoint operations

Even though savepoints are triggered automatically during an upgrade process, you can also trigger a savepoint manually or periodically. These configurations are optional, have no impact on the automatic savepoint triggering, and are not required for the correct operation of the Flink cluster.

Manually triggering a savepoint
You can use the savepointTriggerNonce property in spec.job to create a new savepoint by assigning a new (different or random) value to the property:
job:
    ...
    savepointTriggerNonce: 123
This change will be applied by the Flink Operator as described in the previous sections.
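Because a savepoint is only triggered when the value changes, a convenient way to get a different value each time is to derive it from the current time. The following is a minimal sketch that uses the Unix timestamp in seconds as the nonce. The function name trigger_savepoint is a hypothetical helper, and the kubectl patch call is the same as the one shown earlier.

```shell
# Hypothetical helper: trigger a manual savepoint with a fresh nonce.
# $1 = namespace, $2 = FlinkDeployment name
trigger_savepoint() {
    # Current Unix time in seconds; differs from the previous value
    # as long as calls are at least one second apart.
    nonce="$(date +%s)"
    kubectl -n "$1" patch FlinkDeployment "$2" \
        --type=merge \
        --patch="{\"spec\":{\"job\":{\"savepointTriggerNonce\":${nonce}}}}"
}

# Usage:
#   trigger_savepoint flink flink-kubernetes-tutorial
```

Two calls within the same second produce the same nonce, so the second call would leave the specification unchanged.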

Periodically triggering a savepoint
You can use the kubernetes.operator.periodic.savepoint.interval property, on a per-job level, to trigger a savepoint after a specified period:
flinkConfiguration:
    ...
    kubernetes.operator.periodic.savepoint.interval: 6h

The timely execution of a periodic savepoint is not guaranteed, because it can be delayed by an unhealthy job status or by other user operations.