Enabling interceptors
You must enable interceptors for consumers, producers, and Kafka Streams applications so that Streams Messaging Manager (SMM) can fetch their metrics. If you do not enable the interceptors, you cannot see any metrics in SMM.
Interceptors publish the metrics to Kafka periodically. The metrics include counts on the producer side and, on the consumer side, the count, the average latency, and the minimum and maximum latencies.
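To make the consumer-side metrics concrete, the following is a minimal sketch of how a count, average, minimum, and maximum latency could be aggregated between two publish cycles. It is not the SMM interceptor's implementation: the LatencySummary class is hypothetical, and it assumes that latency means the time a record is consumed minus the record's timestamp.

```java
/** Hypothetical aggregate for illustration; not part of the SMM interceptor library. */
final class LatencySummary {
    private long count;
    private long totalMs;
    private long minMs = Long.MAX_VALUE;
    private long maxMs = Long.MIN_VALUE;

    /** Records one consumed record. Assumes latency = consume time - record timestamp. */
    void record(long recordTimestampMs, long consumedAtMs) {
        // Clamp to zero so that clock skew between hosts cannot produce a negative latency.
        long latencyMs = Math.max(0L, consumedAtMs - recordTimestampMs);
        count++;
        totalMs += latencyMs;
        minMs = Math.min(minMs, latencyMs);
        maxMs = Math.max(maxMs, latencyMs);
    }

    long count() { return count; }

    double averageMs() { return count == 0 ? 0.0 : (double) totalMs / count; }

    long minMs() { return count == 0 ? 0L : minMs; }

    long maxMs() { return count == 0 ? 0L : maxMs; }

    public static void main(String[] args) {
        LatencySummary summary = new LatencySummary();
        summary.record(1_000L, 1_020L); // 20 ms
        summary.record(1_000L, 1_050L); // 50 ms
        summary.record(2_000L, 2_005L); // 5 ms
        System.out.println("count=" + summary.count()
                + " avgMs=" + summary.averageMs()
                + " minMs=" + summary.minMs()
                + " maxMs=" + summary.maxMs());
    }
}
```

A periodic publisher would read these four values, send them to Kafka, and then start a fresh summary for the next interval.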
Steps to enable interceptors in your application
Add the following JAR to the classpath of the application, or declare it as a dependency of the application:

    <dependency>
        <groupId>com.hortonworks.smm</groupId>
        <artifactId>monitoring-interceptors</artifactId>
    </dependency>
Steps to enable consumer interceptor
Perform the following steps to enable the consumer interceptor:
- Add the interceptor.classes property to the consumer configuration that is passed to the KafkaConsumer constructor.
- Configure the client.id property.

The following example sets both properties:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    KafkaConsumer<Integer, String> createKafkaConsumer(String bootstrapServers, String groupId, String clientIdentifier) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Set client.id for this consumer
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, clientIdentifier);
        // Add ConsumerInterceptor like this
        properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
            "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringConsumerInterceptor");
        return new KafkaConsumer<>(properties);
    }
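The interceptor.classes property takes a comma-separated list of class names, so an application that already registers other interceptors should append the monitoring interceptor rather than overwrite the existing value. The following sketch shows one way to do that with plain java.util.Properties. The InterceptorConfig class and its addInterceptor method are hypothetical helpers, and the sketch assumes the property is stored as a String.

```java
import java.util.LinkedHashSet;
import java.util.Properties;
import java.util.Set;

/** Hypothetical helper for illustration; not part of Kafka or SMM. */
final class InterceptorConfig {
    // Literal key behind ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG
    // and ProducerConfig.INTERCEPTOR_CLASSES_CONFIG.
    static final String INTERCEPTOR_CLASSES = "interceptor.classes";

    /** Appends className to the comma-separated list, keeping order and skipping duplicates. */
    static void addInterceptor(Properties props, String className) {
        Set<String> classes = new LinkedHashSet<>();
        // Assumes any existing value was stored as a String, not as a List.
        String existing = props.getProperty(INTERCEPTOR_CLASSES, "");
        for (String entry : existing.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                classes.add(trimmed);
            }
        }
        classes.add(className);
        props.setProperty(INTERCEPTOR_CLASSES, String.join(",", classes));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // com.example.AuditInterceptor is a placeholder for an interceptor already in use.
        props.setProperty(INTERCEPTOR_CLASSES, "com.example.AuditInterceptor");
        addInterceptor(props,
            "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringConsumerInterceptor");
        System.out.println(props.getProperty(INTERCEPTOR_CLASSES));
    }
}
```

The same helper works for a producer configuration, because producers and consumers use the same interceptor.classes key.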
Steps to enable producer interceptor
Add the interceptor.classes property to the producer configuration that is passed to the KafkaProducer constructor, as follows:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    KafkaProducer<Integer, String> createKafkaProducer(String bootstrapServers) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        // Add ProducerInterceptor like this
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringProducerInterceptor");
        return new KafkaProducer<>(properties);
    }
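Some applications keep client settings in a properties file instead of building them in code. As a sketch of that variant, the following example loads an equivalent producer configuration from properties-file text. The ProducerPropertiesExample class, the file contents, and the localhost:9092 broker address are illustrative assumptions, and the text is held in a string here only so that the example is self-contained.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical example class; it only demonstrates the property keys involved. */
final class ProducerPropertiesExample {
    // Hypothetical contents of a producer.properties file; the broker address is a placeholder.
    static final String PRODUCER_PROPERTIES = String.join("\n",
        "bootstrap.servers=localhost:9092",
        "key.serializer=org.apache.kafka.common.serialization.IntegerSerializer",
        "value.serializer=org.apache.kafka.common.serialization.StringSerializer",
        "interceptor.classes=com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringProducerInterceptor");

    /** Parses the properties text into a Properties object. */
    static Properties load() {
        Properties props = new Properties();
        try (StringReader reader = new StringReader(PRODUCER_PROPERTIES)) {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Print each setting that would be passed to the KafkaProducer constructor.
        load().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting Properties object can be passed to the KafkaProducer constructor in the same way as one built in code.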
Steps to enable interceptors in KafkaStreams applications
Add the producer.interceptor.classes and consumer.interceptor.classes properties to the Kafka Streams configuration, as follows:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    void startKafkaStreams(StreamsBuilder builder) {
        KafkaStreams kstreams = new KafkaStreams(builder.build(), getKafkaStreamsConfiguration());
        kstreams.start();
    }

    // bootstrapServers, appId, and clientId are assumed to be defined elsewhere in the application
    Properties getKafkaStreamsConfiguration() {
        Properties config = new Properties();
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
        config.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
        // Add producer interceptor like this
        config.put(
            StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringProducerInterceptor");
        // Add consumer interceptor like this
        config.put(
            StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
            "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringConsumerInterceptor");
        return config;
    }
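The prefixed constants in the Kafka Streams configuration resolve to the two property names given above. The following sketch spells out the literal keys without depending on the Kafka libraries. The StreamsInterceptorKeys class is a hypothetical name, and the string constants mirror the values of StreamsConfig.PRODUCER_PREFIX, StreamsConfig.CONSUMER_PREFIX, and the INTERCEPTOR_CLASSES_CONFIG constants.

```java
import java.util.Properties;

/** Hypothetical example class showing the literal keys behind the StreamsConfig prefixes. */
final class StreamsInterceptorKeys {
    static final String PRODUCER_PREFIX = "producer.";             // StreamsConfig.PRODUCER_PREFIX
    static final String CONSUMER_PREFIX = "consumer.";             // StreamsConfig.CONSUMER_PREFIX
    static final String INTERCEPTOR_CLASSES = "interceptor.classes"; // *Config.INTERCEPTOR_CLASSES_CONFIG

    /** Builds only the two interceptor entries of a Kafka Streams configuration. */
    static Properties interceptorOverrides() {
        Properties config = new Properties();
        config.setProperty(PRODUCER_PREFIX + INTERCEPTOR_CLASSES,
            "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringProducerInterceptor");
        config.setProperty(CONSUMER_PREFIX + INTERCEPTOR_CLASSES,
            "com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringConsumerInterceptor");
        return config;
    }

    public static void main(String[] args) {
        interceptorOverrides().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Kafka Streams strips the prefix and passes the remaining setting to the producers or consumers it creates internally. StreamsConfig also provides the producerPrefix and consumerPrefix helper methods, which build the same keys.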