User-defined Aggregates and Operators

 

The examples in this topic show how to extend the set of window-based operations in StreamInsight LINQ operators with user-defined aggregates (UDA) and user-defined operators (UDO). These extensions are defined over event windows and return zero or more result events. A user-defined aggregate or user-defined operator must be compiled into an assembly that can be accessed by the StreamInsight server in the same way that adapters are provided and used at run time.

StreamInsight also offers user-defined stream operators as another extensibility mechanism. User-defined stream operators are defined over the event stream directly instead of over event windows.

User-defined Aggregates

A user-defined aggregate is used on top of a window specification to aggregate over the events in that window and produce a single result value. A UDA takes as its input a CEP window (that is the result of a hopping, snapshot window, or count-based window operator) that contains a set of CEP events and outputs a single return value (a CLR type that maps to one of the StreamInsight primitive types). For more information about windows, see Using Event Windows.

You can implement UDAs that are more complex in their functionality, than the simpler aggregates similar to count, sum, and average provided by StreamInsight. One such example, computing time weighted averages, is discussed in a later section.

User-defined Operators

A user-defined operator is used on top of a window specification to process the events in the window and produce a set of one or more resulting events. A UDO takes as its input a CEP window (that is the result of a hopping, snapshot window, or count window operator) that contains a set of CEP events and outputs a set of CEP events or a set of CEP payloads.

A UDO can be used when you need the computation to generate or impact whole events, including their time stamps, for each window. An example is the setting of a status field of an event in addition to calculating an aggregation, where the status depends on the aggregation result and another parameter. For example, a UDO might produce a single event for each window that contains a payload field that has the aggregation result and a status field that indicates whether the aggregation result violated some constraint.

Time-Sensitivity in UDAs and UDOs

You can define UDAs and UDOs to be time-insensitive or time-sensitive, based on your choice of the base class to implement these operators.

Time-insensitive UDAs and UDOs do not expect to be passed whole events including their time stamps. Instead, they only consider a set of one or more payload fields from the events in the defined window. Also, the current window start and end time are not passed to them.

Time-sensitive UDAs and UDOs are passed a set of events for each window including their time stamps and the window start and end times. Whether a UDA or UDO is time-sensitive is determined by the respective base class from which the UDA or UDO author derives the implementation.

Implementing User-defined Aggregates

The author of a UDA has the following responsibilities:

  • Provide the actual implementation of the UDA.

  • Provide the extension method for LINQ to enable a query writer to use the UDA.

To implement a UDA, the user derives from the suitable base class: CepAggregate for time-insensitive UDAs or CepTimeSensitiveAggregate for time-sensitive UDAs.

The class derivation requires the instantiation of input and output type parameters. The input type represents either the whole payload (if the UDA needs to be able to look at the entire set of payload fields in the course of its computation) or a CLR type that maps to a corresponding primitive type in the StreamInsight type system (in the scenario where a singleton field is the input to the UDA). The output type in both cases must be must be a CLR type that maps to a corresponding primitive type.

Apart from the event data, an optional configuration structure at query start time can be passed to the constructor of the UDA class, if this is intended by the UDA author. If such a constructor is provided by the UDA author, the engine will call it accordingly at runtime, with the configuration provided by the caller of the UDA in LINQ.

Both time-insensitive as well as time-sensitive UDAs receive the payloads as an unordered set. In the case of a time-sensitive UDA, the timestamps of the events are additionally associated with each payload. Moreover, a window descriptor that defines the window start and end times is passed to the UDA.

User-defined Aggregate Examples

The following example implements a time-insensitive UDA. It expects a set of integer event fields. The optional configuration structure is not specified for this example implementation, hence the class does not need a specific constructor.

  
public class Median : CepAggregate<int, int>  
{  
    public override int GenerateOutput(IEnumerable<int> eventData)  
    {  
        var sortedData = eventData.OrderBy(e => e);  
        int medianIndex = (int)sortedData.Count() / 2;  
        return sortedData.Count() % 2 == 0 ?  
            (sortedData.ElementAt(medianIndex) + sortedData.ElementAt(medianIndex - 1)) / 2 :  
            sortedData.ElementAt(medianIndex);  
    }  
}  
  

In addition to implementing the UDA, the you must provide an extension method for LINQ to enable the query writer to use the UDA. The extension method is a signature that enables the query author to use the aggregate and compile the query. Through an attribute, the StreamInsight LINQ provider can refer to the actual class that contains the UDA implementation, as shown in the following example.

public static class MyUDAExtensionMethods  
{  
    [CepUserDefinedAggregate(typeof(Median))]  
    public static int Med<InputT>(this CepWindow<InputT> window, Expression<Func<InputT, int>> map)  
    {  
           throw CepUtility.DoNotCall();  
    }  
}  

Here, there must be an implementation of a UDA through a class Median, which implements a UDA that operates on a single field of type int and returns a value of type int. The expression in the signature of the function represents the mapping from the event type of the input stream to a single integer value. Notice that the extension method will never be executed, therefore, the CepUtility.DoNotCall() in its body. Based on this specification, the UDA can be used in LINQ, as shown in the following example.

from w in s. TumblingWindow(TimeSpan.FromMinutes(10))  
select new { f1 = w.Med(e => e.val) }  

The lambda expression argument maps the event payload to an integer value that will be the input to the UDA. In this case, the median over the values of the event field val will be calculated for each window.

Next, consider the example of a time insensitive UDA that has configuration information. It expects a whole payload of type Trade as input and returns values of type double. This example also includes the corresponding extension method:

public class Trade  
{  
    public double Volume { get; set; }  
    public double Price { get; set; }  
}  
  
public class Vwap : CepAggregate<Trade, double>  
{  
    double weight;  
  
    /// <summary>  
    /// Constructor for parameterized UDA  
    /// </summary>  
    public Vwap(double w)  
    {  
        weight = w;  
    }  
  
    public override double GenerateOutput(IEnumerable<Trade> events)  
    {  
        double vwap = events.Sum(e => e.Price * e.Volume) / events.Sum(e => e.Volume);  
  
        return vwap * weight;  
    }  
}  
  
static public partial class UDAExtensionMethods  
{  
    [CepUserDefinedAggregate(typeof(Vwap))]  
    public static double vwap(this CepWindow<Trade> window, double w)  
    {  
        throw CepUtility.DoNotCall();  
    }  
}  

Because the entire payload will be the input, no lambda expression is specified by the extension method. The only parameter to the UDA is the value for the configuration (which is of double here):

var result = from w in s.TumblingWindow(TimeSpan.FromMinutes(10))  
             select new { f1 = w.vwap(2.5) }  

Next, consider the example of a time sensitive UDA with configuration information. The UDA is a time-weighted average with interval events interpreted as a step function (that is, with each interval valid until the next one). Similar to the previous example, it does not expect the whole payload as input, but only values of type double.

Be aware that, even though the event payloads are reduced to double values, the input set is still defined as a set of interval events, instead of a set of payloads as was the case for the time-insenstivie UDA. This is needed to include the timestamps because the UDA is specified as time-sensitive. Moreover, the window itself is given in the form of a WindowDescription object, which has a start time and an end time property. These timestamps are specified in UTC time. Also notice that UdaConfig is a class or structure that must be serializable through DataContractSerializer.

public class TimeWeightedAverage : CepTimeSensitiveAggregate<double, double>  
{  
    UdaConfig _udaConfig;  
    public TimeWeightedAverage(UdaConfig udaConfig)  
    {  
        _udaConfig = udaConfig;  
    }  
  
    public override double GenerateOutput(IEnumerable<IntervalEvent<double>> events,  
                                          WindowDescriptor windowDescriptor)  
    {  
        double avg = 0;  
        foreach (IntervalEvent<double> intervalEvent in events)  
        {  
            avg += intervalEvent.Payload * (intervalEvent.EndTime -   
                                            intervalEvent.StartTime).Ticks;  
        }  
        avg = avg / (windowDescriptor.EndTime -   
                     windowDescriptor.StartTime).Ticks;  
        return avg * udaConfig.Multiplier;  
    }  
}  

Where UDAConfig is

public class UDAConfig  
{  
    public double Multiplier { get; set; }  
}  

The extension method now also includes the following configuration structure:

[CepUserDefinedAggregate(typeof(TimeWeightedAverage))]  
public static double twa<InputT>(this CepWindow<InputT> window, Expression<Func<InputT, double>> map, UdaConfig config)  
{  
    throw CepUtility.DoNotCall();  
}  

The configuration becomes another parameter in the extension method:

var result = from w in s.TumblingWindow(TimeSpan.FromMinutes(10))  
         select new w.TimeWeightedAverage (e => e.dval,  
                            new UdaConfig(){ Multiplier = 5 });   

The examples discussed so far consider scenarios where the event is typed. That is, the payload types are already known at the time of implementation of the UDA. The following example implements a UDA that has a generic input type in which the input type is passed into the UDA only at runtime.

public class GenericInputUda<TInput> : CepAggregate<TInput, bool>  
{  
    public GenericInputUda(SampleUdaConfig config)  
    {  
        // ...  
    }  
  
    public override bool GenerateOutput(IEnumerable<TInput> payloads)  
    {  
        // ...  
    }  
}  

Implementing User-defined Operators

The author of a UDO has the following responsibilities:

  • Provide the actual implementation of the UDO.

  • Provide the extension method for LINQ to enable a query writer to use the UDO.

To implement a UDO, the user derives from the suitable base class: CepOperator for time-insensitive UDOs or CepTimeSensitiveOperator. The class derivation requires the instantiation of the input and output type parameter. The input type always represents the whole payload. The output type is either a set of payloads or a set of events, depending on the selected base class.

In addition to the event data, you can pass an optional configuration structure at query start time to the constructor of the UDO class, if this is intended by the UDO author. If a constructor is provided by the UDO author, the engine calls it accordingly at runtime with the configuration provided by the caller of the UDO in LINQ.

Both time-insensitive and time-sensitive UDOs receive the payloads as an unordered set. In the case of a time-sensitive UDO, the timestamps of the events are additionally associated with each payload. Moreover, a window descriptor that defines the window start and end times is passed to the UDO.

CTI Behavior in User-Defined Operators

UDOs will change Current Time Increments (CTI) in the following way: When a window is still "open", that is, no CTI has been received with a time stamp after the window end time, all CTIs that fall within the window are changed to the window start time. This ensures that the output of the UDO, which can potentially contain user-defined time-stamps, can change as long as the window is still open.

Example User-defined Operator Implementations

The following example implements a time-insensitive UDO that does not have configuration information.

public class SampleUDO : CepOperator<Input, Output>  
{  
    public override IEnumerable<Output> GenerateOutput(IEnumerable<Input> payloads)  
    {  
        Output output = new Output();  
        output.total = 0;  
        output.status = "good";  
  
        foreach (Input payload in payloads)  
        {  
            output.total += payload.Value;  
            if (payload.Flag == 4)  
            {  
                output.status = "bad";  
                break;  
            }  
        }  
        List<Output> outputCollection = new List<Output>();  
        outputCollection.Add(output);  
        return outputCollection;  
    }  
}  

The following example shows how to change the signature to a time-sensitive UDO that accepts configuration information.

public class GenericOutputUdo: CepTimeSensitiveOperator<InputEventType, TOutput>  
{  
    public GenericOutputUdo(SampleUdoConfig config)  
    {  
        ...  
    }  
  
    public override IEnumerable<IntervalEvent<TOutput>> GenerateOutput(  
                             IEnumerable<IntervalEvent<InputEventType>> payloads,  
                             WindowDescriptor windowDescriptor)  
    {  
        ...  
    }  
}  

Example Extension Methods for User-defined Operators

In addition to implementing the UDO, the UDO author must provide an extension method for LINQ to enable the query writer to use the UDO. The extension method is a signature that enables the query author to use the operator and compile the query. Through an attribute, the LINQ provider can refer to the actual class that contains the UDO implementation, as shown in the following example.

[CepUserDefinedOperator(typeof(SampleUDO))]  
public static OutputEventType MyUDO(this CepWindow<InputEventType> window)  
{  
    throw CepUtility.DoNotCall();  
}  
  

This UDO could now be used in the following way.

  
from w in inputStream.Snapshot
()  
from y in w.UserDefinedOperator
(()=>new Udo
())  
select y  
  

Or:

  
from w in inputStream.Snapshot
()  
from y in w.UdoMacro
(1)  
select y  
  

Note


For CEPStream streams, use:

var newstream = from w in inputStream.Snapshot()  
                select w.MyUDO();  

The following example shows the extension method and use for a UDO that has a configuration structure, referring to an implementation that is contained in a class named SampleUDOwithConfig.

[CepUserDefinedOperator(typeof(SampleUDOwithConfig))]  
public static OutputEventType MyUDO(this CepWindow<InputEventType> window, UDOConfig config)  
{  
    throw CepUtility.DoNotCall();  
}  
  
var newstream = from w in inputStream.SnapshotWindow()  
                select w.MyUDO(new UDOConfig());  

Culture-specific event field properties

Extensions such as UDOs, UDAs, and UDFs can be regarded as interfaces between the CEP domain with its type system and the .Net CLR. For some applications, it is desirable to be able to pass culture information through this interface. For UDAs and UDOs, the extension author can implement an additional interface, IDeclareEventProperties, that allows for the inspection or setting of cultural properties on event fields. To implement this interface, you must provide a function DeclareEventProperties, which returns an object of CepEventType, that can carry culture information for its fields, as shown in the following example:

public class SampleUDO : CepOperator<Input, Output>, IDeclareEventProperties  
{  
    public override IEnumerable<Output> GenerateOutput(IEnumerable<Input> payloads)  
    {  
        ...  
    }  
  
    public CepEventType DeclareEventProperties(CepEventType outputEventType)  
    {  
        // assuming string field 'loc' in type Input  
        // assuming string fields 'firstName' and 'location' in type Output  
        outputEventType.Fields["firstName"].CultureInfo = new System.Globalization.CultureInfo("zh-CN");  
        outputEventType.Fields["location"].CultureInfo = base.InputEventType.Fields["loc"].CultureInfo;  
        return outputEventType;  
    }  
}  

This example UDO takes input events of type Input and produces events of type Output. The type Output has string fields which the UDO author explicitly wants to annotate with a certain culture information. The culture named zh-CN is applied to the output field firstName, whereas the output field location is annotated with the same culture that is associated with the field loc in the input event type of the UDO. For every event that is produced by the UDO at runtime, these cultures are applied to its fields before the event is inserted into the output stream of the UDO.

The same interface also exists for user-defined aggregates. Since aggregates have a single return value only, in order to apply culture-specific information to such a field, the IDeclareEventProperties interface wraps the return value into a CepEventType with a single field, in order to provide a way to annotate that field with CEP-specific event properties.

public class MyUDA : CepAggregate<Input, string>, IDeclareEventProperties  
{  
    public override string GenerateOutput(IEnumerable<Input> events)  
    {  
        ...  
    }  
  
    public CepEventType DeclareEventProperties(CepEventType outputEventType)  
    {  
        outputEventType.FieldsByOrdinal[0].CultureInfo = new System.Globalization.CultureInfo("zh-CN");  
        return outputEventType;  
    }  
}  

Here, the string representing the result of the aggregate is wrapped into a CepEventType, so that the UDA author can set the CultureInfo property on that field. This culture information will be propagated to the actual event field that receives the aggregation result in the LINQ query, where the UDA is used.

See Also

Using StreamInsight LINQ
Using Event Windows