Managing connectors

Learn about deploying and managing Kafka Connect connectors using KafkaConnector resources. Cloudera recommends KafkaConnector resources as the method for deploying and managing connectors.

To deploy and manage connectors in CSM Operator, you use KafkaConnector resources. KafkaConnector resources describe instances of connectors and offer a Kubernetes-native approach to connector management. You create a KafkaConnector resource to deploy a connector and manage it by updating the KafkaConnector resource. The Strimzi Cluster Operator updates configurations and manages the lifecycle of the connectors.

Enabling KafkaConnector resources

KafkaConnector resources are not enabled by default for Kafka Connect clusters. To use KafkaConnector resources, the KafkaConnect resource used for deploying your Kafka Connect cluster must have the strimzi.io/use-connector-resources annotation set to true.

Full resource examples provided by Cloudera in this documentation as well as on the Cloudera Archive have the strimzi.io/use-connector-resources annotation set to true.
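
For a Kafka Connect cluster that is already deployed, one way to add the annotation is with kubectl annotate. The following is a minimal sketch, not a Cloudera-provided procedure. The function name enable_connector_resources and the example cluster and namespace names are hypothetical. If you manage the KafkaConnect resource from a manifest, set the annotation there as well so that the manifest stays in sync with the cluster.

```shell
# Hypothetical helper: annotate an existing KafkaConnect resource so that
# its connectors are managed through KafkaConnector resources.
# $1 = KafkaConnect resource name, $2 = namespace
enable_connector_resources() {
  kubectl annotate kafkaconnect "$1" \
    --namespace "$2" \
    --overwrite \
    strimzi.io/use-connector-resources="true"
}

# Example call (requires kubectl and access to the cluster):
# enable_connector_resources my-connect-cluster my-namespace
```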

REST API usage

Kafka Connect offers a REST API which is also available for use when you deploy a cluster in CSM Operator. However, usage of the API is not recommended by Cloudera, and should be limited to select use cases.

Deploying connectors

Learn how to deploy Kafka Connect connectors using KafkaConnector resources.

  • Ensure that the Strimzi Cluster Operator is installed and running. See Installation.
  • Ensure that a Kafka Connect cluster is available and running. See Deploying Kafka Connect clusters.
  • Ensure that the connectors you plan to deploy are installed in the Kafka Connect cluster. That is, the Kafka image used by your Kafka Connect cluster includes the required plugin artifacts. See Installing Kafka Connect connector plugins.
  • Ensure that the strimzi.io/use-connector-resources annotation is set to true in the Kafka Connect cluster. See Enabling KafkaConnector resources.
  • Ensure that you know the namespace where the Kafka Connect cluster is deployed. Connectors must be deployed in the same namespace as the Kafka Connect cluster that runs them.
  • The example resource in these steps demonstrates deployment of the MirrorHeartbeatConnector, which is installed by default in all Kafka Connect clusters.
  1. Create a YAML configuration containing the manifest for your KafkaConnector resource.
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-heartbeats-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      class: org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
      tasksMax: 1
      config:
        source.cluster.alias: source
        target.cluster.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    
    • metadata.name specifies the name of the connector.
    • labels.strimzi.io/cluster specifies the name of the Kafka Connect cluster that this connector is deployed in.
    • spec.class specifies the fully qualified name of the connector plugin implementation. The connector plugin must be installed in Kafka Connect.
    • spec.tasksMax specifies the maximum number of tasks this connector is allowed to create. This is an upper limit. The connector might not create the maximum number of allowed tasks.
    • spec.config includes the configuration of the connector.

    You can find additional information about supported properties in the Strimzi API Reference.

  2. Deploy the resource.
    kubectl apply --filename [***YAML CONFIG***] --namespace [***NAMESPACE***]

    Ensure that you deploy the connector in the same namespace where the Kafka Connect cluster is running.

  3. Validate that the KafkaConnector resource is created and ready.
    kubectl get kafkaconnectors --namespace [***NAMESPACE***]
    The output should list your KafkaConnector resource.
    NAME                      CLUSTER              CONNECTOR CLASS                                            MAX TASKS   READY
    #...
    my-heartbeats-connector   my-connect-cluster   org.apache.kafka.connect.mirror.MirrorHeartbeatConnector   1           True
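
In scripts, polling the READY column by hand can be replaced with kubectl wait. The following is a minimal sketch that assumes the KafkaConnector resource exposes a Ready condition in its status, as the READY column suggests. The function name wait_for_connector and the 120s default timeout are hypothetical choices.

```shell
# Hypothetical helper: block until the KafkaConnector reports Ready,
# or fail when the timeout expires.
# $1 = connector name, $2 = namespace, $3 = timeout (optional, default 120s)
wait_for_connector() {
  kubectl wait "kafkaconnector/$1" \
    --namespace "$2" \
    --for=condition=Ready \
    --timeout="${3:-120s}"
}

# Example call (requires kubectl and access to the cluster):
# wait_for_connector my-heartbeats-connector my-namespace
```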
    

Deleting connectors

You can delete a connector by deleting its corresponding KafkaConnector resource with kubectl delete.

kubectl delete kafkaconnector [***CONNECTOR NAME***] \
  --namespace [***NAMESPACE***]

Stopping, pausing, and resuming connectors

You can stop, pause, and resume connectors by configuring the spec.state property in the KafkaConnector resource.

#...
kind: KafkaConnector
spec:
  state: stopped

You can set spec.state to running, paused, or stopped. You resume stopped or paused connectors by setting their state to running. The default value is running.
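
If you prefer not to edit and reapply the manifest, spec.state can also be changed in place with a JSON merge patch. The following is a sketch under that assumption. The function name set_connector_state is hypothetical, and the helper rejects any value other than the three states listed above.

```shell
# Hypothetical helper: set spec.state on a KafkaConnector with a JSON merge patch.
# $1 = connector name, $2 = namespace, $3 = running | paused | stopped
set_connector_state() {
  case "$3" in
    running|paused|stopped) ;;
    *) echo "unsupported state: $3" >&2; return 1 ;;
  esac
  kubectl patch kafkaconnector "$1" \
    --namespace "$2" \
    --type merge \
    --patch "{\"spec\":{\"state\":\"$3\"}}"
}

# Example calls (require kubectl and access to the cluster):
# set_connector_state my-heartbeats-connector my-namespace paused
# set_connector_state my-heartbeats-connector my-namespace running
```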

Restarting connectors

You can restart a connector by annotating the KafkaConnector resource with strimzi.io/restart="true".

kubectl annotate kafkaconnector [***CONNECTOR NAME***] \
  --namespace [***NAMESPACE***] \
  strimzi.io/restart="true"

If the annotation is added, the Strimzi Cluster Operator immediately restarts the connector. If the initial restart fails for any reason, the Strimzi Cluster Operator attempts to restart the connector once per reconciliation loop. The annotation is automatically removed if the restart is successful.
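
Because the annotation is removed only after a successful restart, reading it back is one way to tell whether a restart is still pending. The following is a minimal sketch. The function name restart_annotation is hypothetical, and the backslash in the JSONPath expression escapes the dot inside the annotation key.

```shell
# Hypothetical helper: print the current value of the strimzi.io/restart
# annotation. Empty output means the annotation is not set.
# $1 = connector name, $2 = namespace
restart_annotation() {
  kubectl get kafkaconnector "$1" \
    --namespace "$2" \
    --output jsonpath='{.metadata.annotations.strimzi\.io/restart}'
}

# Example call (requires kubectl and access to the cluster):
# restart_annotation my-heartbeats-connector my-namespace
```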

Checking connector task IDs

You can check the task IDs of a connector by describing the KafkaConnector resource.

kubectl describe kafkaconnector [***CONNECTOR NAME***] \
  --namespace [***NAMESPACE***]

You can find the task IDs in Connector Status. Connector Status also includes the task state and the worker ID.

#...  
Connector Status:
    Connector:
      State:      RUNNING
      worker_id:  my-connect-cluster-connect-0.my-connect-cluster-connect.connect.svc:8083
    Name:         my-source-connector
    Tasks:
      Id:               0
      State:            RUNNING
      worker_id:        my-connect-cluster-connect-0.my-connect-cluster-connect.connect.svc:8083
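
To get only the task IDs and states without reading the full describe output, a JSONPath query is one option. The following is a sketch that assumes the Connector Status section shown above is stored under .status.connectorStatus in the resource. The function name list_task_ids is hypothetical.

```shell
# Hypothetical helper: print one line per task with its ID and state,
# separated by a tab.
# $1 = connector name, $2 = namespace
list_task_ids() {
  kubectl get kafkaconnector "$1" \
    --namespace "$2" \
    --output jsonpath='{range .status.connectorStatus.tasks[*]}{.id}{"\t"}{.state}{"\n"}{end}'
}

# Example call (requires kubectl and access to the cluster):
# list_task_ids my-source-connector my-namespace
```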

Restarting connector tasks

You can restart a connector task by annotating the KafkaConnector resource with strimzi.io/restart-task="[***TASK ID***]".

kubectl annotate kafkaconnector [***CONNECTOR NAME***] \
  --namespace [***NAMESPACE***] \
  strimzi.io/restart-task="[***TASK ID***]"

If the annotation is added, the Strimzi Cluster Operator immediately restarts the connector task. If the initial restart fails for any reason, the Strimzi Cluster Operator attempts to restart the connector task once per reconciliation loop. The annotation is automatically removed if the restart is successful.