Single Message Transforms
Single Message Transforms (SMT) is a message transformation framework that you can deploy on top of Kafka Connect connectors to apply message transformations and filtering. Learn about the SMT framework as well as the transformation plugins available in CSM Operator.
Kafka Connect connectors provide ready-to-use tools to integrate Kafka with external data stores. Still, in many use cases, the data moved by the connectors requires some sanitization and transformation. To provide extra flexibility on top of connectors, Kafka Connect also supports an SMT framework.
The SMT framework installs a transformation chain on top of connectors that modifies and filters the data on a single-message basis. An SMT chain consists of transform and predicate plugins. Transform plugins modify the data. For example, you can insert, replace, or mask fields, as well as perform various other modifications on the messages moved by connectors. Predicate plugins add conditional logic to your chain so that the transformation chain is only applied to messages that satisfy specified conditions.
The SMT framework requires that data is converted to the Kafka Connect internal data format. This format consists of a structure and a schema descriptor (SchemaAndValue) specific to Connect.
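Connectors move data into and out of this format through converters. As a hedged illustration (the JsonConverter choice and the schemas.enable setting are assumptions for this sketch, not requirements), a connector-level converter override determines how raw bytes are translated to and from the internal SchemaAndValue representation that the SMT chain operates on:
#...
kind: KafkaConnector
spec:
  config:
    # Converters translate between raw bytes and the Connect internal data format.
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    # Prefixed converter properties are forwarded to the converter itself.
    value.converter.schemas.enable: true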
Supported SMT plugins
- All Apache Kafka plugins. For more information, see Transformations in the Apache Kafka documentation.
- The following Cloudera-specific plugins:
  - ConvertFromBytes
  - ConvertToBytes
Configuring an SMT chain
Learn how to configure a Single Message Transforms (SMT) chain for Kafka Connect connectors.
SMT chains are configured within the configuration of a Kafka Connect connector using SMT-specific configuration properties. To set up a chain, you first define your transformation chain with the transforms property and optionally define your predicates using the predicates property. Afterward, you use transforms.* and predicates.* properties to configure the plugins in the chain. For example, the following configuration snippet sets up a transformation chain that filters messages based on their header and masks a specified field in the remaining messages.
#...
kind: KafkaConnector
spec:
  config:
    transforms: FilterAudit,MaskField
    transforms.MaskField.type: org.apache.kafka.connect.transforms.MaskField$Value
    transforms.MaskField.fields: CreditCardNumber
    transforms.FilterAudit.type: org.apache.kafka.connect.transforms.Filter
    transforms.FilterAudit.predicate: IsAudit
    transforms.FilterAudit.negate: false
    predicates: IsAudit
    predicates.IsAudit.type: org.apache.kafka.connect.transforms.predicates.HasHeaderKey
    predicates.IsAudit.name: Audit
The following sections go through the properties in this example and give an overview of how to set up a transformation chain.
Configuring transforms
The transforms property contains a comma-separated list of transformation aliases. Each alias represents one step in the transformation chain. The aliases you add to the property are arbitrary names that are used in other properties to configure that particular transformation step. For example, the following defines a two-step transformation chain.
transforms: FilterAudit,MaskField
The transforms.[***ALIAS***].type property specifies which transformation plugin should be used in a transformation step. [***ALIAS***] is one of the aliases that you specified in transforms. The value of the property is the fully qualified name of the transformation plugin that should be used in the step. For example, the following line specifies org.apache.kafka.connect.transforms.MaskField$Value as the plugin for the MaskField step.
transforms.MaskField.type: org.apache.kafka.connect.transforms.MaskField$Value
Many transformation plugins support changing both the key and the value of a record. For these plugins, a nested Key or Value class is typically referenced as the type.
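For example, the MaskField plugin used above provides both nested classes. In the following illustrative lines, the MaskKey and MaskValue aliases are arbitrary names chosen for this sketch; the first step targets the record key, the second the record value.
transforms.MaskKey.type: org.apache.kafka.connect.transforms.MaskField$Key
transforms.MaskValue.type: org.apache.kafka.connect.transforms.MaskField$Value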
The transforms.[***ALIAS***].[***KEY***] property is used to configure the transformation plugins in your chain. This property is passed to the transformation plugin itself with transforms.[***ALIAS***] stripped from the property key. [***ALIAS***] is the alias of a plugin you specified in transforms. [***KEY***] is a property key that the plugin accepts. For example, the MaskField plugin has a fields property that specifies which fields should be masked in the structure.
transforms.MaskField.fields: CreditCardNumber
Configuring predicates
Predicates are a separate set of plugins. You use them to conditionally enable certain steps in the transformation chain. Predicates are configured in a similar way to transforms. You must specify the predicate aliases, associate the aliases with a plugin, and set plugin-specific properties.
predicates: IsAudit
predicates.IsAudit.type: org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.IsAudit.name: Audit
In this example, the IsAudit predicate is an instance of the HasHeaderKey predicate plugin. This predicate returns true for records where a specific header key is present. predicates.IsAudit.name: Audit configures the predicate to look for the Audit header in the records.
After a predicate is set up, you can associate the predicate with any number of transformation steps using the predicate property. If a predicate is associated with a transformation, that transformation step is only applied to the messages that match the condition specified in the predicate.
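For instance, the predicate property can gate an ordinary transformation step as well. The following illustrative line (independent of the Filter example below) would apply the MaskField step only to records that carry the Audit header.
transforms.MaskField.predicate: IsAudit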
A good example of using a predicate is the Filter transformation plugin. This is because Filter drops all messages by default. Therefore, it must be used together with predicates to specify the filtering logic. For example, the following configuration instructs the SMT framework that the FilterAudit step should only be invoked for messages where the IsAudit predicate returns true. That is, all messages with the Audit header are filtered out and are not processed by any subsequent steps in the transformation chain.
transforms.FilterAudit.predicate: IsAudit
transforms.FilterAudit.negate: false
The condition of a predicate can be inverted using negate. If negate is set to true, the SMT framework applies the transformation to any record that does not match the condition. For example, the following configuration instructs the SMT framework that the FilterAudit step should only be invoked for messages where the IsAudit predicate returns false.
transforms.FilterAudit.predicate: IsAudit
transforms.FilterAudit.negate: true
ConvertFromBytes
ConvertFromBytes is a Cloudera-specific transformation plugin that converts binary data to the Kafka Connect internal data format. You can use this plugin to make connectors that only support binary data compatible with the Single Message Transforms (SMT) framework.
Fully qualified names
- com.cloudera.dim.kafka.connect.transforms.ConvertFromBytes$Key
- com.cloudera.dim.kafka.connect.transforms.ConvertFromBytes$Value
Description
The ConvertFromBytes transformation plugin accepts binary data and converts it into the Kafka Connect internal data format using a nested converter. To support header-based converter logic, the plugin requires a header converter to correctly transform record headers when interacting with the nested converter. This plugin supports both key and value conversion.
Using this plugin with connectors that only support binary data makes the connector fully compatible with the SMT framework. On their own, connectors that only support binary data have limited compatibility with transformations even if the binary data is structured. This is because transformations are only fully supported on data that is in the Kafka Connect internal data format. Binary-only connectors, for example MirrorSourceConnector, emit data that has the BYTES schema and do not provide conversion to the Kafka Connect internal data format by default. When you use a binary-only connector with the ConvertFromBytes plugin, the binary data is parsed into a compatible structure, which can then be further processed by the transformation chain.
Example
The following configuration example adds a ConvertFromBytes transformation as the first step of the transformation chain. The conversion uses a schemaless JSON converter to parse the binary data. The transformation step, connector, or converter that comes directly after FromBytes receives a properly structured record instead of binary data.
#...
kind: KafkaConnector
spec:
  config:
    transforms: FromBytes,...
    transforms.FromBytes.type: com.cloudera.dim.kafka.connect.transformations.convert.ConvertFromBytes$Value
    transforms.FromBytes.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.FromBytes.converter.schemas.enable: false
Configuration properties
Property | Default Value | Required | Description
---|---|---|---
converter | | True | The fully qualified name of the converter implementation to use. For example: org.apache.kafka.connect.json.JsonConverter
header.converter | org.apache.kafka.connect.storage.SimpleHeaderConverter | True | The fully qualified name of the header converter implementation to use. This converter must match the header converter of the connector.
converter. | | False | A configuration prefix. Use this prefix to configure the properties of the converter specified in converter. Property keys and values specified with the prefix are passed directly to the converter with the prefix stripped. For example, converter.schemas.enable is passed to the converter as schemas.enable.
header.converter. | | False | A configuration prefix. Use this prefix to configure the properties of the header converter specified in header.converter. Property keys and values specified with the prefix are passed directly to the header converter with the prefix stripped. For example, header.converter.[***PROPERTY***] is passed to the header converter as [***PROPERTY***].
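As a hedged sketch that ties these properties together (the FromBytes alias continues the example above, and explicitly setting SimpleHeaderConverter is an illustrative choice since it is already the default), the following snippet sets the nested converter, forwards a prefixed property to it, and pins the header converter to match the connector's:
#...
kind: KafkaConnector
spec:
  config:
    transforms.FromBytes.converter: org.apache.kafka.connect.json.JsonConverter
    # Forwarded to the JsonConverter as schemas.enable=false.
    transforms.FromBytes.converter.schemas.enable: false
    # Must match the header converter used by the connector itself.
    transforms.FromBytes.header.converter: org.apache.kafka.connect.storage.SimpleHeaderConverter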
ConvertToBytes
ConvertToBytes is a Cloudera-specific transformation plugin that converts Kafka Connect internal data to binary data. You can use this plugin to make connectors that only support binary data compatible with the Single Message Transforms (SMT) framework.
Fully qualified names
- com.cloudera.dim.kafka.connect.transforms.ConvertToBytes$Key
- com.cloudera.dim.kafka.connect.transforms.ConvertToBytes$Value
Description
The ConvertToBytes transformation plugin accepts data in the Kafka Connect internal data format and converts it to binary data using a nested converter. To support header-based converter logic, the plugin requires a header converter to correctly transform record headers when interacting with the nested converter. This plugin supports both key and value conversion.
Using this plugin with connectors that only support binary data makes the connector fully compatible with the SMT framework. On their own, connectors that only support binary data have limited compatibility with transformations even if the binary data is structured. This is because the data produced by transformations is normally in the Kafka Connect internal data format. Binary-only connectors, however, expect data that has the BYTES schema and do not provide conversion from the Kafka Connect internal data format by default. When you use the ConvertToBytes plugin with a binary-only connector, the structured data is converted to binary format, which can then be picked up by the connector.
Example
The following configuration example adds a ConvertToBytes transformation as the last step of the transformation chain. The conversion uses a schemaless JSON converter to serialize the structured data. The connector or converter that comes directly after ToBytes receives binary data instead of a structured record.
#...
kind: KafkaConnector
spec:
  config:
    transforms: ...,ToBytes
    transforms.ToBytes.type: com.cloudera.dim.kafka.connect.transformations.convert.ConvertToBytes$Value
    transforms.ToBytes.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.ToBytes.converter.schemas.enable: false
Configuration properties
Property | Default Value | Required | Description
---|---|---|---
converter | | True | The fully qualified name of the converter implementation to use. For example: org.apache.kafka.connect.json.JsonConverter
header.converter | org.apache.kafka.connect.storage.SimpleHeaderConverter | True | The fully qualified name of the header converter implementation to use. This converter must match the header converter of the connector.
converter. | | False | A configuration prefix. Use this prefix to configure the properties of the converter specified in converter. Property keys and values specified with the prefix are passed directly to the converter with the prefix stripped. For example, converter.schemas.enable is passed to the converter as schemas.enable.
header.converter. | | False | A configuration prefix. Use this prefix to configure the properties of the header converter specified in header.converter. Property keys and values specified with the prefix are passed directly to the header converter with the prefix stripped. For example, header.converter.[***PROPERTY***] is passed to the header converter as [***PROPERTY***].
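As a combined, hedged sketch (the aliases and the MaskField step are illustrative, and the plugin class names follow the Example sections above), a binary-only connector can be bracketed by the two Cloudera plugins: ConvertFromBytes parses the bytes into the Connect internal format, a regular transformation runs on the structured record, and ConvertToBytes serializes the result back to binary for whatever follows the chain.
#...
kind: KafkaConnector
spec:
  config:
    transforms: FromBytes,MaskField,ToBytes
    # Parse binary data into the Connect internal data format.
    transforms.FromBytes.type: com.cloudera.dim.kafka.connect.transformations.convert.ConvertFromBytes$Value
    transforms.FromBytes.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.FromBytes.converter.schemas.enable: false
    # Regular transformation on the structured record.
    transforms.MaskField.type: org.apache.kafka.connect.transforms.MaskField$Value
    transforms.MaskField.fields: CreditCardNumber
    # Serialize the structured record back to binary data.
    transforms.ToBytes.type: com.cloudera.dim.kafka.connect.transformations.convert.ConvertToBytes$Value
    transforms.ToBytes.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.ToBytes.converter.schemas.enable: false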