Kafka connector
Cloudera Streaming Analytics offers Kafka not only as a DataStream connector but also as a connector in the Flink SQL feature. This means that if you have designed your streaming application to use Kafka as source and sink, you can retrieve your output data in tables. When using the Kafka connector, you must specify one of the supported message formats.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>3.1-csadh1.12.0.0</version>
</dependency>
For more information about the Kafka connector, see the Apache Flink documentation.
The following example shows a CREATE TABLE statement with the Kafka connector:

CREATE TABLE source_table (
    id BIGINT,
    ts BIGINT,
    itemId STRING,
    quantity INT
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'input_topic',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.bootstrap.servers' = '<hostname>:<port>',
    'connector.properties.group.id' = 'test',
    'format.type' = 'json'
);
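
Because Kafka can act as a sink as well as a source, a matching sink table can be sketched in the same way. The following is only an illustrative sketch: the table name sink_table, the topic output_topic, and the filter condition are assumptions, and the option keys are copied from the source table example rather than taken from a separate sink reference, so check them against the connector documentation for your version.

```sql
-- Hypothetical sink table; 'sink_table' and 'output_topic' are assumed names.
-- The option keys mirror the source_table example and are not verified
-- against a sink-specific reference.
CREATE TABLE sink_table (
    itemId STRING,
    quantity INT
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'output_topic',
    'connector.properties.bootstrap.servers' = '<hostname>:<port>',
    'format.type' = 'json'
);

-- Continuously write selected columns from the source topic to the sink topic.
-- The WHERE clause is an arbitrary illustration of filtering.
INSERT INTO sink_table
SELECT itemId, quantity
FROM source_table
WHERE quantity > 0;
```

The sink table declares only the columns that are written, and the message format is still specified through 'format.type', as required for every Kafka table.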
In a secured environment where Kerberos and SSL are enabled, the following example can be used:

CREATE TABLE source_table (
    c1 STRING
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'source_topic',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.bootstrap.servers' = '<host>:<port>',
    'connector.properties.group.id' = 'test',
    'connector.properties.security.protocol' = 'SASL_SSL',
    'connector.properties.sasl.kerberos.service.name' = 'kafka',
    'connector.properties.ssl.truststore.location' = '<absolute_path_to_jks>',
    'format.type' = 'csv'
)