Query live data from Kafka
You can get useful information from a table of Kafka data by running typical queries, such as counting the number of records streamed within an interval of time and defining a view of streamed data over a period of time.
-
List the table properties and all the partition or offset information for the topic.
DESCRIBE EXTENDED kafka_table;
-
Count the number of Kafka records that have timestamps within the last 10 minutes.
SELECT COUNT(*) FROM kafka_table WHERE `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '10' MINUTES);
This time-based seek requires Kafka 0.11 or later, whose brokers support time-based lookups; with an earlier version, the query results in a full stream scan.
-
Define a view of data consumed within the last 15 minutes and mask specific columns.
CREATE VIEW last_15_minutes_of_kafka_table AS SELECT `timestamp`, `user`, delta, added, deleted FROM kafka_table WHERE `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '15' MINUTES);
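The time filters in the two queries above compare `__timestamp`, which holds the Kafka record timestamp in epoch milliseconds, against a cutoff built from to_unix_timestamp, which returns epoch seconds; hence the factor of 1000. As a rough illustration of that arithmetic outside Hive, the following Python sketch computes the same kind of cutoff. The function name cutoff_millis, the fixed "current time", and the sample record timestamps are hypothetical and exist only for this example.

```python
from datetime import datetime, timedelta, timezone


def cutoff_millis(now: datetime, minutes: int) -> int:
    """Return the epoch-millisecond cutoff for a window of `minutes` ending at `now`.

    Mirrors 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval 'N' MINUTES):
    epoch seconds scaled to the milliseconds that Kafka record timestamps use.
    """
    window_start = now - timedelta(minutes=minutes)
    return 1000 * int(window_start.timestamp())


# Hypothetical fixed "current time" so that the example is reproducible.
now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
cutoff = cutoff_millis(now, 10)

# Hypothetical record timestamps in epoch milliseconds.
record_timestamps = [cutoff - 1, cutoff + 1, cutoff + 60_000]

# Same predicate as the WHERE clause: keep records newer than the cutoff.
recent = [ts for ts in record_timestamps if ts > cutoff]
print(len(recent))
```

Omitting the factor of 1000 would compare milliseconds with seconds, so nearly every record would pass the filter.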
-
Create a dimension table.
CREATE TABLE user_table (`user` string, `first_name` string, age int, gender string, comments string) STORED AS ORC;
-
Join the view of the stream over the last 15 minutes to user_table, group by gender, and compute aggregates over metrics from the fact and dimension tables.
SELECT SUM(added) AS added, SUM(deleted) AS deleted, AVG(delta) AS delta, AVG(age) AS avg_age, gender FROM last_15_minutes_of_kafka_table JOIN user_table ON `last_15_minutes_of_kafka_table`.`user` = `user_table`.`user` GROUP BY gender LIMIT 10;
-
Perform a classic user retention analysis over the Kafka stream. The first query joins a view defined over the past 15 minutes to itself, which suits ad hoc queries; the second performs a stream-to-stream join over the entire stream.
-- Stream join over the view itself
-- Assuming l15min_wiki is a view of the last 15 minutes
SELECT COUNT(DISTINCT activity.`user`) AS active_users,
  COUNT(DISTINCT future_activity.`user`) AS retained_users
FROM l15min_wiki AS activity
LEFT JOIN l15min_wiki AS future_activity
  ON activity.`user` = future_activity.`user`
  AND activity.`timestamp` = future_activity.`timestamp` - interval '5' minutes;

-- Stream-to-stream join
-- Assuming wiki_kafka_hive is the entire stream
SELECT floor_hour(activity.`timestamp`),
  COUNT(DISTINCT activity.`user`) AS active_users,
  COUNT(DISTINCT future_activity.`user`) AS retained_users
FROM wiki_kafka_hive AS activity
LEFT JOIN wiki_kafka_hive AS future_activity
  ON activity.`user` = future_activity.`user`
  AND activity.`timestamp` = future_activity.`timestamp` - interval '1' hour
GROUP BY floor_hour(activity.`timestamp`);
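
To make the matching rule of the first retention query concrete, here is a minimal Python sketch that applies the same logic to a handful of in-memory events. A user counts as active if they appear at all, and as retained if they also have an event exactly five minutes after one of their events. The event data and the function name retention are hypothetical, and the sketch illustrates the semantics of the query only, not how Hive executes it.

```python
from datetime import datetime, timedelta


def retention(events, gap):
    """Count active and retained users the way the LEFT JOIN query does.

    events: list of (user, timestamp) pairs.
    gap: timedelta that must separate an event from a later event by the same user.
    """
    seen = set(events)  # (user, timestamp) pairs, for exact-match lookups
    # COUNT(DISTINCT activity.user): every user with at least one event.
    active_users = {user for user, _ in events}
    # COUNT(DISTINCT future_activity.user): users with a partner event exactly
    # `gap` later; unmatched rows yield NULL in the join and are not counted.
    retained_users = {user for user, ts in events if (user, ts + gap) in seen}
    return len(active_users), len(retained_users)


# Hypothetical sample events.
t0 = datetime(2024, 1, 1, 12, 0)
events = [
    ("alice", t0),
    ("alice", t0 + timedelta(minutes=5)),  # exactly 5 minutes later: retained
    ("bob", t0),
    ("bob", t0 + timedelta(minutes=7)),    # not exactly 5 minutes later
    ("carol", t0 + timedelta(minutes=2)),
]

active, retained = retention(events, timedelta(minutes=5))
print(active, retained)
```

Because the join condition is an equality on timestamps, only events that are exactly five minutes apart match; in this sample, bob and carol are active but not retained.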