Kafka connector

Cloudera Streaming Analytics offers Kafka not only as a DataStream connector, but also as a connector in the Flink SQL feature. This means that if you have designed your streaming application to use Kafka as a source and sink, you can retrieve your output data in tables. When using the Kafka connector, you must specify one of the supported message formats.

Maven dependency
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka_2.12</artifactId>
   <version>1.15.1-csa1.8.0.0</version>
</dependency>

For more information about the Kafka connector, see the Apache Flink documentation.

The following example shows a CREATE TABLE statement that uses the Kafka connector:
CREATE TABLE source_table (
  id       BIGINT,
  ts       BIGINT,
  itemId   STRING,
  quantity INT
) WITH (
  'connector.type'                         = 'kafka',
  'connector.version'                      = 'universal',
  'connector.topic'                        = 'input_topic',
  'connector.startup-mode'                 = 'latest-offset',
  'connector.properties.bootstrap.servers' = '<hostname>:<port>',
  'connector.properties.group.id'          = 'test',
  'format.type'                            = 'json'
);
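
Because the same connector can also act as a sink, the source table above can feed a second Kafka topic. The following is a minimal sketch rather than a tested configuration: the table name sink_table, the topic output_topic, and the filter on quantity are hypothetical, and the connector properties simply mirror the source definition.

```sql
-- Hypothetical sink table; the table name and topic are illustrative only.
CREATE TABLE sink_table (
  id       BIGINT,
  ts       BIGINT,
  itemId   STRING,
  quantity INT
) WITH (
  'connector.type'                         = 'kafka',
  'connector.version'                      = 'universal',
  'connector.topic'                        = 'output_topic',
  'connector.properties.bootstrap.servers' = '<hostname>:<port>',
  'format.type'                            = 'json'
);

-- Continuously write rows from source_table to the sink topic.
-- The WHERE clause is an arbitrary example filter.
INSERT INTO sink_table
SELECT id, ts, itemId, quantity
FROM source_table
WHERE quantity > 0;
```

The sink schema is kept identical to the source schema so that the SELECT list maps column by column to the sink table.
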
In a secured environment where Kerberos and SSL are enabled, the following example can be used:
CREATE TABLE source_table (
  c1 STRING
) WITH (
  'connector.type'                                  = 'kafka',
  'connector.version'                               = 'universal',
  'connector.topic'                                 = 'source_topic',
  'connector.startup-mode'                          = 'earliest-offset',
  'connector.properties.bootstrap.servers'          = '<host>:<port>',
  'connector.properties.group.id'                   = 'test',
  'connector.properties.security.protocol'          = 'SASL_SSL',
  'connector.properties.sasl.kerberos.service.name' = 'kafka',
  'connector.properties.ssl.truststore.location'    = '<absolute_path_to_jks>',
  'format.type'                                     = 'csv'
);
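
The Apache Flink documentation for the 1.15 release line describes the Kafka SQL connector with a shorter set of option keys than the connector.* keys used in the examples on this page. As a hedged sketch, the first CREATE TABLE example might look as follows in that style. Whether a given Cloudera Streaming Analytics release accepts one style, the other, or both is an assumption to verify against your version.

```sql
-- Sketch only: the first example rewritten with the option keys from the
-- Apache Flink 1.15 Kafka SQL connector documentation.
CREATE TABLE source_table (
  id       BIGINT,
  ts       BIGINT,
  itemId   STRING,
  quantity INT
) WITH (
  'connector'                    = 'kafka',
  'topic'                        = 'input_topic',
  'scan.startup.mode'            = 'latest-offset',
  'properties.bootstrap.servers' = '<hostname>:<port>',
  'properties.group.id'          = 'test',
  'format'                       = 'json'
);
```

In this style, Kafka client settings such as security.protocol are passed through with the properties. prefix in place of connector.properties.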