What's New in Streams Messaging
Learn about the new Streams Messaging features in Cloudera DataFlow for Data Hub 7.2.16.
Kafka
- Rebase on Kafka 3.1.2
- Kafka shipped with this version of Cloudera Runtime is based on Apache Kafka 3.1.2. For more information, see the following upstream resources:
- Multi-level rack awareness
- The rack awareness capabilities of Kafka have been improved to support multi-level
cluster topologies. As a result, brokers can now be configured to run in a multi-level
rack-aware mode. If this mode is enabled, the brokers provide multi-level rack awareness
guarantees. These guarantees ensure that topic partition replicas are spread evenly
across all levels of the physical infrastructure. For example, in a two-level hierarchy
with Data Centers on the top level and racks on the second level, brokers will evenly
spread replicas among both available DCs and racks.
The new mode is compatible with follower fetching. If multi-level mode is enabled, a compatible replica selector class is automatically installed. This implementation enables consumers (if configured), to fetch Kafka messages from the replica that is closest to them in the multi-level hierarchy.
Additionally, when Cruise Control is deployed on the cluster, the standard rack-aware goals in Cruise Control’s configuration are replaced with a multi-level rack-aware goal. This goal ensures that Cruise Control optimizations do not violate the multi-level rack awareness guarantees. This goal is currently downstream only, available exclusively in Cloudera distributed Cruise Control. For more information, see the following resources:
- Expose log directory total and usable space through the Kafka API
- KAFKA-13958 is backported in Kafka shipped with this version of Runtime. As a result, the Kafka API now exposes metrics regarding the total and usable disk space of log directories. The information on log directory space is collected by SMM and is exposed on the SMM UI. Specifically, you can now view the current log size of topics as well as the total log size and remaining storage space of brokers. For more information on how you can monitor log size metrics on the SMM UI, see Monitoring log size information.
- Kafka now accepts OAuthtokens that do not contain a “sub” claim
- KAFKA-13730 is backported in Kafka shipped with this version of Runtime. As a result, Kafka now accepts OAuth tokens that do not contain the "sub" claim. If you are using OAuth tokens that do not contain a “sub” claim, the JWT Principal Claim Name For OAuth2 Kafka service property must be configured. This property specifies the claim that contains the client’s principal. For more information on OAuth2 authentication in Kafka, see OAuth2 authentication.
- New Kafka connect connectors
- The following new Kafka connect connectors are introduced:
- HDFS Stateless Sink
- Influx DB Sink
- Debezium Db2 Source [Technical Preview]
For more information, see Connectors.
- Syslog TCP Source connector 2.0.0.
- The Syslog TCP Source Kafka Connect connector is updated to version 2.0.0. The
following notable changes and improvements are made:
- Three new properties are added, these are as follows:
Max Batch Size
This property controls the maximum number of messages to add to a single batch of messages. This is a required property. Its default value is 1.
Authorized Issuer DN Pattern
andAuthorized Subject DN Pattern
These properties allow you to enable authorization for incoming TLS connections. Both properties accept regular expressions as a value. The configured regular expressions are applied against the Distinguished Names of incoming TLS connections. If the Distinguished Names do not match the pattern, the following message is logged and the messages do not get forwarded to Kafka.
Both properties are optional and are set toError: authorization failure
.*
by default.
- The
Max Number of TCP Connections
property is replaced by theMax Number of Worker Threads
property.Similarly to
Max Number of TCP Connections
,Max Number of Worker Threads
is also used to specify the number of TCP connections, but instead of exactly specifying the number of allowed connections, you now specify how many worker threads are reserved for TCP connections. Note that a single worker thread is capable of handling multiple connections. This is a required property. Its default value is 2. - Existing version 1.0.0. connectors will continue to function, upgrading them, however, is not possible. If you want to use the new version of the connector, you must deploy a new instance of the connector.
- Deploying a version 1.0.0. instance of the connector is no longer possible.
- Three new properties are added, these are as follows:
- AvroConverter support for KConnect logical types
- The AvroConverter now converts between Connect and Avro temporal and decimal types.
- Connect internal topic Ranger policy
- A new Ranger policy,
connect internal - topic
, is generated by default on fresh installations. This policy allows the Kafka and SMM service principals to access Kafka Connect internal topics (connect-configs, connect-offsets, connect-status) and the secret management's storage topic (connect-secrets). - Connector configurations must by default override the sasl.jaas.config property of the Kafka clients used by the connector
- The Require Connectors To Override Kafka Client JAAS
Configuration Kafka Connect property is now selected by default. This
means that connector configurations must by default contain a
sasl.jaas.config
entry with an appropriate JAAS configuration that can be used to establish a connection with the Kafka service. - Connect JAAS enforcement now applies to non-override type Kafka client configs
- When the Require Connectors To Override Kafka Client JAAS
Configuration property is selected, the
consumer.sasl.
andproducer.sasl.
configurations are not emitted into the Connect worker configurations anymore. Additionally, the keytab name is randomized and the${cm-agent:keytab}
references in the Connector configurations will stop working.
Schema Registry
- Schema Registry instances behind load balancer
- You can now use load balancer in front of Schema Registry instances. It is very common to have multiple instances of the same application and have a load balancer in front of them. This can be useful for failover reasons in HA environments, and it can also help sharing the load between instances. You can also use load balancer in front of Schema Registry instances in an environment with Kerberos or SSL enabled.
- AvroConverter support for KConnect logical types
- AvroConverter now converts between Connect and Avro temporal and decimal types.
- Support for alternative jersey connectors in SchemaRegistryClient
- connector.provider.class can be configured in Schema Registry Client. If it is configured, schema.registry.client.retry.policy should also be configured to be different than default.
- Retrieve principal from client's certificate
- When two-way TLS authentication is enabled, Schema Registry extracts the principal from the certificate and uses it for authentication or authorization.
- Schema Registry CDC support - change default schema compatibility
- When a new Avro schema is created and its compatibility is not explicitly set, then a default compatibility value is used. Until now, that value was always BACKWARD. After this change, users on the server side can configure the default value.
- Schema Registry with Knox uses round-robin load balancing
- When multiple instances of Schema Registry are running, Knox uses round-robin to forward the requests.
- Upgraded Avro version to 1.11.1
- Avro got upgraded from version 1.9.1 to 1.11.1.
Streams Messaging Manager
- Improved alter topic functionalities
- You can now increase the number of partitions of a topic (but not decrease). The option is available on the Configs tab on the Topic Details page.
- Partition Assignment tab on the Topic Details page
- The Assignment tab, on the topic details page, shows the current state of the partitions and replicas of the topic. It shows some topic-level statistics and the replica assignment of all partitions. If rack awareness is being used in the Kafka cluster, the replica assignment is shown in a rack-based view. If the rack IDs follow the format of multi-level rack IDs, the rack IDs are rendered as a hierarchy.
- Added sorter functionality to partition lists in SMM UI
- The SMM UI contains Brokers and Topics pages where records contain broker or topic specific partition lists and their profile pages as well. All partition list columns become sortable.
- SMM UI shows broker rack information
- The Brokers page and the Broker Details page now both show the rack ID of the brokers. SMM now also supports a new endpoint: /api/v1/admin/topics/{topicName}/description.
- Improved SMM UX for Kafka Connectors configuration
- The connector selection and connector configuration workflow steps are now separated into two different steps. Search and autocomplete are now available for Connect configuration keys. The help icon provides detailed information about each configuration key. The data type of the configuration values can be chosen from the options menu. For more information, see Setting connector configurations.
- Added partition log-size information to the SMM UI
- The SMM UI shows log-size related information about brokers, topics, and partitions. Furthermore, warning messages appear when log directory related errors are reported by Kafka.
- SMM connector profile shows connector level error
- Errors causing the whole connector to fail are now displayed on the connector profile page if available.
- Data explorer allows specifying consumer isolation.level
- "/api/v1/admin/topics/{topicName}/partition/{partitionId}/payloads" endpoint has a new parameter: "consumerIsolationLevel". The accepted values are "read_committed" and "read_uncommitted". This sets the "isolation.level" config for the KafkaConsumer used for retrieving messages. The default value is "read_uncommitted".
- The Data Explorer page has a new design
- The Data Explorer now uses the “from offset” and “record limit” parameters to select an offset window to query.
- Offset-lag metrics in SMM UI
- SMM UI Replications tab now also shows the replication-records-lag metric.
- SMM UI Data Explorer shows null values explicitly
- Data Explorer in the SMM UI now displays 'null' with italic style applied when the value is null rather than an empty value.
- SMM uses a specific REST API to fetch list of topics
- SMM Connect page now uses the Connect active topic tracking feature to list the topics used by the Connectors instead of the connector configuration's topic property. Sink Connectors show up based on which topics they consumed from (regardless of whether "topics" or "topics.regex" config was used), and Source Connectors show up based on which topics they produced to.
- Improved configuration of SMM Kafka interceptors
- New configuration prefix for SMM monitoring interceptor's background producer: "smm.monitoring.interceptor.producer.".
- SMM Data Explorer shows JSON output in pretty printed format
- SMM Data Explorer can show JSON output in pretty printed format by using the "JSON Pretty Print" deserializer.
Streams Replication Manager
- Internal SRM topics are now automatically added to the replication deny list
- SRM now ignores internal SRM topics when replicating data. If required, this behavior
can be disabled by adding
srm.internal.topic.exclude.enable=false
to Streams Replication Manager's Replication Configs in Cloudera Manager. - Metrics and health checks for the status processor Streams application in SRM Service
- SRM Service health tests now show the state of the Connect status processor Streams application.
- SRM topic creation timeout increased
- Streams Replication Manager internal topic creation timeout property defaults are increased to 20s to tolerate intermittent issues at startup.
- Replication status metrics are enhanced with task information
- The SRM Service now reports the status of the connectors and tasks in the replication status with detailed descriptions. The status and detailed descriptions can be viewed on the Replications tab of SMM UI.
- New metric replication-records-lag
- The SRM Service now reports a new metric on its REST API called
replication-records-lag
. This metric provides information regarding the replication lag based on offsets. The metric is available both on the cluster and the topic level. - Raw metrics are compressed using LZ4
- SRM internal metric producers from now on use LZ4 compression by default. LZ4 was
chosen as it provides the best combination in terms of compression speed and
performance. As a result, Cloudera recommends that you use LZ4. If required, however,
you can change the compression by doing the following:
- Add the following configuration entries to Streams Replication
Manager's Replication
Configs
workers.cloudera.metrics.reporter.producer.compression.type=[***COMPRESSION***] connectors.cloudera.metrics.reporter.producer.compression.type=[***COMPRESSION***]
- Add the following to Additional Configs For Streams Application Running
Inside SRM
Service
producer.compression.type=[***COMPRESSION***]
- Add the following configuration entries to Streams Replication
Manager's Replication
Configs
Cruise Control
- Rebasing Cruise Control to 2.5.85
- Cruise Control is rebased to 2.5.85 version to be compatible with Apache Kafka 3.x version.
- Adding MultiLevelRackAwareGoal to Cruise Control
- As the currently available RackAwareGoal in Cruise Control does not support multi
level rack awareness, a new goal was created to ensure that the replicas are assigned
respecting the rules of multiple level racks.
The new goal is named
com.cloudera.kafka.cruisecontrol.analyzer.goals.MultiLevelRackAwareDistributionGoal.
For more information, see the following resources: