Composing Queries at Runtime

Composing StreamInsight queries at runtime provides query flexibility, reusability, efficient use of resources, and ease of maintenance. It enables you to:

  • Provide the query result of one query to other queries on the same server.

  • Consume the output of other running queries, just like consuming events from an input adapter.

Two composed queries, for example, query 1 feeding into query 2, run in isolation. If query 1 fails, the state of query 2 is not impacted, and vice versa. Query 1 and query 2 can be started and stopped independently. For example, you can stop query 1, replace it with a different query, and start it again.

This topic describes several use cases and examples of composing queries dynamically at runtime.

Reusing the Output of an Existing Query

A common use case for multiple queries is the need to design and deploy a primary query that pre-processes data and sends it to an output adapter, while other queries consume the result of this query and send their own results to other output adapters. This scenario is shown in the following illustration.

Query 2 consumes data from query 1.

The following example represents a query, created in an existing application myApp on a StreamInsight server.

    var inputstream = CepStream<MyDataType>.Create("inputStream",
                                                   typeof(MyInputAdapterFactory),
                                                   new InputAdapterConfig { someFlag = true },
                                                   EventShape.Point);

    var filtered = from e in inputstream
                   where e.Value > 95
                   select e;

    var query = filtered.ToQuery(myApp,
                                 "filterQuery",
                                 "Filter out Values over 95",
                                 typeof(MyOutputAdapterFactory),
                                 new OutputAdapterConfig { someString = "foo" },
                                 EventShape.Point,
                                 StreamEventOrder.FullyOrdered);

    query.Start();

To stream the results of this query into a second query, the method Query.ToStream() is used. The type that matches the output payload of the primary query is specified as a generic parameter, as shown in the following example.

var filteredStream = query.ToStream<MyDataType>();

var validated = from e in filteredStream
                select new
                {
                    SourceId = e.SourceId,
                    Value = e.Value,
                    Status = e.Value > 75 ? false : true
                };

var validationQuery = validated.ToQuery("validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

validationQuery.Start();

In this example, the output stream of the primary query is accessed and a projection operator is applied to introduce a new field named Status. The second ToQuery() call no longer requires an application object because it can infer it from the primary query.

The ToStream() method takes an optional AdvanceTimeSettings object if current time increments (CTIs) must be injected at that point. Inserting CTIs can help increase liveness for certain query configurations.
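As a minimal sketch of the optional parameter, the following variation of the previous ToStream() call passes an AdvanceTimeSettings object. It assumes that the predefined AdvanceTimeSettings.IncreasingStartTime setting is available in your version of StreamInsight; any other AdvanceTimeSettings instance can be passed in the same position.

```csharp
// Sketch only: same primary query as above, but CTIs are injected at the
// point where the result stream of 'query' enters the second query.
// Assumption: AdvanceTimeSettings.IncreasingStartTime exists in your
// StreamInsight version; otherwise construct an AdvanceTimeSettings object.
var filteredStream = query.ToStream<MyDataType>(AdvanceTimeSettings.IncreasingStartTime);
```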

Note that it does not matter how the primary query object is created. The previous example uses the CepStream.ToQuery() API. Other possibilities are to:

  • Create the query through a query binder. For example, myApp.CreateQuery("filterQuery", queryBinder, "description");

  • Retrieve the query from the server through the object model API. For example, myApp.Queries["filterQuery"]
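For instance, a client that did not create the primary query could look it up by name and compose on top of it. The following sketch combines the Queries lookup with Query.ToStream(); it assumes that filterQuery is already deployed in myApp and that its output payload type is MyDataType.

```csharp
// Look up the deployed primary query by name instead of keeping the
// object that ToQuery() returned.
Query existingQuery = myApp.Queries["filterQuery"];

// Compose on top of it exactly as with a locally created query object.
var filteredStream = existingQuery.ToStream<MyDataType>();
```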

Unbound Query Output

The previous example shows how to reuse the result of an existing query whose output is already bound to an output adapter. As an alternative, a query can have an unbound output stream, so that no output is produced unless one or more other queries consume its result. This scenario is shown in the following illustration.

Query 1 has an unbound query stream.

This is accomplished by using an overload of CepStream.ToQuery() that does not require an adapter:

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             EventShape.Point, 
                             StreamEventOrder.FullyOrdered);

This query can be started. A second query can later consume its result stream by specifying it as shown in the previous example for the query validationQuery. Without any consumer, the results of the primary query are dropped.

This pattern also enables you to stream the results of a query to multiple output adapters. In the simplest case, this can be accomplished by using pass-through queries on top of an unbound query, one for each output adapter (Query 2 and 3 in the previous illustration).
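A minimal sketch of this fan-out pattern follows. It assumes that query is the unbound primary query created above. MySecondOutputAdapterFactory, SecondOutputAdapterConfig, and the query names are hypothetical placeholders for a second adapter of your own.

```csharp
// Each pass-through query reads the result stream of the unbound primary
// query and forwards the events unchanged to its own output adapter.
var passThrough2 = from e in query.ToStream<MyDataType>()
                   select e;
var passThrough3 = from e in query.ToStream<MyDataType>()
                   select e;

// Query 2: output adapter types as used in the earlier examples.
var query2 = passThrough2.ToQuery("passThroughQuery2",
                                  "Forwards filterQuery results to the first adapter",
                                  typeof(MyOutputAdapterFactory),
                                  new OutputAdapterConfig { someString = "foo" },
                                  EventShape.Point,
                                  StreamEventOrder.FullyOrdered);

// Query 3: hypothetical second adapter factory and configuration type.
var query3 = passThrough3.ToQuery("passThroughQuery3",
                                  "Forwards filterQuery results to the second adapter",
                                  typeof(MySecondOutputAdapterFactory),
                                  new SecondOutputAdapterConfig(),
                                  EventShape.Point,
                                  StreamEventOrder.FullyOrdered);

// The primary query is assumed to be started separately.
query2.Start();
query3.Start();
```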

Published Streams

So far, the examples use the actual query object to create a new input stream for another query. To abstract from the client-side objects, you can instead use the published stream URI as an input for one or more other queries, as shown in the following illustration.

Queries using a published stream as input.

Each query has a default published stream uniform resource identifier (URI), which is the query name itself. Moreover, you can explicitly assign a custom published stream name to the query, through the appropriate member of the CepStream class.

var query = filtered.ToPublishedStreamQuery(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),
                                             myApp,
                                             "filterQuery",
                                             "Filter out Values over 95",
                                             EventShape.Point,
                                             StreamEventOrder.FullyOrdered);

This creates a query with an unbound, but explicitly named output. Note that the names of published streams must follow the convention "<application_name>/PublishedStream/<stream_name>".

Another query can now refer to this URI as its input stream, as shown in the following example.

var filterStream = CepStream<MyDataType>.Create(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),
                                                EventShape.Point);
var validated = from e in filterStream
                ...

Note that the consumer of the published stream must specify the input event shape, which must match the output shape of the referenced query.

Connecting to a primary query through a published stream name is a looser coupling than connecting through the query object. Hence, the application cannot be inferred from the primary query and must be specified when defining the secondary query:

var validationQuery = validated.ToQuery(myApp,
                                        "validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

Published Stream Adapters

When you retrieve the adapters of a composed query (for example, through Query.InputStreamBindings), you will notice that special built-in adapters are used to connect the queries. The composition methods shown above, such as CepStream.ToQuery(), Query.ToStream(), and so on, are convenient surfaces on top of these built-in adapters. The built-in adapters can also be used explicitly like ordinary adapters. They have their own configuration structure, which contains the published stream name, as shown in the following example:

// primary query, with custom input and output adapters
var inputstream = CepStream<MyDataType>.Create("inputStream",
                                               typeof(MyInputAdapterFactory),
                                               new InputAdapterConfig { someFlag = true },
                                               EventShape.Point);

var filtered = from e in inputstream
               where e.Value > 95
               select e;

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             typeof(MyOutputAdapterFactory),
                             new OutputAdapterConfig { someString = "foo" },
                             EventShape.Point,
                             StreamEventOrder.FullyOrdered);

// secondary query, composed on top of the first one using the
// built-in published stream input adapter and the default published
// stream name of the primary query
var filterStream = CepStream<MyDataType>.Create("filteredStream",
                                                typeof(PublishedStreamAdapterFactory),
                                                new PublishedStreamInputAdapterConfiguration { PublishedStreamName = query.Name },
                                                EventShape.Point);

var validated = from e in filterStream
                ...

var validationQuery = validated.ToQuery(myApp,
                                        "validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

In the same fashion, a query can use the published stream output adapter, which has the same functionality as CepStream.ToPublishedStreamQuery():

var filterQuery = filtered.ToQuery(myApp,
                                   "filterQuery",
                                   "desc",
                                   typeof(PublishedStreamAdapterFactory),
                                   new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1") },
                                   EventShape.Point,
                                   StreamEventOrder.FullyOrdered);

Using the Query Binder

The query binder development model allows for full control over the various StreamInsight metadata objects and clearly separates query binding and usage from the query template design phase. This model also allows for dynamic query composition, on both the input binding side and the output binding side. For more information, see Using the Query Binder.

Binding to Another Query as Input

Just as the query binder can bind a query template to an input adapter as an event producer, it can bind to an existing query. Assume that a primary query (with bound or unbound output) exists as in the first example.

var query = filtered.ToQuery(myApp, ...);

Then, the query binder can be used as follows, referring to the previous query in the proper overload of BindProducer.

var newStream = CepStream<MyDataType>.Create("validationInput");
var validated = from e in newStream
                select new
                {
                    SourceId = e.SourceId,
                    Value = e.Value,
                    Status = e.Value > 75 ? false : true
                };
QueryTemplate validateQT = myApp.CreateQueryTemplate("validationLogic", "validates the Value field", validated);
QueryBinder queryBinder = new QueryBinder(validateQT);
queryBinder.BindProducer("validationInput", query);
queryBinder.AddConsumer(...);

Alternatively, the query binder can refer to a published stream as an event producer.

queryBinder.BindProducer("validationInput",
                         new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),
                         EventShape.Point);

As with the Query.ToStream() signature, an optional AdvanceTimeSettings object can be specified in BindProducer().

Binding to a Published Stream as Output

On the output side, the query binder allows for streaming into an explicitly defined published stream.

queryBinder.BindOutputToPublishedStream(new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

As soon as the query that is based on this query binder is started, other queries can bind to this published stream as outlined in the previous examples and consume its result events.

Binding to Published Stream Adapters

Published stream adapters can also be used in the query binder model. They can be retrieved from the application object and used in BindProducer and AddConsumer like ordinary adapters:

queryBinder.BindProducer("validationInput",
                         myApp.GetPublishedStreamInputAdapter(),
                         new PublishedStreamInputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered") },
                         EventShape.Point);
queryBinder.AddConsumer("validated",
                        myApp.GetPublishedStreamOutputAdapter(),
                        new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/validated") },
                        EventShape.Point,
                        StreamEventOrder.FullyOrdered);

See Also

Concepts

StreamInsight End-to-End Example

Advancing Application Time