You can get useful information from a table of Kafka data by running typical queries,
such as counting the number of records streamed within an interval of time or defining a
view of streamed data over a period of time.
This task requires Kafka 0.11 or later to support time-based lookups and prevent full stream scans.
This task assumes you created a table named kafka_table for a Kafka
stream.
-
List the table properties and all the partition or offset information for the topic.
DESCRIBE EXTENDED kafka_table;
-
Count the number of Kafka records that have timestamps within the past 10
minutes.
SELECT COUNT(*) FROM kafka_table
WHERE `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '10' MINUTES);
Such a time-based seek requires Kafka 0.11 or later, which has a Kafka broker
that supports time-based lookups; otherwise, this query leads to a full stream
scan.
-
Define a view of data consumed within the past 15 minutes and mask specific columns.
CREATE VIEW last_15_minutes_of_kafka_table AS SELECT `timestamp`, `user`, delta,
ADDED FROM kafka_table
WHERE `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '15' MINUTES) ;
-
Create a dimension table.
CREATE TABLE user_table (`user` string, `first_name` string , age int, gender string, comments string) STORED as ORC ;
-
Join the view of the stream over the past 15 minutes to
user_table
, group by gender, and compute aggregates over
metrics from fact table and dimension tables.
SELECT SUM(added) AS added, SUM(deleted) AS deleted, AVG(delta) AS delta, AVG(age) AS avg_age , gender
FROM last_15_minutes_of_kafka_table
JOIN user_table ON `last_15_minutes_of_kafka_table`.`user` = `user_table`.`user`
GROUP BY gender LIMIT 10;
-
Perform a classical user retention analysis over the Kafka stream consisting of
a stream-to-stream join that runs adhoc queries on a view defined over the past
15 minutes.
-- Stream join over the view itself
-- Assuming l15min_wiki is a view of the last 15 minutes
SELECT COUNT( DISTINCT activity.`user`) AS active_users,
COUNT(DISTINCT future_activity.`user`) AS retained_users
FROM l15min_wiki AS activity
LEFT JOIN l15min_wiki AS future_activity ON activity.`user` = future_activity.`user`
AND activity.`timestamp` = future_activity.`timestamp` - interval '5' minutes ;
-- Stream-to-stream join
-- Assuming wiki_kafka_hive is the entire stream.
SELECT floor_hour(activity.`timestamp`), COUNT( DISTINCT activity.`user`) AS active_users,
COUNT(DISTINCT future_activity.`user`) as retained_users
FROM wiki_kafka_hive AS activity
LEFT JOIN wiki_kafka_hive AS future_activity ON activity.`user` = future_activity.`user`
AND activity.`timestamp` = future_activity.`timestamp` - interval '1' hour
GROUP BY floor_hour(activity.`timestamp`);