7.1. Writing Data to HDFS with the Storm-HDFS Connector

The storm-hdfs connector supports the following key features:

  • HDFS 2.x

  • High availability-enabled clusters

  • Text and sequence files

  • Configurable directory and file names

  • Customizable synchronization, rotation policies, and rotation actions

  • Tuple failure if HDFS update fails

  • The Trident API

  • Writing to a Kerberized Hadoop cluster (for more information, see Configuring Connectors for a Secure Cluster)

The primary classes of the storm-hdfs connector are HdfsBolt and SequenceFileBolt, both located in the org.apache.storm.hdfs.bolt package. Use the HdfsBolt class to write text data to HDFS and the SequenceFileBolt class to write binary data.

For more information about the HdfsBolt class, refer to the Apache Storm HdfsBolt API documentation.

Storm developers specify the following information when instantiating the bolt:

 

Table 1.14. HdfsBolt Methods

| HdfsBolt Method | Description |
| --- | --- |
| withFsUrl | Specifies the target HDFS URL and port number. |
| withRecordFormat | Specifies the delimiter that indicates a boundary between data records. Storm developers can customize this behavior by writing their own implementation of the org.apache.storm.hdfs.bolt.format.RecordFormat interface. The provided org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat class is a convenience class for writing delimited text data with delimiters such as tabs, commas, and pipes. The storm-hdfs bolt uses the RecordFormat implementation to convert tuples to byte arrays, so this method can be used with both text and binary data. |
| withRotationPolicy | Specifies when to stop writing to a data file and begin writing to another. Storm developers can customize this behavior by writing their own implementation of the org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy interface. The provided FileSizeRotationPolicy class rotates files when they reach a given size. |
| withSyncPolicy | Specifies how frequently to flush buffered data to the HDFS filesystem. This action enables other HDFS clients to read the synchronized data, even as the Storm client continues to write data. Storm developers can customize this behavior by writing their own implementation of the org.apache.storm.hdfs.bolt.sync.SyncPolicy interface. |
| withFileNameFormat | Specifies the name of the data file. Storm developers can customize this behavior by writing their own implementation of the org.apache.storm.hdfs.bolt.format.FileNameFormat interface. The provided org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat class creates file names with the following naming format: {prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}. Example: MyBolt-5-7-1390579837830.txt. |
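
To make the role of a record format more concrete, the following is a minimal, self-contained sketch of the tuple-to-bytes conversion that a delimited format performs. It deliberately avoids the Storm classes: DelimitedRecordSketch and its format method are hypothetical stand-ins, not the connector's RecordFormat interface, and a plain list of values stands in for a tuple.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for a delimited record format; not part of storm-hdfs.
class DelimitedRecordSketch {
    private final String fieldDelimiter;
    private final String recordDelimiter;

    DelimitedRecordSketch(String fieldDelimiter, String recordDelimiter) {
        this.fieldDelimiter = fieldDelimiter;
        this.recordDelimiter = recordDelimiter;
    }

    // Joins the values with the field delimiter, appends the record
    // delimiter, and returns the bytes that would be written to the file.
    byte[] format(List<?> values) {
        String line = values.stream()
                .map(v -> String.valueOf(v))
                .collect(Collectors.joining(fieldDelimiter));
        return (line + recordDelimiter).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        DelimitedRecordSketch format = new DelimitedRecordSketch("|", "\n");
        byte[] bytes = format.format(Arrays.asList("alice", 42, true));
        System.out.print(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

A custom implementation of the real interface would follow the same idea: take one tuple, return the bytes for one record.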


Example: Cluster Without High Availability ("HA")

The following example writes pipe-delimited files to the HDFS path hdfs://localhost:8020/foo. After every 1,000 tuples it will synchronize with the filesystem, making the data visible to other HDFS clients. It will rotate the files when they reach 5 MB in size.

Note that the HdfsBolt is instantiated with an HDFS URL and port number.

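```java
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

// Use "|" instead of "," as the field delimiter
RecordFormat format = new DelimitedRecordFormat()
        .withFieldDelimiter("|");

// Synchronize the filesystem after every 1000 tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// Rotate data files when they reach 5 MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

// Use default, Storm-generated file names under the /foo/ directory
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/foo/");

// Instantiate the HdfsBolt
HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:8020")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
```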
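
The sync and rotation settings in this example can be reasoned about with a small simulation. The sketch below is only an illustration: CountSync and SizeRotation are hypothetical stand-ins that mimic the decisions described above, not the storm-hdfs CountSyncPolicy or FileSizeRotationPolicy classes. It assumes fixed-size records of 128 bytes and 1 MB = 1024 * 1024 bytes.

```java
// Hypothetical stand-ins for illustration; not part of storm-hdfs.
class PolicySketch {
    // Signals a sync once `threshold` tuples have been written since the last sync.
    static class CountSync {
        private final int threshold;
        private int count = 0;
        CountSync(int threshold) { this.threshold = threshold; }
        boolean mark() { return ++count >= threshold; }
        void reset() { count = 0; }
    }

    // Signals a rotation once the current file holds at least `maxBytes` bytes.
    static class SizeRotation {
        private final long maxBytes;
        SizeRotation(long maxBytes) { this.maxBytes = maxBytes; }
        boolean mark(long bytesInFile) { return bytesInFile >= maxBytes; }
    }

    // Simulates writing `tuples` records of `recordBytes` bytes each and
    // returns {number of syncs, number of rotations}.
    static int[] simulate(int tuples, int recordBytes, int syncEvery, long maxBytes) {
        CountSync sync = new CountSync(syncEvery);
        SizeRotation rotation = new SizeRotation(maxBytes);
        long bytesInFile = 0;
        int syncs = 0;
        int rotations = 0;
        for (int i = 0; i < tuples; i++) {
            bytesInFile += recordBytes;
            if (sync.mark()) { syncs++; sync.reset(); }
            if (rotation.mark(bytesInFile)) { rotations++; bytesInFile = 0; }
        }
        return new int[] { syncs, rotations };
    }

    public static void main(String[] args) {
        long fiveMb = 5L * 1024 * 1024;
        int[] result = simulate(100_000, 128, 1000, fiveMb);
        System.out.println("syncs=" + result[0] + ", rotations=" + result[1]);
        // prints syncs=100, rotations=2
    }
}
```

Under these assumptions, 100,000 tuples produce 100 syncs and two completed files, with a third file still open. Lowering the sync count makes data visible to readers sooner at the cost of more frequent flushes.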

Example: HA-Enabled Cluster

The following example shows how to modify the previous example for an HA-enabled cluster.

Here the HdfsBolt is instantiated with a nameservice ID, instead of using an HDFS URL and port number.

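```java
// ... format, syncPolicy, rotationPolicy, and fileNameFormat are defined as in the previous example
HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://myNameserviceID")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
// ...
```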

To obtain the nameservice ID, check the dfs.nameservices property in your hdfs-site.xml file. In the following example, the nameservice ID is nnha:

```xml
<property>
  <name>dfs.nameservices</name>
  <value>nnha</value>
</property>
```
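
If you prefer to look up the nameservice ID programmatically instead of reading the file by hand, the lookup can be sketched with the JDK's XML parser. NameserviceLookup and findProperty are hypothetical names used only for illustration, and the XML string stands in for the contents of hdfs-site.xml; a real application would more likely rely on the Hadoop configuration classes.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical helper for illustration; not part of storm-hdfs or Hadoop.
class NameserviceLookup {
    // Returns the value of the named property, or null if it is not present.
    static String findProperty(String xml, String propertyName) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            NodeList properties = doc.getElementsByTagName("property");
            for (int i = 0; i < properties.getLength(); i++) {
                Element property = (Element) properties.item(i);
                String name = property.getElementsByTagName("name")
                        .item(0).getTextContent().trim();
                if (name.equals(propertyName)) {
                    return property.getElementsByTagName("value")
                            .item(0).getTextContent().trim();
                }
            }
            return null;
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse configuration XML", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<configuration><property>"
                + "<name>dfs.nameservices</name><value>nnha</value>"
                + "</property></configuration>";
        // Build the URL that would be passed to withFsUrl
        String fsUrl = "hdfs://" + findProperty(xml, "dfs.nameservices");
        System.out.println(fsUrl); // prints hdfs://nnha
    }
}
```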

Trident API

The storm-hdfs connector supports the Trident API. Hortonworks recommends that Storm developers use the Trident API unless their application requires sub-second latency.

The Trident API implements a StateFactory class with an API that resembles the methods from the storm-core API, as shown in the following code sample:

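```java
import org.apache.storm.hdfs.trident.HdfsState;
import org.apache.storm.hdfs.trident.HdfsStateFactory;
import org.apache.storm.hdfs.trident.HdfsUpdater;
import org.apache.storm.hdfs.trident.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.trident.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.trident.format.FileNameFormat;
import org.apache.storm.hdfs.trident.format.RecordFormat;
import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
// Fields, StateFactory, and TridentState come from Storm core.

// ... `stream` is an existing Trident stream
Fields hdfsFields = new Fields("field1", "field2");

// Name files trident-...txt under the /trident directory
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPrefix("trident")
        .withExtension(".txt")
        .withPath("/trident");

// Write only the listed fields of each tuple
RecordFormat recordFormat = new DelimitedRecordFormat()
        .withFields(hdfsFields);

// Rotate data files when they reach 5 MB
FileRotationPolicy rotationPolicy =
        new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);

HdfsState.Options options = new HdfsState.HdfsFileOptions()
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(recordFormat)
        .withRotationPolicy(rotationPolicy)
        .withFsUrl("hdfs://localhost:8020");

StateFactory factory = new HdfsStateFactory().withOptions(options);

TridentState state = stream.partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());
```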

See the javadoc for the Trident API, included with the storm-hdfs connector, for more information.

Limitations

Directory and file name changes are limited to a prepackaged file name format based on a timestamp.
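
As a rough illustration of what a timestamp-based file name looks like, the sketch below assembles a name that matches the pattern of the example file name given earlier, MyBolt-5-7-1390579837830.txt. FileNameSketch is a hypothetical stand-in written for this illustration, not the connector's DefaultFileNameFormat class.

```java
// Hypothetical stand-in for illustration; not part of storm-hdfs.
class FileNameSketch {
    // Concatenates the prefix, component ID, task ID, rotation number,
    // timestamp, and extension into a single file name.
    static String fileName(String prefix, String componentId, int taskId,
                           long rotationNum, long timestampMillis, String extension) {
        return prefix + componentId + "-" + taskId + "-" + rotationNum
                + "-" + timestampMillis + extension;
    }

    public static void main(String[] args) {
        System.out.println(fileName("", "MyBolt", 5, 7, 1390579837830L, ".txt"));
        // prints MyBolt-5-7-1390579837830.txt
    }
}
```

Because the rotation number and timestamp are part of every name, each rotated file receives a distinct name without any coordination between tasks.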