User-Defined Stream Operators

A user-defined stream operator lets you define custom processing of event streams.

Usage Pattern

Within a query, you call a user-defined stream operator by using the Scan extension method of the CepStream. You provide the input stream and the initial state for the operator, as in the following example.

var query = input.Scan(new SmoothingOperator(0.7));

The author of the operator derives a new class from the abstract CepPointStreamOperator or CepEdgeStreamOperator types. The new type encapsulates the state machine of the operator. A call to the constructor of this type is passed to the scan operator to establish the initial state for the operator.

Characteristics of a User-Defined Stream Operator

A user-defined stream operator lets the user interact with the event stream in a procedural way. As such, it represents a boundary between the StreamInsight event processing engine and the CLR, similar to adapters and user-defined operators or aggregates. In all these cases, both the engine and the developer fulfill a contract about the temporal properties of a stream. So that the operator can behave deterministically for a specific sequence of query input events, the StreamInsight engine makes the following guarantees to the user-defined stream operator:

  • A user-defined stream operator is guaranteed to receive events ordered by their sync time (start time for point and start edge events, end time for end edge events). Interval events are not supported because they do not have a straightforward sync-time ordered representation, as each event indicates two points in time – a start and an end.

  • Only insert events are passed to a user-defined stream operator. Current Time Increment events (CTIs) in the incoming stream are transparent to the user-defined stream operator, but they still determine how the user-defined stream operator perceives the passage of time (see NextCti below).

  • StreamInsight can deactivate a user-defined stream operator when the operator indicates that its state is empty (see IsEmpty below). A deactivated user-defined stream operator can be recycled by StreamInsight.

  • Each insert event causes ProcessEvent to be called, followed by the polling of the NextCti and IsEmpty properties.

Input and Output of a User-Defined Stream Operator

A user-defined stream operator processes one input event at a time. In response to each input event, it may produce zero or more output events. The operator may also update its internal state in response to an input. An input event can be either an insert or a CTI (generated at the request of the operator to indicate the passage of time). Inputs are temporally annotated.

In contrast, an output event is simply an event payload. There is no opportunity to timestamp output events or inject CTIs into the output stream. Output events are generated as point events, with timestamps that are based on the timestamps of the corresponding input events.

Handling Time in a User-Defined Stream Operator

When you create a new user-defined stream operator, your code only has to process the payload of events. Time is handled exclusively by StreamInsight. Input events are received in order. The timestamp of each output event is based on the timestamp of the corresponding input event. For example, if an end edge event triggers an output event, then that output event receives the timestamp of the end edge event. Therefore, the operator can be influenced by time, but cannot control it.

The user-defined stream operator does not receive CTIs directly from the input stream in its ProcessEvent() method, but it can react to the passage of time through the NextCti property. This property is polled by the engine after every call to ProcessEvent(). The user-defined stream operator can return a timestamp from NextCti to request that a CTI with that timestamp be delivered to it through a call to ProcessEvent().

Only those CTIs that have been requested through the NextCti property are passed to ProcessEvent(). These CTIs are not propagated outside of the user-defined stream operator.

Implementing a User-Defined Stream Operator

To create a new user-defined stream operator, derive a new class from the abstract CepPointStreamOperator or CepEdgeStreamOperator base classes.

  • If you derive from the abstract CepPointStreamOperator base class, then the operator sees input events as point events. However, it is not an error if the events are not in fact point events. The operator sees only their start times.

  • If you derive from the abstract CepEdgeStreamOperator base class, then the operator sees both start and end edges for input events.

In your derived class, you override the following properties and methods:

  • ProcessEvent method. Generates output and updates the internal state of the operator in response to each input event. ProcessEvent receives one input event and can return zero or more output payloads.

  • IsEmpty property. Indicates whether the internal state of the operator is empty. When true, the StreamInsight query engine may discard the operator instance to reduce memory utilization.

  • Optionally, the NextCti property. Indicates the next point in time at which a CTI event will be sent to the operator. Overriding this property lets the user-defined operator produce output at a specific point in the future, or indicate that its internal state is empty after some application time interval has elapsed.

The derived class must also implement WCF serialization. For more information, see How to: Create a Basic Data Contract for a Class or Structure.

How the StreamInsight Engine Interacts with the Operator

For each instance of the operator, the ProcessEvent method is called with events in sync time order. For a point event or CTI, the sync time is the valid start time. For an edge event, the sync time is the valid start time for start edges or the valid end time for end edges.

After every call to the ProcessEvent method, the IsEmpty and NextCti properties are polled.

When the operator overrides NextCti, the engine guarantees that the next event processed by the operator will either be an insert event with a sync time less than the value of NextCti, or a CTI with the value of NextCti as its start time. If the operator returns a NextCti value that is less than or equal to the sync time of the last processed event, it is ignored. The NextCti property allows the operator to “translate” the progress of time in the input stream into its own rhythm (in the form of these internal CTIs) and then react to this progress accordingly.

Operators are activated in response to insert events only. CTIs do not trigger activation. An operator is deactivated when it returns true from IsEmpty.

At any point, the engine may choose to serialize the operator and release its reference to it. When the operator is later deserialized, it is expected to pick up where it left off.
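The following is a minimal sketch of what "pick up where it left off" implies for operator state. It assumes that the engine uses data contract serialization, as the [DataContract] attributes in the examples below suggest. CounterState and RoundTripDemo are hypothetical stand-ins and are not StreamInsight types. The sketch round-trips an object through DataContractSerializer and shows that only fields marked with [DataMember] survive.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization;

// Hypothetical stand-in for the state of an operator (not a StreamInsight type).
[DataContract]
public sealed class CounterState
{
    // Marked as a data member, so the value is written and restored.
    [DataMember]
    int _count;

    // Not marked as a data member, so the value is lost in a round trip.
    public int Scratch;

    public int Count
    {
        get { return _count; }
    }

    public void Increment()
    {
        _count++;
    }
}

public static class RoundTripDemo
{
    // Serializes the state to an in-memory buffer and reads it back,
    // mimicking an engine that releases and later restores an operator.
    public static CounterState RoundTrip(CounterState state)
    {
        var serializer = new DataContractSerializer(typeof(CounterState));
        using (var buffer = new MemoryStream())
        {
            serializer.WriteObject(buffer, state);
            buffer.Position = 0;
            return (CounterState)serializer.ReadObject(buffer);
        }
    }
}
```

If a CounterState is incremented twice, has Scratch set to 5, and is then passed to RoundTripDemo.RoundTrip, the restored copy reports a Count of 2 and a Scratch of 0. Any field that an operator needs in order to continue processing should therefore be a data member.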

Examples of User-Defined Stream Operators

Exponential Smoothing

This user-defined stream operator treats a stream of point events as a sequence of values and applies exponential smoothing. Note that a reference to System.Runtime.Serialization is required.

namespace UdsoExamples
{
    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Extensibility;
 
/// <summary>
/// Implements exponential smoothing.
/// </summary>
[DataContract]
public sealed class SmoothingOperator : CepPointStreamOperator<double, double>
{
    [DataMember]
    readonly double _smoothingFactor;

    [DataMember]
    double? _previousValue;

    public SmoothingOperator(double smoothingFactor)
    {
        _smoothingFactor = smoothingFactor;
    }

    public override IEnumerable<double> ProcessEvent(PointEvent<double> inputEvent)
    {
        // The result is a function of the previous result and the current input value.
        _previousValue = _previousValue.HasValue
            ? (1.0 - _smoothingFactor) * _previousValue.Value + _smoothingFactor * inputEvent.Payload
            : inputEvent.Payload;

        yield return _previousValue.Value;
    }

    public override bool IsEmpty
    {
        // The operator always holds the previous smoothed value, so it never reports empty state.
        get { return false; }
    }
}
}
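To make the recurrence in ProcessEvent concrete, the following sketch applies the same formula to a plain sequence of values, without any StreamInsight types. SmoothingWalkthrough and its Smooth method are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper that mirrors the arithmetic of SmoothingOperator
// on an in-memory sequence instead of an event stream.
public static class SmoothingWalkthrough
{
    public static List<double> Smooth(IEnumerable<double> values, double smoothingFactor)
    {
        var results = new List<double>();
        double? previous = null;

        foreach (double value in values)
        {
            // The first value passes through unchanged; each later result blends
            // the previous result with the current input.
            previous = previous.HasValue
                ? (1.0 - smoothingFactor) * previous.Value + smoothingFactor * value
                : value;

            results.Add(previous.Value);
        }

        return results;
    }
}
```

With a smoothing factor of 0.7 and the inputs 10, 20, and 30, the results are approximately 10, 17 (0.3 * 10 + 0.7 * 20), and 26.1 (0.3 * 17 + 0.7 * 30). A factor closer to 1.0 makes the output follow the input more closely, and a factor closer to 0.0 gives more weight to earlier values.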

Pattern Matching

This simple pattern matching example illustrates an alternate use of IsEmpty and NextCti. In this example, the operator looks for an event with value 1 that is not followed by an event with value 2 within thirty seconds. (This example is provided to illustrate useful concepts in user-defined stream operators. In an actual application, this pattern is simple enough to be implemented by using built-in operators in StreamInsight.)

The previous example never reports empty state and does not override NextCti. This example uses NextCti to control the lifetime of the operator, and in addition it uses NextCti to produce output in response to the passage of time.

namespace UdsoExamples
{
    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Extensibility;
 
/// <summary>
/// Indicates when an event with value '1' is not followed by an event with value '2'
/// within thirty seconds.
/// </summary>
[DataContract]
public sealed class SimplePatternMatcher : CepPointStreamOperator<int, DateTime>
{
    [DataMember]
    DateTimeOffset? _nextCti;

    // Tracks timestamps for all events with value '1'.
    [DataMember]
    readonly Queue<DateTimeOffset> _active = new Queue<DateTimeOffset>();

    public override bool IsEmpty
    {
        // The operator is empty when we are not tracking any events with value '1'.
        get { return _active.Count == 0; }
    }

    public override DateTimeOffset? NextCti
    {
        get { return _nextCti; }
    }

    public override IEnumerable<DateTime> ProcessEvent(PointEvent<int> inputEvent)
    {
        // Produce output in response to the passage of time. Any active '1' event
        // not matched by a '2' event within thirty seconds matches the pattern.
        while (_active.Count > 0 && _active.Peek().AddSeconds(30) <= inputEvent.StartTime)
        {
            yield return _active.Dequeue().UtcDateTime;
        }

        // Update operator state based on new input event.
        if (inputEvent.EventKind == EventKind.Insert)
        {
            if (inputEvent.Payload == 1)
                _active.Enqueue(inputEvent.StartTime);
            else if (inputEvent.Payload == 2)
                _active.Clear();

        }

        // Schedule wake-up after thirty seconds so that we can produce output
        // if needed.
        if (_active.Count > 0)
        {
            _nextCti = _active.Peek().AddSeconds(30);
        }
    }
}
}
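The following sketch traces the queue logic of SimplePatternMatcher on a short sequence of inputs. It is a simplified stand-in and not the engine: PatternTrace, Step, and Run are hypothetical names, a null payload stands for a CTI that the operator requested through NextCti, and the CTIs are supplied by hand at the times the operator would have requested them.

```csharp
using System;
using System.Collections.Generic;

public static class PatternTrace
{
    // Stand-in for one ProcessEvent call. A null payload represents a requested CTI.
    public static List<DateTimeOffset> Step(Queue<DateTimeOffset> active, DateTimeOffset time, int? payload)
    {
        var output = new List<DateTimeOffset>();

        // Report every tracked '1' event that has waited thirty seconds or more.
        while (active.Count > 0 && active.Peek().AddSeconds(30) <= time)
        {
            output.Add(active.Dequeue());
        }

        // Update the tracked events based on the payload, if there is one.
        if (payload == 1)
        {
            active.Enqueue(time);
        }
        else if (payload == 2)
        {
            active.Clear();
        }

        return output;
    }

    // Runs a fixed scenario and returns the reported times as seconds after the start.
    public static List<double> Run()
    {
        var start = new DateTimeOffset(2024, 1, 1, 0, 0, 0, TimeSpan.Zero);
        var active = new Queue<DateTimeOffset>();
        var reported = new List<double>();

        // Each entry is a time offset in seconds and a payload (null for a CTI).
        var inputs = new[]
        {
            Tuple.Create(0, (int?)1),     // tracked; wake-up would be requested for 30 s
            Tuple.Create(10, (int?)1),    // tracked behind the first event
            Tuple.Create(30, (int?)null), // CTI: the event from 0 s is reported
            Tuple.Create(35, (int?)2),    // clears the event from 10 s before it expires
            Tuple.Create(40, (int?)1),    // tracked; wake-up would be requested for 70 s
            Tuple.Create(70, (int?)null)  // CTI: the event from 40 s is reported
        };

        foreach (var input in inputs)
        {
            foreach (var match in Step(active, start.AddSeconds(input.Item1), input.Item2))
            {
                reported.Add((match - start).TotalSeconds);
            }
        }

        return reported;
    }
}
```

In this scenario, Run reports the events at 0 and 40 seconds. The event at 10 seconds is not reported because the event with value 2 arrives at 35 seconds, before its thirty-second window ends. After the final CTI the queue is empty, which is the condition under which the operator returns true from IsEmpty.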

Defining a Helper Method to Simplify Usage

You may want to simplify the usage of the operator in a query. For example, it would be more convenient for the query author to write input.Smooth(0.5) than input.Scan(new SmoothingOperator(0.5)).

You can enable this simplified pattern by creating a custom extension method like the following:

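        // Extension methods must be declared in a static class.
        // The stream type is double because SmoothingOperator processes double payloads.
        static CepStream<double> Smooth(this CepStream<double> source, double smoothingFactor)
        {
            if (null == source)
            {
                throw new ArgumentNullException("source");
            }

            return source.Scan(new SmoothingOperator(smoothingFactor));
        }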