Running your Flink application

After preparing your environment, you need to connect Kafka to Flink and create the Kafka topics where the input and output data is stored. When Kafka is ready, you need to generate data to your Kafka topic and let Flink apply the computations you have defined in your application design.

  1. Create your streaming Flink application with Kafka as source and sink.
    The Stateful Tutorial has a detailed guide on how to create your application using Kafka as source and sink. For more information, see the Setting up Kafka inputs and outputs section.
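    The following is a minimal sketch, not the tutorial's actual code, of a Flink job that reads from one Kafka topic and writes to another using the KafkaSource and KafkaSink connector API (the tutorial may use a different connector version). The broker address, topic names, and security settings are placeholders that you replace with the values configured later in this guide.
    import java.util.Properties;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KafkaToKafkaJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Placeholder values: replace with the brokers, topics, and security
            // settings you configure in job.properties later in this guide.
            String brokers = "<your_kafka_broker>:9093";
            Properties security = new Properties();
            security.setProperty("security.protocol", "SASL_SSL");
            security.setProperty("sasl.kerberos.service.name", "kafka");

            // Kafka source: the input topic of the application.
            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers(brokers)
                    .setTopics("<input_topic>")
                    .setGroupId("flink")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .setProperties(security)
                    .build();

            // Kafka sink: the output topic of the application.
            KafkaSink<String> sink = KafkaSink.<String>builder()
                    .setBootstrapServers(brokers)
                    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                            .setTopic("<output_topic>")
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build())
                    .setKafkaProducerConfig(security)
                    .build();

            DataStream<String> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

            // The tutorial applies its transaction processing logic here;
            // this sketch only forwards the records unchanged.
            input.sinkTo(sink);

            env.execute("Kafka to Kafka sketch");
        }
    }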
  2. Build your Flink project with Maven after creating the application logic.
    mvn clean package
  3. Upload your Flink project to the Streaming Analytics cluster.
    scp <location>/flink-stateful-tutorial-1.2-SNAPSHOT.jar <your_workload_username>@<manager_node_FQDN>:.
    Password:<your_workload_password>
  4. Upload your keytab file to the Streaming Analytics cluster, if you have not uploaded it yet.
    scp <location>/<your_keytab_file> <your_workload_username>@<manager_node_FQDN>:.
    Password:<your_workload_password>
  5. Configure the Flink job properties to securely connect to the Streams Messaging cluster.
    Using the job.properties file from the Stateful Tutorial, replace the Kafka broker addresses and the truststore location with your own values. The input and output topics have "flink" as a prefix in the following example to comply with the Ranger policies you have set.
    kafka.bootstrap.servers=<your_kafka_broker>:9093,<your_kafka_broker1>:9093,<your_kafka_broker2>:9093
    kafka.group.id=flink
    kafka.flink.partition-discovery.interval-millis=60000
    kafka.retries=3
    transaction.input.topic=flink.transaction.log.1
    generate.queries=false
    query.input.topic=flink.query.input.log.1
    query.output.topic=flink.query.output.log.1
    num.items=1000000
    sleep=100
    
    kafka.security.protocol=SASL_SSL
    kafka.sasl.kerberos.service.name=kafka
    kafka.ssl.truststore.location=/var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_truststore.jks
    To retrieve the Kafka broker hostnames:
    1. Navigate to Management Console > Environments, and select the environment where you have created your cluster.
    2. Click on the Streams Messaging cluster.
    3. Click Hardware.
    4. Search for the Kafka brokers.
    5. Click the copy icon next to the Kafka brokers to obtain the hostnames.
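    The application reads these settings from job.properties at runtime. As a rough illustration of why the Kafka-related entries carry the kafka. prefix, the following sketch, an assumption rather than the tutorial's actual code, loads the file with Flink's ParameterTool and collects the prefixed entries into a Properties object that can be handed to the Kafka connectors.
    import java.util.Properties;

    import org.apache.flink.api.java.utils.ParameterTool;

    public class JobPropertiesLoader {

        private static final String KAFKA_PREFIX = "kafka.";

        // Returns the "kafka."-prefixed entries of the given properties file
        // with the prefix stripped, e.g. kafka.security.protocol -> security.protocol.
        public static Properties kafkaProperties(String path) throws Exception {
            ParameterTool params = ParameterTool.fromPropertiesFile(path);
            Properties kafkaProps = new Properties();
            for (String key : params.toMap().keySet()) {
                if (key.startsWith(KAFKA_PREFIX)) {
                    kafkaProps.setProperty(key.substring(KAFKA_PREFIX.length()), params.get(key));
                }
            }
            return kafkaProps;
        }
    }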
  6. Upload the Flink job properties file to the Streaming Analytics cluster.
    scp <location>/config/job.properties <your_workload_username>@<manager_node_FQDN>:.
    Password:<your_workload_password>
  7. Use the ls command to list the uploaded files and check that you have every necessary file on your Streaming Analytics cluster.

    After listing the files on your cluster, you should be able to see your keytab file, the Stateful Tutorial JAR file, and the job.properties file.

  8. Create your topics in Streams Messaging Manager (SMM).
    1. Go to Management Console > Data Hub Clusters.
    2. Search for your Streams Messaging cluster.
    3. Open SMM from the list of Services.
    4. Go to Topics > Add new.
    5. Provide information for Topic Name, Partitions, Availability, and Limits.

      Create three topics with 16 partitions each. You need to name them flink.tutorial.transaction.log, flink.tutorial.query.input.log, and flink.tutorial.query.output.log to comply with the Stateful Flink Application Tutorial.
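    If you prefer to create the topics programmatically instead of through the SMM UI, the following sketch uses the Kafka AdminClient. It is not part of the tutorial: the security settings mirror the job.properties above, it assumes a valid Kerberos ticket and client JAAS configuration on the host, and the replication factor of 3 is an assumption you should adjust to your cluster.
    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    public class CreateTutorialTopics {

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "<your_kafka_broker>:9093");
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.kerberos.service.name", "kafka");
            props.put("ssl.truststore.location",
                    "/var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_truststore.jks");

            short replicationFactor = 3; // assumption: adjust to your cluster
            try (AdminClient admin = AdminClient.create(props)) {
                admin.createTopics(Arrays.asList(
                        new NewTopic("flink.tutorial.transaction.log", 16, replicationFactor),
                        new NewTopic("flink.tutorial.query.input.log", 16, replicationFactor),
                        new NewTopic("flink.tutorial.query.output.log", 16, replicationFactor)
                )).all().get();
            }
        }
    }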

  9. Start generating data to your target Kafka topic.
    Submit the Data Generator job from the Stateful Flink Application Tutorial.
    flink run -yD security.kerberos.login.keytab=<your_keytab_filename> \
    -yD security.kerberos.login.principal=<csso_name> \
    -m yarn-cluster -d -p 2 -ys 2 -ynm Datagenerator \
    -c com.cloudera.streaming.examples.flink.KafkaDataGeneratorJob \
    flink-stateful-tutorial-1.2-SNAPSHOT.jar job.properties
  10. Open SMM and check if the data is generated.
    After waiting a few minutes, you can see that the Data In count increases as the generated data arrives in the Kafka topic. You can check the incoming data in the Data Explorer tab.
  11. Deploy your Flink streaming application.
    Submit the Transaction Job from the Stateful Flink Application Tutorial.
    flink run -yD security.kerberos.login.keytab=<your_keytab_filename> \
    -yD security.kerberos.login.principal=<csso_name> \
    -m yarn-cluster -d -p 2 -ys 2 -ynm TransactionProcessor \
    flink-stateful-tutorial-1.2-SNAPSHOT.jar job.properties
  12. Open SMM and check the log of your application.
    You can see how the query results are shown in SMM.
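    As an alternative way to inspect the results, the following sketch, which is not part of the tutorial, consumes the query output topic directly. It reuses the topic name, group ID, and security settings from job.properties and assumes a valid Kerberos ticket and client JAAS configuration on the host.
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class QueryOutputReader {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "<your_kafka_broker>:9093");
            props.put("group.id", "flink"); // matches kafka.group.id in job.properties
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("auto.offset.reset", "earliest");
            // Security settings mirror job.properties.
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.kerberos.service.name", "kafka");
            props.put("ssl.truststore.location",
                    "/var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_truststore.jks");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("flink.query.output.log.1"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.value());
                    }
                }
            }
        }
    }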
You have the following options to monitor and manage your Flink applications: