Integrating Hive and Kafka

Kafka storage handler and table properties

You use the Kafka storage handler and table properties to specify the query connection and configuration.

Kafka storage handler

You specify 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' in the STORED BY clause of a table definition to connect to a Kafka topic and expose it as a Hive table. In the definition of an external table, the storage handler creates a view over a single Kafka topic. For example, the following table definition specifies the storage handler and the required table properties: the topic name and the broker connection string.
CREATE EXTERNAL TABLE kafka_table
   (`timestamp` timestamp , `page` string,  `newPage` boolean, 
   added int, deleted bigint, delta double)
   STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
   TBLPROPERTIES
   ("kafka.topic" = "test-topic", "kafka.bootstrap.servers"="localhost:9092");
You set the following table properties with the Kafka storage handler:
kafka.topic: The Kafka topic to connect to
kafka.bootstrap.servers: The broker connection string

Storage handler-based optimizations

The storage handler can optimize reads using a filter push-down when you execute a query such as the following time-based lookup supported on Kafka 0.11 or later:

SELECT COUNT(*) FROM kafka_table 
   WHERE `__timestamp` >  1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '10' MINUTES) ; 

The Kafka consumer supports seeking on the stream based on an offset, which the storage handler leverages to push down filters over metadata columns. The storage handler in the example above performs seeks based on the Kafka record __timestamp to read only recently arrived data.

The following logical operators and predicate operators are supported in the WHERE clause:

Logical operators: OR, AND

Predicate operators: <, <=, >=, >, =
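
As a sketch of how these operators combine, the following query bounds the record timestamp on both sides with AND. It assumes the kafka_table defined earlier; the 60-minute and 10-minute bounds are arbitrary values chosen for illustration.

```sql
-- Count records that arrived between 60 and 10 minutes ago.
-- `__timestamp` is in milliseconds, so the Unix time in seconds is multiplied by 1000.
SELECT COUNT(*) FROM kafka_table
   WHERE `__timestamp` >= 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '60' MINUTES)
   AND `__timestamp` < 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '10' MINUTES);
```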

The storage handler reader optimizes seeks by performing partition pruning to go directly to a particular partition offset used in the WHERE clause:

SELECT COUNT(*)  FROM kafka_table 
  WHERE (`__offset` < 10 AND `__offset` > 3 AND `__partition` = 0) 
  OR (`__partition` = 0 AND `__offset` < 105 AND `__offset` > 99) 
  OR (`__offset` = 109);

The storage handler scans partition 0 only, and then reads only the records between offsets 4 and 109.

Kafka metadata

In addition to the user-defined payload schema, the Kafka storage handler appends the following columns to the table, which you can use to query the Kafka metadata fields:

__key: Kafka record key (byte array)
__partition: Kafka record partition identifier (int 32)
__offset: Kafka record offset (int 64)
__timestamp: Kafka record timestamp (int 64)

The partition identifier, record offset, and record timestamp, plus a key-value pair, constitute a Kafka record. Because the key and the value are each a byte array, you must use SerDe classes to transform the value into a set of columns.
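
As a rough illustration of querying the metadata columns, the sketch below summarizes each partition of the topic. It assumes the kafka_table defined earlier; the column aliases are illustrative only.

```sql
-- Summarize each Kafka partition using the metadata columns
-- that the storage handler appends to the table.
SELECT `__partition`,
       MIN(`__offset`) AS first_offset,
       MAX(`__offset`) AS last_offset,
       COUNT(*)        AS record_count
FROM kafka_table
GROUP BY `__partition`;
```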

Table Properties

You use certain properties in the TBLPROPERTIES clause of a Hive query that specifies the Kafka storage handler.
| Property | Description | Required | Default |
| kafka.topic | Kafka topic name to map the table to | Yes | null |
| kafka.bootstrap.servers | Kafka broker connection string | Yes | null |
| kafka.serde.class | Serializer and deserializer class implementation | No | org.apache.hadoop.hive.serde2.JsonSerDe |
| hive.kafka.poll.timeout.ms | Kafka consumer poll timeout period in milliseconds (independent of internal Kafka consumer timeouts) | No | 5000 (5 seconds) |
| hive.kafka.max.retries | Number of retries for Kafka metadata fetch operations | No | 6 |
| hive.kafka.metadata.poll.timeout.ms | Number of milliseconds before the consumer times out when fetching Kafka metadata | No | 30000 (30 seconds) |
| kafka.write.semantic | Writer semantic; allowed values are NONE, AT_LEAST_ONCE, and EXACTLY_ONCE | No | AT_LEAST_ONCE |
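
One possible way to override the optional properties on an existing table is ALTER TABLE, sketched below against the kafka_table defined earlier. The Avro SerDe and the 10-second timeout are illustrative choices, not recommendations, and a non-default SerDe may need additional SerDe-specific properties (for example, an Avro schema) that are not shown here.

```sql
-- Switch the SerDe from the default JSON SerDe to Avro (illustrative choice).
ALTER TABLE kafka_table
   SET TBLPROPERTIES ("kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe");

-- Raise the consumer poll timeout from the default 5000 ms to 10000 ms (arbitrary value).
ALTER TABLE kafka_table
   SET TBLPROPERTIES ("hive.kafka.poll.timeout.ms" = "10000");
```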