What's New in Streams Messaging

Learn about the new Streams Messaging features in Cloudera DataFlow for Data Hub 7.2.16.


Rebase on Kafka 3.1.2
Kafka shipped with this version of Cloudera Runtime is based on Apache Kafka 3.1.2. For more information, see the following upstream resources:
Apache Kafka Notable Changes:
Apache Kafka Release Notes:
Multi-level rack awareness
The rack awareness capabilities of Kafka have been improved to support multi-level cluster topologies. As a result, brokers can now be configured to run in a multi-level rack-aware mode. If this mode is enabled, the brokers provide multi-level rack awareness guarantees. These guarantees ensure that topic partition replicas are spread evenly across all levels of the physical infrastructure. For example, in a two-level hierarchy with Data Centers on the top level and racks on the second level, brokers will evenly spread replicas among both available DCs and racks.

The new mode is compatible with follower fetching. If multi-level mode is enabled, a compatible replica selector class is automatically installed. This implementation enables consumers (if configured), to fetch Kafka messages from the replica that is closest to them in the multi-level hierarchy.

Additionally, when Cruise Control is deployed on the cluster, the standard rack-aware goals in Cruise Control’s configuration are replaced with a multi-level rack-aware goal. This goal ensures that Cruise Control optimizations do not violate the multi-level rack awareness guarantees. This goal is currently downstream only, available exclusively in Cloudera distributed Cruise Control. For more information, see the following resources:
Expose log directory total and usable space through the Kafka API
KAFKA-13958 is backported in Kafka shipped with this version of Runtime. As a result, the Kafka API now exposes metrics regarding the total and usable disk space of log directories. The information on log directory space is collected by SMM and is exposed on the SMM UI. Specifically, you can now view the current log size of topics as well as the total log size and remaining storage space of brokers. For more information on how you can monitor log size metrics on the SMM UI, see Monitoring log size information.
Kafka now accepts OAuthtokens that do not contain a “sub” claim
KAFKA-13730 is backported in Kafka shipped with this version of Runtime. As a result, Kafka now accepts OAuth tokens that do not contain the "sub" claim. If you are using OAuth tokens that do not contain a “sub” claim, the JWT Principal Claim Name For OAuth2 Kafka service property must be configured. This property specifies the claim that contains the client’s principal. For more information on OAuth2 authentication in Kafka, see OAuth2 authentication.
New Kafka connect connectors
The following new Kafka connect connectors are introduced:
  • HDFS Stateless Sink
  • Influx DB Sink
  • Debezium Db2 Source [Technical Preview]

For more information, see Connectors.

Syslog TCP Source connector 2.0.0.
The Syslog TCP Source Kafka Connect connector is updated to version 2.0.0. The following notable changes and improvements are made:
  • Three new properties are added, these are as follows:
    • Max Batch Size

      This property controls the maximum number of messages to add to a single batch of messages. This is a required property. Its default value is 1.

    • Authorized Issuer DN Pattern and Authorized Subject DN Pattern
      These properties allow you to enable authorization for incoming TLS connections. Both properties accept regular expressions as a value. The configured regular expressions are applied against the Distinguished Names of incoming TLS connections. If the Distinguished Names do not match the pattern, the following message is logged and the messages do not get forwarded to Kafka.
      Error: authorization failure
      Both properties are optional and are set to .* by default.
  • The Max Number of TCP Connections property is replaced by the Max Number of Worker Threads property.

    Similarly to Max Number of TCP Connections, Max Number of Worker Threads is also used to specify the number of TCP connections, but instead of exactly specifying the number of allowed connections, you now specify how many worker threads are reserved for TCP connections. Note that a single worker thread is capable of handling multiple connections. This is a required property. Its default value is 2.

  • Existing version 1.0.0. connectors will continue to function, upgrading them, however, is not possible. If you want to use the new version of the connector, you must deploy a new instance of the connector.
  • Deploying a version 1.0.0. instance of the connector is no longer possible.
AvroConverter support for KConnect logical types
The AvroConverter now converts between Connect and Avro temporal and decimal types.
Connect internal topic Ranger policy
A new Ranger policy, connect internal - topic, is generated by default on fresh installations. This policy allows the Kafka and SMM service principals to access Kafka Connect internal topics (connect-configs, connect-offsets, connect-status) and the secret management's storage topic (connect-secrets).
Connector configurations must by default override the sasl.jaas.config property of the Kafka clients used by the connector
The Require Connectors To Override Kafka Client JAAS Configuration Kafka Connect property is now selected by default. This means that connector configurations must by default contain a sasl.jaas.config entry with an appropriate JAAS configuration that can be used to establish a connection with the Kafka service.
Connect JAAS enforcement now applies to non-override type Kafka client configs
When the Require Connectors To Override Kafka Client JAAS Configuration property is selected, the consumer.sasl. and producer.sasl. configurations are not emitted into the Connect worker configurations anymore. Additionally, the keytab name is randomized and the ${cm-agent:keytab} references in the Connector configurations will stop working.

Schema Registry

Schema Registry instances behind load balancer
You can now use load balancer in front of Schema Registry instances. It is very common to have multiple instances of the same application and have a load balancer in front of them. This can be useful for failover reasons in HA environments, and it can also help sharing the load between instances. You can also use load balancer in front of Schema Registry instances in an environment with Kerberos or SSL enabled.
AvroConverter support for KConnect logical types
AvroConverter now converts between Connect and Avro temporal and decimal types.
Support for alternative jersey connectors in SchemaRegistryClient
connector.provider.class can be configured in Schema Registry Client. If it is configured, schema.registry.client.retry.policy should also be configured to be different than default.
This also fixes the issue with some 3rd party load balancers where the client is expected to follow redirects and authenticate while doing that.
Retrieve principal from client's certificate
When two-way TLS authentication is enabled, Schema Registry extracts the principal from the certificate and uses it for authentication or authorization.
Schema Registry CDC support - change default schema compatibility
When a new Avro schema is created and its compatibility is not explicitly set, then a default compatibility value is used. Until now, that value was always BACKWARD. After this change, users on the server side can configure the default value.
Schema Registry with Knox uses round-robin load balancing
When multiple instances of Schema Registry are running, Knox uses round-robin to forward the requests.
Upgraded Avro version to 1.11.1
Avro got upgraded from version 1.9.1 to 1.11.1.

Streams Messaging Manager

Improved alter topic functionalities
You can now increase the number of partitions of a topic (but not decrease). The option is available on the Configs tab on the Topic Details page.
Partition Assignment tab on the Topic Details page
The Assignment tab, on the topic details page, shows the current state of the partitions and replicas of the topic. It shows some topic-level statistics and the replica assignment of all partitions. If rack awareness is being used in the Kafka cluster, the replica assignment is shown in a rack-based view. If the rack IDs follow the format of multi-level rack IDs, the rack IDs are rendered as a hierarchy.
Added sorter functionality to partition lists in SMM UI
The SMM UI contains Brokers and Topics pages where records contain broker or topic specific partition lists and their profile pages as well. All partition list columns become sortable.
SMM UI shows broker rack information
The Brokers page and the Broker Details page now both show the rack ID of the brokers. SMM now also supports a new endpoint: /api/v1/admin/topics/{topicName}/description.
Improved SMM UX for Kafka Connectors configuration
The connector selection and connector configuration workflow steps are now separated into two different steps. Search and autocomplete are now available for Connect configuration keys. The help icon provides detailed information about each configuration key. The data type of the configuration values can be chosen from the options menu. For more information, see Setting connector configurations.
Added partition log-size information to the SMM UI
The SMM UI shows log-size related information about brokers, topics, and partitions. Furthermore, warning messages appear when log directory related errors are reported by Kafka.
SMM connector profile shows connector level error
Errors causing the whole connector to fail are now displayed on the connector profile page if available.
Data explorer allows specifying consumer isolation.level
"/api/v1/admin/topics/{topicName}/partition/{partitionId}/payloads" endpoint has a new parameter: "consumerIsolationLevel". The accepted values are "read_committed" and "read_uncommitted". This sets the "isolation.level" config for the KafkaConsumer used for retrieving messages. The default value is "read_uncommitted".
Additionally, the parameter can be set on the UI as well.
The Data Explorer page has a new design
The Data Explorer now uses the “from offset” and “record limit” parameters to select an offset window to query.
Offset-lag metrics in SMM UI
SMM UI Replications tab now also shows the replication-records-lag metric.
SMM UI Data Explorer shows null values explicitly
Data Explorer in the SMM UI now displays 'null' with italic style applied when the value is null rather than an empty value.
SMM uses a specific REST API to fetch list of topics
SMM Connect page now uses the Connect active topic tracking feature to list the topics used by the Connectors instead of the connector configuration's topic property. Sink Connectors show up based on which topics they consumed from (regardless of whether "topics" or "topics.regex" config was used), and Source Connectors show up based on which topics they produced to.
Improved configuration of SMM Kafka interceptors
New configuration prefix for SMM monitoring interceptor's background producer: "smm.monitoring.interceptor.producer.".
Clients that use either of the SMM monitoring interceptors (MonitoringConsumerInterceptor, MonitoringProducerInterceptor) use a background producer to push client metrics into Kafka every 30 seconds. This background producer now can be configured by passing Producer Configurations to the client that uses the interceptor with the "smm.monitoring.interceptor.producer." prefix. The prefix is trimmed and the remaining part of the configuration is passed to the background producer. For instance, if the user wishes to configure the "batch.size" property for the background producer, the following configuration should be passed: "smm.monitoring.interceptor.producer.batch.size".
There is a different behavior, however, with the "client.id" property. If the user does not provide a configuration to the client id (smm.monitoring.interceptor.producer.client.id), the default is used, which is: "smm-monitoring-interceptor".
SMM Data Explorer shows JSON output in pretty printed format
SMM Data Explorer can show JSON output in pretty printed format by using the "JSON Pretty Print" deserializer.

Streams Replication Manager

Internal SRM topics are now automatically added to the replication deny list
SRM now ignores internal SRM topics when replicating data. If required, this behavior can be disabled by adding srm.internal.topic.exclude.enable=false to Streams Replication Manager's Replication Configs in Cloudera Manager.
Metrics and health checks for the status processor Streams application in SRM Service
SRM Service health tests now show the state of the Connect status processor Streams application.
SRM topic creation timeout increased
Streams Replication Manager internal topic creation timeout property defaults are increased to 20s to tolerate intermittent issues at startup.
Replication status metrics are enhanced with task information
The SRM Service now reports the status of the connectors and tasks in the replication status with detailed descriptions. The status and detailed descriptions can be viewed on the Replications tab of SMM UI.
New metric replication-records-lag
The SRM Service now reports a new metric on its REST API called replication-records-lag. This metric provides information regarding the replication lag based on offsets. The metric is available both on the cluster and the topic level.
Raw metrics are compressed using LZ4
SRM internal metric producers from now on use LZ4 compression by default. LZ4 was chosen as it provides the best combination in terms of compression speed and performance. As a result, Cloudera recommends that you use LZ4. If required, however, you can change the compression by doing the following:
  1. Add the following configuration entries to Streams Replication Manager's Replication Configs
  2. Add the following to Additional Configs For Streams Application Running Inside SRM Service
For more information regarding, metrics, monitoring, as well as raw metric collection and aggregation, see Streams Replication Manager monitoring and metrics .

Cruise Control

Rebasing Cruise Control to 2.5.85
Cruise Control is rebased to 2.5.85 version to be compatible with Apache Kafka 3.x version.
Adding MultiLevelRackAwareGoal to Cruise Control
As the currently available RackAwareGoal in Cruise Control does not support multi level rack awareness, a new goal was created to ensure that the replicas are assigned respecting the rules of multiple level racks.

The new goal is named com.cloudera.kafka.cruisecontrol.analyzer.goals.MultiLevelRackAwareDistributionGoal.