What's New in Streams Messaging

Learn about the new Streams Messaging features in Cloudera DataFlow for Data Hub 7.2.14.

Kafka

Rebase on Kafka 2.8.0

Kafka shipped with this version of Cloudera Runtime is based on Apache Kafka 2.8.0. For more information, see the following upstream resources:

Apache Kafka Notable Changes:
Apache Kafka Release Notes:
Kafka broker rolling restart checks
Cloudera Manager can now be configured to perform different types of checks on the Kafka brokers during a rolling restart. These checks help ensure that the brokers remain healthy during and after a rolling restart. As a result of this change, Kafka rolling restarts may take longer than in previous versions, even if you disable the rolling restart checks. For more information, see Rolling restart checks.
Http Metrics Report Exclude Filter introduced for Kafka
A new property, Http Metrics Report Exclude Filter (kafka.http.metrics.reporter.exclude.filter), is introduced for the Kafka service. This property can be used to specify a regular expression that is used to filter metrics. Any metric matching the specified regular expression is not reported by Cloudera Manager. As a result, these metrics are also not displayed in SMM. Use JMX metric names when configuring this property.
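As an illustration of how an exclude filter of this kind behaves, the following Python sketch drops every metric name that matches a regular expression. It assumes that the whole metric name must match the pattern (full-match semantics), which may differ from the actual reporter; the JMX-style metric names are examples only.

```python
import re


def filter_metrics(metric_names, exclude_pattern):
    """Return the metric names that would still be reported.

    Assumption: a metric is excluded only when the *entire* name matches the
    pattern (re.fullmatch). The actual reporter may use different semantics.
    """
    pattern = re.compile(exclude_pattern)
    return [name for name in metric_names if not pattern.fullmatch(name)]


# Example JMX metric names, used only for illustration.
metrics = [
    "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec",
    "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec",
    "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce",
]

# Exclude every RequestMetrics metric; the two BrokerTopicMetrics remain.
reported = filter_metrics(metrics, r"kafka\.network:type=RequestMetrics,.*")
for name in reported:
    print(name)
```

Anchoring the pattern to the full name avoids accidentally excluding metrics whose names merely contain a matching substring.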
Bootstrap servers are automatically configured for Kafka Connect
The Bootstrap Servers property of the Kafka Connect role is now automatically configured to include the bootstrap servers of its co-located Kafka brokers. This is only done if the property is left empty (default). You can provide a custom value for this property if you want to override the default host:port pairs that Kafka Connect uses when it establishes a connection with the Kafka brokers.
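The default-versus-override rule can be sketched in a few lines of Python. The host names and port below are hypothetical; the sketch only shows that an empty property falls back to a comma-separated list of the co-located brokers' host:port pairs, while any non-empty value is used as-is.

```python
def resolve_bootstrap_servers(configured, colocated_brokers):
    """Return the bootstrap servers string Kafka Connect would connect with.

    configured: value of the Bootstrap Servers property ("" when left empty).
    colocated_brokers: (host, port) pairs of the co-located Kafka brokers.
    """
    if configured.strip():
        # A custom value overrides the automatic configuration.
        return configured.strip()
    # Empty (default): derive a comma-separated host:port list from the brokers.
    return ",".join(f"{host}:{port}" for host, port in colocated_brokers)


# Hypothetical co-located brokers.
brokers = [("broker1.example.com", 9093), ("broker2.example.com", 9093)]
print(resolve_bootstrap_servers("", brokers))
# broker1.example.com:9093,broker2.example.com:9093
print(resolve_bootstrap_servers("kafka.example.com:9092", brokers))
# kafka.example.com:9092
```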
Kafka Connect Ranger Authorizer
A Ranger plugin that implements the Authorizer interface is introduced for Kafka Connect. A new service type called kafka-connect is also introduced in Ranger. By default, it includes the cm_kafka_connect resource-based service, which contains policies that provide default access. The name of the default resource-based service that is created for Kafka Connect can be configured using the Ranger service name for the Kafka Connect service (ranger_plugin_kafka_connect_service_name) Kafka service property.
Kafka Connect in Data Hub [Technical Preview]
Kafka Connect can now be provisioned in CDP Public Cloud with Data Hub. The default Streams Messaging cluster definitions are updated to include Kafka Connect. For more information, see Streams Messaging cluster layout, Creating your first Streams Messaging cluster, and Scaling Kafka Connect.
Stateless NiFi Source and Sink [Technical Preview]
The Stateless NiFi Source and Sink connectors enable you to run NiFi dataflows within Kafka Connect. Using these connectors gives you access to a number of NiFi features without needing to deploy or maintain NiFi on your cluster. For more information on the connectors, best practices for building dataflows to use with these connectors, and how to deploy the connectors, see Stateless NiFi Source and Sink.
New Cloudera developed Kafka Connect connectors [Technical Preview]
In addition to the introduction of the Stateless NiFi Source and Sink, 12 new Cloudera developed connectors are available for use with Kafka Connect. These are powered by the Stateless NiFi engine and run Cloudera developed dataflows. They provide an out-of-the-box solution for some of the most common use cases for moving data in or out of Kafka. For more information, see Connectors in the Kafka Connect documentation.
Kafka multiple Availability Zone support [Technical Preview]
Kafka can now be deployed in multiple Availability Zones in CDP Public Cloud. When using the multi-Availability Zone feature, CDP ensures that Kafka replicates partitions across brokers in different Availability Zones. For more information, see Deploying CDP in multiple AWS availability zones.
The default replication factor for Kafka Connect internal topics is increased to 3
Previously, the default replication factor for Kafka Connect internal topics was 1. It is now 3. The following properties are affected:
  • Offset Storage Topic Replication Factor (offset.storage.replication.factor)
  • Configuration Storage Topic Replication Factor (config.storage.replication.factor)
  • Status Storage Topic Replication Factor (status.storage.replication.factor)

Schema Registry

Support added for JSON schemas in Schema Registry
The JSON schema format is now supported.

Previously, only the Avro schema format was supported out of the box.

Cloudera Manager supports rolling restarts of HA enabled Schema Registry
The Schema Registry service can now be restarted with a rolling restart using Cloudera Manager.
Added Import tool for Schema Registry schemas
Schemas stored in Schema Registry can be exported to a JSON file. The exported JSON file can then be imported into another Schema Registry database. During an import, SchemaMetadata, SchemaBranch, and SchemaVersion objects are written to the database. These objects retain their IDs as well as a number of other properties that are available in the JSON file used for the import. As a result, serialization and deserialization protocols continue to function without any change, and Schema Registry clients can seamlessly switch between different Schema Registry instances. Both import and export operations are done using the Schema Registry API.

Streams Messaging Manager

Reactive Lineage fetching from Kafka producer cache
You can now visualize the lineage between producers and consumers in SMM. Lineage information helps you understand how messages move from a producer to a consumer group and which topics or partitions are part of that flow. Lineage between clients and topics or partitions is now shown using the new lineage endpoints. For more information, see Monitoring lineage information.
New endpoint added to fetch lineage for a topic
The /api/v1/admin/lineage/partitions/{topic} endpoint is used to fetch which producers have produced into the queried topic and which consumerGroup members have consumed from it. This endpoint is now used when you click a topic in the UI to fetch its lineage.
New endpoint added to fetch lineage information for a consumerGroup
The /api/v1/admin/lineage/consumerGroups/{consumerGroupId} endpoint is used to fetch which topics the members of the queried consumerGroup have consumed from, and which producers have produced into those topics. This endpoint is now used when you click a group in the UI to fetch its lineage.
New endpoint added to fetch lineage information for a topicPartition
The /api/v1/admin/lineage/partitions/{topic}/{partition} endpoint is used to fetch which producers have produced into the queried topicPartition and which consumerGroup members have consumed from it. This endpoint is now used when you click a topicPartition in the UI to fetch its lineage.
New endpoint added to fetch lineage information for a producer
The /api/v1/admin/lineage/lineage/producers/{producerId} endpoint is used to fetch which topics the queried producer has produced into, and which consumerGroup members have consumed from those topics. This endpoint is now used when you click a producer in the UI to fetch its lineage.
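The four lineage endpoints above share one base path. The following Python sketch builds their URLs from the path templates exactly as listed in these notes and issues a plain GET with the standard library. The SMM host and port are hypothetical, authentication is omitted, and the shape of the JSON response is not assumed; adjust all of these for your deployment.

```python
import json
from urllib.parse import quote
from urllib.request import Request, urlopen

# Path templates as listed in these release notes.
LINEAGE_PATHS = {
    "topic": "/api/v1/admin/lineage/partitions/{topic}",
    "partition": "/api/v1/admin/lineage/partitions/{topic}/{partition}",
    "consumer_group": "/api/v1/admin/lineage/consumerGroups/{consumerGroupId}",
    "producer": "/api/v1/admin/lineage/lineage/producers/{producerId}",
}


def lineage_url(base_url, kind, **params):
    """Build a lineage endpoint URL, percent-encoding each path value."""
    encoded = {key: quote(str(value), safe="") for key, value in params.items()}
    return base_url.rstrip("/") + LINEAGE_PATHS[kind].format(**encoded)


def fetch_lineage(base_url, kind, **params):
    """GET a lineage endpoint and decode the JSON body (schema not assumed)."""
    request = Request(lineage_url(base_url, kind, **params),
                      headers={"Accept": "application/json"})
    with urlopen(request) as response:
        return json.load(response)


if __name__ == "__main__":
    base = "https://smm.example.com:8585"  # hypothetical SMM REST Server address
    print(lineage_url(base, "partition", topic="orders", partition=0))
    print(lineage_url(base, "consumer_group", consumerGroupId="billing-app"))
```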
Selecting a partition on the Overview page now calls the new lineage endpoint
Lineage between clients and topics or partitions is now shown using the new lineage endpoints. Note that when checking the lineage (connected clients) of a TopicPartition, only recently connected clients are shown.
Support added for multiple replication targets in SMM
SMM now supports SRM replication flows that target remote clusters, using the new v2 SRM APIs.
Remote replication flows are available under the /api/v2/admin/replication-stats APIs, and the SMM UI now uses these new APIs.
Introduced multi-target replication monitoring support in alerts
  • SMM now uses the new v2 SRM Service endpoints, on which alerting is based.

  • When configuring replication alerts in the SMM UI, you can now define both source and target clusters. Previously, only the source cluster could be defined, because the target cluster was fixed to the co-located Kafka cluster.

  • Existing alerts continue to function; however, they can only be edited using the new format, in which source and target clusters must be defined.

  • IMPORTANT: For alerts that involve remote SRM cluster queries, set the execution interval to at least one minute (preferably longer).

SMM authenticates to SRM Service

SMM now automatically configures Basic Authentication when connecting to SRM if service dependency-based auto-configuration is in use.

For manual SRM connectivity configurations, the following Basic Authentication properties were added: Streams Replication Manager Basic Authentication, Streams Replication Manager Basic Authentication Username, and Streams Replication Manager Basic Authentication Password.

Cache-Control header is part of the default SMM REST Server API response headers
The new SMM configuration property, cache.control.http.response.header.value, lets you configure the value of the Cache-Control header for certain endpoints. Configure it as key-value pairs:
  • The key is the path prefix to the endpoints where the Cache-Control header should be added.
  • The value is the value of Cache-Control header.

To turn off the functionality provided by the Cache-Control header, delete the entries or set their values to no-store.
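The key-value behavior described above can be pictured as a prefix lookup. In this Python sketch, the dictionary stands in for the configured entries; the prefixes and header values are hypothetical, and the rule that the longest matching prefix wins is an assumption rather than documented SMM behavior.

```python
def cache_control_for(path, header_config):
    """Return the Cache-Control value to add for a request path, or None.

    header_config maps endpoint path prefixes to Cache-Control header values.
    Assumption: if several prefixes match, the longest (most specific) one wins.
    """
    matches = [prefix for prefix in header_config if path.startswith(prefix)]
    if not matches:
        return None  # no matching entry, so no Cache-Control header is added
    return header_config[max(matches, key=len)]


# Hypothetical entries: cache metrics for 60 seconds, never cache aggregated group metrics.
config = {
    "/api/v1/admin/metrics": "max-age=60",
    "/api/v1/admin/metrics/aggregated/groups": "no-store",
}

print(cache_control_for("/api/v1/admin/metrics/aggregated/groups/my-group", config))  # no-store
print(cache_control_for("/api/v1/admin/metrics/topics", config))                      # max-age=60
print(cache_control_for("/api/v1/admin/lineage/partitions/orders", config))           # None
```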

Added helper tooltips to SMM UI
SMM now provides more informative tooltips (hover over the table headers and labels) for most of its elements in the web UI.
Removed Consumer Rate graphs
The lag rate graph is removed from the UI.
The lag rate values are removed from the /api/v1/admin/metrics/aggregated/groups/{groupName} and /api/v1/admin/metrics/aggregated/groups endpoints.
SMM is automatically integrated with co-located Kafka Connect
To monitor and manage Kafka Connect in SMM, the following SMM service properties must be configured:
  • Kafka Connect Host
  • Kafka Connect Port
  • Kafka Connect Protocol

From now on, these properties are automatically configured if Kafka Connect Host is left empty (default). This means that the SMM service automatically configures itself to connect to its co-located Kafka Connect instance. You can provide custom values for all properties if you want to override the defaults.

Streams Replication Manager

SRM Driver monitoring using Cloudera Manager
Cloudera Manager’s ability to monitor the SRM Driver, its replications, and the overall health of SRM is improved. Most notably, the health status of SRM is now based on the health of the network and the availability of replication sources and targets. As a result of this improvement, two new metrics, a new health test, and several new configuration properties are introduced for the SRM Driver in Cloudera Manager.
New metrics and health test
The new metrics are as follows:
  • SRM Driver Distributed Herder Status (streams_replication_manager_distributed_herder_status)
  • Aggregated Status Code of SRM Driver Replication Flows (streams_replication_manager_aggregated_herder_status)

The distributed metric describes the status of individual replications. The aggregate metric provides the aggregate status of all replications.

The new health test is called DISTRIBUTED_HERDER_STATUS. This health test is based on the aggregate metric and provides information on the overall status of SRM and its replications.

New properties
The new monitoring related properties are as follows:
  • Path for driver plugins (plugin.path)
  • Enable HTTP(S) Metrics Reporter (mm.metrics.servlet.enable)
  • SSL Encryption for the Metrics Reporter (metrics.jetty.server.ssl.enabled)
  • HTTP Metrics Reporter Port (metrics.jetty.server.port)
  • HTTPS Metrics Reporter Port (metrics.jetty.server.secureport)
  • Enable Basic Authentication for Metrics Reporter (metrics.jetty.server.authentication.enabled)
  • Metrics Reporter User Name (metrics.jetty.server.auth.username)
  • Metrics Reporter Password (metrics.jetty.server.auth.password)
For more information, see Cloudera Manager Configuration Properties Reference.
SRM Driver now automatically retries setting up replications for unavailable target Kafka clusters
Previously, if any of the Kafka clusters targeted by the SRM Driver were unavailable at startup, the SRM Driver stopped. Now, the SRM Driver instead sets up replications for all available target Kafka clusters and continuously retries setting up replications for the unavailable ones. Retry behaviour is configurable in Cloudera Manager. The new properties related to retry behaviour are as follows:
  • Retry Count for SRM Driver (mm.replication.restart.count)
  • Retry Delay for SRM Driver (mm.replication.restart.delay.ms)
For more information, see Cloudera Manager Configuration Properties Reference or Configuring SRM Driver retry behaviour.
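A minimal Python sketch of bounded retry behaviour follows. It assumes that Retry Count is the number of retries after the first attempt and that Retry Delay is the pause between attempts in milliseconds; the actual SRM Driver semantics may differ (for example, in how continuous retrying is handled), and flaky_target is a hypothetical stand-in for setting up one replication.

```python
import time


def start_with_retries(start_replication, restart_count, restart_delay_ms, sleep=time.sleep):
    """Try to set up one replication, retrying on failure.

    Assumptions (illustrative, not SRM's documented semantics):
    restart_count is the number of retries after the first attempt, and
    restart_delay_ms is the pause between attempts in milliseconds.
    Returns True once start_replication succeeds, False if every attempt fails.
    """
    for attempt in range(restart_count + 1):
        try:
            start_replication()
            return True
        except ConnectionError:
            # Target cluster unavailable: wait before the next attempt, if any.
            if attempt < restart_count:
                sleep(restart_delay_ms / 1000.0)
    return False


attempts = []


def flaky_target():
    """Hypothetical replication setup that fails twice, then succeeds."""
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("target Kafka cluster unavailable")


print(start_with_retries(flaky_target, restart_count=5, restart_delay_ms=10))  # True
```

Passing sleep as a parameter keeps the retry loop testable without real waiting.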
Disabled replications can now be fully deactivated by configuring heartbeat emission
As a result of the rebase to Kafka 2.8 (KAFKA-10710), heartbeat emission is improved. You can now fine-tune your deployment and fully deactivate any unnecessary replications that are set up by default by configuring heartbeat emission. This can help minimize the performance overhead caused by unnecessary replications.

To support this change, an improvement was made for the SRM service in Cloudera Manager. A dedicated configuration property, Enable Heartbeats, is introduced. You can use this property to configure emit.heartbeats.enabled on a global level directly in Cloudera Manager. Replication level overrides are still supported. This can be done by adding emit.heartbeats.enabled with a valid replication prefix to Streams Replication Manager's Replication Configs. For more information on configuring heartbeat emission, see Configuring SRM Driver heartbeat emission.
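Replication-level overrides place a replication prefix in front of the property name. The following Python helper, a sketch with hypothetical cluster aliases, generates entries for Streams Replication Manager's Replication Configs that turn off heartbeat emission for selected replication flows, assuming the source->target. prefix format.

```python
def heartbeat_overrides(disabled_flows):
    """Build Replication Configs entries that turn off heartbeat emission per flow.

    disabled_flows: (source_alias, target_alias) pairs. Each entry is prefixed
    with the replication prefix "source->target." (assumed format).
    """
    return [
        f"{source}->{target}.emit.heartbeats.enabled=false"
        for source, target in disabled_flows
    ]


# Hypothetical cluster aliases.
for line in heartbeat_overrides([("primary", "secondary"), ("secondary", "primary")]):
    print(line)
```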

IdentityReplicationPolicy now available
The version of Apache Kafka shipped with this release of Cloudera Runtime includes KAFKA-9726. As a result, the IdentityReplicationPolicy is available for use with Streams Replication Manager. This replication policy does not rename remote (replicated) topics. Streams Replication Manager can be configured to use this replication policy by adding the following entry to Streams Replication Manager's Replication Configs:
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
For more information, see KAFKA-9726.
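The practical difference between the two policies is the name of the remote topic. This Python sketch mirrors the naming behavior of Kafka's DefaultReplicationPolicy (source cluster alias plus a separator, "." by default, in front of the topic name) and of IdentityReplicationPolicy (name unchanged). It is an illustration only, not the Java classes; the cluster alias and topic name are hypothetical.

```python
def remote_topic_name(source_alias, topic, policy="default", separator="."):
    """Return the name a replicated topic gets on the target cluster.

    Mirrors the naming of Kafka's DefaultReplicationPolicy ("default") and
    IdentityReplicationPolicy ("identity"); not the actual Java classes.
    """
    if policy == "default":
        return f"{source_alias}{separator}{topic}"
    if policy == "identity":
        return topic
    raise ValueError(f"unknown policy: {policy}")


print(remote_topic_name("primary", "orders"))                     # primary.orders
print(remote_topic_name("primary", "orders", policy="identity"))  # orders
```

Because remote topics keep their original names under IdentityReplicationPolicy, the Kafka Javadoc for this class notes that MirrorMaker cannot prevent replication cycles when it is used, so it is best suited to one-way replication topologies.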
SRM configuration properties can be configured globally for Connect workers and Connect connectors
The SRM Driver now accepts configuration properties prefixed with the workers. and connectors. prefixes. Configuration properties added to Streams Replication Manager's Replication Configs that use these prefixes are applied globally to all Connect workers or Connect connectors that the SRM Driver creates. For more information regarding the prefixes, see Understanding SRM properties, their configuration and hierarchy. For more information on Connect workers and connectors, see Streams Replication Manager Architecture.
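The effect of the two prefixes can be sketched as a simple split of the Replication Configs entries. This Python helper is an illustration only: it assumes that the prefix is stripped before a property is applied to every worker or connector, and the example property keys and values are hypothetical choices.

```python
def split_global_overrides(replication_configs):
    """Split Replication Configs into worker-wide, connector-wide, and other entries.

    Assumption for illustration: the workers./connectors. prefix is stripped
    and the remaining property is applied to every Connect worker or connector.
    """
    worker_props, connector_props, other = {}, {}, {}
    for key, value in replication_configs.items():
        if key.startswith("workers."):
            worker_props[key[len("workers."):]] = value
        elif key.startswith("connectors."):
            connector_props[key[len("connectors."):]] = value
        else:
            other[key] = value
    return worker_props, connector_props, other


# Hypothetical Replication Configs entries.
configs = {
    "workers.offset.flush.timeout.ms": "10000",
    "connectors.tasks.max": "4",
    "primary->secondary.enabled": "true",
}
workers, connectors, rest = split_global_overrides(configs)
print(workers)     # {'offset.flush.timeout.ms': '10000'}
print(connectors)  # {'tasks.max': '4'}
print(rest)        # {'primary->secondary.enabled': 'true'}
```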
SRM Service Basic Authentication support
The SRM Service can now be secured using Basic Authentication. Once Basic Authentication is set up and enabled, the REST API of the SRM Service becomes secured. Any clients or services that connect to the REST API are required to present valid credentials for access. Configuration is done in Cloudera Manager using SRM configuration properties and external accounts. For more information, see Configuring Basic Authentication for the SRM Service.
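Clients that call a REST API secured with Basic Authentication present their credentials in a standard HTTP Basic Authorization header (RFC 7617). The Python sketch below builds that header with the standard library; the username, password, and any URL you pass in are placeholders, and no SRM Service endpoint paths are assumed here.

```python
import base64
from urllib.request import Request


def basic_auth_header(username, password):
    """Return an HTTP Basic Authorization header value (RFC 7617)."""
    credentials = f"{username}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(credentials).decode("ascii")


def authenticated_request(url, username, password):
    """Build a GET request that presents Basic credentials to a REST endpoint."""
    return Request(url, headers={"Authorization": basic_auth_header(username, password)})


# RFC 7617's own example credentials.
print(basic_auth_header("Aladdin", "open sesame"))  # Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==
```

Basic credentials are only Base64-encoded, not encrypted, so they should be sent over TLS.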
SRM automatically creates a Basic Authentication credential for co-located services
SRM automatically creates a Basic Authentication credential for co-located services (users can change the credentials using SRM Service Co-Located Service Username and SRM Service Co-Located Service User Password). When Basic Authentication is enabled, this user is automatically accepted by the SRM Service. For more information, see Configuring Basic Authentication for the SRM Service.
SRM Service Remote Querying no longer in technical preview
SRM Service Remote Querying was introduced in a previous release of Cloudera Runtime as a technical preview feature. Starting with this release, Remote Querying is ready for use in production environments. This is the result of Basic Authentication being introduced for the SRM Service and SMM supporting multi-target alerting.

For more information on Remote Querying, see Remote Querying and Configuring Remote Querying. For more information on how to set up Basic Authentication for Remote Querying, see Configuring Basic Authentication for Remote Querying.

The SRM Driver can now write the origin offset into the record header
SRM now supports a diagnostic feature in which the source offsets of replicated records are written into the record headers. The feature can be turned on by setting copy.source.offset.in.header.enabled to true. When enabled, the source offset is written in binary format into a header named mm2-source-offset. The schema of the header payload is defined by the org.apache.kafka.connect.mirror.SourceOffsets class in the connect:mirror-client package. This feature is only recommended for diagnostic purposes, because the additional header increases the size of the replica topic.

Cruise Control

Cruise Control 2.5.66 Rebase
Cruise Control in Cloudera Runtime is rebased from version 2.0.100 to version 2.5.66. The main feature changes include ZooKeeper TLS/SSL support and Cruise Control Metrics Reporter support.
ZooKeeper TLS/SSL support for Cruise Control
When TLS is enabled on the cluster, Cruise Control automatically uses TLS/SSL to communicate securely with ZooKeeper.
Cruise Control Metric Reporter support
In addition to the Cloudera Manager Metrics Reporter, the Kafka-based Cruise Control Metrics Reporter can also be used. Its configuration must be set manually, and further adjustments are needed when changing the default Metrics Reporter.
Replacing RackAwareGoal with RackAwareDistributionGoal
The default RackAwareGoal is too strict: it does not allow multiple replicas of the same partition to be placed in a single rack. This means that if there is an outage in one of the Availability Zones, Cruise Control cannot execute the reassignment. Using the RackAwareDistributionGoal improves this behavior of Cruise Control, as it allows multiple replicas of a partition to be placed in a single rack, as long as the replicas of each partition can be evenly distributed across the racks.
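The difference between the two goals can be sketched as two checks on a single partition's replica placement. In this Python illustration, "evenly distributed" is taken to mean that replica counts per rack differ by at most one across all available racks; this is a simplified reading of the goal, not Cruise Control's implementation, and the broker IDs and rack names are hypothetical.

```python
from collections import Counter


def satisfies_rack_aware(replica_brokers, broker_rack):
    """Strict rule (RackAwareGoal-like): no two replicas share a rack."""
    racks_used = [broker_rack[b] for b in replica_brokers]
    return len(racks_used) == len(set(racks_used))


def satisfies_rack_aware_distribution(replica_brokers, broker_rack):
    """Relaxed rule (RackAwareDistributionGoal-like): per-rack replica
    counts differ by at most one across all available racks."""
    counts = Counter(broker_rack[b] for b in replica_brokers)
    per_rack = [counts.get(rack, 0) for rack in set(broker_rack.values())]
    return max(per_rack) - min(per_rack) <= 1


# Hypothetical cluster after an outage in az3: only brokers in az1 and az2 remain.
available = {1: "az1", 2: "az2", 3: "az1", 4: "az2"}
replicas = [1, 2, 3]  # replication factor 3 on two racks

print(satisfies_rack_aware(replicas, available))               # False
print(satisfies_rack_aware_distribution(replicas, available))  # True
```

With az3 unavailable, a partition with three replicas cannot avoid sharing a rack, so only the relaxed check can be satisfied, which matches the outage scenario described above.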