Using the AvroConverter

The AvroConverter is a Kafka Connect converter shipped with Cloudera Runtime that enables Kafka Connect connectors to serialize and deserialize Kafka message keys and values to and from Avro. Using the Avro format can make your messages more compact than, for example, the JSON format. The AvroConverter converts between Avro and the Kafka Connect internal data format in both directions, and automatically handles Avro schemas integrated with Schema Registry.

Using AvroConverter with a source connector

When using the AvroConverter with a Kafka Connect source connector, the source connector fetches data from an external system and converts it into the Kafka Connect internal data format. If Single Message Transforms (SMTs) are configured, the messages are also transformed based on the SMT conditions. If the AvroConverter is enabled, the messages are then converted from the internal data format to Avro. At this step, the AvroConverter also automatically creates a new schema version in Schema Registry for the Avro data if the schema does not already exist. If the schema already exists in Schema Registry, schema creation is skipped and the existing ID is assigned to the payload during serialization. The converted Avro data is written to the Kafka cluster together with its corresponding schema ID from Schema Registry.

The following illustration shows how the AvroConverter works with a Kafka Connect source connector:
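As a minimal sketch of where the converter fits into this flow, the following source connector configuration enables the AvroConverter for message values. The connector class, topic, and file path are placeholders for illustration only:

{
"connector.class":  "org.apache.kafka.connect.file.FileStreamSourceConnector", // example source connector, placeholder only
"topic":  "avro-example",
"file":  "/tmp/input.txt",
"value.converter":  "com.cloudera.dim.kafka.connect.converts.AvroConverter", // convert values from the internal data format to Avro
"value.converter.schema.registry.url":  "http://registryserver.example.com:9090/api/v1" // Schema Registry server where schemas are created and looked up
}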

Using AvroConverter with a sink connector

When using the AvroConverter with a Kafka Connect sink connector, the data is fetched from the Kafka cluster and includes its schema ID from Schema Registry. The AvroConverter extracts the schema ID and retrieves the corresponding schema from Schema Registry. The data is converted from Avro to the Kafka Connect internal data format. If SMTs are configured, the messages are also transformed based on the SMT conditions. The converted data is written by the Kafka Connect sink connector to the external system.

The following illustration shows how the AvroConverter works with a Kafka Connect sink connector:

Mapping logical data types

The AvroConverter maps data from the Kafka Connect internal data format to Avro. The mapping happens automatically during the conversion, which means that no property needs to be configured. The AvroConverter also handles Kafka Connect specific logical types and maps them to their Avro counterparts and vice versa.

The corresponding logical types for Kafka Connect and Avro, together with the direction of the mapping, are shown in the following table:

Table 1. Logical Type Mapping
Kafka Connect Logical Type   Direction   Avro Logical Type
Decimal                      ⟵⟶          Decimal
Date                         ⟵⟶          Date
Time                         ⟵⟶          TimeMillis
Time                         ⟵           TimeMicros
Timestamp                    ⟵⟶          TimestampMillis
Timestamp                    ⟵           TimestampMicros
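For example, a Kafka Connect Decimal field is represented in the registered Avro schema as a bytes field with the decimal logical type. The following sketch shows what such a field could look like; the field name and the precision and scale values are illustrative:

{
  "name": "amount",
  "type": {
    "type": "bytes",
    "logicalType": "decimal",
    "precision": 10,
    "scale": 2
  }
}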

Handling passthrough

When passthrough is enabled, messages are not converted between the Avro format and the Kafka Connect internal data format. In this case, the data is passed through unchanged to the Kafka Connect sink connector. If passthrough is disabled, messages are converted from Avro to the Kafka Connect internal data format, which can cause your input and output formats to differ.

The following illustration shows what happens when passthrough is enabled in the AvroConverter for a sink connector:
Enabling passthrough is recommended when both the input and output formats of your messages are Avro, because it avoids the performance overhead of the conversion process. Ensure that passthrough is only enabled if the connector can handle Avro objects. Cloudera provides the following connectors that are capable of handling passthrough:
  • HDFS Sink connector
  • S3 Sink connector
Third-party connectors can also make use of passthrough if they are implemented to handle passed-through messages.

You can enable and disable passthrough by setting the passthrough.enabled property to true or false when configuring the connector. For more information, see the KafkaAvroSerializer properties reference documentation under Related information.
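For example, to keep Avro data unchanged end to end with a sink connector such as the HDFS Sink connector, you can explicitly enable passthrough on the value converter. The following minimal sketch shows only the converter-related properties of such a configuration:

"value.converter":  "com.cloudera.dim.kafka.connect.converts.AvroConverter",
"value.converter.passthrough.enabled":  "true",
"value.converter.schema.registry.url":  "http://registryserver.example.com:9090/api/v1"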

Enabling and configuring the AvroConverter

When the AvroConverter is enabled as a key or value converter, you also need to provide further configuration properties that affect the behavior of the AvroConverter. All configuration properties must carry either the value.converter. or the key.converter. prefix. The prefix controls whether a specific property is applied to the key or value converter.

Enabling AvroConverter

You can enable the AvroConverter for any connector by adding the key.converter and value.converter properties to the connector configuration. The value of these properties must be set to the class name of the AvroConverter.

For example, if you want to use the AvroConverter for both key and value conversion, you must add the following properties to the connector configuration:
"key.converter": "com.cloudera.dim.kafka.connect.converts.AvroConverter",
"value.converter": "com.cloudera.dim.kafka.connect.converts.AvroConverter",
Converters are configured separately for keys and values, which enables you to use different converters for each. For example, if you want to use the AvroConverter to convert values, but a different converter, such as the StringConverter, to convert keys, you must add the following properties to the connector configuration:
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "com.cloudera.dim.kafka.connect.converts.AvroConverter",

Configuring passthrough

The passthrough configuration of the AvroConverter controls whether the data is converted into the Kafka Connect internal data format before it is written to the output, or passed through the connector without conversion. Passthrough is controlled by the passthrough.enabled property, which is set to true by default.

Passthrough can be set separately for the key and value converters by using the corresponding prefixes, as shown in the following example configuration:
"key.converter.passthrough.enabled":  "false",
"value.converter.passthrough.enabled":  "false",

Enabling SerDes

To transform messages, the AvroConverter uses an embedded KafkaAvroSerializer and KafkaAvroDeserializer, which are configured through the AvroConverter and give you further control over its behavior.

You must provide the URL of the Schema Registry server to be able to use the KafkaAvroSerializer and KafkaAvroDeserializer, because both the serializer and the deserializer contain an embedded Schema Registry client. You can specify the Schema Registry server URL with the schema.registry.url property as shown in the following example:
"key.converter.schema.registry.url":  "http://registryserver.example.com:9090/api/v1"
"value.converter.schema.registry.url":  "http://registryserver.example.com:9090/api/v1"

You can configure additional properties related to serialization, deserialization, and the Schema Registry client using the property reference documentation under Related information.

Configuration example

The following example configuration shows a connector configuration with the AvroConverter set as both key and value converter, connecting to a remote Schema Registry server.

{
"value.converter":  "com.cloudera.dim.kafka.connect.converts.AvroConverter", // use AvroConverter for the conversion of the Kafka message value
"value.converter.passthrough.enabled":  "false", // disable passthrough
"value.converter.schema.registry.url": "http://registryserver.example.com:9090/api/v1", // URL of the schema registry server that the Schema Registry client inside the value converter will connect to 
"value.converter.schema.registry.auth.username = [***USERNAME***],  // username to be used for authentication by the value converter’s embedded Schema Registry client
"value.converter.schema.registry.auth.password = [***PASSWORD***], //  password to be used for authentication by the value converter’s embedded Schema Registry client
"key.converter":  "com.cloudera.dim.kafka.connect.converts.AvroConverter",
"key.converter.schema.registry.auth.username = [***USERNAME***],
"key.converter.schema.registry.auth.password = [***PASSWORD***],
"key.converter.passthrough.enabled":  "false",
"key.converter.schema.registry.url":  "http://registryserver.example.com:9090/api/v1"
}

Configuring AvroConverter for Debezium connectors with Schema Registry integration

Learn how to configure the AvroConverter for Debezium connectors with Schema Registry integration.

When you want to use the AvroConverter with Debezium connectors, you need to configure the passthrough, schema compatibility, and schema name key and value suffix properties.

passthrough.enabled

You must disable passthrough so that schemas are automatically created for CDC records in Kafka, as shown in the following example:
"value.converter.passthrough.enabled": "false"
"key.converter.passthrough.enabled": "false"

This ensures that the CDC event fields are loaded into the Kafka Connect internal data format, from which the fields are converted into Avro by the AvroConverter before the data is written to Kafka. During this conversion, the schemas are automatically created in Schema Registry as well.

schema.compatibility

Whenever a schema is created in Schema Registry, a compatibility policy is also set for it by default. Based on this compatibility, if the schema evolves, each new version is validated against previous ones. For more information about compatibility in Schema Registry, see the Compatibility policies documentation under Related information.

The compatibility must be set before the first CDC event is processed by the connector, because the first schema is created based on the compatibility policy. If backward incompatible changes can happen in the database, consider setting the properties to NONE, because in this case the Avro schemas will also be incompatible.

The default compatibility can be set as shown in the following example:
"value.converter.schema.compatibility": "NONE"
"key.converter.schema.compatibility": "NONE"

schema.name.key.suffix and schema.name.value.suffix

When a new schema is created, the name of the schema consists of the name of the topic plus an appended value, if set. It is possible to set additional suffixes for schema names. The default suffix is empty for values, while :k is appended as a suffix for keys. Key and value suffixes can be set on the converters as shown in the following example:
"value.converter.schema.name.value.suffix", "value_suffix"
"key.converter.schema.name.key.suffix", "key_suffix"

Additional configuration properties for Debezium connectors and Schema Registry integration with AvroConverter

The bigint.unsigned.handling.mode property of a Debezium connector controls how unsigned bigint data types are mapped. It has the following values, which can affect the behavior of the logical data type conversion:
precise
Unsigned bigint values are stored as byte arrays in Avro with the decimal logical type. When reading the data, you get either a byte array or a BigDecimal object, depending on whether logical type conversion is enabled.
long
Regardless of what is stored in the database, the value is stored in Avro as long, and you get a Java long object in the Java Virtual Machine (JVM). In this case, the mapping is less precise, but no logical type handling is needed.

Check schema.name.adjustment.mode and field.name.adjustment.mode to ensure that there are no conflicts between your database-related names and the Avro-specific naming conventions.

Check decimal.handling.mode to decide how precise your numbers need to be in different cases, and whether you need to use a logical type for higher precision.

Check time.precision.mode to decide how precise your dates and times need to be in different cases, and whether you need to use a logical type for higher precision.
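The following sketch shows how these Debezium connector properties might be combined in a connector configuration. The values chosen here are only illustrative; see the Debezium documentation for the supported values of each property:

"bigint.unsigned.handling.mode":  "precise",
"schema.name.adjustment.mode":  "avro",
"field.name.adjustment.mode":  "avro",
"decimal.handling.mode":  "precise",
"time.precision.mode":  "connect"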

Reading Debezium Avro records from Kafka

Learn how to read Debezium CDC Avro records from Kafka using the AvroConverter functions.

Avro records can be consumed from Kafka with the Schema Registry integration as described in the Integrating Kafka with Schema Registry documentation.
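For example, a consumer reading these records typically configures a Schema Registry aware Avro deserializer for the message value and the Schema Registry URL. The following is a minimal sketch; the deserializer class name is an assumption based on a typical Cloudera Schema Registry setup, check the Integrating Kafka with Schema Registry documentation for the exact values to use:

"value.deserializer":  "com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer",
"schema.registry.url":  "http://registryserver.example.com:9090/api/v1"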

If any of your connectors produce Avro records with logical types, you do not need to write the conversion from raw Avro data to proper Java objects yourself, as you can enable automatic conversion. To enable automatic logical type conversion, set the logical.type.conversion.enabled property in the consumer configuration as shown in the following example:
"logical.type.conversion.enabled": "true"

This conversion is handled internally by the KafkaAvroSerializer and KafkaAvroDeserializer SerDes classes.

Logical type conversion happens automatically in Debezium connectors when they produce Avro events to Kafka. Ensure that the logical.type.conversion.enabled property is not set to true for the key or value SerDes classes of the AvroConverter. The logical.type.conversion.enabled property only needs to be enabled on the consumer or producer side when the consumer or producer is not part of a Kafka Connect connector.