You can change streaming data and include the changes in a stream. You extract a Kafka input topic, transform the record in Hive, and load a Hive table back into a Kafka record.
        This task assumes that you already queried live data from Kafka. When you
                transform the record in the Hive execution engine, you compute a moving average over
                a window of one minute. The resulting record that you write back to another Kafka
                topic is named moving_avg_wiki_kafka_hive.
.
        - 
                Create an external table to represent the Hive data that you want to load into Kafka.
                
CREATE EXTERNAL TABLE moving_avg_wiki_kafka_hive 
(`channel` string, `namespace` string,`page` string, `timestamp` timestamp , avg_delta double )
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES
  ("kafka.topic" = "moving_avg_wiki_kafka_hive",
  "kafka.bootstrap.servers"="kafka.hostname.com:9092",
  -- STORE AS AVRO IN KAFKA
  "kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe");
 
             - 
                Insert data that you select from the Kafka topic back into the Kafka record.
                
INSERT INTO TABLE moving_avg_wiki_kafka_hive 
SELECT `channel`, `namespace`, `page`, `timestamp`, 
  AVG(delta) OVER (ORDER BY `timestamp` ASC ROWS BETWEEN  60 PRECEDING AND CURRENT ROW) AS avg_delta, 
  null AS `__key`, null AS `__partition`, -1 AS `__offset`, to_epoch_milli
(CURRENT_TIMESTAMP) AS `__timestamp`
FROM l15min_wiki;              
The timestamps of the selected data are converted to milliseconds since epoch for clarity.