Configuring an SMT chain

Learn how to configure a Single Message Transformation (SMT) chain for Kafka Connect connectors.

SMT chains can be configured within the configuration of a Kafka Connect connector using SMT specific configuration properties. To set up a chain, you first define your transformation chain with the transforms property and optionally define your predicates using the predicates property. Afterward, you use transforms.* and predicates.* to configure the plugins in the chain. For example, the following configuration snippet sets up a transformation chain that filters messages based on their header and removes a specified field from messages.

transforms=FilterAudit,MaskField

transforms.MaskField.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.MaskField.fields=CreditCardNumber

transforms.FilterAudit.type=org.apache.kafka.connect.transforms.Filter
transforms.FilterAudit.predicate=IsAudit
transforms.FilterAudit.negate=false

predicates=IsAudit
predicates.IsAudit.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.IsAudit.name=Audit

The following sections go through the properties in this example and give an overview on how to set up a transformation chain.

Configuring transforms

The transforms property contains a comma-separated list of transformation aliases. Each alias represents one step in the transformation chain. The aliases you add to the property are arbitrary names, they are used in other properties to configure that particular transformation step. For example, the following defines a two step transformation chain.

transforms=FilterAudit,MaskField

The transforms.[***ALIAS***].type property specifies which transformation plugin should be used in a transformation step. [***ALIAS***] is one of the aliases that you specified in transforms. The value of the property is the fully qualified name of the transformation plugin that should be used in the step. For example, the following line specifies org.apache.kafka.connect.transforms.MaskField$Value as the plugin for the MaskField step.

transforms.MaskField.type=org.apache.kafka.connect.transforms.MaskField$Value

Many transformation plugins support changing both the key and the value of a record. For these plugins, typically, a nested value or key class can be referenced as the type.

The transforms.[***ALIAS***].[***KEY***] property is used to configure the transformation plugins in your chain. This property is passed to the transformation plugin itself with transforms.[***ALIAS***] stripped from the property key. [***ALIAS***] is the alias of a plugin you specified in transforms. [***KEY***] is a property key that the plugin accepts. For example, the MaskField plugin has a fields property that specifies which fields should be removed from the structure.

transforms.MaskField.fields=CreditCardNumber

Configuring predicates

Predicates are a separate set of plugins. You use them to conditionally enable certain steps in the transformation chain. Predicates are configured in a similar way to transforms. You must specify the predicate aliases, associate the aliases with a plugin, and set plugin specific properties.

predicates=IsAudit
predicates.IsAudit.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.IsAudit.name=Audit

In this example the IsAudit predicate is an instance of the HasHeaderKey predicate plugin. This predicate returns true for records where a specific header key is present. predicates.IsAudit.name=Audit configures the predicate to look for the Audit header in the records.

After a predicate is set up, you can associate the predicate with any number of transformation steps using the predicate property. If a predicate is associated with a transformation, that transformation step is only applied to the messages that match the condition specified in the predicate.

A good example for using a predicate is the Filter transformation plugin. This is because Filter filters (drops) all messages by default. Therefore, it must be used together with predicates to specify filtering logic. For example, the following configuration instructs the SMT framework that the FilterAudit step should only be invoked for messages where the IsAudit predicate returns true. That is, all messages with the Audit header will be filtered and will not be transformed by any subsequent steps in the transformation chain.

transforms.FilterAudit.predicate=IsAudit
transforms.FilterAudit.negate=false

The condition of a predicate can be inverted using negate. If negate is set to true, the SMT framework applies the transformation to any record that does not match the condition. For example, the following configuration instructs the SMT framework that the FilterAudit step should only be invoked for messages where the IsAudit predicate returns false.

transforms.FilterAudit.predicate=IsAudit
transforms.FilterAudit.negate=true