入力アダプターと出力アダプターの作成

ここでは、StreamInsight プラットフォームを使用して CEP (Complex Event Processing) アプリケーションの入出力アダプターを作成するために必要な一般的な情報を提供します。アダプターとは、イベントを StreamInsight サーバーの内外に配信するソフトウェア変換機能です。

イベント フローと制御について

アダプターを作成するときは、StreamInsight サーバーを経由するイベント フローについて理解し、入力アダプターおよび出力アダプターによってこのフローが制御されるしくみを把握することが重要です。次の図に示すように、イベント フローは、ソースから継続クエリを経由してシンクへと 1 方向に移動します。つまり、イベントをクエリに送る入力アダプターによって、ソースからイベントが読み取られます。その後、入力イベント、または入力イベントの処理結果として生成される新しいイベントが、クエリ内のある演算子から次の演算子にプッシュされます。続いて、クエリによって、処理されたイベントが、シンクにイベントを送る出力アダプターに送信されます。図は、StreamInsight クエリが 2 つの入力アダプター a1 と a2、および出力アダプター インスタンス a4 にバインドされるシナリオを示しています。

入力アダプターから出力アダプターへのイベント フロー

イベント フローはソースからシンクへと 1 方向に移動しますが、コンポーネント間の一部の対話ポイントで行われるイベントの取得と転送のフローおよび実行制御は双方向にすることができます。これらの対話ポイントは、次の図に READ、ENQUEUE、DEQUEUE、および WRITE として示しています。

入力アダプターの実装では、ソース デバイス (ファイルやデータベースなど) に固有のアクセス メカニズムを使用して READ 操作を実行し、アダプター API を使用して ENQUEUE 操作を実行する必要があります。同様に、出力アダプターの実装では、シンク デバイスに固有のアクセス メカニズムを使用して WRITE 操作を実行し、アダプター API を使用して DEQUEUE 操作を実行する必要があります。ENQUEUE および DEQUEUE 操作は、アダプター状態の移行ダイアグラムに指定されているデザイン パターンに従って実装する必要があります。デザイン パターンについては、このトピックの後半で説明します。

イベント フロー制御の面からイベントを考慮する場合、プロバイダーからコンシューマーにプッシュされるイベント (左から右方向へのブロック矢印で示されます)、またはコンシューマーによってプロバイダーからプルされるイベント (鍵状の矢印で示されます) を想定できます。READ および WRITE の対話ポイントでは、アダプター実装にイベント フロー制御としてプッシュ アプローチまたはプル アプローチを採用できます。この対話に関しては、ソースまたはシンクが対応できるイベント レート、ソースまたはシンクを調整するアダプターの機能、実装できるバッファリング機能を考慮する必要があります。

非常に短い待機時間でイベントを出力し、調整が難しいソース デバイスの場合、通常、ソース デバイスがイベントをアダプターにプッシュするようにアダプターを実装します。このようなデバイスの例として、センサー (マシン駆動イベント)、ティッカー プラント、ネットワーク ポートがあります。待機時間が長いデバイス (ファイル、データベース) の場合、アダプターがデータをソースからプルするような実装を検討します。同様に、出力側では、デバイスの出力アダプターを非常に高いスループットでイベントを受け取ることができるように実装して、イベントをデバイスにプッシュします。速度が遅い出力デバイスでは、イベントを使用する準備が完了するたびにデバイスがアダプターをポーリングする方法を採用できます。

ENQUEUE 対話ポイントでは、StreamInsight サーバーはプッシュ モデルをサポートします。つまり、アダプター デザイン パターンにより、任意の時点でエンジンが可能な限り使用できる数のイベントをキューに格納できます。DEQUEUE 対話ポイントでは、StreamInsight サーバーはプル モデルをサポートします。つまり、アダプター デザイン パターンでは、エンジンができるだけ速くサーバーからイベントをプルすることが想定されています。

このため、StreamInsight サーバーの調整ポリシーは非常に簡単です。ブロック操作のない簡単なパススルー クエリを想定した場合、StreamInsight サーバーが ENQUEUE 対話ポイントで入力アダプターからイベントを使用できる速度は、出力アダプターが DEQUEUE 対話ポイントでサーバーからイベントを使用できる速度でのみ制限されます。ENQUEUE 動作時に StreamInsight サーバーが入力アダプターでプッシュバックする範囲は、クエリが出力を提供する速度と出力アダプターがこの出力を使用する速度によって決まります。StreamInsight には、これらの対話ポイントでイベント レートを測定する際に役立つ診断ビューの包括的なセットが用意されています。詳細については、「StreamInsight サーバーおよびクエリの監視」を参照してください。

アダプターの開発タスク

次のチェックリストに従って、アダプターを開発します。

  • 必要なアダプターの種類 (入力または出力) を決定します。

    入力アダプターは、指定された形式で受信イベントを読み取り、StreamInsight サーバーで使用できる形式にこのデータを変換します。

    出力アダプターは StreamInsight サーバーによって処理されたイベントを受け取り、そのイベントを出力デバイスに必要な形式に変換して、データをそのデバイスに送信します。

  • イベントの種類を決定します。

    入力アダプターの場合、ソースによって提供されるイベントのペイロードを説明するイベントの種類を定義します。出力アダプターの場合、シンクによって使用されるイベントのペイロードを説明するイベントの種類を指定します。イベント ペイロードの詳細については、「StreamInsight サーバーの概念」を参照してください。

    フィールドの数と種類が事前にわかっている固定ペイロード形式のイベントを常に生成または使用する、ソースまたはシンク用の型指定されたアダプターを指定して作成します。型指定されたアダプターの主な利点は、イベントを作成して StreamInsight サーバーにキューを格納する処理を比較的簡単に実装できることです。フィールド型は既にわかっているため、IntelliSense を Visual Studio (または別の統合開発環境での同等の機能) で使用してフィールドを作成できます。

    ソースまたはシンクが異なるペイロード形式を生成または使用する場合は、型指定されていないアダプターを指定して作成します。型指定されていないアダプターの主な利点は、アダプターの実装を特定のイベントの種類に関連付けるのではなく、クエリ バインド時にイベントの種類を柔軟に指定できることです。型指定されたアダプターに比べて、型指定されていないアダプターの実装には詳細な設定が必要になります。型指定されていない入力アダプターは、クエリ バインド時に提供される構成パラメーターから各フィールドの型を特定し、フィールドを 1 つずつ生成して、イベントをキューに格納することができるように記述する必要があります。同様に、型指定されていない出力アダプターは、出力時に提供される構成情報に基づいてキューから取り出されるイベントからクエリ処理の結果を取得できる必要があります。

    重要なのは、クエリにバインドされるアダプター インスタンスは、型指定されているかどうかに関係なく、常に、特定の種類のペイロードを含むイベントを生成することです。詳細については、「イベントの種類の作成」を参照してください。

  • イベント モデルを決定します。

    入出力イベントのイベント モデルを決めます。StreamInsight では、ポイント、期間、境界という 3 つのイベント モデルがサポートされています。ソースによって固定イベント モデルのイベントが提供される場合、そのイベント モデルのみに対する入力アダプターをデザインできます。同様に、シンクで特定のモデルのイベントが必要になった場合、そのイベント モデルのみに対する出力アダプターをデザインできます。ただし、ほとんどのアプリケーションでは、すべてのイベント モデルが特定のイベントの種類に対応することが求められます。各イベント モデルについて型指定されたアダプターまたは型指定されていないアダプターを作成することをお勧めします。イベント モデルの詳細については、「StreamInsight サーバーの概念」を参照してください。

    AdapterFactory の入出力クラスを使用すると、これらのアダプターをまとめてパッケージ化できます。正しいアダプターは、構成パラメーターに基づくクエリ バインド時にインスタンス化できます。

  • アダプターの対応する基本クラスを選択します。

    イベントの種類とモデルに基づいて、アダプターの適切な基本クラスを選択します。クラスの用語体系は [型指定あり][ポイント | 期間 | 境界][入力 | 出力] パターンに従います。型指定されていないアダプターには型指定されたプレフィックスは付きません。

    アダプターの種類

    入力アダプターの基本クラス

    出力アダプターの基本クラス

    型指定されたポイント

    TypedPointInputAdapter

    TypedPointOutputAdapter

    型指定されていないポイント

    PointInputAdapter

    PointOutputAdapter

    型指定された期間

    TypedIntervalInputAdapter

    TypedIntervalOutputAdapter

    型指定されていない期間

    IntervalInputAdapter

    IntervalOutputAdapter

    型指定された境界

    TypedEdgeInputAdapter

    TypedEdgeOutputAdapter

    型指定されていない境界

    EdgeInputAdapter

    EdgeOutputAdapter

    詳細については、「Microsoft.ComplexEventProcessing.Adapters」を参照してください。

  • 入出力 AdapterFactory クラスをデザインします。

    AdapterFactory は、アダプターのコンテナー クラスです。ファクトリ クラスを実装する必要があります。ファクトリの基本クラスは、次のように構成されます。

    アダプターの種類

    入力アダプターの基本クラス

    出力アダプターの基本クラス

    型指定あり

    ITypedInputAdapterFactory

    ITypedOutputAdapterFactory

    型指定なし

    IInputAdapterFactory

    IOutputAdapterFactory

    回復性サポートによる型指定あり

    IHighWaterMarkTypedInputAdapterFactory

    IHighWaterMarkTypedOutputAdapterFactory

    回復性サポートによる型指定なし

    IHighWaterMarkInputAdapterFactory

    IHighWaterMarkOutputAdapterFactory

    ファクトリ クラスは、次の目的で利用できます。

    • ファクトリ クラスを使用すると、特定のデバイス クラス (CSV ファイル、SQL Server データベース、Web サーバー共通ログ形式) またはアプリケーション要件に対応する異なるアダプター実装間でリソースを共有でき、アダプター コンストラクターに構成パラメーターを渡すことが容易になります。たとえば、アプリケーションでは 3 つのイベント モデル (ポイント、期間、および境界) すべてが必要になる場合があります。1 つのファクトリでは、各イベント モデルに 1 つずつ、合計で 3 つのアダプター実装をサポートできます。別の例として、アプリケーションにデータベースのテーブルなどの同じイベント ソースが存在することがありますが、ソースは実行されるクエリに基づいて、同じソースから複数のイベント ペイロード構造を生成します。この場合、1 つのファクトリでは、各ペイロード構造を処理する複数のアダプター実装をサポートできます。

    • ファクトリ クラスを使用すると、サーバー ランタイムに対するアダプター用のゲートウェイを利用できます。アダプター開発者は、アダプター クラスのアダプター ファクトリで Create() メソッドと Dispose() メソッドを実装する必要があります。これらのメソッドは、クエリの起動時とシャットダウン時にサーバーによって呼び出されます。

    • ファクトリ クラスを使用すると、ランタイム前の構成情報に対するアダプター用のゲートウェイを利用できます。このことは、型指定されていないアダプターにとって特に重要です。型指定されていないアダプターは、クエリ バインド時に提供される構成パラメーターから、構造内の各フィールドの型を特定する必要があります。ファクトリ クラスで構成構造を定義し、この構成構造を Create() メソッドによってアダプター クラスのコンストラクター メソッドに渡すことができます。この構成構造は、DataContractSerialization を使用してシリアル化されます。この制約とは別に、この開発手法では、構成構造を作成したりアダプター コンストラクターで使用する際にこの構成構造を制限なく定義し使用できます。

    • ファクトリ クラスを使用すると、CTI (Current Time Increment) を入力アダプターによって明示的にキューに格納しなくても CTI を生成できます。ユーザーは、アダプター ファクトリ クラスに TypedDeclareAdvanceTimePolicy インターフェイス (型指定されたアダプター ファクトリの場合) または IDeclareAdvanceTimePolicy インターフェイス (型指定されていないアダプター ファクトリの場合) を実装することで、CTI の頻度とタイムスタンプを指定できます。これにより、アダプター コードが簡素化され、ファクトリがアダプター インスタンスを使用して生成する各イベント ストリームに影響を与えることができます。詳細については、「[AdvanceTimeSettingsClass]」を参照してください。

    • 回復性アプリケーションでは、欠落したイベントを再生するために入力アダプターに高ウォーター マークを適用すること、および重複イベントの削除のために出力アダプターに高ウォーター マークとオフセットを提供することによって回復性がサポートされています。詳細については、「StreamInsight の回復性」を参照してください。

  • アダプターをビルドしテストします。

    アダプターを .NET アセンブリとしてコンパイルおよびビルドします。複雑なクエリ処理を行わずに、イベントを入力アダプターから読み取って出力アダプターに出力する単純なパススルー クエリに対して、基本操作のアダプターをテストします。これにより、アダプターがデバイスからの読み取り操作およびデバイスへの書き込み操作を実行していて、イベントのキューへの格納およびキューからの取り出しを実行できるかどうかが検証されます。

アダプターのステート マシン

アダプターと StreamInsight サーバーの対話を定義するステート マシンは、入力アダプターでも出力アダプターでも同じです。これが重要なのは、ステート マシンでは一貫性のある開発モデルが提供されるためです。ステート マシンを次の図に示します。

アダプターのキューへの登録およびキューからの削除状態の図

このステート マシンを動作させる主な機能と要件は次のとおりです。

  • Start() および Resume() メソッドは、StreamInsight サーバーで呼び出され、アダプター開発者によって実装される必要があります。また、基本クラスから継承される、アダプター クラスのコンストラクター メソッドおよび Dispose() メソッドも実装する必要があります。

  • そのため、アダプターの実装ではアダプター SDK によって提供される次のメソッドを呼び出す必要があります。

    • Enqueue() (入力アダプターの場合)。このメソッドの戻り値は EnqueueOperationResult.Success または EnqueueOperationResult.Full です。

    • Dequeue() (出力アダプターの場合)。このメソッドの戻り値は DequeueOperationResult.Success または DequeueOperationResult.Empty です。

    • Ready().このメソッドの戻り値は、ブール値 TRUE または FALSE です。

    • Stopped().このメソッドの戻り値は、ブール値 TRUE または FALSE です。

  • StreamInsight サーバーは、管理者またはクエリ開発者がサーバー API でメソッドを使用してクエリの実行を停止する場合、ユーザーに代わって内部メソッド (StopQuery() で表される) を非同期的に呼び出します。

  • アダプターが次のいずれかの状態の場合、Enqueue() と Dequeue() を呼び出し、それぞれ Full 状態と Empty 状態を返します。

    • Suspended

    • Stopping

  • アダプターが次のいずれかの状態の場合、Enqueue() および Dequeue() を呼び出すと、例外が発生します。

    • Created

    • Stopped

  • アダプターが次のいずれかの状態の場合、Ready() を呼び出すと、例外が発生します。

    • Created

    • Running

    • Stopped

  • アダプターの操作の過程で、5 つの状態 ("Created"、"Running"、"Suspended"、"Stopping"、および "Stopped") の一部またはすべてが変化します。状態の変化は、StreamInsight サーバーが Start() または Resume() を呼び出す前と、アダプターが Enqueue()、Dequeue()、Ready()、および Stopped() を呼び出した後に発生します。

  • StreamInsight サーバーとアダプターが同じスレッドを共有することはありません。サーバーは、常に、別のワーカー スレッドで Start() または Resume() を呼び出します。サーバーは、アダプターに代わって、このスレッドをオペレーティング システム スレッド プールから取得します。つまり、Start() メソッドと Resume() メソッドには必要に応じてワーカー スレッドを使用する十分な機能と柔軟性があります (たとえば、非同期の読み取りと書き込みに対して多くのスレッドを起動するなど)。このため、このスレッドからシステム リソースを使用する際には注意を払い最適な処理を行うことが必要です。

  • この API によって、Start() 操作 (スレッド) と Resume() 操作 (スレッド) 間に特有の同期が不要になります。サーバーは、Ready() がアダプターによって呼び出されると (呼び出された後のみ)、常に Resume() を呼び出します。ただし、イベントの読み取り、書き込み、またはバッファリングというデバイスに向けたタスク (特に非同期の I/O シナリオ) では、同期が必要になる場合があることに注意してください。ここでは、ベスト プラクティスとして、非ブロッキング I/O を使用することをお勧めします。

  • アダプターをアイドル状態にすることができる場合は、アダプターでは定期的に状態をチェックして、停止要求があったかどうかを確認する必要があります。

アダプターとサーバーの対話のライフ サイクル

StreamInsight サーバーとアダプター間のハンドシェイクは常に同期しています。そのため、実行中の任意の時点で、アダプターは状態をチェックして状況に応じた処理を行うことができます。アダプターと StreamInsight サーバーの対話のライフ サイクルは、次の操作で構成されています。操作は前の図に示したステート マシンに対応します。

  • Created

    クエリが開始される (StreamInsight  サーバー API で対応する呼び出しを行うことで) と、アダプター インスタンスは StreamInsight サーバーとの対話を開始します。

  • Running

    サーバーは、アダプターを "Running" 状態にしてアダプターで非同期に Start() を呼び出し、この呼び出しが 1 回だけ行われるようにします。アダプターが "Running" 状態になると、アダプターはイベントをサーバーのキューに登録したり、サーバーのキューから削除したりできるようになります。

    通常、アダプターを "Running" 状態にするのが最適です。推奨されるデザイン パターンは、リーダーまたはライター ルーチンを、可能であれば個別のスレッドで、Start() メソッドから起動し、Start() ルーチンから返すようにすると、ワーカー スレッドがすばやく解放されます。

    リーダー ルーチン (たとえば ProduceEvents() と呼ばれるとします) はソースからイベントを読み取り、Enqueue() を呼び出してイベントをサーバーにプッシュします。出力アダプターの場合は、ライター ルーチン (たとえば ConsumeEvents() と呼ばれるとします) は Dequeue() を呼び出し、サーバーからイベントをプルしてシンクに書き込みます。

  • Suspended

    サーバーがキューに登録されたイベントを受け取ることができない場合、またはイベントを出力して取り出すことができない場合、入力または出力アダプターは "Suspended" 状態になります。これにより、Enqueue() と Dequeue() の呼び出しにそれぞれ "FULL" 状態と "EMPTY" 状態が返されます。"Suspended" 状態では、データベースから最後に読み取ったレコードの場所を保存したり、ファイルから行を保存したりするなどのハウスキーピング操作を実装できます。このオプションのセクションの終わりでは、Ready() メソッドを呼び出して、アダプターの再開準備ができているサーバーと通信する必要があります。ルーチンが Start() と同じワーカー スレッドで実行されている場合は、Start() ルーチン自体から返す必要があります。

  • Ready() の呼び出しに応えて、サーバーはアダプターを "Running" 状態に戻し、常に、異なるワーカー スレッドで Resume() を非同期に呼び出します。前回失敗した繰り返し処理をキューに登録したりキューから削除するように Resume() をデザインすると、ProduceEvents() または ConsumeEvents() を呼び出すことができます。このパターンは、アダプターが "Stopped" 状態または "Stopping" 状態に移行するまで続けられます。

  • Stopping

    "Running" 状態または "Suspended" 状態の任意のポイントで、サーバーは、クエリを停止するための非同期要求に応じてアダプターを "Stopping" 状態に移行できます。この状態では、Enqueue() または Dequeue() の呼び出しでも "FULL" または "EMPTY" 状態がそれぞれ返されます。

    "Stopping" 状態には、アダプター実装が停止するための準備を正しく行うことができるステージング領域が用意されています。アダプターを実装して、アダプターが取得しているすべてのリソース (スレッド、メモリ) を解放し、Stopped() メソッドを呼び出すことができます。このメソッドが呼び出されるまで、サーバーはアダプターを停止しません。

    アダプターは非同期に "Stopping" 状態に移行する場合があります。アダプターには、"Stopping" 状態に移行したことを検出するなんらかの手段が必要です。上記で説明されているように、デザイン パターンの目的は、Ready() を中断されたときにアダプターがこれを呼び出すことができるようにすることです。その結果、サーバーによって Resume() メソッドがもう一度呼び出され、"Stopping" 状態を Resume() メソッドで検出できるようになります。Start() と Resume() の実装に最初のブロック コードとして "Stopping" 状態にチェックマークを付けることをお勧めします。

  • Stopped

    アダプター コードでは、いつでも Stopped() を呼び出すことができます。これにより、アダプターは "Stopped" 状態になります。デザイン時の注意点として、Stopped() を呼び出す前に、アダプターが取得したリソースがクリーンアップされるようにすることをお勧めします。

    重要な注意事項重要

    Stopped() メソッドの呼び出しに失敗すると、クエリに関連付けられているメモリの最後のページが割り当てられたままになります。これにより、少量のメモリ リークが発生しますが、プロセス内でクエリの開始と停止が何度も繰りされると、このようなメモリ リークが時間の経過と共に蓄積される場合があります。

    "Stopped" 状態のアダプターは、StreamInsight サーバー固有の構造またはイベント メモリを参照することができません。また、キューに登録したりキューから削除したりすることもできません。このような処理を行うと、例外が発生します。ただし、オペレーティング システムおよびデバイスに向けたクリーンアップ処理は続行できます。

使用例

さまざまな入出力アダプター、およびアダプター ファクトリの例については、「StreamInsight のサンプル」で使用可能なサンプルを参照してください。

関連項目

概念

StreamInsight サーバーの概念

StreamInsight サーバー アーキテクチャ