CSV format

The CSV format allows your applications to read data from, and write data to, external sources in CSV. To use it, add the CSV dependency to your project and set the format type to CSV in the CREATE TABLE statement.

By default, the CSV format derives the format schema from the table schema. Defining the format schema explicitly with Flink types is not supported yet.

Maven dependency
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-csv</artifactId>
  <version>1.19.1-csa1.13.1.0</version>
</dependency>

For more information about the CSV format, see the Apache Flink documentation.

The following example shows a table that uses the Kafka connector with the CSV format:
CREATE TABLE source_table (
  c1 INT,
  c2 STRING,
  c3 TIMESTAMP(3)
) WITH (
  'connector.type'                         = 'kafka',
  'connector.version'                      = 'universal',
  'connector.topic'                        = 'sink_topic',
  'connector.properties.bootstrap.servers' = '<host>:<port>',
  'connector.properties.group.id'          = 'test',
  'format.type'                            = 'csv'
);
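
The property keys above follow the older 'connector.type' / 'format.type' style. As a sketch only, the same table might be declared with the shorter option keys used in the upstream Apache Flink documentation. Whether your release accepts these keys is an assumption to verify. The 'scan.startup.mode' and 'csv.ignore-parse-errors' options are illustrative additions that do not appear in the example above.

```sql
-- Sketch: same table using the newer-style option keys (assumed to be
-- supported by your Flink version; verify before relying on it).
CREATE TABLE source_table (
  c1 INT,
  c2 STRING,
  c3 TIMESTAMP(3)
) WITH (
  'connector'                    = 'kafka',
  'topic'                        = 'sink_topic',
  'properties.bootstrap.servers' = '<host>:<port>',
  'properties.group.id'          = 'test',
  -- Illustrative addition: start reading from the earliest available offset.
  'scan.startup.mode'            = 'earliest-offset',
  'format'                       = 'csv',
  -- Illustrative addition: skip records that fail CSV parsing instead of failing the job.
  'csv.ignore-parse-errors'      = 'true'
);
```

In both variants the table declares no separate format schema, so each CSV record is expected to carry its fields in the column order c1, c2, c3.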