What's New in Streams Messaging

Learn about the new Streams Messaging features in Cloudera DataFlow for Data Hub 7.2.15.

Streams Messaging cluster definitions and templates

Streams Messaging High Availability cluster definition and template
Three new cluster definitions are introduced for Streams Messaging. The new definitions are as follows:
  • Streams Messaging High Availability for AWS
  • Streams Messaging High Availability for Azure (Technical Preview)
  • Streams Messaging High Availability for Google Cloud (Technical Preview)
A new template called CDP - Streams Messaging High Availability is also introduced. You can use the template and definitions to deploy highly available Streams Messaging clusters that span multiple availability zones, so that functionality is not degraded when a single availability zone has an outage. For more information regarding cluster layout, see Streams Messaging cluster layout. For more information on how to deploy a cluster with the new definitions, see Creating your first Streams Messaging cluster.


Enable JMX Authentication by default
JMX Authentication is now enabled by default for the Kafka service. Randomly generated passwords are now set for both the JMX monitor (read-only access) and control (read and write access) users. The default passwords can be changed at any time using the Password of User with read-only Access to the JMX agent and the Password of user with read-write access to the JMX agent Kafka service properties. Additionally, JMX authentication can be turned off using the Enable Authenticated Communication with the JMX Agent property.
OAuth2 authentication available for Kafka
OAuth2 authentication support is added for the Kafka service. You can now configure Kafka brokers to authenticate clients using OAuth2. For more information, see OAuth2 authentication.
HSTS header is included by default in Kafka Connect REST API responses
Kafka Connect REST API responses now include the HSTS header by default.
Kafka load balancer support
The Kafka service can now be provided with the host of a load balancer that balances bootstrap connections across multiple brokers. The host can be configured using the Kafka Broker Load Balancer Host property. If a host is configured, the Kafka service also configures a listener for accepting requests from the load balancer. The port of this listener is customizable using the Kafka Broker Load Balancer Listener Port property. Using these properties configures your Kafka service so that clients can connect to the brokers without encountering ticket mismatch issues in Kerberized environments or TLS/SSL hostname verification failures.
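As a rough illustration of the client side, the sketch below builds a client configuration that bootstraps through the load balancer instead of listing brokers directly. The host name and port are hypothetical placeholders, and the security settings assume a Kerberized cluster with TLS. The keys bootstrap.servers, security.protocol, sasl.mechanism, and sasl.kerberos.service.name are standard Kafka client properties, but the values your cluster needs may differ.

```python
def client_config_via_load_balancer(lb_host: str, lb_port: int) -> dict:
    """Build Kafka client properties that bootstrap through a load balancer."""
    if not lb_host:
        raise ValueError("load balancer host must not be empty")
    if not 0 < lb_port < 65536:
        raise ValueError("load balancer port must be between 1 and 65535")
    return {
        # Clients point at the load balancer, not at individual brokers.
        "bootstrap.servers": f"{lb_host}:{lb_port}",
        # Assumed security settings for a Kerberized, TLS-enabled cluster.
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "GSSAPI",
        "sasl.kerberos.service.name": "kafka",
    }


# Hypothetical host and port, used only for illustration.
config = client_config_via_load_balancer("kafka-lb.example.com", 9094)
```

The resulting dictionary can be passed to whichever Kafka client library you use, provided that library accepts these property names.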
Importing Kafka entities into Atlas
Kafka topics and clients can now be imported into Atlas as entities (metadata) using a new action available for the Kafka service in Cloudera Manager. The new action is available at Kafka service > Actions > Import Kafka Topics Into Atlas. The action serves as a replacement for, or alternative to, the kafka-import.sh tool. For more information, see Importing Kafka entities into Atlas.
Debezium Connector support
The following change data capture (CDC) connectors are added to Kafka Connect:
  • Debezium MySQL Source
  • Debezium Postgres Source
  • Debezium SQL Server Source
  • Debezium Oracle Source

Each of the connectors requires CDP-specific steps before it can be deployed. For more information, see Connectors.

Secure Kafka Connect
Kafka Connect is now generally available and can be used in production environments. This is the result of multiple changes, improvements, and new features related to Kafka Connect security including the following:
SPNEGO authentication for the Kafka Connect REST API
You can secure the Kafka Connect REST API by enabling SPNEGO authentication. If SPNEGO authentication is enabled, only users authenticated with Kerberos are able to access and use the REST API. Additionally, if Ranger authorization is enabled for the Kafka service, authenticated users are only able to perform the operations that they are authorized for. For more information, see Configuring SPNEGO Authentication and trusted proxies for the Kafka Connect REST API.
Kafka Connect Authorization model
An authorization model is introduced for Kafka Connect. Implementations are pluggable and it is up to the implementation how the capabilities of the model are utilized. The authorization model is implemented by default in Ranger. For more information about the model, see Kafka Connect authorization model. For more information about the Ranger integration of the model, see Kafka Connect Ranger integration.
Kafka Connect connector configurations can now be secured
A new feature called Kafka Connect Secrets Storage is introduced. This feature enables you to mark properties within connector configurations as a secret. If a property is marked as a secret, the feature stores and handles the value of that property in a secure manner. For more information, see Kafka Connect Secrets Storage.
Kafka Connect Connectors can be configured to override the JAAS, and restrict the usage of the Worker principal
Kafka Connect now allows users to force connectors to override the JAAS configuration of the Kafka connection, and to forbid connectors from using the same Kerberos credentials as the Connect worker. For more information, see Configuring connector JAAS configuration and Kerberos principal overrides.
Nexus allow list for Stateless NiFi Source and Sink connectors
A new configuration property, List Of Allowed Nexus Repository Urls, is introduced for the Kafka service. This property enables you to specify a list of allowed Nexus repositories that Kafka Connect connectors are allowed to connect to when fetching NiFi extensions. Configuring an allow list using the property can harden the security of your Kafka Connect deployment. For more information, see Configuring a Nexus repository allow list.

Schema Registry

Added OAuth support for Schema Registry client authentication
You can use an OAuth2 JSON Web Token (JWT) in Schema Registry for authentication. Authorization continues to be implemented in Ranger; however, you can obtain the principal from a JWT.
Added a findAllSchemas() method to the Schema Registry Client code
The Schema Registry Client now provides a findAllSchemas() method, which enumerates all schemas contained in the schema registry and returns them as a list of SchemaMetadataInfo. This is useful if you only need to enumerate all schemas by name, without incurring the additional overhead of the findAggregatedSchemas() method.
Support for reading keys from JWK
Keys can be stored in JWK. Validation is done by matching the key with the "kid" property in the JWT. If "kid" is not given, the key is matched on the algorithm.
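The key selection rule described above can be sketched as follows. This is an illustrative sketch, not Schema Registry's implementation: the function name is hypothetical, the key set is assumed to use the standard JWK Set layout (a "keys" array whose entries carry "kid" and "alg"), and returning no key when a "kid" is present but unmatched is an assumption.

```python
from typing import Optional


def select_jwk(jwt_header: dict, jwk_set: dict) -> Optional[dict]:
    """Pick the JWK for a token: match on "kid" if present, else on "alg"."""
    keys = jwk_set.get("keys", [])
    kid = jwt_header.get("kid")
    if kid is not None:
        for key in keys:
            if key.get("kid") == kid:
                return key
        # Assumption: an unmatched "kid" does not fall back to "alg".
        return None
    alg = jwt_header.get("alg")
    for key in keys:
        if key.get("alg") == alg:
            return key
    return None


# Hypothetical key set used only for illustration.
jwk_set = {
    "keys": [
        {"kid": "key-1", "alg": "RS256", "kty": "RSA"},
        {"kid": "key-2", "alg": "HS256", "kty": "oct"},
    ]
}
by_kid = select_jwk({"alg": "RS256", "kid": "key-1"}, jwk_set)
by_alg = select_jwk({"alg": "HS256"}, jwk_set)
```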
Added JWT validation filter
A servlet filter is added that checks whether incoming requests contain a valid authentication token.
SchemaRegistryClient gets token from OAuth Server with clientId/secret
Schema Registry Client can be configured to use OAuth2 authentication. The following parameters need to be added when creating a Schema Registry Client:
  • "schema.registry.auth.type" = "oauth2" (default value is kerberos)
  • "schema.registry.oauth.client.id" (ClientId for OAuth2 server)
  • "schema.registry.oauth.secret" (Secret for OAuth2 server)
  • "schema.registry.oauth.server.url" (REST API endpoint of OAuth2 server)
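The parameters listed above can be collected into a single configuration map, as in the sketch below. The Schema Registry Client itself is a Java library, so this only illustrates the shape of the property map. The helper names, client ID, secret, and server URL are hypothetical.

```python
REQUIRED_OAUTH2_KEYS = (
    "schema.registry.auth.type",
    "schema.registry.oauth.client.id",
    "schema.registry.oauth.secret",
    "schema.registry.oauth.server.url",
)


def build_oauth2_client_config(client_id: str, secret: str, server_url: str) -> dict:
    """Return the OAuth2-related properties for a Schema Registry Client."""
    return {
        # Overrides the default value, which is kerberos.
        "schema.registry.auth.type": "oauth2",
        "schema.registry.oauth.client.id": client_id,
        "schema.registry.oauth.secret": secret,
        "schema.registry.oauth.server.url": server_url,
    }


def missing_oauth2_keys(config: dict) -> list:
    """List required OAuth2 properties that are absent or empty."""
    return [key for key in REQUIRED_OAUTH2_KEYS if not config.get(key)]


# Placeholder values, used only for illustration.
config = build_oauth2_client_config(
    "my-client", "my-secret", "https://oauth.example.com/token"
)
```

Checking for missing keys before creating the client is optional, but it surfaces an incomplete configuration early.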
Support added for RSA and HMAC certificates
Added support for JWT signed by either RSA or HMAC.
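To make the HMAC case concrete, the following sketch signs and verifies an HS256 JWT using only the Python standard library. It is a generic illustration of how an HMAC-signed JWT is checked, not Schema Registry's validation code. The function names and the shared secret are hypothetical, and claim checks such as expiry are omitted.

```python
import base64
import hashlib
import hmac
import json
from typing import Optional


def b64url_encode(data: bytes) -> str:
    """Base64url-encode without padding, as used in JWTs."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Base64url-decode, restoring the padding that JWTs strip."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(claims: dict, secret: bytes) -> str:
    """Create a compact HS256 JWT for the given claims."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return signing_input + "." + b64url_encode(signature)


def verify_hs256(token: str, secret: bytes) -> Optional[dict]:
    """Return the claims if the HS256 signature is valid, else None."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        header = json.loads(b64url_decode(header_b64))
        signature = b64url_decode(signature_b64)
        claims = json.loads(b64url_decode(payload_b64))
    except ValueError:
        return None  # Malformed token: wrong part count, bad base64, or bad JSON.
    if not isinstance(header, dict) or header.get("alg") != "HS256":
        return None
    signing_input = (header_b64 + "." + payload_b64).encode("ascii")
    expected = hmac.new(secret, signing_input, hashlib.sha256).digest()
    # Constant-time comparison avoids leaking signature bytes through timing.
    if not hmac.compare_digest(expected, signature):
        return None
    return claims


token = sign_hs256({"sub": "alice"}, b"shared-secret")
claims = verify_hs256(token, b"shared-secret")
```

RSA-signed tokens follow the same structure, but verification uses the public key of the signer instead of a shared secret.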

Streams Messaging Manager

Improvement in the Connect tab of the SMM UI
You can now deploy Kafka Connect connector configurations containing secret properties, which are stored in encrypted storage (by default, in Kafka). The deployed configuration only contains references to these secrets. Because properties must be marked as secret on the Streams Messaging Manager user interface, a new connector creation form is introduced that supports this. You can also import configurations to populate the form automatically.
Kafka Connect improvement
  • In NiFi connectors, you can now provide a file path or URL for flow.snapshot, or upload it from a file.
  • You can now import connector configurations as a whole instead of adding individual configurations.
  • Connector configuration validation errors are now correlated with individual configuration keys.
  • Sensitive properties are now hidden from the SMM UI and support is added to set properties as sensitive.
Partition dimension removal in SMM
The partition dimensions of the producer ("/api/v2/admin/metrics/aggregated/producers") and consumer ("/api/v2/admin/metrics/aggregated/groups") metrics are removed from the SMM cache and are no longer exposed through the API. This reduces the SMM memory footprint, the load on the metric store, and network traffic. As a result, the API is cleaner and easier to read, and the UI is more responsive.
The version of the /api/v1/admin/metrics/aggregated/* and /api/v1/admin/lineage/* endpoints has been changed to /api/v2/admin/aggregated and /api/v2/admin/lineage. With this change, the response objects are changed as well.
For the /lineage endpoints a common lineage response object is introduced in v2 as opposed to the specific (and different) objects in the experimental v1 endpoint.
For the /aggregated/* endpoints, the partition level metrics (previously in the wrappedPartitionMetrics field) are removed. This affects the /aggregated/producers and /aggregated/producers/{producerClientId} endpoints; partition level metrics are still available in the corresponding /metrics/producers and /metrics/producers/{producerId} endpoints.
Stateless Sink and Source should populate Key/Value Converters
The SMM UI Connector Creation page now populates a default key/value converter for the StatelessNiFiSource and StatelessNiFiSink connectors.
Added API to enrich a sample configuration
The Streams Messaging Manager API endpoint /connector-templates/config/enhance is added. It accepts a sample connector configuration and enhances it with the properties that are likely needed for that connector.
Add "emit.consumer.metrics" config to SMM CSD, and remove (now) unused SMON host/port configs
Removed "cm.metrics.service.monitor.host" and "cm.metrics.service.monitor.port" configurations from Streams Messaging Manager.

These no longer have to be configured as SMM automatically detects ServiceMonitor's location and emits the ConsumerGroup metrics into it.

Added "emit.consumer.metrics" configuration to Streams Messaging Manager.

If this flag is disabled, Streams Messaging Manager does not emit historic ConsumerGroup metrics into ServiceMonitor, meaning that historic metrics (for group Lag and CommittedOffset) are not available for Groups in SMM. These metrics are used to populate the charts at the bottom of the ConsumerGroupDetail page, and can be accessed through the "api/v2/admin/metrics/consumers/group/{groupId}" REST API endpoint.

Increase SMM version to 2.3
The SMM version is increased to 2.3.
SMM UI should show the replication status tooltip
Streams Messaging Manager now shows a tooltip for the replication status.
On the Overview page adjust the lineage information shown
On the Overview page, when a Producer or a Consumer is selected, an arrow points to the topic(s) it produced to or consumed from instead of the partitions.

Streams Replication Manager

SRM now creates all internal topics with correct configurations at startup

The internal topics used by SRM are now automatically created with correct configurations at startup. These are the metrics topics, the topics used by the srm-control tool, and the topics used by the SRM Service for service discovery. SRM also verifies that the topics are created with correct configurations. If the topics are not configured as expected, SRM fails to start. This improvement fixes CDPD-31745.

Increase the default replication factor of internal topics to 3
The internal topics used by SRM are now created with a replication factor of 3 by default. As a result, SRM is now more resistant to host failures. Additionally, Cruise Control can now automatically heal SRM’s internal topics in the event of a single host failure.
SRM now waits for latest offset syncs and does not set the consumer offset into the future
The MirrorCheckpointConnector now checks the latest message in the offset sync topic at startup, and does not emit a checkpoint message until it has read, from the beginning, all messages up to and including that last message.
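The startup behavior described above can be pictured as a simple gate. The sketch below is a conceptual model only, not SRM code: the class and method names are hypothetical, and treating an empty offset sync topic as "nothing to wait for" is an assumption.

```python
from typing import Optional


class OffsetSyncGate:
    """Tracks whether all offset syncs present at startup have been read."""

    def __init__(self, last_offset_at_startup: Optional[int]) -> None:
        # None models an empty offset sync topic (assumption: no waiting needed).
        self._target = last_offset_at_startup
        self._highest_read = -1

    def record_read(self, offset: int) -> None:
        """Note that the message at this offset has been consumed."""
        self._highest_read = max(self._highest_read, offset)

    def may_emit_checkpoints(self) -> bool:
        """True once everything up to the startup-time last message was read."""
        return self._target is None or self._highest_read >= self._target


# The last message at startup sits at offset 2; reading starts from offset 0.
gate = OffsetSyncGate(last_offset_at_startup=2)
states = []
for offset in (0, 1, 2):
    gate.record_read(offset)
    states.append(gate.may_emit_checkpoints())
```

In this model, checkpoints stay blocked while offsets 0 and 1 are read and are allowed only after offset 2 has been consumed.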

As a part of this improvement, a new configuration property, emit.checkpoints.end.offset.protection, is introduced. When this property is enabled, the MirrorCheckpointTask checks the end offset of the replicated topic prior to emitting a checkpoint, and limits the replicated offset to at most that value. With this behavior enabled, SRM no longer encounters an issue where, in certain situations, the replicated offset could be higher than the end offset of the replicated topic, producing a negative lag. The property is enabled by default, but can be configured using the Streams Replication Manager's Replication Configs property.
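The effect of the end offset protection can be shown with a small arithmetic sketch. This is not SRM code: the function names and the offsets are hypothetical, and lag is assumed to be the end offset minus the committed offset.

```python
def protect_checkpoint_offset(replicated_offset: int, end_offset: int) -> int:
    """Limit the replicated offset to at most the end offset of the topic."""
    return min(replicated_offset, end_offset)


def consumer_lag(end_offset: int, committed_offset: int) -> int:
    """Lag as the distance between the topic end and the committed offset."""
    return end_offset - committed_offset


# Hypothetical situation: the translated offset is ahead of the replicated topic.
end_offset = 100
replicated_offset = 120

lag_without_protection = consumer_lag(end_offset, replicated_offset)
lag_with_protection = consumer_lag(
    end_offset, protect_checkpoint_offset(replicated_offset, end_offset)
)
```

Without the limit, the lag in this example is negative (-20). With the limit applied, it is 0.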

Cruise Control

Configuration property for HTTP Strict Transport Security
There is a new configuration property for Cruise Control that enables the Strict Transport Security header in the web server responses when TLS/SSL is enabled. The property is enabled by default, so when TLS/SSL is enabled, Cruise Control sets the Strict Transport Security policy in the web server responses.