Managing connectors

Learn about deploying and managing Kafka Connect connectors using KafkaConnector resources. Managing connectors with KafkaConnector resources is the method recommended by Cloudera.

To deploy and manage connectors in Cloudera Streams Messaging - Kubernetes Operator, you use KafkaConnector resources. KafkaConnector resources describe connector instances and offer a Kubernetes-native approach to connector management. You create a KafkaConnector resource to deploy a connector, and you manage the connector by updating the resource. The Strimzi Cluster Operator updates configurations and manages the lifecycle of the connectors.

Enabling KafkaConnector resources

KafkaConnector resources are not enabled by default for Kafka Connect clusters. To use KafkaConnector resources, the KafkaConnect resource used for deploying your Kafka Connect cluster must have the strimzi.io/use-connector-resources annotation set to true.

Full resource examples provided by Cloudera in this documentation as well as on the Cloudera Archive have the strimzi.io/use-connector-resources annotation set to true.
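
For example, the annotation is set in the metadata of the KafkaConnect resource. The following is a minimal sketch; the cluster name my-connect-cluster matches the name used in other examples in this section.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  #...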

REST API usage

Kafka Connect offers a REST API, which is also available for use when you deploy a cluster in Cloudera Streams Messaging - Kubernetes Operator. However, Cloudera does not recommend using the API; limit its use to select use cases.
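
If you do need the API for one of these use cases, you can reach it from inside the Kubernetes cluster. The following sketch assumes a Kafka Connect cluster named my-connect-cluster and that the REST API is exposed by a service named [***CLUSTER NAME***]-connect-api on port 8083, which is the typical Strimzi naming convention.

# List deployed connectors through the Kafka Connect REST API
# from a temporary pod running curl.
kubectl run connect-api-check --rm -it --restart=Never \
  --namespace [***NAMESPACE***] \
  --image=curlimages/curl -- \
  curl http://my-connect-cluster-connect-api:8083/connectors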

Deploying connectors

Learn how to deploy Kafka Connect connectors using KafkaConnector resources.

  • Ensure that the Strimzi Cluster Operator is installed and running. See Installation.
  • Ensure that a Kafka Connect cluster is available and running. See Deploying Kafka Connect clusters .
  • Ensure that the connectors you plan to deploy are installed in the Kafka Connect cluster. That is, the Kafka image used by your Kafka Connect cluster includes the required plugin artifacts. See Installing Kafka Connect connector plugins.
  • Ensure that the strimzi.io/use-connector-resources annotation is set to true in the Kafka Connect cluster. See Enabling KafkaConnector resources.
  • Ensure that you know the namespace where the Kafka Connect cluster is deployed. Connectors must be deployed in the same namespace as the Kafka Connect cluster they belong to.
  • The example resource in these steps demonstrates deployment of the MirrorHeartbeatConnector, which is installed by default in all Kafka Connect clusters.
  1. Create a YAML configuration containing the manifest for your KafkaConnector resource.
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-heartbeats-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      class: org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
      tasksMax: 1
      config:
        source.cluster.alias: source
        target.cluster.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    
    • metadata.name specifies the name of the connector.
    • labels.strimzi.io/cluster specifies the name of the Kafka Connect cluster that this connector is deployed in.
    • spec.class specifies the fully qualified name of the connector plugin implementation. The connector plugin must be installed in Kafka Connect.
    • spec.tasksMax specifies the maximum number of tasks this connector is allowed to create. This is an upper limit. The connector might not create the maximum number of allowed tasks.
    • spec.config includes the configuration of the connector.

    You can find additional information about supported properties in the Strimzi API Reference.

  2. Deploy the resource.
    kubectl apply --filename [***YAML CONFIG***] --namespace [***NAMESPACE***]

    Ensure that you deploy the connector in the same namespace where the Kafka Connect cluster is running.

  3. Validate that the KafkaConnector resource is created and ready.
    kubectl get kafkaconnectors --namespace [***NAMESPACE***]
    The output should list your KafkaConnector resource.
    NAME                      CLUSTER              CONNECTOR CLASS                                            MAX TASKS   READY
    #...
    my-heartbeats-connector   my-connect-cluster   org.apache.kafka.connect.mirror.MirrorHeartbeatConnector   1           True
    

Deleting connectors

You can delete a connector by deleting its corresponding KafkaConnector resource with kubectl delete.

kubectl delete kafkaconnector [***CONNECTOR NAME***] \
  --namespace [***NAMESPACE***]

Stopping, pausing, and resuming connectors

You can stop, pause, and resume connectors by configuring the spec.state property in the KafkaConnector resource.

#...
kind: KafkaConnector
spec:
  state: stopped

You can set spec.state to running, paused, or stopped. You resume stopped or paused connectors by setting their state to running. The default value is running.
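
For example, one way to apply a state change without editing the manifest file is with kubectl patch. The following is a minimal sketch that pauses a connector; set the state back to running to resume it.

kubectl patch kafkaconnector [***CONNECTOR NAME***] \
  --namespace [***NAMESPACE***] \
  --type merge \
  --patch '{"spec":{"state":"paused"}}'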

Restarting connectors

You can restart a connector by annotating the KafkaConnector resource with strimzi.io/restart="true".

kubectl annotate kafkaconnector [***CONNECTOR NAME***] \
  --namespace [***NAMESPACE***] \
  strimzi.io/restart="true"

If the annotation is added, the Strimzi Cluster Operator immediately restarts the connector. If the initial restart fails for any reason, the Strimzi Cluster Operator attempts to restart the connector once per reconciliation loop. The annotation is automatically removed if the restart is successful.

Checking connector task IDs

You can check the task IDs of a connector by describing the KafkaConnector resource.

kubectl describe kafkaconnector [***CONNECTOR NAME***] \
  --namespace [***NAMESPACE***]

You can find the task IDs in Connector Status. Connector Status also includes the task state and the worker ID.

#...  
Connector Status:
    Connector:
      State:      RUNNING
      worker_id:  my-connect-cluster-connect-0.my-connect-cluster-connect.connect.svc:8083
    Name:         my-source-connector
    Tasks:
      Id:               0
      State:            RUNNING
      worker_id:        my-connect-cluster-connect-0.my-connect-cluster-connect.connect.svc:8083
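
Alternatively, if you only need the task IDs, you can extract them with a JSONPath query. The following is a sketch that assumes the connector status is populated under status.connectorStatus in the KafkaConnector resource.

kubectl get kafkaconnector [***CONNECTOR NAME***] \
  --namespace [***NAMESPACE***] \
  --output jsonpath='{.status.connectorStatus.tasks[*].id}'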

Restarting connector tasks

You can restart a connector task by annotating the KafkaConnector resource with strimzi.io/restart-task="[***TASK ID***]".

kubectl annotate KafkaConnector [***CONNECTOR NAME***] \
  --namespace [***NAMESPACE***] \
  strimzi.io/restart-task="[***TASK ID***]"

If the annotation is added, the Strimzi Cluster Operator immediately restarts the connector task. If the initial restart fails for any reason, the Strimzi Cluster Operator attempts to restart the connector task once per reconciliation loop. The annotation is automatically removed if the restart is successful.

Listing connector offsets

You can list connector offsets by adding the spec.listOffsets property to the KafkaConnector resource. After the property is added, you annotate the resource with strimzi.io/connector-offsets="list". The annotation triggers the Strimzi Cluster Operator to write connector offsets to the ConfigMap specified in spec.listOffsets.

  1. Configure your KafkaConnector resource to include the spec.listOffsets property.
    #...
    kind: KafkaConnector
    spec:
      listOffsets:
        toConfigMap:
          name: [***CONFIGMAP NAME***]
    If the ConfigMap you specify does not exist, the Strimzi Cluster Operator creates it when you list connector offsets using the strimzi.io/connector-offsets="list" annotation.
  2. List connector offsets by annotating your KafkaConnector resource with strimzi.io/connector-offsets="list".
    kubectl annotate kafkaconnector [***CONNECTOR NAME***] \
      --namespace [***NAMESPACE***] \
      strimzi.io/connector-offsets="list"
    Once the annotation is applied, the connector offsets are written to the ConfigMap specified in the spec.listOffsets property of the KafkaConnector resource. Afterward, the Strimzi Cluster Operator automatically removes the annotation.
  3. View connector offsets.
    To do this, view the ConfigMap specified in the spec.listOffsets property of the KafkaConnector resource.
    kubectl get configmap [***CONFIGMAP NAME***] \
      --namespace [***NAMESPACE***] \
      --output yaml

    The offsets are written to data["offsets.json"] in the ConfigMap. The first example below shows offsets for a sink connector, the second for a source connector:

    #...
    kind: ConfigMap
    metadata:
      labels:
        strimzi.io/cluster: my-connect-cluster
      ownerReferences:
        - apiVersion: kafka.strimzi.io/v1beta2
          blockOwnerDeletion: false
          controller: false
          kind: KafkaConnector
          name: my-sink-connector
          uid: 76273d69-ba9a-4b60-a532-c546a44dcdc5
      resourceVersion: "12345"
      uid: aad947b4-f10f-4a65-a671-2e1e51cb29ad
    data:
      offsets.json: |-
        {
          "offsets": [
            {
              "partition": {
                "kafka_topic": "my-topic",
                "kafka_partition": 4
              },
              "offset": {
                "kafka_offset": 6
              }
            }
          ]
        }
    #...
    kind: ConfigMap
    metadata:
      labels:
        strimzi.io/cluster: my-connect-cluster
      ownerReferences:
        - apiVersion: kafka.strimzi.io/v1beta2
          blockOwnerDeletion: false
          controller: false
          kind: KafkaConnector
          name: my-source-connector
          uid: 2298f93d-5c3d-4a8f-b611-e27b08164a73
      resourceVersion: "54321"
      uid: 6eeeeb55-d593-4f08-be25-f3ca480f41c2
    data:
      offsets.json: |-
        {
          "offsets" : [ {
            "partition" : {
              "filename" : "/data/myfile.txt"
            },
            "offset" : {
              "position" : 23582
            }
          } ]
        }

Altering connector offsets

You can alter connector offsets by stopping your connector and adding the spec.alterOffsets property to the KafkaConnector resource. The alterOffsets property specifies a ConfigMap that includes your offset changes. After configuring the resource, you annotate it with strimzi.io/connector-offsets="alter". The annotation triggers the Strimzi Cluster Operator to update the connector offsets. You alter connector offsets if you want to skip or reprocess certain records.

List the offsets of the connector before you begin. Listing offsets generates a ConfigMap, which you use to enter your changes. Note down the name of the ConfigMap, as you will need to edit its contents and specify it in the KafkaConnector resource.
  1. Configure your KafkaConnector resource to include the spec.alterOffsets property. In addition, stop the connector by setting spec.state to stopped.
    #...
    kind: KafkaConnector
    spec:
      state: stopped
      alterOffsets:
        fromConfigMap:
          name: [***CONFIGMAP NAME***]
    The ConfigMap you specify in alterOffsets is the ConfigMap that you generated by listing connector offsets.
  2. Edit the ConfigMap to include your connector offset changes.
    Make your changes in data["offsets.json"]. For example, assume you want to reprocess some records: you lower the offset to an earlier position. The first ConfigMap below shows the offsets before editing, the second shows the edited offsets.
    #...
    kind: ConfigMap
    metadata:
      ownerReferences:
        - apiVersion: kafka.strimzi.io/v1beta2
          blockOwnerDeletion: false
          controller: false
          kind: KafkaConnector
          name: my-sink-connector
          uid: 8b6d745f-5137-4524-9cc0-c67847f319d8
      resourceVersion: "19343"
      uid: da5268a3-9b94-4752-a997-15883fee105c
    data:
      offsets.json: |-
        {
          "offsets": [
            {
              "partition": {
                "kafka_topic": "test-topic",
                "kafka_partition": 2
              },
              "offset": {
                "kafka_offset": 10
              }
            }
          ]
        }
    #...
    kind: ConfigMap
    metadata:
      ownerReferences:
        - apiVersion: kafka.strimzi.io/v1beta2
          blockOwnerDeletion: false
          controller: false
          kind: KafkaConnector
          name: my-sink-connector
          uid: 8b6d745f-5137-4524-9cc0-c67847f319d8
      resourceVersion: "19343"
      uid: da5268a3-9b94-4752-a997-15883fee105c
    data:
      offsets.json: |-
        {
          "offsets": [
            {
              "partition": {
                "kafka_topic": "test-topic",
                "kafka_partition": 2
              },
              "offset": {
                "kafka_offset": 6
              }
            }
          ]
        }
  3. Alter connector offsets by annotating your KafkaConnector resource with strimzi.io/connector-offsets="alter".
    kubectl annotate kafkaconnector [***CONNECTOR NAME***] \
      --namespace [***NAMESPACE***] \
      strimzi.io/connector-offsets="alter"
    The annotation is automatically removed by the Strimzi Cluster Operator after connector offsets are successfully updated.
  4. Optional: Verify your changes by listing connector offsets.
  5. Start the connector by setting spec.state to running in the KafkaConnector resource.
    #...
    kind: KafkaConnector
    spec:
      state: running

Resetting connector offsets

You can reset connector offsets by stopping your connector and annotating the KafkaConnector resource with strimzi.io/connector-offsets="reset". After offsets are reset, you restart your connector.

Review Listing connector offsets. In the following steps, you list connector offsets to verify that the reset was successful.
  1. Stop the connector by setting spec.state in the KafkaConnector resource to stopped.
    #...
    kind: KafkaConnector
    spec:
      state: stopped
  2. Reset connector offsets by annotating the KafkaConnector resource with strimzi.io/connector-offsets="reset".
    kubectl annotate kafkaconnector [***CONNECTOR NAME***] \
      --namespace [***NAMESPACE***] \
      strimzi.io/connector-offsets="reset"
    

    The annotation is automatically removed by the Strimzi Cluster Operator after connector offsets are successfully reset.

  3. Optional: Verify your changes by listing connector offsets.
    If connector offsets are successfully reset, the offsets list in data["offsets.json"] is empty. For example:
    #...
    kind: ConfigMap
    metadata:
    data:
      offsets.json: |-
        {
          "offsets" : []
        }
    
  4. Start the connector by setting spec.state to running in the KafkaConnector resource.
    #...
    kind: KafkaConnector
    spec:
      state: running