Avro format

The Apache Avro format enables you to read and write Avro data. You must add the Avro dependency to your project and define the format type in CREATE table to Avro. You also need to specify the fields of the Avro record within the table.

The format schema can be defined either as a fully qualified class name of an Avro specific record or as an Avro schema string. If a class name is used, the class must be available in the classpath during runtime.

When using the Avro schema string, you must specify the fields of the Avro record. The schema must correspond to the schema of the table in Flink.

For a detailed description of Avro schemas, see the Apache Avro documentation.

Maven dependency
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.19.1-csa1.13.1.0</version>
</dependency>

The following example shows how to use an Avro schema string when creating a Kafka connector table. It is specified as a JSON object, having record as type, a name, and the specification of its fields. Note the correspondence of various data types, especially the decimal and array fields.

Example
CREATE TABLE source_table(
	string_field	STRING,
	long_field 	 	BIGINT,
	decimal_field 	DECIMAL(38,18),
    	int_arr_field 	ARRAY<INT>
) WITH (
	'connector.type'                           = 'kafka',
	'connector.version'                        = 'universal',
	'connector.topic'                          = 'input_topic',
    	'connector.properties.group.id'            = 'test',
	'connector.properties.bootstrap.servers'   = '<hostname>:<port>'
	'format.type'                              = 'avro',
    	'format.avro-schema' =
  	'{
     	  "type": "record",
     	  "name": "test",
     	  "fields" : [
      	{"name": "string_field", "type": "string"},
      	{"name": "long_field",   "type": "long"},
      	{"name": "decimal_field", "type": 
{"type": "bytes", 
 "logicalType": "decimal", 
 "precision": 38, 
 "scale": 18}},
          	{"name": "int_arr_field", "type": 
{"type":"array", 
 "items":"int"}}
      ]
    }'
)