CDH 6 includes Apache Kafka as part of the core package. The documentation includes improved contents for how to set up, install, and administer your Kafka ecosystem. For more information, see the Cloudera Enterprise 6.0.x Apache Kafka Guide. We look forward to your feedback on both the existing and new documentation.
Using Apache Kafka Command-line Tools
$ kafka-topics --zookeeper --list sink1 t1 t2 $ kafka-topics --create --zookeeper hostname:2181/kafka --replication-factor 2 --partitions 4 --topic topicname
$ kafka-console-consumer --zookeeper --topic t1
$ kafka-console-producer --broker-list, --topic t1
The kafka-consumer-groups tool can be used to list all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.
This tool is primarily used for describing consumer groups and debugging any consumer offset issues. The output from the tool shows the log and consumer offsets for each partition connected to the consumer group that is being described. You can see at a glance which consumers are current with their partition and which ones are behind. From there, you can determine which partitions (and likely the corresponding brokers) are slow.
Using the tool on secure and unsecure clusters differs slightly. On secure clusters, you have use the command-config option together with an appropriate property file.
- Viewing offsets on an unsecure cluster
Use the following command to view offsets committed to Kafka:
kafka-consumer-groups --new-consumer --bootstrap-server --describe --group flume
Output Example:GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER flume t1 0 1 3 2 test-consumer-group_postamac.local-1456198719410-29ccd54f-0
- Viewing offsets on a secure cluster
- In order to view offsets on a secure Kafka cluster, the consumer-groups tool has to be run with the command-config option. This option specifies the
property file that contains the necessary configurations to run the tool on a secure cluster. The process to create property file is identical to the client configuration process detailed in
Enabling Kerberos Authentication and Step 5 in Deploying SSL for
Kafka. Which process you need to follow depends on the security configuration of the cluster..
To view offsets do the following:
- Pass the jaas.conf file location as a JVM parameter.
export KAFKA_OPTS='
- Run the tool with the command-config option.
kafka-consumer-groups --bootstrap-server --describe --command-config
The command-config option specifies the property file that contains the necessary configurations to run the tool on a secure cluster. Which properties are configured in this file is dependent on the protocols being used.
Example file:This example shows what properties you have to set for theconsumer-groups when both Kerberos and TLS/SSL are configured on your cluster.exclude.internal.topics=false security.protocol = SASL_SSL = kafka ssl.truststore.location = /var/private/ssl/kafka.client.truststore.jks ssl.truststore.password = test1234
- Pass the jaas.conf file location as a JVM parameter.
This tool provides substantial control over partitions in a Kafka cluster. It is mainly used to balance storage loads across brokers through the following reassignment actions:
- Change the ordering of the partition assignment list. Used to control leader imbalances between brokers.
- Reassign partitions from one broker to another. Used to expand existing clusters.
- Reassign partitions between log directories on the same broker. Used to resolve storage load imbalance among available disks in the broker.
- Reassign partitions between log directories across multiple brokers. Used to resolve storage load imbalance across multiple brokers.
- Topics-to-Move JSON
- This JSON file specifies the topics that you want to reassign. This a simple file that tells the kafka-reassign-partitions tool which partitions it
should look at when generating a proposal for the reassignment configuration. The user has to create the topics-to-move JSON file from scratch.
The format of the file is the following:
{"topics": [{"topic": "mytopic1"}, {"topic": "mytopic2"}], "version":1 }
- Reassignment Configuration JSON
- This JSON file is a configuration file that contains the parameters used in the reassignment process. This file is created by the user, however, a proposal for its contents is
generated by the tool. When the kafka-reasssign-partitions tool is executed with the --generate option, it generates a proposed
configuration which can be fine-tuned and saved as a JSON file. The file created this way is the reassignment configuration JSON. To generate a proposal, the tool requires a topics-to-move file as
The format of the file is the following:
{"version":1, "partitions": [{"topic":"mytopic1","partition":3,"replicas":[4,5],"log_dirs":["any","any"]}, {"topic":"mytopic1","partition":1,"replicas":[5,4],"log_dirs":["any","any"]}, {"topic":"mytopic2","partition":2,"replicas":[6,5],"log_dirs":["any","any"]}] }
The reassignment configuration contains multiple properties that each control and specify an aspect of the configuration. The Reassignment Configuration Properties table lists each property and its description.
Property | Description |
topic | Specifies the topic. |
partition | Specifies the partition. |
replicas | Specifies the brokers that the selected partition is assigned to. The brokers are listed in order, which means that the first broker in the list is always the leader for that partition. Change the order of brokers to resolve any leader balancing issues among brokers. Change the broker IDs to reassign partitions to different brokers. |
log_dirs | Specifies the log directory of the brokers. The log directories are listed in the same order as the brokers. By default any is specified as the log directory, which means that the broker is free to choose where it places the replica. By default, the current broker implementation selects the log directory using a round-robin algorithm. An absolute path beginning with a / can be used to explicitly set where to store the partition replica. |
- Cloudera recommends that you minimize the volume of replica changes per command instance. Instead of moving 10 replicas with a single command, move two at a time in order to save cluster resources.
- This tool cannot be used to make an out-of-sync replica into the leader partition.
- Use this tool only when all brokers and topics are healthy.
- Anticipate system growth. Redistribute the load when the system is at 70% capacity. Waiting until redistribution becomes necessary due to reaching resource limits can make the redistribution process extremely time consuming.
Tool Usage
To reassign partitions, complete the following steps:
- Create a topics-to-move JSON file that specifies the topics you want to reassign. Use the following format:
{"topics": [{"topic": "mytopic1"}, {"topic": "mytopic2"}], "version":1 }
- Generate the content for the reassignment configuration JSON with the following command:
kafka-reassign-partitions --zookeeper hostname:port --topics-to-move-json-file topics to move.json --broker-list broker 1, broker 2 --generate
Running the command lists the distribution of partition replicas on your current brokers followed by a proposed partition reassignment configuration.
Example output:
Current partition replica assignment {"version":1, "partitions": [{"topic":"mytopic2","partition":1,"replicas":[2,3],"log_dirs":["any","any"]}, {"topic":"mytopic1","partition":0,"replicas":[1,2],"log_dirs":["any","any"]}, {"topic":"mytopic2","partition":0,"replicas":[1,2],"log_dirs":["any","any"]}, {"topic":"mytopic1","partition":2,"replicas":[3,1],"log_dirs":["any","any"]}, {"topic":"mytopic1","partition":1,"replicas":[2,3],"log_dirs":["any","any"]}] } Proposed partition reassignment configuration {"version":1, "partitions": [{"topic":"mytopic1","partition":0,"replicas":[4,5],"log_dirs":["any","any"]}, {"topic":"mytopic1","partition":2,"replicas":[4,5],"log_dirs":["any","any"]}, {"topic":"mytopic2","partition":1,"replicas":[4,5],"log_dirs":["any","any"]}, {"topic":"mytopic1","partition":1,"replicas":[5,4],"log_dirs":["any","any"]}, {"topic":"mytopic2","partition":0,"replicas":[5,4],"log_dirs":["any","any"]}] }
In this example, the tool proposed a configuration which reassigns existing partitions on broker 1, 2, and 3 to brokers 4 and 5.
- Copy and paste the proposed partition reassignment configuration into an empty JSON file.
- Review, and if required, modify the suggested reassignment configuration.
- Save the file.
- Start the redistribution process with the following command:
kafka-reassign-partitions --zookeeper hostname:port --reassignment-json-file reassignment configuration.json --bootstrap-server hostname:port --execute
The tool prints a list containing the original replica assignment and a message that reassignment has started. Example output:
Current partition replica assignment {"version":1, "partitions": [{"topic":"mytopic2","partition":1,"replicas":[2,3],"log_dirs":["any","any"]}, {"topic":"mytopic1","partition":0,"replicas":[1,2],"log_dirs":["any","any"]}, {"topic":"mytopic2","partition":0,"replicas":[1,2],"log_dirs":["any","any"]}, {"topic":"mytopic1","partition":2,"replicas":[3,1],"log_dirs":["any","any"]}, {"topic":"mytopic1","partition":1,"replicas":[2,3],"log_dirs":["any","any"]}] } Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions.
- Verify the status of the reassignment with the following command:
kafka-reassign-partitions --zookeeper hostname:port --reassignment-json-file reassignment configuration.json --bootstrap-server hostname:port --verify
The tool prints the reassignment status of all partitions. Example output:Status of partition reassignment: Reassignment of partition mytopic2-1 completed successfully Reassignment of partition mytopic1-0 completed successfully Reassignment of partition mytopic2-0 completed successfully Reassignment of partition mytopic1-2 completed successfully Reassignment of partition mytopic1-1 completed successfully
There are multiple ways to modify the configuration file. The following list of examples shows how a user can modify a proposed configuration and what these changes do. Changes to the original example are marked in bold.
{"version":1, "partitions": [{"topic":"mytopic1","partition":0,"replicas":[1,2],"log_dirs":["any","any"]}]}
- Reassign partitions between brokers
To reassign partitions from one broker to another, change the broker ID specified in replicas. For example:
This reassignment configuration moves partition mytopic1-0 from broker 1 to broker 5.
- Reassign partitions to another log directory on the same broker
- To reassign partitions between log directories on the same broker, change the appropriate any entry to an absolute path. For example:
This reassignment configuration moves partition mytopic1-0 to the /log/directory1 log directory.
- Reassign partitions between log directories across multiple brokers
- To reassign partitions between log directories across multiple brokers, change the broker ID specified in replicas and the appropriate any entry to an absolute path. For example:
This reassignment configuration moves partition mytopic1-0 to /log/directory1 on broker 5.
- Change partition assignment order (elect a new leader)
To change the ordering of the partition assignment list, change the order of the brokers in replicas. For example:
This reassignment configuration elects broker 2 as the new leader.
The kafka-log-dirs tool allows user to query a list of replicas per log directory on a broker. The tool provides information that is required for optimizing replica assignment across brokers.
{ "brokers": [ { "broker": 86, "logDirs": [ { "error": null, "logDir": "/var/local/kafka/data", "partitions": [ { "isFuture": false, "offsetLag": 0, "partition": "mytopic1-2", "size": 0 } ] } ] }, ... ], "version": 1 }
The Contents of the kafka-log-dirs Output table gives an overview of the information provided by the kafka-log-dirs tool.
Property | Description |
broker | Displays the ID of the broker. |
error | Indicates if there is a problem with the disk that hosts the topic partition. If an error is detected, org.apache.kafka.common.errors.KafkaStorageException is displayed. If no error is detected, the value is null. |
logDir | Specifies the location of the log directory. Returns an absolute path. |
isfuture | The reassignment state of the partition. This property shows whether there is currently replica movement underway between the log directories. |
offsetLag | Displays the offset lag of the partition. |
partition | Displays the name of the partition. |
size | Displays the size of the partition in bytes. |
Tool Usage
kafka-log-dirs --describe --bootstrap-server hostname:port --broker-list broker 1, broker 2 --topic-list topic 1, topic 2