Enabling interceptors
To let Streams Messaging Manager (SMM) fetch metrics, you must enable interceptors for consumers, producers, and Kafka Streams applications. If you do not enable the interceptors, you cannot see any metrics in SMM.
Interceptors publish the metrics to Kafka periodically. Metrics include counts on the producer side, and count, average latency, and minimum and maximum latencies on the consumer side.
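To make the consumer-side metrics concrete, the following is a minimal sketch of the aggregates named above (count, average, minimum, and maximum latency) computed over a small batch of records. It is an illustration only: how the SMM interceptor computes and publishes its metrics is not described here. The class name LatencySummary, the method summarize, and the definition of latency as consume time minus record timestamp are all assumptions.

```java
import java.util.LongSummaryStatistics;

// Hypothetical illustration; this is not the SMM interceptor's implementation.
final class LatencySummary {

    // Assumption: latency = time the record was consumed minus the record's timestamp, in ms.
    static LongSummaryStatistics summarize(long[] recordTimestampsMs, long consumedAtMs) {
        LongSummaryStatistics stats = new LongSummaryStatistics();
        for (long timestampMs : recordTimestampsMs) {
            stats.accept(consumedAtMs - timestampMs);
        }
        return stats;
    }

    public static void main(String[] args) {
        // Three made-up record timestamps, all consumed at t = 1100 ms.
        long[] timestamps = {1_000L, 1_040L, 1_090L};
        LongSummaryStatistics stats = summarize(timestamps, 1_100L);
        System.out.printf("count=%d avg=%.1f min=%d max=%d%n",
                stats.getCount(), stats.getAverage(), stats.getMin(), stats.getMax());
    }
}
```

The producer side, by contrast, only needs the count, which corresponds to getCount() in this sketch.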
Steps to enable interceptors in your application
Add the following jar to the classpath of the application or as a dependency in the application:
<dependency>
    <groupId>com.hortonworks.smm</groupId>
    <artifactId>monitoring-interceptors</artifactId>
</dependency>
Steps to enable consumer interceptor
Perform the following steps to enable consumer interceptor:
- Add the interceptor.classes property to the consumer configuration that gets passed to the KafkaConsumer constructor.
- Configure the client.id property as follows:
KafkaConsumer<Integer, String> createKafkaConsumer(String bootstrapServers, String groupId, String clientIdentifier) {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.put(ConsumerConfig.CLIENT_ID_CONFIG, clientIdentifier);
    //Add ConsumerInterceptor like this
    properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
       "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringConsumerInterceptor");
    return new KafkaConsumer<Integer, String>(properties);
}
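If the consumer configuration already sets interceptor.classes, a plain put of the monitoring interceptor replaces the existing value. Kafka accepts a comma-separated list for this property, so one option is to append instead. The following is a minimal sketch of that idea using only java.util.Properties. The helper InterceptorConfig.addInterceptor and the class com.example.AuditInterceptor are hypothetical, and the sketch assumes the property value is stored as a String.

```java
import java.util.Properties;

// Hypothetical helper; only the property key and the SMM class name come from the steps above.
final class InterceptorConfig {

    static final String INTERCEPTOR_CLASSES = "interceptor.classes";
    static final String SMM_CONSUMER_INTERCEPTOR =
            "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringConsumerInterceptor";

    // Appends interceptorClass to the comma-separated list, keeping existing
    // entries and skipping the append if the class is already listed.
    static void addInterceptor(Properties props, String interceptorClass) {
        String existing = props.getProperty(INTERCEPTOR_CLASSES, "").trim();
        if (existing.isEmpty()) {
            props.setProperty(INTERCEPTOR_CLASSES, interceptorClass);
            return;
        }
        for (String entry : existing.split(",")) {
            if (entry.trim().equals(interceptorClass)) {
                return; // already registered
            }
        }
        props.setProperty(INTERCEPTOR_CLASSES, existing + "," + interceptorClass);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Hypothetical interceptor that the application configured earlier.
        props.setProperty(INTERCEPTOR_CLASSES, "com.example.AuditInterceptor");
        addInterceptor(props, SMM_CONSUMER_INTERCEPTOR);
        System.out.println(props.getProperty(INTERCEPTOR_CLASSES));
    }
}
```

The same approach would apply to a producer configuration by passing the MonitoringProducerInterceptor class name instead.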
Steps to enable producer interceptor
Add the interceptor.classes property to the producer configuration that gets passed to the KafkaProducer constructor, as follows:
KafkaProducer<Integer, String> createKafkaProducer(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    //Add ProducerInterceptor like this
    properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 
       "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringProducerInterceptor");
    return new KafkaProducer<Integer, String>(properties);
}
Steps to enable interceptors in KafkaStreams applications
Add the producer.interceptor.classes and consumer.interceptor.classes properties to the Kafka Streams configuration, as follows:
void startKafkaStreams(StreamsBuilder builder) {
    KafkaStreams kstreams = new KafkaStreams(builder.build(), getKafkaStreamsConfiguration());
    kstreams.start();
}
Properties getKafkaStreamsConfiguration() {
    Properties config = new Properties();
    //bootstrapServers, appId, and clientId are assumed to be defined elsewhere in the application
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
    config.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
    //Add producer interceptor like this
    config.put(
      StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
      "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringProducerInterceptor");
    //Add consumer interceptor like this
    config.put(
      StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
      "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringConsumerInterceptor");
    return config;
}
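The constants in the example above resolve to the literal keys producer.interceptor.classes and consumer.interceptor.classes. If the Kafka Streams configuration is kept in a properties file rather than built in code, the same settings might look like the following minimal sketch. The class StreamsInterceptorProps, the file contents, and the values for bootstrap.servers, application.id, and client.id are hypothetical placeholders. Only the two interceptor keys and class names come from the steps above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical sketch: loads Kafka Streams settings from properties-file text.
final class StreamsInterceptorProps {

    // Stand-in for the contents of a hypothetical streams.properties file.
    static final String EXAMPLE_FILE = String.join("\n",
            "bootstrap.servers=localhost:9092",
            "application.id=example-app",
            "client.id=example-client",
            "producer.interceptor.classes="
                    + "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringProducerInterceptor",
            "consumer.interceptor.classes="
                    + "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringConsumerInterceptor");

    // Parses properties-file text into a Properties object.
    static Properties load(String text) {
        Properties config = new Properties();
        try {
            config.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return config;
    }

    public static void main(String[] args) {
        Properties config = load(EXAMPLE_FILE);
        System.out.println(config.getProperty("producer.interceptor.classes"));
        System.out.println(config.getProperty("consumer.interceptor.classes"));
    }
}
```

The loaded Properties object could then be passed to the KafkaStreams constructor in place of the value returned by getKafkaStreamsConfiguration().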