Chapter 4. Streaming Data to Hive
Both the core-storm and Trident APIs support streaming data directly to Apache Hive using Hive transactions. Data committed in a transaction is immediately available to Hive queries from other Hive clients. Storm developers stream data to existing table partitions or configure the streaming Hive bolt to dynamically create desired table partitions. Use the following steps to perform this procedure:
Instantiate an implementation of the
HiveMapper
Interface.Instantiate a
HiveOptions
class with theHiveMapper
implementation.Instantiate a
HiveBolt
with theHiveOptions
class.
Note | |
---|---|
Currently, data may be streamed only into bucketed tables using the ORC file format. |
Core-storm API
DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper() .withColumnFields(new Fields(colNames)); HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper); HiveBolt hiveBolt = new HiveBolt(hiveOptions);
Trident API
DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper() .withColumnFields(new Fields(colNames)) .withTimeAsPartitionField("YYYY/MM/DD"); HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper) .withTxnsPerBatch(10) .withBatchSize(1000) .withIdleTimeout(10) StateFactory factory = new HiveStateFactory().withOptions(hiveOptions); TridentState state = stream.partitionPersist(factory, hiveFields, new HiveUpdater(), new Fields());
The rest of this topic describes these steps in greater detail.
Instantiate an Implementation of HiveMapper Interface
The storm-hive streaming bolt uses the HiveMapper
interface to map the names of tuple fields to the names of Hive table columns. Storm provides two implementations: DelimitedRecordHiveMapper
and JsonRecordHiveMapper
. Both implementations take the same arguments.
Table 4.1. HiveMapper Arguments
Argument | Data Type | Description |
---|---|---|
|
| The name of the tuple fields that you want to map to table column names. |
|
| The name of the tuple fields that you want to map to table partitions. |
|
| Requests that table partitions be created with names set to system time. Developers can specify any Java-supported date format, such as "YYYY/MM/DD". |
The following sample code illustrates how to use DelimitedRecordHiveMapper
:
... DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper() .withColumnFields(new Fields(colNames)) .withPartitionFields(new Fields(partNames)); DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper() .withColumnFields(new Fields(colNames)) .withTimeAsPartitionField("YYYY/MM/DD"); ...
Instantiate a HiveOptions Class with the HiveMapper Implementation
Use the HiveOptions
class to configure the transactions used by Hive to ingest the streaming data, as illustrated in the following code sample.
... HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper) .withTxnsPerBatch(10) .withBatchSize(1000) .withIdleTimeout(10); ...
The following table describes all configuration properties for the HiveOptions
class.
Table 4.2. HiveOptions Class Configuration Properties
HiveOptions Configuration Property | Data Type | Description |
---|---|---|
|
| Hive Metastore URI. Storm developers can find this value in |
|
| Database name |
|
| Table name |
|
| Two properties that start with "org.apache.storm.hive.bolt.":
|
|
| Configures the number of desired transactions per transaction batch. Data from all transactions in a single batch form a single compaction file.
Storm developers use this property in conjunction with the Hive stores data in base files that cannot be updated by HDFS. Instead, Hive creates a set of delta files for each transaction that alters a table or partition and stores them in a separate delta directory. Occasionally, Hive compacts, or merges, the base and delta files. Hive performs all compactions in the background without affecting concurrent reads and writes of other Hive clients. See Transactions for more information about Hive compactions. |
|
| Specifies the maximum number of open connections. Each connection is to a single Hive table paritition. The default value is 500. When Hive reaches this threshold, an idle connection is terminated for each new connection request. A connection is considered idle if no data is written to the table partition to which the connection is made. |
|
| Specifies the maximum number of Storm tuples written to Hive in a single Hive transaction. The default value is 15000 tuples. |
|
| Specifies the interval in seconds between consecutive heartbeats sent to Hive. Hive uses heartbeats to prevent expiration of unused transactions. Set this value to 0 to disable heartbeats. The default value is 240. |
|
| Indicates whether HiveBolt should automatically create the necessary Hive partitions needed to store streaming data. The default value is true. |
|
| Kerberos user principal for accessing a secured Hive installation. |
|
| Kerberos keytab for accessing a secured Hive installation. |
Instantiate the HiveBolt with the HiveOptions class
The next step is to instantiate the Hive streaming bolt. The core-storm and Trident APIs use different classes, as demonstrated in the following code samples:
Core-storm API
... HiveBolt hiveBolt = new HiveBolt(hiveOptions); ...
Trident API
... StateFactory factory = new HiveStateFactory().withOptions(hiveOptions); TridentState state = stream.partitionPersist(factory, hiveFields, new HiveUpdater(), new Fields()); ...