Benutzerdefinierte Datenstromoperatoren

Mit einem benutzerdefinierten Datenstromoperator kann die benutzerdefinierte Verarbeitung von Ereignisdatenströmen festgelegt werden.

Verwendungsmuster

In einer Abfrage wird ein benutzerdefinierter Datenstromoperator mit der Scan-Erweiterungsmethode von CepStream aufgerufen. Sie stellen den Eingabedatenstrom und den Anfangszustand für den Operator wie im folgenden Beispiel bereit.

var query = input.Scan(new SmoothingOperator(0.7));

Der Autor des Operators leitet eine neue Klasse vom abstrakten CepPointStreamOperator-Typ oder vom abstrakten CepEdgeStreamOperator-Typ ab. Der neue Typ kapselt den Zustandsautomaten des Operators. Ein Aufruf des Konstruktors dieses Typs wird an den Scan-Operator übergeben, um den Anfangszustand des Operators einzurichten.

Merkmale eines benutzerdefinierten Datenstromoperators

Mit einem benutzerdefinierten Datenstromoperator kann der Benutzer verfahrensorientiert mit dem Ereignisdatenstrom interagieren. Dabei stellt er eine Grenze zwischen dem StreamInsight-Ereignisverarbeitungsmodul und der CLR dar, ähnlich wie Adapter und benutzerdefinierte Operatoren oder Aggregate. In allen diesen Fällen erfüllen das Modul sowie der Entwickler einen Vertrag in Bezug auf die zeitlichen Eigenschaften eines Datenstroms. Das StreamInsight-Modul gibt für den benutzerdefinierten Datenstromoperator die folgenden Garantien, um ein deterministisches Verhalten für eine spezifische Folge von Abfrageeingabeereignissen sicherzustellen:

  • Ein benutzerdefinierter Datenstromoperator garantiert den geordneten Empfang von Ereignissen nach ihrer Synchronisierungszeit (Startzeit für Punktereignisse und Start-Edge-Ereignisse, Endzeit für End-Edge-Ereignisse). Intervallereignisse werden nicht unterstützt, da sie über keine Darstellung der Synchronisierungszeit verfügen, weil jedes Ereignis zwei Zeitpunkte (Start und Ende) darstellt.

  • Nur Einfügeereignisse werden an einen benutzerdefinierten Datenstromoperator übergeben. CTI (Current Time Increment)-Ereignisse im eingehenden Datenstrom sind transparent für den benutzerdefinierten Datenstromoperator, sie bestimmen jedoch immer noch, auf welche Weise der benutzerdefinierte Datenstromoperator den Zeitverlauf wahrnimmt (siehe NextCti unten).

  • Ein benutzerdefinierter Datenstromoperator kann durch StreamInsight deaktiviert werden, in Abhängigkeit davon, ob er zugelassen wird (siehe IsEmpty unten). Ein deaktivierter benutzerdefinierter Datenstromoperator kann von StreamInsight wiederverwendet werden.

  • Jedes die Einfügeereignis bewirkt einen Aufruf von ProcessEvent, gefolgt von der Abfrage der NextCti-Eigenschaft und der IsEmpty-Eigenschaft.

Eingabe und Ausgabe eines benutzerdefinierten Datenstromoperators

Ein benutzerdefinierter Datenstromoperator verarbeitet zu einem bestimmten Zeitpunkt jeweils ein Eingabeereignis. Als Reaktion auf die einzelnen Eingabeereignisse können 0-*-Ausgabeereignisse erzeugt werden. Als Reaktion auf eine Eingabe kann auch der interne Status des Operators aktualisiert werden. Ein Eingabeereignis kann entweder ein CTI (auf Anforderung des Operators zum Angeben des Zeitverlaufs generiert) oder ein Insert sein. Eingaben werden mit zeitbezogenen Anmerkungen versehen.

Im Unterschied dazu stellt ein Ausgabeereignis lediglich eine Ereignisnutzlast dar. Es gibt keine Möglichkeit, Ausgabeereignisse mit Zeitstempeln zu versehen oder CTIs in den Ausgabedatenstrom einzufügen. Ausgabeereignisse werden als Punktereignisse generiert, mit Zeitstempeln, die auf den Zeitstempeln der zugehörigen Eingabeereignisse basieren.

Behandeln der Zeit in einem benutzerdefinierten Datenstromoperator

Wenn Sie einen neuen benutzerdefinierten Datenstromoperator erstellen, muss im Code lediglich die Nutzlast der Ereignisse verarbeitet werden. Die Zeit wird ausschließlich von StreamInsight verarbeitet. Eingabeereignisse werden in ihrer jeweiligen Reihenfolge empfangen. Der Zeitstempel jedes Ausgabeereignisses basiert auf dem Zeitstempel des entsprechenden Eingabeereignisses. Wenn beispielsweise ein Edge-End-Ereignis ein Ausgabeereignis auslöst, erhält dieses Ausgabeereignis den Zeitstempel des Edge-End-Ereignisses. Daher kann der Operator durch die Zeit beeinflusst werden, er kann sie jedoch nicht steuern.

Der benutzerdefinierte Datenstromoperator empfängt CTIs nicht direkt aus dem Eingabedatenstrom in seiner ProcessEvent()-Methode, er kann jedoch über die NextCti-Eigenschaft auf den Zeitverlauf reagieren. Diese Eigenschaft wird vom Modul nach jedem Aufruf von ProcessEvent() abgerufen. Der benutzerdefinierte Datenstromoperator kann einen Zeitstempel zurückgeben, der den nächsten empfangenen CTI-Zeitstempel als Aufruf von ProcessEvent() angibt.

Nur die CTIs, die durch Festlegen der NextCti-Eigenschaft angefordert wurden, werden an ProcessEvent übergeben. Diese CTIs werden nicht über die Grenzen des benutzerdefinierten Datenstromoperators hinaus weitergegeben.

Implementieren eines benutzerdefinierten Datenstromoperators

Leiten Sie zum Erstellen eines neuen benutzerdefinierten Datenstromoperators eine neue Klasse von der abstrakten CepPointStreamOperator-Basisklasse oder der abstrakten CepEdgeStreamOperator-Basisklasse ab.

  • Wenn Sie von der abstrakten CepPointStreamOperator-Basisklasse ableiten, erkennt der Operator Eingabeereignisse als Punktereignisse. Wenn es sich bei den Ereignissen jedoch nicht um Punktereignisse handelt, ist dies kein Fehler. Vom Operator werden lediglich ihre Startzeiten erkannt.

  • Wenn Sie von der abstrakten CepEdgeStreamOperator-Basisklasse ableiten, sind für den Operator sowohl Start-Edge als auch End-Edge für Eingabeereignisse sichtbar.

Überschreiben Sie in der abgeleiteten Klasse die folgenden Eigenschaften und Methoden:

  • ProcessEvent-Methode. Generiert eine Ausgabe und aktualisiert den internen Status des Operators als Reaktion auf jedes Eingabeereignis. ProcessEvent empfängt ein Eingabeereignis und kann null oder mehr Ausgabenutzlasten zurückgeben.

  • IsEmpty-Eigenschaft. Gibt an, ob der interne Status des Operators leer ist. Wenn true, kann das StreamInsight-Abfragemodul die Operatorinstanz verwerfen, um die Arbeitsspeicherauslastung zu verringern.

  • Optional die NextCti-Methode. Gibt den nächsten Zeitpunkt an, zu dem ein CTI-Ereignis an den Operator gesendet wird. Durch Überschreiben dieser Eigenschaft kann der benutzerdefinierte Operator zu einem bestimmten Zeitpunkt in der Zukunft eine Ausgabe erzeugen oder nach Ablauf eines Anwendungszeitintervalls angeben, dass sein interner Status leer ist.

Die abgeleitete Klasse muss außerdem die WCF-Serialisierung implementieren. Weitere Informationen finden Sie unter Vorgehensweise: Erstellen eines grundlegenden Datenvertrags für eine Klasse oder Struktur

Kommunizieren des StreamInsight-Moduls mit dem Operator

Für jede Instanz des Operators wird die ProcessEvent-Methode mit den Ereignissen aufgerufen, die nach ihrer Synchronisierungszeit geordnet sind. Für ein Punktereignis oder CTI ist die Synchronisierungszeit die gültige Startzeit. Für ein Edge-Ereignis ist die Synchronisierungszeit die gültige Startzeit für Start-Edges bzw. die gültige Endzeit für End-Edges.

Nach jedem Aufruf der ProcessEvent-Methode werden die IsEmpty-Eigenschaft und die NextCti-Eigenschaft abgerufen.

Wenn der Operator NextCti überschreibt, garantiert das Modul, dass es sich bei dem nächsten vom Operator verarbeiteten Ereignis entweder um ein Einfügeereignis mit einer Synchronisierungszeit handelt, die kleiner als der Wert von NextCti ist, oder um ein CTI mit dem Wert von NextCti als Startzeit. Wenn der Operator einen NextCti-Wert zurückgibt, der kleiner als die oder gleich der Synchronisierungszeit des letzten verarbeiteten Ereignisses ist, wird dieser ignoriert. Die NextCti-Eigenschaft ermöglicht dem Operator das „Übersetzen“ des Zeitverlaufs des Eingabedatenstroms in seinen eigenen Takt (in Form dieser internen CTIs) und das anschließende entsprechende Reagieren auf diesen Verlauf.

Operatoren werden nur als Reaktion auf Einfügeereignisse aktiviert. CTIs hingegen lösen keine Aktivierung aus. Ein Operator wird deaktiviert, wenn er den Wert true für IsEmpty zurückgibt.

An einem beliebigen Punkt kann das Modul die Serialisierung des Operators ausführen und seinen Verweis auf den Operator freigeben. Wenn der Operator zu einem späteren Zeitpunkt deserialisiert wird, wird er an der Position fortgesetzt, an der er angehalten wurde.

Beispiele für benutzerdefinierte Datenstromoperatoren

Exponentielle Glättung

Dieser benutzerdefinierte Datenstromoperator behandelt einen Datenstrom von Punktereignissen als Folge von Werten und wendet die exponentielle Glättung an. Beachten Sie, dass ein Verweis auf System.Runtime.Serialization erforderlich ist.

namespace UdsoExamples
{
    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Extensibility;
 
/// <summary>
/// Implements exponential smoothing.
/// </summary>
[DataContract]
public sealed class SmoothingOperator : CepPointStreamOperator<double, double>
{
    [DataMember]
    readonly double _smoothingFactor;

    [DataMember]
    double? _previousValue;

    public SmoothingOperator(double smoothingFactor)
    {
        _smoothingFactor = smoothingFactor;
    }

    public override IEnumerable<double> ProcessEvent(PointEvent<double> inputEvent)
    {
        // The result is a function of the previous result and the current input value.
        _previousValue = _previousValue.HasValue
            ? (1.0 - _smoothingFactor) * _previousValue.Value + _smoothingFactor * inputEvent.Payload
            : inputEvent.Payload;

        yield return _previousValue.Value;
    }

    public override bool IsEmpty
    {
        get { return false; }
    }

Mustervergleich

Dieses einfache Beispiel für einen Mustervergleich veranschaulicht eine alternative Verwendung von IsEmpty und NextCti. In diesem Beispiel sucht der Operator nach einem Ereignis mit dem Wert 1,0, auf das innerhalb von 30 Sekunden kein Ereignis mit dem Wert 2,0 folgt. (Dieses Beispiel soll nützliche Konzepte in benutzerdefinierten Datenstromoperatoren veranschaulichen. In einer tatsächlichen Anwendung ist dieses Muster einfach genug, um es mithilfe von integrierten Operatoren in StreamInsight zu implementieren.)

Im vorherigen Beispiel wurde mit NextCti die Lebensdauer eines Operators gesteuert. In diesem Beispiel wird ebenfalls NextCti zu diesem Zweck verwendet, darüber hinaus wird jedoch mit NextCti eine Ausgabe als Reaktion auf den Zeitverlauf erzeugt.

namespace UdsoExamples
{
    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Extensibility;
 
/// <summary>
/// Indicates when an event with value '1' is followed by an event with value '2'
/// within thirty seconds.
/// </summary>
[DataContract]
public sealed class SimplePatternMatcher : CepPointStreamOperator<int, DateTime>
{
    [DataMember]
    DateTimeOffset? _nextCti;

    [DataMember]
    // Tracks timestamps for all events with value '1'.
    readonly Queue<DateTimeOffset> _active = new Queue<DateTimeOffset>();

    public override bool IsEmpty
    {
        // The operator is empty when we are not tracking any events with value '1'.
        get { return _active.Count == 0; }
    }

    public override DateTimeOffset? NextCti
    {
        get { return _nextCti; }
    }

    public override IEnumerable<DateTime> ProcessEvent(PointEvent<int> inputEvent)
    {
        // Produce output in response to the passage of time. Any active '1' event
        // not matched by a '2' event within thirty seconds matches the pattern.
        while (_active.Count > 0 && _active.Peek().AddSeconds(30) <= inputEvent.StartTime)
        {
            yield return _active.Dequeue().UtcDateTime;
        }

        // Update operator state based on new input event.
        if (inputEvent.EventKind == EventKind.Insert)
        {
            if (inputEvent.Payload == 1)
                _active.Enqueue(inputEvent.StartTime);
            else if (inputEvent.Payload == 2)
                _active.Clear();

        }

        // Schedule wake-up after thirty seconds so that we can produce output
        // if needed.
        if (_active.Count > 0)
        {
            _nextCti = _active.Peek().AddSeconds(30);
        }
    }
}
}

Definieren einer Hilfsmethode zum Vereinfachen der Verwendung

Möglicherweise möchten Sie die Verwendung des Operators in einer Abfrage vereinfachen. Für den Autor der Abfrage wäre es beispielsweise einfacher, input.Smooth(0.5) anstelle von input.Scan(new SmoothingOperator(0.5)) zu schreiben.

Sie können dieses vereinfachte Muster aktivieren, indem Sie eine benutzerdefinierte Erweiterungsmethode wie die Folgende erstellen:

        static CepStream<EventType1> Smooth(this CepStream<EventType1> source, double smoothingFactor)
        {
            if (null == smoothingFactor)
            {
                throw new ArgumentNullException("source");
            }

            return source.Scan(new SmoothingOperator(smoothingFactor));
        }