Avro format
The Apache Avro format enables you to read and write Avro data. To use it, add the Avro dependency to your project and set the format type to avro in the CREATE TABLE statement. You must also specify the fields of the Avro record in the table definition.
The format schema can be defined either as the fully qualified class name of an Avro specific record or as an Avro schema string. If a class name is used, the class must be available on the classpath at runtime.
When using an Avro schema string, you must specify the fields of the Avro record in the schema. The field names and types in the schema must correspond to the columns of the table in Flink.
For a detailed description of Avro schemas, see the Apache Avro documentation.
Add the following Maven dependency to your project to use the Avro format:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.16.1-csa1.10.0.0</version>
  <scope>provided</scope>
</dependency>
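The following example shows how to use an Avro schema string when creating a Kafka connector table. The schema is specified as a JSON object with record as its type, a name, and the specification of its fields. Note how the data types correspond, especially for the decimal and array fields: the DECIMAL(38,18) column maps to an Avro bytes type with the decimal logical type, precision 38, and scale 18, and the ARRAY<INT> column maps to an Avro array with int items.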
CREATE TABLE source_table (
  string_field STRING,
  long_field BIGINT,
  decimal_field DECIMAL(38,18),
  int_arr_field ARRAY<INT>
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'input_topic',
  'connector.properties.group.id' = 'test',
  'connector.properties.bootstrap.servers' = '<hostname>:<port>',
  'format.type' = 'avro',
  'format.avro-schema' =
    '{
      "type": "record",
      "name": "test",
      "fields": [
        {"name": "string_field", "type": "string"},
        {"name": "long_field", "type": "long"},
        {"name": "decimal_field", "type":
          {"type": "bytes",
           "logicalType": "decimal",
           "precision": 38,
           "scale": 18}},
        {"name": "int_arr_field", "type":
          {"type": "array",
           "items": "int"}}
      ]
    }'
)
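The class name alternative described earlier can be sketched in the same way. The following is a minimal, unverified sketch: it assumes that the 'format.record-class' property of the legacy Flink format descriptors is accepted alongside the other properties used in this example, and the class name com.example.avro.TestRecord and the table name source_table_from_class are hypothetical placeholders. The class would have to be a generated Avro specific record that is available on the classpath at runtime, with fields matching the table columns.

```sql
-- Sketch only: 'format.record-class' is assumed to be supported by the
-- Flink version in use; the table and class names are hypothetical.
CREATE TABLE source_table_from_class (
  string_field STRING,
  long_field BIGINT,
  decimal_field DECIMAL(38,18),
  int_arr_field ARRAY<INT>
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'input_topic',
  'connector.properties.group.id' = 'test',
  'connector.properties.bootstrap.servers' = '<hostname>:<port>',
  'format.type' = 'avro',
  -- Fully qualified name of a generated Avro specific record class
  -- (hypothetical), used in place of 'format.avro-schema'
  'format.record-class' = 'com.example.avro.TestRecord'
)
```

With this variant the schema is taken from the record class, so the JSON schema string is not repeated in the table definition.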