Aggregationen

In den Beispielen in diesem Thema wird gezeigt, wie Aggregationsvorgänge über Ereignisfenster ausgeführt werden. Folgende Aggregationsfunktionen werden unterstützt:

  • Durchschnitt über ein numerisches Nutzlastfeld (avg)

  • Summe über ein Nutzlastfeld (sum)

  • Minimum über einem Nutzlastfeld (min).

  • Maximum über einem Nutzlastfeld (max).

  • Anzahl von Ereignissen (count)

Aggregationen sind satzbasierte Vorgänge. Das heißt, sie führen Berechnungen über Teilmengen von Daten aus. Diese Teilmengen werden als Ereignisfenster angegeben, wobei Ereignisse entlang der Zeitachse gruppiert werden. Nach dieser Definition können Aggregationen nur auf Fenster und nicht auf gewöhnliche Ereignisdatenströme angewendet werden. Sie werden als Erweiterungsmethoden für CepWindowStream<T> bereitgestellt. Weitere Informationen zu Fenstern finden Sie unter Verwenden von Ereignisfenstern.

Mithilfe der gleichen Syntax wie für einen Projektionsvorgang werden die Ergebnisse der Aggregationsvorgänge zu Skalarwert-Nutzlastfeldern. Aggregationen über Fenster werden inkrementell berechnet. Das bedeutet, dass vorherige Aggregationsergebnisse aktualisiert werden, wenn ein Ereignis im Aggregationsfenster eintrifft oder dieses verlässt. Beachten Sie, dass sich dies auf die Zahlengenauigkeit auswirken kann.

avg, sum, min und max akzeptieren einen Eingabeparameter, der den zu aggregierenden Wert darstellt. Normalerweise ist dies ein Verweis auf ein Ereignisfeld. Der Eingabeparameter wird als Lambda-Ausdruck angegeben, wie in den folgenden Beispielen dargestellt. Das count-Aggregat zählt vollständige Ereignisse im Fenster und verfügt daher über keinen Parameter.

Die Behandlung von Nullwerten ist konsistent mit gleichen Funktionen in Transact-SQL:

  • In sum wird NULL als 0 (Null) behandelt.

  • In min sind alle Werte kleiner als NULL.

  • In max sind alle Werte größer als NULL.

  • In TopK ist NULL immer kleiner als alle anderen Werte, wenn Ereignisse einem Nutzlastfeld entsprechend geordnet werden.

Die Funktionen für Minimum und Maximum können für alle Typen verwendet werden, die einen Vergleich implementieren, beispielsweise numerische, lexikografische (Zeichenfolge) oder temporäre (datetime) Typen.

Beispiele

Die folgenden Beispiele zeigen, wie Aggregatfunktionen auf Ereignisse angewendet werden, die in Springende Fenster und Momentaufnahmefenster definiert wurden. Beachten Sie, dass in dieser Version Anzahlfenster nicht mit integrierten Aggregaten oder TopK verwendet werden können.

A. Aggregation über ein springendes Fenster

Ein springendes Fenster wird im Zeitverlauf in einem regelmäßigen Zeitraum und mit regelmäßiger Frequenz wiederholt. Beispielsweise entspricht das Aggregieren von Ereignisdaten innerhalb einer Stunde mit einer alle fünf Minuten erfolgenden Neuberechnung einem springenden Fenster mit einer Fenstergröße von 1 Stunde und einer Sprunggröße von 5 Minuten, wie im folgenden Beispiel gezeigt.

// Assuming the following input event type for inputStream:
public class MyPayload 
{ 
    public int i; 
    public float f; 
}
var avgHourly = from win in inputStream.HoppingWindow(TimeSpan.FromHours(1), TimeSpan.FromMinutes(5))
                  select new { hourlyavg = win.Avg(e => e.f) }; 

Der Fensteroperator wird auf den Eingabedatenstrom angewendet und ergibt einen CepWindowStream<T>. Jedes win-Element in diesem Datenstrom stellt ein Fenster dar, das Ereignisse enthält.

Im folgenden Beispiel werden Ereignisse innerhalb jeder vollen Stunde gezählt. Dabei wird ein rollierendes Fenster verwendet. Dies ist die vereinfachte Version eines springenden Fensters, bei der die Sprunggröße gleich der Fenstergröße ist.

var countHourly = from win in hourStream.TumblingWindow(TimeSpan.FromHours(1))
                  select new { count = win.Count() };

B. Aggregation über ein Momentaufnahmefenster

Im folgenden Beispiel wendet die from-Klausel ein Momentaufnahmefenster auf den Datenstrom inputStream an. Im Beispiel werden die Sum-Aggregationsergebnisse dem Nutzlastfeld e.i zugewiesen, und die Avg-Aggregationsergebnisse werden auf Grundlage des Nutzlastfelds e.f berechnet. Außerdem wird das COUNT-Aggregat verwendet. Im Beispiel wird auch gezeigt, wie mehrere Aggregationen in der gleichen Anweisung kombiniert werden. Sie werden alle in Bezug auf das gleiche Fenster berechnet.

// Assuming the following input event type for inputStream:
public class MyPayload 
{ 
    public int i; 
    public float f; 
}

// Multiple aggregations over a snapshot window
var snapshotAgg = from w in inputStream.Snapshot()
                  select new { sum = w.Sum(e => e.i),
                               avg = w.Avg(e => e.f),
                               count = w.Count() };

Sie können Aggregate in komplexe Ausdrücke einbetten, und ein Ausdruck kann mehr als ein Aggregat enthalten, wie im folgenden Beispiel gezeigt.

var result = from w in inputStream.Snapshot()
             select new { ratio = w.Sum(e => e.i) / w.Sum(e => e.f) };

Siehe auch

Konzepte

Zeitstempeländerungen

Verwenden von Ereignisfenstern

Springende Fenster

Anzahlfenster

Momentaufnahmefenster