StreamInsight-End-to-End-Beispiel

In diesem Thema werden die verschiedenen Komponenten und Schritte zum Erstellen von StreamInsight-Anwendungen sowie ein End-to-End-Beispiel für eine Anwendung beschrieben. In einer StreamInsight-Anwendung werden Ereignisquellen, Ereignissenken und Abfragen kombiniert, um ein komplexes Ereignisverarbeitungsszenario zu implementieren. Die StreamInsight-API bietet eine Vielzahl von Schnittstellen, um verschiedene Steuer- und Komplexitätsebenen beim Erstellen und Warten von Ereignisverarbeitungsanwendungen zu unterstützen. 

Die kleinste Einheit einer Anwendungsbereitstellung ist eine Abfrage, die gestartet und beendet werden kann. Die folgende Abbildung zeigt eine Methode zum Erstellen einer Abfrage. Die Ereignisquelle wird durch einen Eingabeadapter dargestellt. Der Adapter übergibt einen Datenstrom aus Ereignissen an die Operatorstruktur, die die gewünschte Abfragelogik darstellt (vom Designer in Form einer Abfragevorlage angegeben). Der verarbeitete Ereignisdatenstrom führt dann in eine Ereignissenke, in der Regel ein Ausgabeadapter.

Abfrage mit Eingabe- und Ausgabeadaptern

Entwickler, die nicht mit komplexer Ereignisverarbeitungsterminologie vertraut sind, sollten sich unter StreamInsight-Serverkonzepte und StreamInsight-Serverarchitektur informieren.

Anwendungsprozess

In diesem Abschnitt werden die typischen Schritte zum Erstellen einer End-to-End-Anwendung durchgegangen.

Instanziieren einer Serverinstanz und einer Anwendung

Der Prozess beginnt mit der Instanziierung einer StreamInsight-Serverinstanz und einer Anwendung.

server = Server.Create(”MyInstance”);
Application myApp = server.CreateApplication("MyApp");

Ein Server muss mit einem Instanznamen erstellt werden, der durch den StreamInsight-Installationsvorgang (im vorherigen Beispiel MyInstance) auf dem Computer registriert wurde. Weitere Informationen finden Sie unter Installation (StreamInsight).

Eine Anwendung stellt eine Definitionseinheit auf dem Server dar, die andere Metadatenentitäten enthält.

Im vorherigen Beispiel wird eine Serverinstanz im gleichen Prozess erstellt. Eine andere häufig verwendete Bereitstellung ist jedoch, eine Verbindung mit einem Remoteserver herzustellen und dort an einer vorhandenen Anwendung zu arbeiten. Das folgende Beispiel zeigt, wie eine Verbindung mit einem Remoteserver hergestellt und auf eine vorhandene Anwendung zugegriffen wird.

server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/StreamInsight/MyInstance"));
Application myApp = server.Applications["ExistingApp"];

Weitere Informationen zum lokalen Server und Remoteserver finden Sie unter Veröffentlichen auf dem StreamInsight-Server und Herstellen einer Verbindung mit dem Server.

Erstellen eines Eingabedatenstroms

Danach wird ein Eingabedatenstrom auf Grundlage einer vorhandenen Adapterimplementierung erstellt. Genauer gesagt, muss die Adapterfactory wie im folgenden Beispiel dargestellt angegeben werden.

var inputstream = CepStream<MyDataType>.Create("inputStream",
                                               typeof(MyInputAdapterFactory),
                                               new InputAdapterConfig { someFlag = true },
                                               EventShape.Point);

Hiermit wird ein CepStream-Objekt erstellt, das einen Ereignisdatenstrom darstellt, der (sobald die Abfrage gestartet wird) von einem durch die angegebene Factoryklasse instanziierten Adapter erzeugt wird. Dem Datenstrom wird ein Name zugewiesen, der später verwendet werden kann, um datenstromspezifische Diagnosen abzurufen. Darüber hinaus wird eine Instanz der Konfigurationsstruktur für die Adapterfactory bereitgestellt. Die Konfigurationsstruktur übergibt laufzeitspezifische Informationen an die Factory sowie die gewünschte Ereignisform (Ereignismodell). Weitere Informationen darüber, wie die Factory diese Parameter verwendet, finden Sie unter Erstellen von Eingabe- und Ausgabeadaptern.

Definieren der Abfrage

Das CepStream-Objekt wird als Grundlage für die Definition der eigentlichen Abfragelogik verwendet. Die Abfrage verwendet LINQ als Abfragespezifikationssprache:

var filtered = from e in inputstream
               where e.Value > 95
               select e;

In diesem Beispiel wird davon ausgegangen, dass die im vorherigen Beispiel zum Erstellen des Eingabedatenstrom-Objekts definierte Klasse oder Struktur mit dem Namen MyDataType ein Feld mit dem Namen Value enthält. Diese Definition wird in einen Filteroperator übersetzt, der alle Ereignisse aus dem Datenstrom löscht, die das Filterprädikat where e.Value > 95 nicht erfüllen. Weitere Informationen zu LINQ-Abfrageoperatoren finden Sie unter Schreiben von Abfragevorlagen in LINQ.

Erstellen eines Ausgabeadapters

Zu diesem Zeitpunkt lautet der Typ der Variable filtered immer noch CepStream. So kann der Datenstrom in eine Abfrage umgewandelt werden, die gestartet werden kann. Zum Erzeugen einer Abfrageinstanz, die gestartet werden kann, muss ein Ausgabeadapter angegeben werden, wie im folgenden Beispiel dargestellt.

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             typeof(MyOutputAdapterFactory),
                             new OutputAdapterConfig { someString = "foo" },
                             EventShape.Point,
                             StreamEventOrder.FullyOrdered);

Wie beim Eingabedatenstrom ist für den Ausgabeadapter das Angeben einer Ausgabeadapterfactory, eines Konfigurationsobjekts, der gewünschten Ausgabedatenstromform und einer temporären Reihenfolge erforderlich.

Durch die Ereignisformspezifikation wird bei der Abfrageausgabe die jeweilige Ereignisform sichergestellt:

  1. EventShape.Point: Jede Ergebnisereignislebensdauer wird auf ein Punktereignis reduziert.

  2. EventShape.Interval: Jedes Ergebnisereignis wird als Intervallereignis interpretiert. Das bedeutet, dass die Ausgabe nur erfolgt, wenn für die vollständige Lebensdauer von einem aktuellen Zeitinkrementereignis (CTI) ein Commit ausgeführt wird.

  3. EventShape.Edge: Jedes Ergebnisereignis wird als Edge-Ereignis interpretiert. Das heißt, die Startzeit wird als Start-Edge ausgegeben und die Endzeit als entsprechender End-Edge.

Der Parameter für die Reihenfolge der Datenstromereignisse hat Auswirkungen auf die Aktualität von Intervallereignis-Ausgabedatenströmen. FullyOrdered bedeutet, dass Intervallereignisse immer in der Reihenfolge ihrer Startzeiten ausgegeben werden, während ChainOrdered eine Ausgabesequenz erzeugt, die nach den Endzeiten der Intervalle sortiert wird.

Außerdem muss ein Anwendungsobjekt als erster Parameter bereitgestellt werden, der jetzt die Abfrage enthält, sowie ein Abfragename und eine Beschreibung, durch die die Abfrage im Metadatenspeicher weiter identifiziert wird.

Starten der Abfrage

Im letzten Schritt wird die Abfrage gestartet. In diesem Beispiel wird die Abfrage durch einen Tastaturanschlag des Benutzers beendet.

query.Start();

Console.ReadLine();

query.Stop();

In diesem End-to-End-Beispiel wird gezeigt, wie eine implizite Bindung einer Ereignisquelle mit einer Abfragevorlage durch die CepStream.Create()-Überladung und die ToQuery()-Überladung verwendet wird, um schnell eine funktionierende Abfrage zu erstellen. Informationen zur expliziteren Kontrolle bei der Bindung von CEP-Objekten finden Sie unter Verwenden des Abfragebinders.

Vollständiges Beispiel

Im folgenden Beispiel werden die oben beschriebenen Komponenten kombiniert, um eine vollständige Anwendung zu erstellen.

Server server = null;

using (Server server = Server.Create(”MyInstance”))
{
    try
    {
        Application myApp = server.CreateApplication("MyApp");

        var inputstream = CepStream<MyDataType>.Create("inputStream",
                                                       typeof(MyInputAdapterFactory),
                                                       new InputAdapterConfig { someFlag = true },
                                                       EventShape.Point);

        var filtered = from e in inputstream
                       where e.Value > 95
                       select e;

        var query = filtered.ToQuery(myApp,
                                     "filterQuery",
                                     "Filter out Values over 95",
                                     typeof(MyOutputAdapterFactory),
                                     new OutputAdapterConfig { someString = "foo" },
                                     EventShape.Point,
                                     StreamEventOrder.FullyOrdered);

        query.Start();
        Console.ReadLine();
        query.Stop();
    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
    }
}

Siehe auch

Konzepte

Verwenden des Abfragebinders