Writing transformed Hive data to Kafka

You can transform streaming data and write the results back to a stream. You extract data from a Kafka input topic, transform the records in Hive, and load the resulting Hive table back into a Kafka topic.

This task assumes that you have already queried live data from Kafka. When you transform the records in the Hive execution engine, you compute a moving average over a window of one minute. You write the resulting records back to another Kafka topic named moving_avg_wiki_kafka_hive.

  1. Create an external table to represent the Hive data that you want to load into Kafka.
    CREATE EXTERNAL TABLE moving_avg_wiki_kafka_hive
    (`channel` string, `namespace` string, `page` string, `timestamp` timestamp, avg_delta double)
    STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
    TBLPROPERTIES
      ("kafka.topic" = "moving_avg_wiki_kafka_hive",
      "kafka.bootstrap.servers"="kafka.hostname.com:9092",
      -- STORE AS AVRO IN KAFKA
      "kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe");
  2. Insert the transformed data that you select from the Kafka input topic into the external table, which writes the records to the moving_avg_wiki_kafka_hive topic.
    INSERT INTO TABLE moving_avg_wiki_kafka_hive 
    SELECT `channel`, `namespace`, `page`, `timestamp`, 
      AVG(delta) OVER (ORDER BY `timestamp` ASC ROWS BETWEEN 60 PRECEDING AND CURRENT ROW) AS avg_delta,
      null AS `__key`, null AS `__partition`, -1 AS `__offset`,
      to_epoch_milli(CURRENT_TIMESTAMP) AS `__timestamp`
    FROM l15min_wiki;
    The to_epoch_milli function converts the current timestamp to milliseconds since the epoch, which is the value written to the `__timestamp` metadata column of each record.
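
To confirm that the insert produced records, you can read the topic back through the same external table. The following is a minimal sketch, assuming the statements in steps 1 and 2 completed without errors and that the table is queried from the same Hive session. It uses only the table and column names already defined above, and the number of rows returned depends on the data in your topic.

```sql
-- Inspect the table layout. In addition to the declared columns, the
-- output should list the Kafka metadata columns that the INSERT in
-- step 2 populates: __key, __partition, __offset, and __timestamp.
DESCRIBE moving_avg_wiki_kafka_hive;

-- Read back a few of the records written to the Kafka topic, together
-- with the partition, offset, and timestamp that Kafka assigned.
SELECT `channel`, `page`, `timestamp`, avg_delta,
       `__partition`, `__offset`, `__timestamp`
FROM moving_avg_wiki_kafka_hive
LIMIT 10;
```

The INSERT in step 2 supplies null for `__partition` and -1 for `__offset`, so the values shown by this query are the ones recorded when the records were written to the topic, not the placeholders from the INSERT statement.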