Configuring an SMT chain
Learn how to configure a Single Message Transformation (SMT) chain for Kafka Connect connectors.
You configure an SMT chain within the configuration of a Kafka Connect connector using
SMT-specific configuration properties. To set up a chain, you first define the transformation
chain with the transforms property and optionally define your predicates with the predicates
property. Afterward, you use the transforms.* and predicates.* properties to configure the
plugins in the chain. For example, the following configuration snippet sets up a transformation
chain that filters messages based on their header and masks a specified field in the remaining
messages.
transforms=FilterAudit,MaskField
transforms.MaskField.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.MaskField.fields=CreditCardNumber
transforms.FilterAudit.type=org.apache.kafka.connect.transforms.Filter
transforms.FilterAudit.predicate=IsAudit
transforms.FilterAudit.negate=false
predicates=IsAudit
predicates.IsAudit.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.IsAudit.name=Audit
The following sections go through the properties in this example and give an overview of how to set up a transformation chain.
Configuring transforms
The transforms property contains a comma-separated list of transformation aliases. Each alias
represents one step in the transformation chain. The aliases you add to the property are
arbitrary names. They are used in other properties to configure that particular transformation
step. For example, the following defines a two-step transformation chain.
transforms=FilterAudit,MaskField
The transforms.[***ALIAS***].type property specifies which transformation plugin should be used
in a transformation step. [***ALIAS***] is one of the aliases that you specified in transforms.
The value of the property is the fully qualified name of the transformation plugin that should
be used in the step. For example, the following line specifies
org.apache.kafka.connect.transforms.MaskField$Value as the plugin for the MaskField step.
transforms.MaskField.type=org.apache.kafka.connect.transforms.MaskField$Value
Many transformation plugins support changing both the key and the value of a record. For these plugins, you typically reference a nested Key or Value class as the type, as MaskField$Value does in the previous line.
The transforms.[***ALIAS***].[***KEY***] property is used to configure the transformation
plugins in your chain. This property is passed to the transformation plugin itself with
transforms.[***ALIAS***] stripped from the property key. [***ALIAS***] is the alias of a plugin
you specified in transforms. [***KEY***] is a property key that the plugin accepts. For example,
the MaskField plugin has a fields property that specifies which fields should be masked. The
following line masks the CreditCardNumber field.
transforms.MaskField.fields=CreditCardNumber
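The prefix stripping described above can be sketched in a few lines of Python. This is only an illustration of the naming convention, not the code Kafka Connect runs. The helper name plugin_config and the dictionary representation of the connector properties are assumptions made for this example.

```python
def plugin_config(connector_props, alias, namespace="transforms"):
    """Collect the properties of one alias and strip the '<namespace>.<alias>.' prefix.

    Hypothetical helper: it only illustrates the key naming convention.
    """
    prefix = f"{namespace}.{alias}."
    return {
        key[len(prefix):]: value
        for key, value in connector_props.items()
        if key.startswith(prefix)
    }


# Connector properties taken from the example configuration.
connector_props = {
    "transforms": "FilterAudit,MaskField",
    "transforms.MaskField.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.MaskField.fields": "CreditCardNumber",
}

# Keys under 'transforms.MaskField.' with the prefix removed.
mask_field_config = plugin_config(connector_props, "MaskField")
print(mask_field_config["fields"])  # CreditCardNumber
```

In this sketch every key under the prefix is returned, including type. Which of these keys the real framework forwards to the plugin is not covered here.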
Configuring predicates
Predicates are a separate set of plugins. You use them to conditionally enable certain steps in the transformation chain. Predicates are configured in a similar way to transforms. You must specify the predicate aliases, associate each alias with a plugin, and set plugin-specific properties.
predicates=IsAudit
predicates.IsAudit.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.IsAudit.name=Audit
In this example, the IsAudit predicate is an instance of the HasHeaderKey predicate plugin. This
predicate returns true for records where a specific header key is present. The
predicates.IsAudit.name=Audit property configures the predicate to look for the Audit header in
the records.
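The behavior of such a predicate can be modeled as follows. This is a minimal sketch that assumes records are plain dictionaries with a headers mapping. The function has_header_key is a hypothetical stand-in for the HasHeaderKey plugin, not its implementation.

```python
def has_header_key(name):
    """Build a predicate that is true when a record carries a header with the given key."""
    def predicate(record):
        return name in record.get("headers", {})
    return predicate


# Corresponds to predicates.IsAudit.name=Audit in the configuration.
is_audit = has_header_key("Audit")

# Hypothetical records: one with the Audit header, one without it.
audit_record = {"headers": {"Audit": "true"}, "value": {"id": 1}}
plain_record = {"headers": {}, "value": {"id": 2}}

print(is_audit(audit_record))  # True
print(is_audit(plain_record))  # False
```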
After a predicate is set up, you can associate the predicate with any number of transformation
steps using the predicate property. If a predicate is associated with a transformation, that
transformation step is only applied to the messages that match the condition specified in the
predicate.
A good example of using a predicate is the Filter transformation plugin. This is because Filter
drops all messages by default. Therefore, it must be used together with a predicate that
specifies the filtering logic. For example, the following configuration instructs the SMT
framework that the FilterAudit step should only be invoked for messages where the IsAudit
predicate returns true. That is, all messages with the Audit header are dropped and are not
processed by any subsequent steps in the transformation chain.
transforms.FilterAudit.predicate=IsAudit
transforms.FilterAudit.negate=false
The condition of a predicate can be inverted using the negate property. If negate is set to
true, the SMT framework applies the transformation to any record that does not match the
condition. For example, the following configuration instructs the SMT framework that the
FilterAudit step should only be invoked for messages where the IsAudit predicate returns false.
That is, all messages without the Audit header are dropped.
transforms.FilterAudit.predicate=IsAudit
transforms.FilterAudit.negate=true
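The interaction of predicate and negate can be summarized in a short Python model. This is a sketch under simplifying assumptions, not the SMT framework's code: records are dictionaries, a dropped record is represented by None, and the names apply_step, drop, and is_audit are hypothetical.

```python
def apply_step(record, transform, predicate=None, negate=False):
    """Apply transform only when the (optionally negated) predicate matches."""
    if predicate is not None and predicate(record) == negate:
        return record  # step skipped, record passes through unchanged
    return transform(record)


def drop(record):
    """Model of Filter: every record that reaches it is dropped."""
    return None


def is_audit(record):
    """Model of the IsAudit predicate: true when the Audit header is present."""
    return "Audit" in record["headers"]


records = [
    {"headers": {"Audit": "1"}, "value": "a"},
    {"headers": {}, "value": "b"},
]

# negate=false: records with the Audit header are dropped.
kept_when_negate_false = [
    r["value"] for r in records
    if apply_step(r, drop, is_audit, negate=False) is not None
]

# negate=true: records without the Audit header are dropped.
kept_when_negate_true = [
    r["value"] for r in records
    if apply_step(r, drop, is_audit, negate=True) is not None
]

print(kept_when_negate_false)  # ['b']
print(kept_when_negate_true)   # ['a']
```

The comparison predicate(record) == negate is true in exactly the cases where the step must be skipped, which keeps the condition in a single line.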