What's New in Apache Kafka

Learn about the new features of Kafka in Cloudera Runtime 7.1.9.

Rebase on Kafka 3.4.1

Kafka shipped with this version of Cloudera Runtime is based on Apache Kafka 3.4.1. For more information, see the following upstream resources:

Apache Kafka Notable Changes:
Apache Kafka Release Notes:

Kafka KRaft [Technical Preview]

Apache Kafka Raft (KRaft) is a consensus protocol used for metadata management that was developed as a replacement for Apache ZooKeeper. Using KRaft for managing Kafka metadata instead of ZooKeeper offers various benefits including a simplified architecture and a reduced operational footprint.

Kafka KRaft in this release of Cloudera Runtime is in technical preview and does not support the following:
  • Deployments with multiple log directories. This includes deployments that use JBOD for storage.
  • Delegation token based authentication.
  • Migrating an already running Kafka service from ZooKeeper to KRaft.
  • Atlas Integration.

For a conceptual overview on KRaft, see Kafka KRaft. For more information on how to set up a cluster with KRaft, see KRaft setup.

Kafka log directory monitoring improvements

A new Cloudera Manager chart, trigger, and action is added for the Kafka service. These assist you in monitoring the log directory space of the Kafka Brokers, and enable you to prevent Kafka disks from filling up.

The chart is called Log Directory Free Capacity. It shows the capacity of each Kafka Broker log directory.

The trigger is called Broker Log Directory Free Capacity Check. It fires if the capacity of any log directory falls below 10%. The trigger is automatically created for all newly deployed Kafka services, but must be created with the Create Kafka Log Directory Free Capacity Check action for existing services following an upgrade.

The chart and trigger are available on the Kafka service > Status page. The action is available in Kafka service > Actions.

Kafka Connect metrics reporter security configurable in Cloudera Manager

New, dedicated Cloudera Manager properties are introduced for the security configuration of the Kafka Connect metrics reporter. As a result, you are no longer required to use advanced security snippets if you want to secure the metrics reporter and its endpoint. The new properties introduced are as follows:
  • Secure Jetty Metrics Port
  • Enable Basic Authentication for Metrics Reporter
  • Jetty Metrics User Name
  • Jetty Metrics Password
A dedicated property to enable TLS/SSL for the metrics reporter is not available. Instead you must select Enable TLS/SSL for Kafka Connect which enables TLS/SSL for the Kafka Connect role including the metrics reporter. For more information regarding these properties, see Cloudera Manager Configuration Properties Reference.

In addition, the Kafka Connect Prometheus Metrics Port property is removed and is replaced by Jetty Metrics Port or Secure Jetty Metrics Port. As a result, the setup steps required to configure Prometheus as the metrics store for SMM are changed. For updated deployment instructions, see Setting up Prometheus for Streams Messaging Manager.

Single Message Transforms (SMT) plugins for binary conversion

Two Cloudera developed SMT plugins are added. These are the ConvertToBytes and ConvertFromBytes plugins, which you can use to convert binary data to or from the Kafka Connect internal data format.

For more information, see the following resources:

Exactly-once semantics (EOS) support for source connectors

EOS support is added for Kafka Connect source connectors. For more information, see Configuring EOS for source connectors.

Rolling restart checks provide a high cluster health guarantees by default

The default value of the Cluster Health Guarantee During Rolling Restart property is changed from none to healthy partitions stay healthy. This property defines what type of checks are performed during a Rolling Restart on the restarted broker. Each setting guarantees a different level of cluster health during Rolling Restarts. With the none setting, no checks are performed. This means that in previous versions no guarantees were provided on cluster health by default.

The new default, healthy partitions stay healthy, ensures a high level of guarantees on cluster health. This setting ensures that no partitions go into an under-min-isr state when a broker is stopped. This is achieved by waiting before each broker is stopped so that all other brokers can catch up with all replicas that are in an at-min-isr state. Additionally, the setting ensures that the restarted broker is accepting requests on its service port before restarting the next broker. This setting ignores partitions which are already in an under-min-isr state. For more information, see Rolling restart checks.

Kafka load balancer is automatically configured with the LDAP handler if LDAP authentication is configured

When a load balancer and LDAP authentication is configured for Kafka, the PLAIN mechanism is automatically added to the enabled authentication mechanisms of the load balancer listener. Additionally, the load balancer is automatically configured to use the LdapPlainServerCallbackHandler as the callback handler.

LDAPS SSL configurations are inherited from the Kafka broker

The SSL configurations of LDAP over SSL (LDAPS) are inherited from the Kafka broker. Previously, the JDK default was used. If the JDK default certificate store contains certificates which were used to setup SSL connection to LDAP, it should be imported to the broker stores.

Aliases for Kafka CLI tools

Aliases are added for the kafka-storage.sh, kafka-cluster.sh, and kafka-features.sh command line tools. These tools can now be called globally with kafka-storage, kafka-cluster, and kafka-features.

Multi-level rack awareness

The rack awareness capabilities of Kafka are improved to support multi-level cluster topologies. As a result, brokers can now be configured to run in a multi-level rack-aware mode. If this mode is enabled, the brokers provide multi-level rack awareness guarantees. These guarantees ensure that topic partition replicas are spread evenly across all levels of the physical infrastructure. For example, in a two-level hierarchy with Data Centers on the top level and racks on the second level, brokers will evenly spread replicas among both available DCs and racks.

The new mode is compatible with follower fetching. If multi-level mode is enabled, a compatible replica selector class is automatically installed. This implementation enables consumers (if configured), to fetch Kafka messages from the replica that is closest to them in the multi-level hierarchy.

Additionally, when Cruise Control is deployed on the cluster, the standard rack-aware goals in Cruise Control’s configuration are replaced with a multi-level rack-aware goal. This goal ensures that Cruise Control optimizations do not violate the multi-level rack awareness guarantees. This goal is currently downstream only, available exclusively in Cloudera distributed Cruise Control. For more information, see the following resources:

AvroConverter support for Kafka Connect logical types

The AvroConverter now converts between Connect and Avro temporal and decimal types.

Connector configurations must by default override the sasl.jaas.config property of the Kafka clients used by the connector

The Require Connectors To Override Kafka Client JAAS Configuration Kafka Connect property is now selected by default. This means that connector configurations must by default contain a sasl.jaas.config entry with an appropriate JAAS configuration that can be used to establish a connection with the Kafka service.

Connect JAAS enforcement now applies to non-override type Kafka client configs

When the Require Connectors To Override Kafka Client JAAS Configuration property is selected, the consumer.sasl. and producer.sasl. configurations are not emitted into the Connect worker configurations anymore. Additionally, the keytab name is randomized and the ${cm-agent:keytab} references in the Connector configurations will stop working.

Kafka Connect now supports Kerberos auth-to-local (ATL) rules with SPNEGO authentication

Kafka Connect now uses the cluster-wide Kerberos auth-to-local (ATL) rules by default. A new configuration property called Kafka Connect SPNEGO Auth To Local Rules is introduced. This property is used to manually specify the ATL rules. During an upgrade, the property is set to DEFAULT to ensure backward compatibility. Following an upgrade, if you want to use the cluster-wide rules, clear the existing value from the Kafka Connect SPNEGO Auth To Local Rules property.

Debezium connector updates

The following updates related to Debezium connectors are introduced:
  • All Debezium connectors shipped with Cloudera Runtime are upgraded to version 1.9.7.

    Existing instances of the connectors are automatically upgraded to the new version during cluster upgrade. Deploying the previously shipped version of the connector is not possible.

  • The Debezium Db2 Source connector is introduced and is available for deployment.
For more information see Kafka Connectors in Runtime or the Debezium documentation.

Parquet support for the S3 Sink connector

Version 2.0.0 of the S3 Sink connector is released. The connector now supports Parquet as an output file data format. The following property changes are made to support Parquet:
  • A new property, Parquet Compression Type, is added.

    This property specifies the compression type used for writing Parquet files. Accepted values are UNCOMPRESSED,SNAPPY, GZIP, LZO, BROTLI, LZ4, and ZSTD.

  • The Output File Data Format property now accepts Parquet as a value.
Existing connectors will continue to function, upgrading them, however, is not possible. If you want to use the new version of the connector, you must deploy a new instance of the connector.

For more information, see S3 Sink connector and S3 Sink properties reference .

Support schema ID encoding in the payload or message header in Stateless NiFi connectors

The Kafka Connect connectors powered by Stateless NiFi that support record processing are updated to support content-encoded schema references for Avro messages. These connectors now properly support integration with Schema Registry and SMM.

This improvement introduces the following changes in the affected connectors.
A new value, HWX Content-Encoded Schema Reference, is introduced for the Schema Access Strategy property
If this value is set, the schema is read from Schema Registry, and the connector expects that the Avro messages contain a content-encoded schema reference. That is, the message contains a schema reference that is encoded in the message content. The new value is introduced for the following connectors:
  • ADLS Sink
  • HDFS Sink
  • HTTP Sink
  • Influx DB Sink
  • JDBC Sink
  • JDBC Source
  • Kudu Sink
  • S3 Sink
The Schema Write Strategy property is removed from the following connectors
  • ADLS Sink
  • HDFS Sink
  • S3 Sink
  • InfluxDB Sink
A new property, Avro Schema Write Strategy is introduced
This property specifies whether and how the record schema is attached to the output data file when the format of the output is Avro. The property supports the following values:
  • Do Not Write Schema: neither the schema nor reference to the schema is attached to the output Avro messages.
  • Embed Avro Schema: the schema is embedded in every output Avro message.
  • HWX Content-Encoded Schema Reference: a reference to the schema (identified by Schema Name) within Schema Registry is encoded in the content of the outgoing Avro messages.

This property is introduced for the following connectors:

  • ADLS Sink
  • HDFS Sink
  • S3 Sink
  • SFTP Source
  • Syslog TCP Source
  • Syslog UDP Source
The minor or major version of all affected connectors is updated
Existing connectors will continue to function, upgrading them, however, is not possible. If you want to use the new version of the connector, you must deploy a new instance of the connector.

For more information, see the documentation for each connector in Kafka Connectors in Runtime and Streams Messaging Reference.