Migrating stateful Flink jobs

When you run stateful Flink applications, you must stop the Flink jobs with a savepoint so that you can restart them from exactly the same point on your upgraded Data Hub cluster. Depending on the version of your original cluster, you provide the savepoint path either in HDFS or in S3.

The following savepoint paths are supported based on the version of your original cluster:
  • If the version of the original cluster was 7.2.9 or lower, you need to provide the savepoint path in HDFS.
  • If the version of the original cluster was 7.2.10 or higher, you need to provide the savepoint path in S3.
  1. Connect to the host where you run your Flink jobs using ssh.
    ssh <workload_username>@<master_host>
    Password: <your_workload_password>
  2. Determine which jobs you want to stop by listing the Flink job IDs.
    flink list
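    If many jobs are listed, you can limit the output to jobs that are currently running; the job IDs shown are the values you pass to the stop command in the next step:
    flink list --running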
  3. Stop your Flink jobs with a savepoint using the command line interface (CLI).
    If the version of your original cluster was 7.2.10 or higher, store the savepoint in S3:
    flink stop --savepointPath s3a://<bucket_name>/savepoints <flink_job_id>
    If the version of your original cluster was 7.2.9 or lower, store the savepoint in HDFS:
    flink stop --savepointPath hdfs:///tmp/savepoints <flink_job_id>
  4. Take note of the savepoint path in the output of the command, as you must provide this path when resuming your Flink applications.
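    Optionally, verify that the savepoint directory was created before you proceed. For example, for a savepoint stored in HDFS (adjust the URI if your savepoint is in S3):
    hdfs dfs -ls hdfs:///tmp/savepoints/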
  5. If the version of your original cluster was 7.2.9 or lower, copy the savepoint from HDFS to S3.
    1. If the savepoint size is small:
      hdfs dfs -mkdir -p s3a://<bucket_name>/savepoints/
      hdfs dfs -cp <savepoint_path> s3a://<bucket_name>/savepoints/
    2. If the savepoint size is large (several GB or larger):
      hdfs dfs -mkdir -p s3a://<bucket_name>/savepoints/
      hadoop distcp -m 10 <savepoint_path> s3a://<bucket_name>/savepoints/
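    In either case, you can verify that the savepoint folder was copied to S3 before moving on:
    hdfs dfs -ls s3a://<bucket_name>/savepoints/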
  6. Create the Data Hub cluster with the new version.
    For more information, see the Creating Streaming Analytics cluster section.
  7. If the version of your original cluster was 7.2.9 or lower, copy the savepoint from S3 to HDFS on the new cluster.
    Make sure that the HDFS path matches the original savepoint location on the original cluster.
    1. If the savepoint size is small:
      hdfs dfs -cp \
        s3a://<bucket_name>/savepoints/<savepoint_folder_name> \
        hdfs:///tmp/savepoints/
    2. If the savepoint size is large (several GB or larger):
      hadoop distcp -m 10 \
        s3a://<bucket_name>/savepoints/<savepoint_folder_name> \
        hdfs:///tmp/savepoints/
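    You can verify that the copy is complete by checking that the savepoint folder, including its _metadata file, is present on HDFS:
    hdfs dfs -ls hdfs:///tmp/savepoints/<savepoint_folder_name>/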
  8. Update the Flink job dependencies in your POM file, and rebuild your JAR file.
    For more information, see the Updating Flink job dependencies section.
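    For example, if your application is a standard Maven project, rebuild the JAR from the project root after updating the dependency versions in the POM file:
    mvn clean package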
  9. Resume your Flink application from the savepoint.
    If the version of your original cluster was 7.2.10 or higher, restore your application from S3:
    flink run <run_arguments> \
      --fromSavepoint s3a://<bucket_name>/savepoints/<savepoint_folder_name> \
      <jar_file> <app_arguments>
    
    If the version of your original cluster was 7.2.9 or lower, restore your application from HDFS:
    flink run <run_arguments> \
      --fromSavepoint hdfs:///tmp/savepoints/<savepoint_folder_name> \
      <jar_file> <app_arguments>
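    
    After the job is submitted, you can confirm that the application has resumed by listing the running jobs again:
    flink list --running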