You can transform streaming data and write the result to another stream. In this task, you extract data from a Kafka input topic, transform the records in Hive, and load the result from a Hive table into a Kafka topic.
This task assumes that you have already queried live data from Kafka. When you
transform the records in the Hive execution engine, you compute a moving average over
a window of one minute. The query in this task expresses that window as a row count:
the current row and the 60 rows that precede it. You write the resulting records to
another Kafka topic named moving_avg_wiki_kafka_hive.
-
Create an external table to represent the Hive data that you want to load into Kafka.
CREATE EXTERNAL TABLE moving_avg_wiki_kafka_hive
(`channel` string, `namespace` string, `page` string, `timestamp` timestamp, avg_delta double)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES
("kafka.topic" = "moving_avg_wiki_kafka_hive",
"kafka.bootstrap.servers"="kafka.hostname.com:9092",
-- STORE AS AVRO IN KAFKA
"kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe");
-
Insert the data that you select from the source Kafka topic, together with the computed moving average, into the Kafka-backed table that you created.
INSERT INTO TABLE moving_avg_wiki_kafka_hive
SELECT `channel`, `namespace`, `page`, `timestamp`,
AVG(delta) OVER (ORDER BY `timestamp` ASC ROWS BETWEEN 60 PRECEDING AND CURRENT ROW) AS avg_delta,
null AS `__key`, null AS `__partition`, -1 AS `__offset`,
to_epoch_milli(CURRENT_TIMESTAMP) AS `__timestamp`
FROM l15min_wiki;
The last four columns in the SELECT list fill the Kafka metadata columns of the target table. The __timestamp value is the current timestamp converted to milliseconds since epoch.
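To check the result, you can inspect the table and read back some of the records that were written. The following is a minimal sketch, not a required step of this task. It assumes that the INSERT statement completed, that the Kafka broker is still reachable, and that the storage handler exposes the metadata columns used in the INSERT statement (__key, __partition, __offset, __timestamp). The LIMIT value is arbitrary.

```sql
-- List the columns of the Kafka-backed table. Under the assumption above,
-- the Kafka metadata columns appear after the five columns that were
-- declared in the CREATE EXTERNAL TABLE statement.
DESCRIBE moving_avg_wiki_kafka_hive;

-- Read back a few records from the moving_avg_wiki_kafka_hive topic,
-- including the partition, offset, and timestamp assigned to each record.
SELECT `channel`, `page`, avg_delta,
       `__partition`, `__offset`, `__timestamp`
FROM moving_avg_wiki_kafka_hive
LIMIT 10;
```

If the SELECT statement returns rows with an avg_delta value, the transformed records reached the topic.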