Replication connectors and connector architecture

Replication of Kafka data is carried out by the replication connectors that you deploy in a Kafka Connect cluster dedicated to a replication flow. Get familiar with these connectors and learn about their architecture and configuration properties.

There are three different connectors that you deploy to create a replication flow. Each has its own purpose and carries out a different task related to replication. The replication connectors are as follows.

  • MirrorSourceConnector – Replicates topics between the source and target clusters.
  • MirrorCheckpointConnector – Replicates the committed consumer group offsets between the source and target clusters.
  • MirrorHeartbeatConnector – Creates a heartbeats topic in a chosen cluster and periodically produces heartbeats into that topic.

MirrorSourceConnector

The MirrorSourceConnector is responsible for replicating topics between the source and the target cluster.

The topics that should be replicated are specified with topic filters (also referred to as allow and deny lists) set in the topics connector property. This gives you full control over what is and what is not replicated. In addition to replicating user-specified topics, the connector automatically replicates all heartbeats topics, which are created by the MirrorHeartbeatConnector.
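
For example, the following filter (the topic names are illustrative) replicates all topics matching the orders.* pattern as well as the payments topic, while the topics.exclude property excludes staging topics from replication:

topics: orders.*, payments
topics.exclude: orders.staging.*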

Figure 1. MirrorSourceConnector

MirrorSourceTask

The MirrorSourceTask is created by the MirrorSourceConnector. It is responsible for executing data replication. It uses a producer for writing replicated data to the target cluster. This producer is managed by the Kafka Connect framework; all other clients are managed by the task itself.

Each task receives its assignment from the MirrorSourceConnector as a list of topic partitions, which are assigned to the task's consumer. The fetched records are then forwarded to the producer. The target topic name is generated based on the configured replication policy.
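
For example, with the default replication policy and a source cluster alias of us-west, records fetched from the test topic are produced to the us-west.test topic in the target cluster.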

Note: Since the MirrorSourceTask instances share the load of the topic partitions, there is no point in setting the tasksMax property of the connector higher than the number of topic partitions that need to be replicated.
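
For example, if the topic filter matches topics with a combined total of 8 partitions, setting tasksMax to a value higher than 8 only results in idle tasks.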

Offset sync

In addition to replicating data, the MirrorSourceConnector also manages an offset mapping between the source and target cluster for each replicated topic partition. This offset mapping is called offset sync and it is used by the MirrorCheckpointConnector for replicating consumer group offsets.

By default, the offset sync is stored in an internal Kafka topic in the source Kafka cluster. The topic is named mm2-offset-syncs.[***TARGET CLUSTER ALIAS***].internal.
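
For example, with the us-east target cluster alias used in the examples in this section, the topic is named mm2-offset-syncs.us-east.internal.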

The offset syncs topic is compacted, which means that at least the latest mapping for each replicated topic partition is kept in the topic, but mappings with older offsets can also be present in the topic until segment rotation and cleanup.

With the offset.lag.max property you can influence how often a new offset sync is created. Frequent offset syncs make the mapping more accurate, but because older mappings are cleaned up sooner, they also increase the chance that offset translation is unsuccessful for consumer groups that lag behind. For more information, see MirrorCheckpointConnector.
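
The following is a minimal sketch of tuning this property in the config section of the MirrorSourceConnector. The value shown is illustrative, not a recommendation.

config:
  # Emit a new offset sync once the downstream offset lags the latest mapping by more than 10 records
  offset.lag.max: 10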

MirrorSourceConnector example

The following is an example KafkaConnector resource that represents an instance of the MirrorSourceConnector.

This connector example replicates the partitions of the test topic from the Kafka cluster aliased as us-west (the source) to the Kafka cluster aliased as us-east (the target). The topics property (topic filter) must match the topics property in the MirrorCheckpointConnector that is part of the same replication flow.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  tasksMax: 2
  config:
    topics: test
    source.cluster.alias: us-west
    source.cluster.bootstrap.servers: source-cluster-kafka-bootstrap.kafka:9092
    target.cluster.alias: us-east
    target.cluster.bootstrap.servers: target-cluster-kafka-bootstrap.kafka:9092
    refresh.topics.interval.seconds: 10
    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter

MirrorCheckpointConnector

The MirrorCheckpointConnector is responsible for replicating the committed group offsets between the source and target clusters. The offsets are translated based on the offset sync topic managed by the MirrorSourceConnector. The translated offsets are periodically applied to the consumer group offsets in the target cluster by the MirrorCheckpointConnector.

Figure 2. MirrorCheckpointConnector

MirrorCheckpointTask

The MirrorCheckpointTask is created by the MirrorCheckpointConnector. It is responsible for executing consumer group offset synchronization. It uses a producer for writing translated offsets to the target cluster. This producer is managed by the Kafka Connect framework; all other clients are managed by the task itself.

Each task receives its assignment from the MirrorCheckpointConnector as a list of consumer groups. The offsets of the assigned consumer groups are periodically queried for the replicated topic partitions through an admin client, translated based on the offset syncs topic, and applied to the consumer group offsets in the target cluster.

Automatically apply consumer offsets in target cluster

Since sync.group.offsets.enabled is set to true by default, the offsets are periodically applied to the consumer groups in the target cluster automatically; no additional connector configuration is needed to make this work. The frequency of this process is controlled by the sync.group.offsets.interval.seconds property of the MirrorCheckpointConnector, which defaults to 60 seconds. Cloudera recommends relying on this feature rather than applying offsets manually.
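
These defaults can also be set explicitly in the config section of the connector. The following is a minimal sketch showing the default values.

config:
  # Automatically apply translated offsets to consumer groups in the target cluster
  sync.group.offsets.enabled: true
  # Apply the offsets every 60 seconds (the default)
  sync.group.offsets.interval.seconds: 60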

Checkpoints

The consumer groups can only be updated in the target cluster if there are no active members in the group at that time. To make sure the consumer offset information is always replicated to the target cluster, checkpoints are also created in the target cluster in an internal topic called [***SOURCE CLUSTER ALIAS***].checkpoints.internal.

This topic contains information about where each replicated consumer group left off consuming in the source cluster. Extracting the information from this topic and applying it to consumer groups is a manual and error-prone process. Cloudera recommends that you rely on the connector to apply offsets automatically.

Manual application of consumer group offsets in target cluster

If automatically applying offsets is not possible for you, you can extract the translated offsets from the checkpoints topic in the target cluster at any time and apply them manually to your consumer groups when it is appropriate, that is, when the related groups have no active members.

You can read the checkpoints using any Kafka consumer client, including the kafka-console-consumer.sh tool. When reading checkpoints, you can use org.apache.kafka.connect.mirror.formatters.CheckpointFormatter to get human-readable, formatted output. You can apply the offsets using any admin client, including the kafka-consumer-groups.sh tool.
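
The following is a minimal sketch of this process. It assumes the us-west source cluster alias and testgroup consumer group used in the examples in this section, the default replication policy, and a hypothetical translated offset of 42 for partition 0.

# Read the checkpoints in a human-readable format from the target cluster
kafka-console-consumer.sh \
  --bootstrap-server target-cluster-kafka-bootstrap.kafka:9092 \
  --topic us-west.checkpoints.internal \
  --from-beginning \
  --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter

# Apply a translated offset manually while the group has no active members
kafka-consumer-groups.sh \
  --bootstrap-server target-cluster-kafka-bootstrap.kafka:9092 \
  --group testgroup \
  --topic us-west.test:0 \
  --reset-offsets --to-offset 42 --execute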

Offset sync

Checkpointing involves translating source offsets to target offsets based on the information stored in the offset syncs topic. The offset syncs topic is compacted, which means that at least the latest mapping is kept in the topic, but mappings with older offsets can also be present until segment rotation and cleanup. Offset translation is not possible if the source offset to be translated is older than the oldest available mapping, which happens when your consumer groups lag behind too much. If offset translation is not possible, consumer group synchronization does not happen and checkpoints are not created. In this case, check and fine-tune the offset.lag.max property of the MirrorSourceConnector, which influences how often a new mapping is created for each partition.

MirrorCheckpointConnector example

The following is an example KafkaConnector resource that represents an instance of the MirrorCheckpointConnector. This connector example replicates the offsets of the testgroup consumer group related to the partitions of the test topic. The topics property (topic filter) must match the topics property in the MirrorSourceConnector that is part of the same replication flow.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-checkpoint-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.mirror.MirrorCheckpointConnector
  tasksMax: 2
  config:
    topics: test
    groups: testgroup
    source.cluster.alias: us-west
    source.cluster.bootstrap.servers: source-cluster-kafka-bootstrap.kafka:9092
    target.cluster.alias: us-east
    target.cluster.bootstrap.servers: target-cluster-kafka-bootstrap.kafka:9092
    refresh.groups.interval.seconds: 10
    emit.checkpoints.interval.seconds: 10
    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter

MirrorHeartbeatConnector

The MirrorHeartbeatConnector is responsible for creating a heartbeats topic in a chosen cluster and for periodically producing heartbeats into that topic.

The purpose of this connector is to ensure that there is always at least one topic that can be replicated. To achieve this, Cloudera recommends configuring the connector to create the heartbeats topic in the source cluster and letting the MirrorSourceConnector replicate it.

This functions as a reliable smoke test for the replication flow. It can also be helpful in edge cases where a MirrorClient is used that requires heartbeats to discover replication flows and upstream clusters. Configuring this connector is not required to deploy a replication flow, but it is recommended.

MirrorHeartbeatTask

The MirrorHeartbeatTask is created by the MirrorHeartbeatConnector. It is responsible for producing heartbeats into the configured cluster’s heartbeats topic. It uses a producer for writing heartbeats to the heartbeats topic. This producer is managed by the Kafka Connect framework; all other clients are managed by the task itself. There is always a single MirrorHeartbeatTask instance created by a MirrorHeartbeatConnector.

Creating heartbeats topic in source cluster

The heartbeats topic is created in the cluster specified in the target.cluster.* properties of the MirrorHeartbeatConnector. If you choose to use this connector, you must ensure that the target.cluster.* properties refer to the source cluster of the replication flow.

With a setup like this, you have a topic that is automatically replicated and acts as a reliable smoke test for your replication flow. To configure the connector to create the heartbeats in the source cluster, override the producer client managed by Kafka Connect so that it connects and produces to the source Kafka cluster.

Cloudera also recommends configuring target.cluster.bootstrap.servers to point to the source cluster. In this context, target means where heartbeats are produced, not the target of the replication flow. This property is required by the internal connector clients other than the producer.

MirrorHeartbeatConnector example

The following is an example KafkaConnector resource that represents an instance of the MirrorHeartbeatConnector.

This configuration example contains the client overrides and other settings that configure the connector to produce heartbeats to the source cluster.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-heartbeat-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
  tasksMax: 1
  config:
    source.cluster.alias: us-west
    target.cluster.alias: us-east
    target.cluster.bootstrap.servers: source-cluster-kafka-bootstrap.kafka:9092
    producer.override.bootstrap.servers: source-cluster-kafka-bootstrap.kafka:9092
    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter

Connector task and load balancing

Learn how tasks are distributed and how load is balanced in replication flows.

A typical production Kafka Connect cluster consists of multiple workers. Whenever a replication flow is configured, the replication connectors that make up the flow create their own tasks.

If you choose to deploy all three replication connectors, the connectors create one or more MirrorSourceTask instances, one or more MirrorCheckpointTask instances, and a single MirrorHeartbeatTask instance.

The connectors and tasks are assigned to the Kafka Connect workers in a round-robin fashion.

When the Kafka Connect workers already have their assigned connectors and tasks, certain changes can trigger a rebalance, which means that tasks and connectors are reassigned. These changes are the following.

  • A worker joins or leaves the group (membership change)
  • The filter for replicated topics or groups changes, and the number of tasks changes as a result

The reassignment of connectors and tasks is done in a cooperative and incremental manner. This allows the majority of the work to continue without interruption. Based on Kafka Connect group membership changes, tasks can also be moved between workers.

Replication connector configurations

Learn what configuration properties and prefixes are available for replication connectors.

Connector properties

The replication connectors support various properties, which can be categorized into groups. There are a number of common properties that are accepted by all three connectors. Additionally, each connector has a unique set of properties that it supports.

The following table lists each property group and provides a link to the relevant reference section of the Apache Kafka documentation.

Table 1. Replication connector properties

Property group                              Reference in Apache Kafka documentation
Common source connector properties          Source Connector Configs | Kafka
Common replication connector properties     MirrorMaker Common Configs | Kafka
MirrorSourceConnector properties            MirrorMaker Source Configs | Kafka
MirrorCheckpointConnector properties        MirrorMaker Checkpoint Configs | Kafka
MirrorHeartbeatConnector properties         MirrorMaker Heartbeat Configs | Kafka

Configuration prefixes

All three replication connectors use multiple Kafka clients (producer, consumer, admin client) internally. These clients are created by the connector itself and are not managed by the Kafka Connect framework. You can provide common client configurations to these internal clients at different levels by using configuration prefixes.

There can be two types of variables in the prefix:

  • [***CLUSTER TYPE***] – This variable specifies the type of the cluster. This variable is either source or target.
  • [***CLIENT TYPE***] – This variable specifies the type of client. This variable can be producer, consumer, or admin.

[***CLUSTER TYPE***].cluster prefix

Properties that use the [***CLUSTER TYPE***].cluster prefix are applied to all clients used for connecting to the cluster type specified in the prefix.

For example, the following configuration ensures that all internal clients that interact with the source cluster will use the same bootstrap server.

source.cluster.bootstrap.servers=localhost:9092

[***CLIENT TYPE***] prefix

Properties that use the [***CLIENT TYPE***] prefix are applied to all clients of the type specified in the prefix regardless of what type of cluster (source or target) they connect to. This prefix has a higher precedence than the [***CLUSTER TYPE***].cluster prefix.

For example, the following configuration ensures that all admin clients use the same bootstrap server, regardless of which cluster they connect to.

admin.bootstrap.servers=localhost:9092

[***CLUSTER TYPE***].[***CLIENT TYPE***] prefix

Properties that use the [***CLUSTER TYPE***].[***CLIENT TYPE***] prefix are applied to all client types specified in the prefix that connect to the cluster type specified in the prefix. This prefix has a higher precedence than the [***CLIENT TYPE***] prefix.

For example, the following configuration ensures that all producers that connect to the target cluster use the same bootstrap server.

target.producer.bootstrap.servers=localhost:9093
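
As an illustration of the precedence rules, consider the following configuration, in which the addresses are hypothetical. An admin client connecting to the target cluster uses localhost:9095, because the most specific prefix takes precedence.

# Lowest precedence: applies to all clients connecting to the target cluster
target.cluster.bootstrap.servers=localhost:9093
# Overrides the previous setting for all admin clients
admin.bootstrap.servers=localhost:9094
# Highest precedence: applies only to admin clients connecting to the target cluster
target.admin.bootstrap.servers=localhost:9095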