Kafka with Flink

Cloudera Streaming Analytics offers Kafka connector as a source and a sink to create a complete stream processing architecture with a stream messaging platform. You must develop your application defining Kafka as a source and sink, after adding Kafka dependency to your project.

In CSA, adding Kafka as a connector creates a scalable communication channel between your Flink application and the rest of your infrastructure. Kafka is often responsible for delivering the input records and for forwarding them as an output, creating a frame around Flink.

When Kafka is used as a connector, Cloudera offers the following integration solutions:
  • Schema Registry
  • Streams Messaging Manager
  • Kafka Metrics Reporter

Both Kafka sources and sinks can be used with exactly once processing guarantees when checkpointing is enabled.

For more information about Apache Kafka, see the Cloudera Runtime documentation.

  1. Add the Kafka connector dependency to your Flink job.
    Example for Maven:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>3.1-csa1.12.0.0</version>
    </dependency>
  2. Set KafkaSource as the source in the Flink application logic.
    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("<your_broker_url>")
        .setTopics("<your_source_topic>")
        .setGroupId("<your_group_id>")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
    source = KafkaSource.builder() \
        .set_bootstrap_servers('<your_broker_url>') \
        .set_topics('<your_source_topic>') \
        .set_group_id('<your_group_id>') \
        .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()
  3. Set KafkaSink as the sink in the Flink application logic.
    DataStream<String> stream = ...
    KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers("<your_broker_url>")
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("<your_sink_topic>")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();
            
    stream.sinkTo(sink);
    stream = ...
    sink = KafkaSink.builder() \
       .set_bootstrap_servers(brokers) \
       .set_record_serializer(
            KafkaRecordSerializationSchema.builder()
                .set_topic('<your_sink_topic>')
                .set_value_serialization_schema(SimpleStringSchema())
                .build()) \
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
        .build()
    
    stream.sink_to(sink)