Developing Apache Storm Applications

Trident Windowing Implementation Details

For information about org.apache.storm.trident.Stream, see the Apache javadoc for Trident streams.

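Trident windowing uses a WindowsStoreFactory to create the WindowsStore in which received tuples and triggers are kept. A basic implementation for HBase is provided by HBaseWindowsStoreFactory and HBaseWindowsStore; it can be extended to address other use cases. The two interfaces are defined as follows: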

/**
 * Factory to create instances of {@code WindowsStore}.
 */
public interface WindowsStoreFactory extends Serializable {
    public WindowsStore create();
}

/**
 * Store for window-related entities such as windowed tuples and triggers.
 */
public interface WindowsStore extends Serializable {

    public Object get(String key);

    public Iterable<Object> get(List<String> keys);

    public Iterable<String> getAllKeys();

    public void put(String key, Object value);

    public void putAll(Collection<Entry> entries);

    public void remove(String key);

    public void removeAll(Collection<String> keys);

    public void shutdown();

    /**
     * This class wraps key and value objects which can be passed to the {@code putAll} method.
     */
    public static class Entry implements Serializable {
        public final String key;
        public final Object value;
        ...
    }
}
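
As a rough illustration of the store contract above, the following sketch keeps all entries in a ConcurrentHashMap. It is neither the HBase implementation nor code taken from Storm: SketchStore, SketchEntry, and InMemorySketchStore are hypothetical stand-ins that mirror the method list above so that the block compiles without Storm on the classpath. The SketchEntry constructor is an assumption, because the body of Entry is elided above.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for WindowsStore.Entry; the constructor is an assumption.
final class SketchEntry implements Serializable {
    final String key;
    final Object value;

    SketchEntry(String key, Object value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical stand-in that mirrors the WindowsStore methods listed above.
interface SketchStore extends Serializable {
    Object get(String key);
    Iterable<Object> get(List<String> keys);
    Iterable<String> getAllKeys();
    void put(String key, Object value);
    void putAll(Collection<SketchEntry> entries);
    void remove(String key);
    void removeAll(Collection<String> keys);
    void shutdown();
}

// Illustrative in-memory store. Nothing is persisted, so all entries are lost
// when the process stops. ConcurrentHashMap rejects null keys and null values.
final class InMemorySketchStore implements SketchStore {
    private final Map<String, Object> data = new ConcurrentHashMap<>();

    public Object get(String key) {
        return data.get(key);
    }

    public Iterable<Object> get(List<String> keys) {
        List<Object> values = new ArrayList<>();
        for (String key : keys) {
            values.add(data.get(key)); // null marks a key that is not stored
        }
        return values;
    }

    public Iterable<String> getAllKeys() {
        return new ArrayList<>(data.keySet()); // snapshot copy of the key set
    }

    public void put(String key, Object value) {
        data.put(key, value);
    }

    public void putAll(Collection<SketchEntry> entries) {
        for (SketchEntry entry : entries) {
            data.put(entry.key, entry.value);
        }
    }

    public void remove(String key) {
        data.remove(key);
    }

    public void removeAll(Collection<String> keys) {
        for (String key : keys) {
            data.remove(key);
        }
    }

    public void shutdown() {
        data.clear(); // an external store would release its connections here
    }
}

final class InMemorySketchStoreDemo {
    public static void main(String[] args) {
        SketchStore store = new InMemorySketchStore();
        store.put("window-1|tuple-1", "a");
        store.putAll(Arrays.asList(new SketchEntry("window-1|tuple-2", "b")));
        System.out.println(store.get("window-1|tuple-1"));
        store.shutdown();
    }
}
```

A store used in a real topology would need to persist entries outside the worker process, which is the role HBaseWindowsStore plays in the HBase implementation mentioned above.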

A windowing operation in a Trident stream is a TridentProcessor implementation with the following lifecycle for each batch of tuples received:

// This is invoked when a new batch of tuples is received.
void startBatch(ProcessorContext processorContext);

// This is invoked for each tuple of a batch.
void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple);

// This is invoked to complete a batch. By this point, every tuple of the batch has
// already been passed to #execute(ProcessorContext processorContext, String streamId, TridentTuple tuple).
void finishBatch(ProcessorContext processorContext);

Each tuple reaches the window operation through WindowTridentProcessor#execute(ProcessorContext processorContext, String streamId, TridentTuple tuple). These tuples are accumulated for each batch.
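
The per-batch accumulation described above can be sketched as follows. This is a simplified illustration under stated assumptions, not the WindowTridentProcessor source: SketchContext and AccumulatingProcessor are hypothetical names, a tuple is represented as a plain List<Object>, and the "window" is just an in-memory list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for ProcessorContext: a batch id plus per-batch scratch state.
final class SketchContext {
    final Object batchId;
    List<List<Object>> tuples; // created in startBatch, cleared in finishBatch

    SketchContext(Object batchId) {
        this.batchId = batchId;
    }
}

// Hypothetical processor that follows the startBatch / execute / finishBatch lifecycle.
final class AccumulatingProcessor {
    // Stand-in for the window: it only sees tuples once their batch has finished.
    final List<List<Object>> window = new ArrayList<>();

    void startBatch(SketchContext ctx) {
        ctx.tuples = new ArrayList<>(); // fresh buffer for the new batch
    }

    void execute(SketchContext ctx, String streamId, List<Object> tuple) {
        ctx.tuples.add(tuple); // accumulate only; nothing reaches the window yet
    }

    void finishBatch(SketchContext ctx) {
        window.addAll(ctx.tuples); // hand the complete batch to the window
        ctx.tuples = null;         // drop the per-batch buffer
    }
}

final class AccumulatingProcessorDemo {
    public static void main(String[] args) {
        AccumulatingProcessor processor = new AccumulatingProcessor();
        SketchContext ctx = new SketchContext(1L);
        processor.startBatch(ctx);
        processor.execute(ctx, "stream-1", Arrays.<Object>asList("a", 1));
        processor.execute(ctx, "stream-1", Arrays.<Object>asList("b", 2));
        processor.finishBatch(ctx);
        System.out.println(processor.window.size()); // prints 2
    }
}
```

Buffering in the context rather than in the processor keeps the tuples of concurrently open batches apart, which is one plausible reason for passing a context object to every lifecycle method.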

When a batch is finished, the associated tuple information is added to the window, and the tuples are saved in the configured WindowsStore. Bolts for the respective window operations fire a trigger according to the specified windowing configuration (for example, a tumbling or sliding window based on count or time). Each trigger computes the aggregated result using the given Aggregator. Results are emitted as part of the current batch, if one exists.
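
To make the trigger step concrete, here is a minimal sketch of a tumbling count window, assuming integer tuples and an aggregator expressed as a plain java.util.function.Function. TumblingCountWindowSketch, its key format, and the LinkedHashMap used in place of a WindowsStore are all hypothetical; Storm's own window manager and Aggregator interface are more involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical tumbling count window: fires once every windowLength tuples.
final class TumblingCountWindowSketch {
    private final int windowLength;
    private final Function<List<Integer>, Integer> aggregator;
    private final Map<String, Integer> store = new LinkedHashMap<>(); // stand-in for a WindowsStore
    private final List<String> windowKeys = new ArrayList<>();        // keys in the open window
    private int nextTupleId = 0;

    TumblingCountWindowSketch(int windowLength, Function<List<Integer>, Integer> aggregator) {
        this.windowLength = windowLength;
        this.aggregator = aggregator;
    }

    // Called when a batch finishes. Returns the results of every trigger this batch caused.
    List<Integer> onBatchFinished(List<Integer> batchTuples) {
        List<Integer> results = new ArrayList<>();
        for (Integer tuple : batchTuples) {
            String key = "tuple|" + nextTupleId++; // hypothetical key format
            store.put(key, tuple);                 // save the tuple in the store
            windowKeys.add(key);
            if (windowKeys.size() == windowLength) {
                results.add(fireTrigger());
            }
        }
        return results;
    }

    // Aggregates the tuples of the full window and evicts them from the store.
    private Integer fireTrigger() {
        List<Integer> values = new ArrayList<>();
        for (String key : windowKeys) {
            values.add(store.remove(key));
        }
        windowKeys.clear();
        return aggregator.apply(values);
    }

    int storedCount() {
        return store.size();
    }
}

final class TumblingCountWindowDemo {
    public static void main(String[] args) {
        TumblingCountWindowSketch window = new TumblingCountWindowSketch(3, values -> {
            int sum = 0;
            for (int v : values) {
                sum += v;
            }
            return sum;
        });
        System.out.println(window.onBatchFinished(Arrays.asList(1, 2))); // prints []
        System.out.println(window.onBatchFinished(Arrays.asList(3, 4))); // prints [6]
    }
}
```

In the demo, the first batch leaves the window one tuple short, so nothing is emitted. The second batch completes the window, and the sum of 1, 2, and 3 is emitted with that batch while the tuple 4 stays in the store for the next window.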

When a trigger fires outside a WindowTridentProcessor#finishBatch invocation, it is stored in the given WindowsStore and emitted as part of the next batch handled by that window’s processor.
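
The deferred emission described above can be sketched as follows. The sketch assumes that what gets stored for a trigger is its aggregated result and that triggers are emitted in the order they fired; neither detail is stated above. PendingTriggerSketch and its key format are hypothetical, and a TreeMap stands in for the WindowsStore.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical holder for triggers that fire while no batch is being finished.
final class PendingTriggerSketch {
    // Stand-in for a WindowsStore; the sorted keys preserve firing order.
    private final Map<String, Object> store = new TreeMap<>();
    private int nextTriggerId = 0;

    // Called, for example, from a timer thread between batches.
    synchronized void onTriggerOutsideBatch(Object aggregatedResult) {
        // Zero-padded ids keep the lexicographic key order equal to the firing order.
        String key = String.format("trigger|%010d", nextTriggerId++);
        store.put(key, aggregatedResult);
    }

    // Called when the next batch finishes: drains and returns every pending result.
    synchronized List<Object> finishBatch() {
        List<Object> emitted = new ArrayList<>();
        for (String key : new ArrayList<>(store.keySet())) {
            emitted.add(store.remove(key));
        }
        return emitted;
    }
}

final class PendingTriggerDemo {
    public static void main(String[] args) {
        PendingTriggerSketch pending = new PendingTriggerSketch();
        pending.onTriggerOutsideBatch("result-1");
        pending.onTriggerOutsideBatch("result-2");
        System.out.println(pending.finishBatch()); // prints [result-1, result-2]
        System.out.println(pending.finishBatch()); // prints []
    }
}
```

Draining the pending entries inside finishBatch ties each stored result to exactly one batch, so a result is not emitted again by a later batch.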