Kafka connector

The Kafka connector enables reading from and writing to Apache Kafka topics.

When using the Kafka connector, you must specify one of the supported message formats.

Maven dependency

For more information about the Kafka connector, see the Apache Flink documentation.

The following example shows a CREATE TABLE statement that uses the Kafka connector:
CREATE TABLE source_table (
	id		BIGINT,
	ts		BIGINT,
	itemId	STRING,
	quantity 	INT
) WITH (
	'connector.type'    	 = 'kafka',
	'connector.version' 	 = 'universal',
	'connector.topic'   	 = 'input_topic',
	'connector.startup-mode' = 'latest-offset',
	'connector.properties.bootstrap.servers' = '<hostname>:<port>',
	'connector.properties.group.id' = 'test',
	'format.type' = 'json'
);
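Writing to a topic can be set up in the same way. The following is a minimal sketch, not a definitive configuration: the table name sink_table and the topic name output_topic are hypothetical, and the property keys simply mirror the ones used for the source table above. It assumes the source_table from the previous example already exists.

```sql
-- Hypothetical sink table; 'sink_table' and 'output_topic' are assumed names.
CREATE TABLE sink_table (
	itemId   STRING,
	quantity INT
) WITH (
	'connector.type'    = 'kafka',
	'connector.version' = 'universal',
	'connector.topic'   = 'output_topic',
	'connector.properties.bootstrap.servers' = '<hostname>:<port>',
	'format.type' = 'json'
);

-- Continuously copy two columns from the source table into the sink topic.
INSERT INTO sink_table
SELECT itemId, quantity
FROM source_table;
```

The sink omits 'connector.startup-mode' and 'connector.properties.group.id' because those settings control how a topic is consumed, not how it is produced.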
In a secured environment where Kerberos and SSL are enabled, the following example can be used:
CREATE TABLE source_table (
) WITH (
   'connector.type'                                    = 'kafka',
   'connector.version'                                 = 'universal',
   'connector.topic'                                   = 'source_topic',
   'connector.startup-mode'                            = 'earliest-offset',
   'connector.properties.bootstrap.servers'            = '<host>:<port>',
   'connector.properties.group.id'                     = 'test',
   'connector.properties.security.protocol'            = 'SASL_SSL',
   'connector.properties.sasl.kerberos.service.name'   = 'kafka',
   'connector.properties.ssl.truststore.location'      = '<absolute_path_to_jks>',
   'format.type'                                       = 'csv'
);