Kafka with Flink

Apache Kafka can be used as both a source and a sink for your Flink application, creating a complete stream processing architecture with a streaming message platform.

In CSA, adding Kafka as a connector creates a scalable communication channel between your Flink application and the rest of your infrastructure. Kafka typically delivers the input records to Flink and forwards the processed results as output, framing the Flink application on both sides.

When Kafka is used as a connector, Cloudera offers the following integration solutions:
  • Schema Registry
  • Streams Messaging Manager
  • Logging to Kafka
  • Kafka Metrics Reporter

Both Kafka sources and sinks can be used with exactly-once processing guarantees when checkpointing is enabled.
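When targeting exactly-once delivery, the Kafka producer's transaction timeout also matters: it must not exceed the broker's transaction.max.timeout.ms (15 minutes by default), so it is common to set transaction.timeout.ms explicitly in the producer properties. A minimal sketch, assuming a placeholder broker address and a 15-minute timeout:

```java
import java.util.Properties;

public class ExactlyOnceProps {
    /**
     * Builds producer properties suitable for transactional (exactly-once)
     * writes. The 15-minute value is an assumption chosen to match the
     * broker default for transaction.max.timeout.ms; adjust both together
     * if your checkpoint intervals are longer.
     */
    static Properties producerProperties(String brokers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("transaction.timeout.ms", "900000"); // 15 minutes
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProperties("<your_broker_url>");
        System.out.println(props.getProperty("transaction.timeout.ms"));
    }
}
```

These properties can then be passed to the FlinkKafkaProducer constructor shown in the steps below.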

For more information about Apache Kafka, see the Cloudera Stream Processing documentation.

  1. Add the Kafka connector dependency to your Flink job.
    Example for Maven:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
  2. Set FlinkKafkaConsumer as the source in the Flink application logic.
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "<your_broker_url>");
    properties.put("group.id", "<your_group_id>");
    
    FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>(
                    "<your_input_topic>",
                    new SimpleStringSchema(),
                    properties);
    
  3. Set FlinkKafkaProducer as the sink in the Flink application logic.
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "<your_broker_url>");
    
    FlinkKafkaProducer<String> output = new FlinkKafkaProducer<>(
                    "<your_output_topic>",
                    new SimpleStringSchema(),
                    properties,
                    Semantic.EXACTLY_ONCE);
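Putting the steps together, a minimal job skeleton might look like the following. This is a sketch, not a definitive implementation: the broker address, topic names, group id, and the identity of the transformation are placeholders, and running it requires a reachable Kafka cluster plus the Flink Kafka connector on the classpath.

    import java.util.Properties;
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic;
    
    public class KafkaToKafkaJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpointing is required for exactly-once guarantees;
            // the 10-second interval is an illustrative assumption.
            env.enableCheckpointing(10_000);
    
            Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", "<your_broker_url>");
            consumerProps.put("group.id", "<your_group_id>");
    
            FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>(
                    "<your_input_topic>",
                    new SimpleStringSchema(),
                    consumerProps);
    
            Properties producerProps = new Properties();
            producerProps.put("bootstrap.servers", "<your_broker_url>");
    
            FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>(
                    "<your_output_topic>",
                    new SimpleStringSchema(),
                    producerProps,
                    Semantic.EXACTLY_ONCE);
    
            env.addSource(source)
               .map(record -> record) // placeholder for your processing logic
               .addSink(sink);
    
            env.execute("Kafka to Kafka");
        }
    }

The consumer reads from the input topic, the (placeholder) transformation runs in between, and the transactional producer commits results to the output topic on each successful checkpoint.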