Kafka Logging
Cloudera Streaming Analytics includes a Kafka log appender to provide a production-grade logging solution. Kafka logging gives you a scalable storage layer for the logs and simplifies integration with other logging applications.
By default, Flink logs are written to files that can be viewed on the Flink GUI independently for each container. This default follows YARN application conventions, but it falls short for long-running production applications.
The log appender in the Flink parcel collects the logs into Kafka topics in a JSON format designed for downstream consumption by an enterprise log aggregation framework. Kafka logging offers the following benefits:
- Provides a scalable storage layer for the logs
- Integrates easily with existing applications that have a simple logger configuration
To enable Kafka-based logging, include the following log configuration on the Cloudera Manager Flink configuration page:
# Enable both file and kafka based logging
rootLogger.appenderRefs=file, kafka
rootLogger.appenderRef.kafka.ref=KafkaAppender
appender.kafka.name=KafkaAppender
appender.kafka.type=Kafka
appender.kafka.topic=flink.logs
# Log layout configuration
appender.kafka.layout.type=JSONLayout
appender.kafka.layout.yarnContainerId.type=KeyValuePair
appender.kafka.layout.yarnContainerId.key=yarnContainerId
appender.kafka.layout.yarnContainerId.value=${sys:yarnContainerId}
# Kafka configuration
appender.kafka.bootstrapServers.type=Property
appender.kafka.bootstrapServers.name=bootstrap.servers
appender.kafka.bootstrapServers.value=<kafka_broker>:9092
With this configuration, logs are written to the flink.logs topic in JSON format. Each record carries an extra field with the YARN container identifier for easier log separation.
These additional logging properties are appended to the default Flink log4j.properties file, and any duplicated key overrides the value previously configured there. A log message in the flink.logs topic looks like the following:
{
  "instant" : {
    "epochSecond" : 1617121475,
    "nanoOfSecond" : 347000000
  },
  "thread" : "Legacy Source Thread - Source: Heap Monitor Source -> (Sink: Print to Std. Out, Create Alerts -> Sink: Logger Sink) (1/1)#0",
  "level" : "INFO",
  "loggerName" : "com.cloudera.streaming.examples.flink.LogSink",
  "message" : "HeapAlert{message='42 was found in the HeapMetrics ratio.', triggeringStats=HeapMetrics{area=PS Old Gen, used=19907592, max=465567744, ratio=0.04275981799976246, jobId=0, hostname='<flink-host>'}}",
  "endOfBatch" : false,
  "loggerFqcn" : "org.apache.logging.slf4j.Log4jLogger",
  "threadId" : 71,
  "threadPriority" : 5,
  "yarnContainerId" : "container_1617025719919_0004_01_000002"
}
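To quickly check that log records arrive in the topic, you can read it with the console consumer shipped with Kafka. This is only a sketch, assuming an unsecured (PLAINTEXT) listener; the broker address is a placeholder:
kafka-console-consumer --bootstrap-server <kafka_broker>:9092 --topic flink.logs --from-beginning
Each output line is one JSON log record like the sample above.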
Security
In a secured cluster, the appender needs the same security properties as any other Kafka client. For example, on a Kerberos and TLS enabled cluster, extend the appender configuration with the following properties:
appender.kafka.secProp1.type=Property
appender.kafka.secProp1.name=security.protocol
appender.kafka.secProp1.value=SASL_SSL
appender.kafka.secProp2.type=Property
appender.kafka.secProp2.name=sasl.mechanism
appender.kafka.secProp2.value=GSSAPI
appender.kafka.secProp3.type=Property
appender.kafka.secProp3.name=sasl.kerberos.service.name
appender.kafka.secProp3.value=kafka
appender.kafka.secProp4.type=Property
appender.kafka.secProp4.name=ssl.truststore.location
appender.kafka.secProp4.value=/samePathOnAllNodes/truststore.jks
appender.kafka.secProp5.type=Property
appender.kafka.secProp5.name=ssl.truststore.password
appender.kafka.secProp5.value=<truststore_password>
appender.kafka.secProp6.type=Property
appender.kafka.secProp6.name=sasl.jaas.config
appender.kafka.secProp6.value=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="<keytab_file_name>" principal="<user_name>";
In this case the brokers are accessed on the TLS port, so update the bootstrap servers value accordingly:
appender.kafka.bootstrapServers.value=<kafka_broker>:9093
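The same client settings can be used to verify the secured topic from the command line. A minimal sketch, assuming a valid Kerberos ticket in the ticket cache and a hypothetical client.properties file that mirrors the security properties above:
# client.properties
security.protocol=SASL_SSL
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
ssl.truststore.location=/samePathOnAllNodes/truststore.jks
ssl.truststore.password=<truststore_password>
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true;

kafka-console-consumer --bootstrap-server <kafka_broker>:9093 --topic flink.logs --consumer.config client.properties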
Log configuration file
You can also provide the logging configuration as a file to individual Flink jobs by passing it to the flink run command through the -yD argument:
flink run
...
-yD logging.configuration.file=<path_to_log_config_file>
...
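For illustration, a complete invocation could look like the following; the job jar, parallelism, and configuration path are placeholder assumptions, not fixed values:
flink run -m yarn-cluster -p 2 \
  -yD logging.configuration.file=/path/to/kafka-log4j.properties \
  flink-job.jar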