Kafka Administration Using Command Line Tools

In some situations, it is convenient to use the command line tools available in Kafka to administer your cluster. However, it is important to note that not all tools available for Kafka are supported by Cloudera. Moreover, certain administration tasks can be carried more easily and conveniently using Cloudera Manager. Therefore, before you continue, make sure to review Unsupported Command Line Tools and Notes on Kafka CLI Administration.

Unsupported Command Line Tools

The following tools can be found as part of the Kafka distribution, but their use is generally discouraged for various reasons as documented here.

Tool Notes
connect-distributed

connect-standalone

Kafka Connect is currently not supported.
kafka-acls Cloudera recommends using Sentry to manage ACLs instead of this tool.
kafka-broker-api-versions Primarily useful for Client-to-Broker protocol related development.
kafka-configs Use Cloudera Manager to adjust any broker or security properties instead of the kafka‑configs tool. This tool should only be used to modify topic properties.
kafka-delete-records Do not use with CDH.
kafka-mirror-maker Use Cloudera Manager to create any CDH Mirror Maker instance.
kafka-preferred-replica-election This tool causes leadership for each partition to be transferred back to the 'preferred replica'. It can be used to balance leadership among the servers.

It is recommended to use kafka-reassign-partitions instead of kafka-preferred-replica-election.
kafka-replay-log-producer Can be used to “rename” a topic.
kafka-replica-verification Validates that all replicas for a set of topics have the same data. This tool is a “heavy duty” version of the ISR column of kafka-topics tool.
kafka-server-start

kafka-server-stop

Use Cloudera Manager to manage any Kafka host.
kafka-simple-consumer-shell Deprecated in Apache Kafka.
kafka-streams-application-reset Kafka Streams is currently not supported.
kafka-verifiable-consumer

kafka-verifiable-producer

These scripts are intended for system testing.
zookeeper-security-migration

zookeeper-server-start

zookeeper-server-stop

Use Cloudera Manager to manage any Zookeeper host.
zookeeper-shell Limit usage of this script to reading information from Zookeeper.

Notes on Kafka CLI Administration

Here are some additional points to be aware of regarding Kafka administration:

  • Use Cloudera Manager to start and stop Kafka and Zookeeper services. Do not use the kafka-server-start, kafka-server-stop, zookeeper-server-start, or zookeeper-server-stop commands.
  • For a parcel installation, all Kafka command line tools are located in /opt/cloudera/parcels/KAFKA/lib/kafka/bin/. For a package installation, all such tools can be found in /usr/bin/.
  • Ensure that the JAVA_HOME environment variable is set to your JDK installation directory before using the command-line tools. For example:
    export JAVA_HOME=/usr/java/jdk1.8.0_144-cloudera
    
  • Using any Zookeeper command manually can be very difficult to get right when it comes to interaction with Kafka. Cloudera recommends that you avoid doing any write operations or ACL modifications in Zookeeper.

kafka-topics

Use the kafka-topics tool to generate a snapshot of topics in the Kafka cluster.

kafka-topics --zookeeper zkhost --describe
Topic: topic-a1      PartitionCount:3      ReplicationFactor:3      Configs:
         Topic: topic-a1      Partition: 0      Leader: 64      Replicas: 64,62,63      Isr: 64,62,63
         Topic: topic-a1      Partition: 1      Leader: 62      Replicas: 62,63,64      Isr: 62,63,64
         Topic: topic-a1      Partition: 2      Leader: 63      Replicas: 63,64,62      Isr: 63,64,62
Topic: topic-a2      PartitionCount:1      ReplicationFactor:3      Configs:
         Topic: topic-a2      Partition: 0      Leader: 64      Replicas: 64,62,63      Isr: 64,62,63

The output lists each topic and basic partition information. Note the following about the output:

  • Partition count: The more partitions, the higher the possible parallelism among consumers and producers.
  • Replication factor: Shows 1 for no redundancy and higher for more redundancy.
  • Replicas and in-sync replicas (ISR): Shows which broker ID’s have the partitions and which replicas are current.

There are situations where this tool shows an invalid value for the leader broker ID or the number of ISRs is fewer than the number of replicas. In those cases, there may be something wrong with those specific topics.

It is possible to change topic configuration properties using this tool. Increasing the partition count, the replication factor or both is not recommended.

kafka-configs

The kafka-configs tool allows you to set and unset properties to topics. Cloudera recommends that you use Cloudera Manager instead of this tool to change properties on brokers, because this tool bypasses any Cloudera Manager safety checks.

Setting a topic property:

kafka-configs --zookeeper zkhost --entity-type topics --entity-name topic --alter --add-config property=value

Checking a topic property:

$ kafka-configs --zookeeper zkhost --entity-type
            topics --entity-name topic --describe

Unsetting a topic property:

$ kafka-configs --zookeeper zkhost --entity-type
            topics --entity-name topic --alter --delete-config property

The Apache Kafka documentation includes a complete list of topic properties.

kafka-console-consumer

The kafka-console-consumer tool can be useful in a couple of ways:

  • Acting as an independent consumer of particular topics. This can be useful to compare results against a consumer program that you’ve written.
  • To test general topic consumption without the need to write any consumer code.

Examples of usage:

$ kafka-console-consumer --bootstrap-server <broker1>,<broker2>... --topic <topic> --from-beginning
<record-earliest-offset>
<record-earliest-offset+1>

Note the following about the tool:

  • This tool prints all records and keeps outputting as more records are written to the topic.
  • If the kafka-console-consumer tool is given no flags, it displays the full help message.
  • In older versions of Kafka, it may have been necessary to use the --new-consumer flag. As of Apache Kafka version 0.10.2, this is no longer necessary.

kafka-console-producer

This tool is used to write messages to a topic. It is typically not as useful as the console consumer, but it can be useful when the messages are in a text based format. In general, the usage will be something like:

cat file | kafka-console-producer args

kafka-consumer-groups

The basic usage of the kafka-consumer-groups tool is:

kafka-consumer-groups --bootstrap-server broker1,broker2... --describe --group GROUP_ID

This tool is primarily useful for debugging consumer offset issues. The output from the tool shows the log and consumer offsets for each partition connected to the consumer group corresponding to GROUP_ID. You can see at a glance which consumers are current with their partition and which ones are behind. From there, you can determine which partitions (and likely the corresponding brokers) are slow.

Beyond this debugging usage, there are other more advanced options to this tool:

  • --execute --reset-offsets SCENARIO_OPTION: Resets the offsets for a consumer group to a particular value based on the SCENARIO_OPTION flag given.

    Valid flags for SCENARIO_OPTION are:

    • --to-datetime
    • --by-period
    • --to-earliest
    • --to-latest
    • --shift-by
    • --from-file
    • --to-current

    You will likely want to set the --topic flag to restrict this change to a specific topic or a specific set of partitions within that topic.

This tool can be used to reset all offsets on all topics. This is something you probably won’t ever want to do. It is highly recommended that you use this command carefully.

kafka-reassign-partitions

This tool provides substantial control over partitions in a Kafka cluster. It is mainly used to balance storage loads across brokers through the following reassignment actions:

  • Change the ordering of the partition assignment list. Used to control leader imbalances between brokers.
  • Reassign partitions from one broker to another. Used to expand existing clusters.
  • Reassign partitions between log directories on the same broker. Used to resolve storage load imbalance among available disks in the broker.
  • Reassign partitions between log directories across multiple brokers. Used to resolve storage load imbalance across multiple brokers.
The tool uses two JSON files for input. Both of these are created by the user. The two files are the following:
Topics-to-Move JSON
This JSON file specifies the topics that you want to reassign. This a simple file that tells the kafka-reassign-partitions tool which partitions it should look at when generating a proposal for the reassignment configuration. The user has to create the topics-to-move JSON file from scratch.
The format of the file is the following:
{"topics":  [{"topic": "mytopic1"},
             {"topic": "mytopic2"}],
 "version":1
}
Reassignment Configuration JSON
This JSON file is a configuration file that contains the parameters used in the reassignment process. This file is created by the user, however, a proposal for its contents is generated by the tool. When the kafka-reasssign-partitions tool is executed with the --generate option, it generates a proposed configuration which can be fine-tuned and saved as a JSON file. The file created this way is the reassignment configuration JSON. To generate a proposal, the tool requires a topics-to-move file as input.
The format of the file is the following:
{"version":1,
 "partitions":
   [{"topic":"mytopic1","partition":3,"replicas":[4,5],"log_dirs":["any","any"]},
    {"topic":"mytopic1","partition":1,"replicas":[5,4],"log_dirs":["any","any"]},
    {"topic":"mytopic2","partition":2,"replicas":[6,5],"log_dirs":["any","any"]}]
}

The reassignment configuration contains multiple properties that each control and specify an aspect of the configuration. The Reassignment Configuration Properties table lists each property and its description.

Reassignment Configuration Properties
Property Description
topic Specifies the topic.
partition Specifies the partition.
replicas Specifies the brokers that the selected partition is assigned to. The brokers are listed in order, which means that the first broker in the list is always the leader for that partition. Change the order of brokers to resolve any leader balancing issues among brokers. Change the broker IDs to reassign partitions to different brokers.
log_dirs Specifies the log directory of the brokers. The log directories are listed in the same order as the brokers. By default any is specified as the log directory, which means that the broker is free to choose where it places the replica. By default, the current broker implementation selects the log directory using a round-robin algorithm. An absolute path beginning with a / can be used to explicitly set where to store the partition replica.
Notes and Recommendations:
  • Cloudera recommends that you minimize the volume of replica changes per command instance. Instead of moving 10 replicas with a single command, move two at a time in order to save cluster resources.
  • This tool cannot be used to make an out-of-sync replica into the leader partition.
  • Use this tool only when all brokers and topics are healthy.
  • Anticipate system growth. Redistribute the load when the system is at 70% capacity. Waiting until redistribution becomes necessary due to reaching resource limits can make the redistribution process extremely time consuming.

Tool Usage

To reassign partitions, complete the following steps:

  1. Create a topics-to-move JSON file that specifies the topics you want to reassign. Use the following format:
    {"topics":  [{"topic": "mytopic1"},
                 {"topic": "mytopic2"}],
     "version":1
    }
  2. Generate the content for the reassignment configuration JSON with the following command:
    kafka-reassign-partitions --zookeeper hostname:port --topics-to-move-json-file topics to move.json --broker-list broker 1, broker 2 --generate

    Running the command lists the distribution of partition replicas on your current brokers followed by a proposed partition reassignment configuration.

    Example output:

    Current partition replica assignment
    {"version":1,
     "partitions":
       [{"topic":"mytopic2","partition":1,"replicas":[2,3],"log_dirs":["any","any"]},
        {"topic":"mytopic1","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},
        {"topic":"mytopic2","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},
        {"topic":"mytopic1","partition":2,"replicas":[3,1],"log_dirs":["any","any"]},
        {"topic":"mytopic1","partition":1,"replicas":[2,3],"log_dirs":["any","any"]}]
    }
    
    Proposed partition reassignment configuration
    
    {"version":1,
     "partitions":
       [{"topic":"mytopic1","partition":0,"replicas":[4,5],"log_dirs":["any","any"]},
        {"topic":"mytopic1","partition":2,"replicas":[4,5],"log_dirs":["any","any"]},
        {"topic":"mytopic2","partition":1,"replicas":[4,5],"log_dirs":["any","any"]},
        {"topic":"mytopic1","partition":1,"replicas":[5,4],"log_dirs":["any","any"]},
        {"topic":"mytopic2","partition":0,"replicas":[5,4],"log_dirs":["any","any"]}]
    }

    In this example, the tool proposed a configuration which reassigns existing partitions on broker 1, 2, and 3 to brokers 4 and 5.

  3. Copy and paste the proposed partition reassignment configuration into an empty JSON file.
  4. Review, and if required, modify the suggested reassignment configuration.
  5. Save the file.
  6. Start the redistribution process with the following command:
    kafka-reassign-partitions --zookeeper hostname:port  --reassignment-json-file reassignment configuration.json --bootstrap-server hostname:port --execute

    The tool prints a list containing the original replica assignment and a message that reassignment has started. Example output:

    Current partition replica assignment
    
    {"version":1,
     "partitions":
       [{"topic":"mytopic2","partition":1,"replicas":[2,3],"log_dirs":["any","any"]},
        {"topic":"mytopic1","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},
        {"topic":"mytopic2","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},
        {"topic":"mytopic1","partition":2,"replicas":[3,1],"log_dirs":["any","any"]},
        {"topic":"mytopic1","partition":1,"replicas":[2,3],"log_dirs":["any","any"]}]
    }
    
    Save this to use as the --reassignment-json-file option during rollback
    Successfully started reassignment of partitions.
  7. Verify the status of the reassignment with the following command:
    kafka-reassign-partitions --zookeeper hostname:port --reassignment-json-file reassignment configuration.json  --bootstrap-server hostname:port --verify
    
    The tool prints the reassignment status of all partitions. Example output:
    Status of partition reassignment: 
    Reassignment of partition mytopic2-1 completed successfully
    Reassignment of partition mytopic1-0 completed successfully
    Reassignment of partition mytopic2-0 completed successfully
    Reassignment of partition mytopic1-2 completed successfully
    Reassignment of partition mytopic1-1 completed successfully

Examples

There are multiple ways to modify the configuration file. The following list of examples shows how a user can modify a proposed configuration and what these changes do. Changes to the original example are marked in bold.

Suppose that the kafka-reassign-partitions tool generated the following proposed reassignment configuration:
{"version":1,
 "partitions":
   [{"topic":"mytopic1","partition":0,"replicas":[1,2],"log_dirs":["any","any"]}]}
Reassign partitions between brokers
To reassign partitions from one broker to another, change the broker ID specified in replicas. For example:
{"topic":"mytopic1","partition":0,"replicas":[5,2],"log_dirs":["any","any"]}

This reassignment configuration moves partition mytopic1-0 from broker 1 to broker 5.

Reassign partitions to another log directory on the same broker
To reassign partitions between log directories on the same broker, change the appropriate any entry to an absolute path. For example:
{"topic":"mytopic1","partition":0,"replicas":[1,2],"log_dirs":["/log/directory1","any"]}

This reassignment configuration moves partition mytopic1-0 to the /log/directory1 log directory.

Reassign partitions between log directories across multiple brokers
To reassign partitions between log directories across multiple brokers, change the broker ID specified in replicas and the appropriate any entry to an absolute path. For example:
{"topic":"mytopic1","partition":0,"replicas":[5,2],"log_dirs":["/log/directory1","any"]}

This reassignment configuration moves partition mytopic1-0 to /log/directory1 on broker 5.

Change partition assignment order (elect a new leader)
To change the ordering of the partition assignment list, change the order of the brokers in replicas. For example:
{"topic":"mytopic1","partition":0,"replicas":[2,1],"log_dirs":["any","any"]}

This reassignment configuration elects broker 2 as the new leader.

kafka-log-dirs

The kafka-log-dirs tool allows user to query a list of replicas per log directory on a broker. The tool provides information that is required for optimizing replica assignment across brokers.

On successful execution, the tool prints a list of partitions per log directory for the specified topics and brokers. The list contains information on topic partition, size, offset lag, and reassignment state. Example output:
{
    "brokers": [
        {
            "broker": 86,
            "logDirs": [
                {
                    "error": null,
                    "logDir": "/var/local/kafka/data",
                    "partitions": [
                        {
                            "isFuture": false,
                            "offsetLag": 0,
                            "partition": "mytopic1-2",
                            "size": 0
                        }
                    ]
                }
            ]
        },
        ...
    ],
    "version": 1
}

The Contents of the kafka-log-dirs Output table gives an overview of the information provided by the kafka-log-dirs tool.

Contents of the kafka-log-dirs Output
Property Description
broker Displays the ID of the broker.
error Indicates if there is a problem with the disk that hosts the topic partition. If an error is detected, org.apache.kafka.common.errors.KafkaStorageException is displayed. If no error is detected, the value is null.
logDir Specifies the location of the log directory. Returns an absolute path.
isfuture The reassignment state of the partition. This property shows whether there is currently replica movement underway between the log directories.
offsetLag Displays the offset lag of the partition.
partition Displays the name of the partition.
size Displays the size of the partition in bytes.

Tool Usage

To retrieve replica assignment information, run the following command:
kafka-log-dirs --describe --bootstrap-server hostname:port --broker-list broker 1, broker 2 --topic-list topic 1, topic 2
If no topic is specified with the --topic-list option, then all topics are queried. If no broker is specified with the --broker-list option, then all brokers are queried. If a log directory is offline, the log directory will be marked offline in the script output. Error example:
 "error":"org.apache.kafka.common.errors.KafkaStorageException"

kafka-*-perf-test

The kafka-*-perf-test tool can be used in several ways. In general, it is expected that these tools should be used on a test or development cluster.

  • Measuring read and/or write throughput.
  • Stress testing the cluster based on specific parameters (such as message size).
  • Load testing for the purpose of evaluating specific metrics or determining the impact of cluster configuration changes.

The kafka-producer-perf-test script can either create a randomly generated byte record:

kafka-producer-perf-test --topic TOPIC --record-size SIZE_IN_BYTES

or randomly read from a set of provided records:

kafka-producer-perf-test --topic TOPIC --payload-delimiter DELIMITER --payload-file INPUT_FILE

where the INPUT_FILE is a concatenated set of pre-generated messages separated by DELIMITER. This script keeps producing messages or limited based on the --num-records flag.

The kafka-consumer-perf-test is:

kafka-consumer-perf-test --broker-list host1:port1,host2:port2,... --zookeeper zk1:port1,zk2:port2,... --topic TOPIC

The flags of most interest for this command are:

  • --group gid: If you run more than one instance of this test, you will want to set different ids for each instance.
  • --num-fetch-threads: Defaults to 1. Increase if higher throughput testing is needed.
  • --from-latest: To start consuming from the latest offset. May be needed for certain types of testing.

Enabling DEBUG or TRACE in command line scripts

In some cases, you may find it useful to produce extra debugging output from the Kafka client API. The DEBUG (shown below) or TRACE levels can be set by replacing the setting in the log4j properties file as follows:
cp /etc/kafka/conf/tools-log4j.properties /var/tmp
sed -i -e 's/WARN/DEBUG/g' /var/tmp/tools-log4j.properties

export KAFKA_OPTS="-Dlog4j.configuration=file:/var/tmp/tools-log4j.properties"

Understanding the kafka-run-class Bash Script

Almost all the provided Kafka tools eventually call the kafka-run-class script. This script is generally not called directly. However, if you are proficient with bash and want to understand certain features available in all Kafka scripts as well as some potential debugging scenarios, familiarity with the kafka-run-class script can prove highly beneficial.

For example, there are some useful environment variables that affect all the command line scripts:

  • KAFKA_DEBUG allows a Java debugger to attach to the JVM launched by the particular script. Setting KAFKA_DEBUG also allows some further debugging customization:
    • JAVA_DEBUG_PORT sets the JVM debugging port.
    • JAVA_DEBUG_OPTS can be used to override the default debugging arguments being passed to the JVM.
  • KAFKA_HEAP_OPTS can be used to pass memory setting arguments to the JVM.
  • KAFKA_JVM_PERFORMANCE_OPTS can be used to pass garbage collection flags to the JVM.