Utilisation d'un module de liaison de requête

Un module de liaison de requête est un modèle de développement côté client qui fournit le niveau maximal de flexibilité et de possibilités de réutilisation lors de la création d'applications StreamInsight. Dans ce modèle, les adaptateurs et les modèles de requête sont enregistrés en tant qu'objets de métadonnées distincts qui peuvent être liés ultérieurement pour instancier une requête. Cela donne au développeur un contrôle total sur son environnement de développement et d'application grâce à une liaison de requête explicite sur l'API du modèle objet.

Les cas d'usage par défaut pour le modèle de développement de serveur explicite incluent des applications StreamInsight qui requièrent :

  • Le contrôle total et l'accès au serveur StreamInsight.

  • La réutilisation des requêtes via la composition de requête statique ou dynamique, ou la réutilisation d'adaptateurs, types d'événements et modèles de requêtes définis par un tiers.

Principales caractéristiques du modèle de développement du module de liaison de requête

Le modèle du module de liaison de requête a les caractéristiques clés suivantes :

  • Le développeur doit créer tous les objets de métadonnées explicitement et les enregistrer sur le serveur StreamInsight.

  • Le modèle prend en charge la création et l'utilisation de plusieurs objets (modèles de requête, requêtes, applications et adaptateurs). Tous les objets doivent être inscrits sous une application.

    Le modèle de requête et l'instance de requête doivent être inscrits explicitement par le serveur avant que la requête puisse être exécutée. Les adaptateurs d'entrée et de sortie doivent être inscrits explicitement pour le modèle de requête ou la requête afin de référencer ces objets. De plus, tous les objets doivent être inscrits sous une application. Les types d'événement utilisés par les adaptateurs et les modèles de requête sont enregistrés implicitement.

Exemples

L'exemple suivant crée un objet serveur StreamInsight et un objet d'application nommé myApp sur le serveur. Il crée et enregistre ensuite tous les objets StreamInsight nécessaires à l'importation, au traitement et à l'exportation des flux d'événements.

En premier lieu, le serveur et l'objet d'application sont créés.

server = Server.Create(“MyInstance”);
Application myApp = server.CreateApplication("MyApp");

Ensuite, les adaptateurs d'entrée et de sortie sont enregistrés dans l'application.

InputAdapter inputAdapter = myApp.CreateInputAdapter<MyInputAdapterFactory>("DataSource", "Description of the input adapter");
OutputAdapter outputAdapter = myApp.CreateOutputAdapter<MyOutputAdapterFactory>("Output", " Description of the output adapter ");

Le modèle de requête est spécifié sur un flux de données indépendant. Le seul paramètre requis pour créer un flux de données indépendant est un nom de flux de données, qui sera exigé ultérieurement pour la liaison de l'adaptateur.

var inputstream = CepStream<MyDataType>.Create("filterInput");

var filtered = from e in inputstream
               where e.Value > 95
               select e;

QueryTemplate filterQT = myApp.CreateQueryTemplate("filterLogic", filtered);
  • Le dernier appel enregistre le modèle de requête dans l'application. Le modèle de requête enregistré peut maintenant être réutilisé dans plusieurs liaisons et, par conséquent, peut être instancié dans plusieurs requêtes, chacune étant liée à des adaptateurs d'entrée et de sortie potentiellement différents. Les liaisons de ces modèles de requête enregistrés sont définies via l'objet QueryBinder :
QueryBinder queryBinder = new QueryBinder(filterQT);

queryBinder.BindProducer<MyDataType>("filterInput",
                                      inputAdapter,
                                      new InputAdapterConfig { someFlag = true },
                                      EventShape.Point);

queryBinder.AddConsumer("filterOutput",
                         outputAdapter,
                         new OutputAdapterConfig { someString = "foo" },
                         EventShape.Point,
                         StreamEventOrder.FullyOrdered);

Query query = myApp.CreateQuery("filterQuery", "My Filter Query", queryBinder);

La méthode BindProducer() lie un objet d'adaptateur d'entrée (enregistré dans l'application) à un flux de données avec le nom spécifié, ici « filterInput ». Cela vous permet de différencier plusieurs points d'entrée d'un modèle de requête. Avec l'adaptateur d'entrée, les paramètres spécifiques à la liaison (configuration d'adaptateur et forme d'événement voulue) sont obligatoires.

La méthode AddConsumer() lie un objet d'adaptateur de sortie (enregistré dans l'application) au flux de données sortant unique du modèle de requête. Le nom du flux de sortie fourni, ici « validated », peut être utilisé pour identifier le flux de données à des fins de diagnostic. Comme avec l'adaptateur d'entrée, les paramètres spécifiques à la liaison sont fournis pour l'adaptateur de sortie.

L'objet de requête est créé en fonction du module de liaison de requête, d'un identificateur de requête et d'une description textuelle. La dernière étape consiste à démarrer la requête.

query.Start();

Requêtes avec plusieurs flux d'entrée

L'exemple suivant indique comment créer un modèle de requête qui utilise plusieurs flux d'entrée. Un modèle de requête peut avoir plusieurs points d'entrée, chacun alimenté par une source de données différente, c'est le cas, par exemple, lorsque deux flux de données doivent être joints. L'association du flux de données approprié est spécifiée par le nom du flux de données, comme indiqué dans l'exemple suivant.

CepStream<SensorReading> sensorStream = CepStream<SensorReading>.Create("sensorInput");
CepStream<LocationData> locationStream = CepStream<LocationData>.Create("locationInput");

// Define query template in LINQ on top of sensorStream and locationStream
// ...
// Create query binder like in the previous example
// ...

InputAdapter inputAdapter = application.CreateInputAdapter<TextFileInputFactory>("CSVInput", "Reading tuples from a CSV file");

qb.BindProducer<SensorReading>("sensorInput", inputAdapter, sensorInputConf, EventShape.Interval);
qb.BindProducer<LocationData>("locationInput", inputAdapter, locationInputConf, EventShape.Edge);

Modification d'une application existante

Notez que vous pouvez travaillez dans le modèle du module de liaison de requête avec le modèle de requête et les objets d'adaptateur sans qu'ils aient été nécessairement créés dans la même application. L'exemple suivant suppose une connexion à un serveur existant et récupère les entités des métadonnées existantes via l'API du modèle objet StreamInsight, au lieu de les créer.

Application myApp = server.Applications["app1"];
QueryTemplate myQueryTemplate = myApp.QueryTemplates["qt1"];
InputAdapter myInputAdapter = myApp.InputAdapters["sensorAdapter5"];

Utilisation d'un magasin des métadonnées rendu persistant

Lors de la création d'un serveur StreamInsight, un paramètre optionnel de la méthode Server.Create() correspond au type de magasin des métadonnées à utiliser. Par défaut, les métadonnées sont stockées en mémoire. Éventuellement, les métadonnées peuvent être rendues persistantes sur le disque via une base de données SQL Server Compact 3.5. L'exemple suivant indique comment spécifier une base de données SQL Server Compact 3.5 comme magasin des métadonnées.

SqlCeMetadataProviderConfiguration metadataConfiguration = new SqlCeMetadataProviderConfiguration();
metadataConfiguration.DataSource = "SIMetadata.sdf";
metadataConfiguration.CreateDataSourceIfMissing = streamHostConfig.CreateDataSourceIfMissing;

server = Server.Create(”MyInstance”, metadataConfiguration);
Application myApp = server.CreateApplication("MyApp");

Notez qu'en spécifiant une base de métadonnées existante lors de la création du serveur, toutes les métadonnées du fichier spécifié seront lues. Les entités des métadonnées peuvent ensuite être récupérées via l'API du modèle objet StreamInsight.

Exemple complet

using (Server server = Server.Create("MyInstance"))
{
try
{
    Application myApp = server.CreateApplication("MyApp");
    InputAdapter inputAdapter = myApp.CreateInputAdapter<MyInputAdapterFactory>("DataSource", "Description of the input adapter");
    OutputAdapter outputAdapter = myApp.CreateOutputAdapter<MyOutputAdapterFactory>("Output", " Description of the output adapter ");

    var inputstream = CepStream<MyDataType>.Create("filterInput");

    var filtered = from e in inputstream
                   where e.Value > 95
                   select e;

    QueryTemplate filterQT = myApp.CreateQueryTemplate("filterLogic", "Description of the query template", filtered);
    QueryBinder queryBinder = new QueryBinder(filterQT);

    queryBinder.BindProducer<MyDataType>("filterInput",
                                         inputAdapter,
                                         new InputAdapterConfig { someFlag = true },
                                         EventShape.Point);

    queryBinder.AddConsumer("filterOutput",
                                                 outputAdapter,
                                                 new OutputAdapterConfig { someString = "foo" },
                                                 EventShape.Point,
                                                 StreamEventOrder.FullyOrdered);

    Query query = myApp.CreateQuery("filterQuery", "My Filter Query", queryBinder);

    query.Start();
    Console.ReadLine();
    query.Stop();
}
catch (Exception e)
{
    Console.WriteLine(e.ToString());
}
}

Voir aussi

Concepts

Concepts du serveur StreamInsight