Monitoring End-to-End Latency using Streams Messaging Manager

Enabling Interceptors

Interceptors periodically publish metrics to Kafka. On the producer side, the metrics include counts. On the consumer side, they include the count and the average, minimum, and maximum latencies.
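
To make the consumer-side metrics concrete, the following sketch aggregates a few per-record latencies into a count, average, minimum, and maximum. It is an illustration only: the class name LatencySummary, the sample timestamps, and the definition of latency as consumption time minus the record timestamp are assumptions made here, not a description of how the SMM interceptors compute their metrics.

```java
import java.util.LongSummaryStatistics;

public class LatencySummary {

    // Hypothetical helper: latency is ASSUMED to be the consume time minus the
    // record (produce) timestamp, both in epoch milliseconds.
    static LongSummaryStatistics summarize(long[] producedAtMs, long[] consumedAtMs) {
        if (producedAtMs.length != consumedAtMs.length) {
            throw new IllegalArgumentException("Timestamp arrays must have the same length");
        }
        LongSummaryStatistics stats = new LongSummaryStatistics();
        for (int i = 0; i < producedAtMs.length; i++) {
            stats.accept(consumedAtMs[i] - producedAtMs[i]);
        }
        return stats;
    }

    public static void main(String[] args) {
        // Made-up sample timestamps, for illustration only.
        long[] produced = {1_000L, 2_000L, 3_000L};
        long[] consumed = {1_040L, 2_010L, 3_100L};
        LongSummaryStatistics stats = summarize(produced, consumed);
        System.out.println("count=" + stats.getCount()
                + " avg=" + stats.getAverage()
                + " min=" + stats.getMin()
                + " max=" + stats.getMax());
    }
}
```

With the sample values, the three latencies are 40, 10, and 100 milliseconds, so the summary reports a count of 3, an average of 50.0, a minimum of 10, and a maximum of 100.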

You must enable interceptors for consumers, producers, and Kafka Streams applications so that SMM can fetch the metrics. If you do not enable the interceptors, you cannot see any metrics in SMM.

Add the following JAR to the classpath of the application, or declare it as a dependency of the application:
<dependency>
	<groupId>com.hortonworks.smm</groupId>
	<artifactId>monitoring-interceptors</artifactId>
</dependency>
Perform the following steps to enable the consumer interceptor:
  1. Add the interceptor.classes property to the consumer configuration that is passed to the KafkaConsumer constructor.
  2. Configure the client.id property, as shown in the following example:
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    KafkaConsumer<Integer, String> createKafkaConsumer(String bootstrapServers, String groupId, String clientIdentifier) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Set client.id for this consumer (step 2)
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, clientIdentifier);
        // Register the monitoring consumer interceptor (step 1)
        properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                         "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringConsumerInterceptor");

        return new KafkaConsumer<>(properties);
    }
To enable the producer interceptor, add the interceptor.classes property to the producer configuration that is passed to the KafkaProducer constructor, as follows:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

KafkaProducer<Integer, String> createKafkaProducer(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    // Register the monitoring producer interceptor
    properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                  "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringProducerInterceptor");

    return new KafkaProducer<>(properties);
}
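
The consumer and producer examples above set interceptor.classes to a single class. Because interceptor.classes is a list-valued Kafka property, an application that already registers other interceptors may prefer to append the monitoring interceptor instead of overwriting the existing value. The following is a minimal sketch of that idea using only java.util.Properties. The class InterceptorConfig and the method addInterceptor are hypothetical names introduced here, and the sketch assumes the existing value, if any, is stored as a comma-separated String.

```java
import java.util.Properties;

public class InterceptorConfig {

    // Literal value of ProducerConfig.INTERCEPTOR_CLASSES_CONFIG and
    // ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG.
    static final String INTERCEPTOR_CLASSES = "interceptor.classes";

    // Hypothetical helper: appends interceptorClass to the comma-separated
    // list unless it is already present. Assumes any existing value was
    // stored as a String.
    static void addInterceptor(Properties properties, String interceptorClass) {
        String existing = properties.getProperty(INTERCEPTOR_CLASSES, "").trim();
        if (existing.isEmpty()) {
            properties.setProperty(INTERCEPTOR_CLASSES, interceptorClass);
            return;
        }
        for (String name : existing.split(",")) {
            if (name.trim().equals(interceptorClass)) {
                return; // already registered, nothing to do
            }
        }
        properties.setProperty(INTERCEPTOR_CLASSES, existing + "," + interceptorClass);
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        // "com.example.AuditInterceptor" is a made-up class name for illustration.
        properties.setProperty(INTERCEPTOR_CLASSES, "com.example.AuditInterceptor");
        addInterceptor(properties,
                "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringProducerInterceptor");
        System.out.println(properties.getProperty(INTERCEPTOR_CLASSES));
    }
}
```

The resulting Properties object can then be passed to the KafkaProducer or KafkaConsumer constructor in the same way as in the examples above.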
To enable the interceptors for a Kafka Streams application, add the producer.interceptor.classes and consumer.interceptor.classes properties to the Kafka Streams configuration, as follows:
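import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

void startKafkaStreams(StreamsBuilder builder) {
    KafkaStreams kstreams = new KafkaStreams(builder.build(), getKafkaStreamsConfiguration());
    kstreams.start();
}

// bootstrapServers, appId, and clientId are assumed to be fields of the enclosing class.
Properties getKafkaStreamsConfiguration() {
    Properties config = new Properties();
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
    config.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);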

    //Add producer interceptor like this
    config.put(
            StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringProducerInterceptor");

    //Add consumer interceptor like this
    config.put(
            StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
            "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringConsumerInterceptor");

    return config;
}
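
StreamsConfig.PRODUCER_PREFIX and StreamsConfig.CONSUMER_PREFIX are the strings "producer." and "consumer.", so the configuration above ends up with the keys producer.interceptor.classes and consumer.interceptor.classes. As a rough sketch, the same two entries could also be kept in a properties file and loaded at startup. The class name StreamsInterceptorProperties and the inline file content below are hypothetical and only illustrate the resulting key names; a real application would also need bootstrap.servers, application.id, and its other settings.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class StreamsInterceptorProperties {

    // Hypothetical properties-file content, inlined here so that the sketch
    // is self-contained.
    static final String FILE_CONTENT = String.join("\n",
            "producer.interceptor.classes="
                    + "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringProducerInterceptor",
            "consumer.interceptor.classes="
                    + "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringConsumerInterceptor");

    // Parses properties-file text into a Properties object.
    static Properties load(String content) {
        Properties config = new Properties();
        try {
            config.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return config;
    }

    public static void main(String[] args) {
        Properties config = load(FILE_CONTENT);
        for (String key : config.stringPropertyNames()) {
            System.out.println(key + " = " + config.getProperty(key));
        }
    }
}
```

The loaded entries can be merged into the Properties object that is passed to the KafkaStreams constructor.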