Kafka storage handler and table properties
You use the Kafka storage handler and table properties to specify the query connection and configuration.
Kafka storage handler
You specify 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' in queries to connect to, and transform a Kafka topic into, a Hive table. In the definition of an external table, the storage handler creates a view over a single Kafka topic. For example, to use the storage handler to connect to a topic, the following table definition specifies the storage handler and the required table properties: the topic name and the broker connection string.

CREATE EXTERNAL TABLE kafka_table
  (`timestamp` timestamp, `page` string, `newPage` boolean,
  added int, deleted bigint, delta double)
  STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
  TBLPROPERTIES
  ("kafka.topic" = "test-topic", "kafka.bootstrap.servers"="localhost:9092");
- kafka.topic
- The Kafka topic to connect to
- kafka.bootstrap.servers
- The broker connection string
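After the table is created, you can query the topic like any other Hive table; each Kafka record is mapped to a row. A minimal sketch, assuming the kafka_table definition above:

```sql
-- Read a few columns from the mapped topic; column names come
-- from the schema of the kafka_table example above.
SELECT `page`, added, deleted, delta
FROM kafka_table
LIMIT 10;
```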
Storage handler-based optimizations
The storage handler can optimize reads using a filter push-down when executing a query such as the following time-based lookup supported on Kafka 0.11 or later. The Kafka consumer supports seeking on the stream based on an offset, which the storage handler leverages to push down filters over metadata columns.
SELECT COUNT(*) FROM kafka_table
WHERE `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '10' MINUTES) ;
The storage handler in this example performs seeks based on the Kafka record __timestamp to read only recently arrived data.
The following logical operators and predicate operators are supported in the WHERE clause:
- Logical operators: OR, AND
- Predicate operators: <, <=, >=, >, =
The storage handler reader optimizes seeks by performing partition pruning to go directly to a particular partition offset used in the WHERE clause.
SELECT COUNT(*) FROM kafka_table
WHERE (`__offset` < 10 AND `__offset` > 3 AND `__partition` = 0)
OR (`__partition` = 0 AND `__offset` < 105 AND `__offset` > 99)
OR (`__offset` = 109);
The storage handler scans partition 0 only, and then reads only the records between offsets 4 and 109.
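Partition pruning also makes single-record lookups efficient. A hypothetical point lookup, assuming the kafka_table above (the partition and offset values are illustrative):

```sql
-- Seek directly to the record in partition 0 at offset 42;
-- no other partitions or offset ranges are scanned.
SELECT * FROM kafka_table
WHERE `__partition` = 0 AND `__offset` = 42;
```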
Kafka metadata
In addition to the user-defined payload schema, the Kafka storage handler appends to the table the following additional columns, which you can use to query the Kafka metadata fields:
- __key
- Kafka record key (byte array)
- __partition
- Kafka record partition identifier (int 32)
- __offset
- Kafka record offset (int 64)
- __timestamp
- Kafka record timestamp (int 64)
The partition identifier, record offset, and record timestamp, together with a key-value pair, constitute a Kafka record. Because the key and value are byte arrays, you need to use SerDe classes to transform them into a set of columns.
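For example, if the topic carries CSV-encoded values instead of the default JSON, you can point the kafka.serde.class table property at a different SerDe. A sketch, assuming a hypothetical CSV-encoded topic named csv-topic (the table name and schema are illustrative):

```sql
-- Hypothetical table over a CSV-encoded topic: swap the default
-- JsonSerDe for Hive's OpenCSVSerde via kafka.serde.class.
CREATE EXTERNAL TABLE kafka_csv_table
  (`line_ts` timestamp, `page` string, added int)
  STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
  TBLPROPERTIES
  ("kafka.topic" = "csv-topic",
   "kafka.bootstrap.servers" = "localhost:9092",
   "kafka.serde.class" = "org.apache.hadoop.hive.serde2.OpenCSVSerde");
```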
Table Properties
Property | Description | Required | Default |
---|---|---|---|
kafka.topic | Kafka topic name to map the table to. | Yes | null |
kafka.bootstrap.servers | Kafka broker connection string. | Yes | null |
kafka.serde.class | Serializer and deserializer class implementation. | No | org.apache.hadoop.hive.serde2.JsonSerDe |
hive.kafka.poll.timeout.ms | Kafka consumer poll timeout period, in milliseconds. This setting is independent of the internal Kafka consumer timeouts. | No | 5000 (5 seconds) |
hive.kafka.max.retries | Number of retries for Kafka metadata fetch operations. | No | 6 |
hive.kafka.metadata.poll.timeout.ms | Number of milliseconds before the consumer times out fetching Kafka metadata. | No | 30000 (30 seconds) |
kafka.write.semantic | Writer semantic; allowed values: NONE, AT_LEAST_ONCE, EXACTLY_ONCE. | No | AT_LEAST_ONCE |
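As a sketch of overriding the defaults, the following hypothetical definition combines several of the properties above, switching the writer to EXACTLY_ONCE and shortening the poll timeout (the table name, topic name, and schema are illustrative):

```sql
CREATE EXTERNAL TABLE kafka_events
  (`event_ts` timestamp, `page` string, added int)
  STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
  TBLPROPERTIES
  ("kafka.topic" = "events-topic",
   "kafka.bootstrap.servers" = "localhost:9092",
   "hive.kafka.poll.timeout.ms" = "2000",       -- shorter consumer poll timeout
   "kafka.write.semantic" = "EXACTLY_ONCE");    -- transactional writes
```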