Upgrading CSA Operator

Complete these steps to upgrade CSA Operator.

Upgrading Cloudera Streaming Analytics Operator consists of three steps:

  1. upgrading the Flink CRDs to the new version,
  2. upgrading the cluster operator using Helm commands, and
  3. upgrading the Flink jobs with the proper image and Flink version.

Upgrading the cluster operator may affect the Flink resources in the cluster. All Flink clusters that specify the version of the cluster (but not the image) will be restarted during the cluster operator upgrade, due to the fact that the default image of all versions changes with the Flink upgrade, triggering a restart.

  • Ensure that your Kubernetes environment meets requirements listed in System requirements.
  • Ensure that you have access to your Cloudera credentials (username and password). Credentials are required to access the Cloudera Archive and Cloudera Docker registry where upgrade artifacts are hosted.
  1. Upgrade the Flink CRDs
    Upgrade the CRDs for FlinkDeployment and FlinkSessionJob resources:
    # download and extract the helm chart
    helm pull oci://container.repository.cloudera.com/cloudera-helm/csa-operator/csa-operator --version 1.1.1-b13
    tar xf csa-operator-1.1.1-b13.tgz
    kubectl replace -f csa-operator/charts/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
    kubectl replace -f csa-operator/charts/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
                    
  2. Upgrade the cluster operator using helm commands
    Upgrade the Helm deployment:
    helm upgrade [*** CUSTOM SETTINGS ***] csa-operator oci://container.repository.cloudera.com/cloudera-helm/csa-operator/csa-operator --version 1.1.1-b13
  3. Upgrade the Flink jobs with the proper image and Flink version
    1. Updated custom Flink applications to use the latest Flink and connector versions in the pom.xml file of the project:
      1. Update the flink.version property to 1.19.1-csaop1.1.0
      2. >Update all connectors to match the version. For example:

        <flink.version<1.19.1-csaop1.1.0<flink.version<
        <flink.cloudera.connector.version<1.0-csaop1.1.0<flink.cloudera.connector.version<
        <flink.kafka.connector.version<3.2-csaop1.1.0<flink.kafka.connector.version<
        
    2. Update any custom docker images built for Flink jobs to be based on the latest Flink Image:
      # Dockerfile
      FROM container.repository.cloudera.com/cloudera/flink:1.19.1-csaop1.1.1-b13
      
    3. Rebuild any custom Flink application jars and/or docker images and push them to their respective repositories.
    4. Suspend all running Flink jobs:
      kubectl --namespace [*** NAMESPACE ***] patch FlinkDeployment [*** DEPLOYMENT NAME ***] --type=merge --patch='{"spec":{"job":{"state":"suspended"}}}'
      
      kubectl --namespace [*** NAMESPACE ***] patch FlinkSessionJob [*** DEPLOYMENT NAME ***] --type=merge --patch='{"spec":{"job":{"state":"suspended"}}}'
      
    5. Update all FlinkDeployment objects to use the proper image and flinkVersion:
      apiVersion: flink.apache.org/v1beta1
      kind: FlinkDeployment
      metadata:
        name: [*** NAME ***]
        namespace: [*** NAMESPACE ***]
      spec:
        flinkVersion: v1_19
        image: container.repository.cloudera.com/cloudera/flink:1.19.1-csaop1.1.1-b13
        [...]
      
    6. If the name of the jar files have changed because of the update, update the jarURI properties of the FlinkDeployment and FlinkSessionJob objects accordingly:
      apiVersion: flink.apache.org/v1beta1
      kind: FlinkDeployment # or FlinkSessionJob
      metadata:
        name: [*** NAME ***]
        namespace: [*** NAMESPACE ***]
      spec:
        [...]
        job:
           jarURI: local:///opt/flink/example.jar # ensure this is correct
        [...]
      
    7. Resume the suspended Flink jobs:
      kubectl --namespace [*** NAMESPACE ***] patch FlinkDeployment [*** DEPLOYMENT NAME ***] --type=merge --patch='{"spec":{"job":{"state":"running"}}}'
      
      kubectl --namespace [*** NAMESPACE ***] patch FlinkSessionJob [*** DEPLOYMENT NAME ***] --type=merge --patch='{"spec":{"job":{"state":"running"}}}'