実行時のクエリの構築

実行時に StreamInsight クエリを構築することで、クエリの柔軟性、再利用性、リソースの効率、およびメンテナンス性を向上できます。これにより、次のことが可能になります。

  • 1 つのクエリの結果を同じサーバーの他のクエリに渡す。

  • 入力アダプターからのイベントを利用するのと同じように、実行している他のクエリの出力を利用する。

たとえば、クエリ 1 の結果をクエリ 2 に渡すような 2 つのクエリを構築すると、これらのクエリは分離して実行されます。クエリ 1 が失敗しても、クエリ 2 の状態に影響が及ぶことはなく、その反対も同様です。クエリ 1 と クエリ 2 は、個別に開始したり、停止することができます。たとえば、クエリ 1 を停止し、別のクエリに置き換えてから、再開することができます。

このトピックでは、実行時に動的にクエリを構築するいくつかの使用例について説明します。

既存のクエリの出力の再利用

複数のクエリの一般的な使用例として、データを前処理して出力アダプターに送信する最初のクエリを設計、展開する一方で、他のクエリでこのクエリの結果を使用し、その結果を他の出力アダプターに送信することが必要な場合が挙げられます。このシナリオを次の図に示します。

クエリ 2 がクエリ 1 のデータを呼び出す。

次の例は、StreamInsight サーバーの既存のアプリケーション myApp に作成されたクエリを表します。

    var inputstream = CepStream<MyDataType>.Create("inputStream",
                                                   typeof(MyInputAdapterFactory),
                                                   new InputAdapterConfig { someFlag = true },
                                                   EventShape.Point);

    var filtered = from e in inputstream
                   where e.Value > 95
                   select e;

    var query = filtered.ToQuery(myApp,
                                 "filterQuery",
                                 "Filter out Values over 95",
                                 typeof(MyOutputAdapterFactory),
                                 new OutputAdapterConfig { someString = "foo" },
                                 EventShape.Point,
                                 StreamEventOrder.FullyOrdered);

    query.Start();

このクエリの結果を 2 番目のクエリにストリーミングするために、メソッド Query.ToStream() を使用します。次の例のように、最初のクエリの出力ペイロードと一致する型は、汎用パラメーターとして指定します。

var filteredStream = query.ToStream<MyDataType>();

var validated = from e in filteredStream
                select new
                {
                    SourceId = e.SourceId,
                    Value = e.Value,
                    Status = e.Value > 75 ? false : true
                };

var validationQuery = validated.ToQuery("validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

validationQuery.Start();

この例では、最初のクエリの出力ストリームにアクセスし、射影演算子を適用して Status という新しいフィールドを導入しています。2 番目の ToQuery() 呼び出しは、最初のクエリから推測できるために、アプリケーション オブジェクトが必要なくなります。

ToStream() メソッドでは、CTI (Current Time Increment) をその時点で挿入する必要がある場合に、オプションの AdvanceTimeSettings オブジェクトを使用します。CTI を挿入することで、特定クエリの構成の活動状態を向上することができます。

最初のクエリ オブジェクトを作成する方法は重要ではありません。前のモデルには、CepStream.ToQuery() API を使用する例を示しています。他の可能性として、次のようにクエリを作成する方法が挙げられます。

  • クエリ バインダーを使用。例: myApp.CreateQuery("filterQuery", queryBinder, "description");

  • サーバーからオブジェクト モデル API を取得。例: myApp.Queries["filterQuery"]

バインドされていないクエリの出力

前の例は、出力が既に出力アダプターにバインドされた既存のクエリの結果を再利用する方法を示しています。この代わりに、バインドされていない出力ストリームをクエリに使用して、他の 1 つ以上のクエリがその結果を利用しない限り、出力を作成しないようにすることができます。このシナリオを次の図に示します。

バインドされていないクエリ ストリームが含まれるクエリ 1。

これは、アダプターを必要としない CepStream.ToQuery() のペイロードを使用して実現します。

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             EventShape.Point, 
                             StreamEventOrder.FullyOrdered);

このクエリを開始することができます。2 番目のクエリは、その結果ストリームを後で利用することができます。そのためには、クエリ validationQuery の前の例に示した方法で、結果ストリームを指定します。利用するクエリが存在しないと、最初のクエリの結果は削除されます。

このパターンにより、クエリの結果を複数の出力アダプターにストリーミングすることもできます。最も単純な場合では、バインドされていないクエリに加えて、各出力アダプターに対して 1 つずつパススルー クエリを使用することでこれを実現することができます (前の図のクエリ 2 とクエリ 3)。

パブリッシュされたストリーム

これまでの例では、別のクエリのための新しい入力ストリームを作成するために、実際のクエリ オブジェクトを使用しました。クライアント側オブジェクトを抽出するには、次の図のように他の 1 つ以上のクエリの入力として、パブリッシュされたストリーム URI を使用できます。

パブリッシュされたストリームを入力として使用するクエリ。

各クエリには、パブリッシュされた既定のストリーム URI (Uniform Resource Identifier) があります。この URI は、クエリ自体の名前となります。さらに、CepStream クラスの適切なメンバーを使用して、パブリッシュされたカスタム ストリーム名をクエリに明示的に割り当てることができます。

var query = filtered.ToPublishedStreamQuery(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),
                                             myApp,
                                             "filterQuery",
                                             "Filter out Values over 95",
                                             EventShape.Point,
                                             StreamEventOrder.FullyOrdered);

これにより、バインドされていなくても、明示的に指定された出力のクエリが作成されます。パブリッシュされたストリームの名前は、"<application_name>/PublishedStream/<stream_name>" 規則に従う必要があります。

これにより、次の例のように、別のクエリが入力ストリームとしてこの URI を参照できるようになります。

var filterStream = CepStream<MyDataType>.Create(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),
                                                EventShape.Point);
var validated = from e in filterStream
                ...

パブリッシュされたストリームを利用するクエリは、入力イベントの形状を指定する必要があります。これは、参照されるクエリの出力形状と一致する必要があります。

パブリッシュされたストリーム名を使用して最初のクエリに接続する方法は、クエリ オブジェクトを使用して接続する場合よりも効率が低くなります。したがって、2 番目のクエリを定義するときは、アプリケーションを指定する必要があります。

var validationQuery = validated.ToQuery(myApp,
                                        "validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

パブリッシュされたストリームのアダプター

構築したクエリのアダプターを取得する場合 (Query.InputStreamBindings などを使用して)、接続に特殊な組み込みアダプターが使用されます。CepStream.ToQuery、Query.ToStream() などを使用してクエリを構成する機能は、これらの組み込みアダプターに加えて、便利な実装方法です。これらは、パブリッシュされたストリーム名を含む独自の構成構造を持ち、次の例のように、通常のアダプターと同じように明示的に使用することもできます。

// primary query, with custom input and output adapters
var inputstream = CepStream<MyDataType>.Create("inputStream",
                                               typeof(MyInputAdapterFactory),
                                               new InputAdapterConfig { someFlag = true },
                                               EventShape.Point);

var filtered = from e in inputstream
               where e.Value > 95
               select e;

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             typeof(MyOutputAdapterFactory),
                             new OutputAdapterConfig { someString = "foo" },
                             EventShape.Point,
                             StreamEventOrder.FullyOrdered);

// secondary query, composed on top of the first one using the
// built-in published stream input adapter and the default published
// stream name of the primary query
var filterStream = CepStream<MyDataType>.Create("filteredStream",
                                                typeof(PublishedStreamAdapterFactory),
                                                new PublishedStreamInputAdapterConfiguration { PublishedStreamName = query.Name },
                                                EventShape.Point);

var validated = from e in filterStream
                ...

var validationQuery = validated.ToQuery(myApp,
                                        "validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

同様に、クエリにはパブリッシュされたストリームの出力アダプターを使用することができます。これは、CepStream.toPublishedStreamQuery() と同じ機能を持ちます。

var filterQuery = filtered.ToQuery(myApp,
                                   "filterQuery",
                                   "desc",
                                   typeof(PublishedStreamAdapterFactory),
                                   new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1") },
                                   EventShape.Point,
                                   StreamEventOrder.FullyOrdered);

クエリ バインダーの使用

クエリ バインダー開発モデルを使用すると、各種 StreamInsight メタデータ オブジェクトを完全に制御でき、クエリのバインドおよび利用がクエリ テンプレートのデザイン フェーズから明確に分離されます。このモデルにより、入力バインドと出力バインド側の両方で動的にクエリを構築できます。詳細については、「クエリ バインダーの使用」を参照してください。

別のクエリへの入力としてのバインド

クエリ バインダーは、クエリ テンプレートをイベント プロデューサーとして入力アダプターにバインドできるのと同様に、既存のクエリにもバインドできます。最初の例のような最初のクエリ (バインドされている出力またはバインドされていない出力を持つもの) があるとします。

var query = filtered.ToQuery(myApp, ...);

クエリ バインダーは、BindProducer の適切なオーバーロード内の前のクエリを参照して、次のように使用できます。

var newStream = CepStream<RawData>.Create("validationInput");
var validated = from e in newStream
                select new
                {
                    SourceId = e.SourceId,
                    Value = e.Value,
                    Status = e.Value > 75 ? false : true
                };
QueryTemplate validateQT = myApp.CreateQueryTemplate("validationLogic", "validates the Value field", validated);
QueryBinder queryBinder = new QueryBinder(validateQT);
queryBinder.BindProducer("validationInput", filterQuery);
queryBinder.AddConsumer(...);

または、クエリ バインダーは、イベント プロデューサーとしてパブリッシュされたストリームを参照できます。

queryBinder.BindProducer("validationInput",
                         new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),
                         EventShape.Point);

Query.ToStream() シグネチャと同様に、BindProducer() にオプションの AdvanceTimeSettings オブジェクトを指定できます。

パブリッシュされたストリームへの出力としてのバインド

出力側では、クエリ バインダーは、明示的に定義されたパブリッシュされたストリームにストリーミングできます。

queryBinder.BindOutputToPublishedStream(new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

このクエリ バインダーに基づくクエリが開始されるとすぐ、前の例で説明したように、他のクエリが、パブリッシュされたストリームにバインドし、結果イベントを利用できるようになります。

パブリッシュされたストリームのアダプターへのバインド

パブリッシュされたストリームのアダプターは、クエリ バインダー モデルでも使用できます。これらのアダプターは、通常のアダプターと同じように、アプリケーション オブジェクトから取得して BindProducer および AddConsumer に使用できます。

queryBinder.BindProducer("validationInput",
                         myApp.GetPublishedStreamInputAdapter(),
                         new PublishedStreamInputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered") },
                         EventShape.Point);
queryBinder.AddConsumer("validated",
                         myApp.GetPublishedStreamOutputAdapter(),
                         new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/validated") },
                         EventShape.Point,
                         StreamEventOrder.FullyOrdered);

関連項目

概念

StreamInsight エンド ツー エンドの例

先行するアプリケーション時間