Replication connectors and connector architecture
Replication of Kafka data is carried out by the replication connectors that you deploy in a Kafka Connect cluster dedicated to a replication flow. Get familiar with these connectors, learn about their architecture and configuration properties.
There are three different connectors that you deploy to create a replication. Each has its own purpose and carries out a different task related to replication. The replication connectors are as follows.
- MirrorSourceConector – Replicates topics between source and target clusters.
- MirrorCheckpointConnector – Replicates the committed group offsets between the source and target clusters.
- MirrorHeartbeatconnector – Creates a heartbeats topic in a chosen cluster and periodically produces heartbeats into the heartbeats topic.
MirrorSourceConnector
The MirrorSourceConnector is responsible for replicating topics between the source and the target cluster.
The topics that should be replicated are specified with topic filters (also referred to as
allow and deny lists) specified in topics
connector property. This gives you
full control over what is and what is not replicated. In addition to replicating user specified
topics, the connector automatically replicates all heartbeats topics. which are created by the
MirrorHeartbeatConnector.
MirrorSourceTask
The MirrorSourceTask is created by the MirrorSourceConnector. It is responsible for executing data replication. It uses a producer for writing replicated data to the target cluster. This producer is managed by the Kafka Connect framework, all the other clients are managed by the task itself.
Each task receives its assignment from the MirrorSourceConnector as a list of topic partitions. These are assigned to the consumer. The fetched records are then forwarded to the producer. The target topic name is generated based on what replication policy is configured.
Note: Since the MirrorSourceTask instances share the load over topic
partitions, there is no point setting the tasksMax
property of the connector
to higher than the number of topic partitions that need to be replicated.
Offset sync
In addition to replicating data, the MirrorSourceConnector also manages an offset mapping between the source and target cluster for each replicated topic partition. This offset mapping is called offset sync and it is used by the MirrorCheckpointConnector for replicating consumer group offsets.
By default the offset sync is stored in an internal Kafka topic in the source Kafka cluster.
The topic is named mm2-offset-syncs.[***TARGET CLUSTER
ALIAS***].internal
.
The offset sync is a compact topic, which means that at least the latest mapping for each replicated topic partition is kept in the topic, but some old values with older offsets can also be present in the topic until rotation and cleanup.
With the offset.lag.max
property you can influence how often a new offset
sync should be created. If you create it often, your mapping will be more accurate, but if your
consumer groups can lag behind, it increases the chance that offset translation will be
unsuccessful. For more information, see MirrorCheckpointConnector.
MirrorSourceConnector example
The following is an example KafkaConnector resource that represents an instance of the MirrorSourceConnector.
This connector example replicates the partitions of the test
topic from a
Kafka cluster that is aliased as target
. The topics
property
(topic filter) must match the topics
property in the
MirrorCheckpointConnector that is part of the same replication flow.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: my-source-connector
labels:
strimzi.io/cluster: my-connect-cluster
spec:
class: org.apache.kafka.connect.mirror.MirrorSourceConnector
tasksMax: 2
config:
topics: test
source.cluster.alias: us-west
source.cluster.bootstrap.servers: source-cluster-kafka-bootstrap.kafka:9092
target.cluster.alias: us-east
target.cluster.bootstrap.servers: target-cluster-kafka-bootstrap.kafka:9092
refresh.topics.interval.seconds: 10
key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
MirrorCheckpointConnector
The MirrorCheckpointConnector is responsible for replicating the committed group offsets between the source and target clusters. The offsets are translated based on the offset sync topic managed by the MirrorSourceConnector. The translated offsets are periodically applied to the consumer group offsets in the target cluster by the MirrorCheckpointConnector.
MirrorCheckpointTask
The MirrorCheckpointTask is created by the MirrorCheckpointConnector. It is responsible for executing consumer group offset synchronization. It uses a producer for writing translated offsets to the target cluster. This producer is managed by the Kafka Connect framework, all the other clients are managed by the task itself.
Automatically apply consumer offsets in target cluster
Since sync.group.offsets.enabled
is set to true by default, the offsets are
periodically applied to the consumer groups in the target cluster automatically, no
additional connector configuration is needed in order to make it work. The frequency of this
process is controlled by the sync.group.offsets.interval.seconds
property
of the MirrorCheckpointConnector, which defaults to 60 seconds. Having
this feature enabled is a must in any replication flow that you set up.
Checkpoints
The consumer groups can only be updated in the target cluster if there are no active members
in the group at that time. To make sure the consumer offset information is always replicated
to the target cluster, checkpoints are also created in the target cluster in an internal
topic called [***SOURCE CLUSTER
ALIAS***].checkpoints.internal
. This topic contains the information
about each replicated consumer group where they left consuming in the source cluster.
Guarantees
Checkpointing guarantees that replicated group checkpoints are monotonic. This is true as long as the upstream committed offset of the group is monotonic. This means that checkpointing prioritizes monotonicity over emitting new checkpoint records.
The difficulty in performing checkpointing consistently is the offset translation. Checkpointing relies on the offset syncs to perform offset translation from upstream to downstream offsets. The offset syncs are backed by a compact topic which ensures that the last offset sync of a partition is always kept in the topic, but it is also possible that older offset syncs are also present.
Checkpointing utilizes these older offset syncs to perform offset translation on a wide range of upstream offsets. The width of this range solely depends on the number and age of the offset sync records that are present in the backing topic. In a best-case scenario, offset translation is performed on a wide range of offsets if the offset sync history is present. In a worst-case scenario, offset translation can only happen based on the last offset sync record.
Any consumer groups which lag behind the translatable range are not checkpointed. To
fine-tune the worst-case guarantees, configure the offset.lag.max
property
for the MirrorSourceConnector. Configuring this property influences how
often a new offset sync should be created for each partition.
MirrorCheckpointConnector example
The following is an example KafkaConnector resource that represents an
instance of the MirrorCheckpointConnector. This connector example replicates
the offsets of the testgroup
consumer group related to partitions of the
test
topic. The topics
property (topics filter) must match
the topics
property in the MirrorSourceConnector that is
part of the same replication flow.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: my-checkpoint-connector
labels:
strimzi.io/cluster: my-connect-cluster
spec:
class: org.apache.kafka.connect.mirror.MirrorCheckpointConnector
tasksMax: 2
config:
topics: test
groups: testgroup
source.cluster.alias: us-east
source.cluster.bootstrap.servers: source-cluster-kafka-bootstrap.kafka:9092
target.cluster.alias: us-west
target.cluster.bootstrap.servers: target-cluster-kafka-bootstrap.kafka:9092
refresh.groups.interval.seconds: 10
emit.checkpoints.interval.seconds: 10
key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
MirrorHeartbeatConnector
The MirrorHeartbeatConnector is responsible for creating a heartbeats topic in a chosen cluster and to periodically produce heartbeats into the heartbeats topic.
The purpose of this is to always have at least a single topic that can be replicated. To achieve this, Cloudera recommends configuring the connector to create the heartbeats topic in the source cluster and let the MirrorSourceConnector to replicate it.
This functions as a reliable smoke test for the replication flow. This can be also helpful in edge cases where a MirrorClient is used that requires having heartbeats to discover the replication flows and upstream clusters. Configuring this connector is not required to deploy replication flow, but it is recommended.
MirrorHeartbeatTask
The MirrorHeartbeatTask is created by the MirrorHeartbeatConnector. It is responsible for producing heartbeats into the configured cluster’s heartbeats topic. It uses a producer for writing heartbeats to the heartbeats topic. This producer is managed by the Kafka Connect framework, all the other clients are managed by the task itself. There is always a single MirrorHeartbeatTask instance created by a MirrorHeartbeatConnector.
Creating heartbeats topic in source cluster
The heartbeats topic is created in the cluster specified in the
target.cluster.*
properties of the
MirrorHeartbeatConnector. If you choose to use this connector you must
ensure that the target.cluster.* properties refer to the source cluster
in the replication flow.
With a setup like this, you will have a topic that is automatically replicated and acts as a reliable smoke test for your replication flow. To configure the connector to create the heartbeats in the source cluster, you override the producer client managed by Kafka Connect to connect and produce to the source Kafka cluster.
Cloudera also recommends configuring target.cluster.bootstrap.servers
to
point to the source cluster. In this context, the target means where to produce heartbeats,
not the replication flow’s target. This property is required by other internal connector
clients other than the producer.
MirrorHeartbeatConnector example
The following is an example KafkaConnector resource that represents an instance of the MirrorHeartbeatConnector.
This configuration example contains the client overrides and other settings that configure the connector to produce heartbeats to the source cluster.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: my-heartbeat-connector
labels:
strimzi.io/cluster: my-connect-cluster
spec:
class: org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
tasksMax: 2
config:
source.cluster.alias: us-west
target.cluster.alias: us-east
target.cluster.bootstrap.servers: source-cluster-kafka-bootstrap.kafka:9092
producer.override.bootstrap.servers: source-cluster-kafka-bootstrap.kafka:9092
key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
Connector task and load balancing
Learn how tasks are distributed and how load is balanced in replication flows.
A typical production Kafka Connect cluster consists of multiple workers. Whenever a replication flow is configured, the replication connectors that make up a replication flow create their own tasks.
If you choose to deploy all three replication connectors, then the connectors will create one or more MirrorSourceTasks, one or more MirrorCheckpointTasks, as well as a single MirrorHeartbeatTask.
The connectors and tasks are assigned to the Kafka Connect workers in a round robin fashion.
When the Kafka Connect workers already have their assigned connectors and tasks, there can be changes that result in triggering a rebalance which means tasks and connectors should be reassigned. These changes can be the following.
- A worker joins or leaves the group (membership change)
- The filter for replicated topics or groups changes and the number of tasks changes because of this
The reassignment of connectors and tasks are done in a cooperative and incremental manner. This allows for the majority of the work to continue without interruption. Based on Kafka Connect group membership changes, the tasks can also be moved between workers.
Replication connector configurations
Learn what configuration properties and prefixes are available for replication connectors.
Connector properties
The replication connectors support various properties. Supported properties of the connectors can be categorized into groups. There are a number of common properties that are accepted by all three connectors. Additionally, each connector has a unique set of properties that it supports.
The following table lists each property group and provides a link to the relevant reference section of the Apache Kafka documentation.
Property group | Reference in Apache Kafka documentation |
---|---|
Common source connector properties | Source Connector Configs | Kafka |
Common replication connector properties | MirrorMaker Common Configs | Kafka |
MirrorSourceConnector properties | MirrorMaker Source Configs | Kafka |
MirrorCheckpointConnector properties | MirrorMaker Checkpoint Configs | Kafka |
MirrorHeartbeatConnector properties | MirrorMaker Heartbeat Configs | Kafka |
Configuration prefixes
All three replication connectors use multiple Kafka clients (producer, consumer, admin client) internally. These clients are created by the connector itself and are not managed by the Kafka Connect framework. You can provide common client configurations to these internal clients on different levels by using configuration prefixes.
There can be two types of variables in the prefix:
- [***CLUSTER TYPE***] – This variable specifies the type of the
cluster. This variable is either
source
ortarget
. - [***CLIENT TYPE***] – This variable specifies the type of client. This variable can be producer, consumer, or admin.
- [***CLUSTER TYPE***].cluster prefix
- Properties that use the [***CLUSTER TYPE***].cluster prefix are applied
to all clients used for connecting to the cluster type specified in the prefix.
For example, the following configuration ensures that all internal clients that interact with the source cluster will use the same bootstrap server.
source.cluster.bootstrap.servers=localhost:9092
- [***CLIENT TYPE***] prefix
- Properties that use the [***CLIENT TYPE***] prefix are applied to all
clients of the type specified in the prefix regardless of what type of cluster (source or
target) they connect to. This prefix has a higher precedence than the
[***CLUSTER TYPE***].cluster prefix
.For example, the following configuration ensures that all clients that admin clients use the same bootstrap server.
admin.bootstrap.servers=localhost:9092
- [***CLUSTER TYPE***].[***CLIENT TYPE***] prefix
- Properties that use the [***CLUSTER TYPE***].[***CLIENT TYPE***] prefix
are applied to all client types specified in the prefix that connect to the cluster type
specified in the prefix. This prefix has a higher precedence than the [***CLIENT
TYPE***] prefix.
For example, the following configuration ensures that all producers that connect to the target cluster use the same bootstrap server.
target.producer.bootstrap.servers=localhost:9093