Kafka storage handler and table properties
You use the Kafka storage handler and table properties to specify the query connection and configuration.
Kafka storage handler
You specify 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' in queries to connect to a Kafka topic and present it as a Hive table. In the definition of an external table, the storage handler creates a view over a single Kafka topic. For example, the following table definition specifies the storage handler and the required table properties, which are the topic name and the broker connection string:
CREATE EXTERNAL TABLE kafka_table
(`timestamp` timestamp , `page` string, `newPage` boolean,
added int, deleted bigint, delta double)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES
("kafka.topic" = "test-topic", "kafka.bootstrap.servers"="localhost:9092");
You set the following table properties with the Kafka storage handler:
- kafka.topic: The Kafka topic to connect to
- kafka.bootstrap.servers: The broker connection string
Storage handler-based optimizations
The storage handler can optimize reads using a filter push-down when you run a query such as the following time-based lookup supported on Kafka 0.11 or later:
SELECT COUNT(*) FROM kafka_table
WHERE `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '10' MINUTES) ;
The Kafka consumer supports seeking on the stream based on an offset, which the storage handler leverages to push down filters over metadata columns. The storage handler in the example above performs seeks based on the Kafka record __timestamp to read only recently arrived data.
The following logical operators and predicate operators are supported in the WHERE clause:
Logical operators: OR, AND
Predicate operators: <, <=, >=, >, =
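As a minimal sketch of combining these operators, the following query bounds the time-based lookup on both sides. It assumes the kafka_table defined earlier; the 30-minute and 10-minute bounds are illustrative values, not recommendations.

```sql
-- Count records that arrived between 30 and 10 minutes ago.
-- `__timestamp` is in milliseconds, so the Unix time in seconds is multiplied by 1000.
SELECT COUNT(*) FROM kafka_table
WHERE `__timestamp` >= 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '30' MINUTES)
  AND `__timestamp` <  1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '10' MINUTES);
```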
The storage handler reader optimizes seeks by performing partition pruning to go directly to a particular partition offset used in the WHERE clause:
SELECT COUNT(*) FROM kafka_table
WHERE (`__offset` < 10 AND `__offset` > 3 AND `__partition` = 0)
OR (`__partition` = 0 AND `__offset` < 105 AND `__offset` > 99)
OR (`__offset` = 109);
The storage handler scans partition 0 only, and then reads only records between offsets 4 and 109.
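The same kind of filter can be used to read back one contiguous slice of a single partition. The following is a sketch only, assuming the kafka_table defined earlier; the partition number and offset bounds are hypothetical.

```sql
-- Read records at offsets 100 through 199 from partition 0.
SELECT `__offset`, `page`, added, deleted
FROM kafka_table
WHERE `__partition` = 0
  AND `__offset` >= 100
  AND `__offset` < 200;
```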
Kafka metadata
In addition to the user-defined payload schema, the Kafka storage handler appends to the table some additional columns, which you can use to query the Kafka metadata fields:
- __key: Kafka record key (byte array)
- __partition: Kafka record partition identifier (int 32)
- __offset: Kafka record offset (int 64)
- __timestamp: Kafka record timestamp (int 64)
The partition identifier, record offset, and record timestamp plus a key-value pair constitute a Kafka record. Because the key and the value are byte arrays, you must use SerDe classes to transform them into a set of columns.
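As a minimal sketch, assuming the kafka_table defined earlier, you can select the metadata columns alongside payload columns, or aggregate over them. The column aliases max_offset and record_count are names chosen for this example.

```sql
-- Inspect a few records together with their Kafka metadata.
SELECT `__partition`, `__offset`, `__timestamp`, `page`
FROM kafka_table
LIMIT 10;

-- Summarize each partition: highest offset read and number of records.
SELECT `__partition`, MAX(`__offset`) AS max_offset, COUNT(*) AS record_count
FROM kafka_table
GROUP BY `__partition`;
```

Backticks are needed around the metadata column names because they begin with underscores.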
Table properties
Property | Description | Required | Default |
---|---|---|---|
kafka.topic | Kafka topic name to map the table to | Yes | null |
kafka.bootstrap.servers | Kafka broker connection string | Yes | null |
kafka.serde.class | Serializer and Deserializer class implementation | No | org.apache.hadoop.hive.serde2.JsonSerDe |
hive.kafka.poll.timeout.ms | Parameter indicating Kafka Consumer poll timeout period in milliseconds. (This is independent of internal Kafka consumer timeouts.) | No | 5000 (5 Seconds) |
hive.kafka.max.retries | Number of retries for Kafka metadata fetch operations | No | 6 |
hive.kafka.metadata.poll.timeout.ms | Number of milliseconds before consumer timeout on fetching Kafka metadata | No | 30000 (30 Seconds) |
kafka.write.semantic | Writer semantic with allowed values of NONE, AT_LEAST_ONCE, EXACTLY_ONCE | No | AT_LEAST_ONCE |
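The optional properties can be set like any other table property. The following is a sketch only, assuming the kafka_table defined earlier; the timeout and retry values are illustrative, and EXACTLY_ONCE is simply one of the allowed values listed above.

```sql
-- Override some optional Kafka storage handler properties on an existing table.
-- The values below are examples, not tuning recommendations.
ALTER TABLE kafka_table SET TBLPROPERTIES
("hive.kafka.poll.timeout.ms" = "10000",
 "hive.kafka.max.retries" = "3",
 "kafka.write.semantic" = "EXACTLY_ONCE");
```

Properties that you do not set keep the defaults shown in the table.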