Single Message Transforms

Single Message Transforms (SMT) is a message transformation framework that you can deploy on top of Kafka Connect connectors to transform and filter messages. Learn about the SMT framework as well as the transformation plugins available in CSM Operator.

Kafka Connect connectors provide ready-to-use tools to integrate Kafka with external data stores. Still, in many use cases, the data moved by the connectors requires some sanitization and transformation. To provide extra flexibility on top of connectors, Kafka Connect also supports an SMT framework.

The SMT framework installs a transformation chain on top of connectors that modifies and filters data on a per-message basis. An SMT chain consists of transform and predicate plugins. Transform plugins modify the data. For example, you can insert, replace, or mask fields, as well as perform various other modifications on the messages moved by connectors. Predicate plugins add conditional logic to your chain so that a transformation step is only applied to messages that satisfy specified conditions.

The SMT framework requires that data is converted to the Kafka Connect internal data format. This format is specific to Kafka Connect and consists of a value together with its schema descriptor (SchemaAndValue).

Supported SMT plugins

CSM Operator ships with and supports the following SMT plugins:
  • All Apache Kafka plugins. For more information, see Transformations in the Apache Kafka documentation.
  • The following Cloudera specific plugins:
      • ConvertFromBytes
      • ConvertToBytes

Configuring an SMT chain

Learn how to configure a Single Message Transformation (SMT) chain for Kafka Connect connectors.

SMT chains can be configured within the configuration of a Kafka Connect connector using SMT specific configuration properties. To set up a chain, you first define your transformation chain with the transforms property and optionally define your predicates using the predicates property. Afterward, you use transforms.* and predicates.* to configure the plugins in the chain. For example, the following configuration snippet sets up a transformation chain that filters messages based on their header and masks a specified field in messages.

#...
kind: KafkaConnector
spec:
  config:
    transforms: FilterAudit,MaskField
    transforms.MaskField.type: org.apache.kafka.connect.transforms.MaskField$Value
    transforms.MaskField.fields: CreditCardNumber

    transforms.FilterAudit.type: org.apache.kafka.connect.transforms.Filter
    transforms.FilterAudit.predicate: IsAudit
    transforms.FilterAudit.negate: false

    predicates: IsAudit
    predicates.IsAudit.type: org.apache.kafka.connect.transforms.predicates.HasHeaderKey
    predicates.IsAudit.name: Audit

The following sections go through the properties in this example and give an overview on how to set up a transformation chain.

Configuring transforms

The transforms property contains a comma-separated list of transformation aliases. Each alias represents one step in the transformation chain. The aliases you add to the property are arbitrary names; they are used in other properties to configure that particular transformation step. For example, the following defines a two-step transformation chain.

transforms: FilterAudit,MaskField

The transforms.[***ALIAS***].type property specifies which transformation plugin should be used in a transformation step. [***ALIAS***] is one of the aliases that you specified in transforms. The value of the property is the fully qualified name of the transformation plugin that should be used in the step. For example, the following line specifies org.apache.kafka.connect.transforms.MaskField$Value as the plugin for the MaskField step.

transforms.MaskField.type: org.apache.kafka.connect.transforms.MaskField$Value

Many transformation plugins support changing both the key and the value of a record. For these plugins, you typically reference a nested Key or Value class as the type.
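For illustration, the following sketch applies the same MaskField plugin to the record key instead of the value by referencing its nested Key class. The alias MaskKeyField and the CustomerId field are hypothetical names chosen for this example.

```yaml
transforms: MaskKeyField
transforms.MaskKeyField.type: org.apache.kafka.connect.transforms.MaskField$Key
transforms.MaskKeyField.fields: CustomerId
```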

The transforms.[***ALIAS***].[***KEY***] property is used to configure the transformation plugins in your chain. This property is passed to the transformation plugin itself with transforms.[***ALIAS***] stripped from the property key. [***ALIAS***] is the alias of a plugin you specified in transforms. [***KEY***] is a property key that the plugin accepts. For example, the MaskField plugin has a fields property that specifies which fields should be masked in the structure.

transforms.MaskField.fields: CreditCardNumber

Configuring predicates

Predicates are a separate set of plugins. You use them to conditionally enable certain steps in the transformation chain. Predicates are configured in a similar way to transforms. You must specify the predicate aliases, associate the aliases with a plugin, and set plugin specific properties.

predicates: IsAudit
predicates.IsAudit.type: org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.IsAudit.name: Audit

In this example, the IsAudit predicate is an instance of the HasHeaderKey predicate plugin. This predicate returns true for records where a specific header key is present. predicates.IsAudit.name: Audit configures the predicate to look for the Audit header in the records.

After a predicate is set up, you can associate the predicate with any number of transformation steps using the predicate property. If a predicate is associated with a transformation, that transformation step is only applied to the messages that match the condition specified in the predicate.
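As a hedged illustration, the snippet below associates the same IsAudit predicate with two transformation steps, so both steps run only on records that carry the Audit header. The aliases MaskAccount and TagAudit, and the field names AccountNumber and audited, are hypothetical names chosen for this example.

```yaml
transforms: MaskAccount,TagAudit
predicates: IsAudit
predicates.IsAudit.type: org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.IsAudit.name: Audit

transforms.MaskAccount.type: org.apache.kafka.connect.transforms.MaskField$Value
transforms.MaskAccount.fields: AccountNumber
transforms.MaskAccount.predicate: IsAudit

transforms.TagAudit.type: org.apache.kafka.connect.transforms.InsertField$Value
transforms.TagAudit.static.field: audited
transforms.TagAudit.static.value: "true"
transforms.TagAudit.predicate: IsAudit
```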

A good example for using a predicate is the Filter transformation plugin. This is because Filter filters (drops) all messages by default. Therefore, it must be used together with predicates to specify filtering logic. For example, the following configuration instructs the SMT framework that the FilterAudit step should only be invoked for messages where the IsAudit predicate returns true. That is, all messages with the Audit header will be filtered and will not be transformed by any subsequent steps in the transformation chain.

transforms.FilterAudit.predicate: IsAudit
transforms.FilterAudit.negate: false

The condition of a predicate can be inverted using negate. If negate is set to true, the SMT framework applies the transformation to any record that does not match the condition. For example, the following configuration instructs the SMT framework that the FilterAudit step should only be invoked for messages where the IsAudit predicate returns false.

transforms.FilterAudit.predicate: IsAudit
transforms.FilterAudit.negate: true

ConvertFromBytes

ConvertFromBytes is a Cloudera specific transformation plugin that converts binary data to the Kafka Connect internal data format. You can use this plugin to make connectors that only support binary data compatible with the Single Message Transforms (SMT) framework.

Fully qualified names

  • com.cloudera.dim.kafka.connect.transformations.convert.ConvertFromBytes$Key
  • com.cloudera.dim.kafka.connect.transformations.convert.ConvertFromBytes$Value

Description

The ConvertFromBytes transformation plugin accepts binary data and converts it into the Kafka Connect internal data format with a nested converter that transforms binary data. To support header based converter logic, the plugin requires a header converter to correctly transform record headers when interacting with the converter. This plugin supports both key and value conversion.

Using this plugin with connectors that only support binary data makes the connector fully compatible with the SMT framework. On their own, connectors that only support binary data have limited compatibility with transformations even if the binary data is structured. This is because transformations are only fully supported on data that is in the Kafka Connect internal data format. Binary only connectors, for example MirrorSourceConnector, emit data that has the BYTES schema and do not provide conversion to the Kafka Connect internal data format by default. When you use a binary only connector with the ConvertFromBytes plugin, the binary data is parsed into a compatible structure, which can then be further processed with the transformation chain.

Figure 1. Source connector example flow with ConvertFromBytes

Example

The following configuration example adds a ConvertFromBytes transformation as the first step of the transformation chain. The conversion uses a schemaless JSON converter to parse the binary data. Whatever comes directly after the FromBytes step, whether another transformation step or the converter, receives a properly structured record instead of binary data.

#...
kind: KafkaConnector
spec:
  config:
    transforms: FromBytes,...
    transforms.FromBytes.type: com.cloudera.dim.kafka.connect.transformations.convert.ConvertFromBytes$Value
    transforms.FromBytes.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.FromBytes.converter.schemas.enable: false
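As a further sketch of the chaining described above, the following hypothetical configuration places a MaskField step after FromBytes. Because FromBytes has already parsed the binary JSON payload into the Kafka Connect internal data format, MaskField can operate on a named field. The CreditCardNumber field is an assumption chosen for this illustration.

```yaml
#...
kind: KafkaConnector
spec:
  config:
    transforms: FromBytes,MaskField
    transforms.FromBytes.type: com.cloudera.dim.kafka.connect.transformations.convert.ConvertFromBytes$Value
    transforms.FromBytes.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.FromBytes.converter.schemas.enable: false
    transforms.MaskField.type: org.apache.kafka.connect.transforms.MaskField$Value
    transforms.MaskField.fields: CreditCardNumber
```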

Configuration properties

Table 1. ConvertFromBytes properties reference
  • converter (required, no default)
    The fully qualified name of the converter implementation to use. For example: org.apache.kafka.connect.json.JsonConverter
  • header.converter (required, default: org.apache.kafka.connect.storage.SimpleHeaderConverter)
    The fully qualified name of the header converter implementation to use. This converter must match the header converter of the connector.
  • converter. (optional, no default)
    A configuration prefix. Use this prefix to configure the properties of the converter specified in converter. Property keys and values specified with the prefix are passed directly to the converter with the prefix stripped. For example:
    transforms.[***TRANSFORM ALIAS***].converter.[***CONVERTER PROPERTY KEY***]: [***CONVERTER PROPERTY VALUE***]
  • header.converter. (optional, no default)
    A configuration prefix. Use this prefix to configure the properties of the header converter specified in header.converter. Property keys and values specified with the prefix are passed directly to the header converter with the prefix stripped. For example:
    transforms.[***TRANSFORM ALIAS***].header.converter.[***HEADER CONVERTER PROPERTY KEY***]: [***HEADER CONVERTER PROPERTY VALUE***]

ConvertToBytes

ConvertToBytes is a Cloudera specific transformation plugin that converts Kafka Connect internal data to binary data. You can use this plugin to make connectors that only support binary data compatible with the Single Message Transforms (SMT) framework.

Fully qualified names

  • com.cloudera.dim.kafka.connect.transformations.convert.ConvertToBytes$Key
  • com.cloudera.dim.kafka.connect.transformations.convert.ConvertToBytes$Value

Description

The ConvertToBytes transformation plugin accepts data in the Kafka Connect internal data format and converts it to binary data with a nested converter. To support header based converter logic, the plugin requires a header converter to correctly transform record headers when interacting with the converter. This plugin supports both key and value conversion.

Using this plugin with connectors that only support binary data makes the connector fully compatible with the SMT framework. On their own, connectors that only support binary data have limited compatibility with transformations even if the binary data is structured. This is because the format of the data after transformations are carried out is normally the Kafka Connect internal data format. Binary only connectors, however, expect data that has the BYTES schema and do not provide conversion from the Kafka Connect internal data format by default. When you use the ConvertToBytes plugin with a binary only connector, the structured data is converted to binary format, which can then be picked up by the connector.

Figure 2. Sink connector example flow with ConvertToBytes

Example

The following configuration example adds a ConvertToBytes transformation as the last step of the transformation chain. The conversion uses a schemaless JSON converter to serialize the structured data. Whatever comes directly after the ToBytes step, typically the sink connector, receives binary data instead of a structured record.

#...
kind: KafkaConnector
spec:
  config:
    transforms: ...,ToBytes
    transforms.ToBytes.type: com.cloudera.dim.kafka.connect.transformations.convert.ConvertToBytes$Value
    transforms.ToBytes.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.ToBytes.converter.schemas.enable: false
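Mirroring the sink flow described above, the following hypothetical sketch places a MaskField step before ToBytes, so the field is masked while the record is still in the Kafka Connect internal data format, and only then serialized to binary for the connector. The CreditCardNumber field is an assumption chosen for this illustration.

```yaml
#...
kind: KafkaConnector
spec:
  config:
    transforms: MaskField,ToBytes
    transforms.MaskField.type: org.apache.kafka.connect.transforms.MaskField$Value
    transforms.MaskField.fields: CreditCardNumber
    transforms.ToBytes.type: com.cloudera.dim.kafka.connect.transformations.convert.ConvertToBytes$Value
    transforms.ToBytes.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.ToBytes.converter.schemas.enable: false
```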

Configuration properties

Table 2. ConvertToBytes properties reference
  • converter (required, no default)
    The fully qualified name of the converter implementation to use. For example: org.apache.kafka.connect.json.JsonConverter
  • header.converter (required, default: org.apache.kafka.connect.storage.SimpleHeaderConverter)
    The fully qualified name of the header converter implementation to use. This converter must match the header converter of the connector.
  • converter. (optional, no default)
    A configuration prefix. Use this prefix to configure the properties of the converter specified in converter. Property keys and values specified with the prefix are passed directly to the converter with the prefix stripped. For example:
    transforms.[***TRANSFORM ALIAS***].converter.[***CONVERTER PROPERTY KEY***]: [***CONVERTER PROPERTY VALUE***]
  • header.converter. (optional, no default)
    A configuration prefix. Use this prefix to configure the properties of the header converter specified in header.converter. Property keys and values specified with the prefix are passed directly to the header converter with the prefix stripped. For example:
    transforms.[***TRANSFORM ALIAS***].header.converter.[***HEADER CONVERTER PROPERTY KEY***]: [***HEADER CONVERTER PROPERTY VALUE***]