Configuring Kafka Connect clusters
Learn how you can update Kafka Connect properties in your KafkaConnect resource.
Updating Kafka Connect configurations
You update Kafka Connect configuration by editing your KafkaConnect resources.
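For example, assuming a KafkaConnect resource named my-connect in the kafka namespace (illustrative names), you can edit the resource in place or apply an updated manifest:

```shell
# Open the resource in an editor; the Strimzi Cluster Operator
# rolls out the change after you save
kubectl edit kafkaconnect my-connect --namespace kafka

# Alternatively, apply an updated manifest file
kubectl apply --filename my-connect.yaml --namespace kafka
```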
Configurable Kafka Connect properties and exceptions
Learn which Kafka Connect properties you can configure in the KafkaConnect resource and what default values some properties have, as well as which properties are managed by Strimzi.
Kafka Connect properties are configured by adding them as keys to config in your KafkaConnect resource. The values can be one of the following JSON types:
- String
- Number
- Boolean
You can find a full reference of the available Kafka Connect properties in the Apache Kafka documentation. All properties can be specified; however, some properties are automatically configured with a default value if they are not specified in spec.config. Also, some properties are managed by Strimzi and cannot be changed.
Properties with default values
The following properties are configured with default values unless you specify your own configuration in spec.config.
- group.id, the default value of which is connect-cluster
- offset.storage.topic, the default value of which is connect-cluster-offsets
- config.storage.topic, the default value of which is connect-cluster-configs
- status.storage.topic, the default value of which is connect-cluster-status
- key.converter, the default value of which is org.apache.kafka.connect.json.JsonConverter
- value.converter, the default value of which is org.apache.kafka.connect.json.JsonConverter
Exceptions
Strimzi takes care of configuring and managing certain properties. The values of these properties cannot be changed.
The properties Strimzi takes care of are related to the following.
- Kafka cluster bootstrap address
- Security (encryption, authentication, and authorization)
- Listener and REST interface configuration
- Plugin path configuration
This means that properties with the following prefixes cannot be set.
- bootstrap.servers
- consumer.interceptor.classes
- listeners.
- plugin.path
- producer.interceptor.classes
- rest.
- sasl.
- security.
- ssl.
If the config property contains an option that cannot be changed, it is disregarded, and a warning message is logged in the Strimzi Cluster Operator log. All other supported properties are forwarded to Kafka Connect, including the following exceptions to the options configured by Strimzi:
- Any SSL configuration for supported TLS versions and cipher suites
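For example, even though they use the ssl. prefix, TLS version and cipher suite properties can still be set. A sketch, assuming your environment supports these values:

```yaml
#...
kind: KafkaConnect
spec:
  config:
    # Allowed despite the ssl. prefix: restricts TLS versions and cipher suites
    ssl.enabled.protocols: TLSv1.3
    ssl.cipher.suites: TLS_AES_256_GCM_SHA384
```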
Configuring group IDs
Kafka Connect workers use a group ID for coordinating the cluster. All Connect workers use the same group ID inside a cluster.
Make sure to choose the group ID carefully, especially if you run multiple Kafka Connect clusters using the same Kafka cluster, because the group IDs must not clash with each other.
You configure the group ID with the group.id property.
#...
kind: KafkaConnect
spec:
config:
group.id: my-connect-cluster
Configuring internal topics
Kafka Connect uses three internal Kafka topics to store connector and task configurations, offsets, and status.
You configure these topics in the spec.config property of your KafkaConnect resource. This property contains the worker configurations.
#...
kind: KafkaConnect
spec:
config:
offset.storage.topic: my-connect-cluster-offsets
config.storage.topic: my-connect-cluster-configs
status.storage.topic: my-connect-cluster-status
config.storage.replication.factor: 3
offset.storage.replication.factor: 3
status.storage.replication.factor: 3
Configuring worker replica count
You can configure the number of worker pods created in the Kafka Connect cluster. Cloudera recommends having more than one worker pod for high availability.
You configure the number of workers in the spec.replicas property.
#...
kind: KafkaConnect
spec:
replicas: 3
Configuring the Kafka bootstrap
You can configure the bootstrap servers of a Kafka cluster. Cloudera recommends providing multiple brokers, as this makes the connection between Kafka and Kafka Connect clusters highly available.
You configure the bootstrap servers in the spec.bootstrapServers property. You can provide additional security configurations in spec.authentication and spec.tls.
#...
kind: KafkaConnect
spec:
bootstrapServers: my-cluster-kafka-bootstrap.kafka:9092
authentication:
type: tls
certificateAndKey:
certificate: user.crt
key: user.key
secretName: connect-user
tls:
trustedCertificates:
- certificate: ca.crt
secretName: my-cluster-kafka-cluster-ca-cert
This example specifies a Kafka cluster that has TLS encryption and authentication.
Enabling KafkaConnector resources
Kafka Connect connectors are managed either using KafkaConnector resources or with the Kafka Connect REST API. If you want to manage connectors using KafkaConnector resources, you must enable them in the KafkaConnect resource. Managing connectors with KafkaConnector resources is the method recommended by Cloudera.
To enable KafkaConnector resources, set the strimzi.io/use-connector-resources annotation to true.
#...
kind: KafkaConnect
metadata:
annotations:
strimzi.io/use-connector-resources: "true"
Managing connectors with KafkaConnector resources or the Kafka Connect REST API are mutually exclusive connector management methods.
- If you do not enable the use of KafkaConnector resources, you can only use the REST API to manage connectors in this Kafka Connect cluster.
- If you enable the use of KafkaConnector resources, you can only manage connectors using KafkaConnector resources. If you make any changes over the REST API, the changes are reverted by the Strimzi Cluster Operator.
Cloudera recommends creating connectors using KafkaConnector resources, that is, enabling this annotation for all your Kafka Connect clusters. Cloudera does not recommend exposing and using the Kafka Connect REST API externally, because the REST API is insecure.
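With the annotation enabled, connectors are deployed declaratively. The following is a minimal sketch of a KafkaConnector resource; the connector class, cluster name, file, and topic are illustrative assumptions:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    # Must match the name of the KafkaConnect cluster that runs the connector
    strimzi.io/cluster: my-connect
spec:
  # Illustrative connector class; use a plugin available on your workers
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 1
  config:
    file: /tmp/input.txt
    topic: my-topic
```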
Configuration providers
Learn how to provide configuration that is not in plain text key-value pair format.
Connectors connect to external systems, requiring additional connection and security configurations. The connector configurations only contain key-value pairs. For some use-cases and configurations, this is not viable. For example, credentials like passwords, access keys, or any other sensitive information should not be added as plain text to the configuration of a connector.
To support these use-cases, connectors can use ConfigProviders and configuration references. ConfigProviders are responsible for pulling configurations from external configuration stores. Kafka Connect in Cloudera Streams Messaging - Kubernetes Operator ships with various ConfigProviders that are available by default. Cloudera recommends using the following ConfigProviders.
- KubernetesSecretConfigProvider - Loads configurations from Kubernetes secrets. You can use it to store sensitive configurations securely.
- KubernetesConfigMapConfigProvider - Loads configurations from Kubernetes ConfigMaps. You can use it to group and centralize reusable configurations across multiple connectors.
- FileConfigProvider - Loads configurations from a property file. You can use it to reference properties from files available in the Kafka Connect worker file system.
ConfigProviders must be enabled in the KafkaConnect resource if you want to use them in your connector configuration. You enable ConfigProviders in spec.config.
#...
kind: KafkaConnect
spec:
config:
config.providers: cfmap,secret,file
config.providers.cfmap.class: io.strimzi.kafka.KubernetesConfigMapConfigProvider
config.providers.secret.class: io.strimzi.kafka.KubernetesSecretConfigProvider
config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
The following example shows a connector configuration that references a secret named connect-secrets, which is located in the connect-ns namespace and contains a sasl.jaas.config key:
#...
kind: KafkaConnector
spec:
config:
producer.override.sasl.jaas.config: ${secret:connect-ns/connect-secrets:sasl.jaas.config}
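For the reference above to resolve, a matching Kubernetes secret must exist. A minimal sketch of such a secret; the JAAS value is an illustrative placeholder:

```yaml
apiVersion: v1
kind: Secret
metadata:
  # Name and namespace match the ${secret:connect-ns/connect-secrets:...} reference
  name: connect-secrets
  namespace: connect-ns
type: Opaque
stringData:
  # Illustrative JAAS configuration; replace with your actual credentials
  sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="changeit";
```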
Kubernetes*ConfigProviders
When using the Kubernetes*ConfigProviders, the Kafka Connect workers require permissions to access the ConfigMaps and secrets in the Kubernetes cluster. Strimzi automatically creates a ServiceAccount for the Kafka Connect worker pods. An additional Role and RoleBinding are required to make the configuration providers work.
For example, assume you have a db-credentials secret that contains credentials for a database to which your connector will connect. To establish access to this secret through a ConfigProvider, you need to create the following Role and RoleBinding.
The following Role grants read access to the db-credentials secret in the database namespace:
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: connector-configuration-role
namespace: database
rules:
- apiGroups: [""]
resources: ["secrets"]
resourceNames: ["db-credentials"]
verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: connector-configuration-role-binding
subjects:
- kind: ServiceAccount
name: my-connect-connect
namespace: my-project
roleRef:
kind: Role
name: connector-configuration-role
apiGroup: rbac.authorization.k8s.io
The service account name is always generated following the [***CLUSTER NAME***]-connect pattern.
Adding external configuration to Kafka Connect worker pods
Depending on the connector plugins and the connection settings of the external system, you might need to configure additional external configurations for the Kafka Connect workers that are stored in environment variables or in additional volumes. Environment variables and volumes are specified in your KafkaConnect resource.
- Adding environment variables
- Environment variables are added using spec.externalConfiguration.env. For example, connectors may need a topic prefix that is stored in an environment variable. The alias for the configuration provider is used to define other configuration parameters. The provider parameters use the alias from spec.config.providers, taking the form spec.config.providers.[***ALIAS***].class.
#...
kind: KafkaConnect
spec:
  config:
    config.providers: env
    config.providers.env.class: org.apache.kafka.common.config.provider.EnvVarConfigProvider
  externalConfiguration:
    env:
      - name: TOPIC_PREFIX
        value: prefix-text
- Mounting additional volumes
- Additional volumes are mounted using pod and container template (spec.template.*) properties. For example, connectors might require an additional TLS truststore or keystore. Specify additional volumes for Kafka Connect workers in the spec.template.pod.volumes property of the KafkaConnect resource. Attach volumes to the Kafka Connect container with the spec.template.connectContainer.volumeMounts property. The volumes you specify are mounted under the path you specified in mountPath. In the following example, this is /mnt/dbkeystore/[***A FILE NAME FROM THE VOLUME***].
#...
kind: KafkaConnect
spec:
  template:
    pod:
      volumes:
        - name: dbkeystore
          secret:
            secretName: dbkeystore
    connectContainer:
      volumeMounts:
        - name: dbkeystore
          mountPath: /mnt/dbkeystore
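A connector can then reference files from the mounted volume in its configuration. A sketch, assuming a hypothetical truststore.jks file inside the dbkeystore secret and a connector plugin that accepts a truststore path property:

```yaml
#...
kind: KafkaConnector
spec:
  config:
    # Hypothetical property name; use the one defined by your connector plugin
    database.ssl.truststore.location: /mnt/dbkeystore/truststore.jks
```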
Configuring connector configuration override policy
Learn how to configure the connector configuration override policy. The override policy controls what client properties can be overridden by connectors.
The Kafka Connect framework manages Kafka clients (producers, consumers, and admin clients) used by connectors and tasks. By default, these clients use worker-level properties. You can fine-tune worker-level properties with connector configuration overrides. Properties specified with configuration overrides take priority over worker-level properties. Additionally, they can be applied on a per connector basis.
What configuration properties can be overridden is controlled by a configuration override policy. The policy is specified for Kafka Connect workers (KafkaConnect resource).
Kafka Connect includes the following policies by default.
- All - Allow overrides for all client properties (default).
- None - Do not allow any overrides.
- Principal - Only allow overriding JAAS configurations.
To configure the policy, set connector.client.config.override.policy in spec.config of your KafkaConnect resource. You can set the value of this property to the fully qualified name or the standard service name (for example, None) of the chosen policy.
#...
kind: KafkaConnect
spec:
config:
connector.client.config.override.policy: None
Configuring delayed rebalance
Learn how to disable or configure the delayed rebalance of Kafka Connect workers.
By default, Kafka Connect workers operate with a delayed rebalance. This means that if a worker stops for any reason, its resources (tasks and connectors) are not immediately reassigned to a different worker.
Instead, by default, a five-minute grace period is in effect. During this time, the resources assigned to the stopped worker remain unassigned. This allows the worker to restart and rejoin its group. Worker resources are only reassigned to a different worker after the five-minute period is up.
This is useful if the tasks and connectors are heavyweight and you do not want them to be rescheduled immediately. However, this also means that in case of a stoppage, some worker resources might be idle for up to five minutes, which leads to a temporary service outage. This is true even if the stopped worker restarts and rejoins its group before the five minutes are up.
You disable or configure delayed rebalance with the scheduled.rebalance.max.delay.ms Kafka Connect property in your KafkaConnect resource. Setting the property to 0 disables delayed rebalance.
#...
kind: KafkaConnect
spec:
config:
scheduled.rebalance.max.delay.ms: 0