Benutzerdefinierte Aggregate und Operatoren

Die Beispiele in diesem Thema zeigen, wie der Satz fensterbasierter Vorgänge in StreamInsight-LINQ-Operatoren mit benutzerdefinierten Aggregaten (UDA) und benutzerdefinierten Operatoren (UDO) erweitert werden kann. Diese Erweiterungen werden für Ereignisfenster definiert und geben null oder mehr Ergebnisereignisse zurück. Ein benutzerdefiniertes Aggregat oder ein benutzerdefinierter Operator muss in eine Assembly kompiliert werden, auf die vom StreamInsight-Server in gleicher Weise zugegriffen werden kann, auf die auch Adapter zur Verfügung gestellt und während der Laufzeit verwendet werden.

StreamInsight stellt als zusätzlichen Erweiterungsmechanismus außerdem benutzerdefinierte Datenstromoperatoren bereit. Benutzerdefinierte Datenstromoperatoren werden für den Ereignisdatenstrom definiert, nicht für Ereignisfenster.

Benutzerdefinierte Aggregate

Ein benutzerdefiniertes Aggregat wird über einer Fensterspezifikation verwendet, um über den Ereignissen in diesem Fenster zu aggregieren und einen einzelnen Ergebniswert zu erzeugen. Ein UDA erfordert als Eingabe ein CEP-Fenster (d. h. das Ergebnis eines Operators für ein springendes, ein Momentaufnahmefenster oder ein anzahlbasiertes Fenster), das einen Satz CEP-Ereignisse enthält und einen einzelnen Rückgabewert (einen CLR-Typ, der einem der StreamInsight-Primitivtypen zugeordnet wird) ausgibt. Weitere Informationen zu Fenstern finden Sie unter Verwenden von Ereignisfenstern.

Sie können UDAs implementieren, die in ihrer Funktionalität komplexer sind als die count, sum und average ähnlichen einfacheren Aggregate, die von StreamInsight bereitgestellt werden. Ein solches Beispiel, das Berechnen von zeitlich gewichteten Durchschnitten, wird in einem späteren Abschnitt erläutert.

Benutzerdefinierte Operatoren

Ein benutzerdefinierter Operator wird über einer Fensterspezifikation verwendet, um die Ereignisse im Fenster zu verarbeiten und einen Satz mit einem oder mehreren resultierenden Ereignisse zu erzeugen. Ein UDO erfordert ein CEP-Fenster als Eingabe (d. h. das Ergebnis eines Operators für ein springendes, ein Momentaufnahmefenster oder Anzahlfenster), das einen Satz an CEP-Ereignissen enthält und einen Satz an CEP-Ereignissen oder einen Satz von CEP-Nutzlasten ausgibt.

Ein UDO kann verwendet werden, wenn die Berechnung ganze Ereignisse für jedes Fenster, einschließlich der Zeitstempel, generieren oder Auswirkungen darauf haben soll. Ein Beispiel ist die Einstellung eines Statusfelds eines Ereignisses zusätzlich zur Berechnung einer Aggregation, wobei der Status vom Aggregationsergebnis und einem anderen Parameter abhängt. Ein UDO könnte z. B. ein einzelnes Ereignis für jedes Fenster erzeugen, das ein Nutzlastfeld enthält, welches über das Aggregationsergebnis und ein Statusfeld verfügt, das angibt, ob das Aggregationsergebnis gegen einige Einschränkungen verstoßen hat.

Zeitsensibilität in UDAs und UDOs

Sie können UDAs und UDOs als zeitunempfindlich oder zeitempfindlich definieren, abhängig von der Auswahl der Basisklasse zum Implementieren dieser Operatoren.

Zeitunempfindliche UDAs und UDOs erwarten nicht, dass ganze Ereignisse einschließlich der Zeitstempel übergeben werden. Stattdessen betrachten sie nur einen Satz von einem oder mehr Nutzlastfeldern von den Ereignissen im definierten Fenster. Außerdem wird die aktuelle Fensterstart- und -endzeit nicht an sie übergeben.

Den zeitempfindlichen UDAs und UDOs wird pro Fenster ein Satz von Ereignissen übergeben. Zu den Informationen gehören auch die Zeitstempel und die Start- und Endzeiten der Fenster. Ob ein UDA oder UDO zeitempfindlich ist, wird von der jeweiligen Basisklasse bestimmt, von der der UDA- oder UDO-Autor die Implementierung ableitet.

Implementieren benutzerdefinierter Aggregate

Der Autor eines UDA hat die folgenden Verantwortungen:

  • Bereitstellung der tatsächlichen Implementierung des UDA.

  • Bereitstellung der Erweiterungsmethode für LINQ, um es einem Abfrageautor zu ermöglichen, das UDA zu verwenden.

Um ein UDA zu implementieren, leitet der Benutzer von der geeigneten Basisklasse ab: CepAggregate für zeitunempfindliche UDAs oder CepTimeSensitiveAggregate für zeitempfindliche UDAs.

Die Klassenableitung erfordert die Instanziierung der Eingabe- und Ausgabetypparameter. Der Eingabetyp stellt entweder die gesamte Nutzlast dar (wenn das UDA in der Lage sein muss, den ganzen Satz von Nutzlastfeldern im Verlauf seiner Berechnung zu betrachten) oder einen CLR-Typ, der im StreamInsight-Typsystem einem entsprechenden primitiven Typ zugeordnet ist (in dem Szenario, in dem ein Singletonfeld die Eingabe für das UDA ist). Der Ausgabetyp muss in beiden Fällen ein CLR-Typ sein, der einem entsprechenden primitiven Typ zugeordnet wird.

Neben den Ereignisdaten kann eine optionale Konfigurationsstruktur zur Abfragestartzeit an den Konstruktor der UDA-Klasse übergeben werden, wenn dies vom UDA-Autor beabsichtigt ist. Wenn ein solcher Konstruktor vom UDA-Autor bereitgestellt wird, ruft das Modul ihn entsprechend zur Laufzeit auf, mit der in LINQ vom Aufrufer des UDA bereitgestellten Konfiguration.

Zeitunempfindliche sowie zeitempfindliche UDAs empfangen die Nutzlasten als ungeordneten Satz. Im Fall eines zeitempfindlichen UDA sind jeder Nutzlast zusätzlich die Zeitstempel der Ereignisse zugeordnet. Darüber hinaus wird ein Fensterdeskriptor, der die Start- und Endzeiten für Fenster definiert, an das UDA übergeben.

Beispiele für benutzerdefinierte Aggregate

Im folgenden Beispiel wird ein zeitunempfindliches UDA implementiert. Es erwartet einen Satz ganzzahliger Ereignisfelder. Die optionale Konfigurationsstruktur wird für diese Beispielimplementierung nicht angegeben, daher benötigt die Klasse keinen bestimmten Konstruktor.

public class Median : CepAggregate<int, int>
{
    public override int GenerateOutput(IEnumerable<int> eventData)
    {
        var sortedData = eventData.OrderBy(e => e.Payload);
        int medianIndex = (int)sortedData.Count() / 2;
        return sortedData.Count() % 2 == 0 ?
            (sortedData.ElementAt(medianIndex).Payload + sortedData.ElementAt(medianIndex - 1).Payload) / 2 :
            sortedData.ElementAt(medianIndex).Payload;
    }
}

Zusätzlich zur Implementierung des UDA müssen Sie eine Erweiterungsmethode für LINQ bereitstellen, um es dem Abfrageautor zu ermöglichen, das UDA zu verwenden. Die Erweiterungsmethode ist eine Signatur, die dem Abfrageautor ermöglicht, das Aggregat zu verwenden und die Abfrage zu kompilieren. Durch ein Attribut kann der StreamInsight-LINQ-Anbieter auf die tatsächliche Klasse verweisen, die die UDA-Implementierung enthält, wie im folgenden Beispiel gezeigt wird.

public static class MyUDAExtensionMethods
{
    [CepUserDefinedAggregate(typeof(Median))]
    public static int Med<InputT>(this CepWindow<InputT> window, Expression<Func<InputT, int>> map)
    {
           throw CepUtility.DoNotCall();
    }
}

Hier muss eine Implementierung eines UDA durch einen Klassen-Median vorhanden sein, der ein UDA implementiert, das in einem einzelnen Feld des Typs int operiert und einen Wert vom Typ int zurückgibt. Der Ausdruck in der Signatur der Funktion stellt die Zuordnung vom Ereignistyp des Eingabedatenstroms zu einem einzelnen ganzzahligen Wert dar. Beachten Sie, dass die Erweiterungsmethode nie ausgeführt wird, daher der CepUtility.DoNotCall() in ihrem Textkörper. Auf Grundlage dieser Spezifikation kann das UDA in LINQ verwendet werden, wie im folgenden Beispiel gezeigt wird.

from w in s. TumblingWindow(TimeSpan.FromMinutes(10))
select new { f1 = w.Med(e => e.val) }

Das Argument des Lambda-Ausdrucks ordnet die Ereignisnutzlast einem ganzzahligen Wert zu, der die Eingabe des UDA darstellt. In diesem Fall wird der Median über den Werten des Ereignisfelds val für jedes Fenster berechnet.

Betrachten Sie danach das Beispiel für ein zeitunempfindliches UDA, das Konfigurationsinformationen aufweist. Es erwartet eine vollständige Nutzlast des Typs Trade als Eingabe und gibt Werte des Typs double zurück. Dieses Beispiel beinhaltet auch die zugehörige Erweiterungsmethode:

    public class Trade
    {
        public double Volume { get; set; }
        public double Price { get; set; }
    }

    public class Vwap : CepAggregate<Trade, double>
    {
        double weight;

        /// <summary>
        /// Constructor for parameterized UDA
        /// </summary>
        public Vwap(double w)
        {
            weight = w;
        }

        public override double GenerateOutput(IEnumerable<Trade> events)
        {
            double vwap = events.Sum(e => e.Price * e.Volume) / events.Sum(e => e.Volume);

            return vwap * weight;
        }
    }

    static public partial class UDAExtensionMethods
    {
        [CepUserDefinedAggregate(typeof(Vwap))]
        public static double vwap(this CepWindow<Trade> window, double w)
        {
            throw CepUtility.DoNotCall();
        }
    }

Da die gesamte Nutzlast die Eingabe ist, wird kein Lambda-Ausdruck von der Erweiterungsmethode angegeben. Der einzige Parameter an das UDA ist der Wert für die Konfiguration (in diesem Fall double):

var result = from w in s.TumblingWindow(TimeSpan.FromMinutes(10))
             select new { f1 = w.vwap(2.5) }

Betrachten Sie danach das Beispiel für ein zeitempfindliches UDA, das Konfigurationsinformationen aufweist. Das UDA ist ein zeitlich gewichteter Durchschnitt mit Intervallereignissen, die als Stufenfunktion (d. h. jedes Intervall ist bis zum nächsten gültig) interpretiert werden. Ähnlich wie im vorherigen Beispiel wird nicht die ganze Nutzlast als Eingabe erwartet, sondern nur die Werte des Typs double.

Beachten Sie, dass das Eingabeset immer noch als Satz von Intervallereignissen (und nicht wie beim zeitunempfindlichen UDA als Satz von Nutzlasten) definiert ist, obwohl die Ereignisnutzlasten auf double-Werte reduziert werden. Dies ist nötig, damit die Zeitstempel berücksichtigt werden, da das UDA als zeitempfindlich angegeben wurde. Darüber hinaus wird das Fenster selbst in Form eines WindowDescription-Objekts bereitgestellt, das eine Start- und eine Endzeiteigenschaft aufweist. Diese Zeitstempel werden als UTC-Zeit angegeben. Beachten Sie weiterhin, dass UdaConfig eine Klasse oder eine Struktur ist, die über DataContractSerializer serialisierbar sein muss.

public class TimeWeightedAverage : CepTimeSensitiveAggregate<double, double>
{
    UdaConfig _udaConfig;
    public TimeWeightedAverage(UdaConfig udaConfig)
    {
        _udaConfig = udaConfig;
    }

    public override Output GenerateOutput(IEnumerable<IntervalEvent<double>> events,
                                          WindowDescriptor windowDescriptor)
    {
        double avg = 0;
        foreach (IntervalEvent<double> intervalEvent in events)
        {
            avg += intervalEvent.Payload * (intervalEvent.EndTime - 
                                            intervalEvent.StartTime).Ticks;
        }
        avg = avg / (windowDescriptor.EndTime - 
                     windowDescriptor.StartTime).Ticks;
        return avg * udaConfig.Multiplier;
    }
}

Wobei für UDAConfig gilt

public class UDAConfig
{
    public double Multiplier { get; set; }
}

Die Erweiterungsmethode schließt jetzt auch die folgende Konfigurationsstruktur ein:

[CepUserDefinedAggregate(typeof(TimeWeightedAverage))]
public static double twa<InputT>(this CepWindow<InputT> window, Expression<Func<InputT, double>> map, UdaConfig config)
{
    throw CepUtility.DoNotCall();
}

Die Konfiguration wird zu einem anderen Parameter in der Erweiterungsmethode:

var result = from w in s.TumblingWindow(TimeSpan.FromMinutes(10))
         select new w.TimeWeightedAverage (e => e.dval,
                            new UdaConfig(){ Multiplier = 5 }); 

In den bisherigen Beispielen werden Szenarien betrachtet, bei denen das Ereignis eingegeben wird. Dies bedeutet, dass die Nutzlasttypen zum Zeitpunkt der Implementierung des UDA bereits bekannt sind. Im folgenden Beispiel wird ein UDA mit einem generischen Eingabetyp implementiert, wobei der Eingabetyp nur zur Laufzeit in das UDA übergeben wird.

public class GenericInputUda<TInput> : CepAggregate<TInput, bool>
{
    public GenericInputUda(SampleUdaConfig config)
    {
        // ...
    }

    public override bool GenerateOutput(IEnumerable<TInput> payloads)
    {
        // ...
    }
}

Implementieren benutzerdefinierter Operatoren

Der Autor eines UDO hat die folgenden Verantwortungen:

  • Bereitstellung der tatsächlichen Implementierung des UDO.

  • Bereitstellung der Erweiterungsmethode für LINQ, um es einem Abfrageautor zu ermöglichen, den UDO zu verwenden.

Um einen UDO zu implementieren leitet der Benutzer von der geeigneten Basisklasse ab: CepOperator oder CepTimeSensitiveOperator für zeitunempfindliche UDOs. Die Klassenableitung erfordert die Instanziierung des Eingabe- und Ausgabetypparameters. Der Eingabetyp stellt immer die ganze Nutzlast dar. Der Ausgabetyp ist entweder ein Satz von Nutzlasten oder ein Satz von Ereignissen, abhängig von der ausgewählten Basisklasse.

Neben den Ereignisdaten können Sie eine optionale Konfigurationsstruktur zur Abfragestartzeit an den Konstruktor der UDO-Klasse übergeben, wenn dies vom UDO-Autor beabsichtigt ist. Wenn ein Konstruktor vom UDO-Autor bereitgestellt wird, ruft das Modul ihn entsprechend zur Laufzeit auf, mit der in LINQ vom Aufrufer des UDO bereitgestellten Konfiguration.

Sowohl zeitunempfindliche als auch zeitempfindliche UDOs empfangen die Nutzlasten als ungeordneten Satz. Im Fall eines zeitempfindlichen UDO sind jeder Nutzlast zusätzlich die Zeitstempel der Ereignisse zugeordnet. Darüber hinaus wird ein Fensterdeskriptor, der die Start- und Endzeiten für Fenster definiert, an das UDO übergeben.

CTI-Verhalten in benutzerdefinierten Operatoren

UDOs ändern Aktuelle Zeitinkremente (CTI) auf folgende Weise: Wenn ein Fenster noch "offen" ist, das heißt, wenn kein CTI mit einem Zeitstempel nach der Fensterendzeit empfangen wurde, werden alle CTIs, die in dieses Fenster fallen, zur Fensterstartzeit geändert. Dadurch wird sichergestellt, dass sich die Ausgabe des UDO, die potenziell benutzerdefinierte Zeitstempel enthalten kann, solange ändern kann, wie das Fenster noch offen ist.

Beispiel für die Implementierung benutzerdefinierter Operatoren

Im folgenden Beispiel wird ein zeitunempfindlicher UDO implementiert, der nicht über eine Konfigurationsstruktur verfügt.

public class SampleUDO : CepOperator<Input, Output>
{
    public override IEnumerable<Output> GenerateOutput(IEnumerable<Input> payloads)
    {
        Output output = new Output();
        output.total = 0;
        output.status = "good";

        foreach (Input payload in payloads)
        {
            output.total += payload.Value;
            if (payload.Flag == 4)
            {
                output.status = "bad";
                break;
            }
        }
        List<Output> outputCollection = new List<Output>();
        outputCollection.Add(output);
        return outputCollection;
    }
}

Im folgenden Beispiel wird gezeigt, wie die Signatur in einen zeitempfindlichen UDO geändert wird, der Konfigurationsinformationen annimmt.

public class GenericOutputUdo: CepTimeSensitiveOperator<InputEventType, TOutput>
{
    public GenericOutputUdo(SampleUdoConfig config)
    {
        ...
    }

    public override IEnumerable<IntervalEvent<TOutput>> GenerateOutput(
                             IEnumerable<IntervalEvent<InputEventType>> payloads,
                             WindowDescriptor windowDescriptor)
    {
        ...
    }
}

Beispiel für Erweiterungsmethoden für benutzerdefinierte Operatoren

Zusätzlich zur Implementierung des UDOs muss der UDO-Autor eine Erweiterungsmethode für LINQ bereitstellen, um es dem Abfrageautor zu ermöglichen, den UDO zu verwenden. Die Erweiterungsmethode ist eine Signatur, die es dem Abfrageautor ermöglicht, den Operator zu verwenden und die Abfrage zu kompilieren. Durch ein Attribut kann der LINQ-Anbieter auf die tatsächliche Klasse verweisen, die die UDO-Implementierung enthält, wie im folgenden Beispiel gezeigt wird.

[CepUserDefinedOperator(typeof(SampleUDO))]
public static OutputEventType MyUDO(this CepWindow<InputEventType> window)
{
    throw CepUtility.DoNotCall();
}

Dieser UDO kann nun auf folgende Art verwendet werden.

var newstream = from w in inputStream.Snapshot()
                select w.MyUDO();

Im folgenden Beispiel werden die Erweiterungsmethode und die Verwendung für einen UDO mit einer Konfigurationsstruktur veranschaulicht, die auf eine Implementierung verweist, die in einer Klasse mit dem Namen SampleUDOwithConfig enthalten ist.

[CepUserDefinedOperator(typeof(SampleUDOwithConfig))]
public static OutputEventType MyUDO(this CepWindow<InputEventType> window, UDOConfig config)
{
    throw CepUtility.DoNotCall();
}

var newstream = from w in inputStream.SnapshotWindow()
                select w.MyUDO(new UDOConfig());

Kulturspezifische Ereignisfeldeigenschaften

Erweiterungen wie UDOs, UDAs und UDFs können als Schnittstellen zwischen der CEP-Domäne mit ihrem Typsystem und der .NET-CLR betrachtet werden. Für einige Anwendungen ist die Möglichkeit erwünscht, Kulturinformationen über diese Schnittstelle zu übergeben. Für UDAs und UDOs kann der Erweiterungsautor eine zusätzliche Schnittstelle implementieren (IDeclareEventProperties), die die Überprüfung oder das Festlegen kultureller Eigenschaften für Ereignisfelder ermöglicht. Um diese Schnittstelle zu implementieren, müssen Sie die Funktion DeclareEventProperties bereitstellen, die ein Objekt vom CepEventType zurückgibt, das Kulturinformationen für die Felder enthalten kann, wie im folgenden Beispiel gezeigt:

public class SampleUDO : CepOperator<Input, Output>, IDeclareEventProperties
{
    public override IEnumerable<Output> GenerateOutput(IEnumerable<Input> payloads)
    {
        ...
    }

    public CepEventType DeclareEventProperties(CepEventType outputEventType)
    {
        // assuming string field 'loc' in type Input
        // assuming string fields 'firstName' and 'location' in type Output
        outputEventType.Fields["firstName"].CultureInfo = new System.Globalization.CultureInfo("zh-CN");
        outputEventType.Fields["location"].CultureInfo = base.InputEventType.Fields["loc"].CultureInfo;
        return outputEventType;
    }
}

In diesem Beispiel-UDO werden Eingabeereignisse des Typs Input angenommen und Ereignisse des Typs Output erzeugt. Der Typ Output verfügt über Zeichenfolgenfelder, die der UDO-Autor explizit mit bestimmten Kulturinformationen kommentieren möchte. Die Kultur mit dem Namen zh-CN wird für das Ausgabefeld firstName angewendet, wohingegen das Ausgabefeld-location mit der gleichen Kultur kommentiert wird, die dem Feld loc im Eingabeereignistyp des UDO zugeordnet ist. Für jedes Ereignis, das zur Laufzeit vom UDO erzeugt wird, werden diese Kulturen auf die Felder angewendet, bevor das Ereignis in den Ausgabedatenstrom des UDO eingefügt wird.

Die gleiche Schnittstelle ist auch für benutzerdefinierte Aggregate vorhanden. Da Aggregate nur über einen einzelnen Rückgabewert verfügen, schließt die IDeclareEventProperties-Schnittstelle zum Anwenden von kulturspezifischen Informationen auf ein solches Feld den Rückgabewert in einen CepEventType mit einem einzelnen Feld ein, um eine Möglichkeit bereitzustellen, dieses Feld mit CEP-spezifischen Ereigniseigenschaften zu kommentieren.

public class MyUDA : CepAggregate<Input, string>, IDeclareEventProperties
{
    public override string GenerateOutput(IEnumerable<Input> events)
    {
        ...
    }

    public CepEventType DeclareEventProperties(CepEventType outputEventType)
    {
        outputEventType.FieldsByOrdinal[0].CultureInfo = new System.Globalization.CultureInfo("zh-CN");
        return outputEventType;
    }
}

Hier wird die Zeichenfolge, die das Ergebnis des Aggregats darstellt, in einen CepEventType eingeschlossen, damit der UDA-Autor die CultureInfo-Eigenschaft für dieses Feld festlegen kann. Diese Kulturinformationen werden an das eigentliche Ereignisfeld weitergegeben, das das Aggregationsergebnis in der LINQ-Abfrage empfängt, in der das UDA verwendet wird.

Siehe auch

Konzepte

Verwenden von Ereignisfenstern

Andere Ressourcen

Schreiben von Abfragevorlagen in LINQ