Perform ETL by ingesting data from Kafka into Hive

You can extract, transform, and load Kafka records into Hive in a single transaction.
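
This procedure assumes you already have a Kafka-backed Hive external table, called wiki_kafka_hive here, that exposes the topic's record fields plus the `__partition`, `__offset`, and `__timestamp` metadata columns. If you have not created such a table, a minimal sketch using the Hive Kafka storage handler looks like the following; the topic name and broker list are placeholders you must replace with your own values:

    -- "kafka.topic" and "kafka.bootstrap.servers" below are placeholder values
    CREATE EXTERNAL TABLE wiki_kafka_hive
      (`timestamp` timestamp, `page` string, `user` string, `diffurl` string,
       `isrobot` boolean, added int, deleted int, delta bigint)
    STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
    TBLPROPERTIES
      ("kafka.topic" = "wiki_topic",
       "kafka.bootstrap.servers" = "kafka-broker-host:9092");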

  1. Create a table to represent source Kafka record offsets.
    CREATE TABLE kafka_table_offsets(partition_id int, max_offset bigint, insert_time timestamp);                
  2. Initialize the table.
    INSERT OVERWRITE TABLE kafka_table_offsets 
    SELECT `__partition`, min(`__offset`) - 1, CURRENT_TIMESTAMP 
    FROM wiki_kafka_hive 
    GROUP BY `__partition`, CURRENT_TIMESTAMP;
  3. Create the destination table.
    CREATE TABLE orc_kafka_table (partition_id int, koffset bigint, ktimestamp bigint,
      `timestamp` timestamp, `page` string, `user` string, `diffurl` string,
      `isrobot` boolean, added int, deleted int, delta bigint
    ) STORED AS ORC;
  4. Insert Kafka data into the ORC table and update the stored offsets. The two INSERT clauses below share one FROM ... JOIN, so the new records and the advanced offsets are written in a single transaction.
    FROM wiki_kafka_hive ktable JOIN kafka_table_offsets offset_table
    ON (ktable.`__partition` = offset_table.partition_id 
    AND ktable.`__offset` > offset_table.max_offset )
    INSERT INTO TABLE orc_kafka_table 
    SELECT `__partition`, `__offset`, `__timestamp`,
      `timestamp`, `page`, `user`, `diffurl`, `isrobot`, added, deleted, delta
    INSERT OVERWRITE TABLE kafka_table_offsets 
    SELECT `__partition`, max(`__offset`), CURRENT_TIMESTAMP 
    GROUP BY `__partition`, CURRENT_TIMESTAMP;
  5. Check the insertion. The first query shows the highest offset loaded so far; the second returns no rows if no (partition_id, koffset) pair was inserted more than once.
    SELECT MAX(`koffset`) FROM orc_kafka_table LIMIT 10;
    
    SELECT COUNT(*) AS c FROM orc_kafka_table 
    GROUP BY partition_id, koffset HAVING c > 1;
  6. Repeat step 4 periodically until all the data is loaded into Hive. To check progress between runs and to explore the loaded data, see the examples after this procedure.
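
To check progress between runs of step 4, inspect the bookkeeping table; this uses only the tables created above:

    SELECT partition_id, max_offset, insert_time
    FROM kafka_table_offsets;

For each Kafka partition, max_offset is the last offset that has been copied into orc_kafka_table, and insert_time records when that load ran.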
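
After the load, orc_kafka_table behaves like any other Hive table. A hypothetical example query that aggregates the loaded wiki edits by user:

    SELECT `user`, SUM(added) AS total_added, SUM(deleted) AS total_deleted
    FROM orc_kafka_table
    GROUP BY `user`
    ORDER BY total_added DESC
    LIMIT 10;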