Perform ETL by ingesting data from Kafka into Hive
You can extract, transform, and load a Kafka record into Hive in a single transaction.
-
Create a table to represent source Kafka record offsets.
CREATE TABLE kafka_table_offsets(partition_id int, max_offset bigint, insert_time timestamp);
-
Initialize the table.
INSERT OVERWRITE TABLE kafka_table_offsets SELECT `__partition`, min(`__offset`) - 1, CURRENT_TIMESTAMP FROM wiki_kafka_hive GROUP BY `__partition`, CURRENT_TIMESTAMP;
-
Create the destination table.
CREATE TABLE orc_kafka_table (partition_id int, koffset bigint, ktimestamp bigint, `timestamp` timestamp , `page` string, `user` string, `diffurl` string, `isrobot` boolean, added int, deleted int, delta bigint ) STORED AS ORC;
-
Insert Kafka data into the ORC table.
FROM wiki_kafka_hive ktable JOIN kafka_table_offsets offset_table ON (ktable.`__partition` = offset_table.partition_id AND ktable.`__offset` > offset_table.max_offset ) INSERT INTO TABLE orc_kafka_table SELECT `__partition`, `__offset`, `__timestamp`, `timestamp`, `page`, `user`, `diffurl`, `isrobot`, added , deleted , delta INSERT OVERWRITE TABLE kafka_table_offsets SELECT `__partition`, max(`__offset`), CURRENT_TIMESTAMP GROUP BY `__partition`, CURRENT_TIMESTAMP;
-
Check the insertion.
SELECT MAX(`koffset`) FROM orc_kafka_table LIMIT 10; SELECT COUNT(*) AS c FROM orc_kafka_table GROUP BY partition_id, koffset HAVING c > 1;
- Repeat step 4 periodically until all the data is loaded into Hive.