Cloudera Streaming Analytics offers the Kafka connector as a source and a sink to create a complete stream processing architecture with a stream messaging platform. After adding the Kafka dependency to your project, you must develop your application defining Kafka as a source and a sink.
        
In CSA, adding Kafka as a connector creates a scalable communication channel between your Flink application and the rest of your infrastructure. Kafka is often responsible for delivering the input records and for forwarding them as an output, creating a frame around Flink.
When Kafka is used as a connector, Cloudera offers the following integration solutions:
- Schema Registry
- Streams Messaging Manager
- Kafka Metrics Reporter
Both Kafka sources and sinks can be used with exactly-once processing guarantees when checkpointing is enabled.
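For example, checkpointing is enabled on the execution environment; a minimal Java sketch (the 30-second interval and the checkpointing mode shown here are illustrative choices, not CSA defaults):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing so the Kafka connectors can provide exactly-once guarantees;
// the interval and mode are illustrative assumptions.
env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);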
            For more information about Apache Kafka, see the Cloudera Runtime documentation.
        
- Add the Kafka connector dependency to your Flink job.

  Example for Maven:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>3.2-csa1.13.2.0</version>
</dependency>
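
The Java snippets in the following steps assume imports along these lines (a sketch; the classes ship with Flink and the Kafka connector artifact):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;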
 
                 
- Set KafkaSource as the source in the Flink application logic.

  Example for Java:
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("<your_broker_url>")
    .setTopics("<your_source_topic>")
    .setGroupId("<your_group_id>")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

  Example for Python:
source = KafkaSource.builder() \
    .set_bootstrap_servers('<your_broker_url>') \
    .set_topics('<your_source_topic>') \
    .set_group_id('<your_group_id>') \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()
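
The source by itself is only a definition; a short Java sketch of attaching it to the execution environment (the WatermarkStrategy choice and the "Kafka Source" operator name are illustrative assumptions):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Attach the KafkaSource defined above; noWatermarks() is an illustrative choice
// for pipelines that do not use event-time processing.
DataStream<String> stream =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");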
 
                 
- Set KafkaSink as the sink in the Flink application logic.

  Example for Java:
DataStream<String> stream = ...
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("<your_broker_url>")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("<your_sink_topic>")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
        
stream.sinkTo(sink);

  Example for Python:
stream = ...
sink = KafkaSink.builder() \
    .set_bootstrap_servers('<your_broker_url>') \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
            .set_topic('<your_sink_topic>')
            .set_value_serialization_schema(SimpleStringSchema())
            .build()) \
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
    .build()
stream.sink_to(sink)
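
With checkpointing enabled, the sink can also be configured for exactly-once delivery; a Java sketch under that assumption (EXACTLY_ONCE uses Kafka transactions, so a transactional ID prefix must be set; the prefix value here is illustrative):

KafkaSink<String> exactlyOnceSink = KafkaSink.<String>builder()
    .setBootstrapServers("<your_broker_url>")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("<your_sink_topic>")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build())
    // Exactly-once delivery relies on Kafka transactions and checkpointing.
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("my-flink-app") // illustrative prefix
    .build();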