Using the Query Binder

 

Note


The example in this topic makes use of input and output adapters which were introduced in an earlier version of StreamInsight. Though adapters have been superseded by the current development model, they are still available for developers who are maintaining legacy code. For more information about the current development model, see Developer's Guide (StreamInsight). For examples that use the current development model, see StreamInsight Examples.

A Query Binder is a client-side development model that provides the maximum level of flexibility and reusability when creating StreamInsight applications. In this model, adapters and query templates are registered as separate metadata objects that can later be bound together in order to instantiate a query. This gives the developer complete control of his or her application and development environment by using an explicit query binding on top of the object model API.

Typical use cases for the explicit server development model include StreamInsight applications that require:

  • Full control and access to the StreamInsight server.

  • Reusing queries through static or dynamic query composition, or reusing adapters, event types, and query templates defined by a third party.

Key Characteristics of the Query Binder Development Model

The query binder model has the following key characteristics:

  • The developer must create all metadata objects explicitly and register them into the StreamInsight server.

  • The model supports the creation and use of multiple objects (query templates, queries, applications, and adapters). All objects must be registered under an application.

    The query template and query instance must be explicitly registered with the server before the query can be run. The input and output adapters must be explicitly registered in order for the query template or query to reference these objects. Moreover, all objects must be registered under an application. Event types used by adapters and query templates are registered implicitly.

Examples

The following example creates a StreamInsight server object and an application object named myApp on the server. It then creates and registers all the necessary StreamInsight objects required to import, process, and export the event streams.

First, the server and application object are created.

server = Server.Create(“MyInstance”);  
Application myApp = server.CreateApplication("MyApp");  

Next, input and output adapters are registered in the application.

InputAdapter inputAdapter = myApp.CreateInputAdapter<MyInputAdapterFactory>("DataSource", "Description of the input adapter");  
OutputAdapter outputAdapter = myApp.CreateOutputAdapter<MyOutputAdapterFactory>("Output", " Description of the output adapter ");  

The query template is specified on top of an unbound stream. The only necessary parameter to create an unbound stream is a stream name, which is later needed for the adapter binding.

var inputstream = CepStream<MyDataType>.Create("filterInput");  
  
var filtered = from e in inputstream  
               where e.Value > 95  
               select e;  
  
QueryTemplate filterQT = myApp.CreateQueryTemplate("filterLogic", filtered);  
  • The last call registers the query template in the application. The registered query template can now be reused in multiple bindings and, hence, instantiated in multiple queries each bound to potentially different input and output adapters. These bindings for registered query templates are defined through the QueryBinder object:
QueryBinder queryBinder = new QueryBinder(filterQT);  
  
queryBinder.BindProducer<MyDataType>("filterInput",  
                                      inputAdapter,  
                                      new InputAdapterConfig { someFlag = true },  
                                      EventShape.Point);  
  
queryBinder.AddConsumer("filterOutput",  
                         outputAdapter,  
                         new OutputAdapterConfig { someString = "foo" },  
                         EventShape.Point,  
                         StreamEventOrder.FullyOrdered);  
  
Query query = myApp.CreateQuery("filterQuery", "My Filter Query", queryBinder);  

The BindProducer() method binds an input adapter object (must be registered in the application) to a stream with the specified name, here "filterInput". This enables you to distinguish between multiple entry points of a query template. Along with the input adapter, the binding-specific parameters (the adapter configuration and the desired event shape) are required.

The AddConsumer() method binds an output adapter object (which must be registered in the application) to the single outgoing stream of the query template. The provided output stream name, here "validated", can be used to identify the stream for diagnostic purposes. As with the input adapter, the binding specific parameters are given for the output adapter.

The query object is created based on the query binder, a query identifier, and a textual description. The last step is to start the query.

query.Start();  

Queries with multiple input streams

The following example shows how to create a query template that uses multiple input streams. A query template can have multiple entry points, each fed from a different data source, for example when two streams must be joined. The proper stream association happens through the specification of the stream name, as shown in the following example.

CepStream<SensorReading> sensorStream = CepStream<SensorReading>.Create("sensorInput");  
CepStream<LocationData> locationStream = CepStream<LocationData>.Create("locationInput");  
  
// Define query template in LINQ on top of sensorStream and locationStream  
// ...  
// Create query binder like in the previous example  
// ...  
  
InputAdapter inputAdapter = application.CreateInputAdapter<TextFileInputFactory>("CSVInput", "Reading tuples from a CSV file");  
  
qb.BindProducer<SensorReading>("sensorInput", inputAdapter, sensorInputConf, EventShape.Interval);  
qb.BindProducer<LocationData>("locationInput", inputAdapter, locationInputConf, EventShape.Edge);  

Modifying an existing application

Notice that you work in the query binder model with the query template and adapter objects without necessarily having created them in the same application. The following example assumes a connection to an existing server and retrieves the existing metadata entities through the StreamInsight object model API instead of creating them.

Application myApp = server.Applications["app1"];  
QueryTemplate myQueryTemplate = myApp.QueryTemplates["qt1"];  
InputAdapter myInputAdapter = myApp.InputAdapters["sensorAdapter5"];  

Using a persisted metadata store

When creating a StreamInsight server, an optional parameter of the Server.Create() method is the type of metadata store to be used. By default, metadata is stored in memory. Optionally, metadata can also be persisted on disk through a SQL Server Compact database. The following example shows how to specify a database SQL Server Compact database as the metadata store.

SqlCeMetadataProviderConfiguration metadataConfiguration = new SqlCeMetadataProviderConfiguration();  
metadataConfiguration.DataSource = "SIMetadata.sdf";  
metadataConfiguration.CreateDataSourceIfMissing = streamHostConfig.CreateDataSourceIfMissing;  
  
server = Server.Create(”MyInstance”, metadataConfiguration);  
Application myApp = server.CreateApplication("MyApp");  

Note that specifying an existing metadata database when creating the server will read all metadata from the specified file. The metadata entities can then be retrieved through the StreamInsight object model API.

Complete Example

using (Server server = Server.Create("MyInstance"))  
{  
try  
{  
    Application myApp = server.CreateApplication("MyApp");  
    InputAdapter inputAdapter = myApp.CreateInputAdapter<MyInputAdapterFactory>("DataSource", "Description of the input adapter");  
    OutputAdapter outputAdapter = myApp.CreateOutputAdapter<MyOutputAdapterFactory>("Output", " Description of the output adapter ");  
  
    var inputstream = CepStream<MyDataType>.Create("filterInput");  
  
    var filtered = from e in inputstream  
                   where e.Value > 95  
                   select e;  
  
    QueryTemplate filterQT = myApp.CreateQueryTemplate("filterLogic", "Description of the query template", filtered);  
    QueryBinder queryBinder = new QueryBinder(filterQT);  
  
    queryBinder.BindProducer<MyDataType>("filterInput",  
                                         inputAdapter,  
                                         new InputAdapterConfig { someFlag = true },  
                                         EventShape.Point);  
  
    queryBinder.AddConsumer("filterOutput",  
                                                 outputAdapter,  
                                                 new OutputAdapterConfig { someString = "foo" },  
                                                 EventShape.Point,  
                                                 StreamEventOrder.FullyOrdered);  
  
    Query query = myApp.CreateQuery("filterQuery", "My Filter Query", queryBinder);  
  
    query.Start();  
    Console.ReadLine();  
    query.Stop();  
}  
catch (Exception e)  
{  
    Console.WriteLine(e.ToString());  
}  
}  

See Also

NIB StreamInsight Server Concepts