Kafka Metrics Reporter
In Cloudera Streaming Analytics, the Kafka Metrics Reporter is an additional monitoring option when Kafka is used as a connector in your pipeline. It writes metrics about your streaming performance to a Kafka topic. Each metric record has the following format:
{
  "timestamp" : number -> millisecond timestamp of the metric record
  "name" : string -> name of the metric (e.g. numBytesOut)
  "type" : string -> metric type enum: GAUGE, COUNTER, METER, HISTOGRAM
  "variables" : {string => string} -> Scope variables (e.g. {"<job_id>" : "123", "<host>" : "localhost"})
  "values" : {string => number} -> Metric specific values (e.g. {"count" : 100})
}
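To illustrate the record format, the following Python sketch parses a single metric record and checks it against the schema above. It assumes that each record is a JSON string in this format and leaves out reading the records from Kafka. The helper parse_metric_record is hypothetical, and the sample record is made up from the example values in the schema, with an arbitrary timestamp.

```python
import json
from datetime import datetime, timezone

# Fields and metric types taken from the record schema above.
REQUIRED_FIELDS = {"timestamp", "name", "type", "variables", "values"}
METRIC_TYPES = {"GAUGE", "COUNTER", "METER", "HISTOGRAM"}


def parse_metric_record(raw: str) -> dict:
    """Parse one metric record (assumed to be JSON) and check its shape."""
    record = json.loads(raw)
    missing = REQUIRED_FIELDS - set(record)
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    if record["type"] not in METRIC_TYPES:
        raise ValueError(f"unknown metric type: {record['type']}")
    # "timestamp" is in milliseconds, so divide by 1000 to get seconds.
    record["time"] = datetime.fromtimestamp(record["timestamp"] / 1000, tz=timezone.utc)
    return record


# Hypothetical record built from the example values in the schema.
sample = json.dumps({
    "timestamp": 1600000000000,
    "name": "numBytesOut",
    "type": "COUNTER",
    "variables": {"<job_id>": "123", "<host>": "localhost"},
    "values": {"count": 100},
})

record = parse_metric_record(sample)
print(record["name"], record["variables"]["<job_id>"], record["values"]["count"])
```

In this sketch, records with an unknown metric type or a missing field raise ValueError instead of being silently skipped.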
For more information about metric reporters, see the Apache Flink documentation.
Configuration of Kafka Metrics Reporter
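You can configure the Kafka Metrics Reporter in the same way as other upstream metric reporters, by setting options with the metrics.reporter.kafka. prefix. The reporter supports the following options: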
topic
: target Kafka topic where the metric records are written at the configured interval
bootstrap.servers
: Kafka server addresses used to set up the producer
interval
: reporting interval; the default value is 10 seconds, and the format is, for example, 60 SECONDS
log.errors
: enables logging of metric reporting errors; the value is either true or false
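As a rough illustration of the options above, the following Python sketch validates a dictionary of reporter options (keys without the metrics.reporter.kafka. prefix), applies the documented 10-second default interval, and converts the interval to seconds. The helper check_reporter_options is hypothetical and is not Flink's own option parsing; in particular, it only accepts the "<number> <UNIT>" form with a small subset of units.

```python
import re

# Matches the "<number> <UNIT>" form shown in the option list, e.g. "60 SECONDS".
# Only a subset of units is accepted; this is not Flink's own parser.
INTERVAL_PATTERN = re.compile(r"^(\d+) (SECONDS|MINUTES|HOURS|DAYS)$")
UNIT_SECONDS = {"SECONDS": 1, "MINUTES": 60, "HOURS": 3600, "DAYS": 86400}


def check_reporter_options(options: dict) -> dict:
    """Validate reporter options (keys without the metrics.reporter.kafka. prefix).

    Returns a copy with the documented default interval applied and an extra
    "interval_seconds" entry; raises ValueError on invalid input.
    """
    for key in ("topic", "bootstrap.servers"):
        if not options.get(key):
            raise ValueError(f"missing required option: {key}")

    checked = dict(options)
    checked.setdefault("interval", "10 SECONDS")  # documented default: 10 seconds

    match = INTERVAL_PATTERN.match(checked["interval"].strip())
    if match is None:
        raise ValueError(f"invalid interval: {checked['interval']!r}")
    checked["interval_seconds"] = int(match.group(1)) * UNIT_SECONDS[match.group(2)]

    # log.errors is optional, but if present it must be true or false.
    if checked.get("log.errors", "false") not in ("true", "false"):
        raise ValueError("log.errors must be true or false")
    return checked


opts = check_reporter_options(
    {"topic": "metrics-topic.log", "bootstrap.servers": "kafka-broker:9091"}
)
print(opts["interval"], opts["interval_seconds"])  # 10 SECONDS 10
```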
The following example submits a job on YARN with the Kafka Metrics Reporter enabled:
flink run --jobmanager yarn-cluster --detached --parallelism 2 --yarnname HeapMonitor \
-yD metrics.reporter.kafka.class=org.apache.flink.metrics.kafka.KafkaMetricsReporter \
-yD metrics.reporter.kafka.topic=metrics-topic.log \
-yD metrics.reporter.kafka.bootstrap.servers=kafka-broker:9091 \
-yD metrics.reporter.kafka.interval="60 SECONDS" \
-yD metrics.reporter.kafka.log.errors=false \
flink-simple-tutorial-1.1-SNAPSHOT.jar
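To report metrics to a Kafka cluster secured with Kerberos and TLS, also pass the Kerberos credentials and the Kafka producer security properties:
flink run --jobmanager yarn-cluster --detached --parallelism 2 --yarnname HeapMonitor \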
-yD security.kerberos.login.keytab=some.keytab \
-yD security.kerberos.login.principal=some_principal \
-yD metrics.reporter.kafka.class=org.apache.flink.metrics.kafka.KafkaMetricsReporter \
-yD metrics.reporter.kafka.topic=metrics-topic.log \
-yD metrics.reporter.kafka.bootstrap.servers=kafka_broker_host:9093 \
-yD metrics.reporter.kafka.interval="60 SECONDS" \
-yD metrics.reporter.kafka.log.errors=false \
-yD metrics.reporter.kafka.security.protocol=SASL_SSL \
-yD metrics.reporter.kafka.sasl.kerberos.service.name=kafka \
-yD metrics.reporter.kafka.ssl.truststore.location=truststore.jks \
flink-simple-tutorial-1.1-SNAPSHOT.jar
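If you generate the submit command from a script, building it as an argument list avoids quoting mistakes, such as splitting the "60 SECONDS" interval at the space. The following Python 3.8+ sketch prints a command equivalent to the first example above; reporter_yarn_args is a hypothetical helper that only shows how each option becomes a -yD metrics.reporter.kafka.<key>=<value> argument.

```python
import shlex

PREFIX = "metrics.reporter.kafka."


def reporter_yarn_args(options: dict) -> list:
    """Turn reporter options into "-yD metrics.reporter.kafka.<key>=<value>" arguments."""
    args = ["-yD", PREFIX + "class=org.apache.flink.metrics.kafka.KafkaMetricsReporter"]
    for key, value in options.items():
        args += ["-yD", f"{PREFIX}{key}={value}"]
    return args


cmd = [
    "flink", "run", "--jobmanager", "yarn-cluster", "--detached",
    "--parallelism", "2", "--yarnname", "HeapMonitor",
    *reporter_yarn_args({
        "topic": "metrics-topic.log",
        "bootstrap.servers": "kafka-broker:9091",
        "interval": "60 SECONDS",
        "log.errors": "false",
    }),
    "flink-simple-tutorial-1.1-SNAPSHOT.jar",
]

# shlex.join quotes arguments that contain spaces, such as the interval.
print(shlex.join(cmd))
```

shlex.join wraps the interval argument in single quotes instead of double quotes, which the shell treats the same way. To run the command instead of printing it, you could pass cmd to subprocess.run.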
You can also set the metrics properties globally in Cloudera Manager using the Flink Client Advanced Configuration Snippet (Safety Valve) for flink-conf-xml/flink-conf.xml.
Arbitrary Kafka producer properties
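The reporter accepts arbitrary Kafka producer properties, which you can use to modify the producer's behavior, enable security, and so on. Do not modify the serializer classes, as this can lead to reporting errors. The following example configuration contains the required options, the optional reporter options, and optional Kafka producer properties: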
# Required configuration
metrics.reporter.kafka.class: org.apache.flink.metrics.kafka.KafkaMetricsReporter
metrics.reporter.kafka.topic: metrics-topic.log
metrics.reporter.kafka.bootstrap.servers: broker1:9092,broker2:9092

# Optional configuration
metrics.reporter.kafka.interval: 60 SECONDS
metrics.reporter.kafka.log.errors: false

# Optional Kafka producer properties
metrics.reporter.kafka.security.protocol: SSL
metrics.reporter.kafka.ssl.truststore.location: /var/private/ssl/kafka.client.truststore.jks
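The Python sketch below illustrates how the prefixed keys in this example can be split into the reporter's own options and the Kafka producer properties. It assumes that the reporter strips the metrics.reporter.kafka. prefix and forwards every remaining key, other than its own options, to the producer. The helpers parse_flink_conf and producer_properties are hypothetical, not the reporter's implementation, and the parser only handles simple "key: value" lines. The serializer check covers the standard Kafka key.serializer and value.serializer producer properties.

```python
PREFIX = "metrics.reporter.kafka."
# Options the reporter consumes itself (see the option list above); any other
# key under the prefix is treated as a Kafka producer property.
REPORTER_KEYS = {"class", "topic", "interval", "log.errors"}
# Standard Kafka producer serializer settings, which should not be overridden.
SERIALIZER_KEYS = {"key.serializer", "value.serializer"}

CONF = """
# Required configuration
metrics.reporter.kafka.class: org.apache.flink.metrics.kafka.KafkaMetricsReporter
metrics.reporter.kafka.topic: metrics-topic.log
metrics.reporter.kafka.bootstrap.servers: broker1:9092,broker2:9092
# Optional Kafka producer properties
metrics.reporter.kafka.security.protocol: SSL
metrics.reporter.kafka.ssl.truststore.location: /var/private/ssl/kafka.client.truststore.jks
"""


def parse_flink_conf(text: str) -> dict:
    """Parse simple "key: value" lines, skipping blank lines and comments."""
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition(":")  # split at the first colon only
        conf[key.strip()] = value.strip()
    return conf


def producer_properties(conf: dict) -> dict:
    """Strip the reporter prefix and keep only the producer properties."""
    props = {}
    for full_key, value in conf.items():
        if not full_key.startswith(PREFIX):
            continue
        key = full_key[len(PREFIX):]
        if key in SERIALIZER_KEYS:
            raise ValueError(f"{key} should not be modified")
        if key not in REPORTER_KEYS:
            props[key] = value
    return props


print(producer_properties(parse_flink_conf(CONF)))
```

Under this assumption, bootstrap.servers ends up among the producer properties, because the reporter uses it to set up the producer.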