Configuring Kafka Connect clusters

Learn how you can update Kafka Connect properties in your KafkaConnect resource.

Updating Kafka Connect configurations

You update Kafka Connect configuration by editing your KafkaConnect resources.

  1. Run the following command.
    kubectl edit kafkaconnect [***CONNECT CLUSTER NAME***]  --namespace [***NAMESPACE***]

    Running kubectl edit opens the resource manifest in an editor.

  2. Make your changes.
  3. Save the file.
    Once the changes are saved, a rolling update is triggered and the workers restart one after the other with the applied changes.

Configurable Kafka Connect properties and exceptions

Learn which Kafka Connect properties you can configure in the KafkaConnect resource and what default values some properties have, as well as which properties are managed by Strimzi.

Kafka Connect properties are configured by adding as keys to config in your KafkaConnect resource. The values can be on of the following JSON types:

  • String
  • Number
  • Boolean

You can find a full reference of the available Kafka Connect properties in the Apache Kafka documentation. All properties can be specified, however, some properties are automatically configured with a default value if they are not specified in spec.config. Also, some properties are managed by Strimzi, and cannot be changed.

Properties with default values

The following properties are configured with default values unless you specify your configuration in spec.config.

  • group.id, the default value of which is connect-cluster
  • offset.storage.topic, the default value of which is connect-cluster-offsets
  • config.storage.topic, the default value of which is connect-cluster-configs
  • status.storage.topic, the default value of which is connect-cluster-status
  • key.converter, the default value of which is org.apache.kafka.connect.json.JsonConverter
  • value.converter, the default value of which is org.apache.kafka.connect.json.JsonConverter

Exceptions

Strimzi takes care of configuring and managing certain properties. The values of these properties cannot be changed.

The properties Strimzi takes care of are related to the following.

  • Kafka cluster bootstrap address
  • Security (encryption, authentication, and authorization)
  • Listener and REST interface configuration
  • Plugin path configuration

This means that properties with the following prefixes cannot be set.

  • bootstrap.servers
  • consumer.interceptor.classes
  • listeners.
  • plugin.path
  • producer.interceptor.classes
  • rest.
  • sasl.
  • security.
  • ssl.

If the config property contains an option that cannot be changed, it is disregarded, and a warning message is logged in the Strimzi Cluster Operator log. All other supported properties are forwarded to Kafka, including the following exceptions to the options configured by Strimzi:

  • Any SSL configuration for supported TLS versions and cipher suites

Configuring group IDs

Kafka Connect workers use a group ID for coordinating the cluster. All Connect workers use the same group ID inside a cluster.

Make sure to choose the group ID carefully, especially if you run multiple Kafka Connect clusters using the same Kafka cluster, because the group IDs must not clash with each other.

Configure the group ID by setting the value of the group.id property.
#...
kind: KafkaConnect
spec:
  config:
    group.id: my-connect-cluster

Configuring internal topics

Kafka Connect uses three internal Kafka topics to store connector and task configurations, offsets, and status.

Configure the internal Kafka topics in the spec.config property of the configuration file. This property contains the worker configurations.
#...
kind: KafkaConnect
spec:
  config:
    offset.storage.topic: my-connect-cluster-offsets
    config.storage.topic: my-connect-cluster-configs
    status.storage.topic: my-connect-cluster-status
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3

Configuring worker replica count

You can configure the number of worker pods created in the Kafka Connect cluster. Cloudera recommends having more than one worker pod for high availability.

Configure the number of worker pods created by setting the value of the spec.replicas property.
#...
kind: KafkaConnect
spec:
  replicas: 3

Configuring the Kafka bootstrap

You can configure the bootstrap servers of a Kafka cluster. Cloudera recommends providing multiple brokers, as this makes the connection between Kafka and Kafka Connect clusters highly available.

Configure the bootstrap servers of the Kafka cluster by setting the value of the spec.bootstrapServers property.

You can provide additional security configurations in spec.authentication and spec.tls.

#...
kind: KafkaConnect
spec:
  bootstrapServers: my-cluster-kafka-bootstrap.kafka:9092
  authentication:
    type: tls
    certificateAndKey:
      certificate: user.crt
      key: user.key
      secretName: connect-user
  tls:
    trustedCertificates:
      - certificate: ca.crt
        secretName: my-cluster-kafka-cluster-ca-cert

This example specifies a Kafka cluster that has TLS encryption and authentication.

Enabling KafkaConnector resources

Kafka Connect connectors are managed either using KafkaConnector resources or with the Kafka Connect REST API. If you want to manage connectors using KafkaConnector resources, you must enable them in the KafkaConnect resource. Managing connectors with KafkaConnector resources is the method recommended by Cloudera.

You enable KafkaConnector resource by setting the strimzi.io/use-connector-resources annotation to true.
#...
kind: KafkaConnect
metadata:
  annotations:
    strimzi.io/use-connector-resources: "true"

Managing connectors with KafkaConnector resources or the Kafka Connect REST API are mutually exclusive connector management methods.

  • If you do not enable the use of KafkaConnector resources, you can only use the REST API to manage connectors in this Kafka Connect cluster.
  • If you enable the use of KafkaConnector resources, you can only manage connectors using KafkaConnector resources. If you make any changes over the REST API, the changes are reverted by the Strimzi Cluster Operator.

Cloudera recommends creating connectors using KafkaConnector resources, that is, enabling this annotation for all your Kafka Connect clusters. Cloudera does not recommend exposing and using Kafka Connect REST API externally, because the REST API is insecure.

Configuration providers

Learn how to provide configuration that is not in plain text key-value pair format.

Connectors connect to external systems, requiring additional connection and security configurations. The connector configurations only contain key-value pairs. For some use-cases and configurations, this is not viable. For example, credentials like passwords, access keys, or any other sensitive information should not be added as plain text to the configuration of a connector.

To support these use-cases, connectors can use ConfigProviders and configuration references. ConfigProviders are responsible for pulling configurations from external configuration stores. Kafka Connect in Cloudera Streams Messaging - Kubernetes Operator ships with various ConfigProviders that are available by default. Cloudera recommends using the following ConfigProviders.

  • KubernetesSecretConfigProvider - Loads configurations from Kubernetes secrets. You can use it to store sensitive configurations securely.
  • KubernetesConfigMapConfigProvider - Loads configurations from Kubernetes ConfigMaps. You can use it to group and centralize reusable configurations across multiple connectors.
  • FileConfigProvider - Loads configurations from a property file. You can use it to reference properties from files available in the Kafka Connect worker file system.

ConfigProviders must be enabled in the KafkaConnect resource if you want to use them in your connector configuration. You enable ConfigProviders in spec.config.

This example enables the ConfigProviders recommended by Cloudera.
#...
kind: KafkaConnect
spec:
  config:
    config.providers: cfmap,secret,file
    config.providers.cfmap.class: io.strimzi.kafka.KubernetesConfigMapConfigProvider
    config.providers.secret.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
Example config provider usage of a secret, where the secret resource called connect-secrets is located in the connect-ns namespace, and contains a sasl.jaas.config key:
#...
kind: KafkaConnector
spec:
  config:
    producer.override.sasl.jaas.config: ${secret:connect-ns/connect-secrets:sasl.jaas.config}

Kubernetes*ConfigProviders

When using the Kubernetes*ConfigProviders, the Kafka Connect workers require permissions to access the configuration maps and secrets in the Kubernetes cluster. Strimzi automatically creates a ServiceAccount for the Kafka Connect worker pods. An additional Role and RoleBinding is required to make the configuration providers work.

For example, assume you have a db-credentials Secret that contains credentials for a database to which that your connector will connect. To establish access to this secret through a ConfigProvider you need to create the following Role and Rolebinding.

For example, the following role grants access to the db-credentials secret in the database namespace:
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connector-configuration-role
  namespace: database
rules:
  - apiGroups: [""]
    resources: ["secrets"]
    resourceNames: ["db-credentials"]
    verbs: ["get"]
The following RoleBinding binds the new Role to the Connect ServiceAccount:
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connector-configuration-role-binding
subjects:
  - kind: ServiceAccount
    name: my-connect-connect
    namespace: my-project
roleRef:
  kind: Role
  name: connector-configuration-role
  apiGroup: rbac.authorization.k8s.io

The service account name is always generated with the [***CLUSTER NAME***]-connect pattern.

Adding external configuration to Kafka Connect worker pods

Depending on the connector plugins and the connection settings of the external system, you might need to configure additional external configurations for the Kafka Connect workers that are stored in environment variables or in additional volumes. Environment variables and volumes are specified in your KafkaConnect resource.

Adding environment variables
Environment variables are added using spec.externalConfiguration.env. For example, connectors may need a topic prefix which is stored in an environment variable.

The alias for the configuration provider is used to define other configuration parameters. The provider parameters use the alias from spec.config.providers, taking the form spec.config.providers.[***ALIAS***].class.

#...
kind: KafkaConnect
spec:
  config:
    config.providers: env
    config.providers.env.class: org.apache.kafka.common.config.provider.EnvVarConfigProvider
  externalConfiguration:
    env:
      - name: TOPIC_PREFIX
        value: prefix-text
Mounting additional volumes
Additional volumes are mounted using pod and container templates (spec.template.*) properties. For example, connectors might require an additional TLS truststore or keystore.

Specify additional volumes for Kafka Connect workers in the spec.template.pod.volumes property of the KafkaConnect resource. Attach volumes to the Kafka Connect container with the spec.template.connectContainer.volumeMounts property.

The volumes you specify are mounted under the path you specified in mountPath. In the following example, this is /mnt/dbkeystore/[***A FILE NAME FROM THE VOLUME***].

#...
kind: KafkaConnect
spec:
  template:
    pod:
      volumes:
        - name: dbkeystore
          secret:
            secretName: dbkeystore
    connectContainer:
      volumeMounts:
        - name: dbkeystore
          mountPath: /mnt/dbkeystore

Configuring connector configuration override policy

Learn how to configure the connector configuration override policy. The override policy controls what client properties can be overridden by connectors.

The Kafka Connect framework manages Kafka clients (producers, consumers, and admin clients) used by connectors and tasks. By default, these clients use worker-level properties. You can fine-tune worker-level properties with connector configuration overrides. Properties specified with configuration overrides take priority over worker-level properties. Additionally, they can be applied on a per connector basis.

What configuration properties can be overridden is controlled by a configuration override policy. The policy is specified for Kafka Connect workers (KafkaConnect resource).

Kafka Connect includes the following policies by default.

  • All - Allow overrides for all client properties (default).
  • None - Do not allow any overrides.
  • Principal - Only allow overriding JAAS configurations.

To configure the policy, set connector.client.config.override.policy in spec.config of your KafkaConnect resource. You can set the value of this property to the fully qualified name or the standard service name (for example, None) of the chosen policy.

#...
kind: KafkaConnect
spec:
  config: 
    connector.client.config.override.policy: None

Configuring delayed rebalance

Learn how to disable or configure the delayed rebalance of Kafka Connect workers.

By default, Kafka Connect workers operate with a delayed rebalance. This means that if a worker stops for any reason, its resources (tasks and connectors) are not immediately reassigned to a different worker.

Instead, by default, a five minute grace period is in effect. During this time, the resource assigned to the stopped worker remains unassigned. This allows the worker to restart and rejoin its group. Worker resources are only reassigned to a different worker after the five minute period is up.

This is useful if the tasks and connectors are heavy operations and you do not want them to be rescheduled immediately. However, this also means that in case of a stoppage, some worker resources might be in an idle state of up to five minutes, which leads to a temporary service outage. This is true even if the stopped worker restarts and rejoins its group before the five minutes is up.

You can configure the delay to be shorter, or disable it altogether. To do this, configure the scheduled.rebalance.max.delay.ms Kafka Connect property in your KafkaConnect resource.
#...
kind: KafkaConnect
spec:
  config:
    scheduled.rebalance.max.delay.ms: 0