Cloudera Streaming Analytics (CSA) offers a Kafka connector that can act as both a source
and a sink, so you can build a complete stream processing architecture on top of a stream
messaging platform. To use it, add the Kafka dependency to your project, then define Kafka
as a source and a sink in your application.
In CSA, adding Kafka as a connector creates a scalable communication channel between
your Flink application and the rest of your infrastructure. Kafka often both delivers
the input records to Flink and forwards the output records downstream, framing the
Flink application on both sides.
When Kafka is used as a connector, Cloudera offers the following integration
solutions:
- Schema Registry
- Streams Messaging Manager
- Kafka Metrics Reporter
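Both Kafka sources and sinks can be used with exactly-once processing guarantees when
checkpointing is enabled. For the sink, exactly-once delivery also requires
DeliveryGuarantee.EXACTLY_ONCE and a transactional ID prefix; the sink examples in this
section use AT_LEAST_ONCE.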
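To make the role of checkpointing concrete, the following is a toy Python model, not Flink
or Kafka code. It assumes a single crash and a sink that either publishes each record
immediately or holds records in a transaction that is committed when a checkpoint completes.
The function run_job and all of its parameters are hypothetical names invented for this
illustration.

```python
def run_job(records, checkpoint_every, fail_at, transactional):
    """Replay `records` through a simulated sink, crashing once at index `fail_at`.

    Returns the records that end up visible to consumers of the sink topic.
    This is a simplified model (assumption), not how Flink is implemented.
    """
    committed = []   # output visible to downstream consumers
    pending = []     # output written since the last checkpoint (open transaction)
    checkpoint = 0   # source offset stored in the last completed checkpoint
    offset = 0
    failed = False

    while offset < len(records):
        if offset == fail_at and not failed:
            failed = True
            pending.clear()       # the open transaction is aborted on failure
            offset = checkpoint   # the source rewinds to the checkpointed offset
            continue

        record = records[offset]
        if transactional:
            pending.append(record)    # held back until the checkpoint completes
        else:
            committed.append(record)  # visible immediately, so replays duplicate it
        offset += 1

        if offset % checkpoint_every == 0:
            committed.extend(pending)  # commit the transaction with the checkpoint
            pending.clear()
            checkpoint = offset

    committed.extend(pending)  # final commit at end of input
    return committed


records = list(range(6))
print(run_job(records, checkpoint_every=2, fail_at=3, transactional=False))
# [0, 1, 2, 2, 3, 4, 5]  (record 2 is delivered twice after the replay)
print(run_job(records, checkpoint_every=2, fail_at=3, transactional=True))
# [0, 1, 2, 3, 4, 5]
```

In this model the non-transactional run republishes the record that was processed after the
last checkpoint, while the transactional run discards uncommitted output on failure and
publishes each record once.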
For more information about Apache Kafka, see the Cloudera Runtime documentation.
- Add the Kafka connector dependency to your Flink job.
Example for Maven:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.16.2-csa1.11.0.1</version>
</dependency>
- Set KafkaSource as the source in the Flink application logic.
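Java:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Read string values from the topic, starting at the earliest available offset.
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("<your_broker_url>")
    .setTopics("<your_source_topic>")
    .setGroupId("<your_group_id>")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();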
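Python:
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

# Read string values from the topic, starting at the earliest available offset.
source = KafkaSource.builder() \
    .set_bootstrap_servers('<your_broker_url>') \
    .set_topics('<your_source_topic>') \
    .set_group_id('<your_group_id>') \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()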
- Set KafkaSink as the sink in the Flink application logic.
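Java:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<String> stream = ...
// Write string values to the sink topic with at-least-once delivery.
KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("<your_broker_url>")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("<your_sink_topic>")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build();
stream.sinkTo(sink);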
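Python:
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.base import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import KafkaSink, KafkaRecordSerializationSchema

stream = ...
# Write string values to the sink topic with at-least-once delivery.
sink = KafkaSink.builder() \
    .set_bootstrap_servers('<your_broker_url>') \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
            .set_topic('<your_sink_topic>')
            .set_value_serialization_schema(SimpleStringSchema())
            .build()
    ) \
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
    .build()
stream.sink_to(sink)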