Perform ETL by ingesting data from Kafka into Hive
You can extract, transform, and load a Kafka record into Hive in a single transaction.
- 
                Create a table to represent source Kafka record offsets.CREATE TABLE kafka_table_offsets(partition_id int, max_offset bigint, insert_time timestamp);
- 
                Initialize the table.
                INSERT OVERWRITE TABLE kafka_table_offsets SELECT `__partition`, min(`__offset`) - 1, CURRENT_TIMESTAMP FROM wiki_kafka_hive GROUP BY `__partition`, CURRENT_TIMESTAMP;
- 
                Create the destination table.
                CREATE TABLE orc_kafka_table (partition_id int, koffset bigint, ktimestamp bigint, `timestamp` timestamp , `page` string, `user` string, `diffurl` string, `isrobot` boolean, added int, deleted int, delta bigint ) STORED AS ORC;
- 
                Insert Kafka data into the ORC table.
                FROM wiki_kafka_hive ktable JOIN kafka_table_offsets offset_table ON (ktable.`__partition` = offset_table.partition_id AND ktable.`__offset` > offset_table.max_offset ) INSERT INTO TABLE orc_kafka_table SELECT `__partition`, `__offset`, `__timestamp`, `timestamp`, `page`, `user`, `diffurl`, `isrobot`, added , deleted , delta INSERT OVERWRITE TABLE kafka_table_offsets SELECT `__partition`, max(`__offset`), CURRENT_TIMESTAMP GROUP BY `__partition`, CURRENT_TIMESTAMP;
- 
                Check the insertion.
                SELECT MAX(`koffset`) FROM orc_kafka_table LIMIT 10; SELECT COUNT(*) AS c FROM orc_kafka_table GROUP BY partition_id, koffset HAVING c > 1;
- Repeat step 4 periodically until all the data is loaded into Hive.
