Kafka Broker Settings
The following subsections describe configuration settings that influence the performance of Kafka brokers.
Connection Settings
Review the following connection setting in the Advanced kafka-broker category, and modify as needed:
zookeeper.session.timeout.ms
Specifies ZooKeeper session timeout, in milliseconds. The default value is 30000 ms.
If the server fails to send a heartbeat to ZooKeeper within this period of time, it is considered dead. If you set this value too low, a live server might be falsely considered dead; if you set it too high, it may take too long to recognize a truly dead server.
If you see frequent disconnection from the ZooKeeper server, review this setting. If long garbage collection pauses cause Kafka to lose its ZooKeeper session, you might need to configure longer timeout values.
advertised.listeners
If you have manually set listeners to advertised.listeners=PLAINTEXT://$HOSTNAME:$PORT, then after enabling Kerberos, change the listener configuration to advertised.listeners=SASL_PLAINTEXT://$HOSTNAME:$PORT.
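For example, the resulting entries in the broker configuration might look like this (a sketch with illustrative values; $HOSTNAME and $PORT are placeholders for your broker's host and port):

```
# Connection settings (illustrative values)
zookeeper.session.timeout.ms=30000                      # raise if long GC pauses cause session loss
advertised.listeners=SASL_PLAINTEXT://$HOSTNAME:$PORT   # after enabling Kerberos
```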
Important
Do not change the following connection settings:
Topic Settings
For each topic, Kafka maintains a structured commit log with one or more partitions. These topic partitions form the basic unit of parallelism in Kafka. In general, the more partitions there are in a Kafka cluster, the more parallel consumers can be added, resulting in higher throughput.
You can calculate the number of partitions based on your throughput requirements. If throughput from a producer to a single partition is P and throughput from a single partition to a consumer is C, and if your target throughput is T, the minimum number of required partitions is max(T/P, T/C).
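For example (illustrative numbers), with a target throughput T of 100 MB/s, per-partition producer throughput P of 10 MB/s, and per-partition consumer throughput C of 20 MB/s, you need at least max(100/10, 100/20) = max(10, 5) = 10 partitions.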
Note also that more partitions can increase latency:
End-to-end latency in Kafka is defined as the difference in time from when a message is published by the producer to when the message is read by the consumer.
Kafka only exposes a message to a consumer after it has been committed, that is, after the message is replicated to all in-sync replicas.
Replication of one thousand partitions from one broker to another can take up to 20 ms. This is too long for some real-time applications.
In the new Kafka producer, messages are accumulated on the producer side; producers buffer messages per partition. This approach allows users to set an upper bound on the amount of memory used for buffering incoming messages. After enough data is accumulated or enough time has passed, accumulated messages are drained from the buffer and sent to the broker. If you define more partitions, messages are accumulated for more partitions on the producer side.
Similarly, the consumer fetches batches of messages per partition. Consumer memory requirements are proportional to the number of partitions that the consumer subscribes to.
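To illustrate, the client-side settings that bound this per-partition buffering look like the following (a sketch; the values shown match common Kafka client defaults):

```
# producer.properties (illustrative values)
buffer.memory=33554432   # total memory the producer may use for buffering, across all partitions
batch.size=16384         # per-partition batch size in bytes; memory use grows with partition count

# consumer.properties (illustrative values)
max.partition.fetch.bytes=1048576   # per-partition fetch size; memory use grows with partition count
```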
Important Topic Properties
Review the following settings in the Advanced kafka-broker category, and modify as needed:
auto.create.topics.enable
Enable automatic creation of topics on the server. If this property is set to true, then attempts to produce, consume, or fetch metadata for a nonexistent topic automatically create the topic with the default replication factor and number of partitions. The default is enabled.
default.replication.factor
Specifies default replication factors for automatically created topics. For high availability production systems, you should set this value to at least 3.
num.partitions
Specifies the default number of log partitions per topic, for automatically created topics. The default value is 1. Change this setting based on the requirements related to your topic and partition design.
delete.topic.enable
Allows users to delete a topic from Kafka using the admin tool, for Kafka versions 0.9 and later. Deleting a topic through the admin tool will have no effect if this setting is turned off.
By default this feature is turned off (set to false).
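Taken together, a production-oriented configuration of these properties might look like this (a sketch with illustrative values; adjust to your topic and partition design):

```
# Topic settings (illustrative values)
auto.create.topics.enable=false   # create topics explicitly rather than on first use
default.replication.factor=3      # at least 3 for high availability
num.partitions=8                  # default partition count for automatically created topics
delete.topic.enable=true          # allow topic deletion through the admin tool
```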
Log Settings
Review the following settings in the Kafka Broker category, and modify as needed:
log.roll.hours
The maximum time, in hours, before a new log segment is rolled out. The default value is 168 hours (seven days).
This setting controls the period of time after which Kafka will force the log to roll, even if the segment file is not full. This ensures that the retention process is able to delete or compact old data.
log.retention.hours
The number of hours to keep a log file before deleting it. The default value is 168 hours (seven days).
When setting this value, take into account your disk space and how long you would like messages to be available. An active consumer can read quickly and deliver messages to their destination.
The higher the retention setting, the longer the data is preserved. Because higher settings retain more log segments on disk, increasing this setting reduces your available storage capacity.
log.dirs
A comma-separated list of directories in which log data is kept. If you have multiple disks, list all directories under each disk.
Review the following setting in the Advanced kafka-broker category, and modify as needed:
log.retention.bytes
The amount of data to retain in the log for each topic partition. By default, log size is unlimited.
Note that this is the limit for each partition, so multiply this value by the number of partitions to calculate the total data retained for the topic.
If log.retention.hours and log.retention.bytes are both set, Kafka deletes a segment when either limit is exceeded.
log.segment.bytes
The log for a topic partition is stored as a directory of segment files. This setting controls the maximum size of a segment file before a new segment is rolled over in the log. The default is 1 GB.
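As an example, the log settings described above might be combined as follows (a sketch with illustrative values):

```
# Log settings (illustrative values)
log.dirs=/data1/kafka-logs,/data2/kafka-logs   # one directory per disk
log.retention.hours=168                        # keep messages for seven days...
log.retention.bytes=107374182400               # ...or until a partition reaches 100 GB, whichever comes first
log.segment.bytes=1073741824                   # roll a new segment at 1 GB
log.roll.hours=168                             # or roll after seven days, even if the segment is not full
```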
Log Flush Management
Kafka writes topic messages to a log file immediately upon receipt, but the data is initially buffered in page cache. A log flush forces Kafka to flush topic messages from page cache, writing the messages to disk.
We recommend using the default flush settings, which rely on background flushes done by Linux and Kafka. Default settings provide high throughput and low latency, and they guarantee recovery through the use of replication.
If you decide to specify your own flush settings, you can force a flush after a period of time, or after a specified number of messages, or both (whichever limit is reached first). You can set property values globally and override them on a per-topic basis.
There are several important considerations related to log file flushing:
- Durability: unflushed data is at greater risk of loss in the event of a crash. A failed broker can recover topic partitions from its replicas, but if a follower does not issue a fetch request or consume from the leader's log-end offset within the time specified by replica.lag.time.max.ms (which defaults to 10 seconds), the leader removes the follower from the in-sync replica set ("ISR"). When this happens there is a slight chance of message loss if you do not explicitly set log.flush.interval.messages: if the leader broker fails while the follower is not caught up, the follower can still be in the ISR for those 10 seconds, and messages written during the leader-to-follower transition can be lost.
- Increased latency: data is not available to consumers until it is flushed (the fsync implementation in most Linux filesystems blocks writes to the file system).
- Throughput: a flush operation is typically an expensive operation. In addition, disk usage patterns are less efficient, and page-level locking in background flushing is much more granular.
log.flush.interval.messages
Specifies the number of messages to accumulate on a log partition before Kafka forces a flush of data to disk.
log.flush.scheduler.interval.ms
Specifies the amount of time (in milliseconds) after which Kafka checks to see if a log needs to be flushed to disk.
log.segment.bytes
Specifies the size of the log file. Kafka flushes the log file to disk whenever a log file reaches its maximum size.
log.roll.hours
Specifies the maximum length of time (in hours) before a new log segment is rolled out; this value is secondary to log.roll.ms. Kafka flushes the log file to disk whenever a log file reaches this time limit.
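If you do override the default flush behavior, a minimal sketch might look like this (illustrative values; log.flush.interval.ms, not listed above, is the time-based counterpart to log.flush.interval.messages):

```
# Log flush settings (illustrative values; the defaults are usually preferable)
log.flush.interval.messages=10000      # force a flush every 10,000 messages per partition...
log.flush.interval.ms=1000             # ...or after one second, whichever limit is reached first
log.flush.scheduler.interval.ms=1000   # how often Kafka checks whether a log needs flushing
```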
Compaction Settings
Review the following settings in the Advanced kafka-broker category, and modify as needed:
log.cleaner.dedupe.buffer.size
Specifies the total memory used for log deduplication across all cleaner threads. By default, 128 MB of buffer is allocated. You may want to review this and other log.cleaner configuration values, and adjust settings based on your use of compacted topics (__consumer_offsets and other compacted topics).
log.cleaner.io.buffer.size
Specifies the total memory used for log cleaner I/O buffers across all cleaner threads. By default, 512 KB of buffer is allocated. You may want to review this and other log.cleaner configuration values, and adjust settings based on your usage of compacted topics (__consumer_offsets and other compacted topics).
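For example, a broker that hosts heavily used compacted topics might increase these buffers (a sketch with illustrative values):

```
# Compaction settings (illustrative values)
log.cleaner.dedupe.buffer.size=268435456   # 256 MB deduplication buffer across cleaner threads
log.cleaner.io.buffer.size=1048576         # 1 MB of I/O buffer across cleaner threads
```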
General Broker Settings
Review the following settings in the Advanced kafka-broker category, and modify as needed:
auto.leader.rebalance.enable
Enables automatic leader balancing. A background thread checks and triggers leader balancing (if needed) at regular intervals. The default is enabled.
unclean.leader.election.enable
This property allows you to specify a preference of availability or durability. This is an important setting: if availability is more important than avoiding data loss, ensure that this property is set to true. If preventing data loss is more important than availability, set this property to false.
This setting operates as follows:
- If unclean.leader.election.enable is set to true (enabled), an out-of-sync replica will be elected as leader when there is no live in-sync replica (ISR). This preserves the availability of the partition, but there is a chance of data loss.
- If unclean.leader.election.enable is set to false and there are no live in-sync replicas, Kafka returns an error and the partition will be unavailable.
This property is set to true by default, which favors availability. If durability is preferable to availability, set unclean.leader.election.enable to false.
controlled.shutdown.enable
Enables controlled shutdown of the server. The default is enabled.
min.insync.replicas
When a producer sets acks to "all", min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, the producer raises an exception. Used together, min.insync.replicas and producer acks allow you to enforce stronger durability guarantees. You should set min.insync.replicas to 2 for a replication factor of 3 (see the configuration sketch at the end of this section).
message.max.bytes
Specifies the maximum size of message that the server can receive. It is important that this property be set with consideration for the maximum fetch size used by your consumers, or a producer could publish messages too large for consumers to consume.
Note that there are currently two versions of consumer and producer APIs. The value of message.max.bytes must be smaller than the max.partition.fetch.bytes setting in the new consumer, or smaller than the fetch.message.max.bytes setting in the old consumer. In addition, the value must be smaller than replica.fetch.max.bytes.
replica.fetch.max.bytes
Specifies the number of bytes of messages to attempt to fetch. This value must be larger than message.max.bytes.
broker.rack
The rack awareness feature distributes replicas of a partition across different racks. You can specify that a broker belongs to a particular rack through the "Custom kafka-broker" menu option. For more information about the rack awareness feature, see http://kafka.apache.org/documentation.html#basic_ops_racks.
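As a summary, a durability-oriented configuration of the settings in this section might look like this (a sketch with illustrative values; message sizes assume a 1 MB maximum):

```
# General broker settings (illustrative values)
auto.leader.rebalance.enable=true
unclean.leader.election.enable=false   # favor durability over availability
controlled.shutdown.enable=true
min.insync.replicas=2                  # with replication factor 3 and producer acks=all
message.max.bytes=1000000              # must be smaller than replica.fetch.max.bytes and the consumers' fetch sizes
replica.fetch.max.bytes=1048576        # must be larger than message.max.bytes
broker.rack=rack-1                     # this broker's rack, for rack-aware replica placement
```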