Job management
Learn more about Flink job management.
You can modify the job configuration by patching the FlinkDeployment resource. In the following example, patch is used as an alternative way to modify the original configuration and apply the changes to the FlinkDeployment resource:
kubectl -n flink patch FlinkDeployment flink-kubernetes-tutorial \
--type=merge \
--patch='{"spec":{"job":{"args":["--rowsPerSec", "100"]}}}'
You can also restart the job without modifying its configuration by updating the spec.restartNonce property. The Flink Operator automatically restarts the job whenever the value differs from the previous one:
kubectl -n flink patch FlinkDeployment flink-kubernetes-tutorial \
--type=merge \
--patch='{"spec":{"restartNonce":1234}}'
Recovering missing job deployments
If the Flink cluster deployment is deleted by accident or by an external process, the Flink Operator can recover the deployment when high availability is enabled. Ensure that the kubernetes.operator.jm-deployment-recovery.enabled property is set to true to recover the FlinkDeployment.
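As a minimal sketch of enabling recovery for a single resource, a merge patch in the style of the earlier examples can be used. This assumes your operator version allows overriding this operator option through spec.flinkConfiguration; the resource name matches the tutorial deployment above:

```shell
# Assumption: this operator option may be overridden per resource via
# spec.flinkConfiguration; values in flinkConfiguration are strings.
kubectl -n flink patch FlinkDeployment flink-kubernetes-tutorial \
  --type=merge \
  --patch='{"spec":{"flinkConfiguration":{"kubernetes.operator.jm-deployment-recovery.enabled":"true"}}}'
```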
Restarting unhealthy job deployments
The Flink Operator can restart unhealthy job deployments when the kubernetes.operator.cluster.health-check.enabled property is set to true. The kubernetes.operator.jm-deployment-recovery.enabled property must also be enabled for this feature to work. A job deployment is considered unhealthy in the following cases:
- The count of Flink restarts reaches the value (default is 64) configured for the kubernetes.operator.cluster.health-check.restarts.threshold property within the window period (default is 2 minutes) configured for the kubernetes.operator.cluster.health-check.restarts.window property.
- The kubernetes.operator.cluster.health-check.checkpoint-progress.enabled property is enabled and the count of successful Flink checkpoints does not change within the window period (default is 5 minutes) configured for the kubernetes.operator.cluster.health-check.checkpoint-progress.window property.
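As a sketch, the health-check threshold and window can be tuned alongside the enable flag with a merge patch. This assumes per-resource overrides of these operator options through spec.flinkConfiguration are supported in your setup; the threshold and window values here are illustrative, not recommendations:

```shell
# Assumption: operator options may be overridden per resource via
# spec.flinkConfiguration; durations use Flink's format, e.g. "5 min".
kubectl -n flink patch FlinkDeployment flink-kubernetes-tutorial \
  --type=merge \
  --patch='{"spec":{"flinkConfiguration":{"kubernetes.operator.cluster.health-check.enabled":"true","kubernetes.operator.cluster.health-check.restarts.threshold":"32","kubernetes.operator.cluster.health-check.restarts.window":"5 min"}}}'
```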
Restarting failed job deployments
If the Flink job fails, the Flink Operator can restart it when the kubernetes.operator.job.restart.failed property is enabled. In this case, when the job status is FAILED, the Flink Operator deletes the current job and redeploys it using the latest successful checkpoint.
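A minimal sketch of enabling this behavior for one deployment, assuming the option can be set through spec.flinkConfiguration as in the examples above:

```shell
# Assumption: kubernetes.operator.job.restart.failed is accepted in the
# resource's flinkConfiguration; the value must be a string.
kubectl -n flink patch FlinkDeployment flink-kubernetes-tutorial \
  --type=merge \
  --patch='{"spec":{"flinkConfiguration":{"kubernetes.operator.job.restart.failed":"true"}}}'
```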
Manually recovering deployments
In case the Flink deployment is in a state where the Flink Operator cannot determine the health of the application or the latest checkpoint cannot be used to recover the deployment, manual recovery can be used.
- Redeploying with savepointRedeployNonce
- You can redeploy a Flink Deployment or Flink Session Job resource from a target savepoint by using the savepointRedeployNonce and initialSavepointPath properties in the job.spec as shown in the following example:

job:
  initialSavepointPath: file://redeploy-target-savepoint
  # If not set previously, set to 1, otherwise increment, e.g. 2
  savepointRedeployNonce: 1

When changing the savepointRedeployNonce, the operator redeploys the job to the savepoint defined in the initialSavepointPath. The savepoint path must not be empty.
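The same redeploy can be triggered with a merge patch in the style of the earlier examples. The savepoint path below is the placeholder from the snippet above, not a real location:

```shell
# Sets both fields in one patch; increment savepointRedeployNonce on each
# subsequent redeploy (here it is assumed to have been unset before).
kubectl -n flink patch FlinkDeployment flink-kubernetes-tutorial \
  --type=merge \
  --patch='{"spec":{"job":{"initialSavepointPath":"file://redeploy-target-savepoint","savepointRedeployNonce":1}}}'
```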
- Deleting and recreating resources
- You also have the option to completely delete and recreate the resources to solve any deployment-related issues. This resets the status information to start from a clean slate. However, the savepoint history will be lost and the Flink Operator will not clean up past periodic savepoints taken before the deletion. You can use the following steps to recreate the FlinkDeployment resource from a user-defined savepoint path:
- Locate the latest checkpoint or savepoint metafile in the configured checkpoint or savepoint directory.
- Delete the FlinkDeployment resource of your application.
- Check that the current savepoint is still present, and that your FlinkDeployment resource is deleted completely.
- Modify the job.spec and set the initialSavepointPath to the last checkpoint location.
- Recreate the FlinkDeployment resource.
- Monitor the job to see what caused the problem before.
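The steps above can be sketched as a sequence of commands. The checkpoint directory, job ID placeholder, and manifest filename are assumptions; substitute your configured state.checkpoints.dir, actual job ID, and your own resource manifest:

```shell
# 1. Locate the latest checkpoint or savepoint metafile in the configured
#    directory (path and <job-id> are placeholders).
ls -lt /flink-data/checkpoints/<job-id>/

# 2. Delete the FlinkDeployment resource of the application.
kubectl -n flink delete FlinkDeployment flink-kubernetes-tutorial

# 3. Check that the savepoint is still present and the resource is gone.
kubectl -n flink get FlinkDeployment flink-kubernetes-tutorial

# 4. Recreate the resource from a manifest whose job.spec sets
#    initialSavepointPath to the last checkpoint location.
kubectl -n flink apply -f flink-deployment.yaml
```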