Using the AvroConverter
The AvroConverter is a Kafka Connect converter shipped with Cloudera Runtime that enables Kafka Connect connectors to serialize or deserialize Kafka messages, consisting of key and value pairs, from or to Avro. Using the Avro format can make your messages more compact compared to, for example, the JSON format. The AvroConverter can convert from Avro to the Kafka Connect internal data format and from the Kafka Connect internal data format to Avro, and it automatically handles Avro schemas through its integration with Schema Registry.
Using AvroConverter with a source connector
When using the AvroConverter with a Kafka Connect source connector, the source connector fetches the data from an external system. The fetched data is converted into the Kafka Connect internal data format. If Single Message Transforms (SMT) are configured, the messages are transformed based on the SMT conditions as well. If the AvroConverter is enabled, the messages are converted from the internal data format to Avro. At this step, the AvroConverter also automatically creates a new schema version in Schema Registry for the Avro data if the schema has not already been created. If the schema already exists in Schema Registry, schema creation is skipped and the existing ID is assigned to the payload during serialization. The converted Avro data is written to the Kafka cluster together with its corresponding schema ID from Schema Registry.
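For example, to have a source connector produce Avro values, you could add the following converter-related properties to its configuration. This is a minimal sketch; the rest of the connector configuration depends on the source connector you use, and the Schema Registry URL is an example value.
"value.converter": "com.cloudera.dim.kafka.connect.converts.AvroConverter",
"value.converter.schema.registry.url": "http://registryserver.example.com:9090/api/v1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"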
[Illustration: how the AvroConverter works with a Kafka Connect source connector]
Using AvroConverter with a sink connector
When using the AvroConverter with a Kafka Connect sink connector, the data is fetched from the Kafka cluster and includes its schema ID from Schema Registry. The AvroConverter extracts the schema ID and gets the corresponding schema from Schema Registry. The data is converted from Avro to the Kafka Connect internal data format. If Single Message Transforms (SMT) are configured, the messages are transformed based on the SMT conditions as well. The converted data is written by the Kafka Connect sink connector to the external system.
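For example, a sink connector that consumes Avro values could carry the following converter-related properties. This is a minimal sketch; only the converter properties are shown, and the passthrough setting is explained in the Handling passthrough section below.
"value.converter": "com.cloudera.dim.kafka.connect.converts.AvroConverter",
"value.converter.schema.registry.url": "http://registryserver.example.com:9090/api/v1",
"value.converter.passthrough.enabled": "false"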
[Illustration: how the AvroConverter works with a Kafka Connect sink connector]
Mapping logical data types
The AvroConverter is capable of mapping data from the Kafka Connect internal data format to Avro. The mapping happens automatically during the conversion, which means that no property needs to be configured. The AvroConverter also handles Kafka Connect specific logical types and maps them to their Avro counterparts and vice versa.
The corresponding logical types for Kafka Connect and Avro, along with the supported conversion directions, are shown in the following table:
Kafka Connect Logical Type | Direction | Avro Logical Type |
---|---|---|
Decimal | ⟵⟶ | Decimal |
Date | ⟵⟶ | Date |
Time | ⟵⟶ | TimeMillis |
Time | ⟵ | TimeMicros |
Timestamp | ⟵⟶ | TimestampMillis |
Timestamp | ⟵ | TimestampMicros |
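As an illustration of the Decimal mapping, a Kafka Connect Decimal field appears in the registered Avro schema as a bytes field annotated with the decimal logical type. The field name, precision, and scale below are made-up example values.
{
  "name": "price",
  "type": {
    "type": "bytes",
    "logicalType": "decimal",
    "precision": 10,
    "scale": 2
  }
}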
Handling passthrough
When passthrough is enabled, messages are not converted between the Avro format and the Kafka Connect internal data format. In this case, the data is passed through unchanged to the Kafka Connect sink connector. If passthrough is disabled, messages are converted from Avro to the Kafka Connect internal data format, which can cause your input and output formats to differ.
Passthrough is supported when using the AvroConverter with the following sink connectors:
- HDFS Sink connector
- S3 Sink connector
You can enable and disable passthrough by setting the passthrough.enabled property to true or false when configuring the connector. For more information, see the KafkaAvroSerializer properties reference documentation under Related information.
Enabling and configuring the AvroConverter
When the AvroConverter is enabled as a key or value converter, you also need to provide further configuration properties that affect the behavior of the AvroConverter. All configuration properties must be prefixed with either value.converter. or key.converter. These prefixes control whether a specific property is applied to the key or the value converter.
Enabling AvroConverter
You can enable the AvroConverter for any connector by adding the key.converter and value.converter properties to the connector configuration. The value of these properties must be set to the class name of the AvroConverter.
To use the AvroConverter for both key and value conversion, you must add the following properties to the connector configuration:
"key.converter": "com.cloudera.dim.kafka.connect.converts.AvroConverter",
"value.converter": "com.cloudera.dim.kafka.connect.converts.AvroConverter",
To use the AvroConverter to convert values, but a different converter, such as the StringConverter, to convert keys, you must add the following properties to the connector configuration:
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "com.cloudera.dim.kafka.connect.converts.AvroConverter",
Configuring passthrough
The passthrough configuration in the AvroConverter controls whether the data is converted into the Kafka Connect internal data format before it is written into an output file, or whether the data is passed through the connector without conversion. Passthrough is controlled by the passthrough.enabled property, which is set to true by default. To disable passthrough, add the following properties to the connector configuration:
"key.converter.passthrough.enabled": "false",
"value.converter.passthrough.enabled": "false",
Enabling SerDes
To transform messages, the AvroConverter uses an embedded KafkaAvroSerializer and KafkaAvroDeserializer, which need to be configured together with the AvroConverter to enable further control over its behavior.
A Schema Registry connection must be configured for the KafkaAvroSerializer and KafkaAvroDeserializer, because both the serializer and the deserializer contain an embedded Schema Registry client. You can specify the Schema Registry server URL with the schema.registry.url property as shown in the following example:
"key.converter.schema.registry.url": "http://registryserver.example.com:9090/api/v1"
"value.converter.schema.registry.url": "http://registryserver.example.com:9090/api/v1"
You can configure additional properties related to serialization, deserialization, and the Schema Registry client using the property reference documentation under Related information.
Configuration example
The following example shows a connector configuration with the AvroConverter set as both the key and value converter, connecting to a remote Schema Registry server.
{
"value.converter": "com.cloudera.dim.kafka.connect.converts.AvroConverter", // use AvroConverter for the conversion of the Kafka message value
"value.converter.passthrough.enabled": "false", // disable passthrough
"value.converter.schema.registry.url": "http://registryserver.example.com:9090/api/v1", // URL of the schema registry server that the Schema Registry client inside the value converter will connect to
"value.converter.schema.registry.auth.username = [***USERNAME***], // username to be used for authentication by the value converter’s embedded Schema Registry client
"value.converter.schema.registry.auth.password = [***PASSWORD***], // password to be used for authentication by the value converter’s embedded Schema Registry client
"key.converter": "com.cloudera.dim.kafka.connect.converts.AvroConverter",
"key.converter.schema.registry.auth.username = [***USERNAME***],
"key.converter.schema.registry.auth.password = [***PASSWORD***],
"key.converter.passthrough.enabled": "false",
"key.converter.schema.registry.url": "http://registryserver.example.com:9090/api/v1"
}
Configuring AvroConverter for Debezium connectors with Schema Registry integration
Learn how to configure the AvroConverter for Debezium connectors with Schema Registry integration.
When you want to use the AvroConverter for Debezium connectors, you need to configure the passthrough, schema compatibility, and schema name key and value suffix properties.
passthrough.enabled
Disable passthrough for both the key and value converters:
"value.converter.passthrough.enabled": "false"
"key.converter.passthrough.enabled": "false"
This ensures that the CDC event fields are loaded into the internal schema of Kafka Connect, from which the fields are converted into Avro format by the AvroConverter before being written to Kafka. During this conversion, the schemas are also automatically created in Schema Registry.
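The following sketch shows how these properties might sit in a Debezium connector configuration. The connector class is only an example (a Debezium MySQL connector is assumed), and the database connection properties required by Debezium are omitted.
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"key.converter": "com.cloudera.dim.kafka.connect.converts.AvroConverter",
"key.converter.passthrough.enabled": "false",
"key.converter.schema.registry.url": "http://registryserver.example.com:9090/api/v1",
"value.converter": "com.cloudera.dim.kafka.connect.converts.AvroConverter",
"value.converter.passthrough.enabled": "false",
"value.converter.schema.registry.url": "http://registryserver.example.com:9090/api/v1"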
schema.compatibility
Whenever a schema is created in Schema Registry, a compatibility policy is also set for it by default. Based on the compatibility policy, if the schema evolves, the new version is validated against the old ones. For more information about compatibility in Schema Registry, see the Compatibility policies documentation under Related information.
The compatibility must be set before the first CDC event is processed by the connector, because the first schema is created based on the compatibility policies. If backward incompatible changes can happen in the database, consider setting the properties to NONE, because in that case the Avro schemas will also be incompatible.
"value.converter.schema.compatibility": "NONE"
"key.converter.schema.compatibility": "NONE"
schema.name.key.suffix and schema.name.value.suffix
By default, :k is appended to the schema name for keys. Key and value suffixes can be set on the converters according to the following example:
"value.converter.schema.name.value.suffix": "value_suffix"
"key.converter.schema.name.key.suffix": "key_suffix"
Additional configuration properties for Debezium connectors and Schema Registry integration with AvroConverter
The bigint.unsigned.handling.mode property in a Debezium connector controls how unsigned bigint data types are mapped. It has the following values, which can affect the behavior of the logical data type conversion (a configuration sketch follows the list):
- precise - The bigint unsigned values are stored as byte arrays in Avro with the decimal logical type. When you read this data, you get either a byte array or a BigDecimal automatically; the object type you get depends on whether logical type conversion is enabled.
- long - Whatever is stored in the database is stored in Avro as long, and you get a Java long object in the Java Virtual Machine (JVM). In this case, the precision of the mapping is smaller, but there is no need for logical type handling.
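For example, to keep full precision for unsigned bigint columns, the property could be set as follows in the Debezium connector configuration. The value shown is an illustrative choice, not a recommendation.
"bigint.unsigned.handling.mode": "precise"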
Check schema.name.adjustment.mode and field.name.adjustment.mode to ensure that there are no conflicts between your database related names and the Avro specific naming conventions.
Check decimal.handling.mode to decide how much precision you need for numbers in different cases, and whether you need to use a logical type for higher precision.
Check time.precision.mode to decide how much precision you need for dates and times in different cases, and whether you need to use a logical type for higher precision.
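The following sketch shows what these Debezium properties might look like together in a connector configuration. The values are illustrative assumptions; choose the values that fit your data and consult the Debezium documentation for the available options.
"schema.name.adjustment.mode": "avro",
"field.name.adjustment.mode": "avro",
"decimal.handling.mode": "precise",
"time.precision.mode": "adaptive_time_microseconds"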
Reading Debezium Avro records from Kafka
Learn how to read Debezium CDC Avro records from Kafka using the AvroConverter functions.
Avro records can be consumed from Kafka with the Schema Registry integration as described in the Integrating Kafka with Schema Registry documentation.
To convert logical types automatically when reading the records, enable the logical.type.conversion.enabled property in the consumer configuration as shown in the following example:
"logical.type.conversion.enabled": "true"
This conversion is handled by the internal SerDes classes, KafkaAvroSerializer and KafkaAvroDeserializer.
Logical type conversion happens automatically in Debezium connectors when they produce the Avro events to Kafka. Ensure that the logical.type.conversion.enabled property is not set to true for the key/value SerDes classes of the AvroConverter. The logical.type.conversion.enabled property only needs to be enabled on the consumer or producer side when the consumer or producer is not part of a Kafka Connect connector.
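As a sketch, a standalone consumer that reads the Debezium Avro records could carry the following properties. The deserializer class name and Schema Registry URL are assumptions based on the Schema Registry integration referenced above; adjust them to your environment.
"value.deserializer": "com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer",
"schema.registry.url": "http://registryserver.example.com:9090/api/v1",
"logical.type.conversion.enabled": "true"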