Kafka Logging
Cloudera Streaming Analytics includes a Kafka log appender to provide a production-grade logging solution. With Kafka logging, you get a scalable storage layer for the logs, and you can integrate more easily with other logging applications.
By default, Flink logs are written to files that can be viewed on the Flink GUI independently for each container. While this is a sensible default for YARN applications, it falls short for long-running production applications.
The log appender in the Flink parcel collects the logs into Kafka topics in a JSON format designed for downstream consumption in an enterprise log aggregation framework. This approach:
- Provides a scalable storage layer for the logs
- Integrates easily with existing applications through a simple logger configuration
To enable Kafka based logging, include the following log configuration in the Cloudera Manager Flink configuration page:
# Enable both file and kafka based logging
log4j.rootLogger=INFO, file, kafka
log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic=flink.logs
log4j.appender.kafka.brokerList=<broker_host>:9092
# Log layout configuration
log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}
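With the appender in place, you can quickly verify that log records arrive by tailing the topic with the console consumer shipped with Kafka. This is only a verification sketch: the script name varies by distribution (kafka-console-consumer or kafka-console-consumer.sh), and <broker_host> is the same placeholder used in the configuration above.
kafka-console-consumer --bootstrap-server <broker_host>:9092 --topic flink.logs --from-beginning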
With this configuration, logs are written to the flink.logs topic in JSON format. The topic contains an extra field with the YARN container identifier for easier log separation. These additional logging configurations are appended to the default Flink log4j.properties file; any duplicate key overrides the previously configured value in the file. A log message produced by this setup looks like the following:
{ "source_host": "<flink_host>",
"method": "completePendingCheckpoint",
"level": "INFO",
"message": "Completed checkpoint 1 for job 5e70cf704ed010372e2007333db10cf0 (50738 bytes in 2721 ms).",
"mdc": {},
"yarnContainerId": "container_1571051884501_0001_01_000001",
"@timestamp": "2019-10-14T11:21:07.400Z",
"file": "CheckpointCoordinator.java",
"line_number": "906",
"thread_name": "jobmanager-future-thread-1",
"@version": 1,
"logger_name": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator",
"class": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator"
}
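Because every record is a self-contained JSON document, downstream consumers can parse it with any JSON library. The following is a minimal Java sketch, assuming the kafka-clients and Jackson libraries are on the classpath; the group id flink-log-reader is a hypothetical name, and the broker and topic placeholders match the appender configuration above.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FlinkLogConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "<broker_host>:9092");
        props.put("group.id", "flink-log-reader"); // hypothetical consumer group
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        ObjectMapper mapper = new ObjectMapper();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("flink.logs"));
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    JsonNode log = mapper.readTree(record.value());
                    // The UserFields setting above adds yarnContainerId to every
                    // event, which makes per-container log separation easy downstream.
                    System.out.printf("[%s] %s %s%n",
                            log.path("yarnContainerId").asText(),
                            log.path("level").asText(),
                            log.path("message").asText());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}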
Security
For a Kafka cluster secured with Kerberos and TLS, extend the appender configuration with the following security properties:
log4j.appender.kafka.securityProtocol=SASL_SSL
log4j.appender.kafka.saslKerberosServiceName=kafka
log4j.appender.kafka.sslTruststoreLocation=/samePathOnAllNodes/truststore.jks
log4j.appender.kafka.clientJaasConfPath=kafka.jaas.conf
Create a kafka.jaas.conf file and ship it to the cluster with the -yt kafka.jaas.conf parameter of the flink run command:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="krb5.keytab"
  principal="<user_name>";
};
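A run command shipping the JAAS file could then look like the following. This is only a sketch: StreamingJob.jar is a placeholder for your application jar, and any other options depend on your deployment.
flink run -m yarn-cluster -yt kafka.jaas.conf StreamingJob.jar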