Analyzing your data with Kafka

Describes how to create and use a Flink streaming application with Kafka in CDP Public Cloud.

  1. Create your streaming Flink application with Kafka as source and sink.
    Add the following FlinkKafkaConsumer and FlinkKafkaProducer entries to your Flink application logic.
    import java.util.Properties;
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic;
    
    Properties consumerProperties = new Properties();
    consumerProperties.put("bootstrap.servers", "<your_broker_url>");
    consumerProperties.put("group.id", "<your_group_id>");
    
    FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>(
                    "<your_input_topic>",
                    new SimpleStringSchema(),
                    consumerProperties);
    
    Properties producerProperties = new Properties();
    producerProperties.put("bootstrap.servers", "<your_broker_url>");
    
    FlinkKafkaProducer<String> output = new FlinkKafkaProducer<>(
                    "<your_output_topic>",
                    new SimpleStringSchema(),
                    producerProperties,
                    null,    // no custom partitioner
                    Semantic.EXACTLY_ONCE,
                    FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
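    
    To see the connector declarations in context, the following is a minimal sketch of how the source and sink could be wired into a job; the pass-through pipeline and the placeholder job name are illustrative, not part of the tutorial code.
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // The EXACTLY_ONCE producer commits its Kafka transactions on checkpoints,
    // so checkpointing must be enabled for the sink to make progress.
    env.enableCheckpointing(10_000);
    
    env.addSource(source).addSink(output);
    env.execute("<your_job_name>");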
    
  2. Add the Kafka broker hostnames to the Flink job properties file.
    1. Navigate to Management Console > Environments, and select the environment where you have created your cluster.
    2. Click on the Streams Messaging cluster.
    3. Click Hardware.
    4. Search for the Kafka brokers.
    5. Click the copy icon next to the Kafka brokers to obtain the hostnames.
    6. Access the Flink Manager Node through CLI with your workload username and password.
      ssh <your_workload_username>@<manager_node_FQDN>
      Password:<your_workload_password>
    7. Add the Kafka broker hostnames to the kafka.bootstrap.servers entry in the Flink job properties file.
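      For reference, the entry could look like the following; the hostnames are placeholders, and port 9092 assumes the default unsecured Kafka listener (secured CDP clusters typically use a different port).
      kafka.bootstrap.servers=<broker1_host>:9092,<broker2_host>:9092,<broker3_host>:9092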
  3. Create a topic in SMM.
    1. Go to Management Console > Data Hub Clusters.
    2. Search for your Streams Messaging cluster.
    3. Open Streams Messaging Manager (SMM) from the list of Services.
    4. Go to Topics > Add new.
    5. Provide information for Topic Name, Partitions, Availability, and Limits.

      Create a topic with 16 partitions and name it flink.tutorial.transaction.log to comply with the Stateful Flink Application Tutorial.

    6. Access the Flink Manager Node through CLI with your workload username and password.
    7. Add the name of the created Kafka topic to the Flink job properties file.
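      As an illustration only, the entry could look like the following; the key name transaction.input.topic is an assumption for this sketch, as the actual key is defined in the tutorial's config/job.properties.
      transaction.input.topic=flink.tutorial.transaction.log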
  4. Start generating data to your target Kafka topic.
    Submit the Data Generator job from the Stateful Flink Application Tutorial. The command below runs the job detached (-d) with a parallelism of 2 (-p), 2 task slots per YARN TaskManager (-ys), the YARN application name DataGenerator (-ynm), and the main class selected with -c.
    flink run -d -p 2 -ys 2 -ynm DataGenerator -c com.cloudera.streaming.examples.flink.KafkaDataGeneratorJob target/flink-stateful-tutorial-1.2-SNAPSHOT.jar config/job.properties
  5. Open SMM and check if the data is generated.

    After a few minutes, you can see the Data in count increase as the generated data arrives in the Kafka topic. You can check the incoming records on the Data Explorer tab.

  6. Deploy your Flink streaming application.
    Submit the Transaction Job from the Stateful Flink Application Tutorial. The command below runs the job detached (-d) with a parallelism of 8 (-p), 4 task slots (-ys) and 1500 MB of memory (-ytm) per YARN TaskManager, and the YARN application name TransactionProcessor (-ynm).
    flink run -d -p 8 -ys 4 -ytm 1500 -ynm TransactionProcessor target/flink-stateful-tutorial-1.2-SNAPSHOT.jar config/job.properties
  7. Open SMM and check the output of your application.
    You can see the query results of your application arriving in the output Kafka topic in SMM.

You have the following options to monitor and manage your Flink applications: