Using Single Message Transforms in replication flows

In Cloudera Streams Messaging - Kubernetes Operator you can apply Single Message Transforms (SMT) in the connectors that make up a replication flow. Configuring an SMT chain enables you to transform the Kafka records during replication. This collection of examples demonstrates how you can transform keys and values as well as metadata in Kafka records during replication.

The following key and value transformation examples are simple and are meant to demonstrate the use of the SMT framework in data replication. They might not be directly applicable or appropriate for all production use cases. Specifically, these examples use the JsonConverter with schemaless data, which the Kafka Connect framework handles as a Map. To handle data that has a schema, you can replace the JsonConverter with any other converter that suits the data formats in your use case.

While it is possible to modify the topic name of a record using the SMT framework, these types of transformations should not be used in replication flows. Modifying the topic name can break the replication policy and, with it, data replication as a whole.

Transforming the key or value of the Kafka records in replication flows

When the MirrorSourceTask provides Kafka records to the Kafka Connect framework, it provides the keys and values as raw bytes with the BYTES schema. This is true even if the data inside the blob is structured, for example JSON.

As a result, you cannot directly manipulate the data, because most SMT plugins rely on the Kafka Connect internal data format and its schema. In this context, the BYTES schema is meaningless. To manipulate structured data, use the ConvertFromBytes plugin with an appropriate converter.

The following example converts each replicated message value into JSON format with the ConvertFromBytes plugin. This example assumes that the message values contain JSON data.

#...
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    transforms: ConvertFromBytes
    transforms.ConvertFromBytes.type: com.cloudera.dim.kafka.connect.transforms.ConvertFromBytes$Value
    transforms.ConvertFromBytes.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.ConvertFromBytes.converter.schemas.enable: false

Adding additional transformations

You can put any transformation after the ConvertFromBytes plugin. The following example renames two fields in the record values with the ReplaceField plugin.

#...
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    transforms: ConvertFromBytes,ReplaceField
    transforms.ConvertFromBytes.type: com.cloudera.dim.kafka.connect.transforms.ConvertFromBytes$Value
    transforms.ConvertFromBytes.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.ConvertFromBytes.converter.schemas.enable: false
    transforms.ReplaceField.type: org.apache.kafka.connect.transforms.ReplaceField$Value
    transforms.ReplaceField.renames: name:replaced_name,age:replaced_age
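
The same pattern applies to other transformations that can work on schemaless Map data. As a minimal sketch, the following fragment appends the MaskField plugin from Apache Kafka to the chain. The field name ssn is hypothetical and is used for illustration only. It is not part of the examples in this document.

```yaml
#...
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    # ConvertFromBytes stays first so that the later steps receive structured data
    transforms: ConvertFromBytes,ReplaceField,MaskField
    transforms.ConvertFromBytes.type: com.cloudera.dim.kafka.connect.transforms.ConvertFromBytes$Value
    transforms.ConvertFromBytes.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.ConvertFromBytes.converter.schemas.enable: false
    transforms.ReplaceField.type: org.apache.kafka.connect.transforms.ReplaceField$Value
    transforms.ReplaceField.renames: name:replaced_name,age:replaced_age
    transforms.MaskField.type: org.apache.kafka.connect.transforms.MaskField$Value
    # "ssn" is a hypothetical field name, replace it with a field from your data
    transforms.MaskField.fields: ssn
```

Transformations run in the order listed in the transforms property, so any plugin that expects structured data must come after ConvertFromBytes.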

After applying your transformation, you have to consider how to convert your structured JSON back to bytes. The connector configuration requires a converter, which is applied to the records just before they are passed to the Kafka Connect framework’s producer.

This conversion happens after the data goes through your SMT chain. In this example, you can simply use JsonConverter as the value converter. You do not need additional SMT steps to convert values back.

#...
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
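
For reference, the SMT chain and the converters from the fragments above can be merged into a single connector configuration, as in the following sketch. It only combines the properties already shown. Other properties that a replication flow needs are left out here, as they are in the fragments above.

```yaml
#...
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    # SMT chain: bytes to schemaless JSON, then rename two fields
    transforms: ConvertFromBytes,ReplaceField
    transforms.ConvertFromBytes.type: com.cloudera.dim.kafka.connect.transforms.ConvertFromBytes$Value
    transforms.ConvertFromBytes.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.ConvertFromBytes.converter.schemas.enable: false
    transforms.ReplaceField.type: org.apache.kafka.connect.transforms.ReplaceField$Value
    transforms.ReplaceField.renames: name:replaced_name,age:replaced_age
    # Final converters: values are serialized from JSON, keys pass through as bytes
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
```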

Once both the SMT chain and the converters in the connector configuration are applied, you will get a value transformation chain like the following.

Figure 1. Value conversion using ConvertFromBytes and ReplaceField

The keys are not converted to JSON, so you can use ByteArrayConverter for them. Only the values need to be converted from JSON to byte arrays. The key transformation chain is as follows.

Figure 2. Key conversion using ByteArrayConverter
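
If the record keys also contain JSON and you want to transform them, a similar chain can be sketched for keys. The following fragment assumes that the ConvertFromBytes plugin offers a $Key variant analogous to the $Value variant used above. Verify this against the plugin reference before relying on it. The transformation aliases KeyFromBytes and ValueFromBytes are arbitrary names chosen for this sketch.

```yaml
#...
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    transforms: KeyFromBytes,ValueFromBytes
    # Assumption: a $Key variant exists alongside the $Value variant
    transforms.KeyFromBytes.type: com.cloudera.dim.kafka.connect.transforms.ConvertFromBytes$Key
    transforms.KeyFromBytes.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.KeyFromBytes.converter.schemas.enable: false
    transforms.ValueFromBytes.type: com.cloudera.dim.kafka.connect.transforms.ConvertFromBytes$Value
    transforms.ValueFromBytes.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.ValueFromBytes.converter.schemas.enable: false
    # Keys and values are both structured after the chain, so both use JsonConverter
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
```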

Filtering data using SMTs

If your replication flow replicates topics with different data formats, a transformation chain like the one in the examples above fails when it tries to convert data of the wrong type.

A typical example is a replication flow that uses a MirrorHeartbeatConnector. The heartbeats topic contains records that cannot be converted into JSON. Since heartbeat records are automatically replicated by the MirrorSourceConnector, you will encounter exceptions during data conversion.

In cases like this, you must use predicates to filter heartbeat records from the transformation chain.

#...
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    transforms: ConvertFromBytes,ReplaceField,ConvertToBytes
    transforms.ConvertFromBytes.type: com.cloudera.dim.kafka.connect.transforms.ConvertFromBytes$Value
    transforms.ConvertFromBytes.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.ConvertFromBytes.converter.schemas.enable: false
    transforms.ReplaceField.type: org.apache.kafka.connect.transforms.ReplaceField$Value
    transforms.ReplaceField.renames: name:replaced_name,age:replaced_age
    transforms.ConvertToBytes.type: com.cloudera.dim.kafka.connect.transforms.ConvertToBytes$Value
    transforms.ConvertToBytes.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.ConvertToBytes.converter.schemas.enable: false
    predicates: NotHeartbeats
    predicates.NotHeartbeats.type: org.apache.kafka.connect.transforms.predicates.TopicNameMatches
    predicates.NotHeartbeats.pattern: ^(?!(.+\.)?heartbeats).*$
    transforms.ConvertFromBytes.predicate: NotHeartbeats
    transforms.ReplaceField.predicate: NotHeartbeats
    transforms.ConvertToBytes.predicate: NotHeartbeats
    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter

Since heartbeat records are not converted into JSON, they remain byte arrays. All other record values, however, are converted to JSON.

To unify the data format of the record values, you have to convert your non-heartbeat record values back to byte arrays using ConvertToBytes. After applying your configuration, all record values become byte arrays, so you can use ByteArrayConverter as the final converter. Key conversion in this case is the same as in the previous example.

Figure 3. Value conversion with a predicate that filters heartbeat records
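
Kafka Connect also lets you invert a predicate for an individual transformation with the negate property. As an alternative sketch, the following fragment matches the heartbeats topics directly and negates the predicate, instead of using a negative lookahead. The predicate name IsHeartbeats is an arbitrary label. This pattern only matches topics named heartbeats, with or without a prefix that ends in a dot, so it is not strictly equivalent to the lookahead pattern above. Check it against the topic names in your own environment.

```yaml
#...
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    transforms: ConvertFromBytes,ReplaceField,ConvertToBytes
    transforms.ConvertFromBytes.type: com.cloudera.dim.kafka.connect.transforms.ConvertFromBytes$Value
    transforms.ConvertFromBytes.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.ConvertFromBytes.converter.schemas.enable: false
    transforms.ReplaceField.type: org.apache.kafka.connect.transforms.ReplaceField$Value
    transforms.ReplaceField.renames: name:replaced_name,age:replaced_age
    transforms.ConvertToBytes.type: com.cloudera.dim.kafka.connect.transforms.ConvertToBytes$Value
    transforms.ConvertToBytes.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.ConvertToBytes.converter.schemas.enable: false
    # The predicate is true for heartbeats topics
    predicates: IsHeartbeats
    predicates.IsHeartbeats.type: org.apache.kafka.connect.transforms.predicates.TopicNameMatches
    predicates.IsHeartbeats.pattern: (.+\.)?heartbeats
    # negate: true applies each step only when the predicate is false
    transforms.ConvertFromBytes.predicate: IsHeartbeats
    transforms.ConvertFromBytes.negate: true
    transforms.ReplaceField.predicate: IsHeartbeats
    transforms.ReplaceField.negate: true
    transforms.ConvertToBytes.predicate: IsHeartbeats
    transforms.ConvertToBytes.negate: true
    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
```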

Transforming metadata of Kafka records in replication flows

Unlike transformation of keys or values, you can transform the metadata (headers, timestamps and so on) in Kafka records without any preliminary conversion. That is, you do not need to create a chain with multiple transforms or predicates. You can simply use a single plugin like InsertHeader.

The following transformation chain example adds smt-header-key=smt-header-value as a fixed header to all of the replicated records using the InsertHeader plugin.

#...
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    transforms: InsertHeader
    transforms.InsertHeader.header: smt-header-key
    transforms.InsertHeader.type: org.apache.kafka.connect.transforms.InsertHeader
    transforms.InsertHeader.value.literal: smt-header-value
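
Metadata transformations can also be combined with each other. As a minimal sketch, the following fragment adds the fixed header from the example above and then removes a header with the DropHeaders plugin from Apache Kafka. The header name internal-trace-id is hypothetical and is used for illustration only.

```yaml
#...
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.mirror.MirrorSourceConnector
  config:
    transforms: InsertHeader,DropHeaders
    transforms.InsertHeader.type: org.apache.kafka.connect.transforms.InsertHeader
    transforms.InsertHeader.header: smt-header-key
    transforms.InsertHeader.value.literal: smt-header-value
    transforms.DropHeaders.type: org.apache.kafka.connect.transforms.DropHeaders
    # "internal-trace-id" is a hypothetical header name
    transforms.DropHeaders.headers: internal-trace-id
```

Neither plugin reads the record key or value, so no ConvertFromBytes step is needed in this chain.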