Using Kafka with Flume

In CDH 5.2 and higher, Flume contains a Kafka source and sink. Use these to stream data from Kafka to Hadoop or from any Flume source to Kafka.

In CDH 5.7 and higher, the Flume connector to Kafka only works with Kafka 2.0 and higher.

This topic describes how to configure Kafka sources, sinks, and channels:

Kafka Source

Use the Kafka source to stream data in Kafka topics to Hadoop. The Kafka source can be combined with any Flume sink, making it easy to write Kafka data to HDFS, HBase, and Solr.

The following Flume configuration example uses a Kafka source to send data to an HDFS sink:

 tier1.sources  = source1
 tier1.channels = channel1
 tier1.sinks = sink1
 
 tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
 tier1.sources.source1.zookeeperConnect = zk01.example.com:2181
 tier1.sources.source1.topic = weblogs
 tier1.sources.source1.groupId = flume
 tier1.sources.source1.channels = channel1
 tier1.sources.source1.interceptors = i1
 tier1.sources.source1.interceptors.i1.type = timestamp
 tier1.sources.source1.kafka.consumer.timeout.ms = 100
 
 tier1.channels.channel1.type = memory
 tier1.channels.channel1.capacity = 10000
 tier1.channels.channel1.transactionCapacity = 1000
 
 tier1.sinks.sink1.type = hdfs
 tier1.sinks.sink1.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
 tier1.sinks.sink1.hdfs.rollInterval = 5
 tier1.sinks.sink1.hdfs.rollSize = 0
 tier1.sinks.sink1.hdfs.rollCount = 0
 tier1.sinks.sink1.hdfs.fileType = DataStream
 tier1.sinks.sink1.channel = channel1

For higher throughput, configure multiple Kafka sources to read from the same topic. If you configure all the sources with the same groupId, and the topic contains multiple partitions, each source reads data from a different set of partitions, improving the ingest rate.
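For example, a minimal sketch (reusing the host and topic names from the example above) that runs two sources in the same consumer group; with a multi-partition weblogs topic, Kafka assigns each source a disjoint subset of the partitions:

 # Two sources in the same consumer group split the topic's partitions.
 tier1.sources = source1 source2

 tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
 tier1.sources.source1.zookeeperConnect = zk01.example.com:2181
 tier1.sources.source1.topic = weblogs
 tier1.sources.source1.groupId = flume
 tier1.sources.source1.channels = channel1

 tier1.sources.source2.type = org.apache.flume.source.kafka.KafkaSource
 tier1.sources.source2.zookeeperConnect = zk01.example.com:2181
 tier1.sources.source2.topic = weblogs
 tier1.sources.source2.groupId = flume
 tier1.sources.source2.channels = channel1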

The following list describes the parameters that the Kafka source supports. Required properties are marked (required).

Kafka Source Properties
  • type (required): Must be set to org.apache.flume.source.kafka.KafkaSource.
  • zookeeperConnect (required): The URI of the ZooKeeper server or quorum used by Kafka. This can be a single host (for example, zk01.example.com:2181) or a comma-separated list of hosts in a ZooKeeper quorum (for example, zk01.example.com:2181,zk02.example.com:2181,zk03.example.com:2181).
  • topic (required): The Kafka topic from which this source reads messages. Flume supports only one topic per source.
  • groupId (default: flume): The unique identifier of the Kafka consumer group. Set the same groupId in all sources to indicate that they belong to the same consumer group.
  • batchSize (default: 1000): The maximum number of messages that can be written to a channel in a single batch.
  • batchDurationMillis (default: 1000): The maximum time (in ms) before a batch is written to the channel. The batch is written when the batchSize limit or the batchDurationMillis limit is reached, whichever comes first.
  • Other Kafka consumer properties: Used to configure the Kafka consumer used by the Kafka source. You can use any consumer properties supported by Kafka; prepend the consumer property name with the prefix kafka. (for example, kafka.fetch.min.bytes). See the Kafka documentation for the full list of Kafka consumer properties.
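For example, to pass a standard consumer property through to Kafka (the value here is illustrative only):

 # Any Kafka consumer property can be set with the "kafka." prefix.
 tier1.sources.source1.kafka.fetch.min.bytes = 1024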

Tuning Notes

The Kafka source overrides two Kafka consumer parameters:
  1. auto.commit.enable is set to false by the source, and every batch is committed. For improved performance, set this to true using the kafka.auto.commit.enable setting; however, enabling automatic commits can lead to data loss if the source goes down before a commit.
  2. consumer.timeout.ms is set to 10, so when Flume polls Kafka for new data, it waits no more than 10 ms for data to become available. Setting this to a higher value can reduce CPU utilization because of less frequent polling, but it introduces latency in writing batches to the channel (a sketch of both overrides follows this list).
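A minimal sketch of both overrides, with illustrative values:

 # Trade durability for throughput: let the consumer auto-commit offsets.
 tier1.sources.source1.kafka.auto.commit.enable = true
 # Poll less frequently: wait up to 500 ms for data before timing out.
 tier1.sources.source1.kafka.consumer.timeout.ms = 500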

Kafka Sink

Use the Kafka sink to send data to Kafka from a Flume source. You can use the Kafka sink in addition to Flume sinks such as HBase or HDFS.

The following Flume configuration example uses a Kafka sink with an exec source:

 tier1.sources  = source1
 tier1.channels = channel1
 tier1.sinks = sink1
 
 tier1.sources.source1.type = exec
 tier1.sources.source1.command = /usr/bin/vmstat 1
 tier1.sources.source1.channels = channel1
 
 tier1.channels.channel1.type = memory
 tier1.channels.channel1.capacity = 10000
 tier1.channels.channel1.transactionCapacity = 1000
 
 tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
 tier1.sinks.sink1.topic = sink1
 tier1.sinks.sink1.brokerList = kafka01.example.com:9092,kafka02.example.com:9092
 tier1.sinks.sink1.channel = channel1
 tier1.sinks.sink1.batchSize = 20

The following list describes the parameters that the Kafka sink supports. Required properties are marked (required).

Kafka Sink Properties
  • type (required): Must be set to org.apache.flume.sink.kafka.KafkaSink.
  • brokerList (required): The brokers the Kafka sink uses to discover topic partitions, formatted as a comma-separated list of hostname:port entries. You do not need to specify the entire list of brokers, but Cloudera recommends that you specify at least two for high availability.
  • topic (default: default-flume-topic): The Kafka topic to which messages are published by default. If the event header contains a topic field, the event is published to that topic, overriding the configured topic.
  • batchSize (default: 100): The number of messages to process in a single batch. Specifying a larger batchSize can improve throughput but increases latency.
  • request.required.acks (default: 0): The number of replicas that must acknowledge a message before it is written successfully. Possible values are 0 (do not wait for an acknowledgement), 1 (wait for the leader to acknowledge only), and -1 (wait for all replicas to acknowledge). To avoid potential loss of data in case of a leader failure, set this to -1.
  • Other Kafka producer properties: Used to configure the Kafka producer used by the Kafka sink. You can use any producer properties supported by Kafka; prepend the producer property name with the prefix kafka. (for example, kafka.compression.codec). See the Kafka documentation for the full list of Kafka producer properties.
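For example, to wait for acknowledgement from all replicas and enable compression (the codec value is illustrative):

 # Do not consider a send successful until all replicas acknowledge it.
 tier1.sinks.sink1.request.required.acks = -1
 # Standard Kafka producer properties take the "kafka." prefix.
 tier1.sinks.sink1.kafka.compression.codec = snappy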

The Kafka sink reads the topic and key properties from the FlumeEvent headers to determine where to send an event in Kafka. If the header contains the topic property, the event is sent to that topic, overriding the configured topic. If the header contains the key property, the key is used to partition events within the topic: events with the same key are sent to the same partition, and if no key is specified, events are distributed to partitions at random. Set these headers from a Flume source or interceptor to control which topics and partitions events are sent to.
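For example, a static interceptor on the source can stamp every event with the same key header, sending all events to one partition (the header value host01 is hypothetical):

 # Set a "key" header on each event; the Kafka sink partitions by this value.
 tier1.sources.source1.interceptors = i1
 tier1.sources.source1.interceptors.i1.type = static
 tier1.sources.source1.interceptors.i1.key = key
 tier1.sources.source1.interceptors.i1.value = host01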

Kafka Channel

CDH 5.3 and higher includes a Kafka channel to Flume in addition to the existing memory and file channels. You can use the Kafka channel:
  • To write to Hadoop directly from Kafka without using a source (see the sketch after this list).
  • To write to Kafka directly from Flume sources without additional buffering.
  • As a reliable and highly available channel for any source/sink combination.
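For the first case, the following sketch (hypothetical topic and host names) drains an existing Kafka topic straight into HDFS with no source at all; parseAsFlumeEvent is set to false because the topic is written by ordinary Kafka producers rather than a Flume source:

 # No source is defined: the Kafka channel itself consumes the topic.
 tier1.channels = channel1
 tier1.sinks = sink1

 tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
 tier1.channels.channel1.brokerList = kafka01.example.com:9092,kafka02.example.com:9092
 tier1.channels.channel1.zookeeperConnect = zk01.example.com:2181
 tier1.channels.channel1.topic = weblogs
 # Data in the topic comes from plain Kafka producers, not a Flume source.
 tier1.channels.channel1.parseAsFlumeEvent = false

 tier1.sinks.sink1.type = hdfs
 tier1.sinks.sink1.hdfs.path = /tmp/kafka/weblogs
 tier1.sinks.sink1.hdfs.fileType = DataStream
 tier1.sinks.sink1.channel = channel1
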
The following Flume configuration example uses a Kafka channel with an exec source and an HDFS sink:
 tier1.sources = source1
 tier1.channels = channel1
 tier1.sinks = sink1

 tier1.sources.source1.type = exec
 tier1.sources.source1.command = /usr/bin/vmstat 1
 tier1.sources.source1.channels = channel1

 tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
 tier1.channels.channel1.capacity = 10000
 tier1.channels.channel1.transactionCapacity = 1000
 tier1.channels.channel1.brokerList = kafka02.example.com:9092,kafka03.example.com:9092
 tier1.channels.channel1.topic = channel2
 tier1.channels.channel1.zookeeperConnect = zk01.example.com:2181
 tier1.channels.channel1.parseAsFlumeEvent = true

 tier1.sinks.sink1.type = hdfs
 tier1.sinks.sink1.hdfs.path = /tmp/kafka/channel
 tier1.sinks.sink1.hdfs.rollInterval = 5
 tier1.sinks.sink1.hdfs.rollSize = 0
 tier1.sinks.sink1.hdfs.rollCount = 0
 tier1.sinks.sink1.hdfs.fileType = DataStream
 tier1.sinks.sink1.channel = channel1

The following list describes the parameters that the Kafka channel supports. Required properties are marked (required).

Kafka Channel Properties
  • type (required): Must be set to org.apache.flume.channel.kafka.KafkaChannel.
  • brokerList (required): The brokers the Kafka channel uses to discover topic partitions, formatted as a comma-separated list of hostname:port entries. You do not need to specify the entire list of brokers, but Cloudera recommends that you specify at least two for high availability.
  • zookeeperConnect (required): The URI of the ZooKeeper server or quorum used by Kafka. This can be a single host (for example, zk01.example.com:2181) or a comma-separated list of hosts in a ZooKeeper quorum (for example, zk01.example.com:2181,zk02.example.com:2181,zk03.example.com:2181).
  • topic (default: flume-channel): The Kafka topic the channel uses.
  • groupId (default: flume): The unique identifier of the Kafka consumer group the channel uses to register with Kafka.
  • parseAsFlumeEvent (default: true): Set to true if a Flume source is writing to the channel and expects Avro datums with the FlumeEvent schema (org.apache.flume.source.avro.AvroFlumeEvent) in the channel. Set to false if other producers are writing to the topic that the channel is using.
  • readSmallestOffset (default: false): If true, the channel reads all data in the topic; if false, it reads only data written after the channel has started. Used only when parseAsFlumeEvent is false.
  • kafka.consumer.timeout.ms (default: 100): The polling interval, in milliseconds, when the channel is writing to the sink.
  • Other Kafka producer properties: Used to configure the Kafka producer. You can use any producer properties supported by Kafka; prepend the producer property name with the prefix kafka. (for example, kafka.compression.codec). See the Kafka documentation for the full list of Kafka producer properties.
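For example, to replay a topic written by non-Flume producers from its earliest offset (illustrative values):

 # Raw (non-FlumeEvent) data, consumed from the beginning of the topic.
 tier1.channels.channel1.parseAsFlumeEvent = false
 tier1.channels.channel1.readSmallestOffset = true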