End-to-End-Beispiel für Enumerable-Ereignisquelle und -senke (StreamInsight)

In diesem einfachen End-to-End-Beispiel wird die Verwendung einer Ereignisquelle und -senke dargestellt, von denen die IEnumerable-Schnittstelle zur Erstellung einer vollständigen StreamInsight-Anwendung verwendet wird:

  1. Step 1 - Provide an observable or enumerable event source

  2. Step 2 - Transform the input to a stream and describe the temporal characteristics

  3. Step 3 - Write a query

  4. Step 4 - Convert the query output to an observable or enumerable stream

  5. Step 5 - Consume the output

In diesem Beispiel wird StreamInsight in Verbindung mit SQL Server und dem ADO.NET Entity Framework verwendet, um auf eine zeitbezogene Abfrage von Vergangenheitsdaten aus der Northwind-Beispieldatenbank zu reagieren. Von der Abfrage werden Zeitintervalle ermittelt, in denen mehr als drei Bestellungen in einer Region aktiv waren.

In diesem Beispiel wird eine IEnumerable-Ereignisquelle verwendet. Die Schritte entsprechen den Schritten zum Verwenden einer Ereignisquelle, von der das IObservable-Element implementiert wird. Zwar werden von einer Observable-Ausgabe die Daten an das Observer-Objekt geleitet, allerdings müssen die Daten nicht vom Consumer-Objekt abgerufen werden, wie dies beim Aufruf von foreach für eine Enumerable-Quelle der Fall wäre.

Schritt 1: Bereitstellen einer Observable- oder Enumerable-Ereignisquelle

Definieren Sie zunächst die Quelldaten für die Abfrage, indem Sie eine LINQ-to-Entities-Abfrage für die Northwind-Datenbank ausgeben. Im Ergebnis (databaseQuery) wird standardmäßig die IEnumerable-Schnittstelle implementiert.

// Connect to the Northwind sample database on SQL Server. Use the default Entity Model
// generated by the Entity Framework for the Northwind sample database.
using (NorthwindEntities northwind = new NorthwindEntities())
{
    // Query all Orders where there is a known order date, ship date and ship region.
    var databaseQuery = from o in northwind.Orders
                        where o.OrderDate.HasValue && o.ShippedDate.HasValue && o.ShipRegion != null
                        orderby o.OrderDate.Value
                        select o;
}

Schritt 2: Transformieren der Eingabe in einen Datenstrom und Beschreiben der zeitbezogenen Merkmale des Datenstroms

Transformieren Sie nun das Ergebnis der Abfrage in einen Datenstrom aus Intervallereignissen:

// Transform the query results into a stream of interval events whose start and end 
// times are defined by the order and ship timestamps. Keep track of the shipping region.
var streamSource = databaseQuery
    .ToStream(application, AdvanceTimeSettings.IncreasingStartTime, 
        o => IntervalEvent.CreateInsert(
            o.OrderDate.Value,
            o.ShippedDate.Value, 
            new { o.ShipRegion }));

In diesem Code wird ein Helfer für die AdvanceTimeSettings-Klasse (IncreasingStartTime) verwendet, um nach allen Ereignissen mit einer delay von 0 (NULL) CTI-Ereignisse einzufügen. Alternativ können Sie StrictlyIncreasingStartTime verwenden, um ein delay von -1 Takt anzugeben, wodurch das CTI einen Takt nach der Startzeit des Ereignisses verschoben wird, oder um UnorderedTimestamps anzugeben, um einen benutzerdefinierten Wert für das delay anzugeben.

Anschließend werden von der CreateInsert-Methode der IntervalEvent-Klasse die Quelldaten in einen Ereignisdatenstrom konvertiert, indem "OrderDate" als die Startzeit des Intervalls, "ShippedDate" als die Endzeit des Intervalls und "ShipRegion" als Nutzlast für das Ereignis bereitgestellt werden.

Eine analoge CreateInsert-Methode wird durch die PointEvent-Klasse bereitgestellt, während die EdgeEvent-Klasse über die CreateStart-Methode und die CreateEnd-Methode verfügt. Alle drei Ereignisklassen verfügen über eine CreateCti-Methode zum prozeduralen Einfügen von CTI-Ereignissen, anstatt durch die deklarative Verwendung des AdvanceTimeSettings-Elements.

Schritt 3: Schreiben einer Abfrage

Als Nächstes wird die zeitberücksichtigende StreamInsight-Abfrage geschrieben, die für die eingehenden Ereignisdatenströme verwendet werden soll:

// Find time intervals during which more than 3 orders are in process within a region.
var streamQuery = from o in streamSource
                  group o by o.ShipRegion into g
                  from window in g.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                  select new { OrderCount = window.Count(), ShipRegion = g.Key } into agg
                  where agg.OrderCount > 3
                  select agg;

Durch die Abfrage werden die Ereignisse in jedem Zeitintervall nach Region gruppiert. Anschließend werden nur die Intervalle ausgewählt, in denen mehr als drei Ereignisse aktiv waren. Die Ergebnisse werden in einen neuen Datenstrom mit einer völlig unterschiedlichen Nutzlast projiziert, die die Anzahl der aktiven Bestellungen und die Bezeichner für das ShipRegion-Element enthalten.

Weitere Informationen zum Erstellen von Abfragevorlagen finden Sie unter:

Schritt 4: Konvertieren der Abfrageausgabe in eine Observable- oder Enumerable-Ereignissenke

Jetzt wird der Ausgabedatenstrom der Abfrage in ein Enumerable-Ergebnis transformiert:

// Convert temporal query results into an enumerable result of interval events. This example
// filters out CTI events, and projects the relevant portions of the interval event.
var results = from intervalEvent in streamQuery.ToIntervalEnumerable()
              where intervalEvent.EventKind != EventKind.CTI
              select new 
              { 
                  intervalEvent.StartTime, 
                  intervalEvent.EndTime, 
                  intervalEvent.Payload.OrderCount,
                  intervalEvent.Payload.ShipRegion,
              };

Mit dieser Abfrage werden die CTI-Ereignisse gefiltert und nur die Einfügeereignisse in einen Enumerable-Datenstrom von Intervallereignissen projiziert. Ein neuer anonymer Typ mit vier Feldern enthält die Nutzlast der Ereignisse.

Zusätzlich zur ToIntervalEnumerable-Methode enthalten verknüpfte Erweiterungsmethoden folgende Methoden:

  • ToPointEnumerable und ToEdgeEnumerable

  • ToPointObservable, ToIntervalObservable und ToEdgeObservable

Diese Methode geben ICepEnumerable-Schnittstellen oder ICepObservable-Schnittstellen zurück, von denen die grundlegenden IEnumerable-Schnittstellen und IObservable-Schnittstellen erweitert werden, indem ein Abfragename und eine Abfragebeschreibung zur Ermittlung der Abfrage für die Verwaltung oder zum Debuggen bereitgestellt werden.

Die ICepEnumerable-Schnittstelle oder die ICepObservable-Schnittstelle stellen ebenfalls Helfermethoden bereit, von denen Ausgabeereignisse durch Auswahl (Where) oder Projektion (Select) gefiltert werden. Beispiel:

observableOutput = result
    .ToPointObservable()
    .Where( e => e.EventKind != EventKind.Cti)
    .Select(e => e.Payload);

Schritt 5: Verwenden der Ausgabe

Schließlich werden die Ergebnisse der Abfrage verwendet. Aufgrund des verzögerten Auswertungsmodells der typischen LINQ-Anbieter werden Abfragen erst ausgewertet, wenn vom Consumer die Ergebnisse aufgezählt oder beobachtet werden:

// Enumerating the results triggers the underlying SQL Server and StreamInsight queries.
foreach (var activeInterval in results)
{
    Console.WriteLine("Between {0} and {1}, {2} orders were active in the '{3}' region.", 
        activeInterval.StartTime, 
        activeInterval.EndTime, 
        activeInterval.OrderCount,
        activeInterval.ShipRegion);
}