Verwenden des Abfragebinders

Ein Abfragebinder ist ein clientseitiges Entwicklungsmodell, das beim Erstellen von StreamInsight-Anwendungen maximale Flexibilität und Wiederverwendbarkeit bietet. In diesem Modell werden Adapter und Abfragevorlagen als separate Metadatenobjekte registriert, die später kombiniert werden können, um eine Abfrage zu instanziieren. Dies gibt dem Entwickler mittels einer expliziten Abfragebindung auf Grundlage der Objektmodell-API die vollständige Kontrolle über die Anwendung und die Entwicklungsumgebung.

Typische Anwendungsfälle für das explizite Serverentwicklungsmodell sind StreamInsight-Anwendungen, die Folgendes erfordern:

  • Vollständige Kontrolle und vollständiger Zugriff auf den StreamInsight-Server.

  • Das Wiederverwenden von Abfragen durch statische oder dynamische Abfragekomposition oder das Wiederverwenden von Adaptern, Ereignistypen und Abfragevorlagen, die von einem Drittanbieter definiert wurden.

Haupteigenschaften des Abfragebinder-Entwicklungsmodells

Das Abfragebindermodell weist folgende Haupteigenschaften auf:

  • Der Entwickler muss alle Metadatenobjekte explizit erstellen und sie auf dem StreamInsight-Server registrieren.

  • Das Modell unterstützt die Erstellung und Verwendung mehrerer Objekte (Abfragevorlagen, Abfragen, Anwendungen und Adapter). Alle Objekte müssen unter einer Anwendung registriert sein.

    Die Abfragevorlage und die Abfrageinstanz müssen explizit beim Server registriert werden, bevor die Abfrage ausgeführt werden kann. Die Eingabe- und Ausgabeadapter müssen explizit registriert werden, sodass die Abfragevorlage oder die Abfrage auf diese Objekte verweisen kann. Darüber hinaus müssen alle Objekte unter einer Anwendung registriert sein. Von Adaptern und Abfragevorlagen verwendete Ereignistypen werden implizit registriert.

Beispiele

Im folgenden Beispiel werden ein StreamInsight-Serverobjekt und ein Anwendungsobjekt mit der Bezeichnung myApp auf dem Server erstellt. Anschließend werden alle notwendigen StreamInsight-Objekte erstellt und registriert, die zum Importieren, Verarbeiten und Exportieren von Ereignisdatenströmen erforderlich sind.

Zuerst werden der Server und das Anwendungsobjekt erstellt.

server = Server.Create(“MyInstance”);
Application myApp = server.CreateApplication("MyApp");

Danach werden Eingabe- und Ausgabeadapter in der Anwendung registriert.

InputAdapter inputAdapter = myApp.CreateInputAdapter<MyInputAdapterFactory>("DataSource", "Description of the input adapter");
OutputAdapter outputAdapter = myApp.CreateOutputAdapter<MyOutputAdapterFactory>("Output", " Description of the output adapter ");

Die Abfragevorlage wird auf Grundlage eines ungebundenen Datenstroms angegeben. Der einzige Parameter, der zum Erstellen eines ungebundenen Datenstroms erforderlich ist, ist ein Datenstromname, der später für die Adapterbindung benötigt wird.

var inputstream = CepStream<MyDataType>.Create("filterInput");

var filtered = from e in inputstream
               where e.Value > 95
               select e;

QueryTemplate filterQT = myApp.CreateQueryTemplate("filterLogic", filtered);
  • Der letzte Aufruf registriert die Abfragevorlage in der Anwendung. Die registrierte Abfragevorlage kann jetzt in Mehrfachbindungen wiederverwendet werden und so in mehreren Abfragen zu potenziell anderen Eingabe- und Ausgabeadaptern instanziiert werden. Diese Bindungen für registrierte Abfragevorlagen sind durch das QueryBinder-Objekt definiert:
QueryBinder queryBinder = new QueryBinder(filterQT);

queryBinder.BindProducer<MyDataType>("filterInput",
                                      inputAdapter,
                                      new InputAdapterConfig { someFlag = true },
                                      EventShape.Point);

queryBinder.AddConsumer("filterOutput",
                         outputAdapter,
                         new OutputAdapterConfig { someString = "foo" },
                         EventShape.Point,
                         StreamEventOrder.FullyOrdered);

Query query = myApp.CreateQuery("filterQuery", "My Filter Query", queryBinder);

Die BindProducer()-Methode bindet ein Eingabeadapterobjekt mit dem angegebenen Namen (muss in der Anwendung registriert sein) an einen Datenstrom, hier "filterInput". So können Sie zwischen mehreren Einstiegspunkten einer Abfragevorlage unterscheiden. Zusammen mit dem Eingabeadapter sind die bindungsspezifischen Parameter (die Adapterkonfiguration und die gewünschte Ereignisform) erforderlich.

Die AddConsumer()-Methode bindet ein Ausgabeadapterobjekt, das in der Anwendung registriert werden muss, an den einzelnen ausgehenden Datenstrom der Abfragevorlage. Der angegebene Ausgabedatenstromname, hier "validated", kann zur Identifikation von Datenstrom zu Diagnosezwecken verwendet werden. Wie beim Eingabeadapter werden die bindenden spezifischen Parameter für den Ausgabeadapter angegeben.

Das Abfrageobjekt wird auf Grundlage des Abfragebinders, eines Abfragebezeichners und einer Textbeschreibung erstellt. Im letzten Schritt wird die Abfrage gestartet.

query.Start();

Abfragen mit mehreren Eingabedatenströmen

Im folgenden Beispiel wird gezeigt, wie eine Abfragevorlage, die mehrere Eingabedatenströme verwendet, erstellt wird. Eine Abfragevorlage kann mehrere Einstiegspunkte aufweisen, die von unterschiedlichen Datenquellen versorgt werden, z. B., wenn zwei Datenströme verknüpft werden müssen. Die richtige Datenstromzuordnung erfolgt durch Angeben des Datenstromnamens, wie im folgenden Beispiel gezeigt.

CepStream<SensorReading> sensorStream = CepStream<SensorReading>.Create("sensorInput");
CepStream<LocationData> locationStream = CepStream<LocationData>.Create("locationInput");

// Define query template in LINQ on top of sensorStream and locationStream
// ...
// Create query binder like in the previous example
// ...

InputAdapter inputAdapter = application.CreateInputAdapter<TextFileInputFactory>("CSVInput", "Reading tuples from a CSV file");

qb.BindProducer<SensorReading>("sensorInput", inputAdapter, sensorInputConf, EventShape.Interval);
qb.BindProducer<LocationData>("locationInput", inputAdapter, locationInputConf, EventShape.Edge);

Ändern einer vorhandenen Anwendung

Beachten Sie, dass Sie mit den Abfragevorlagen- und Adapterobjekten im Abfragebindermodell arbeiten, ohne sie zwingend in der gleichen Anwendung erstellt zu haben. Im folgenden Beispiel wird von einer Verbindung zu einem vorhandenen Server ausgegangen. Die vorhandenen Metadatenentitäten werden durch die StreamInsight-Objektmodell-API abgerufen und nicht erstellt.

Application myApp = server.Applications["app1"];
QueryTemplate myQueryTemplate = myApp.QueryTemplates["qt1"];
InputAdapter myInputAdapter = myApp.InputAdapters["sensorAdapter5"];

Verwenden eines permanenten Metadatenspeichers

Beim Erstellen eines StreamInsight-Servers stellt der Typ des zu verwendenden Metadatenspeichers einen optionalen Parameter der Server.Create()-Methode dar. Standardmäßig werden Metadaten im Arbeitsspeicher gespeichert. Optional können Metadaten auch über eine SQL Server Compact 3.5-Datenbank auf Datenträgern aufbewahrt werden. Im folgenden Beispiel wird gezeigt, wie eine SQL Server Compact 3.5-Datenbank als Metadatenspeicher angegeben wird.

SqlCeMetadataProviderConfiguration metadataConfiguration = new SqlCeMetadataProviderConfiguration();
metadataConfiguration.DataSource = "SIMetadata.sdf";
metadataConfiguration.CreateDataSourceIfMissing = streamHostConfig.CreateDataSourceIfMissing;

server = Server.Create(”MyInstance”, metadataConfiguration);
Application myApp = server.CreateApplication("MyApp");

Beachten Sie, dass durch Angeben einer vorhandenen Metadaten-Datenbank beim Erstellen des Servers alle Metadaten von der angegebenen Datei abgerufen werden. Die Metadatenentitäten können dann durch das StreamInsight-Objektmodell-API abgerufen werden.

Vollständiges Beispiel

using (Server server = Server.Create("MyInstance"))
{
try
{
    Application myApp = server.CreateApplication("MyApp");
    InputAdapter inputAdapter = myApp.CreateInputAdapter<MyInputAdapterFactory>("DataSource", "Description of the input adapter");
    OutputAdapter outputAdapter = myApp.CreateOutputAdapter<MyOutputAdapterFactory>("Output", " Description of the output adapter ");

    var inputstream = CepStream<MyDataType>.Create("filterInput");

    var filtered = from e in inputstream
                   where e.Value > 95
                   select e;

    QueryTemplate filterQT = myApp.CreateQueryTemplate("filterLogic", "Description of the query template", filtered);
    QueryBinder queryBinder = new QueryBinder(filterQT);

    queryBinder.BindProducer<MyDataType>("filterInput",
                                         inputAdapter,
                                         new InputAdapterConfig { someFlag = true },
                                         EventShape.Point);

    queryBinder.AddConsumer("filterOutput",
                                                 outputAdapter,
                                                 new OutputAdapterConfig { someString = "foo" },
                                                 EventShape.Point,
                                                 StreamEventOrder.FullyOrdered);

    Query query = myApp.CreateQuery("filterQuery", "My Filter Query", queryBinder);

    query.Start();
    Console.ReadLine();
    query.Stop();
}
catch (Exception e)
{
    Console.WriteLine(e.ToString());
}
}

Siehe auch

Konzepte

StreamInsight-Serverkonzepte