Kafka Administration Using Command Line Tools
Continue reading:
- Unsupported Command Line Tools
- Notes on Kafka CLI Administration
- kafka-topics
- kafka-configs
- kafka-console-consumer
- kafka-console-producer
- kafka-consumer-groups
- kafka-reassign-partitions
- kafka-log-dirs
- zookeeper-security-migration
- kafka-delegation-tokens
- kafka-*-perf-test
- Enabling DEBUG or TRACE in command line scripts
- Understanding the kafka-run-class Bash Script
Unsupported Command Line Tools
The following tools can be found as part of the Kafka distribution, but their use is generally discouraged for various reasons as documented here.
Tool | Notes |
---|---|
connect-distributed, connect-standalone | Kafka Connect is currently not supported. |
kafka-acls | Cloudera recommends using Sentry to manage ACLs instead of this tool. |
kafka-broker-api-versions | Primarily useful for Client-to-Broker protocol related development. |
kafka-configs | Use Cloudera Manager to adjust any broker or security properties instead of the kafka-configs tool. This tool should only be used to modify topic properties. |
kafka-delete-records | Do not use with CDH. |
kafka-mirror-maker | Use Cloudera Manager to create any CDH Mirror Maker instance. |
kafka-preferred-replica-election | This tool causes leadership for each partition to be transferred back to the 'preferred replica'. It can be used to balance leadership among the servers. It is recommended to use kafka-reassign-partitions instead of kafka-preferred-replica-election. |
kafka-replay-log-producer | Can be used to “rename” a topic. |
kafka-replica-verification | Validates that all replicas for a set of topics have the same data. This tool is a “heavy duty” version of the ISR column of kafka-topics tool. |
kafka-server-start, kafka-server-stop | Use Cloudera Manager to manage any Kafka host. |
kafka-simple-consumer-shell | Deprecated in Apache Kafka. |
kafka-streams-application-reset | Kafka Streams is currently not supported. |
kafka-verifiable-consumer, kafka-verifiable-producer | These scripts are intended for system testing. |
zookeeper-server-start, zookeeper-server-stop | Use Cloudera Manager to manage any Zookeeper host. |
zookeeper-shell | Limit usage of this script to reading information from Zookeeper. |
Notes on Kafka CLI Administration
Here are some additional points to be aware of regarding Kafka administration:
- Use Cloudera Manager to start and stop Kafka and Zookeeper services. Do not use the kafka-server-start, kafka-server-stop, zookeeper-server-start, or zookeeper-server-stop commands.
- For a parcel installation, all Kafka command line tools are located in /opt/cloudera/parcels/KAFKA/lib/kafka/bin/. For a package installation, all such tools can be found in /usr/bin/.
- Ensure that the JAVA_HOME environment variable is set to your JDK installation directory before using the command-line tools. For example:
export JAVA_HOME=/usr/java/jdk1.8.0_144-cloudera
- Manual Zookeeper commands are very difficult to get right where they interact with Kafka. Cloudera recommends that you avoid any write operations or ACL modifications in Zookeeper.
kafka-topics
Use the kafka-topics tool to generate a snapshot of topics in the Kafka cluster.
kafka-topics --zookeeper zkhost --describe
Topic: topic-a1    PartitionCount:3    ReplicationFactor:3    Configs:
    Topic: topic-a1    Partition: 0    Leader: 64    Replicas: 64,62,63    Isr: 64,62,63
    Topic: topic-a1    Partition: 1    Leader: 62    Replicas: 62,63,64    Isr: 62,63,64
    Topic: topic-a1    Partition: 2    Leader: 63    Replicas: 63,64,62    Isr: 63,64,62
Topic: topic-a2    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: topic-a2    Partition: 0    Leader: 64    Replicas: 64,62,63    Isr: 64,62,63
The output lists each topic and basic partition information. Note the following about the output:
- Partition count: The more partitions, the higher the possible parallelism among consumers and producers.
- Replication factor: Shows 1 for no redundancy and higher for more redundancy.
- Replicas and in-sync replicas (ISR): Shows which broker IDs have the partitions and which replicas are current.
If the tool shows an invalid value for the leader broker ID of a partition, or fewer ISRs than replicas, there may be something wrong with that specific topic.
It is possible to change topic configuration properties using this tool. Increasing the partition count, the replication factor or both is not recommended.
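The check described above (fewer ISRs than replicas, or an invalid leader) can be sketched as a small filter over the --describe output. This is only an illustration: the function name flag_unhealthy_partitions is hypothetical, and treating a leader value of -1 as "no leader" is an assumption, not something stated in this guide.

```shell
# Read "kafka-topics --describe" output on stdin and print every partition
# whose ISR list is shorter than its replica list, or whose leader is -1
# (assumed here to mean that the partition has no leader).
flag_unhealthy_partitions() {
  awk '
    {
      topic = ""; part = ""; leader = ""; replicas = ""; isr = ""
      # Each value is the field that follows its label.
      for (i = 1; i < NF; i++) {
        if ($i == "Topic:")     topic    = $(i + 1)
        if ($i == "Partition:") part     = $(i + 1)
        if ($i == "Leader:")    leader   = $(i + 1)
        if ($i == "Replicas:")  replicas = $(i + 1)
        if ($i == "Isr:")       isr      = $(i + 1)
      }
      if (part == "") next    # per-topic summary line, nothing to check
      n_rep = split(replicas, rep_ids, ",")
      n_isr = split(isr, isr_ids, ",")
      if (n_isr < n_rep || leader == "-1")
        printf "%s-%s leader=%s replicas=%d isr=%d\n", topic, part, leader, n_rep, n_isr
    }'
}
```

A possible use is to pipe the describe command shown earlier into it: kafka-topics --zookeeper zkhost --describe | flag_unhealthy_partitions. No output means that every listed partition has a full ISR.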
kafka-configs
The kafka-configs tool allows you to set and unset properties on topics. Cloudera recommends that you use Cloudera Manager instead of this tool to change properties on brokers, because this tool bypasses any Cloudera Manager safety checks.
Setting a topic property:
kafka-configs --zookeeper zkhost --entity-type topics --entity-name topic --alter --add-config property=value
Checking a topic property:
$ kafka-configs --zookeeper zkhost --entity-type topics --entity-name topic --describe
Unsetting a topic property:
$ kafka-configs --zookeeper zkhost --entity-type topics --entity-name topic --alter --delete-config property
The Apache Kafka documentation includes a complete list of topic properties.
kafka-console-consumer
The kafka-console-consumer tool can be useful in a couple of ways:
- Acting as an independent consumer of particular topics. This can be useful to compare results against a consumer program that you’ve written.
- Testing general topic consumption without the need to write any consumer code.
Examples of usage:
$ kafka-console-consumer --bootstrap-server <broker1>,<broker2>... --topic <topic> --from-beginning
<record-earliest-offset>
<record-earliest-offset+1>
Note the following about the tool:
- This tool prints all records and keeps outputting as more records are written to the topic.
- If the kafka-console-consumer tool is given no flags, it displays the full help message.
- In older versions of Kafka, it may have been necessary to use the --new-consumer flag. As of Apache Kafka version 0.10.2, this is no longer necessary.
kafka-console-producer
This tool is used to write messages to a topic. It is typically less useful than the console consumer, but it can help when the messages are in a text-based format. In general, the usage will be something like:
cat file | kafka-console-producer args
kafka-consumer-groups
The basic usage of the kafka-consumer-groups tool is:
kafka-consumer-groups --bootstrap-server broker1,broker2... --describe --group GROUP_ID
This tool is primarily useful for debugging consumer offset issues. The output from the tool shows the log and consumer offsets for each partition connected to the consumer group corresponding to GROUP_ID. You can see at a glance which consumers are current with their partition and which ones are behind. From there, you can determine which partitions (and likely the corresponding brokers) are slow.
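Spotting the partitions that are behind can be sketched as a filter over the --describe output. The function name lagging_partitions is hypothetical, and the sketch assumes that the output has a header row containing TOPIC, PARTITION, and LAG columns; the exact column layout can differ between Kafka versions, which is why the columns are located by name rather than by position.

```shell
# Read "kafka-consumer-groups --describe" output on stdin and print
# "<topic>-<partition> <lag>" for every partition whose lag is at least
# the threshold given as the first argument (default: 1).
lagging_partitions() {
  awk -v min_lag="${1:-1}" '
    # Until a header row is found, look for one and remember the columns.
    !have_header {
      t = 0; p = 0; l = 0
      for (i = 1; i <= NF; i++) {
        if ($i == "TOPIC")     t = i
        if ($i == "PARTITION") p = i
        if ($i == "LAG")       l = i
      }
      if (t && p && l) have_header = 1
      next
    }
    # Data rows: non-numeric lag values (for example "-") are skipped.
    $l ~ /^[0-9]+$/ && $l + 0 >= min_lag + 0 { print $t "-" $p, $l }
  '
}
```

For example, kafka-consumer-groups --bootstrap-server broker1,broker2... --describe --group GROUP_ID | lagging_partitions 1000 would list only the partitions that are at least 1000 records behind.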
Beyond this debugging usage, there are other more advanced options to this tool:
- --execute --reset-offsets SCENARIO_OPTION: Resets the offsets for a consumer group to a particular value based on the SCENARIO_OPTION flag given.
Valid flags for SCENARIO_OPTION are:
- --to-datetime
- --by-period
- --to-earliest
- --to-latest
- --shift-by
- --from-file
- --to-current
You will likely want to set the --topic flag to restrict this change to a specific topic or a specific set of partitions within that topic.
This tool can be used to reset all offsets on all topics. This is rarely what you want, so use this command with great care.
kafka-reassign-partitions
This tool provides substantial control over partitions in a Kafka cluster. It is mainly used to balance storage loads across brokers through the following reassignment actions:
- Change the ordering of the partition assignment list. Used to control leader imbalances between brokers.
- Reassign partitions from one broker to another. Used to expand existing clusters.
- Reassign partitions between log directories on the same broker. Used to resolve storage load imbalance among available disks in the broker.
- Reassign partitions between log directories across multiple brokers. Used to resolve storage load imbalance across multiple brokers.
- Topics-to-Move JSON
- This JSON file specifies the topics that you want to reassign. This is a simple file that tells the kafka-reassign-partitions tool which partitions it should look at when generating a proposal for the reassignment configuration. The user has to create the topics-to-move JSON file from scratch.
The format of the file is the following:
{"topics": [{"topic": "mytopic1"}, {"topic": "mytopic2"}], "version":1 }
- Reassignment Configuration JSON
- This JSON file is a configuration file that contains the parameters used in the reassignment process. This file is created by the user; however, a proposal for its contents is generated by the tool. When the kafka-reassign-partitions tool is executed with the --generate option, it generates a proposed configuration which can be fine-tuned and saved as a JSON file. The file created this way is the reassignment configuration JSON. To generate a proposal, the tool requires a topics-to-move file as input.
The format of the file is the following:
{"version":1,
 "partitions":
   [{"topic":"mytopic1","partition":3,"replicas":[4,5],"log_dirs":["any","any"]},
    {"topic":"mytopic1","partition":1,"replicas":[5,4],"log_dirs":["any","any"]},
    {"topic":"mytopic2","partition":2,"replicas":[6,5],"log_dirs":["any","any"]}]
}
The reassignment configuration contains multiple properties that each control and specify an aspect of the configuration. The Reassignment Configuration Properties table lists each property and its description.
Property | Description |
---|---|
topic | Specifies the topic. |
partition | Specifies the partition. |
replicas | Specifies the brokers that the selected partition is assigned to. The brokers are listed in order, which means that the first broker in the list is always the leader for that partition. Change the order of brokers to resolve any leader balancing issues among brokers. Change the broker IDs to reassign partitions to different brokers. |
log_dirs | Specifies the log directory of the brokers. The log directories are listed in the same order as the brokers. By default any is specified as the log directory, which means that the broker is free to choose where it places the replica. By default, the current broker implementation selects the log directory using a round-robin algorithm. An absolute path beginning with a / can be used to explicitly set where to store the partition replica. |
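Because the topics-to-move file has to be written by hand, a small helper can reduce typing mistakes. The following is a minimal sketch that prints the format shown above for any list of topic names. The function name topics_to_move_json is hypothetical, and the sketch does no escaping, on the assumption that topic names contain only letters, digits, dots, underscores, and hyphens.

```shell
# Print a topics-to-move JSON document for the topic names given as arguments.
# The function body runs in a subshell, so its variables do not leak out.
topics_to_move_json() (
  sep=""
  printf '{"topics": ['
  for topic in "$@"; do
    printf '%s{"topic": "%s"}' "$sep" "$topic"
    sep=", "
  done
  printf '], "version":1 }\n'
)
```

For example, topics_to_move_json mytopic1 mytopic2 > topics-to-move.json writes a file with the same content as the format example for the topics-to-move JSON.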
- Cloudera recommends that you minimize the volume of replica changes per command instance. Instead of moving 10 replicas with a single command, move two at a time in order to save cluster resources.
- This tool cannot be used to make an out-of-sync replica the leader of a partition.
- Use this tool only when all brokers and topics are healthy.
- Anticipate system growth. Redistribute the load when the system is at 70% capacity. Waiting until redistribution becomes necessary due to reaching resource limits can make the redistribution process extremely time consuming.
Tool Usage
To reassign partitions, complete the following steps:
- Create a topics-to-move JSON file that specifies the topics you want to reassign. Use the following format:
{"topics": [{"topic": "mytopic1"}, {"topic": "mytopic2"}], "version":1 }
- Generate the content for the reassignment configuration JSON with the following command:
kafka-reassign-partitions --zookeeper hostname:port --topics-to-move-json-file topics-to-move.json --broker-list broker1,broker2 --generate
Running the command lists the distribution of partition replicas on your current brokers followed by a proposed partition reassignment configuration.
Example output:
Current partition replica assignment
{"version":1,
 "partitions":
   [{"topic":"mytopic2","partition":1,"replicas":[2,3],"log_dirs":["any","any"]},
    {"topic":"mytopic1","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},
    {"topic":"mytopic2","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},
    {"topic":"mytopic1","partition":2,"replicas":[3,1],"log_dirs":["any","any"]},
    {"topic":"mytopic1","partition":1,"replicas":[2,3],"log_dirs":["any","any"]}]
}
Proposed partition reassignment configuration
{"version":1,
 "partitions":
   [{"topic":"mytopic1","partition":0,"replicas":[4,5],"log_dirs":["any","any"]},
    {"topic":"mytopic1","partition":2,"replicas":[4,5],"log_dirs":["any","any"]},
    {"topic":"mytopic2","partition":1,"replicas":[4,5],"log_dirs":["any","any"]},
    {"topic":"mytopic1","partition":1,"replicas":[5,4],"log_dirs":["any","any"]},
    {"topic":"mytopic2","partition":0,"replicas":[5,4],"log_dirs":["any","any"]}]
}
In this example, the tool proposed a configuration which reassigns existing partitions on brokers 1, 2, and 3 to brokers 4 and 5.
- Copy and paste the proposed partition reassignment configuration into an empty JSON file.
- Review, and if required, modify the suggested reassignment configuration.
- Save the file.
- Start the redistribution process with the following command:
kafka-reassign-partitions --zookeeper hostname:port --reassignment-json-file reassignment-configuration.json --bootstrap-server hostname:port --execute
The tool prints a list containing the original replica assignment and a message that reassignment has started. Example output:
Current partition replica assignment
{"version":1,
 "partitions":
   [{"topic":"mytopic2","partition":1,"replicas":[2,3],"log_dirs":["any","any"]},
    {"topic":"mytopic1","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},
    {"topic":"mytopic2","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},
    {"topic":"mytopic1","partition":2,"replicas":[3,1],"log_dirs":["any","any"]},
    {"topic":"mytopic1","partition":1,"replicas":[2,3],"log_dirs":["any","any"]}]
}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
- Verify the status of the reassignment with the following command:
kafka-reassign-partitions --zookeeper hostname:port --reassignment-json-file reassignment-configuration.json --bootstrap-server hostname:port --verify
The tool prints the reassignment status of all partitions. Example output:
Status of partition reassignment:
Reassignment of partition mytopic2-1 completed successfully
Reassignment of partition mytopic1-0 completed successfully
Reassignment of partition mytopic2-0 completed successfully
Reassignment of partition mytopic1-2 completed successfully
Reassignment of partition mytopic1-1 completed successfully
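The output of the --execute step asks you to save the original assignment for use during a rollback. As a minimal sketch, the filter below keeps only the lines between the "Current partition replica assignment" heading and the "Save this to use" line, based on the example output above. The function name extract_rollback_json is hypothetical.

```shell
# Read the output of "kafka-reassign-partitions ... --execute" on stdin and
# print the non-blank lines between the "Current partition replica
# assignment" heading and the "Save this to use ..." line.
extract_rollback_json() {
  awk '
    /^Current partition replica assignment/ { grab = 1; next }
    /^Save this to use/                     { grab = 0 }
    grab && NF                              { print }
  '
}
```

One way to use it is to capture the output of the --execute command in a file and then run extract_rollback_json < execute-output.txt > rollback.json, where both file names are placeholders.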
Examples
There are multiple ways to modify the configuration file. The following examples show how a user can modify a proposed configuration and what these changes do. Each example changes the following original configuration:
{"version":1, "partitions": [{"topic":"mytopic1","partition":0,"replicas":[1,2],"log_dirs":["any","any"]}]}
- Reassign partitions between brokers
- To reassign partitions from one broker to another, change the broker ID specified in replicas. For example:
{"topic":"mytopic1","partition":0,"replicas":[5,2],"log_dirs":["any","any"]}
This reassignment configuration moves partition mytopic1-0 from broker 1 to broker 5.
- Reassign partitions to another log directory on the same broker
- To reassign partitions between log directories on the same broker, change the appropriate any entry to an absolute path. For example:
{"topic":"mytopic1","partition":0,"replicas":[1,2],"log_dirs":["/log/directory1","any"]}
This reassignment configuration moves partition mytopic1-0 to the /log/directory1 log directory.
- Reassign partitions between log directories across multiple brokers
- To reassign partitions between log directories across multiple brokers, change the broker ID specified in replicas and the appropriate any entry to an absolute path. For example:
{"topic":"mytopic1","partition":0,"replicas":[5,2],"log_dirs":["/log/directory1","any"]}
This reassignment configuration moves partition mytopic1-0 to /log/directory1 on broker 5.
- Change partition assignment order (elect a new leader)
- To change the ordering of the partition assignment list, change the order of the brokers in replicas. For example:
{"topic":"mytopic1","partition":0,"replicas":[2,1],"log_dirs":["any","any"]}
This reassignment configuration elects broker 2 as the new leader.
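The replicas edits in these examples can also be scripted. The sketch below rewrites the replicas list of a single partition with sed. It assumes the compact one-line entry layout used in the examples (topic, then partition, then replicas, with no spaces), and the function name set_replicas is hypothetical. A dot in a topic name is treated as a regular-expression wildcard, so review the result before using it.

```shell
# Usage: set_replicas TOPIC PARTITION NEW_REPLICAS < in.json > out.json
# Replaces the replicas list of the matching partition entry. Runs in a
# subshell so the helper variables stay local.
set_replicas() (
  topic=$1
  partition=$2
  replicas=$3
  sed -E "s/(\"topic\":\"$topic\",\"partition\":$partition,\"replicas\":)\[[0-9,]*\]/\1[$replicas]/"
)
```

For example, piping the original configuration through set_replicas mytopic1 0 2,1 produces the entry from the leader election example, and set_replicas mytopic1 0 5,2 produces the entry from the broker reassignment example.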
kafka-log-dirs
The kafka-log-dirs tool allows users to query a list of replicas per log directory on a broker. The tool provides information that is required for optimizing replica assignment across brokers. Example output:
{
  "brokers": [
    {
      "broker": 86,
      "logDirs": [
        {
          "error": null,
          "logDir": "/var/local/kafka/data",
          "partitions": [
            {
              "isFuture": false,
              "offsetLag": 0,
              "partition": "mytopic1-2",
              "size": 0
            }
          ]
        }
      ]
    },
    ...
  ],
  "version": 1
}
The Contents of the kafka-log-dirs Output table gives an overview of the information provided by the kafka-log-dirs tool.
Property | Description |
---|---|
broker | Displays the ID of the broker. |
error | Indicates if there is a problem with the disk that hosts the topic partition. If an error is detected, org.apache.kafka.common.errors.KafkaStorageException is displayed. If no error is detected, the value is null. |
logDir | Specifies the location of the log directory. Returns an absolute path. |
isFuture | The reassignment state of the partition. This property shows whether there is currently replica movement underway between the log directories. |
offsetLag | Displays the offset lag of the partition. |
partition | Displays the name of the partition. |
size | Displays the size of the partition in bytes. |
Tool Usage
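kafka-log-dirs --describe --bootstrap-server hostname:port --broker-list broker1,broker2 --topic-list topic1,topic2
If there is a problem with the disk that hosts a log directory, the error field in the output is set, for example:
"error":"org.apache.kafka.common.errors.KafkaStorageException"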
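As a rough aid when comparing storage use, the size fields in the output can be added up with standard text tools. This is a sketch only: the function name total_partition_bytes is hypothetical, and it sums every size value it finds on stdin, so filter the input by broker or topic first if you need a narrower figure.

```shell
# Read kafka-log-dirs JSON output on stdin and print the sum of all
# "size" values (in bytes). Works whether or not a space follows the colon.
total_partition_bytes() {
  grep -oE '"size": *[0-9]+' |
    awk -F: '{ total += $2 } END { printf "%.0f\n", total }'
}
```

For example, the output of the command above can be piped into total_partition_bytes to get the total number of bytes held by the queried topics on the queried brokers.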
zookeeper-security-migration
The zookeeper-security-migration tool is used in the process of restricting or unrestricting access to metadata stored in Zookeeper. When executed, the tool updates the ACLs of znodes based on the configuration specified by the user.
Tool Usage
zookeeper-security-migration --zookeeper.connect hostname:port --zookeeper.acl secure
zookeeper-security-migration --zookeeper.connect hostname:port --zookeeper.acl unsecure
kafka-delegation-tokens
The kafka-delegation-tokens tool provides the functionality required for using and managing delegation tokens.
Tool Usage
The tool can be used to issue, renew, expire, or describe delegation tokens.
- Issue, and store for verification
- The owner of the token is the currently authenticated principal. A renewer can be specified when requesting the token.
kafka-delegation-tokens --bootstrap-server hostname:port --create --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1
- Renew
- Only the owner and the principals that are renewers of the delegation token can extend its validity by renewing it before it expires. A successful renewal extends the delegation token’s expiration time for another renew-interval, until it reaches its max lifetime. Expired delegation tokens cannot be used to authenticate; the brokers remove expired delegation tokens from the broker’s cache and from Zookeeper.
kafka-delegation-tokens --bootstrap-server hostname:port --renew --renew-time-period -1 --command-config client.properties --hmac lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==
- Remove
- Delegation tokens are removed when they are canceled by the client or when they expire.
kafka-delegation-tokens --bootstrap-server hostname:port --expire --expiry-time-period -1 --command-config client.properties --hmac lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==
- Describe
- Tokens can be described by owners, renewers or the Kafka super user.
kafka-delegation-tokens --bootstrap-server hostname:port --describe --command-config client.properties --owner-principal User:user1
kafka-*-perf-test
The kafka-*-perf-test tools can be used in several ways. In general, these tools are expected to be used on a test or development cluster.
- Measuring read and/or write throughput.
- Stress testing the cluster based on specific parameters (such as message size).
- Load testing for the purpose of evaluating specific metrics or determining the impact of cluster configuration changes.
The kafka-producer-perf-test script can either create a randomly generated byte record:
kafka-producer-perf-test --topic TOPIC --record-size SIZE_IN_BYTES
or randomly read from a set of provided records:
kafka-producer-perf-test --topic TOPIC --payload-delimiter DELIMITER --payload-file INPUT_FILE
where INPUT_FILE is a concatenated set of pre-generated messages separated by DELIMITER. The script either keeps producing messages or stops after the number of records set by the --num-records flag.
The basic usage of the kafka-consumer-perf-test tool is:
kafka-consumer-perf-test --broker-list host1:port1,host2:port2,... --zookeeper zk1:port1,zk2:port2,... --topic TOPIC
The flags of most interest for this command are:
- --group gid: If you run more than one instance of this test, set a different group ID for each instance.
- --num-fetch-threads: Defaults to 1. Increase if higher throughput testing is needed.
- --from-latest: To start consuming from the latest offset. May be needed for certain types of testing.
Enabling DEBUG or TRACE in command line scripts
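To raise the log level of the command line tools, copy the tools log4j configuration, change the level in the copy, and point KAFKA_OPTS at the copy. For TRACE output, substitute TRACE for DEBUG in the sed command:
cp /etc/kafka/conf/tools-log4j.properties /var/tmp
sed -i -e 's/WARN/DEBUG/g' /var/tmp/tools-log4j.properties
export KAFKA_OPTS="-Dlog4j.configuration=file:/var/tmp/tools-log4j.properties"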
Understanding the kafka-run-class Bash Script
Almost all the provided Kafka tools eventually call the kafka-run-class script. This script is generally not called directly. However, if you are proficient with bash and want to understand certain features available in all Kafka scripts as well as some potential debugging scenarios, familiarity with the kafka-run-class script can prove highly beneficial.
For example, there are some useful environment variables that affect all the command line scripts:
- KAFKA_DEBUG allows a Java debugger to attach to the JVM launched by the particular script. Setting KAFKA_DEBUG also allows some further debugging customization:
  - JAVA_DEBUG_PORT sets the JVM debugging port.
  - JAVA_DEBUG_OPTS can be used to override the default debugging arguments being passed to the JVM.
- KAFKA_HEAP_OPTS can be used to pass memory setting arguments to the JVM.
- KAFKA_JVM_PERFORMANCE_OPTS can be used to pass garbage collection flags to the JVM.
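As an illustration of the variables listed above, the following sketch sets a heap size and enables remote debugging for any Kafka tool started afterwards from the same shell. The values are examples only, and the assumption that any non-empty KAFKA_DEBUG value enables debugging should be checked against the kafka-run-class script in your installation.

```shell
# Heap settings passed to the JVM of the tool (example value only).
export KAFKA_HEAP_OPTS="-Xmx1G"

# Assumed: any non-empty value makes kafka-run-class add debugger arguments.
export KAFKA_DEBUG=1

# Port that a Java debugger would attach to (hypothetical choice).
export JAVA_DEBUG_PORT=5005
```

Unset the variables, or open a new shell, to return to the default behavior.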