Creating and configuring the HBaseSinkFunction
You must configure the HBaseSinkFunction with the table name to use HBase as a sink. The HBase table must be created before the streaming job is submitted. You should also configure the operation buffering parameters to control how the data coming from Flink is buffered before it is flushed to HBase.
The HBase sink instance is always created as a subclass of the HBaseSinkFunction. When users create the subclass, they have to provide required and optional parameters through the constructor of the superclass, the HBaseSinkFunction itself.
Required parameters:
- Table name (the table itself must be created before the streaming job starts)
Optional parameters:
- Hadoop Configuration object for setting up the HBase client
- HBaseOptions for minimal connection configuration
The optional parameters are configured automatically by the Cloudera platform and should only be used for setting up custom HBase connections.
To configure the operation buffering parameters, you need to use the HBaseSinkFunction.setWriteOptions() method. You can set the following configuration parameters using the HBaseWriteOptions object:
- setBufferFlushMaxSizeInBytes: Maximum byte size of the buffered operations before flushing
- setBufferFlushMaxRows: Maximum number of operations buffered before flushing
- setBufferFlushIntervalMillis: Maximum time interval before flushing
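To make the interaction of the three buffering parameters easier to picture, the following is a minimal plain-Java sketch. It is not the connector's implementation: the BufferFlushSketch class and its methods are hypothetical, and the sketch assumes that a flush happens as soon as any one of the three limits is reached. It also simplifies the time limit by checking it only when an operation is added, whereas a real sink would typically flush on a timer.

```java
// Hypothetical model of the three buffering limits; not part of the HBase connector API.
public class BufferFlushSketch {
    private final long maxSizeInBytes;  // plays the role of setBufferFlushMaxSizeInBytes
    private final long maxRows;         // plays the role of setBufferFlushMaxRows
    private final long intervalMillis;  // plays the role of setBufferFlushIntervalMillis

    private long bufferedBytes = 0;
    private long bufferedRows = 0;
    private long lastFlushMillis;

    public BufferFlushSketch(long maxSizeInBytes, long maxRows, long intervalMillis, long nowMillis) {
        this.maxSizeInBytes = maxSizeInBytes;
        this.maxRows = maxRows;
        this.intervalMillis = intervalMillis;
        this.lastFlushMillis = nowMillis;
    }

    /** Buffers one operation and returns true if this call triggered a flush. */
    public boolean add(long operationBytes, long nowMillis) {
        bufferedBytes += operationBytes;
        bufferedRows++;
        // Assumption: reaching any single limit is enough to flush.
        boolean shouldFlush = bufferedBytes >= maxSizeInBytes
                || bufferedRows >= maxRows
                || nowMillis - lastFlushMillis >= intervalMillis;
        if (shouldFlush) {
            flush(nowMillis);
        }
        return shouldFlush;
    }

    /** Empties the buffer and records the flush time. */
    private void flush(long nowMillis) {
        bufferedBytes = 0;
        bufferedRows = 0;
        lastFlushMillis = nowMillis;
    }

    public long bufferedRows() {
        return bufferedRows;
    }

    public static void main(String[] args) {
        // Limits: 1024 bytes, 3 rows, 1000 ms; the clock starts at 0 ms.
        BufferFlushSketch buffer = new BufferFlushSketch(1024, 3, 1000, 0);
        System.out.println(buffer.add(100, 10));   // false: no limit reached yet
        System.out.println(buffer.add(100, 20));   // false: no limit reached yet
        System.out.println(buffer.add(100, 30));   // true: third row reaches the row limit
        System.out.println(buffer.add(2048, 40));  // true: one large operation exceeds the byte limit
        System.out.println(buffer.add(100, 1500)); // true: more than 1000 ms since the last flush
    }
}
```

In this model, lowering any one limit makes flushes more frequent, which tends to reduce end-to-end latency at the cost of smaller write batches.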
See the following example for setting up an HBase sink running on the Cloudera platform:
// Define a new HBase sink for writing to the ITEM_QUERIES table
HBaseSinkFunction<QueryResult> hbaseSink = new HBaseSinkFunction<QueryResult>("ITEM_QUERIES") {
    @Override
    public void executeMutations(QueryResult qresult, Context context, BufferedMutator mutator) throws Exception {
        // For each incoming query result we create a Put operation
        Put put = new Put(Bytes.toBytes(qresult.queryId));
        put.addColumn(Bytes.toBytes("itemId"), Bytes.toBytes("str"), Bytes.toBytes(qresult.itemInfo.itemId));
        put.addColumn(Bytes.toBytes("quantity"), Bytes.toBytes("int"), Bytes.toBytes(qresult.itemInfo.quantity));
        mutator.mutate(put);
    }
};
// Configure our sink to not buffer operations for more than a second (to reduce end-to-end latency)
hbaseSink.setWriteOptions(HBaseWriteOptions.builder()
    .setBufferFlushIntervalMillis(1000)
    .build()
);
// Add the sink to our query result stream
queryResultStream.addSink(hbaseSink);