Erstellen von Abfragen zur Laufzeit

Durch Erstellen von StreamInsight-Abfragen zur Laufzeit erzielen Sie Flexibilität und Wiederverwendbarkeit der Abfragen, eine effiziente Ressourcenverwendung sowie eine einfache Wartung. Folgende Möglichkeiten bieten sich:

  • Bereitstellen des Abfrageergebnisses aus einer Abfrage für andere Abfragen auf demselben Server

  • Nutzen der Ausgaben anderer ausgeführter Abfragen, ähnlich dem Nutzen von Ereignissen von einem Eingabeadapter

Beispiel: Zwei erstellte Abfragen sind vorhanden, wobei Abfrage 1 in Abfrage 2 überführt wird, die in Isolation ausgeführt wurde. Wenn bei Abfrage 1 ein Fehler auftritt, wirkt sich dies nicht auf den Status von Abfrage 2 aus und umgekehrt. Abfrage 1 und Abfrage 2 können unabhängig voneinander gestartet und beendet werden. Beispielsweise können Sie Abfrage 1 beenden, durch eine andere Abfrage ersetzen und erneut starten.

In diesem Thema werden mehrere Anwendungsfälle und Beispiele zum dynamischen Erstellen von Abfragen zur Laufzeit beschrieben.

Wiederverwenden der Ausgabe einer vorhandenen Abfrage

Häufig werden mehrere Abfragen verwendet, wenn eine primäre Abfrage entworfen und bereitgestellt werden muss, mit der Daten vorverarbeitet und an einen Ausgabeadapter gesendet werden, während andere Abfragen das Ergebnis dieser Abfrage nutzen und eigene Ergebnisse an andere Ausgabeadapter senden. Dieses Szenario ist in der folgenden Abbildung dargestellt.

Abfrage 2 verwendet Daten aus Abfrage 1

Im folgenden Beispiel wird eine Abfrage dargestellt, die auf einem StreamInsight-Server in der vorhandenen Anwendung myApp erstellt wurde.

    var inputstream = CepStream<MyDataType>.Create("inputStream",
                                                   typeof(MyInputAdapterFactory),
                                                   new InputAdapterConfig { someFlag = true },
                                                   EventShape.Point);

    var filtered = from e in inputstream
                   where e.Value > 95
                   select e;

    var query = filtered.ToQuery(myApp,
                                 "filterQuery",
                                 "Filter out Values over 95",
                                 typeof(MyOutputAdapterFactory),
                                 new OutputAdapterConfig { someString = "foo" },
                                 EventShape.Point,
                                 StreamEventOrder.FullyOrdered);

    query.Start();

Zum Streamen der Ergebnisse aus dieser Abfrage in eine zweite Abfrage wird die Query.ToStream()-Methode verwendet. Der mit der Ausgabenutzlast der primären Abfrage übereinstimmende Typ wird als generischer Parameter angegeben, wie im folgenden Beispiel veranschaulicht.

var filteredStream = query.ToStream<MyDataType>();

var validated = from e in filteredStream
                select new
                {
                    SourceId = e.SourceId,
                    Value = e.Value,
                    Status = e.Value > 75 ? false : true
                };

var validationQuery = validated.ToQuery("validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

validationQuery.Start();

In diesem Beispiel wird auf den Ausgabedatenstrom der primären Abfrage zugegriffen, und ein Projektionsoperator wird angewendet, um das neue Feld Status einzuführen. Für den zweiten ToQuery()-Aufruf ist kein Anwendungsobjekt mehr erforderlich, da dieses von der primären Abfrage abgeleitet werden kann.

Die ToStream()-Methode akzeptiert ein optionales AdvanceTimeSettings-Objekt, wenn an diesem Punkt aktuelle Zeitinkremente (Current Time Increments, CTI) eingefügt werden müssen. Durch Einfügen von CTIs können Sie die Aktualität bestimmter Abfragekonfigurationen erhöhen.

Die Art der Erstellung für das primäre Abfrageobjekt ist dabei nicht relevant. Das vorherige Modell zeigt ein Beispiel für die Verwendung der CepStream.ToQuery()-API. Zum Erstellen der Abfrage stehen folgende weitere Möglichkeiten zur Verfügung:

  • Durch einen Abfragebinder. Beispiel: myApp.CreateQuery("filterQuery", queryBinder, "description");.

  • Durch Aufrufen über die Objektmodell-API vom Server. Beispiel: myApp.Queries["filterQuery"].

Nicht gebundene Abfrageausgabe

Im vorherigen Beispiel wird gezeigt, wie das Ergebnis einer vorhandenen Abfrage wiederverwendet wird, deren Ausgabe bereits an einen Ausgabeadapter gebunden ist. Abfragen können jedoch auch einen ungebundenen Ausgabedatenstrom aufweisen, sodass eine Ausgabe nur erzeugt wird, wenn das Ergebnis von einer oder mehreren anderen Abfragen genutzt wird. Dieses Szenario ist in der folgenden Abbildung dargestellt.

Abfrage 1 verfügt über einen ungebundenen Abfragedatenstrom

Dies erzielen Sie mit einer Überladung von CepStream.ToQuery(), die keinen Adapter erfordert:

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             EventShape.Point, 
                             StreamEventOrder.FullyOrdered);

Diese Abfrage kann gestartet werden. Eine zweite Abfrage kann später deren Ergebnisdatenstrom nutzen, indem sie diesen angibt, wie im vorherigen Beispiel für die Abfrage validationQuery gezeigt. Wenn kein Consumer vorhanden ist, werden die Ergebnisse der primären Abfrage gelöscht.

Mit diesem Muster können Sie zudem die Ergebnisse einer Abfrage an mehrere Ausgabeadapter streamen. Im einfachsten Fall erzielen Sie dies mit Pass-Through-Abfragen, die zusätzlich zu einer nicht gebundenen Abfrage ausgeführt werden, wobei jede für einen der Ausgabeadapter vorgesehen ist (Abfragen 2 und 3 in der obigen Abbildung).

Veröffentlichte Datenströme

In den Beispielen wurde bisher das eigentliche Abfrageobjekt verwendet, um einen neuen Eingabedatenstrom für eine andere Abfrage zu erstellen. Um die clientseitigen Objekte zu abstrahieren, können Sie den veröffentlichten Datenstrom-URI als Eingabe für eine oder mehrere andere Abfragen verwenden, wie in der folgenden Abbildung gezeigt.

Abfragen, die einen veröffentlichten Datenstrom als Eingabe verwenden

Jede Abfrage verfügt über einen Standard-URI für den veröffentlichten Datenstrom, der dem Abfragenamen selbst entspricht. Darüber hinaus können Sie der Abfrage durch das entsprechende Element der CepStream-Klasse explizit einen benutzerdefinierten Namen für den veröffentlichten Datenstrom zuweisen.

var query = filtered.ToPublishedStreamQuery(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),
                                             myApp,
                                             "filterQuery",
                                             "Filter out Values over 95",
                                             EventShape.Point,
                                             StreamEventOrder.FullyOrdered);

Damit erstellen Sie eine Abfrage mit einer nicht gebundenen, aber explizit benannten Ausgabe. Beachten Sie, dass die Namen veröffentlichter Datenströme die folgende Konvention einhalten müssen: "<application_name>/PublishedStream/<stream_name>".

Andere Abfragen können nun auf diesen URI als Eingabedatenstrom verweisen, wie im folgenden Beispiel gezeigt.

var filterStream = CepStream<MyDataType>.Create(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),
                                                EventShape.Point);
var validated = from e in filterStream
                ...

Beachten Sie, dass der Consumer des veröffentlichten Datenstroms die Eingabeereignisform angeben muss, die mit einer Ausgabeform der Abfrage, auf die verwiesen wird, übereinstimmen muss.

Eine über den Namen eines veröffentlichten Datenstroms hergestellte Verbindung mit einer primären Abfrage ist weniger zuverlässig als eine Verbindung über das Abfrageobjekt. Daher muss beim Definieren der sekundären Abfrage eine Anwendung angegeben werden:

var validationQuery = validated.ToQuery(myApp,
                                        "validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

Adapter für veröffentlichte Datenströme

Wenn Sie Adapter einer erstellten Abfrage abrufen (z. B. über Query.InputStreamBindings), werden für die Verbindung besondere integrierte Adapter verwendet. Zusätzlich zu diesen integrierten Adaptern wird die Funktionalität zum Erstellen von Abfragen über CepStream.ToQuery, Query.ToStream() usw. über komfortable Oberflächen bereitgestellt. Sie können auch explizit wie gewöhnliche Adapter verwendet werden und verfügen dann über eine eigene Konfigurationsstruktur, die den Namen des veröffentlichten Datenstroms enthält, wie im folgenden Beispiel gezeigt:

// primary query, with custom input and output adapters
var inputstream = CepStream<MyDataType>.Create("inputStream",
                                               typeof(MyInputAdapterFactory),
                                               new InputAdapterConfig { someFlag = true },
                                               EventShape.Point);

var filtered = from e in inputstream
               where e.Value > 95
               select e;

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             typeof(MyOutputAdapterFactory),
                             new OutputAdapterConfig { someString = "foo" },
                             EventShape.Point,
                             StreamEventOrder.FullyOrdered);

// secondary query, composed on top of the first one using the
// built-in published stream input adapter and the default published
// stream name of the primary query
var filterStream = CepStream<MyDataType>.Create("filteredStream",
                                                typeof(PublishedStreamAdapterFactory),
                                                new PublishedStreamInputAdapterConfiguration { PublishedStreamName = query.Name },
                                                EventShape.Point);

var validated = from e in filterStream
                ...

var validationQuery = validated.ToQuery(myApp,
                                        "validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

Ebenso kann für eine Abfrage der Ausgabeadapter für veröffentlichte Datenströme verwendet werden, der die gleiche Funktionalität wie CepStream.toPublishedStreamQuery() aufweist:

var filterQuery = filtered.ToQuery(myApp,
                                   "filterQuery",
                                   "desc",
                                   typeof(PublishedStreamAdapterFactory),
                                   new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1") },
                                   EventShape.Point,
                                   StreamEventOrder.FullyOrdered);

Verwenden des Abfragebinders

Das Abfragebinder-Entwicklungsmodell ermöglicht den Vollzugriff über die verschiedenen StreamInsight-Metadatenobjekte und trennt die Abfragebindung und -verwendung deutlich von der Entwurfsphase für Abfragevorlagen. Dieses Modell ermöglicht ebenfalls das dynamische Erstellen von Abfragen, sowohl auf der Eingabebindungs- als auch der Ausgabebindungsseite. Weitere Informationen finden Sie unter Verwenden des Abfragebinders.

Binden an eine andere Abfrage als Eingabe

Ebenso wie eine Abfragevorlage mit dem Abfragebinder an einen Eingabeadapter als Ereignisproducer gebunden werden kann, kann sie auch an eine vorhandene Abfrage gebunden werden. Angenommen, wie im ersten Beispiel ist eine primäre Abfrage (mit gebundener oder nicht gebundener Ausgabe) vorhanden.

var query = filtered.ToQuery(myApp, ...);

Dann kann der Abfragebinder wie folgt verwendet werden, wobei in der richtigen Überladung von BindProducer auf die vorherige Abfrage verwiesen wird.

var newStream = CepStream<RawData>.Create("validationInput");
var validated = from e in newStream
                select new
                {
                    SourceId = e.SourceId,
                    Value = e.Value,
                    Status = e.Value > 75 ? false : true
                };
QueryTemplate validateQT = myApp.CreateQueryTemplate("validationLogic", "validates the Value field", validated);
QueryBinder queryBinder = new QueryBinder(validateQT);
queryBinder.BindProducer("validationInput", filterQuery);
queryBinder.AddConsumer(...);

Der Abfragebinder kann auch auf einen veröffentlichten Datenstrom als Ereignisproducer verweisen.

queryBinder.BindProducer("validationInput",
                         new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),
                         EventShape.Point);

Wie bei der Query.ToStream()-Signatur kann in BindProducer() ein optionales AdvanceTimeSettings-Objekt angegeben werden.

Binden an einen veröffentlichten Datenstrom als Ausgabe

Auf der Ausgabeseite ermöglicht der Abfragebinder das Streaming in einen explizit definierten veröffentlichten Datenstrom.

queryBinder.BindOutputToPublishedStream(new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

Sobald die Abfrage, die auf diesem Abfragebinder basiert, gestartet wurde, können andere Abfragen, wie in den vorherigen Beispielen beschrieben, eine Bindung an einen veröffentlichten Datenstrom herstellen und dessen Ergebnisereignisse nutzen.

Binden an Adapter für veröffentlichte Datenströme

Adapter für veröffentlichte Datenströme können auch im Abfragebindermodell verwendet werden. Sie können aus dem Anwendungsobjekt abgerufen und in BindProducer und AddConsumer wie gewöhnliche Adapter verwendet werden:

queryBinder.BindProducer("validationInput",
                         myApp.GetPublishedStreamInputAdapter(),
                         new PublishedStreamInputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered") },
                         EventShape.Point);
queryBinder.AddConsumer("validated",
                         myApp.GetPublishedStreamOutputAdapter(),
                         new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/validated") },
                         EventShape.Point,
                         StreamEventOrder.FullyOrdered);

Siehe auch

Konzepte

StreamInsight-End-to-End-Beispiel

Vorlauf der Anwendungszeit