Exemple StreamInsight de bout en bout

Cette rubrique décrit les différents composants et étapes impliqués dans la création d'une application StreamInsight et inclut l'exemple de bout en bout d'une application. Une application StreamInsight combine des sources d'événement, des récepteurs d'événement et des requêtes pour implémenter un scénario de traitement des événements complexes. L'API StreamInsight offre plusieurs interfaces prenant en charge différents niveaux de contrôle et de complexité pour la création et la maintenance d'applications qui traitent des événements. 

La plus petite unité d'un déploiement d'application est une requête, qui peut être démarrée et arrêtée. L'illustration suivante montre l'une des façons possibles de générer une requête. La source d'événement est représentée par un adaptateur d'entrée. L'adaptateur introduit un flux de données d'événements dans l'arborescence des opérateurs, qui représente la logique de la requête voulue, spécifiée par le concepteur sous la forme d'un modèle de requête. Le flux d'événements traité arrive ensuite dans un récepteur d'événements, en général un adaptateur de sortie.

Requête avec adaptateurs d'entrée et de sortie

Les développeurs qui ne sont pas familiarisés avec la terminologie de traitement des événements complexes doivent lire Concepts du serveur StreamInsight et Architecture du serveur StreamInsight.

Processus de l'application

Cette section décrit les étapes typiques de création d'une application de bout en bout.

Instancier une instance de serveur et une application

Le processus démarre avec l'instanciation d'une instance de serveur StreamInsight et d'une application.

server = Server.Create(”MyInstance”);
Application myApp = server.CreateApplication("MyApp");

Un serveur doit être créé et un nom d'instance doit lui être attribué et enregistré sur l'ordinateur via le processus d'installation StreamInsight (dans l'exemple précédent, MyInstance). Pour plus d'informations, consultez Installation (StreamInsight).

Une application représente une unité de portée dans le serveur qui contient d'autres entités de métadonnées.

L'exemple précédent crée une instance de serveur dans le même processus. Toutefois, un autre déploiement commun consiste à se connecter à un serveur distant et à travailler sur une application existante. L'exemple suivant indique comment se connecter à un serveur distant et accéder à une application existante.

server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/StreamInsight/MyInstance"));
Application myApp = server.Applications["ExistingApp"];

Pour plus d'informations sur les serveurs locaux et distants, consultez Publication et connexion au serveur StreamInsight.

Créer un flux d'entrée.

Ensuite, un flux d'entrée est créé sur une implémentation d'adaptateur existante. Pour être précis, la fabrique d'adaptateurs doit être spécifiée comme indiqué dans l'exemple suivant.

var inputstream = CepStream<MyDataType>.Create("inputStream",
                                               typeof(MyInputAdapterFactory),
                                               new InputAdapterConfig { someFlag = true },
                                               EventShape.Point);

Cela crée un objet CepStream, représentant un flux d'événements, qui est produit (une fois la requête démarrée) par un adaptateur instancié via la classe de fabrique donnée. Un nom, qui peut être utilisé ultérieurement pour récupérer les diagnostics spécifiques au flux de données, est attribué au flux de données. De plus, une instance de la structure de configuration de la fabrique d'adaptateurs est fournie. La structure de configuration passe les informations spécifiques à l'exécution à la fabrique, ainsi que la forme d'événement désirée (modèle d'événement). Pour plus d'informations sur la façon dont la fabrique utilise ces paramètres, consultez Création d'adaptateurs d'entrée et de sortie.

Définir la requête

L'objet CepStream est utilisé comme base pour la définition de la logique de la requête actuelle. La requête utilise LINQ comme langage de spécification de requête :

var filtered = from e in inputstream
               where e.Value > 95
               select e;

Dans cet exemple, nous supposons que la classe ou la structure nommée MyDataType définie dans l'exemple précédent pour créer l'objet de flux d'entrée, contient un champ nommé Value. Cette définition se traduit en un opérateur de filtre qui supprime tous les événements du flux de données qui ne remplissent pas le prédicat de filtre where e.Value > 95. Pour plus d'informations sur les opérateurs de requête LINQ, consultez Écriture de modèles de requête dans LINQ.

Créer un adaptateur de sortie

À ce stade, le type de la variable filtered est toujours CepStream. Cela permet de transformer le flux de données en une requête qui peut être démarrée. Pour produire une instance de requête pouvant être démarrée, un adaptateur de sortie doit être spécifié, comme indiqué dans l'exemple suivant.

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             typeof(MyOutputAdapterFactory),
                             new OutputAdapterConfig { someString = "foo" },
                             EventShape.Point,
                             StreamEventOrder.FullyOrdered);

Comme pour le flux d'entrée, l'adaptateur de sortie requiert la spécification d'une fabrique d'adaptateurs de sortie, un objet de configuration, la forme voulue du flux de sortie et un classement temporel.

La spécification de la forme de l'événement garantit la forme d'événement respective au niveau du résultat de la requête :

  1. EventShape.Point : toute durée de vie d'événement de résultat est réduite à un événement point.

  2. EventShape.Interval : tout événement de résultat est interprété comme un événement d'intervalle. Autrement dit, il est fourni uniquement si sa durée de vie complète est validée par un événement CTI (Current Time Increment).

  3. EventShape.Edge : tout événement de résultat sera interprété comme un événement session. Autrement dit, son heure de début est fournie comme session de début, et son heure de fin comme session de fin correspondante.

Le paramètre d'ordre d'événement de flux affecte la dynamique des flux de sortie d'événement. FullyOrdered signifie que les événements d'intervalle sont toujours fournis dans l'ordre de leurs heures de début, tandis que ChainOrdered produit une séquence de sortie ordonnée selon les heures de fin d'intervalle.

De plus, un objet d'application doit être fourni comme premier paramètre, qui contient maintenant la requête, et un nom et une description de la requête, qui identifient plus précisément cette requête dans le magasin des métadonnées.

Démarrer la requête

La dernière étape consiste à démarrer la requête. Dans cet exemple, la requête est arrêtée par une frappe fournie par l'utilisateur.

query.Start();

Console.ReadLine();

query.Stop();

Cet exemple de bout en bout indique comment utiliser une liaison implicite d'une source d'événement avec un modèle de requête, via les surcharges CepStream.Create() et ToQuery() pour créer rapidement une requête active. Pour un contrôle plus explicite sur la liaison d'objets CEP, consultez Utilisation d'un module de liaison de requête.

Exemple complet

L'exemple suivant combine les composants décrits précédemment pour créer une application complète.

Server server = null;

using (Server server = Server.Create(”MyInstance”))
{
    try
    {
        Application myApp = server.CreateApplication("MyApp");

        var inputstream = CepStream<MyDataType>.Create("inputStream",
                                                       typeof(MyInputAdapterFactory),
                                                       new InputAdapterConfig { someFlag = true },
                                                       EventShape.Point);

        var filtered = from e in inputstream
                       where e.Value > 95
                       select e;

        var query = filtered.ToQuery(myApp,
                                     "filterQuery",
                                     "Filter out Values over 95",
                                     typeof(MyOutputAdapterFactory),
                                     new OutputAdapterConfig { someString = "foo" },
                                     EventShape.Point,
                                     StreamEventOrder.FullyOrdered);

        query.Start();
        Console.ReadLine();
        query.Stop();
    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
    }
}

Voir aussi

Concepts

Utilisation d'un module de liaison de requête