Apache Storm Component Guide

Trident Operations

The Trident Stream class provides a number of methods that modify the content of a stream. The Stream.each() method is overloaded to allow the application of two types of operations: filters and functions.

For a complete list of methods in the Stream class, see the Trident JavaDoc.

Filters

Trident filters provide a way to exclude tuples from a Stream based on specific criteria. Implementing a Trident filter involves extending BaseFilter and implementing the isKeep() method of the Filter interface:

boolean isKeep(TridentTuple tuple);

The isKeep() method takes a TridentTuple as input and returns a boolean. If isKeep() returns false, the tuple is dropped from the stream; otherwise the tuple is kept.

For example, to exclude words with fewer than three characters from the word count, you could apply the following filter implementation to the stream:

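// Package names as of Storm 1.0; earlier releases use storm.trident.*
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;

public class ShortWordFilter extends BaseFilter {

    // Keep only words of three or more characters.
    @Override
    public boolean isKeep(TridentTuple tuple) {
        String word = tuple.getString(0);
        return word.length() >= 3;
    }
}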
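
To make the keep/drop rule concrete, the following is a minimal plain-Java sketch that does not use the Storm API. The names FilterSketch, isKeep, and applyFilter are hypothetical stand-ins introduced for illustration; the predicate follows the stated criterion of dropping words with fewer than three characters.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a Trident filter; not part of the Storm API.
class FilterSketch {

    // Mirrors the isKeep() contract: true keeps the word, false drops it.
    static boolean isKeep(String word) {
        return word.length() >= 3;
    }

    // Applies the predicate to each word, as a stream would to each tuple.
    static List<String> applyFilter(List<String> words) {
        List<String> kept = new ArrayList<>();
        for (String word : words) {
            if (isKeep(word)) {
                kept.add(word);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "of", "the", "cow", "jumped");
        System.out.println(applyFilter(words));
    }
}
```

In this sketch "a" and "of" are dropped, while "the", "cow", and "jumped" pass through unchanged.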

Functions

Trident functions are similar to Storm bolts, in that they consume individual tuples and optionally emit new tuples. An important difference is that Trident functions are additive: the fields a function emits are appended to the input tuple, and the existing fields are retained. The Split function in the word count example illustrates this, emitting one tuple for each word in a sentence:

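// Package names as of Storm 1.0; earlier releases use storm.trident.* and backtype.storm.*
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public class Split extends BaseFunction {

    // Emit one single-field tuple per space-separated word.
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }
}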

Note that the Split function always reads the first (index 0) field of the tuple. This is safe because of the way the function is applied using the Stream.each() method:

stream.each(new Fields("sentence"), new Split(), new Fields("word"))

The first argument to the each() method can be thought of as a field selector. Specifying “sentence” tells Trident to select only that field for processing, thus guaranteeing that the “sentence” field will be at index 0 in the tuple.

Similarly, the third argument names the fields emitted by the function. Because an operation reads its input by position and the caller supplies the field names, both filters and functions can be implemented generically, without depending on specific field naming conventions.
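
The combined effect of field selection and additive output can be modeled in plain Java. The sketch below is an illustration under stated assumptions, not the Storm API: EachSketch, split, and each are hypothetical names, a tuple is modeled as an ordered map from field name to value, and the "id" field is invented to show that unselected fields survive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of each() semantics; not the Storm API.
class EachSketch {

    // Stand-in for Split.execute(): reads index 0 of the selected fields only.
    static List<String> split(List<Object> selected) {
        String sentence = (String) selected.get(0);
        return Arrays.asList(sentence.split(" "));
    }

    // Models each(new Fields(inputField), new Split(), new Fields(outputField)).
    static List<Map<String, Object>> each(Map<String, Object> tuple,
                                          String inputField,
                                          String outputField) {
        // Field selection: the function sees only the named field, at index 0.
        List<Object> selected = new ArrayList<>();
        selected.add(tuple.get(inputField));

        List<Map<String, Object>> output = new ArrayList<>();
        for (String word : split(selected)) {
            // Additive emit: original fields are retained, the new field is appended.
            Map<String, Object> out = new LinkedHashMap<>(tuple);
            out.put(outputField, word);
            output.add(out);
        }
        return output;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("id", 7); // extra field, invented for illustration
        tuple.put("sentence", "the cow jumped");
        for (Map<String, Object> out : each(tuple, "sentence", "word")) {
            System.out.println(out);
        }
    }
}
```

Although "sentence" is the second field of the input tuple, the function still finds it at index 0, and every emitted tuple carries the id, sentence, and word fields in that order.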