Configuring an SMT chain
Learn how to configure a Single Message Transformation (SMT) chain for Kafka Connect connectors.
SMT chains are configured within the configuration of a Kafka Connect connector using SMT-specific configuration properties. To set up a chain, you first define your transformation chain with the transforms property and optionally define your predicates using the predicates property. Afterward, you use the transforms.* and predicates.* properties to configure the plugins in the chain. For example, the following configuration snippet sets up a transformation chain that filters messages based on their header and masks a specified field in the remaining messages.
transforms=FilterAudit,MaskField
transforms.MaskField.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.MaskField.fields=CreditCardNumber
transforms.FilterAudit.type=org.apache.kafka.connect.transforms.Filter
transforms.FilterAudit.predicate=IsAudit
transforms.FilterAudit.negate=false
predicates=IsAudit
predicates.IsAudit.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.IsAudit.name=Audit
The following sections go through the properties in this example and give an overview of how to set up a transformation chain.
Configuring transforms
The transforms property contains a comma-separated list of transformation aliases. Each alias represents one step in the transformation chain, and the steps are applied in the order in which they are listed. The aliases you add to the property are arbitrary names; they are used in other properties to configure that particular transformation step. For example, the following defines a two-step transformation chain.
transforms=FilterAudit,MaskField
The transforms.[***ALIAS***].type property specifies which transformation plugin should be used in a transformation step. [***ALIAS***] is one of the aliases that you specified in transforms. The value of the property is the fully qualified name of the transformation plugin that should be used in the step. For example, the following line specifies org.apache.kafka.connect.transforms.MaskField$Value as the plugin for the MaskField step.
transforms.MaskField.type=org.apache.kafka.connect.transforms.MaskField$Value
Many transformation plugins support changing both the key and the value of a record. For these plugins, a nested Key or Value class is typically referenced as the type, as with MaskField$Value in the example above.
The transforms.[***ALIAS***].[***KEY***] property is used to configure the transformation plugins in your chain. This property is passed to the transformation plugin itself with the transforms.[***ALIAS***]. prefix stripped from the property key.
[***ALIAS***] is the alias of a plugin you specified in transforms. [***KEY***] is a property key that the plugin accepts. For example, the MaskField plugin has a fields property that specifies which fields should be masked in the record.
transforms.MaskField.fields=CreditCardNumber
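The alias and prefix conventions described above can be illustrated with a short Python sketch. This is not Kafka Connect code; it only mimics the naming convention, and the helper name split_transform_config is hypothetical. The sketch groups every key found under an alias, including type and predicate, without modelling which of those keys the framework interprets itself.

```python
def split_transform_config(connector_config):
    """Return (alias, plugin_config) pairs in chain order.

    Hypothetical helper: it mimics the property naming convention only
    and is not part of Kafka Connect.
    """
    aliases = [a.strip() for a in connector_config["transforms"].split(",")]
    chain = []
    for alias in aliases:
        prefix = f"transforms.{alias}."
        # Keep only this alias's properties and strip the prefix from each key.
        plugin_config = {
            key[len(prefix):]: value
            for key, value in connector_config.items()
            if key.startswith(prefix)
        }
        chain.append((alias, plugin_config))
    return chain


config = {
    "transforms": "FilterAudit,MaskField",
    "transforms.MaskField.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.MaskField.fields": "CreditCardNumber",
    "transforms.FilterAudit.type": "org.apache.kafka.connect.transforms.Filter",
    "transforms.FilterAudit.predicate": "IsAudit",
}

for alias, plugin_config in split_transform_config(config):
    print(alias, plugin_config)
```

In this sketch, the MaskField step ends up with the keys type and fields, which mirrors how transforms.MaskField.fields reaches the plugin as fields.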
Configuring predicates
Predicates are defined in the same way as transformations. The predicates property contains a comma-separated list of predicate aliases, and the predicates.[***ALIAS***].type property specifies the predicate plugin behind each alias. For example:
predicates=IsAudit
predicates.IsAudit.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.IsAudit.name=Audit
In this example, the IsAudit predicate is an instance of the HasHeaderKey predicate plugin. This predicate returns true for records where a specific header key is present. The predicates.IsAudit.name=Audit property configures the predicate to look for the Audit header in the records.
After a predicate is set up, you can associate the predicate with any number of transformation steps using the predicate property. If a predicate is associated with a transformation, that transformation step is only applied to the messages that match the condition specified in the predicate.
Predicates are particularly important for the Filter transformation plugin. This is because Filter filters (drops) all messages by default. Therefore, it must be used together with predicates to specify filtering logic. For example, the following configuration instructs the SMT framework that the FilterAudit step should only be invoked for messages where the IsAudit predicate returns true. That is, all messages with the Audit header are dropped and are not transformed by any subsequent steps in the transformation chain.
transforms.FilterAudit.predicate=IsAudit
transforms.FilterAudit.negate=false
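The behavior of this chain can be modelled with a small Python simulation. It is a sketch under stated assumptions, not the Connect implementation: records are plain dictionaries, dropping a record is modelled by returning None, and the functions has_header_key, drop, mask_credit_card, and run_chain are hypothetical stand-ins for the HasHeaderKey, Filter, and MaskField plugins and for the SMT framework.

```python
def has_header_key(name):
    """Build a predicate that is true when a record carries the given header."""
    return lambda record: name in record["headers"]


def drop(record):
    """Stand-in for Filter: dropping is modelled by returning None."""
    return None


def mask_credit_card(record):
    """Stand-in for MaskField: replace the field value with an empty string."""
    value = dict(record["value"])
    if "CreditCardNumber" in value:
        value["CreditCardNumber"] = ""
    return {**record, "value": value}


def run_chain(record, steps):
    """Apply each (transform, predicate) step in order; stop when a record is dropped."""
    for transform, predicate in steps:
        if predicate is not None and not predicate(record):
            continue  # predicate did not match: skip this step
        record = transform(record)
        if record is None:
            return None  # dropped: later steps never see the record
    return record


steps = [
    (drop, has_header_key("Audit")),  # FilterAudit, guarded by IsAudit
    (mask_credit_card, None),         # MaskField, no predicate
]

audit = {"headers": {"Audit": "1"}, "value": {"CreditCardNumber": "4111"}}
normal = {"headers": {}, "value": {"CreditCardNumber": "4111"}}

print(run_chain(audit, steps))   # None
print(run_chain(normal, steps))  # {'headers': {}, 'value': {'CreditCardNumber': ''}}
```

The record with the Audit header is dropped by the first step and never reaches the masking step, while the record without the header skips the filter and is masked.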
The condition of a predicate can be inverted with the negate property. If negate is set to true, the SMT framework applies the transformation to any record that does not match the condition. For example, the following configuration instructs the SMT framework that the FilterAudit step should only be invoked for messages where the IsAudit predicate returns false.
transforms.FilterAudit.predicate=IsAudit
transforms.FilterAudit.negate=true
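The interaction between a predicate and negate can be summarized as a single rule, sketched here in Python. The function should_apply is a hypothetical name used for illustration; it only restates the rule described above and assumes negate defaults to false.

```python
def should_apply(predicate_result, negate=False):
    """A step runs when the predicate result, optionally inverted, is true."""
    return predicate_result != negate


# Print the full truth table: predicate result, negate, whether the step runs.
for predicate_result in (True, False):
    for negate in (False, True):
        print(predicate_result, negate, should_apply(predicate_result, negate))
```

Applied to the FilterAudit step, negate=true means that records without the Audit header are the ones passed to Filter and dropped, while records with the header continue through the chain.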