Configuring Kafka Connect clusters
Learn how you can update Kafka Connect properties in your KafkaConnect resource.
Updating Kafka Connect configurations
You update Kafka Connect configuration by editing your KafkaConnect resources.
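For example, to change a worker property, edit the KafkaConnect resource and apply it. The following is a minimal sketch; the cluster name and the property value are illustrative:
#...
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  config:
    # worker property to update; offset.flush.timeout.ms is a standard Kafka Connect option
    offset.flush.timeout.ms: 10000
When you apply the change, the Strimzi Cluster Operator reconciles the resource and updates the Kafka Connect deployment.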
Configurable Kafka Connect properties and exceptions
Learn which Kafka Connect properties you can configure in the KafkaConnect resource, which properties have default values, and which properties are managed by Strimzi.
Kafka Connect properties are configured by adding them as keys to spec.config in your KafkaConnect resource. The values can be one of the following JSON types:
- String
- Number
- Boolean
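For example, the following sketch sets one property of each supported type. The properties are standard Kafka Connect worker options; the values are illustrative:
#...
kind: KafkaConnect
spec:
  config:
    key.converter: org.apache.kafka.connect.json.JsonConverter  # String
    offset.flush.interval.ms: 60000                             # Number
    key.converter.schemas.enable: false                         # Boolean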
You can find a full reference of the available Kafka Connect properties in the Apache Kafka documentation. All properties can be specified. However, some properties are automatically configured with a default value if they are not specified in spec.config, and some properties are managed by Strimzi and cannot be changed.
Properties with default values
The following properties are configured with default values unless you specify your own values in spec.config.
- group.id, the default value of which is connect-cluster
- offset.storage.topic, the default value of which is connect-cluster-offsets
- config.storage.topic, the default value of which is connect-cluster-configs
- status.storage.topic, the default value of which is connect-cluster-status
- key.converter, the default value of which is org.apache.kafka.connect.json.JsonConverter
- value.converter, the default value of which is org.apache.kafka.connect.json.JsonConverter
Exceptions
Strimzi takes care of configuring and managing certain properties. The values of these properties cannot be changed.
These properties are related to the following:
- Kafka cluster bootstrap address
- Security (encryption, authentication, and authorization)
- Listener and REST interface configuration
- Plugin path configuration
This means that properties with the following prefixes cannot be set.
- bootstrap.servers
- consumer.interceptor.classes
- listeners.
- plugin.path
- producer.interceptor.classes
- rest.
- sasl.
- security.
- ssl.
If the config property contains an option that cannot be changed, it is disregarded, and a warning message is logged in the Strimzi Cluster Operator log. All other supported properties are forwarded to Kafka Connect, including the following exceptions to the options configured by Strimzi:
- Any SSL configuration for supported TLS versions and cipher suites
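For example, although ssl. is otherwise a managed prefix, TLS version and cipher suite options like the following can still be set. A sketch with illustrative values:
#...
kind: KafkaConnect
spec:
  config:
    ssl.enabled.protocols: TLSv1.3
    ssl.protocol: TLSv1.3
    ssl.cipher.suites: TLS_AES_256_GCM_SHA384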
Configuring group IDs
Kafka Connect workers use a group ID for coordinating the cluster. All Connect workers use the same group ID inside a cluster.
Make sure to choose the group ID carefully, especially if you run multiple Kafka Connect clusters using the same Kafka cluster, because the group IDs must not clash with each other.
You configure the group ID with the group.id property.
#...
kind: KafkaConnect
spec:
  config:
    group.id: my-connect-cluster
Configuring internal topics
Kafka Connect uses three internal Kafka topics to store connector and task configurations, offsets, and status.
You can configure the names and properties of these topics in the spec.config property of your KafkaConnect resource. This property contains the worker configurations.
#...
kind: KafkaConnect
spec:
  config:
    offset.storage.topic: my-connect-cluster-offsets
    config.storage.topic: my-connect-cluster-configs
    status.storage.topic: my-connect-cluster-status
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3
Configuring worker replica count
You can configure the number of worker pods created in the Kafka Connect cluster. Cloudera recommends having more than one worker pod for high availability.
You configure the number of workers with the spec.replicas property.
#...
kind: KafkaConnect
spec:
  replicas: 3
Configuring the Kafka bootstrap
You can configure the bootstrap servers of a Kafka cluster. Cloudera recommends providing multiple brokers, as this makes the connection between Kafka and Kafka Connect clusters highly available.
You configure bootstrap servers with the spec.bootstrapServers property. You can provide additional security configurations in spec.authentication and spec.tls.
#...
kind: KafkaConnect
spec:
  bootstrapServers: my-cluster-kafka-bootstrap.kafka:9092
  authentication:
    type: tls
    certificateAndKey:
      certificate: user.crt
      key: user.key
      secretName: connect-user
  tls:
    trustedCertificates:
      - certificate: ca.crt
        secretName: my-cluster-kafka-cluster-ca-cert
This example specifies a Kafka cluster that has TLS encryption and authentication.
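The certificate and key are read from the Secret specified in secretName. A minimal sketch of what the connect-user Secret might look like; the values are placeholders:
apiVersion: v1
kind: Secret
metadata:
  name: connect-user
type: Opaque
data:
  user.crt: [***BASE64-ENCODED CERTIFICATE***]
  user.key: [***BASE64-ENCODED PRIVATE KEY***]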
Enabling KafkaConnector resources
Kafka Connect connectors are managed either using KafkaConnector resources or with the Kafka Connect REST API. If you want to manage connectors using KafkaConnector resources, you must enable them in the KafkaConnect resource. Managing connectors with KafkaConnector resources is the method recommended by Cloudera.
To enable KafkaConnector resources, set the strimzi.io/use-connector-resources annotation to true.
#...
kind: KafkaConnect
metadata:
  annotations:
    strimzi.io/use-connector-resources: "true"Managing connectors with KafkaConnector resources or the Kafka Connect REST API are mutually exclusive connector management methods.
- If you do not enable the use of KafkaConnector resources, you can only use the REST API to manage connectors in this Kafka Connect cluster.
- If you enable the use of KafkaConnector resources, you can only manage connectors using KafkaConnector resources. If you make any changes over the REST API, the changes are reverted by the Strimzi Cluster Operator.
Cloudera recommends creating connectors using KafkaConnector resources, that is, enabling this annotation for all your Kafka Connect clusters. Cloudera does not recommend exposing or using the Kafka Connect REST API externally, because the REST API is insecure.
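With the annotation enabled, you declare connectors as KafkaConnector resources. The following is a minimal sketch using the FileStreamSourceConnector that ships with Kafka; the names and configuration values are illustrative:
#...
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    # must match the name of the KafkaConnect resource the connector runs in
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 1
  config:
    file: /tmp/input.txt
    topic: my-topic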
Configuration providers
Learn how to provide configuration that is not in plain text key-value pair format.
Connectors connect to external systems, requiring additional connection and security configurations. The connector configurations only contain key-value pairs. For some use cases and configurations, this is not viable. For example, credentials like passwords, access keys, or any other sensitive information should not be added as plain text to the configuration of a connector.
To support these use cases, connectors can use ConfigProviders and configuration references. ConfigProviders are responsible for pulling configurations from external configuration stores. Kafka Connect in Cloudera Streams Messaging - Kubernetes Operator ships with various ConfigProviders that are available by default. Cloudera recommends using the following ConfigProviders.
- KubernetesSecretConfigProvider - Loads configurations from Kubernetes secrets. You can use it to store sensitive configurations securely.
- KubernetesConfigMapConfigProvider - Loads configurations from Kubernetes ConfigMaps. You can use it to group and centralize reusable configurations across multiple connectors.
- FileConfigProvider - Loads configurations from a property file. You can use it to reference properties from files available in the Kafka Connect worker file system.
ConfigProviders must be enabled in the KafkaConnect resource if you want to use them in your connector configuration. You enable ConfigProviders in spec.config.
#...
kind: KafkaConnect
spec:
  config:
    config.providers: cfmap,secret,file
    config.providers.cfmap.class: io.strimzi.kafka.KubernetesConfigMapConfigProvider
    config.providers.secret.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
After the providers are enabled, you can use configuration references in your connector configurations. For example, the following KafkaConnector snippet references a Secret named connect-secrets that is located in the connect-ns namespace and contains a sasl.jaas.config key:
#...
kind: KafkaConnector
spec:
  config:
    producer.override.sasl.jaas.config: ${secret:connect-ns/connect-secrets:sasl.jaas.config}
Kubernetes*ConfigProviders
When using the Kubernetes*ConfigProviders, the Kafka Connect workers require permissions to access the ConfigMaps and Secrets in the Kubernetes cluster. Strimzi automatically creates a ServiceAccount for the Kafka Connect worker pods. An additional Role and RoleBinding are required to make the configuration providers work.
For example, assume you have a db-credentials Secret that contains credentials for a database that your connector will connect to. To establish access to this Secret through a ConfigProvider, you need to create the following Role and RoleBinding.
The following Role grants read access to the db-credentials secret in the database namespace:
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connector-configuration-role
  namespace: database
rules:
  - apiGroups: [""]
    resources: ["secrets"]
    resourceNames: ["db-credentials"]
    verbs: ["get"]apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connector-configuration-role-binding
  # the RoleBinding must be created in the same namespace as the Role it references
  namespace: database
subjects:
  - kind: ServiceAccount
    name: my-connect-connect
    namespace: my-project
roleRef:
  kind: Role
  name: connector-configuration-role
  apiGroup: rbac.authorization.k8s.io
The service account name is always generated with the [***CLUSTER NAME***]-connect pattern.
Adding external configuration to Kafka Connect worker pods
Depending on the connector plugins and the connection settings of the external system, you might need to provide additional external configuration to the Kafka Connect workers through environment variables or additional volumes. Environment variables and volumes are specified in your KafkaConnect resource.
- Adding environment variables
- Environment variables are added using spec.externalConfiguration.env. For example, connectors may need a topic prefix, which is stored in an environment variable. The alias for the configuration provider is used to define other configuration parameters. The provider parameters use the alias from spec.config.providers, taking the form spec.config.providers.[***ALIAS***].class. A connector usage sketch follows after this list.
#...
kind: KafkaConnect
spec:
  config:
    config.providers: env
    config.providers.env.class: org.apache.kafka.common.config.provider.EnvVarConfigProvider
  externalConfiguration:
    env:
      - name: TOPIC_PREFIX
        value: prefix-text
- Mounting additional volumes
- Additional volumes are mounted using pod and container template (spec.template.*) properties. For example, connectors might require an additional TLS truststore or keystore. Specify additional volumes for Kafka Connect workers in the spec.template.pod.volumes property of the KafkaConnect resource. Attach volumes to the Kafka Connect container with the spec.template.connectContainer.volumeMounts property. The volumes you specify are mounted under the path you specify in mountPath. In the following example, this is /mnt/dbkeystore/[***A FILE NAME FROM THE VOLUME***].
#...
kind: KafkaConnect
spec:
  template:
    pod:
      volumes:
        - name: dbkeystore
          secret:
            secretName: dbkeystore
    connectContainer:
      volumeMounts:
        - name: dbkeystore
          mountPath: /mnt/dbkeystore
Configuring connector configuration override policy
Learn how to configure the connector configuration override policy. The override policy controls what client properties can be overridden by connectors.
The Kafka Connect framework manages Kafka clients (producers, consumers, and admin clients) used by connectors and tasks. By default, these clients use worker-level properties. You can fine-tune worker-level properties with connector configuration overrides. Properties specified with configuration overrides take priority over worker-level properties. Additionally, they can be applied on a per connector basis.
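For example, with the default All policy in effect, a single connector can tune its own producer through overrides. A hypothetical KafkaConnector snippet; the producer.override., consumer.override., and admin.override. prefixes come from the Kafka Connect framework:
#...
kind: KafkaConnector
spec:
  config:
    # applies only to the producer of this connector, taking priority over worker-level settings
    producer.override.compression.type: gzip
    producer.override.linger.ms: 100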
What configuration properties can be overridden is controlled by a configuration override policy. The policy is specified for Kafka Connect workers (KafkaConnect resource).
Kafka Connect includes the following policies by default.
- All - Allow overrides for all client properties (default).
- None - Do not allow any overrides.
- Principal - Only allow overriding JAAS configurations.
To configure the policy, set connector.client.config.override.policy in spec.config of your KafkaConnect resource. You can set the value of this property to the fully qualified name or the standard service name (for example, None) of the chosen policy.
#...
kind: KafkaConnect
spec:
  config: 
    connector.client.config.override.policy: None
Configuring delayed rebalance
Learn how to disable or configure the delayed rebalance of Kafka Connect workers.
By default, Kafka Connect workers operate with a delayed rebalance. This means that if a worker stops for any reason, its resources (tasks and connectors) are not immediately reassigned to a different worker.
Instead, by default, a five-minute grace period is in effect. During this time, the resources assigned to the stopped worker remain unassigned. This allows the worker to restart and rejoin its group. Worker resources are only reassigned to a different worker after the five-minute period is up.
This is useful if the tasks and connectors are heavy operations and you do not want them to be rescheduled immediately. However, this also means that in case of a stoppage, some worker resources might be idle for up to five minutes, which leads to a temporary service outage. This is true even if the stopped worker restarts and rejoins its group before the five-minute period is up.
You disable or configure the delay by setting the scheduled.rebalance.max.delay.ms Kafka Connect property in your KafkaConnect resource. The following example disables delayed rebalance by setting the delay to zero.
#...
kind: KafkaConnect
spec:
  config:
    scheduled.rebalance.max.delay.ms: 0
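Alternatively, you can shorten or lengthen the grace period by setting a custom delay in milliseconds. For example, a one-minute delay:
#...
kind: KafkaConnect
spec:
  config:
    scheduled.rebalance.max.delay.ms: 60000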