CDH 6 includes Apache Kafka as part of the core package. The documentation includes improved contents for how to set up, install, and administer your Kafka ecosystem. For more information, see the Cloudera Enterprise 6.0.x Apache Kafka Guide. We look forward to your feedback on both the existing and new documentation.

Issues Fixed in CDK 4.0.0 Powered By Apache Kafka

CDK 4.0.0 Powered By Apache Kafka fixes the following issues:

Kafka stuck with under-replicated partitions after Zookeeper session expires

This problem might occur when your Kafka cluster includes a large number of under-replicated Kafka partitions. One or more broker logs include messages such as the following:

[2016-01-17 03:36:00,888] INFO Partition [__samza_checkpoint_event-creation_1,3] on broker 3: Shrinking ISR for partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 (kafka.cluster.Partition)
[2016-01-17 03:36:00,891] INFO Partition [__samza_checkpoint_event-creation_1,3] on broker 3: Cached zkVersion [66] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)

There will also be an indication of the ZooKeeper session expiring in one or more Kafka broker logs around the same time as the previous errors:

          INFO zookeeper state changed (Expired) (org.I0Itec.zkclient.ZkClient)

The log is typically in /var/log/kafka on each host where a Kafka broker is running. The location is set by the property kafka.log4j.dir in Cloudera Manager. The log name is kafka-broker-hostname.log. In diagnostic bundles, the log is under logs/ hostname-ip-address/.

Affected Versions: CDK 1.4.x, 2.0.x, 2.1.x, 2.2.x Powered By Apache Kafka

Fixed Versions:
  • Full Fix: CDK 4.0.0 and higher Powered By Apache Kafka
  • Partial Fix: CDK 3.0.0 and higher Powered By Apache Kafka are less likely to encounter this issue.

Workaround: To move forward after seeing this problem, restart the Kafka brokers affected. You can restart individual brokers from the Instances tab in the Kafka service page in Cloudera Manager.

To reduce the chances of this issue happening again, do what you can to make sure ZooKeeper sessions do not expire:

  • Reduce the potential for long garbage collection pauses by brokers:
    • Use a better garbage collection mechanism in the JVM, such as G1GC. You can do this by adding –XX:+UseG1GC in the broker_java_opts.
    • Increase broker heap size if it is too small (broker_max_heap_size) (be careful that you don’t choose a heap size that can cause out-of-memory problems given all the services running on the node).
  • Increase the ZooKeeper session timeout configuration on brokers (, to reduce the likelihood that sessions expire.
  • Ensure ZooKeeper itself is well resourced and not overwhelmed, so it can respond. For example, it is highly recommended to locate the ZooKeeper log directory is on its own disk.

Cloudera JIRA: CDH-42514

Apache JIRA: KAFKA-2729

Upstream Issues Fixed

The following upstream issues are fixed in CDH 4.0.0:

  • KAFKA-3514 - Stream timestamp computation needs some further thoughts
  • KAFKA-4514 - Add Codec for ZStandard Compression
  • KAFKA-4932 - Add support for UUID serialization and deserialization
  • KAFKA-5066 - Add KafkaMetricsConfig (Yammer metrics reporters) props to documentation
  • KAFKA-5690 - Add support to list ACLs for a given principal
  • KAFKA-5891 - Proper handling of LogicalTypes in Cast
  • KAFKA-5975 - No response when deleting topics and delete.topic.enable=false
  • KAFKA-6123 - Give client MetricsReporter auto-generated
  • KAFKA-6195 - Resolve DNS aliases in bootstrap.server (KIP-235)
  • KAFKA-6684 - Support casting Connect values with bytes schema to string
  • KAFKA-6753 - Updating the OfflinePartitions count only when necessary
  • KAFKA-6761 - Reduce streams footprint part IV add optimization
  • KAFKA-6835 - Enable topic unclean leader election to be enabled without controller change
  • KAFKA-6859 - Do not send LeaderEpochRequest for undefined leader epochs
  • KAFKA-6863 - Kafka clients should try to use multiple DNS resolved IP
  • KAFKA-6914 - Set parent classloader of DelegatingClassLoader same as the worker's
  • KAFKA-6923 - Refactor Serializer/Deserializer for KIP-336
  • KAFKA-6926 - Simplified some logic to eliminate some suppressions of NPath complexity checks
  • KAFKA-6950 - Delay response to failed client authentication to prevent potential DoS issues (KIP-306)
  • KAFKA-6998 - Disable Caching when max.cache.bytes are zero.
  • KAFKA-7019 - Make reading metadata lock-free by maintaining an atomically-updated read snapshot
  • KAFKA-7044 - Fix Fetcher.fetchOffsetsByTimes and NPE in describe consumer group
  • KAFKA-7080 - and KAFKA-7222: Cleanup overlapping KIP changes
  • KAFKA-7096 - Clear buffered data for partitions that are explicitly unassigned by user
  • KAFKA-7117 - Support AdminClient API in AclCommand (KIP-332)
  • KAFKA-7128 - ; Follower has to catch up to offset within current leader epoch to join ISR
  • KAFKA-7134 - KafkaLog4jAppender exception handling with ignoreExceptions
  • KAFKA-7139 - Support option to exclude the internal topics in
  • KAFKA-7169 - Validate SASL extensions through callback on server side
  • KAFKA-7196 - Remove heartbeat delayed operation for those removed consumers at the end of each rebalance
  • KAFKA-7211 - ; MM should handle TimeoutException in commitSync
  • KAFKA-7215 - Improve LogCleaner Error Handling
  • KAFKA-7216 - Ignore unknown ResourceTypes while loading acl cache
  • KAFKA-7222 - Add Windows grace period
  • KAFKA-7223 - In-Memory Suppression Buffering
  • KAFKA-7240 - -total metrics in Streams are incorrect
  • KAFKA-7242 - Reverse xform configs before saving
  • KAFKA-7277 - Migrate Streams API to Duration instead of longMs times
  • KAFKA-7278 - replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
  • KAFKA-7280 - Synchronize consumer fetch request/response handling
  • KAFKA-7285 - Create new producer on each rebalance if EOS enabled
  • KAFKA-7286 - Avoid getting stuck loading large metadata records
  • KAFKA-7287 - Set open ACL for old consumer znode path
  • KAFKA-7288 - Fix for SslSelectorTest.testCloseConnectionInClosingState
  • KAFKA-7296 - Handle coordinator loading error in TxnOffsetCommit
  • KAFKA-7298 - Raise UnknownProducerIdException if next sequence number is unknown
  • KAFKA-7299 - Batch LeaderAndIsr requests for AutoLeaderRebalance
  • KAFKA-7301 - Fix streams Scala join ambiguous overload
  • KAFKA-7311 - Reset next batch expiry time on each poll loop
  • KAFKA-7313 - StopReplicaRequest should attempt to remove future replica for the partition only if future replica exists
  • KAFKA-7316 - Fix Streams Scala filter recursive call #5538
  • KAFKA-7322 - Fix race condition between log cleaner thread and log retention thread when topic cleanup policy is updated
  • KAFKA-7324 - NPE due to lack of SASLExtensions in SASL/OAUTHBEARER
  • KAFKA-7326 - KStream.print() should flush on each line for PrintStream
  • KAFKA-7332 - Update CORRUPT_MESSAGE exception message description
  • KAFKA-7333 - Protocol changes for KIP-320
  • KAFKA-7338 - Specify AES128 default encryption type for Kerberos tests
  • KAFKA-7347 - Return not leader error for OffsetsForLeaderEpoch requests to non-replicas
  • KAFKA-7353 - Connect logs 'this' for anonymous inner classes
  • KAFKA-7354 - Fix IdlePercent and NetworkProcessorAvgIdlePercent metric
  • KAFKA-7366 - Make topic configs segment.bytes and to take effect immediately
  • KAFKA-7369 - Handle retriable errors in AdminClient list groups API
  • KAFKA-7379 - [streams] send.buffer.bytes should be allowed to set -1 in KafkaStreams
  • KAFKA-7385 - Fix log cleaner behavior when only empty batches are retained
  • KAFKA-7386 - streams-scala should not cache serdes
  • KAFKA-7388 - equal sign in property value for password
  • KAFKA-7394 - OffsetsForLeaderEpoch supports topic describe access
  • KAFKA-7395 - Add fencing to replication protocol (KIP-320)
  • KAFKA-7396 - Materialized, Serialized, Joined, Consumed and Produced with implicit Serdes
  • KAFKA-7399 - KIP-366, Make FunctionConversions deprecated
  • KAFKA-7400 - Compacted topic segments that precede the log start offse...
  • KAFKA-7403 - Use default timestamp if no expire timestamp set in offset commit value
  • KAFKA-7406 - Name join group repartition topics
  • KAFKA-7409 - Validate message format version before creating topics or altering configs
  • KAFKA-7414 - Out of range errors should never be fatal for follower
  • KAFKA-7415 - Persist leader epoch and start offset on becoming a leader
  • KAFKA-7428 - ConnectionStressSpec: add "action", allow multiple clients
  • KAFKA-7429 - Enable key/truststore update with same filename/password
  • KAFKA-7434 - Fix NPE in DeadLetterQueueReporter
  • KAFKA-7437 - Persist leader epoch in offset commit metadata
  • KAFKA-7439 - Replace EasyMock and PowerMock with Mockito in clients module
  • KAFKA-7441 - Allow LogCleanerManager.resumeCleaning() to be used concurrently
  • KAFKA-7453 - Expire registered channels not selected within idle timeout
  • KAFKA-7454 - Use lazy allocation for SslTransportLayer buffers and null them on close
  • KAFKA-7456 - Serde Inheritance in DSL
  • KAFKA-7459 - Use thread-safe Pool for RequestMetrics.requestRateInternal
  • KAFKA-7460 - Fix Connect Values converter date format pattern
  • KAFKA-7462 - Make token optional for OAuthBearerLoginModule
  • KAFKA-7467 - NoSuchElementException is raised because controlBatch is empty
  • KAFKA-7475 - capture remote address on connection authetication errors, and log it
  • KAFKA-7476 - Fix Date-based types in SchemaProjector
  • KAFKA-7477 - Improve Streams close timeout semantics
  • KAFKA-7481 - Add upgrade/downgrade notes for 2.1.x
  • KAFKA-7482 - LeaderAndIsrRequest should be sent to the shutting down broker
  • KAFKA-7483 - Allow streams to pass headers through Serializer.
  • KAFKA-7496 - Handle invalid filters gracefully in KafkaAdminClient#describeAcls
  • KAFKA-7498 - Remove references from `common.requests` to `clients`
  • KAFKA-7501 - Fix producer batch double deallocation when receiving message too large error on expired batch
  • KAFKA-7505 - Process incoming bytes on write error to report SSL failures
  • KAFKA-7519 - Clear pending transaction state when expiration fails
  • KAFKA-7532 - Clean-up controller log when shutting down brokers
  • KAFKA-7534 - Error in flush calling close may prevent underlying store from closing
  • KAFKA-7535 - KafkaConsumer doesn't report records-lag if isolation.level is read_committed
  • KAFKA-7560 - PushHttpMetricsReporter should not convert metric value to double
  • KAFKA-7742 - Fixed removing hmac entry for a token being removed from DelegationTokenCache