Writing Data to HDFS with the Storm-HDFS Connector
The storm-hdfs connector supports the following key features:
HDFS 2.x
High availability-enabled clusters
Text and sequence files
Configurable directory and file names
Customizable synchronization, rotation policies, and rotation actions
Tuple failure if HDFS update fails
The Trident API
Writing to a Kerberized Hadoop cluster (for more information, see Configuring Connectors for a Secure Cluster)
The primary classes of the storm-hdfs connector are HdfsBolt and SequenceFileBolt, both located in the org.apache.storm.hdfs.bolt package. Use the HdfsBolt class to write text data to HDFS and the SequenceFileBolt class to write binary data.
Storm developers specify the following information when instantiating the bolt:
Table 1.14. HdfsBolt Methods

HdfsBolt Method | Description |
---|---|
withFsUrl | Specifies the target HDFS URL and port number. |
withRecordFormat | Specifies the delimiter that indicates a boundary between data records. Storm developers can customize by writing their own implementation of the RecordFormat interface. |
withRotationPolicy | Specifies when to stop writing to a data file and begin writing to another. Storm developers can customize by writing their own implementation of the FileRotationPolicy interface. |
withSyncPolicy | Specifies how frequently to flush buffered data to the HDFS filesystem. This action enables other HDFS clients to read the synchronized data, even as the Storm client continues to write data. Storm developers can customize by writing their own implementation of the SyncPolicy interface. |
withFileNameFormat | Specifies the name of the data file. Storm developers can customize by writing their own implementation of the FileNameFormat interface. |
Example: Cluster Without High Availability ("HA")
The following example writes pipe-delimited files to the HDFS path hdfs://localhost:8020/foo. After every 1,000 tuples it synchronizes with the filesystem, making the data visible to other HDFS clients. It rotates the files when they reach 5 MB in size.
Note that the HdfsBolt is instantiated with an HDFS URL and port number.
```java
// use "|" instead of "," for field delimiter
RecordFormat format = new DelimitedRecordFormat()
        .withFieldDelimiter("|");

// Synchronize the filesystem after every 1000 tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// Rotate data files when they reach 5 MB
FileRotationPolicy rotationPolicy =
        new FileSizeRotationPolicy(5.0f, Units.MB);

// Use default, Storm-generated file names
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/foo/");

// Instantiate the HdfsBolt
HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:8020")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
```
Example: HA-Enabled Cluster
The following example shows how to modify the previous example for an HA-enabled cluster.
Here the HdfsBolt is instantiated with a nameservice ID, instead of using an HDFS URL and port number.
```java
...
HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://myNameserviceID")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
...
```
To obtain the nameservice ID, check the dfs.nameservices property in your hdfs-site.xml file; it is nnha in the following example:

```xml
<property>
    <name>dfs.nameservices</name>
    <value>nnha</value>
</property>
```
Trident API
The storm-hdfs connector supports the Trident API. Hortonworks recommends that Storm developers use the Trident API unless the application requires sub-second latency.
The Trident API implements a StateFactory class with an API that resembles the methods from the storm-core API, as shown in the following code sample:

```java
...
Fields hdfsFields = new Fields("field1", "field2");

FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPrefix("trident")
        .withExtension(".txt")
        .withPath("/trident");

RecordFormat recordFormat = new DelimitedRecordFormat()
        .withFields(hdfsFields);

FileRotationPolicy rotationPolicy =
        new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);

HdfsState.Options options = new HdfsState.HdfsFileOptions()
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(recordFormat)
        .withRotationPolicy(rotationPolicy)
        .withFsUrl("hdfs://localhost:8020");

StateFactory factory = new HdfsStateFactory().withOptions(options);

TridentState state = stream.partitionPersist(factory, hdfsFields,
        new HdfsUpdater(), new Fields());
```
See the javadoc for the Trident API, included with the storm-hdfs connector, for more information.
Limitations
Directory and file name changes are limited to a prepackaged file name format based on a timestamp.
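Within that limitation, the prefix, extension, and path can still be tuned through DefaultFileNameFormat's withPrefix, withExtension, and withPath methods, as the Trident example above shows. As a rough illustration of the timestamp-based pattern, the sketch below mimics the naming scheme with plain JDK code; the helper name and the exact field order are assumptions for illustration, not the storm-hdfs source:

```java
// Hypothetical sketch of a timestamp-based file name scheme resembling
// DefaultFileNameFormat: prefix, component ID, task ID, rotation counter,
// and a millisecond timestamp, followed by the extension.
public class TimestampedFileName {

    // e.g. buildName("trident-", "hdfsBolt", 3, 7, 1623456789000L, ".txt")
    //      -> "trident-hdfsBolt-3-7-1623456789000.txt"
    static String buildName(String prefix, String componentId, int taskId,
                            int rotationNum, long timestampMillis,
                            String extension) {
        return prefix + componentId + "-" + taskId + "-" + rotationNum
                + "-" + timestampMillis + extension;
    }

    public static void main(String[] args) {
        // Each rotation increments the counter and refreshes the timestamp,
        // so successive files never collide.
        System.out.println(buildName("trident-", "hdfsBolt", 3, 0,
                System.currentTimeMillis(), ".txt"));
    }
}
```

Because the timestamp is part of the name, every rotation produces a distinct file; this is why the prepackaged format cannot be replaced outright without implementing a custom FileNameFormat.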