Core-Storm APIs
The following example constructs a Kafka bolt using core Storm APIs:
DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper() .withColumnFields(new Fields(colNames)); HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper); HiveBolt hiveBolt = new HiveBolt(hiveOptions);
- Instantiate an Implementation of HiveMapper Interface.
The
storm-hive
streaming bolt uses theHiveMapper
interface to map the names of tuple fields to the names of Hive table columns. Storm provides two implementations:DelimitedRecordHiveMapper
andJsonRecordHiveMapper
. Both implementations take the same arguments.Table 1. HiveMapper Arguments Argument
Data Type
Description
withColumnFields
org.apache.storm.tuple.Fields
The name of the tuple fields that you want to map to table column names.
withPartitionFields
org.apache.storm.tuple.Fields
The name of the tuple fields that you want to map to table partitions.
withTimeAsPartitionField
String
Requests that table partitions be created with names set to system time. Developers can specify any Java-supported date format, such as "YYYY/MM/DD".
The following sample code illustrates how to use
DelimitedRecordHiveMapper
:... DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper() .withColumnFields(new Fields(colNames)) .withPartitionFields(new Fields(partNames)); DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper() .withColumnFields(new Fields(colNames)) .withTimeAsPartitionField("YYYY/MM/DD"); ...
- Instantiate a
HiveOptions
Class with theHiveMapper
Implementation. TheHiveOptions
class configures transactions used by Hive to ingest the streaming data:... HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper) .withTxnsPerBatch(10) .withBatchSize(1000) .withIdleTimeout(10); ...
The following table describes all configuration properties for the
HiveOptions
class.Table 2. HiveOptions Class Configuration Properties HiveOptions Configuration Property
Data Type
Description
metaStoreURI
String
Hive Metastore URI. Storm developers can find this value in
hive-site.xml
.dbName
String
Database name
tblName
String
Table name
mapper
Mapper
Two properties that start with
"org.apache.storm.hive.bolt.":
mapper.DelimitedRecordHiveMapper
mapperJsonRecordHiveMapper
withTxnsPerBatch
Integer
Configures the number of desired transactions per transaction batch. Data from all transactions in a single batch form a single compaction file. Storm developers use this property in conjunction with the
withBatchSize
property to control the size of compaction files. The default value is 100.Hive stores data in base files that cannot be updated by HDFS. Instead, Hive creates a set of delta files for each transaction that alters a table or partition and stores them in a separate delta directory. Occasionally, Hive compacts, or merges, the base and delta files. Hive performs all compactions in the background without affecting concurrent reads and writes of other Hive clients.
withMaxOpenConnections
Integer
Specifies the maximum number of open connections. Each connection is to a single Hive table partition. The default value is 500. When Hive reaches this threshold, an idle connection is terminated for each new connection request. A connection is considered idle if no data is written to the table partition to which the connection is made.
withBatchSize
Integer
Specifies the maximum number of Storm tuples written to Hive in a single Hive transaction. The default value is 15000 tuples.
withCallTimeout
Integer
Specifies the interval in seconds between consecutive heartbeats sent to Hive. Hive uses heartbeats to prevent expiration of unused transactions. Set this value to 0 to disable heartbeats. The default value is 240.
withAutoCreatePartitions
Boolean
Indicates whether HiveBolt should automatically create the necessary Hive partitions needed to store streaming data. The default value is true.
withKerberosPrinicipal
String
Kerberos user principal for accessing a secured Hive installation.
withKerberosKeytab
String
Kerberos keytab for accessing a secured Hive installation.
- Instantiate the HiveBolt with the HiveOptions
class:
... HiveBolt hiveBolt = new HiveBolt(hiveOptions); ...
- Before building your topology code, add the following dependency to your topology
pom.xml
file:
<dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.3.3</version> </dependency>