Share via


ユーザー定義のストリーム演算子

ユーザー定義ストリーム演算子では、イベント ストリームのカスタム処理を定義できます。

使用パターン

クエリ内では、CepStreamScan 拡張メソッドを使用してユーザー定義ストリーム演算子を呼び出します。次の例のように、入力ストリーム、および演算子の最初の状態は、ユーザーが指定します。

var query = input.Scan(new SmoothingOperator(0.7));

演算子の作成者は、抽象型 CepPointStreamOperator または CepEdgeStreamOperator から新しいクラスを抽出します。新しい型は、演算子のステート マシンをカプセル化します。この型のコンストラクターの呼び出しは、スキャン演算子に渡され、演算子の最初の状態が確立されます。

ユーザー定義ストリーム演算子の特性

ユーザー定義ストリーム演算子では、ユーザーは手順に従ってイベント ストリームを操作できます。そのため、ユーザー定義ストリーム演算子は、StreamInsight イベント処理エンジンと CLR 間の境界を表します。この境界はアダプターとユーザー定義演算子または集計の間の境界と同様です。これらすべての場合、エンジンおよび開発者はストリームの一時プロパティに関するコントラクトを満たすことになります。StreamInsight エンジンは、特定のクエリ入力イベントのシーケンスについて、確定的に動作するためにユーザー定義ストリーム演算子に対して次のことを保証します。

  • ユーザー定義ストリーム演算子は、必ず同期時間 (ポイント イベントおよび開始エッジ イベントの場合は開始時間、終了エッジ イベントの場合は終了時間) で順序付けられたイベントを受け取ります。各期間イベントは時間上、開始と終了の 2 つのポイントを示すため、期間イベントには同期時間で順序付けた簡単な表現はありません。そのため期間イベントはサポートされません。

  • ユーザー定義のストリーム演算子に渡されるのは、Insert イベントのみです。受信するストリームの CTI (Current Time Increment) イベントは、ユーザー定義ストリーム演算子で認識されませんが、ユーザー定義ストリーム演算子が時間の経過をどのように認識するかは CTI によって決定されます (下の NextCti を参照)。

  • ユーザー定義ストリーム演算子は、許可されている場合は StreamInsight で非アクティブ化できます (下の IsEmpty を参照)。非アクティブになったユーザー定義ストリーム演算子は、StreamInsight でリサイクルできます。

  • 各 Insert イベントにより ProcessEvent が呼び出され、NextCti プロパティと IsEmpty プロパティのポーリングが行われます。

ユーザー定義のストリーム演算子の入力と出力

ユーザー定義ストリーム演算子は、一度に 1 つの入力イベントを処理します。各入力イベントに対応して 0 ~ * 個の出力イベントが生成される可能性があります。さらにこの演算子は、入力に応じて、独自の内部状態を更新することもあります。入力イベントは、時間の経過を表すために演算子の要求により生成された CTI イベントである場合も、Insert イベントである場合もあります。入力には一時的に注釈が設定されます。

それとは対照に、出力イベントは単なるイベント ペイロードです。出力イベントにタイムスタンプを押したり、出力ストリームに CTI を注入する機会はありません。出力イベントは、対応する入力イベントのタイムスタンプに基づいたタイムスタンプ付きのポイント イベントとして生成されます。

ユーザー定義のストリーム演算子での時間の処理

新しいユーザー定義ストリーム演算子を作成する際、コードはイベントのペイロードを処理する必要があるだけです。時間は StreamInsight でのみ処理されます。入力イベントは順番に受け取られます。各出力イベントのタイムスタンプは、対応する入力イベントのタイムスタンプに基づいています。たとえば、エッジ終了イベントによって出力イベントがトリガーされた場合、その出力イベントはエッジ終了イベントのタイムスタンプを受け取ります。したがって、演算子は時間の影響を受けますが、それを制御することはできません。

ユーザー定義ストリーム演算子は、その ProcessEvent() メソッドの入力ストリームから直接 CTI を受け取りませんが、NextCti プロパティにより時間の経過に対処することができます。このプロパティは ProcessEvent() のすべての呼び出しの後、エンジンでポーリングされます。ユーザー定義ストリーム演算子は、ProcessEvent() の呼び出しとして受け取る次の CTI タイムスタンプを示すタイムスタンプを返すことができます。

NextCti プロパティを設定することにより要求された CTI のみが ProcessEvent に渡されます。これらの CTI は、ユーザー定義ストリーム演算子の外には伝播されません。

ユーザー定義ストリーム演算子の実装

新しいユーザー定義ストリーム演算子を作成するには、抽象基本クラス CepPointStreamOperator または CepEdgeStreamOperator から新しいクラスを抽出します。

  • 抽象基本クラス CepPointStreamOperator から抽出した場合は、演算子は入力イベントをポイント イベントとして認識します。ただし、イベントが実際にはポイント イベントでなくても、エラーではありません。演算子は開始時間のみを認識します。

  • 抽象基本クラス CCepEdgeStreamOperator から抽出した場合は、演算子は入力イベントの開始エッジと終了エッジの両方を認識します。

派生クラスでは、次のプロパティと実装をオーバーライドします。

  • ProcessEvent メソッド。各入力イベントに対して、出力を生成し、演算子の内部状態を更新します。ProcessEvent は 1 つの入力イベントを受け取り、ゼロ個以上の出力ペイロードを返すことができます。

  • IsEmpty プロパティ。演算子の内部状態が空かどうかを示します。true の場合、StreamInsight クエリ エンジンはメモリ使用を低減するために演算子を破棄することがあります。

  • 必要に応じて、NextCti メソッド。CTI イベントが演算子に送られる次の時点を示します。このプロパティをオーバーライドすると、ユーザー定義の演算子で今後の特定のポイントの出力を生成できるようになるか、またはある程度のアプリケーション期間が経過した後に内部状態が空であることが示されるようになります。

派生クラスは、WCF シリアル化も実装する必要があります。詳細については、「方法 : クラスまたは構造体に基本的なデータ コントラクトを作成する」を参照してください。

StreamInsight エンジンと演算子との連携

演算子のインスタンスごとに、同期時刻の順序のイベントによって ProcessEvent メソッドが呼び出されます。ポイント イベントまたは CTI では、同期時刻が有効な開始時刻です。エッジ イベントでは、同期時刻は開始エッジに対する有効な開始時刻であると共に、終了エッジに対する有効な終了時刻でもあります。

ProcessEvent メソッドのすべての呼び出しの後、IsEmpty プロパティと NextCti プロパティがポーリングされます。

演算子が NextCti をオーバーライドすると、演算子によって次に処理されるイベントは、同期時刻が NextCti の値よりも少ない Insert イベントであるか、開始時刻として NextCti の値を持つ CTI であることがエンジンによって保証されます。演算子が、最後に処理されたイベントの同期時刻以下の NextCti 値を返した場合、それは無視されます。NextCti プロパティにより、演算子は入力ストリームの時間の進行を独自のリズム (これらの内部 CTI の形式) に ''変換'' し、それに応じてこの進行に対処することができます。

演算子は、Insert イベントのみに対応してアクティブ化されます。CTI はアクティブ化をトリガーしません。演算子は、IsEmpty から true が返されると非アクティブ化されます。

どの時点でも、エンジンは演算子をシリアル化し、それに対する参照を解除します。演算子のシリアル化が後で解除される場合は、シリアル化前の状態から再開されます。

ユーザー定義ストリーム演算子の例

指数関数スムージング

ユーザー定義ストリーム演算子は、ポイント イベントのストリームを値のシーケンスとして扱い、指数関数スムージングを適用します。System.Runtime.Serialization の参照が必要なことに注意してください。

namespace UdsoExamples
{
    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Extensibility;
 
/// <summary>
/// Implements exponential smoothing.
/// </summary>
[DataContract]
public sealed class SmoothingOperator : CepPointStreamOperator<double, double>
{
    [DataMember]
    readonly double _smoothingFactor;

    [DataMember]
    double? _previousValue;

    public SmoothingOperator(double smoothingFactor)
    {
        _smoothingFactor = smoothingFactor;
    }

    public override IEnumerable<double> ProcessEvent(PointEvent<double> inputEvent)
    {
        // The result is a function of the previous result and the current input value.
        _previousValue = _previousValue.HasValue
            ? (1.0 - _smoothingFactor) * _previousValue.Value + _smoothingFactor * inputEvent.Payload
            : inputEvent.Payload;

        yield return _previousValue.Value;
    }

    public override bool IsEmpty
    {
        get { return false; }
    }

パターン照合

この単純なパターン照合の例は、IsEmpty および NextCti の別の使い方を示しています。この例では、演算子は、値が 1.0 のイベントで、その後 30 秒以内に発生する値 2.0 のイベントがないものを探します。 (この例は、ユーザー定義ストリーム演算子における有用な概念を示すために提供されています。このパターンは非常に単純なため、実際のアプリケーションでは、StreamInsight の組み込み演算子を使用して実装できます)。

前の例では NextCti を使用して演算子の存続期間を制御しました。この例でもこの目的のために NextCti を使用しますが、加えて、時間の経過に対応して出力を生成するためにも NextCti を使用します。

namespace UdsoExamples
{
    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Extensibility;
 
/// <summary>
/// Indicates when an event with value '1' is followed by an event with value '2'
/// within thirty seconds.
/// </summary>
[DataContract]
public sealed class SimplePatternMatcher : CepPointStreamOperator<int, DateTime>
{
    [DataMember]
    DateTimeOffset? _nextCti;

    [DataMember]
    // Tracks timestamps for all events with value '1'.
    readonly Queue<DateTimeOffset> _active = new Queue<DateTimeOffset>();

    public override bool IsEmpty
    {
        // The operator is empty when we are not tracking any events with value '1'.
        get { return _active.Count == 0; }
    }

    public override DateTimeOffset? NextCti
    {
        get { return _nextCti; }
    }

    public override IEnumerable<DateTime> ProcessEvent(PointEvent<int> inputEvent)
    {
        // Produce output in response to the passage of time. Any active '1' event
        // not matched by a '2' event within thirty seconds matches the pattern.
        while (_active.Count > 0 && _active.Peek().AddSeconds(30) <= inputEvent.StartTime)
        {
            yield return _active.Dequeue().UtcDateTime;
        }

        // Update operator state based on new input event.
        if (inputEvent.EventKind == EventKind.Insert)
        {
            if (inputEvent.Payload == 1)
                _active.Enqueue(inputEvent.StartTime);
            else if (inputEvent.Payload == 2)
                _active.Clear();

        }

        // Schedule wake-up after thirty seconds so that we can produce output
        // if needed.
        if (_active.Count > 0)
        {
            _nextCti = _active.Peek().AddSeconds(30);
        }
    }
}
}

使用を単純化するためのヘルパー メソッドの定義

クエリでの演算子の使用を単純化したい場合があります。たとえば、クエリ作成者にとって、input.Scan(new SmoothingOperator(0.5)) を作成するよりも、input.Smooth(0.5) を作成する方が便利です。

次のようなカスタム拡張メソッドを作成することにより、この単純化されたパターンを有効にすることができます。

        static CepStream<EventType1> Smooth(this CepStream<EventType1> source, double smoothingFactor)
        {
            if (null == smoothingFactor)
            {
                throw new ArgumentNullException("source");
            }

            return source.Scan(new SmoothingOperator(smoothingFactor));
        }