Hortonworks Streaming Analytics Manager User Guide

Creating UDAFs

About This Task

User Defined Aggregate Functions (UDAFs) allow you to add custom aggregate functions to SAM. Once you create and register a UDAF, it is available for use in the Aggregate processor. Use these steps to create a new UDAF.

Steps

  1. Create a UDAF by implementing the UDAF interface, in which A is the type of the aggregate, V is the type of the value, and R is the type of the result:

    public interface UDAF<A, V, R> {
       A init();
       A add(A aggregate, V val);
       R result(A aggregate);
    }
    

    The init method returns the initial value for the aggregate.

    The add method is invoked once for each event in the window, with the current aggregate and the value from that event. It folds the value into the aggregate and returns the updated aggregate.

    The result method takes the final aggregate and returns the result.

  2. If your aggregate function requires two parameters, you must also implement the UDAF2 interface:

    public interface UDAF2<A, V1, V2, R> {
       A init();
       A add(A aggregate, V1 val1, V2 val2);
       R result(A aggregate);
    }
    

    The add method is passed the current value of the aggregate and two values instead of one.
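As an illustration of the two-parameter form, the following is a minimal sketch of a weighted average built on UDAF2. The WeightedAvg and WeightedAvgDemo names, the double[] aggregate layout, and the choice to return 0.0 for an empty window are assumptions made for this sketch and are not part of SAM. The UDAF2 interface is copied from step 2 so that the code compiles on its own, and the run method only imitates how add is called once per event.

```java
import java.util.Arrays;
import java.util.List;

// Copied from step 2 so that this sketch compiles without the SAM libraries.
interface UDAF2<A, V1, V2, R> {
    A init();
    A add(A aggregate, V1 val1, V2 val2);
    R result(A aggregate);
}

// Hypothetical example: the aggregate is double[] {weightedSum, totalWeight}.
class WeightedAvg implements UDAF2<double[], Double, Double, Double> {

    // Start with a weighted sum of 0 and a total weight of 0.
    @Override
    public double[] init() {
        return new double[] {0.0, 0.0};
    }

    // Fold one (value, weight) pair into a new aggregate and return it.
    @Override
    public double[] add(double[] agg, Double value, Double weight) {
        return new double[] {agg[0] + value * weight, agg[1] + weight};
    }

    // Assumption for this sketch: report 0.0 when the total weight is 0.
    @Override
    public Double result(double[] agg) {
        return agg[1] == 0.0 ? 0.0 : agg[0] / agg[1];
    }
}

class WeightedAvgDemo {

    // Stand-in for window evaluation: each event is {value, weight}.
    // This is not SAM's runtime, only the init/add/result call order.
    static double run(List<double[]> events) {
        WeightedAvg udaf = new WeightedAvg();
        double[] agg = udaf.init();
        for (double[] event : events) {
            agg = udaf.add(agg, event[0], event[1]);
        }
        return udaf.result(agg);
    }

    public static void main(String[] args) {
        List<double[]> window = Arrays.asList(
                new double[] {10.0, 1.0},
                new double[] {20.0, 3.0});
        System.out.println(run(window)); // prints 17.5
    }
}
```

The add method returns a fresh array rather than mutating its argument, which mirrors the "return the updated aggregate" contract described in step 1.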

Example

In the following example, you want to compute the average value of a particular field across the events in a window. To do that, you define an average aggregate function by implementing the UDAF interface:

// Assumption: Pair is org.apache.commons.lang3.tuple.Pair, which provides Pair.of, getKey, and getValue.
// UDAF is the interface shown in step 1.
import org.apache.commons.lang3.tuple.Pair;

// The aggregate is a pair that holds the running sum and the count of elements seen so far.
// The values are integers and the result is a double.
public class MyAvg implements UDAF<Pair<Integer, Integer>, Integer, Double> {

    // Initialize the aggregate and return its initial value (sum = 0 and count = 0).
    @Override
    public Pair<Integer, Integer> init() {
        return Pair.of(0, 0);
    }

    // Add the value to the sum, increment the count, and return the updated aggregate.
    @Override
    public Pair<Integer, Integer> add(Pair<Integer, Integer> agg, Integer val) {
        return Pair.of(agg.getKey() + val, agg.getValue() + 1);
    }

    // Return the sum divided by the count, which is the average of the aggregated values.
    // For an empty window (count = 0), the floating-point division yields NaN.
    @Override
    public Double result(Pair<Integer, Integer> agg) {
        return (double) agg.getKey() / agg.getValue();
    }
}
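
To make the call order concrete, here is a rough sketch of how an implementation like MyAvg could be exercised outside SAM. The WindowDriver class and its aggregate helper are hypothetical and only imitate the sequence described in the steps (init once, add once per event, result once); they are not SAM's actual runtime. AvgSketch repeats the MyAvg logic with java.util.Map.Entry standing in for Pair, an assumption made so that the sketch needs only the JDK.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Copied from step 1 so that this sketch compiles without the SAM libraries.
interface UDAF<A, V, R> {
    A init();
    A add(A aggregate, V val);
    R result(A aggregate);
}

// Same logic as MyAvg; the entry key is the running sum, the entry value is the count.
class AvgSketch implements UDAF<Map.Entry<Integer, Integer>, Integer, Double> {

    @Override
    public Map.Entry<Integer, Integer> init() {
        return new SimpleImmutableEntry<>(0, 0);
    }

    @Override
    public Map.Entry<Integer, Integer> add(Map.Entry<Integer, Integer> agg, Integer val) {
        return new SimpleImmutableEntry<>(agg.getKey() + val, agg.getValue() + 1);
    }

    @Override
    public Double result(Map.Entry<Integer, Integer> agg) {
        return (double) agg.getKey() / agg.getValue();
    }
}

class WindowDriver {

    // Hypothetical helper: applies any UDAF to the values of one window.
    static <A, V, R> R aggregate(UDAF<A, V, R> udaf, List<V> window) {
        A agg = udaf.init();            // called once per window
        for (V value : window) {
            agg = udaf.add(agg, value); // called once per event
        }
        return udaf.result(agg);        // called once, on the final aggregate
    }

    public static void main(String[] args) {
        // Sum is 15 and count is 3, so the average is 5.0.
        System.out.println(aggregate(new AvgSketch(), Arrays.asList(3, 4, 8))); // prints 5.0
    }
}
```

Because the helper is generic over A, V, and R, the same loop works for any UDAF implementation, which makes it a convenient shape for a unit test before registering the function.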