Enabling interceptors

Enable interceptors for consumers, producers, and Kafka Streams applications so that Streams Messaging Manager (SMM) can fetch their metrics. If you do not enable the interceptors, you cannot see any metrics in SMM.

Interceptors publish the metrics to Kafka periodically. Metrics include counts on the producer side, and count, average latency, and minimum and maximum latencies on the consumer side.
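To make the consumer-side metrics concrete, the following is a minimal sketch of how a count, an average latency, and minimum and maximum latencies can be accumulated from individual latency samples. It is an illustration only and is not the SMM interceptor implementation; the class name LatencyStatsSketch and its methods are hypothetical.

```java
// Hypothetical illustration only: NOT the SMM interceptor implementation.
// Accumulates the kinds of consumer-side values named above.
final class LatencyStatsSketch {
    private long count;
    private long totalMillis;
    private long minMillis = Long.MAX_VALUE;
    private long maxMillis = Long.MIN_VALUE;

    // Record one latency sample, in milliseconds.
    void record(long latencyMillis) {
        count++;
        totalMillis += latencyMillis;
        minMillis = Math.min(minMillis, latencyMillis);
        maxMillis = Math.max(maxMillis, latencyMillis);
    }

    long count() {
        return count;
    }

    // Average of all recorded samples; 0.0 when nothing has been recorded.
    double averageMillis() {
        return count == 0 ? 0.0 : (double) totalMillis / count;
    }

    // Smallest recorded sample; 0 when nothing has been recorded.
    long minMillis() {
        return count == 0 ? 0 : minMillis;
    }

    // Largest recorded sample; 0 when nothing has been recorded.
    long maxMillis() {
        return count == 0 ? 0 : maxMillis;
    }
}
```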

Steps to enable interceptors in your application

Add the following JAR to the classpath of the application, or declare it as a Maven dependency of the application:
<dependency>
	<groupId>com.hortonworks.smm</groupId>
	<artifactId>monitoring-interceptors</artifactId>
</dependency>

Steps to enable consumer interceptor

Perform the following steps to enable the consumer interceptor:
  1. Add the interceptor.classes property to the consumer configuration that is passed to the KafkaConsumer constructor.
  2. Configure the client.id property as follows:
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    KafkaConsumer<Integer, String> createKafkaConsumer(String bootstrapServers, String groupId, String clientIdentifier) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Set client.id for this consumer
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, clientIdentifier);
        // Register the SMM monitoring interceptor through interceptor.classes
        properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                         "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringConsumerInterceptor");

        return new KafkaConsumer<>(properties);
    }
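The interceptor.classes property takes a comma-separated list of class names, so an application that already configures its own interceptors may prefer to append the monitoring interceptor rather than overwrite the existing value. The following is a minimal sketch of that idea using only java.util.Properties and the literal property key. The class name ConsumerInterceptorConfigSketch and the method withMonitoringInterceptor are hypothetical helpers, not part of SMM or Kafka.

```java
import java.util.Arrays;
import java.util.Properties;

// Hypothetical helper, not part of SMM or Kafka.
final class ConsumerInterceptorConfigSketch {
    // Literal key behind ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG.
    static final String INTERCEPTOR_CLASSES = "interceptor.classes";
    static final String MONITORING_CONSUMER_INTERCEPTOR =
            "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringConsumerInterceptor";

    // Returns a copy of the given consumer properties with the monitoring
    // interceptor appended to interceptor.classes, unless it is already listed.
    static Properties withMonitoringInterceptor(Properties base) {
        Properties copy = new Properties();
        copy.putAll(base);
        String existing = copy.getProperty(INTERCEPTOR_CLASSES, "").trim();
        if (existing.isEmpty()) {
            copy.setProperty(INTERCEPTOR_CLASSES, MONITORING_CONSUMER_INTERCEPTOR);
        } else if (!Arrays.asList(existing.split("\\s*,\\s*"))
                .contains(MONITORING_CONSUMER_INTERCEPTOR)) {
            copy.setProperty(INTERCEPTOR_CLASSES,
                    existing + "," + MONITORING_CONSUMER_INTERCEPTOR);
        }
        return copy;
    }
}
```

The returned Properties object can then be passed to the KafkaConsumer constructor in the same way as in the example above.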

Steps to enable producer interceptor

Add the interceptor.classes property to the producer configuration that is passed to the KafkaProducer constructor, as follows:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

KafkaProducer<Integer, String> createKafkaProducer(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    // Register the SMM monitoring interceptor through interceptor.classes
    properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                  "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringProducerInterceptor");

    return new KafkaProducer<>(properties);
}
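If the producer configuration is kept in a properties file rather than built in code, the same setting can be expressed as a plain interceptor.classes entry. The following sketch loads such a configuration from text with java.util.Properties. The class name ProducerPropertiesFileSketch is hypothetical, and the localhost:9092 bootstrap address is a placeholder assumption, not a value from this document.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of SMM or Kafka.
final class ProducerPropertiesFileSketch {
    // Example file contents. The bootstrap address is a placeholder.
    static final String EXAMPLE = String.join("\n",
            "bootstrap.servers=localhost:9092",
            "key.serializer=org.apache.kafka.common.serialization.IntegerSerializer",
            "value.serializer=org.apache.kafka.common.serialization.StringSerializer",
            "interceptor.classes=com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringProducerInterceptor");

    // Parses properties-file text into a Properties object.
    static Properties load(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            // Reading from an in-memory string is not expected to fail.
            throw new UncheckedIOException(e);
        }
        return properties;
    }
}
```

The loaded Properties object can be passed to the KafkaProducer constructor in place of the programmatically built one.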

Steps to enable interceptors in Kafka Streams applications

Add the producer.interceptor.classes and consumer.interceptor.classes properties to the Kafka Streams configuration, as follows:
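import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

void startKafkaStreams(StreamsBuilder builder) {
    KafkaStreams kstreams = new KafkaStreams(builder.build(), getKafkaStreamsConfiguration());
    kstreams.start();
}

Properties getKafkaStreamsConfiguration() {
    // bootstrapServers, appId, and clientId are assumed to be fields
    // defined elsewhere in the enclosing class.
    Properties config = new Properties();
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
    config.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);

    // Register the producer interceptor under producer.interceptor.classes
    config.put(
            StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringProducerInterceptor");

    // Register the consumer interceptor under consumer.interceptor.classes
    config.put(
            StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
            "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringConsumerInterceptor");

    return config;
}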
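The prefixed constants in the example above resolve to the literal keys producer.interceptor.classes and consumer.interceptor.classes. The following sketch shows the same configuration built with plain string keys, which can be handy when the settings come from a file or an environment-driven map. The class name StreamsInterceptorKeysSketch and the method withMonitoringInterceptors are hypothetical, and the demo-app application ID in the usage lines is a placeholder.

```java
import java.util.Properties;

// Hypothetical helper, not part of SMM or Kafka.
final class StreamsInterceptorKeysSketch {
    // Literal values behind StreamsConfig.PRODUCER_PREFIX and CONSUMER_PREFIX.
    static final String PRODUCER_PREFIX = "producer.";
    static final String CONSUMER_PREFIX = "consumer.";
    static final String INTERCEPTOR_CLASSES = "interceptor.classes";

    static final String MONITORING_PRODUCER_INTERCEPTOR =
            "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringProducerInterceptor";
    static final String MONITORING_CONSUMER_INTERCEPTOR =
            "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringConsumerInterceptor";

    // Returns a copy of the given Streams properties with both monitoring
    // interceptors set under their prefixed keys.
    static Properties withMonitoringInterceptors(Properties streamsConfig) {
        Properties copy = new Properties();
        copy.putAll(streamsConfig);
        copy.setProperty(PRODUCER_PREFIX + INTERCEPTOR_CLASSES, MONITORING_PRODUCER_INTERCEPTOR);
        copy.setProperty(CONSUMER_PREFIX + INTERCEPTOR_CLASSES, MONITORING_CONSUMER_INTERCEPTOR);
        return copy;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "demo-app"); // placeholder value
        Properties config = withMonitoringInterceptors(base);
        // Print the two prefixed keys that were added.
        System.out.println(config.getProperty("producer.interceptor.classes"));
        System.out.println(config.getProperty("consumer.interceptor.classes"));
    }
}
```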