Managing connectors
Learn about deploying and managing Kafka Connect connectors using KafkaConnector resources. Deploying and managing connectors with KafkaConnector resources is the recommended method by Cloudera for managing connectors.
To deploy and manage connectors in CSM Operator, you use KafkaConnector resources. KafkaConnector resources describe instances of connectors and offer a Kubernetes-native approach to connector management. You create a KafkaConnector resource to deploy a connector and manage it by updating the KafkaConnector resource. The Strimzi Cluster Operator updates configurations and manages the lifecycle of the connectors.
Enabling KafkaConnector resources
KafkaConnector resources are not enabled by default for Kafka Connect
clusters. To use KafkaConnector resources, the
KafkaConnect resource used for deploying your Kafka Connect cluster must
have the strimzi.io/use-connector-resources
annotation set to
true
.
Full resource examples provided by Cloudera in this documentation as well as on the Cloudera
Archive have the strimzi.io/use-connector-resources
annotation set to
true
.
Rest API usage
Kafka Connect offers a REST API which is also available for use when you deploy a cluster in CSM Operator. However, usage of the API is not recommended by Cloudera, and should be limited to select use cases.
Deploying connectors
Learn how to deploy Kafka Connect connectors using KafkaConnector resources.
- Ensure that the Strimzi Cluster Operator is installed and running. See Installation.
- Ensure that a Kafka Connect cluster is available and running. See Deploying Kafka Connect clusters .
- Ensure that the connectors you plan to deploy are installed in the Kafka Connect cluster. That is, the Kafka image used by your Kafka Connect cluster includes the required plugin artifacts. See Installing Kafka Connect connector plugins.
- Ensure that the
strimzi.io/use-connector-resources
annotation is set totrue
in the Kafka Connect cluster. See Enabling KafkaConnector resources. - Ensure that you know the namespace where the Kafka Connect cluster is deployed. Connectors must be deployed in the same namespace as the Kafka Connect cluster they are deployed in.
- The example resource in these steps demonstrates deployment of the MirrorHeartbeatConnector, which is installed by default in all Kafka Connect clusters.
Deleting connectors
You can delete a connector by deleting its corresponding KafkaConnector resource with kubectl delete.
kubectl delete kafkaconnector [***CONNECTOR NAME***] \
--namespace [***NAMESPACE***]
Stopping, pausing, and resuming connectors
You can stop, pause, and resume connectors by configuring the spec.state property in the KafkaConnector resource.
#...
kind: KafkaConnector
spec:
state: stopped
You can set spec.state
to running
,
paused
, or stopped
. You resume stopped or
paused connectors by setting their state to running
. The default
value is running
.
Restarting connectors
You can restart a connector by annotating the KafkaConnector resource with strimzi.io/restart=”true”.
kubectl annotate kafkaconnector [***CONNECTOR NAME***] \
--namespace [***NAMESPACE***] \
strimzi.io/restart="true"
If the annotation is added, the Strimzi Cluster Operator immediately restarts the connector. If the initial restart fails for any reason, the Strimzi Cluster Operator attempts to restart the connector once per reconciliation loop. The annotation is automatically removed if the restart is successful.
Checking connector task IDs
You can check the task IDs of a connector by describing the KafkaConnector resource.
kubectl describe kafkaconnector [***CONNECTOR NAME***] \
--namespace [***NAMESPACE***]
You can find the task IDs in Connector Status
. Connector
Status
also includes the task state and the worker ID.
#...
Connector Status:
Connector:
State: RUNNING
worker_id: my-connect-cluster-connect-0.my-connect-cluster-connect.connect.svc:8083
Name: my-source-connector
Tasks:
Id: 0
State: RUNNING
worker_id: my-connect-cluster-connect-0.my-connect-cluster-connect.connect.svc:8083
Restarting connector tasks
You can restart a connector task by annotating the KafkaConnector resource with strimzi.io/restart-task=”[***TASK ID***]”.
kubectl annotate KafkaConnector [***CONNECTOR NAME***] \
--namespace [***NAMESPACE***] \
strimzi.io/restart-task="[***TASK ID***]"
If the annotation is added, the Strimzi Cluster Operator immediately restarts the connector task. If the initial restart fails for any reason, the Strimzi Cluster Operator attempts to restart the connector task once per reconciliation loop. The annotation is automatically removed if the restart is successful.