Running your Flink application
After preparing your environment, you need to connect Kafka to Flink and create the Kafka topics that carry the input and output data. When Kafka is ready, generate data into your Kafka topic and let Flink apply the computations that you defined in your application design.
-
Create your streaming Flink application with Kafka as source and sink.
The Stateful Tutorial has a detailed guide on how to create your application using Kafka as source and sink. For more information, see the Setting up Kafka inputs and outputs section.
-
Build your Flink project with Maven after creating the application logic.
mvn clean package
-
Upload your Flink project to the Streaming Analytics cluster.
scp <location>/flink-stateful-tutorial-1.2-SNAPSHOT.jar <your_workload_username>@<manager_node_FQDN>:.
Password:<your_workload_password>
-
Upload your keytab file to the Streaming Analytics cluster if you have not uploaded it yet.
scp <location>/<your_keytab_file> <your_workload_username>@<manager_node_FQDN>:.
Password:<your_workload_password>
-
Configure the Flink job properties to securely connect to the Streams Messaging cluster.
Using the job.properties file from the Stateful Tutorial, replace the Kafka brokers and the truststore location with your own values. In the following example, the input and output topics have "flink" as a prefix to comply with the Ranger policies that are set.
kafka.bootstrap.servers=<your_kafka_broker>:9093,<your_kafka_broker1>:9093,<your_kafka_broker2>:9093
kafka.group.id=flink
kafka.flink.partition-discovery.interval-millis=60000
kafka.retries=3
transaction.input.topic=flink.transaction.log.1
generate.queries=false
query.input.topic=flink.query.input.log.1
query.output.topic=flink.query.output.log.1
num.items=1000000
sleep=100
kafka.security.protocol=SASL_SSL
kafka.sasl.kerberos.service.name=kafka
kafka.ssl.truststore.location=/var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_truststore.jks
To retrieve the Kafka broker hostnames:
- Navigate to Management Console > Environments, and select the environment where you have created your cluster.
- Click on the Streams Messaging cluster.
- Click Hardware.
- Search for the Kafka brokers.
- Click the copy icon next to the Kafka brokers to obtain the hostnames.
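Before uploading the file, it can help to confirm that none of the connection settings were lost while editing. The following is a minimal sketch, not part of the tutorial: `check_job_properties` is a hypothetical helper name, and the list of keys is an assumption taken from the example in this step, so adjust it to your own job.

```shell
# Hypothetical helper: report which of the expected keys are absent from a
# properties file. The key list is an assumption based on the example above.
check_job_properties() {
  local file="$1" missing=0 key
  for key in \
    kafka.bootstrap.servers \
    kafka.security.protocol \
    kafka.sasl.kerberos.service.name \
    kafka.ssl.truststore.location \
    transaction.input.topic \
    query.input.topic \
    query.output.topic; do
    # A key counts as present when some line has exactly this text before "=".
    if ! awk -F= -v k="$key" '$1 == k { found = 1 } END { exit !found }' "$file"; then
      echo "missing: $key"
      missing=1
    fi
  done
  # Non-zero return status signals that at least one key is missing.
  return "$missing"
}
```

Calling `check_job_properties job.properties` prints one line per missing key and returns a non-zero status if anything is absent.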
-
Upload the Flink job properties file to the Streaming Analytics cluster.
scp <location>/config/job.properties <your_workload_username>@<manager_node_FQDN>:.
Password:<your_workload_password>
-
Use the ls command to check that every necessary file is on your Streaming Analytics cluster.
After listing the files on your cluster, you should see your keytab file, the Stateful Tutorial jar file, and the job.properties file.
-
Create your topics in Streams Messaging Manager (SMM).
- Go to Management Console > Data Hub Clusters.
- Search for your Streams Messaging cluster.
- Open SMM from the list of Services.
- Go to Topics > Add new.
-
Provide information for Topic Name, Partitions, Availability, and Limits.
Create three topics with 16 partitions each. To comply with the Stateful Flink Application Tutorial, name them flink.tutorial.transaction.log, flink.tutorial.query.input.log, and flink.tutorial.query.output.log.
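If you prefer a command line over the SMM UI, the same kind of topic could be created with the kafka-topics tool. This is only a sketch under several assumptions that are not part of this procedure: the `kafka-topics` command is available on the host, `client.properties` is a hypothetical file holding your SASL_SSL client settings, your Kafka version lets the broker choose the default replication factor, and your user is allowed by Ranger to create topics. `create_topic` is a hypothetical helper name.

```shell
# Hypothetical helper: create one topic with 16 partitions.
# BOOTSTRAP must hold a broker address such as <your_kafka_broker>:9093.
# With DRY_RUN=1 the command is printed instead of executed.
create_topic() {
  set -- kafka-topics \
    --bootstrap-server "${BOOTSTRAP:?set BOOTSTRAP to a broker host:port}" \
    --command-config client.properties \
    --create --topic "$1" --partitions 16
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}
```

For example, `DRY_RUN=1 create_topic flink.tutorial.transaction.log` prints the command for the first topic so that you can review it before running it; repeat the call for each remaining topic.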
-
Start generating data to your target Kafka topic.
Submit the Data Generator job from the Stateful Flink Application Tutorial.
flink run -yD security.kerberos.login.keytab=<your_keytab_filename> \
-yD security.kerberos.login.principal=<csso_name> \
-m yarn-cluster -d -p 2 -ys 2 -ynm Datagenerator \
-c com.cloudera.streaming.examples.flink.KafkaDataGeneratorJob \
flink-stateful-tutorial-1.2-SNAPSHOT.jar job.properties
-
Open SMM and check if the data is generated.
After a few minutes, the Data in count increases as the generated data arrives in the Kafka topic. You can check the incoming data in the Data explorer tab.
-
Deploy your Flink streaming application.
Submit the Transaction Job from the Stateful Flink Application Tutorial.
flink run -yD security.kerberos.login.keytab=<your_keytab_filename> \
-yD security.kerberos.login.principal=<csso_name> \
-m yarn-cluster -d -p 2 -ys 2 -ynm TransactionProcessor \
flink-stateful-tutorial-1.2-SNAPSHOT.jar job.properties
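The Data Generator and Transaction Job submissions differ only in the job name and the optional -c entry class. A small wrapper can make that difference explicit and reduce copy-and-paste mistakes. This is a sketch, not part of the tutorial: `submit_flink_job`, `KEYTAB`, `PRINCIPAL`, and `DRY_RUN` are hypothetical names, while the flink run options are the ones used in this procedure.

```shell
# Hypothetical wrapper around the flink run submissions in this procedure.
# KEYTAB and PRINCIPAL must be set by the caller.
# With DRY_RUN=1 the command is printed instead of executed.
submit_flink_job() {
  job_name="$1"
  main_class="${2:-}"   # optional entry class; left out for the Transaction Job
  set -- flink run \
    -yD "security.kerberos.login.keytab=${KEYTAB:?set KEYTAB}" \
    -yD "security.kerberos.login.principal=${PRINCIPAL:?set PRINCIPAL}" \
    -m yarn-cluster -d -p 2 -ys 2 -ynm "$job_name"
  # Add -c only when an entry class was passed as the second argument.
  if [ -n "$main_class" ]; then
    set -- "$@" -c "$main_class"
  fi
  set -- "$@" flink-stateful-tutorial-1.2-SNAPSHOT.jar job.properties
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}
```

With `KEYTAB` and `PRINCIPAL` exported, `submit_flink_job TransactionProcessor` corresponds to the Transaction Job command, and `submit_flink_job Datagenerator com.cloudera.streaming.examples.flink.KafkaDataGeneratorJob` corresponds to the Data Generator command.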
-
Open SMM and check the log of your application.
You can see the query data results in SMM.