Using Single Message Transforms in replication flows
In CSM Operator, you can apply Single Message Transforms (SMTs) in the connectors that make up a replication flow. Configuring an SMT chain enables you to transform Kafka records during replication. The following collection of examples demonstrates how you can transform keys, values, and metadata in Kafka records during replication.
The key and value transformation examples that follow are simple and are meant to demonstrate the use of the SMT framework in data replication. They might not be directly applicable or appropriate for all use cases in a production environment. Specifically, these examples use the JsonConverter with schemaless data, which the Kafka Connect framework handles as a Map. You can replace the JsonConverter with any other converter to handle data with schema, depending on the data formats present in your use case.
While it is possible to modify the topic name of a record using the SMT framework, do not use these types of transformations in replication flows. Modifying the topic name can break the replication policy and data replication as a whole.
Transforming the key or value of Kafka records in replication flows
When the MirrorSourceTask provides Kafka records to the Kafka Connect framework, it provides their keys and values as raw bytes with the BYTES schema. This is true even if the data inside the byte blob is structured, for example JSON.
As a result, you cannot directly manipulate the data, because most SMT plugins rely on the Kafka Connect internal data format and its schema, and in this context the BYTES schema is meaningless. Use the ConvertFromBytes plugin with an appropriate converter to run manipulations on structured data.
The following example converts each replicated message value into JSON format with the ConvertFromBytes plugin. This example assumes that the message values contain JSON data.
#...
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    transforms: ConvertFromBytes
    transforms.ConvertFromBytes.type: com.cloudera.dim.kafka.connect.transforms.ConvertFromBytes$Value
    transforms.ConvertFromBytes.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.ConvertFromBytes.converter.schemas.enable: false
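For example, assume the source topic contains record values with the following JSON payload. The field names and values here are hypothetical, chosen to match the renames configuration used in the next example:

{"name": "jdoe", "age": 42}

With the configuration above, the Kafka Connect framework handles this value as a schemaless Map instead of opaque bytes, so subsequent transformations in the chain can access the individual fields.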
Adding additional transformations
You can put any transformation after the ConvertFromBytes plugin. The following example renames two fields in the record values with the ReplaceField plugin.
#...
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    transforms: ConvertFromBytes,ReplaceField
    transforms.ConvertFromBytes.type: com.cloudera.dim.kafka.connect.transforms.ConvertFromBytes$Value
    transforms.ConvertFromBytes.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.ConvertFromBytes.converter.schemas.enable: false
    transforms.ReplaceField.type: org.apache.kafka.connect.transforms.ReplaceField$Value
    transforms.ReplaceField.renames: name:replaced_name,age:replaced_age
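Continuing with the hypothetical record value from the previous example, the ReplaceField transformation renames the two fields while keeping their values intact:

{"name": "jdoe", "age": 42}

becomes

{"replaced_name": "jdoe", "replaced_age": 42}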
After applying your transformations, you have to consider how to turn your structured JSON back into bytes. The connector configuration requires a converter, which is applied to the records just before they are handed over to the Kafka Connect framework's producer. This conversion happens after the data goes through your SMT chain. In this example, you can simply use the JsonConverter as the value converter; you do not need additional SMT steps to convert the values back.
#...
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
Once both the SMT chain and the converters in the connector configuration are applied, the record value passes through a transformation chain like the following (shown schematically):
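source record value (bytes)
  → ConvertFromBytes (JsonConverter): raw bytes deserialized into a schemaless Map
  → ReplaceField: the name and age fields renamed
  → value.converter (JsonConverter): the Map serialized back to bytes for the producer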
Filtering data using SMTs
If your replication flow replicates topics with different data formats, a transformation chain like the one in the previous examples fails when it tries to convert data of an unexpected format.
A typical example is a replication flow that uses a MirrorHeartbeatConnector. The heartbeats topic contains records that cannot be converted into JSON. Since heartbeat records are automatically replicated by the MirrorSourceConnector, you encounter exceptions during data conversion.
In cases like this, you must use predicates to filter heartbeat records from the transformation chain.
#...
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    transforms: ConvertFromBytes,ReplaceField,ConvertToBytes
    transforms.ConvertFromBytes.type: com.cloudera.dim.kafka.connect.transforms.ConvertFromBytes$Value
    transforms.ConvertFromBytes.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.ConvertFromBytes.converter.schemas.enable: false
    transforms.ReplaceField.type: org.apache.kafka.connect.transforms.ReplaceField$Value
    transforms.ReplaceField.renames: name:replaced_name,age:replaced_age
    transforms.ConvertToBytes.type: com.cloudera.dim.kafka.connect.transforms.ConvertToBytes$Value
    transforms.ConvertToBytes.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.ConvertToBytes.converter.schemas.enable: false
    predicates: NotHeartbeats
    predicates.NotHeartbeats.type: org.apache.kafka.connect.transforms.predicates.TopicNameMatches
    # Matches any topic name that does not start with "heartbeats" or "<prefix>.heartbeats"
    predicates.NotHeartbeats.pattern: ^(?!(.+\.)?heartbeats).*$
    transforms.ConvertFromBytes.predicate: NotHeartbeats
    transforms.ReplaceField.predicate: NotHeartbeats
    transforms.ConvertToBytes.predicate: NotHeartbeats
    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
Since heartbeat records are not converted into JSON, they remain byte arrays. All the other record values, however, are converted to JSON. To unify the data format of the record values, you have to convert your non-heartbeat record values back to byte arrays using ConvertToBytes. After applying this configuration, all record values are byte arrays, so you can use the ByteArrayConverter as the final converter. Key conversion in this case is the same as in the previous example.
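For illustration, assume that a topic named orders is replicated from a source cluster with the alias us-west (both names are hypothetical). The record values then take the following paths through the chain:

us-west.orders      → ConvertFromBytes → ReplaceField → ConvertToBytes → ByteArrayConverter
us-west.heartbeats  → all transforms skipped by the NotHeartbeats predicate → ByteArrayConverter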
Transforming metadata of Kafka records in replication flows
Unlike the keys or values, you can transform the metadata of Kafka records (headers, timestamps, and so on) without any preliminary conversion. That is, you do not need to create a chain with multiple transforms or predicates. You can simply use a single plugin, such as InsertHeader.
The following transformation chain example adds smt-header-key=smt-header-value as a fixed header to all of the replicated records using the InsertHeader plugin.
#...
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    transforms: InsertHeader
    transforms.InsertHeader.header: smt-header-key
    transforms.InsertHeader.type: org.apache.kafka.connect.transforms.InsertHeader
    transforms.InsertHeader.value.literal: smt-header-value
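If needed, you can scope metadata transformations with predicates in the same way as the key and value transformations shown earlier. The following sketch reuses the NotHeartbeats predicate from the filtering example to add the header only to non-heartbeat records. This scoping is optional here, because InsertHeader does not deserialize the record value and therefore works on heartbeat records as well.
#...
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    transforms: InsertHeader
    transforms.InsertHeader.header: smt-header-key
    transforms.InsertHeader.type: org.apache.kafka.connect.transforms.InsertHeader
    transforms.InsertHeader.value.literal: smt-header-value
    transforms.InsertHeader.predicate: NotHeartbeats
    predicates: NotHeartbeats
    predicates.NotHeartbeats.type: org.apache.kafka.connect.transforms.predicates.TopicNameMatches
    predicates.NotHeartbeats.pattern: ^(?!(.+\.)?heartbeats).*$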