Kafka Logging

Cloudera Streaming Analytics includes a Kafka log appender to provide a production-grade logging solution. Kafka logging gives you a scalable storage layer for the logs, and it also makes integration with other logging applications simpler.

By default, Flink logs are directed to files that can be viewed on the Flink GUI independently for each container. This is a reasonable default for YARN applications, but it is not sufficient for long-running production applications.

The log appender in the Flink parcel collects the logs into Kafka topics in a JSON format that is designed for downstream consumption in an enterprise log aggregation framework.

There are several benefits of storing the logs in Kafka:
  • Provides a scalable storage layer for the logs
  • Integrates easily with existing applications through a simple logger configuration

To enable Kafka-based logging, include the following logging configuration on the Flink configuration page in Cloudera Manager:

# Enable both file and kafka based logging
log4j.rootLogger=INFO, file, kafka
log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic=flink.logs
log4j.appender.kafka.brokerList=<broker_host>:9092
# Log layout configuration
log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}

With this configuration, logs are written to the flink.logs topic in JSON format. Each log record contains an extra field with the YARN container identifier for easier log separation. These additional logging configurations are appended to the default Flink log4j.properties file; any duplicate key overrides the value previously configured in the file.
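The appender assumes that the target topic exists. If automatic topic creation is disabled on your Kafka cluster, create the flink.logs topic up front. The following is a minimal sketch using the kafka-topics command shipped with Kafka (the exact script name and required flags can differ by Kafka version and distribution; older versions use --zookeeper instead of --bootstrap-server, and the partition and replication counts below are placeholder values):

# Create the logging topic before submitting the Flink job (values are examples)
kafka-topics --create \
  --bootstrap-server <broker_host>:9092 \
  --topic flink.logs \
  --partitions 3 \
  --replication-factor 3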

When set up correctly, the resulting logs should be similar to the following:
{ "source_host": "<flink_host>",
  "method": "completePendingCheckpoint",
  "level": "INFO",
  "message": "Completed checkpoint 1 for job 5e70cf704ed010372e2007333db10cf0 (50738 bytes in 2721 ms).",
  "mdc": {},
  "yarnContainerId": "container_1571051884501_0001_01_000001",
  "@timestamp": "2019-10-14T11:21:07.400Z",
  "file": "CheckpointCoordinator.java",
  "line_number": "906",
  "thread_name": "jobmanager-future-thread-1",
  "@version": 1,
  "logger_name":    "org.apache.flink.runtime.checkpoint.CheckpointCoordinator",
  "class": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator"
}
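
To quickly check that log records are arriving, you can read a few messages from the topic with the console consumer shipped with Kafka. This is only a verification sketch; the broker address is a placeholder:

# Consume a handful of log records from the flink.logs topic
kafka-console-consumer \
  --bootstrap-server <broker_host>:9092 \
  --topic flink.logs \
  --from-beginning \
  --max-messages 5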

Security

In a secure environment with Kerberos and TLS enabled, add the following extra parameters:
log4j.appender.kafka.securityProtocol=SASL_SSL
log4j.appender.kafka.saslKerberosServiceName=kafka
log4j.appender.kafka.sslTruststoreLocation=/samePathOnAllNodes/truststore.jks
log4j.appender.kafka.clientJaasConfPath=kafka.jaas.conf
Also provide the following kafka.jaas.conf file, and ship it to the cluster with the -yt kafka.jaas.conf parameter of the flink run command (see the submission example after the file).
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="krb5.keytab"
  principal="<user_name>";
};
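
The following is a minimal sketch of a job submission that ships the JAAS file; the deployment mode flag and the job JAR name are placeholders to be replaced with your own submission command:

# Ship kafka.jaas.conf alongside the job when submitting to YARN
flink run -m yarn-cluster \
  -yt kafka.jaas.conf \
  ./my-streaming-job.jar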