Job management

Learn more about Flink job management.

When you change the FlinkDeployment resource in a way that requires a restart, the Flink Operator automatically restarts the deployment after the patch is applied to the resource. For example, you can change the job arguments with the following command:
kubectl -n flink patch FlinkDeployment flink-kubernetes-tutorial \
    --type=merge \
    --patch='{"spec":{"job":{"args":["--rowsPerSec", "100"]}}}'

In this example, kubectl patch is used as an alternative to editing the original configuration and applying the changes to the FlinkDeployment resource.

To restart the deployment without making any changes to the definition, you can update the spec.restartNonce property. The Flink Operator automatically restarts the job whenever the new value differs from the previous one.
kubectl -n flink patch FlinkDeployment flink-kubernetes-tutorial \
    --type=merge \
    --patch='{"spec":{"restartNonce":1234}}'
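
Before choosing a new nonce, it can help to check the value currently stored on the resource. The following is a minimal sketch that reuses the namespace and resource name from the examples above. The output is empty if restartNonce has never been set.

```shell
# Print the current spec.restartNonce of the deployment (empty if unset).
kubectl -n flink get FlinkDeployment flink-kubernetes-tutorial \
    -o jsonpath='{.spec.restartNonce}'
```

Patching in any value that differs from the printed one triggers the restart.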

Recovering missing job deployments

If the Flink cluster deployment is deleted by accident or by an external process, the Flink Operator can recover the deployment when high availability is enabled. Ensure that the kubernetes.operator.jm-deployment-recovery.enabled property is enabled to recover the FlinkDeployment.

Restarting unhealthy job deployments

If the Flink cluster deployment is considered unhealthy, the Flink Operator can restart the deployment when high availability is enabled. Ensure that the following properties are enabled to restart the Flink deployment:
  • kubernetes.operator.cluster.health-check.enabled
  • kubernetes.operator.jm-deployment-recovery.enabled
A Flink deployment is considered unhealthy in the following cases:
  • The count of Flink restarts reaches the value configured for the kubernetes.operator.cluster.health-check.restarts.threshold property (default is 64) within the window period configured for kubernetes.operator.cluster.health-check.restarts.window (default is 2 minutes).
  • The kubernetes.operator.cluster.health-check.checkpoint-progress.enabled property is enabled and the count of successful Flink checkpoints does not change within the window period configured for kubernetes.operator.cluster.health-check.checkpoint-progress.window (default is 5 minutes).
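
As an illustration, the two required properties could be enabled for a single deployment with a patch similar to the earlier examples. This sketch assumes that the properties are set per resource under spec.flinkConfiguration, where values are given as strings. They can alternatively be set in the default configuration of the Flink Operator. The namespace and resource name are reused from the examples above.

```shell
# Assumption: operator properties are overridden per resource in
# spec.flinkConfiguration; values in this map are strings.
kubectl -n flink patch FlinkDeployment flink-kubernetes-tutorial \
    --type=merge \
    --patch='{"spec":{"flinkConfiguration":{"kubernetes.operator.cluster.health-check.enabled":"true","kubernetes.operator.jm-deployment-recovery.enabled":"true"}}}'
```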

Restarting failed job deployments

If the Flink job fails, the Flink Operator can restart it when the kubernetes.operator.job.restart.failed property is enabled. In this case, when the job status is FAILED, the Flink Operator deletes the current job and redeploys it using the latest successful checkpoint.
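
A minimal sketch of enabling this behavior for one deployment follows. It assumes that the property is set per resource under spec.flinkConfiguration rather than in the default configuration of the Flink Operator, and it reuses the namespace and resource name from the earlier examples.

```shell
# Assumption: the property is overridden per resource in spec.flinkConfiguration.
kubectl -n flink patch FlinkDeployment flink-kubernetes-tutorial \
    --type=merge \
    --patch='{"spec":{"flinkConfiguration":{"kubernetes.operator.job.restart.failed":"true"}}}'
```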

Manually recovering deployments

If the Flink deployment is in a state where the Flink Operator cannot determine the health of the application, or the latest checkpoint cannot be used to recover the deployment, you can recover the deployment manually.

You have the following options to restore a job from the target savepoint or checkpoint:
Redeploying with savepointRedeployNonce
You can redeploy a Flink Deployment or Flink Session Job resource from a target savepoint by using the savepointRedeployNonce and initialSavepointPath in the job.spec as shown in the following example:
 job:
   initialSavepointPath: file://redeploy-target-savepoint
   # If not set previously, set to 1, otherwise increment, e.g. 2
   savepointRedeployNonce: 1
When you change the savepointRedeployNonce, the operator redeploys the job from the savepoint defined in the initialSavepointPath. The savepoint path must not be empty.
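
The same change could also be applied with kubectl patch, in the style of the earlier examples. This is a sketch only: the namespace and resource name are reused from those examples, and the savepoint path is the placeholder from the snippet above, which you replace with your own target savepoint.

```shell
# Set the target savepoint and change the nonce in one patch.
# Use 1 if savepointRedeployNonce was never set, otherwise increment it.
kubectl -n flink patch FlinkDeployment flink-kubernetes-tutorial \
    --type=merge \
    --patch='{"spec":{"job":{"initialSavepointPath":"file://redeploy-target-savepoint","savepointRedeployNonce":1}}}'
```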
Deleting and recreating resources
You also have the option to completely delete and recreate the resources to solve any deployment-related issues. This resets the status information so that the deployment starts from a clean slate. However, the savepoint history is lost and the Flink Operator does not clean up past periodic savepoints taken before the deletion. You can use the following steps to recreate the FlinkDeployment resource from a user-defined savepoint path:
  1. Locate the latest checkpoint or savepoint metafile in the configured checkpoint or savepoint directory.
  2. Delete the FlinkDeployment resource of your application.
  3. Check that the current savepoint is still present, and that your FlinkDeployment resource is deleted completely.
  4. Modify the job.spec and set the initialSavepointPath to the last checkpoint location.
  5. Recreate the FlinkDeployment resource.
  6. Monitor the job to identify what caused the original problem.
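
The steps above can be sketched with kubectl as follows. The sketch assumes the namespace and resource name from the earlier examples and a hypothetical manifest file named flink-deployment.yaml that holds the FlinkDeployment definition. Locating the checkpoint or savepoint metafile (step 1) depends on your storage and is not shown.

```shell
# Step 2: delete the FlinkDeployment resource of the application.
kubectl -n flink delete FlinkDeployment flink-kubernetes-tutorial

# Step 3: confirm the deletion; this should now report that the
# resource is not found.
kubectl -n flink get FlinkDeployment flink-kubernetes-tutorial

# Step 4: edit flink-deployment.yaml (hypothetical file name) and set
# initialSavepointPath under spec.job to the checkpoint or savepoint
# location found in step 1.

# Step 5: recreate the FlinkDeployment resource from the edited manifest.
kubectl -n flink apply -f flink-deployment.yaml

# Step 6: inspect the recreated resource and its status.
kubectl -n flink describe FlinkDeployment flink-kubernetes-tutorial
```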