Share via


クエリ バインダーの使用

クエリ バインダーは、StreamInsight アプリケーションの作成時に最高レベルの柔軟性と再利用性を提供するクライアント側の開発モデルです。このモデルでは、アダプターとクエリ テンプレートは、後でクエリをインスタンス化するためにバインドできる異なるメタデータ オブジェクトとして登録されます。これにより、開発者は、オブジェクト モデル API 上での明示的なクエリ バインディングを使用して、自身のアプリケーションおよび開発環境を完全に制御することができます。

明示的なサーバー開発モデルの一般的な使用例に含まれる StreamInsight アプリケーションには、次の要件があります。

  • StreamInsight サーバーに対してフル コントロールと完全アクセスを持っていること。

  • 静的または動的なクエリ構成によるクエリを再利用するか、またはサードパーティによって定義されたアダプター、イベントの種類、およびクエリ テンプレートを再利用すること。

クエリ バインダー開発モデルの主な特性

クエリ バインダー モデルの主な特性は次のとおりです。

  • 開発者は、すべてのメタデータ オブジェクトを明示的に作成し、StreamInsight サーバーに登録する必要があります。

  • このモデルでは、複数のオブジェクト (クエリ テンプレート、クエリ、アプリケーション、およびアダプター) の作成と使用がサポートされています。アプリケーションのすべてのオブジェクトを登録する必要があります。

    クエリを実行できるようにするには、事前に、クエリ テンプレートとクエリ インスタンスを明示的にサーバーに登録する必要があります。また、クエリ テンプレートやクエリでこのようなオブジェクトを参照するには、入力アダプターおよび出力アダプターを明示的に登録する必要があります。さらに、アプリケーションのすべてのオブジェクトを登録する必要もあります。アダプターおよびクエリ テンプレートによって使用されるイベントの種類は、明示的に登録されます。

使用例

次の例では、StreamInsight サーバー オブジェクトと myApp という名前のアプリケーション オブジェクトをサーバーに作成します。次に、イベント ストリームのインポート、処理、およびエクスポートに必要なすべての StreamInsight オブジェクトを作成して登録します。

まず、サーバーおよびアプリケーション オブジェクトが作成されます。

server = Server.Create(“MyInstance”);
Application myApp = server.CreateApplication("MyApp");

次に、入力アダプターと出力アダプターがアプリケーションに登録されます。

InputAdapter inputAdapter = myApp.CreateInputAdapter<MyInputAdapterFactory>("DataSource", "Description of the input adapter");
OutputAdapter outputAdapter = myApp.CreateOutputAdapter<MyOutputAdapterFactory>("Output", " Description of the output adapter ");

クエリ テンプレートは、バインドされていないストリームの最上位に指定されます。バインドされていないストリーム作成に必要なパラメーターは、ストリーム名のみです。このストリーム名は、後でアダプターのバインディングに必要となります。

var inputstream = CepStream<MyDataType>.Create("filterInput");

var filtered = from e in inputstream
               where e.Value > 95
               select e;

QueryTemplate filterQT = myApp.CreateQueryTemplate("filterLogic", filtered);
  • 最後の呼び出しによって、クエリ テンプレートがアプリケーションに登録されます。これで、登録済みのクエリ テンプレートは、複数のバインディングに再利用できるようになります。その結果、複数のクエリがそれぞれ異なる入力アダプターおよび出力アダプターにバインドされる可能性がある場合は、登録済みのクエリ テンプレートは、これら複数のクエリでインスタンス化できます。登録済みのクエリ テンプレートのこれらのバインディングは、QueryBinder オブジェクトを使用して、次のように定義されます。
QueryBinder queryBinder = new QueryBinder(filterQT);

queryBinder.BindProducer<MyDataType>("filterInput",
                                      inputAdapter,
                                      new InputAdapterConfig { someFlag = true },
                                      EventShape.Point);

queryBinder.AddConsumer("filterOutput",
                         outputAdapter,
                         new OutputAdapterConfig { someString = "foo" },
                         EventShape.Point,
                         StreamEventOrder.FullyOrdered);

Query query = myApp.CreateQuery("filterQuery", "My Filter Query", queryBinder);

BindProducer() メソッドは、入力アダプター オブジェクト (アプリケーション内で登録されている必要があります) を、指定された名前のストリーム、ここでは "filterInput" にバインドします。これによって、クエリ テンプレートの複数のエントリ ポイントを区別できるようになります。入力アダプターと共に、バインディングに固有なパラメーター (アダプター構成と目的のイベント形状) が必要です。

AddConsumer() メソッドは、出力アダプター オブジェクト (アプリケーション内で登録されている必要があります) をクエリ テンプレートの単一の発信ストリームにバインドします。使用される出力ストリーム名、ここでは "validated" は、診断を目的としたストリームの識別に使用できます。入力アダプターと同様に、バインディングに固有のパラメーターが出力アダプターに対して指定されています。

クエリ オブジェクトは、クエリ バインダー、クエリ識別子、および説明テキストに基づいて作成されます。最後の手順はクエリの開始です。

query.Start();

複数の入力ストリームを持つクエリ

次の例に、複数の入力ストリームを使用するクエリ テンプレートの作成方法を示します。クエリ テンプレートは、2 つのストリームが結合される場合など、それぞれ異なるデータ ソースから入力される複数のエントリ ポイントを持つ場合があります。適切なストリームの関連付けは、次の例に示すように、ストリーム名の指定によって実行されます。

CepStream<SensorReading> sensorStream = CepStream<SensorReading>.Create("sensorInput");
CepStream<LocationData> locationStream = CepStream<LocationData>.Create("locationInput");

// Define query template in LINQ on top of sensorStream and locationStream
// ...
// Create query binder like in the previous example
// ...

InputAdapter inputAdapter = application.CreateInputAdapter<TextFileInputFactory>("CSVInput", "Reading tuples from a CSV file");

qb.BindProducer<SensorReading>("sensorInput", inputAdapter, sensorInputConf, EventShape.Interval);
qb.BindProducer<LocationData>("locationInput", inputAdapter, locationInputConf, EventShape.Edge);

既存のアプリケーションの変更

作業するクエリ バインダー モデルで使用するクエリ テンプレートとアダプター オブジェクトは、必ずしも同じアプリケーションで作成されたものではないことに注意してください。次の例は、既存のサーバーへの接続があることを前提としており、既存のメタデータ エントリを作成する代わりに StreamInsight オブジェクト モデル API を使用して取得します。

Application myApp = server.Applications["app1"];
QueryTemplate myQueryTemplate = myApp.QueryTemplates["qt1"];
InputAdapter myInputAdapter = myApp.InputAdapters["sensorAdapter5"];

保存されるメタデータ ストアの使用

StreamInsight サーバーを作成するとき、Server.Create() メソッドの省略可能なパラメーターとして、メタデータ ストアの種類を使用できます。既定では、メタデータはメモリに格納されます。必要に応じて、メタデータを SQL Server Compact 3.5 データベースを使用してディスク上に保存することもできます。次の例では、メタデータ ストアとして SQL Server Compact 3.5 データベースを指定する方法を示します。

SqlCeMetadataProviderConfiguration metadataConfiguration = new SqlCeMetadataProviderConfiguration();
metadataConfiguration.DataSource = "SIMetadata.sdf";
metadataConfiguration.CreateDataSourceIfMissing = streamHostConfig.CreateDataSourceIfMissing;

server = Server.Create(”MyInstance”, metadataConfiguration);
Application myApp = server.CreateApplication("MyApp");

サーバーの作成時に既存のメタデータ データベースを指定すると、指定されたファイルからすべてのメタデータが読み込まれることに注意してください。その後、StreamInsight オブジェクト モデル API を使用してメタデータ エンティティを取得できます。

完全なサンプル コード

using (Server server = Server.Create("MyInstance"))
{
try
{
    Application myApp = server.CreateApplication("MyApp");
    InputAdapter inputAdapter = myApp.CreateInputAdapter<MyInputAdapterFactory>("DataSource", "Description of the input adapter");
    OutputAdapter outputAdapter = myApp.CreateOutputAdapter<MyOutputAdapterFactory>("Output", " Description of the output adapter ");

    var inputstream = CepStream<MyDataType>.Create("filterInput");

    var filtered = from e in inputstream
                   where e.Value > 95
                   select e;

    QueryTemplate filterQT = myApp.CreateQueryTemplate("filterLogic", "Description of the query template", filtered);
    QueryBinder queryBinder = new QueryBinder(filterQT);

    queryBinder.BindProducer<MyDataType>("filterInput",
                                         inputAdapter,
                                         new InputAdapterConfig { someFlag = true },
                                         EventShape.Point);

    queryBinder.AddConsumer("filterOutput",
                                                 outputAdapter,
                                                 new OutputAdapterConfig { someString = "foo" },
                                                 EventShape.Point,
                                                 StreamEventOrder.FullyOrdered);

    Query query = myApp.CreateQuery("filterQuery", "My Filter Query", queryBinder);

    query.Start();
    Console.ReadLine();
    query.Stop();
}
catch (Exception e)
{
    Console.WriteLine(e.ToString());
}
}

関連項目

概念

StreamInsight サーバーの概念