先行するアプリケーション時間
StreamInsight 開発者は、順序不定なデータを含んでいる可能性があるデータ ソースに対する要求と、より効果的な方法でイベントを処理するための要求を両立させる必要があります。先行するアプリケーション時間をより早く設定すると、待機時間を短縮できますが、遅れて到着するデータ用のウィンドウが削減されます (つまり、データを順序不定で処理する機能が制限されます)。StreamInsight では、アプリケーション時間を判断する際に役立つさまざまな方法が用意されています。このトピックでは、アダプター レベルでクエリ バインドによって設定できる先行するアプリケーション時間のさまざまなレベルおよびポリシーについて説明します。
一時モデルについて
StreamInsight の一時モデルは、アプリケーション時間のみに基づいています。システム時刻には基づいていません。つまり、すべての時間操作ではイベントのタイムスタンプが参照され、ホスト コンピューターのシステム時計は参照されません。このため、アプリケーションは現在のアプリケーション時間を StreamInsight サーバーに通知する必要があります。指定されたアプリケーションのアプリケーション時間は、アプリケーションのコンテキストのさまざまな面に依存します。具体的には、適切なアプリケーション時間を StreamInsight サーバーに提供するのは、アプリケーション開発者の作業です。アプリケーション時間の主要な考慮事項は、次のとおりです。
データ ソース
データ ソースが一時情報を通知するとき、そのデータを使用して、データ ソースからのすべてのイベントが受信された時点を識別することができます。この時点によって、このデータ ソースに関する現在のアプリケーション時間が構成されます。さまざまなデータ ソースが異なる速度で処理される可能性があることに注意してください。
順序不定なデータ
一部のデータ ソースに関しては、タイムスタンプの順にイベントが到着しないことがあります。つまり、データが適切な順序になっていません。StreamInsight は、順序不定なデータに対応しており、StreamInsight サーバーにイベントが到着する順序に処理結果が影響を受けないようにすることができます。StreamInsight 開発者は、余裕時間を使用しアプリケーション時間を前倒しにして、到着遅延イベントを含んだデータ ソースに対応するために、順序不定なイベントを小数ずつ受け入れることができます。
結果の活動状態
StreamInsight は、現在のアプリケーション時間に正確に従っていることが明らかな出力結果をクエリします。これは、結果が全体のアプリケーション時間によりファイナライズされ、StreamInsight クエリから生じることを意味します。
CTI (Current Time Increment)
クエリの処理中、アプリケーション時間は CTI (Current Time Increment) イベントによる影響を受けます。CTI は、StreamInsight の時間モデルの中心的なコンポーネントである区切りイベントです。CTI は、イベントのシーケンスをコミットするために使用されます。また、タイムラインの特定の部分がこれ以上変化しないことを StreamInsight サーバーにアサートすることにより、計算された結果をクエリ出力に送り出すためにも使用されます。このため、結果を生成したり、ステートフルな操作の状態をフラッシュしたりするには、CTI をイベントと共に入力イベント ストリームにエンキューすることが重要です。
入力では、CTI をエンキューすることにより、CTI のタイムスタンプより前の期間に影響を与える後続イベントが生成されないことを保証します。したがって、入力で CTI がエンキューされると、次のルールが適用されるようになります。
イベントの形状がポイント、期間、またはエッジ開始の場合: イベントの開始時刻が CTI 以降である必要があります。
イベントの形状がエッジ終了の場合: イベントの終了時刻が CTI 以降である必要があります。
これらのルールに違反することを "CTI 違反" と呼びます。以降では、これらの違反がどのように処理されるかを説明します。
CTI を入力ストリームに挿入するには、3 つの方法があります。
イベントをキューに登録するのと同様に、入力アダプターを介して CTI をプログラムでキューに登録します。
指定された頻度で CTI を宣言的に生成します。これは、アダプター ファクトリの AdvanceTimeGenerationSettings を介して、またはクエリ バインドの一部として指定できます。
個別の入力ストリームを CTI ソースとして定義します。これは、クエリ バインドでのみ指定できます。
方法 2 および方法 3 を実装する場合は必ず、CTI 違反に関するポリシーも実装する必要があります。次のセクションでは、AdvanceTimeGenerationSettings および違反に関するポリシーについて説明します。その後のセクションでは、アダプター ファクトリおよびクエリ バインドにおける先行する時間の設定の使用方法について説明します。
CTI の生成
CTI の生成 (前のセクションの方法 2 および方法 3) には、2 つのディメンションがあります。
正の整数 N または期間 T のどちらかで指定される、生成頻度。生成頻度ポリシーは、イベント カウント (N) または期間 (T) の発生後に CTI を挿入します。
最終受信イベントに関する遅延として指定される、生成された CTI のタイムスタンプ。
また、ブール型のフラグを使用し、正の無限大のタイムスタンプを持つ最終 CTI を、クエリ停止時に挿入するかどうかを指定できます。これは、クエリの操作から残りのイベントをすべてフラッシュするために使用されます。
CTI 生成は、AdvanceTimeGenerationSettings クラスを介して定義されます。このクラスのコンストラクターは、次の例に示すように、頻度、遅延、およびフラグを取得します。
var atgs = new AdvanceTimeGenerationSettings(10, TimeSpan.FromSeconds(5), true);
この例では、イベント ソースから受け取るイベント 10 件ごとに CTI を挿入するようにエンジンに指示します。CTI には、最後のイベントの時刻から 5 秒を差し引いたタイムスタンプが保持されます。この遅延メカニズムによって事実上の猶予期間が実装されるため、イベント ソースでは、CTI のセマンティクスに違反することなく遅延イベントをキューに登録できます (ただし、イベントの遅延が 5 秒以下の場合に限ります)。対応するクエリが停止すると、無制限の時間を持つ CTI がキューに登録されます。
AdvanceTimeSettings によって CTI 生成の頻度を指定するときには終了エッジが考慮されないことに注意してください。終了エッジは、頻度として期間を使用する場合にも考慮されません。エッジ イベントの場合、頻度と期間のいずれにおいても開始エッジだけが考慮されます。
CTI 違反に関するポリシー
挿入された CTI より早いタイムスタンプを持つイベントを送ると、イベント ソースが CTI セマンティクスに違反します。先行する時間の設定を使用すると、ポリシーの指定がこのような場合でも処理できるようになります。ポリシーには、次の 2 つの値を設定できます。
Drop
挿入された CTI に違反するイベントは削除され、クエリにはキュー登録されません。
Adjust
有効期間が CTI タイムスタンプと重複する場合、挿入された CTI に違反するイベントが変更されます。つまり、イベントの開始タイムスタンプは最新の CTI タイムスタンプに設定され、これらのイベントは有効になります。イベントの開始時刻および終了時刻の両方が CTI タイムスタンプより前である場合、そのイベントは削除されます。
アダプターの先行する時間の設定
先行するアプリケーション時間の設定は、アダプター ファクトリの定義で指定できます。アダプターがインスタンス化されるたびにファクトリの Create() メソッドが呼び出されるのと同様に、アダプター インスタンスの先行する時間設定を定義するための、対応するメソッドが呼び出されます。そのためには、次に例に示すように、型指定されたアダプターの ITypedDeclareAdvanceTimeProperties インターフェイス (または型指定されていないアダプターの IDeclareAdvanceTimeProperties) を使用します。
public class MyInputAdapterFactory : ITypedInputAdapterFactory<MyInputConfig>,
ITypedDeclareAdvanceTimeProperties<MyInputConfig>
このインターフェイスには、次のメソッドがファクトリの一部として実装されていることが必要です。
public AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties<TPayload>(MyInputConfig configInfo, EventShape eventShape)
{
var atgs = new AdvanceTimeGenerationSettings(10, TimeSpan.FromSeconds(0), true);
var ats = new AdapterAdvanceTimeSettings(atgs, AdvanceTimePolicy.Drop);
return ats;
}
DeclareAdvanceTimeProperties() メソッドは、新しくインスタンス化されたアダプターごとに呼び出されます。このとき、対応する Create() メソッド呼び出しで指定された、同じ構成構造およびイベントの形状パラメーターが使用されます。これにより、クエリの作成やバインドを行う際に先行する時間設定の指定を考慮しなくても、アダプター作成者は、構成情報から正しい CTI 生成の設定を取得できます。
AdapterAdvanceTimeSettings コンストラクターには、AdvanceTimeGenerationSettings オブジェクト、および前に説明した違反に関するポリシーが必要です。
クエリ バインドにおける CTI 生成
AdapterAdvanceTimeSettings と同様、CTI の発行は、次の例に示すように、クエリ バインドで宣言して指定できます。これにより、クエリをバインドするユーザーは、アダプターの実装とは独立して CTI アプリケーション時間の動作を定義できます。
var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromSeconds(0), true);
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);
AdvanceTimeSettings コンストラクターは次の 3 つの引数を使用します。
AdvanceTimeGenerationSettings オブジェクト
AdvanceTimeImportSettings オブジェクト
違反に関するポリシー
生成の設定の引数またはインポート設定の引数のどちらかを NULL に設定できますが、両方を NULL に設定することはできないことに注意してください。また、これらは同時に指定できます。次のセクションでは、AdvanceTimeImportSettings クラスを導入します。
上記の例では、イベントごとに、イベントのタイムスタンプ (遅延なし) を使用して、CTI を生成し挿入しています。次の例に示すように、AdvanceTimeSettings オブジェクトは、オプションの最終パラメーターとして CepStream.Create() メソッドに渡すことができます。
var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromSeconds(0), true);
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);
var inputstream = CepStream<MyPayloadType>.Create("inputStream",
typeof(MyInputAdapterFactory),
new MyConfiguration(),
EventShape.Point,
ats);
これは、クエリ バインダー開発モデルでも使用できます。
queryBinder.BindProducer<MyPayloadType>("filterInput",
inputAdapter,
new MyConfiguration(),
EventShape.Point,
ats);
別のストリームとの同期
クエリ バインド時に使用される場合、頻度に基づいた CTI の生成に加えて (または代わりに)、AdvanceTimeImportSettings を使用して、別の入力ストリームからクエリに CTI をコピーできます。この機能により、次の例に示すように、2 つのストリームが同期されます。
var dataStream = CepStream<DataType>.Create("dataStream ",
typeof(DataInputAdapterFactory),
new MyDataAdapterConfiguration(),
EventShape.Point);
var ats = new AdvanceTimeSettings(null, new AdvanceTimeImportSettings("dataStream"), AdvanceTimePolicy.Adjust);
var lookupStream = CepStream<LookupType>.Create("lookupStream",
typeof(ReferenceInputAdapterFactory),
new MyReferenceConfiguration(),
EventShape.Edge,
ats);
var joined = from eLeft in dataStream
join eRight in lookupStream
where ...
この例は、"速い" データ ストリームが "遅い" 参照ストリームと結合する必要がある場合の一般的な使用を示しています。遅いストリームは、速いストリームより少ない頻度で変化する参照データである場合があります。結合生成出力を最速の入力と同じくらい速くするために、CTI のインポートにより、遅い入力ストリームが速いストリームに同期されています。この例では、速いストリームのアプリケーション時間の処理がアダプターで発生すると考えられます。
結果の活動状態
先行する時間を生成するための設定に関する遅延パラメーターでは、挿入された CTI のタイムスタンプを指定します。出力の活動状態を目的の状態にするには、StreamInsight フレームワークにおける CTI の正確なセマンティクスを理解することが重要です。CTI は、CTI タイムスタンプの直前にあるタイムラインのすべてがコミットされることをエンジンにアサートします。これは、結果の活動状態に対してさまざまな意味を持ちます。
たとえば、ポイント イベントの入力ストリーム、および頻度 1 (イベントごと) と遅延 0 の CTI 生成の設定を考えてみましょう。この場合、各ポイント イベントについてまったく同じタイムスタンプを持つ CTI が生成されます。ただし、これは、タイムスタンプが対応する CTI の直前にあるわけではないので、最終ポイント イベントのみが次の CTI と共にコミットされることを意味します。アダプターにより発行されるとすぐに各ポイント イベントがコミットされるようにするには、CTI のタイムスタンプがポイント イベントの直後であることが必要です。次の例に示すように、これは、1 チックの負の遅延に変換します。
var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromTicks(-1), true);
CTI とクエリ演算子
CTI は、前述のとおり、入力アダプターによってキューに登録されるか、挿入されます。CTI はクエリを介して反映され、演算子ごとに異なる方法で処理されます。たとえば、結合演算子は、両辺から古い方の CTI までの結果を解放します。Union 演算子は、両辺から最近の CTI のうち古い方の結果を解放します。クエリ全体は、最近の CTI までの結果のみを解放します。
一方、CTI タイムスタンプに影響する演算子もあります。ホッピング ウィンドウでは、イベントがウィンドウ内にある間にウィンドウに対する演算の結果が変化する可能性があるため、ウィンドウ内の CTI がウィンドウの先頭に移動されます。ShiftEventTime() メソッドと AlterEventLifeTime() メソッドは、いずれもイベントの開始時刻を変更し、同じ変換が CTI に適用されます。
関連項目
概念
変更履歴
変更内容 |
---|
「CTI とクエリ演算子」を追加しました。 |
「CTI の生成」に、AdvanceTimeSettings によって CTI 生成の頻度を指定するときに終了エッジが考慮されないことに関する情報を追加しました。 |