Crear consultas en tiempo de ejecución

La creación de consultas de StreamInsight en tiempo de ejecución proporciona flexibilidad en las consultas, la capacidad de reutilización, un uso eficaz de los recursos y facilidad de mantenimiento. Permite:

  • Proporcionar el resultado de una consulta a otras consultas del mismo servidor.

  • Utilizar el resultado de otras consultas en ejecución, de la misma forma que se usan los eventos de un adaptador de entrada.

Por ejemplo, dos consultas creadas se ejecutan de forma aislada y la consulta 1 alimenta a la consulta 2. Si la consulta 1 produce un error, el estado de la consulta 2 no se ve afectado y viceversa. Las consultas 1 y 2 se pueden iniciar y detener de forma independiente. Por ejemplo, puede detener la consulta 1, reemplazarla con una consulta diferente e iniciarla de nuevo.

En este tema se describen varios casos de uso y ejemplos de creación dinámica de consultas en tiempo de ejecución.

Reutilizar el resultado de una consulta existente

Un caso de uso común de varias consultas es la necesidad de diseñar e implementar una consulta primaria que preprocese los datos y los envíe a un adaptador de salida, mientras otras consultas utilizan el resultado de esta consulta y envían sus propios resultados a otros adaptadores de salida. Esta situación se muestra en la ilustración siguiente.

La consulta 2 consume datos de la consulta 1.

En el siguiente ejemplo se representa una consulta, creada en una aplicación existente myApp de un servidor de StreamInsight.

    var inputstream = CepStream<MyDataType>.Create("inputStream",
                                                   typeof(MyInputAdapterFactory),
                                                   new InputAdapterConfig { someFlag = true },
                                                   EventShape.Point);

    var filtered = from e in inputstream
                   where e.Value > 95
                   select e;

    var query = filtered.ToQuery(myApp,
                                 "filterQuery",
                                 "Filter out Values over 95",
                                 typeof(MyOutputAdapterFactory),
                                 new OutputAdapterConfig { someString = "foo" },
                                 EventShape.Point,
                                 StreamEventOrder.FullyOrdered);

    query.Start();

Para transmitir los resultados de esta consulta a una segunda consulta, se usa el método Query.ToStream(). El tipo que coincide con la carga de salida de la consulta primaria se especifica como parámetro genérico, tal como se muestra en el siguiente ejemplo.

var filteredStream = query.ToStream<MyDataType>();

var validated = from e in filteredStream
                select new
                {
                    SourceId = e.SourceId,
                    Value = e.Value,
                    Status = e.Value > 75 ? false : true
                };

var validationQuery = validated.ToQuery("validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

validationQuery.Start();

En este ejemplo se tiene acceso al flujo de salida de la consulta primaria y se aplica un operador de proyección para incluir un nuevo campo denominado Status. La segunda llamada a ToQuery() ya no requiere ningún objeto de aplicación porque lo puede deducir de la consulta primaria.

El método ToStream() toma un objeto AdvanceTimeSettings opcional si deben inyectar incrementos de tiempo actual (CTI) en este punto. La inserción de los CTI puede ayudar a aumentar la agilidad de determinadas configuraciones de consulta.

Observe que no importa cómo se crea el objeto de consulta primario. El modelo anterior muestra un ejemplo del uso de la API CepStream.ToQuery(). También existe la posibilidad de crear la consulta:

  • A través de un enlazador de consultas. Por ejemplo, myApp.CreateQuery("filterQuery", queryBinder, "description");

  • Recuperándola a través de la API del modelo de objetos del servidor. Por ejemplo, myApp.Queries["filterQuery"]

Desenlazar el resultado de la consulta

En el ejemplo anterior se muestra cómo se reutiliza el resultado de una consulta existente en la que su resultado ya está enlazado a un adaptador de salida. Como alternativa, las consultas también pueden tener un flujo de salida no enlazado, de forma que no se genere ninguna salida a menos que otras consultas (una o varias) utilicen su resultado. Esta situación se muestra en la ilustración siguiente.

La consulta 1 tiene una flujo de consultas no enlazadas.

Para lograrlo, se usa una sobrecarga de CepStream.ToQuery() que no requiera ningún adaptador:

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             EventShape.Point, 
                             StreamEventOrder.FullyOrdered);

Se puede iniciar esta consulta. Una segunda consulta puede utilizar más adelante su flujo de resultado especificándolo, tal como se muestra en el ejemplo anterior de la consulta validationQuery. Sin ningún consumidor, los resultados de la consulta primaria se quitan.

Este modelo también permite transmitir los resultados de una consulta a varios adaptadores de salida. En el caso más simple, esto se puede lograr utilizando consultas de paso en una consulta desenlazada, una para cada adaptador de salida (consultas 2 y 3 en la ilustración anterior).

Flujos publicados

Hasta ahora, los ejemplos utilizan el objeto de consulta real para crear un nuevo flujo de entrada para otra consulta. Para resumir en los objetos del lado cliente, puede utilizar el URI del flujo publicado como una entrada para una o varias consultas, tal como se muestra en la siguiente ilustración.

Consultas que usan una flujo publicada como entrada.

Cada consulta tiene un identificador uniforme de recursos (URI) predeterminado de la flujo publicada, que es el propio nombre de la consulta. Además, puede asignar explícitamente un nombre de flujo publicado personalizado a la consulta, a través del miembro adecuado de la clase CepStream.

var query = filtered.ToPublishedStreamQuery(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),
                                             myApp,
                                             "filterQuery",
                                             "Filter out Values over 95",
                                             EventShape.Point,
                                             StreamEventOrder.FullyOrdered);

Esto crea una consulta con un resultado desenlazado pero con nombre explícito. Observe que los nombres de flujos publicados deben seguir la convención "<application_name>/PublishedStream/<stream_name>".

Ahora, otra consulta puede hacer referencia a este URI como su flujo de entrada, tal como se muestra en el siguiente ejemplo.

var filterStream = CepStream<MyDataType>.Create(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),
                                                EventShape.Point);
var validated = from e in filterStream
                ...

Observe que el consumidor del flujo publicado debe especificar la forma del evento de entrada, que debe coincidir con la forma de salida de la consulta a la que se hace referencia.

La conexión a una consulta primaria a través de un nombre de flujo publicado resulta menos fuerte que si se realiza a través del objeto de consulta. Por lo tanto, al definir la consulta secundaria, se debe proporcionar una aplicación:

var validationQuery = validated.ToQuery(myApp,
                                        "validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

Adaptadores de flujo publicado

Al recuperar adaptadores de una consulta creada (por ejemplo, a través de Query.InputStreamBindings), observará que se usan adaptadores integrados especiales para conectarlos. CepStream.ToQuery, Query.ToStream(), etc., suponen superficies convenientes para la funcionalidad de creación de consultas con estos adaptadores integrados. También se pueden utilizar explícitamente como los adaptadores normales y tener su propia estructura de configuración que contiene el nombre de flujo publicado, tal como se muestra en el siguiente ejemplo:

// primary query, with custom input and output adapters
var inputstream = CepStream<MyDataType>.Create("inputStream",
                                               typeof(MyInputAdapterFactory),
                                               new InputAdapterConfig { someFlag = true },
                                               EventShape.Point);

var filtered = from e in inputstream
               where e.Value > 95
               select e;

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             typeof(MyOutputAdapterFactory),
                             new OutputAdapterConfig { someString = "foo" },
                             EventShape.Point,
                             StreamEventOrder.FullyOrdered);

// secondary query, composed on top of the first one using the
// built-in published stream input adapter and the default published
// stream name of the primary query
var filterStream = CepStream<MyDataType>.Create("filteredStream",
                                                typeof(PublishedStreamAdapterFactory),
                                                new PublishedStreamInputAdapterConfiguration { PublishedStreamName = query.Name },
                                                EventShape.Point);

var validated = from e in filterStream
                ...

var validationQuery = validated.ToQuery(myApp,
                                        "validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

Del mismo modo, una consulta puede utilizar el adaptador de salida del flujo publicado, que tiene la misma funcionalidad que CepStream.toPublishedStreamQuery():

var filterQuery = filtered.ToQuery(myApp,
                                   "filterQuery",
                                   "desc",
                                   typeof(PublishedStreamAdapterFactory),
                                   new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1") },
                                   EventShape.Point,
                                   StreamEventOrder.FullyOrdered);

Utilizar el enlazador de consultas

El modelo de desarrollo del enlazador de consultas permite un control total sobre los diferentes objetos de metadatos de StreamInsight y separa claramente el enlace y uso de consultas de la fase de diseño de la plantilla de consulta. Este modelo permite también la creación dinámica de consultas, tanto en el enlace de entrada como en el enlace de salida. Para obtener más información, vea Utilizar el enlazador de consultas.

Enlazar con otra consulta como entrada

Así como el enlazador de consultas puede enlazar una plantilla de consulta a un adaptador de entrada como productor de eventos, también puede enlazar a una consulta existente. Suponga que existe una consulta primaria (con salida enlazada o desenlazada), como en el primer ejemplo.

var query = filtered.ToQuery(myApp, ...);

El enlazador de consultas se puede utilizar como sigue, haciendo referencia a la consulta anterior en la sobrecarga apropiada de BindProducer.

var newStream = CepStream<RawData>.Create("validationInput");
var validated = from e in newStream
                select new
                {
                    SourceId = e.SourceId,
                    Value = e.Value,
                    Status = e.Value > 75 ? false : true
                };
QueryTemplate validateQT = myApp.CreateQueryTemplate("validationLogic", "validates the Value field", validated);
QueryBinder queryBinder = new QueryBinder(validateQT);
queryBinder.BindProducer("validationInput", filterQuery);
queryBinder.AddConsumer(...);

El enlazador de consultas también puede hacer referencia a un flujo publicado como productor de eventos.

queryBinder.BindProducer("validationInput",
                         new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),
                         EventShape.Point);

Al igual que sucede con la firma Query.ToStream(), se puede especificar un objeto AdvanceTimeSettings opcional en BindProducer().

Enlazar a un flujo publicado como salida

En el lado de salida, el enlazador de consultas permite la transmisión a un flujo publicado definido explícitamente.

queryBinder.BindOutputToPublishedStream(new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

En cuanto se inicia la consulta basada en este enlazador de consultas, otras consultas pueden enlazar a un flujo publicado tal como se indica en los ejemplos anteriores y utilizar sus eventos de resultado.

Enlazar a adaptadores de flujo publicado

También se pueden usar adaptadores de flujo publicado en el modelo del enlazador de consultas. Se pueden recuperar del objeto de aplicación y utilizar en BindProducer y AddConsumer al igual que los adaptadores normales:

queryBinder.BindProducer("validationInput",
                         myApp.GetPublishedStreamInputAdapter(),
                         new PublishedStreamInputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered") },
                         EventShape.Point);
queryBinder.AddConsumer("validated",
                         myApp.GetPublishedStreamOutputAdapter(),
                         new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/validated") },
                         EventShape.Point,
                         StreamEventOrder.FullyOrdered);

Vea también

Conceptos

Ejemplo integral de StreamInsight

Adelantar tiempo de aplicación