Trident Operations
The Trident Stream class provides a number of methods that modify the content of a stream. The Stream.each() method is overloaded to allow the application of two types of operations: filters and functions.
For a complete list of methods in the Stream class, see the Trident JavaDoc.
Filters
Trident filters provide a way to exclude tuples from a Stream based on specific criteria. Implementing a Trident filter involves extending BaseFilter and implementing the isKeep() method of the Filter interface:
boolean isKeep(TridentTuple tuple);
The isKeep() method takes a TridentTuple as input and returns a boolean. If isKeep() returns false, the tuple is dropped from the stream; otherwise the tuple is kept.
For example, to exclude words with fewer than three characters from the word count, you could apply the following filter implementation to the stream:
public class ShortWordFilter extends BaseFilter {
    @Override
    public boolean isKeep(TridentTuple tuple) {
        String word = tuple.getString(0);
        // Keep words of three or more characters; shorter words are dropped.
        return word.length() >= 3;
    }
}
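The keep-or-drop rule can be tried without a Storm cluster. The following is a minimal, dependency-free sketch of that rule, not Storm code: KeepRule, applyFilter, and keepWords are hypothetical names introduced here for illustration, tuples are modeled as plain lists, and the threshold of three characters is assumed from the description above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Dependency-free model of filter semantics. KeepRule is a hypothetical
// stand-in for Trident's Filter interface and is not part of Storm.
class FilterSketch {

    interface KeepRule {
        boolean isKeep(List<Object> tuple);
    }

    // Returns only the tuples for which the rule answers true.
    static List<List<Object>> applyFilter(List<List<Object>> stream, KeepRule rule) {
        List<List<Object>> kept = new ArrayList<>();
        for (List<Object> tuple : stream) {
            if (rule.isKeep(tuple)) {
                kept.add(tuple);
            }
        }
        return kept;
    }

    // Wraps each word in a one-field tuple, filters by length, and unwraps.
    static List<String> keepWords(List<String> words, int minLength) {
        List<List<Object>> stream = new ArrayList<>();
        for (String word : words) {
            stream.add(Arrays.<Object>asList(word));
        }
        KeepRule rule = tuple -> ((String) tuple.get(0)).length() >= minLength;
        List<String> result = new ArrayList<>();
        for (List<Object> tuple : applyFilter(stream, rule)) {
            result.add((String) tuple.get(0));
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints [the, storm]
        System.out.println(keepWords(Arrays.asList("a", "to", "the", "storm"), 3));
    }
}
```

In a real topology the same decision is made inside isKeep(), and Trident removes the rejected tuples from the stream.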
Functions
Trident functions are similar to Storm bolts, in that they consume individual tuples and optionally emit new tuples. An important difference is that tuples emitted by Trident functions are additive. Fields emitted by Trident functions are added to the tuple and existing fields are retained. The Split function in the word count example illustrates a function that emits additional tuples:
public class Split extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String sentence = tuple.getString(0);
        // Emit one new value per word in the sentence.
        for (String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }
}
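To make the additive behavior concrete, here is a minimal, dependency-free sketch that models it with plain lists. It is not Storm code: AdditiveSketch, split, and applySplit are hypothetical names introduced for illustration. Each emitted value is appended to a copy of the input tuple, so the original "sentence" value survives alongside every new "word" value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Dependency-free model of additive function output; not Storm code.
class AdditiveSketch {

    // Mimics the Split logic: one emitted value list per word in the sentence.
    static List<List<Object>> split(String sentence) {
        List<List<Object>> emitted = new ArrayList<>();
        for (String word : sentence.split(" ")) {
            emitted.add(Arrays.<Object>asList(word));
        }
        return emitted;
    }

    // Appends each emitted value list to a copy of the input tuple,
    // so existing fields are retained and new fields are added.
    static List<List<Object>> applySplit(List<Object> input) {
        List<List<Object>> output = new ArrayList<>();
        for (List<Object> values : split((String) input.get(0))) {
            List<Object> combined = new ArrayList<>(input);
            combined.addAll(values);
            output.add(combined);
        }
        return output;
    }

    public static void main(String[] args) {
        List<Object> input = Arrays.<Object>asList("the cow jumped");
        // Prints [[the cow jumped, the], [the cow jumped, cow], [the cow jumped, jumped]]
        System.out.println(applySplit(input));
    }
}
```

One input tuple with a single field yields three output tuples with two fields each, which is the shape a downstream operation would see after Split is applied.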
Note that the Split function always processes the first (index 0) field in the tuple. This is guaranteed by the way the function is applied using the Stream.each() method:
stream.each(new Fields("sentence"), new Split(), new Fields("word"))
The first argument to the each() method can be thought of as a field selector. Specifying “sentence” tells Trident to select only that field for processing, thus guaranteeing that the “sentence” field will be at index 0 in the tuple.
Similarly, the third argument names the fields emitted by the function. This behavior allows both filters and functions to be implemented in a more generic way, without depending on specific field naming conventions.
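The effect of the field selector can be modeled in a few lines. The sketch below is dependency-free and is not Storm code: FieldSelectorSketch and select are hypothetical names, and the extra "id" field is an assumption added only to show that the selected field lands at index 0 regardless of its position in the full tuple.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Dependency-free model of field selection; not Storm code.
class FieldSelectorSketch {

    // Builds the positional view an operation receives: only the selected
    // fields, in the order the selector lists them.
    static List<Object> select(Map<String, Object> tuple, List<String> inputFields) {
        List<Object> view = new ArrayList<>();
        for (String field : inputFields) {
            if (!tuple.containsKey(field)) {
                throw new IllegalArgumentException("Unknown field: " + field);
            }
            view.add(tuple.get(field));
        }
        return view;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("id", 42);
        tuple.put("sentence", "the cow jumped");
        // "sentence" is the second field of the full tuple but index 0 of the view.
        // Prints: the cow jumped
        System.out.println(select(tuple, Arrays.asList("sentence")).get(0));
    }
}
```

Because the operation only ever sees the selected view, the same Split or ShortWordFilter logic can be reused on streams whose fields have different names or positions.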