Create a table for a Kafka stream

You can create an external table in Apache Hive that represents an Apache Kafka stream so that you can query real-time data in Kafka. You use a storage handler and table properties that map the Hive table to a Kafka topic and broker. If the Kafka data is not in JSON format, you alter the table to specify a serializer-deserializer for another format.

  1. Get the name of the Kafka topic you want to query. You use this name as a table property.
    For example: "kafka.topic" = "wiki-hive-topic"
  2. Construct the Kafka broker connection string.
    For example: "kafka.bootstrap.servers"="kafka.hostname.com:9092"
  3. Create an external table named kafka_table by using 'org.apache.hadoop.hive.kafka.KafkaStorageHandler', as shown in the following example:
    CREATE EXTERNAL TABLE kafka_table
      (`timestamp` timestamp , `page` string,  `newPage` boolean, 
      added int, deleted bigint, delta double)
      STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
      TBLPROPERTIES
      ("kafka.topic" = "test-topic", "kafka.bootstrap.servers"="node2:9092");
  4. If the default JSON serializer-deserializer is incompatible with your data, choose another format in one of the following ways:
    • Alter the table to use another supported serializer-deserializer. For example, if your data is in Avro format, use the Kafka serializer-deserializer for Avro:
      ALTER TABLE kafka_table SET TBLPROPERTIES ("kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe"); 
    • Create an external table that specifies another format in the table definition. For example, create a table named kafka_t_avro that specifies the Avro format in the table definition:
      CREATE EXTERNAL TABLE kafka_t_avro
         (`timestamp` timestamp , `page` string,  `newPage` boolean, 
         added int, deleted bigint, delta double)
         STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
         TBLPROPERTIES
         ("kafka.topic" = "test-topic", 
         "kafka.bootstrap.servers"="node2:9092",
         -- STORE AS AVRO IN KAFKA
         "kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe");
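
After the table exists, you can query it like any other Hive table. The following is a minimal sketch, assuming the kafka_table from step 3 and the Kafka record metadata columns (`__key`, `__partition`, `__offset`, `__timestamp`) that the Kafka storage handler adds to the columns you declare. The ten-minute window and the selected columns are arbitrary choices for illustration.

```sql
-- Inspect the table definition, including the Kafka metadata columns
-- that the storage handler appends to the declared columns.
DESCRIBE EXTENDED kafka_table;

-- Count the records written to Kafka during the last 10 minutes.
-- `__timestamp` is the Kafka record timestamp in milliseconds, which is
-- separate from the `timestamp` column carried in the message payload.
SELECT COUNT(*)
FROM kafka_table
WHERE `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '10' MINUTES);

-- Look at a few rows together with their Kafka partition and offset.
SELECT `__partition`, `__offset`, `page`, added, deleted
FROM kafka_table
LIMIT 10;
```

Filtering on `__timestamp`, `__partition`, or `__offset` is usually worthwhile because it can limit how much of the topic Hive has to read, rather than scanning the whole stream.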