Integrating Hive and Kafka

Query live data from Kafka

You can query a Hive table that is backed by a Kafka topic with ordinary HiveQL. Typical queries count the number of records streamed within a time interval or define a view over a recent window of the stream.

This task builds on the previous task in which you created a table named kafka_table for a Kafka stream.
  1. List the table properties and all the partition or offset information for the topic.
    DESCRIBE EXTENDED kafka_table;
  2. Count the number of Kafka records having timestamps within the last 10 minutes.
    SELECT COUNT(*) FROM kafka_table 
      WHERE `__timestamp` >  1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '10' MINUTES);  
    Such a time-based seek requires Kafka 0.11 or later, whose brokers support time-based lookups; on earlier versions, this query results in a full stream scan.
  3. Define a view over the data consumed within the last 15 minutes that exposes only specific columns and masks the rest.
    CREATE VIEW last_15_minutes_of_kafka_table AS SELECT `timestamp`, `user`, delta, added, deleted
      FROM kafka_table
      WHERE `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '15' MINUTES);
  4. Create a dimension table.
    CREATE TABLE user_table (`user` string, `first_name` string, age int, gender string, comments string) STORED AS ORC;
  5. Join the view of the stream over the last 15 minutes to user_table, group by gender, and compute aggregates over metrics from the fact table (the view) and the dimension table.
    SELECT SUM(added) AS added, SUM(deleted) AS deleted, AVG(delta) AS delta, AVG(age) AS avg_age , gender 
      FROM last_15_minutes_of_kafka_table  
      JOIN user_table ON `last_15_minutes_of_kafka_table`.`user` = `user_table`.`user`
      GROUP BY gender LIMIT 10;
  6. Perform a classical user retention analysis over the Kafka stream by using a self-join, either as an ad hoc query on a view defined over the past 15 minutes or as a stream-to-stream join over the entire stream.
    -- Stream join over the view itself
    -- Assuming l15min_wiki is a view of the last 15 minutes
    SELECT  COUNT( DISTINCT activity.`user`) AS active_users, 
    COUNT(DISTINCT future_activity.`user`) AS retained_users
    FROM l15min_wiki AS activity
    LEFT JOIN l15min_wiki AS future_activity ON activity.`user` = future_activity.`user`
    AND activity.`timestamp` = future_activity.`timestamp` - interval '5' minutes ; 
                        
    -- Stream-to-stream join
    -- Assuming wiki_kafka_hive is a table over the entire stream
    SELECT floor_hour(activity.`timestamp`), COUNT( DISTINCT activity.`user`) AS active_users, 
    COUNT(DISTINCT future_activity.`user`) as retained_users
    FROM wiki_kafka_hive AS activity
    LEFT JOIN wiki_kafka_hive AS future_activity ON activity.`user` = future_activity.`user`
    AND activity.`timestamp` = future_activity.`timestamp` - interval '1' hour 
    GROUP BY floor_hour(activity.`timestamp`);
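
The time filters in steps 2 and 3 compare `__timestamp`, which holds the Kafka record time in epoch milliseconds, against a value that to_unix_timestamp returns in seconds, hence the factor of 1000. As a minimal sketch of that arithmetic outside Hive, the following Python mirrors the predicate on a plain list of timestamps. The function names cutoff_millis and count_recent are hypothetical and chosen only for illustration, and the sketch assumes timezone-aware datetimes rather than Hive's session time zone handling.

```python
from datetime import datetime, timedelta, timezone


def cutoff_millis(now: datetime, minutes: int) -> int:
    """Epoch milliseconds for `now - minutes`.

    Mirrors: 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval 'N' MINUTES)
    (hypothetical helper; assumes `now` is timezone-aware).
    """
    window_start = now - timedelta(minutes=minutes)
    # Truncate to whole seconds, as a Unix timestamp in seconds would be,
    # then scale to milliseconds to match Kafka record timestamps.
    return 1000 * int(window_start.timestamp())


def count_recent(record_timestamps_ms, now: datetime, minutes: int) -> int:
    """Count records whose timestamp is strictly newer than the cutoff.

    Mirrors: SELECT COUNT(*) ... WHERE `__timestamp` > <cutoff>
    """
    cutoff = cutoff_millis(now, minutes)
    return sum(1 for ts in record_timestamps_ms if ts > cutoff)
```

Calling count_recent(timestamps, datetime.now(timezone.utc), 10) on a list of millisecond timestamps corresponds to the 10-minute count in step 2. The comparison is strict, so a record stamped exactly at the cutoff is excluded, as it is with the `>` operator in the queries above.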