Composer des requêtes pendant l'exécution

Composer des requêtes StreamInsight pendant l'exécution offre de nombreux avantages, dont la flexibilité, la possibilité de réutiliser les requêtes, une utilisation efficace des ressources et une facilité de maintenance. Cela vous permet de :

  • Fournir le résultat d'une requête à d'autres requêtes sur le même serveur.

  • Consommer la sortie d'autres requêtes en cours d'exécution, de la même façon qu'on consomme les événements d'un adaptateur d'entrée.

Par exemple, deux requêtes composées (la requête 1 qui alimente la requête 2) fonctionnent en isolement. Si la requête 1 échoue, l'état de la requête 2 n'est pas affecté, et vice versa. Les requêtes 1 et 2 peuvent être démarrés et arrêtés indépendamment. Par exemple, vous pouvez arrêter la requête 1, la remplacer par une requête différente, et la redémarrer.

Cette rubrique décrit plusieurs cas d'usage et fournit des exemples de composition dynamique des requêtes pendant l'exécution.

Réutilisation de la sortie d'une requête existante

On utilise généralement des requêtes multiples pour concevoir et déployer une requête primaire qui prétraite des données et les envoie à un adaptateur de sortie, tandis que d'autres requêtes consomment le résultat de cette requête et envoient leurs propres résultats à d'autres adaptateurs de sortie. Ce scénario est illustré ci-dessous.

La requête 2 consomme des données de la requête 1.

L'exemple suivant représente une requête créée dans une application myApp existants sur un serveur StreamInsight.

    var inputstream = CepStream<MyDataType>.Create("inputStream",
                                                   typeof(MyInputAdapterFactory),
                                                   new InputAdapterConfig { someFlag = true },
                                                   EventShape.Point);

    var filtered = from e in inputstream
                   where e.Value > 95
                   select e;

    var query = filtered.ToQuery(myApp,
                                 "filterQuery",
                                 "Filter out Values over 95",
                                 typeof(MyOutputAdapterFactory),
                                 new OutputAdapterConfig { someString = "foo" },
                                 EventShape.Point,
                                 StreamEventOrder.FullyOrdered);

    query.Start();

Pour émettre en continu les résultats de cette requête dans une deuxième requête, la méthode Query.ToStream() est utilisée. Le type qui correspond à la charge utile de sortie de la requête primaire est spécifié comme un paramètre générique, comme indiqué dans l'exemple suivant.

var filteredStream = query.ToStream<MyDataType>();

var validated = from e in filteredStream
                select new
                {
                    SourceId = e.SourceId,
                    Value = e.Value,
                    Status = e.Value > 75 ? false : true
                };

var validationQuery = validated.ToQuery("validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

validationQuery.Start();

Dans cet exemple, l'accès au flux de sortie de la requête primaire est effectué et un opérateur de projection est appliqué pour introduire un nouveau champ nommé Status. Le deuxième appel ToQuery() ne requiert plus d'objet d'application parce qu'il peut le déduire de la requête primaire.

La méthode ToStream() accepte un objet AdvanceTimeSettings facultatif si les événements CTI doivent être injectés à ce stade. L'insertion de CTI peut aider à augmenter la dynamique pour certaines configurations de requête.

Notez que la façon dont l'objet de la requête primaire est créé importe peu. Le modèle précédent affiche un exemple d'utilisation de l'API CepStream.ToQuery(). Il est également possible de créer la requête :

  • via un module de liaison de requête ; par exemple, myApp.CreateQuery("filterQuery", queryBinder, "description"); ;

  • en la récupérant via l'API du modèle objet depuis le serveur ; par exemple, myApp.Queries["filterQuery"].

Sortie de requête indépendante

L'exemple précédent indique comment réutiliser le résultat d'une requête existante dans laquelle la sortie est déjà liée à un adaptateur de sortie. Comme alternative, les requêtes peuvent également avoir un flux de sortie indépendant afin qu'aucune sortie ne soit produite à moins qu'une ou plusieurs autres requêtes consomment son résultat. Ce scénario est illustré ci-dessous.

La requête 1 a un flux de requête indépendant.

Cela se fait à l'aide d'une surcharge de CepStream.ToQuery() qui ne requiert pas d'adaptateur :

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             EventShape.Point, 
                             StreamEventOrder.FullyOrdered);

Cette requête peut être démarrée. Une deuxième requête peut faire ultérieurement appel à son flux de résultats en le spécifiant comme indiqué dans l'exemple précédent pour la requête validationQuery. Sans consommateur, les résultats de la requête primaire sont supprimés.

Ce modèle vous permet également de transmettre en continu les résultats d'une requête à plusieurs adaptateurs de sortie. Dans le cas le plus simple, cela peut être accompli en utilisant des requêtes directes sur une requête indépendante, une pour chaque adaptateur de sortie (les requêtes 2 et 3 dans l'illustration précédente).

Flux publiés

Jusqu'à présent, les exemples utilisent l'objet de requête réel pour créer un flux d'entrée pour une autre requête. Pour abstraire les objets côté client, vous pouvez utiliser l'URI du flux de données publié comme entrée pour une ou plusieurs autres requêtes, comme indiqué dans l'illustration suivante.

Requêtes utilisant un flux publié en tant qu'entrée.

Chaque requête possède un URI (Uniform Resource Identifier) de flux publié par défaut, qui correspond au nom de la requête elle-même. De plus, vous pouvez affecter explicitement un nom de flux publié personnalisé à la requête, via le membre approprié de la classe CepStream.

var query = filtered.ToPublishedStreamQuery(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),
                                             myApp,
                                             "filterQuery",
                                             "Filter out Values over 95",
                                             EventShape.Point,
                                             StreamEventOrder.FullyOrdered);

Cela crée une requête avec une sortie indépendante, mais nommée explicitement. Notez que les noms des flux de données publiés doivent suivre la convention « <application_name>/PublishedStream/<stream_name> ».

Une autre requête peut maintenant faire référence à cet URI en tant que son flux d'entrée, comme indiqué dans l'exemple suivant.

var filterStream = CepStream<MyDataType>.Create(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),
                                                EventShape.Point);
var validated = from e in filterStream
                ...

Notez que le consommateur du flux de données publié doit spécifier la forme d'événement d'entrée, qui doit correspondre à la forme de sortie de la requête référencée.

La connexion à une requête primaire via un nom de flux de données publié est moins contraignante que la connexion via l'objet de requête. Par conséquent, lors de la définition de la requête secondaire, une application doit être fournie :

var validationQuery = validated.ToQuery(myApp,
                                        "validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

Adaptateurs d'un flux de données publié

Lors de la récupération des adaptateurs d'une requête composée (par exemple, via Query.InputStreamBindings), vous remarquerez que des adaptateurs intégrés spéciaux sont utilisés pour les connecter. La possibilité de composer des requêtes via CepStream.ToQuery, Query.ToStream(), et ainsi de suite, comme indiqué ci-dessus, est une surface d'exposition commode sur ces adaptateurs intégrés. Ils peuvent également être utilisés explicitement comme des adaptateurs ordinaires, ayant leur propre structure de configuration qui contient le nom du flux de données publié, comme indiqué dans l'exemple suivant :

// primary query, with custom input and output adapters
var inputstream = CepStream<MyDataType>.Create("inputStream",
                                               typeof(MyInputAdapterFactory),
                                               new InputAdapterConfig { someFlag = true },
                                               EventShape.Point);

var filtered = from e in inputstream
               where e.Value > 95
               select e;

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             typeof(MyOutputAdapterFactory),
                             new OutputAdapterConfig { someString = "foo" },
                             EventShape.Point,
                             StreamEventOrder.FullyOrdered);

// secondary query, composed on top of the first one using the
// built-in published stream input adapter and the default published
// stream name of the primary query
var filterStream = CepStream<MyDataType>.Create("filteredStream",
                                                typeof(PublishedStreamAdapterFactory),
                                                new PublishedStreamInputAdapterConfiguration { PublishedStreamName = query.Name },
                                                EventShape.Point);

var validated = from e in filterStream
                ...

var validationQuery = validated.ToQuery(myApp,
                                        "validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

De la même façon, une requête peut utiliser l'adaptateur de sortie du flux de données publié, qui a les mêmes fonctionnalités que CepStream.toPublishedStreamQuery() :

var filterQuery = filtered.ToQuery(myApp,
                                   "filterQuery",
                                   "desc",
                                   typeof(PublishedStreamAdapterFactory),
                                   new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1") },
                                   EventShape.Point,
                                   StreamEventOrder.FullyOrdered);

Utilisation d'un module de liaison de requête

Le modèle de développement du module de liaison de requête autorise le contrôle total des différents objets de métadonnées StreamInsight et distingue clairement la liaison et l'utilisation de la requête, de la phase de conception du modèle de la requête. Ce modèle tient également compte de la composition de la requête dynamique, à la fois sur la liaison d'entrée et sur la liaison de sortie. Pour plus d'informations, consultez Utilisation d'un module de liaison de requête.

Création d'une liaison avec une autre requête comme entrée

Tout comme le module de liaison de requête peut lier un modèle de requête à un adaptateur d'entrée en tant que producteur d'événement, il peut créer une liaison avec une requête existante. Supposez qu'une requête primaire (avec une sortie dépendante ou indépendante) existe, comme dans le premier exemple.

var query = filtered.ToQuery(myApp, ...);

Le module de liaison de requête peut être utilisé comme suit, en faisant référence à la requête précédente dans la surcharge appropriée de BindProducer.

var newStream = CepStream<RawData>.Create("validationInput");
var validated = from e in newStream
                select new
                {
                    SourceId = e.SourceId,
                    Value = e.Value,
                    Status = e.Value > 75 ? false : true
                };
QueryTemplate validateQT = myApp.CreateQueryTemplate("validationLogic", "validates the Value field", validated);
QueryBinder queryBinder = new QueryBinder(validateQT);
queryBinder.BindProducer("validationInput", filterQuery);
queryBinder.AddConsumer(...);

Le module de liaison de requête peut également faire référence à un flux de données publié en tant que producteur d'événement.

queryBinder.BindProducer("validationInput",
                         new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),
                         EventShape.Point);

Comme pour la signature Query.ToStream(), un objet AdvanceTimeSettings facultatif peut être spécifié dans BindProducer().

Création d'une liaison avec un flux de données publié comme sortie

Côté sortie, le module de liaison de requête autorise la transmission en continu dans un flux de données publié défini explicitement.

queryBinder.BindOutputToPublishedStream(new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

Dès que la requête basée sur ce module de liaison est démarrée, d'autres requêtes peuvent se lier à un flux de données publié, comme indiqué dans les exemples précédents, et consommer ses événements de résultat.

Création d'une liaison avec les adaptateurs d'un flux de données publié

Les adaptateurs d'un flux de données publié peuvent également être utilisés dans le modèle du module de liaison de requête. Ils peuvent être récupérés de l'objet d'application et utilisés dans BindProducer et AddConsumer comme des adaptateurs ordinaires :

queryBinder.BindProducer("validationInput",
                         myApp.GetPublishedStreamInputAdapter(),
                         new PublishedStreamInputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered") },
                         EventShape.Point);
queryBinder.AddConsumer("validated",
                         myApp.GetPublishedStreamOutputAdapter(),
                         new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/validated") },
                         EventShape.Point,
                         StreamEventOrder.FullyOrdered);

Voir aussi

Concepts

Exemple StreamInsight de bout en bout

Avancer le temps d'application