Kafka connector
Cloudera Streaming Analytics provides Kafka as not only a DataStream connector, but also enables Kafka in the Flink SQL feature. This means if you have designed your streaming application to have Kafka as source and sink, you can retrieve your output data in tables. When using the Kafka connector, you are required to specify one of the supported message formats.
Maven
dependency
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.16.1-csa1.10.0.0</version>
</dependency>
For more information about the Kafka connector, see the Apache Flink documentation.
The following example shows a
CREATE TABLE
statement with Kafka
connector:CREATE TABLE source_table (
id BIGINT,
ts BIGINT,
itemId STRING,
quantity INT
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'input_topic',
'connector.startup-mode' = 'latest-offset',
'connector.properties.bootstrap.servers' = '<hostname>:<port>',
'connector.properties.group.id' = 'test',
'format.type' = 'json'
);
On a secured environment where Kerberos and SSL is enabled, the following example can be
used:
CREATE TABLE source_table (
c1 STRING
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'source_topic',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.bootstrap.servers' = '<host>:<port>',
'connector.properties.group.id' = 'test',
'connector.properties.security.protocol' = 'SASL_SSL',
'connector.properties.sasl.kerberos.service.name' = 'kafka',
'connector.properties.ssl.truststore.location' = '<absolute_path_to_jks>',
'format.type' = 'csv'
)