Query live data from Kafka
You can get useful information from a table of Kafka data by running typical queries, such as counting the number of records streamed within an interval of time or defining a view of streamed data over a period of time.
The examples below assume that a table named kafka_table has already been created over a Kafka topic.
-- Show the extended metadata that Hive holds for kafka_table.
DESCRIBE EXTENDED kafka_table;
-- Count the records whose `__timestamp` falls within the past 10 minutes.
-- to_unix_timestamp() returns seconds; multiplying by 1000 scales the cutoff
-- to the unit `__timestamp` is compared in (presumably epoch milliseconds,
-- as the Kafka storage handler documents -- confirm for your version).
SELECT COUNT(*)
FROM kafka_table
WHERE `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - INTERVAL '10' MINUTES);
-- View over the records whose `__timestamp` falls within the past 15 minutes,
-- exposing only the columns that downstream queries use.
-- `deleted` is exposed because the aggregate query that joins this view to
-- user_table computes SUM(deleted), and user_table has no such column.
-- NOTE(review): assumes kafka_table has a `deleted` column -- confirm against
-- the table definition.
CREATE VIEW last_15_minutes_of_kafka_table AS
SELECT
    `timestamp`,
    `user`,
    delta,
    added,
    deleted
FROM kafka_table
WHERE `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - INTERVAL '15' MINUTES);
-- Dimension table of per-user attributes, stored as ORC.
-- `user` is back-quoted because it is a reserved word.
CREATE TABLE user_table (
    `user`       STRING,
    `first_name` STRING,
    age          INT,
    gender       STRING,
    comments     STRING
)
STORED AS ORC;
Join the view of the stream over the past 15 minutes to user_table, group by gender, and compute aggregates over metrics from the fact table and the dimension table.
-- Join the 15-minute view to the user dimension table and aggregate the
-- stream metrics per gender.
-- NOTE(review): SUM(deleted) requires last_15_minutes_of_kafka_table to expose
-- a `deleted` column, since user_table does not have one.
SELECT
    SUM(added)   AS added,
    SUM(deleted) AS deleted,
    AVG(delta)   AS delta,
    AVG(age)     AS avg_age,
    gender
FROM last_15_minutes_of_kafka_table AS recent
INNER JOIN user_table AS users
    ON recent.`user` = users.`user`
GROUP BY gender
LIMIT 10;
-- User-retention analysis expressed as self-joins over the streamed data.
-- Each comment and each statement is on its own line: a `--` comment runs to
-- the end of the physical line, so keeping everything on one line would turn
-- the statements themselves into comment text.

-- Stream join over the view itself.
-- Assuming l15min_wiki is a view of the last 15 minutes.
-- A user counts as retained when a matching row exists exactly 5 minutes later.
SELECT
    COUNT(DISTINCT activity.`user`)        AS active_users,
    COUNT(DISTINCT future_activity.`user`) AS retained_users
FROM l15min_wiki AS activity
LEFT JOIN l15min_wiki AS future_activity
    ON activity.`user` = future_activity.`user`
    AND activity.`timestamp` = future_activity.`timestamp` - INTERVAL '5' MINUTES;

-- Stream-to-stream join.
-- Assuming wiki_kafka_hive is the entire stream.
-- Same analysis with a 1-hour offset, reported per hour of activity.
SELECT
    floor_hour(activity.`timestamp`),
    COUNT(DISTINCT activity.`user`)        AS active_users,
    COUNT(DISTINCT future_activity.`user`) AS retained_users
FROM wiki_kafka_hive AS activity
LEFT JOIN wiki_kafka_hive AS future_activity
    ON activity.`user` = future_activity.`user`
    AND activity.`timestamp` = future_activity.`timestamp` - INTERVAL '1' HOUR
GROUP BY floor_hour(activity.`timestamp`);