JSON format

The JSON format enables reading and writing JSON data.

The expected JSON schema will be derived from the table schema by default. Specifying the JSON schema manually is not supported.

Maven dependency
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-json</artifactId>
   <version>1.10.0-csa1.2.0.0</version>
   <scope>provided</scope>
</dependency>

For more information about the JSON format, see the Apache Flink documentation.

The following example shows the Kafka connector with JSON data type:
CREATE TABLE source_table (
  c1	INT,
  c2	STRING,
  c3	DECIMAL(38,18),
  c4	TIMESTAMP(3),
) WITH (
  	'connector.type'    	 = 'kafka',
	'connector.version' 	 = 'universal',
	'connector.topic'   	 = 'input_topic',
	'connector.startup-mode' = 'latest-offset',
	'connector.properties.bootstrap.servers' = '<hostname>:<port>',
	'connector.properties.group.id' = 'test',
	'format.type' = 'json'
);