Kafka Logging

Cloudera Streaming Analytics includes a Kafka log appender to provide a production-grade logging solution. Kafka logging gives you a scalable storage layer for the logs and makes it simple to integrate with other logging applications.

By default, Flink logs are directed to files that can be viewed on the Flink GUI independently for each container. This behavior is a reasonable default for YARN applications, but it is not sufficient for long-running production applications.

The log appender in the Flink parcel collects the logs into Kafka topics in a JSON format that is designed for downstream consumption in an enterprise log aggregation framework.

There are several benefits of storing the logs in Kafka:
  • Provides a scalable storage layer for the logs
  • Integrates easily with existing applications through a simple logger configuration

To enable Kafka-based logging, include the following log configuration on the Flink configuration page in Cloudera Manager:

# Enable both file and kafka based logging
rootLogger.appenderRefs=file, kafka
rootLogger.appenderRef.kafka.ref=KafkaAppender

appender.kafka.name=KafkaAppender
appender.kafka.type=Kafka
appender.kafka.topic=flink.logs

# Log layout configuration
appender.kafka.layout.type=JSONLayout
appender.kafka.layout.yarnContainerId.type=KeyValuePair
appender.kafka.layout.yarnContainerId.key=yarnContainerId
appender.kafka.layout.yarnContainerId.value=${sys:yarnContainerId}

# Kafka configuration
appender.kafka.bootstrapServers.type=Property
appender.kafka.bootstrapServers.name=bootstrap.servers
appender.kafka.bootstrapServers.value=<kafka_broker>:9092

With this configuration, logs are written to the flink.logs topic in JSON format. Each log record contains an extra field with the YARN container identifier for easier log separation.

These additional logging configurations are added to the default Flink log4j.properties file. Any duplicate key overrides the previously configured values in the file.
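For example, assuming the default file already sets the root logger level (Flink's default log4j.properties uses INFO), repeating the key in the appended configuration replaces the earlier value:

# Already present in the default log4j.properties
rootLogger.level = INFO
# Appended later: this value takes effect
rootLogger.level = DEBUG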

When set up correctly, the resulting log records should be similar to the following:
{
  "instant" : {
	"epochSecond" : 1617121475,
	"nanoOfSecond" : 347000000
  },
  "thread" : "Legacy Source Thread - Source: Heap Monitor Source -> (Sink: Print to Std. Out, Create Alerts -> Sink: Logger Sink) (1/1)#0",
  "level" : "INFO",
  "loggerName" : "com.cloudera.streaming.examples.flink.LogSink",
  "message" : "HeapAlert{message='42 was found in the HeapMetrics ratio.', triggeringStats=HeapMetrics{area=PS Old Gen, used=19907592, max=465567744, ratio=0.04275981799976246, jobId=0, hostname='<flink-host>'}}",
  "endOfBatch" : false,
  "loggerFqcn" : "org.apache.logging.slf4j.Log4jLogger",
  "threadId" : 71,
  "threadPriority" : 5,
  "yarnContainerId" : "container_1617025719919_0004_01_000002"
}
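
To quickly inspect the records, you can read the topic back with the console consumer shipped with Kafka; the jq filter below is an optional convenience for separating logs by container, using the sample container identifier from above (this sketch assumes both tools are available on the host):

# Read the log topic and keep only records from one container
kafka-console-consumer --bootstrap-server <kafka_broker>:9092 --topic flink.logs \
  | jq 'select(.yarnContainerId == "container_1617025719919_0004_01_000002")'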

Security

In a secure environment with Kerberos and TLS enabled, add the following extra parameters:
# Security protocol
appender.kafka.secProp1.type=Property
appender.kafka.secProp1.name=security.protocol
appender.kafka.secProp1.value=SASL_SSL

# Kerberos authentication
appender.kafka.secProp2.type=Property
appender.kafka.secProp2.name=sasl.mechanism
appender.kafka.secProp2.value=GSSAPI

appender.kafka.secProp3.type=Property
appender.kafka.secProp3.name=sasl.kerberos.service.name
appender.kafka.secProp3.value=kafka

# TLS truststore
appender.kafka.secProp4.type=Property
appender.kafka.secProp4.name=ssl.truststore.location
appender.kafka.secProp4.value=/samePathOnAllNodes/truststore.jks

appender.kafka.secProp5.type=Property
appender.kafka.secProp5.name=ssl.truststore.password
appender.kafka.secProp5.value=<truststore_password>

# JAAS login configuration
appender.kafka.secProp6.type=Property
appender.kafka.secProp6.name=sasl.jaas.config
appender.kafka.secProp6.value=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="<keytab_file_name>" principal="<user_name>";
Furthermore, the secure Kafka broker listens on port 9093 by default, so you also need to change the port in the Kafka log appender configuration:
appender.kafka.bootstrapServers.value=<kafka_broker>:9093
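
To verify the secure setup end to end, you can mirror the same security settings in a Kafka client configuration file and read the topic back. The following is a sketch; the file name client.properties is an arbitrary choice:

# client.properties - mirrors the appender security settings
security.protocol=SASL_SSL
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
ssl.truststore.location=/samePathOnAllNodes/truststore.jks
ssl.truststore.password=<truststore_password>
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="<keytab_file_name>" principal="<user_name>";

# Consume the log topic over the secure listener
kafka-console-consumer --bootstrap-server <kafka_broker>:9093 --topic flink.logs --consumer.config client.properties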

Log configuration file

As an alternative, you can also provide a log configuration file when you run a Flink job, adding it as a -yD argument:
flink run
  ... 
  -yD logging.configuration.file=<path_to_log_config_file>
  ...
This way, you can use XML-based configuration as well, whereas Cloudera Manager only supports the properties format.
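
For reference, a minimal XML sketch of the same Kafka appender could look like the following (same placeholder broker address as above; this is not a complete Flink logging configuration):

<Configuration>
  <Appenders>
    <!-- Kafka appender equivalent of the properties-based setup above -->
    <Kafka name="KafkaAppender" topic="flink.logs">
      <JsonLayout>
        <KeyValuePair key="yarnContainerId" value="${sys:yarnContainerId}"/>
      </JsonLayout>
      <Property name="bootstrap.servers"><kafka_broker>:9092</Property>
    </Kafka>
  </Appenders>
  <Loggers>
    <Root level="INFO">
      <AppenderRef ref="KafkaAppender"/>
    </Root>
  </Loggers>
</Configuration>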