Kafka Administration Using Command Line Tools
Unsupported Command Line Tools
The following tools can be found as part of the Kafka distribution, but their use is generally discouraged for various reasons as documented here.
| Tool | Notes |
|---|---|
| connect-distributed, connect-standalone | Kafka Connect is currently not supported. |
| kafka-acls | Cloudera recommends using Sentry to manage ACLs instead of this tool. |
| kafka-broker-api-versions | Primarily useful for Client-to-Broker protocol related development. |
| kafka-configs | Use Cloudera Manager to adjust any broker or security properties instead of the kafka-configs tool. This tool should only be used to modify topic properties. |
| kafka-delete-records | Do not use with CDH. |
| kafka-log-dirs | For querying log directory information. Use the Cloudera Manager console instead. |
| kafka-mirror-maker | Use Cloudera Manager to create any CDH Mirror Maker instance. |
| kafka-preferred-replica-election | This tool causes leadership for each partition to be transferred back to the 'preferred replica'. It can be used to balance leadership among the brokers. It is recommended to use kafka-reassign-partitions instead of kafka-preferred-replica-election. |
| kafka-replay-log-producer | Can be used to "rename" a topic. |
| kafka-replica-verification | Validates that all replicas for a set of topics have the same data. This tool is a "heavy duty" version of the ISR column of the kafka-topics tool. |
| kafka-server-start, kafka-server-stop | Use Cloudera Manager to manage any Kafka host. |
| kafka-simple-consumer-shell | Deprecated in Apache Kafka. |
| kafka-streams-application-reset | Kafka Streams is currently not supported. |
| kafka-verifiable-consumer, kafka-verifiable-producer | These scripts are intended for system testing. |
| zookeeper-security-migration, zookeeper-server-start, zookeeper-server-stop | Use Cloudera Manager to manage any Zookeeper host. |
| zookeeper-shell | Limit usage of this script to reading information from Zookeeper. |
Notes on Kafka CLI Administration
Here are some additional points to be aware of regarding Kafka administration:
- Use Cloudera Manager to start and stop Kafka and Zookeeper services. Do not use the kafka-server-start, kafka-server-stop, zookeeper-server-start, or zookeeper-server-stop commands.
- For a parcel installation, all Kafka command line tools are located in /opt/cloudera/parcels/KAFKA/lib/kafka/bin/. For a package installation, all such tools can be found in /usr/bin/.
- Ensure that the JAVA_HOME environment variable is set to your JDK installation directory before using the command-line tools. For example:
export JAVA_HOME=/usr/java/jdk1.8.0_144-cloudera
- Manually running Zookeeper commands that interact with Kafka is error-prone. Cloudera recommends that you avoid performing any write operations or ACL modifications in Zookeeper.
kafka-topics
Use the kafka-topics tool to generate a snapshot of topics in the Kafka cluster.
kafka-topics --zookeeper zkhost --describe
Topic: topic-a1  PartitionCount:3  ReplicationFactor:3  Configs:
    Topic: topic-a1  Partition: 0  Leader: 64  Replicas: 64,62,63  Isr: 64,62,63
    Topic: topic-a1  Partition: 1  Leader: 62  Replicas: 62,63,64  Isr: 62,63,64
    Topic: topic-a1  Partition: 2  Leader: 63  Replicas: 63,64,62  Isr: 63,64,62
Topic: topic-a2  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: topic-a2  Partition: 0  Leader: 64  Replicas: 64,62,63  Isr: 64,62,63
The output lists each topic and basic partition information. Note the following about the output:
- Partition count: The more partitions, the higher the possible parallelism among consumers and producers.
- Replication factor: Shows 1 for no redundancy and higher for more redundancy.
- Replicas and in-sync replicas (ISR): Shows which broker IDs host the partitions and which replicas are current.
There are situations where this tool shows an invalid value for the leader broker ID or the number of ISRs is fewer than the number of replicas. In those cases, there may be something wrong with those specific topics.
It is possible to change topic configuration properties using this tool. However, increasing the partition count, the replication factor, or both is not recommended.
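As a sketch of the topic-property use case, older Kafka releases accept a topic-level override directly through kafka-topics (the Zookeeper host, topic name, and property below are placeholders; newer Kafka versions deprecate this in favor of kafka-configs):

```shell
# Set a 24-hour retention override on a single topic (placeholder names).
kafka-topics --zookeeper zkhost --alter --topic topic-a1 \
  --config retention.ms=86400000
```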
kafka-configs
The kafka-configs tool allows you to set and unset topic properties. Cloudera recommends that you use Cloudera Manager instead of this tool to change properties on brokers, because this tool bypasses any Cloudera Manager safety checks.
Setting a topic property:
kafka-configs --zookeeper zkhost --entity-type topics --entity-name topic --alter --add-config property=value
Checking a topic property:
$ kafka-configs --zookeeper zkhost --entity-type topics --entity-name topic --describe
Unsetting a topic property:
$ kafka-configs --zookeeper zkhost --entity-type topics --entity-name topic --alter --delete-config property
The Apache Kafka documentation includes a complete list of topic properties.
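Putting the three operations together, the following sketch sets, inspects, and then removes a retention override on a topic. The topic name and the retention.ms property are illustrative placeholders:

```shell
# Set a 24-hour retention override on the topic (placeholder names).
kafka-configs --zookeeper zkhost --entity-type topics --entity-name topic-a1 \
  --alter --add-config retention.ms=86400000

# Confirm the override is in place.
kafka-configs --zookeeper zkhost --entity-type topics --entity-name topic-a1 \
  --describe

# Remove the override, reverting the topic to the broker default.
kafka-configs --zookeeper zkhost --entity-type topics --entity-name topic-a1 \
  --alter --delete-config retention.ms
```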
kafka-console-consumer
The kafka-console-consumer tool can be useful in a couple of ways:
- Acting as an independent consumer of particular topics. This can be useful to compare results against a consumer program that you’ve written.
- To test general topic consumption without the need to write any consumer code.
Examples of usage:
$ kafka-console-consumer --bootstrap-server <broker1>,<broker2>... --topic <topic> --from-beginning
<record-earliest-offset>
<record-earliest-offset+1>
Note the following about the tool:
- This tool prints all records and keeps outputting as more records are written to the topic.
- If the kafka-console-consumer tool is given no flags, it displays the full help message.
- In older versions of Kafka, it may have been necessary to use the --new-consumer flag. As of Apache Kafka version 0.10.2, this is no longer necessary.
kafka-console-producer
This tool is used to write messages to a topic. It is typically not as useful as the console consumer, but it can be useful when the messages are in a text based format. In general, the usage will be something like:
cat file | kafka-console-producer args
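For example, each line of a text file can be sent as a separate record (the broker address, topic name, and file name below are placeholders):

```shell
# Each line of messages.txt becomes one record in the topic.
kafka-console-producer --broker-list broker1:9092 --topic test-topic < messages.txt

# Alternatively, type records interactively and end the session with Ctrl+D.
kafka-console-producer --broker-list broker1:9092 --topic test-topic
```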
kafka-consumer-groups
The basic usage of the kafka-consumer-groups tool is:
kafka-consumer-groups --bootstrap-server broker1,broker2... --describe --group GROUP_ID
This tool is primarily useful for debugging consumer offset issues. The output from the tool shows the log and consumer offsets for each partition connected to the consumer group corresponding to GROUP_ID. You can see at a glance which consumers are current with their partition and which ones are behind. From there, you can determine which partitions (and likely the corresponding brokers) are slow.
Beyond this debugging usage, there are other more advanced options to this tool:
- --execute --reset-offsets SCENARIO_OPTION: Resets the offsets for a consumer group to a particular value based on the SCENARIO_OPTION flag given.
Valid flags for SCENARIO_OPTION are:
- --to-datetime
- --by-period
- --to-earliest
- --to-latest
- --shift-by
- --from-file
- --to-current
You will likely want to set the --topic flag to restrict this change to a specific topic or a specific set of partitions within that topic.
This tool can also reset all offsets on all topics, which is something you almost never want to do. Use this command with great care.
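As a hedged sketch, the following resets one consumer group's offsets on a single topic to the earliest available offset (group, topic, and broker names are placeholders). Running with --dry-run first previews the change without applying it:

```shell
# Preview the reset without applying it.
kafka-consumer-groups --bootstrap-server broker1:9092 --group my-group \
  --topic my-topic --reset-offsets --to-earliest --dry-run

# Apply the reset; the group must have no active members.
kafka-consumer-groups --bootstrap-server broker1:9092 --group my-group \
  --topic my-topic --reset-offsets --to-earliest --execute
```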
kafka-reassign-partitions
This tool allows a great deal of control over partitions in a Kafka cluster. You can do either or both of the following:
- Change the ordering of the partition assignment list. This is usually done to control leader imbalances between brokers.
- Move replicas from one broker to another. The most common usage of this is after adding new brokers to the cluster.
To reassign partitions:
- Create a list of topics you want to move.
topics-to-move.json:

{"topics": [{"topic": "foo1"},
            {"topic": "foo2"}],
 "version": 1
}
- Use the --generate option in kafka-reassign-partitions to list the distribution of partitions and replicas on your current brokers, followed by a list of suggested locations for partitions on your new broker.
$ kafka-reassign-partitions --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "4" --generate
Current partition replica assignment
{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
               {"topic":"foo1","partition":0,"replicas":[3,1]},
               {"topic":"foo2","partition":2,"replicas":[1,2]},
               {"topic":"foo2","partition":0,"replicas":[3,2]},
               {"topic":"foo1","partition":1,"replicas":[2,3]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}
Proposed partition reassignment configuration
{"version":1,
 "partitions":[{"topic":"foo1","partition":3,"replicas":[4]},
               {"topic":"foo1","partition":1,"replicas":[4]},
               {"topic":"foo2","partition":2,"replicas":[4]}]
}
- Revise the suggested list if required, and then save it as a JSON file.
- Use the --execute option in kafka-reassign-partitions to start the redistribution process, which can take several hours in some cases.
kafka-reassign-partitions --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
- Use the --verify option in kafka-reassign-partitions to check the status of your partitions.
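The verification step reuses the same JSON file that was passed to --execute:

```shell
# Report per-partition status of the in-flight reassignment.
kafka-reassign-partitions --zookeeper localhost:2181 \
  --reassignment-json-file expand-cluster-reassignment.json --verify
```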
There are several caveats to using this command:
- It is highly recommended that you minimize the volume of replica changes per invocation. For example, instead of moving ten replicas with a single command, move two at a time to keep the cluster healthy.
- It is not possible to use this command to make an out-of-sync replica into the leader partition.
- Given the earlier restrictions, it is best to use this command only when all brokers and topics are healthy.
kafka-*-perf-test
The kafka-*-perf-test tools can be used in several ways. In general, they should be run on a test or development cluster rather than in production. Typical uses include:
- Measuring read and/or write throughput.
- Stress testing the cluster based on specific parameters (such as message size).
- Load testing for the purpose of evaluating specific metrics or determining the impact of cluster configuration changes.
The kafka-producer-perf-test script can either send randomly generated byte records:
kafka-producer-perf-test --topic TOPIC --record-size SIZE_IN_BYTES
or randomly read from a set of provided records:
kafka-producer-perf-test --topic TOPIC --payload-delimiter DELIMITER --payload-file INPUT_FILE
where INPUT_FILE is a concatenated set of pre-generated messages separated by DELIMITER. The script produces messages indefinitely unless limited by the --num-records flag.
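For example, a hedged invocation that sends a bounded number of random 1 KB records (the topic name and broker address are placeholders; --producer-props points the script at the cluster):

```shell
# Send 100,000 records of 1024 random bytes each, at most 10,000 records/sec.
kafka-producer-perf-test --topic perf-test \
  --num-records 100000 --record-size 1024 --throughput 10000 \
  --producer-props bootstrap.servers=broker1:9092
```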
The basic usage of the kafka-consumer-perf-test script is:
kafka-consumer-perf-test --broker-list host1:port1,host2:port2,... --zookeeper zk1:port1,zk2:port2,... --topic TOPIC
The flags of most interest for this command are:
- --group gid: If you run more than one instance of this test, set a different group ID for each instance.
- --num-fetch-threads: Defaults to 1. Increase if higher throughput testing is needed.
- --from-latest: To start consuming from the latest offset. May be needed for certain types of testing.
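Combining the flags above, a sketch of one test instance might look like the following (broker, topic, and group names are placeholders; the --messages flag bounds how many records are consumed):

```shell
# Consume 100,000 records with two fetch threads under a dedicated group ID.
kafka-consumer-perf-test --broker-list broker1:9092 --topic perf-test \
  --group perf-group-1 --num-fetch-threads 2 --messages 100000
```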
Enabling DEBUG or TRACE in command line scripts
cp /etc/kafka/conf/tools-log4j.properties /var/tmp
sed -i -e 's/WARN/DEBUG/g' /var/tmp/tools-log4j.properties
export KAFKA_OPTS="-Dlog4j.configuration=file:/var/tmp/tools-log4j.properties"
Understanding the kafka-run-class Bash Script
Almost all the provided Kafka tools eventually call the kafka-run-class script. This script is generally not called directly. However, if you are proficient with bash and want to understand certain features available in all Kafka scripts as well as some potential debugging scenarios, familiarity with the kafka-run-class script can prove highly beneficial.
For example, there are some useful environment variables that affect all the command line scripts:
- KAFKA_DEBUG allows a Java debugger to attach to the JVM launched by the particular script. Setting KAFKA_DEBUG also allows some further debugging customization:
- JAVA_DEBUG_PORT sets the JVM debugging port.
- JAVA_DEBUG_OPTS can be used to override the default debugging arguments being passed to the JVM.
- KAFKA_HEAP_OPTS can be used to pass memory setting arguments to the JVM.
- KAFKA_JVM_PERFORMANCE_OPTS can be used to pass garbage collection flags to the JVM.
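For instance, a remote debugger can be attached to a one-off tool invocation as follows (the port and the kafka-topics arguments are illustrative; whether the JVM suspends at startup depends on the default JAVA_DEBUG_OPTS in your kafka-run-class version, and can be overridden):

```shell
# Launch the tool with a JDWP debug agent listening on port 5005.
export KAFKA_DEBUG=true
export JAVA_DEBUG_PORT=5005
kafka-topics --zookeeper zkhost --describe
```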