Share via


ユーザー定義の集計と演算子

このトピックの例では、ユーザー定義集計 (UDA) とユーザー定義演算子 (UDO) の StreamInsight LINQ 演算子でウィンドウベースの演算子を拡張する方法を示します。これらの演算子は、イベント ウィンドウを介して定義され、ゼロ以上の結果イベントを返します。ユーザー定義集計またはユーザー定義演算子は、実行時にアダプターを指定して使用するように、StreamInsight サーバーからアクセスできるアセンブリにコンパイルする必要があります。

また、StreamInsight では別の拡張方式としてユーザー定義のストリーム演算子が提供されます。ユーザー定義のストリーム演算子は、イベント ウィンドウではなくイベント ストリームを介して直接定義されます。

ユーザー定義集計

ユーザー定義集計は、ウィンドウの指定の先頭で使用することで、そのウィンドウ内のイベントを集計し、1 つの結果値を返します。UDA は、一連の CEP イベントを含む CEP ウィンドウ (ホッピング ウィンドウ、スナップショット ウィンドウ、またはカウントベース ウィンドウの演算子の結果) を入力として使用し、1 つの戻り値 (StreamInsight プリミティブ型のいずれかにマップされる CLR 型) を出力します。ウィンドウの詳細については、「イベント ウィンドウの使用」を参照してください。

StreamInsight が提供する count、sum、および average のように、単純な集計よりもさらに複雑な機能の UDA を実装できます。その例の 1 つである「時間加重平均の計算」については、後のセクションで説明します。

ユーザー定義演算子

ユーザー定義演算子は、ウィンドウの指定の先頭で使用することで、そのウィンドウ内のイベントを処理し、結果として 1 つ以上のイベントを生成します。UDO は、一連の CEP イベントを含む CEP ウィンドウ (ホッピング ウィンドウ、スナップショット ウィンドウ、またはカウントベース ウィンドウの演算子の結果) を入力として使用し、一連の CEP イベントまたは CEP ペイロードを出力します。

UDO は、各ウィンドウのすべてのイベント (タイム スタンプを含む) を生成するための計算や、イベントへ影響を与える計算が必要となる場合に使用できます。たとえば、集計を計算する以外に、イベントのステータス フィールドを設定する場合、ステータスは、集計結果ともう 1 つのパラメーターによって決まります。また、UDO は、各ウィンドウに対して 1 つのイベントを生成する場合があります。このイベントには、集計結果が格納されているペイロード フィールドと、集計結果が制約に違反したかどうかを示すステータス フィールドが含まれます。

UDA と UDO における時間の区別

UDA と UDO には、これらの演算子を実装するための基本クラスの選択に基づいて、時間を区別するか区別しないかを定義できます。

時間を区別しない UDA と UDO では、すべてのイベント (タイム スタンプを含む) が渡されることは想定されていません。定義済みウィンドウのイベントの 1 つまたは複数のペイロード フィールドのみを考慮します。また、現在のウィンドウの開始時刻と終了時刻も渡されません。

時間を区別する UDA および UDO には、各ウィンドウのイベントのセットが渡されます。これらのイベントには、そのタイムスタンプおよびウィンドウの開始時刻と終了時刻が含まれています。UDA または UDO で時間を区別しているかどうかは、UDA または UDO の作成者による実装の派生元である個々の基本クラスによって決まります。

ユーザー定義集計の実装

UDA の作成者の役割は次のとおりです。

  • 実際に UDA を実装する。

  • クエリの記述に UDA を使用できるように LINQ の拡張メソッドを提供する。

UDA を実装するために、ユーザーは適切な基本クラスを派生元にする必要があります。適切な基本クラスは、時間を区別しない UDA の場合は CepAggregate、時間を区別する UDA の場合は CepTimeSensitiveAggregate です。

クラスの派生には、入力型と出力型のパラメーターのインスタンス化が必要です。入力型によって表されるのは、すべてのペイロード (計算中にペイロード フィールドのセット全体を UDA が参照できるようにする必要がある場合)、または StreamInsight の型システムに含まれる対応するプリミティブ型にマップされる CLR 型 (単一フィールドが UDA の入力となっているシナリオの場合) のいずれかになります。どちらの場合も、出力型は、対応するプリミティブ型にマップされる CLR 型でなければなりません。

UDA の作成者が必要とする場合は、イベント データとは別に、オプションの構成構造をクエリの開始時に UDA クラスのコンストラクターに渡すことができます。そのようなコンストラクターを UDA の作成者が指定する場合、このコンストラクターは、LINQ の UDA の呼び出し元が提供する構成と一緒に、エンジンによって実行時に呼び出されます。

時間を区別しない UDA および時間を区別する UDA はどちらも、ペイロードを順序付けのないセットとして受け取ります。時間を区別する UDA の場合、イベントのタイムスタンプはさらに各ペイロードに関連付けられています。また、ウィンドウの開始時刻と終了時刻を定義するウィンドウ記述子が UDA に渡されます。

ユーザー定義集計の例

次の例では、時間を区別しない UDA を実装します。これは整数のイベント フィールドを想定しています。この例の実装ではオプションの構成構造は指定されないため、クラスでは特定のコンストラクターを必要としません。

public class Median : CepAggregate<int, int>
{
    public override int GenerateOutput(IEnumerable<int> eventData)
    {
        var sortedData = eventData.OrderBy(e => e.Payload);
        int medianIndex = (int)sortedData.Count() / 2;
        return sortedData.Count() % 2 == 0 ?
            (sortedData.ElementAt(medianIndex).Payload + sortedData.ElementAt(medianIndex - 1).Payload) / 2 :
            sortedData.ElementAt(medianIndex).Payload;
    }
}

UDA の実装に加えて、クエリの記述で UDA を使用できるように LINQ の拡張メソッドを指定する必要があります。拡張メソッドとは、クエリ作成者が集計を使用してクエリをコンパイルできるようにするためのシグネチャです。次の例に示すように、StreamInsight LINQ プロバイダーは属性を使用して、UDA 実装を含む実際のクラスを参照できます。

public static class MyUDAExtensionMethods
{
    [CepUserDefinedAggregate(typeof(Median))]
    public static int Med<InputT>(this CepWindow<InputT> window, Expression<Func<InputT, int>> map)
    {
           throw CepUtility.DoNotCall();
    }
}

ここでは、Median クラスを使用する UDA の実装が必要です。このクラスは、int 型の単一のフィールドを処理する UDA を実装し、int 型の値を返します。関数の署名内の式は、入力ストリームのイベント タイプから 1 つの整数値へのマッピングを表します。拡張メソッドは実行されなくなるため、そのメソッド本体に含まれる CepUtility.DoNotCall() に注意してください。この仕様に基づいて、次の例に示すように、UDA を LINQ 内で使用できます。

from w in s. TumblingWindow(TimeSpan.FromMinutes(10))
select new { f1 = w.Med(e => e.val) }

ラムダ式の引数は、イベントのペイロードを、UDA の入力となる整数値にマップします。この場合は、イベント フィールド val の値の平均がウィンドウごとに計算されます。

次に、構成情報を持ち、時間を区別しない UDA の例を示します。これは Trade 型のペイロード全体を入力とし、double 型の値を返します。次の例にも対応する拡張メソッドが含まれます。

    public class Trade
    {
        public double Volume { get; set; }
        public double Price { get; set; }
    }

    public class Vwap : CepAggregate<Trade, double>
    {
        double weight;

        /// <summary>
        /// Constructor for parameterized UDA
        /// </summary>
        public Vwap(double w)
        {
            weight = w;
        }

        public override double GenerateOutput(IEnumerable<Trade> events)
        {
            double vwap = events.Sum(e => e.Price * e.Volume) / events.Sum(e => e.Volume);

            return vwap * weight;
        }
    }

    static public partial class UDAExtensionMethods
    {
        [CepUserDefinedAggregate(typeof(Vwap))]
        public static double vwap(this CepWindow<Trade> window, double w)
        {
            throw CepUtility.DoNotCall();
        }
    }

ペイロード全体が入力になるため、拡張メソッドによってラムダ式は指定されません。UDA への唯一のパラメーターは構成の値 (ここでは double 値) です。

var result = from w in s.TumblingWindow(TimeSpan.FromMinutes(10))
             select new { f1 = w.vwap(2.5) }

次に、構成情報を持ち、時間を区別する UDA の例を示します。UDA は、期間イベントを含んだ時間加重平均であり、ステップ関数として解釈されます (つまり、各期間は次の期間まで有効)。前の例と同様に、ペイロード全体ではなく、double 型の値のみを入力とします。

イベント ペイロードが double 型値に縮小されても、入力セットは、時間を区別しない UDA の場合のように一連のペイロードとして定義されるのではなく、一連の間隔イベントとして定義されることに注意してください。UDA が時間を区別するように指定されているため、この定義はタイムスタンプを含めるために必要です。さらに、ウィンドウ自体は、開始時刻プロパティと終了時刻プロパティを持つ WindowDescription オブジェクトの形式をとります。これらのタイムスタンプは UTC 時間で指定されます。また、UdaConfig は、DataContractSerializer によってシリアル化することができるクラスまたは構造であることが必要です。

public class TimeWeightedAverage : CepTimeSensitiveAggregate<double, double>
{
    UdaConfig _udaConfig;
    public TimeWeightedAverage(UdaConfig udaConfig)
    {
        _udaConfig = udaConfig;
    }

    public override Output GenerateOutput(IEnumerable<IntervalEvent<double>> events,
                                          WindowDescriptor windowDescriptor)
    {
        double avg = 0;
        foreach (IntervalEvent<double> intervalEvent in events)
        {
            avg += intervalEvent.Payload * (intervalEvent.EndTime - 
                                            intervalEvent.StartTime).Ticks;
        }
        avg = avg / (windowDescriptor.EndTime - 
                     windowDescriptor.StartTime).Ticks;
        return avg * udaConfig.Multiplier;
    }
}

ここで、UDAConfig は次のように使用されます。

public class UDAConfig
{
    public double Multiplier { get; set; }
}

これで、拡張メソッドに次の構成構造も含まれるようになります。

[CepUserDefinedAggregate(typeof(TimeWeightedAverage))]
public static double twa<InputT>(this CepWindow<InputT> window, Expression<Func<InputT, double>> map, UdaConfig config)
{
    throw CepUtility.DoNotCall();
}

次のように、構成は拡張メソッドの別のパラメーターになります。

var result = from w in s.TumblingWindow(TimeSpan.FromMinutes(10))
         select new w.TimeWeightedAverage (e => e.dval,
                            new UdaConfig(){ Multiplier = 5 }); 

ここまでの例は、イベントが型指定されているシナリオを考慮しています。つまり、UDA の実装時には、ペイロードの型が明確になっています。次の例では、実行時にのみ入力型が UDA に渡されるジェネリックな入力型の UDA を実装します。

public class GenericInputUda<TInput> : CepAggregate<TInput, bool>
{
    public GenericInputUda(SampleUdaConfig config)
    {
        // ...
    }

    public override bool GenerateOutput(IEnumerable<TInput> payloads)
    {
        // ...
    }
}

ユーザー定義演算子の実装

UDO の作成者の役割は次のとおりです。

  • 実際に UDO を実装する。

  • クエリの記述に UDO を使用できるように LINQ の拡張メソッドを提供する。

UDO を実装するために、ユーザーは適切な基本クラスを派生元にする必要があります。適切な基本クラスは、時間を区別する UDO の場合は CepOperator、それ以外の場合は CepTimeSensitiveOperator です。クラスの派生には、入力型と出力型のパラメーターのインスタンス化が必要です。入力型は、常に、ペイロード全体を表します。出力型は、選択した基本クラスに応じて、一連のペイロードまたは一連のイベントになります。

UDO の作成者が必要とする場合は、イベント データに加えて、オプションの構成構造をクエリの開始時に UDO クラスのコンストラクターに渡すことができます。コンストラクターを UDO の作成者が指定する場合、このコンストラクターは、LINQ の UDO の呼び出し元が提供する構成と一緒に、エンジンによって実行時に呼び出されます。

時間を区別しない UDO および時間を区別する UDO はどちらも、ペイロードを順序付けなしのセットとして受け取ります。時間を区別する UDO の場合、イベントのタイムスタンプはさらに各ペイロードに関連付けられています。また、ウィンドウの開始時刻と終了時刻を定義するウィンドウ記述子が UDO に渡されます。

ユーザー定義演算子における CTI の動作

UDO では、次のように CTI (Current Time Increment) を変更します。ウィンドウがまだ "開いている" 場合は、ウィンドウの終了時刻後のタイム スタンプが設定された CTI が受け取られていないため、ウィンドウ内に含まれるすべての CTI がウィンドウの開始時刻に変更されます。これにより、ウィンドウが開いている間は、ユーザー定義タイム スタンプを含む可能性がある、UDO の出力を変更できるようになります。

ユーザー定義演算子の実装例

次の例では、構成情報を持たない、時間を区別しない UDO を実装します。

public class SampleUDO : CepOperator<Input, Output>
{
    public override IEnumerable<Output> GenerateOutput(IEnumerable<Input> payloads)
    {
        Output output = new Output();
        output.total = 0;
        output.status = "good";

        foreach (Input payload in payloads)
        {
            output.total += payload.Value;
            if (payload.Flag == 4)
            {
                output.status = "bad";
                break;
            }
        }
        List<Output> outputCollection = new List<Output>();
        outputCollection.Add(output);
        return outputCollection;
    }
}

次の例では、時間を区別する UDO (構成情報を受け入れる) に署名を変更する方法を示します。

public class GenericOutputUdo: CepTimeSensitiveOperator<InputEventType, TOutput>
{
    public GenericOutputUdo(SampleUdoConfig config)
    {
        ...
    }

    public override IEnumerable<IntervalEvent<TOutput>> GenerateOutput(
                             IEnumerable<IntervalEvent<InputEventType>> payloads,
                             WindowDescriptor windowDescriptor)
    {
        ...
    }
}

ユーザー定義演算子の拡張メソッドの例

UDO 作成者は、UDO を実装する以外に、クエリの記述で UDO を使用できるように LINQ の拡張メソッドを提供する必要があります。拡張メソッドとは、クエリ作成者が演算子を使用してクエリをコンパイルできるようにするためのシグネチャです。次の例に示すように、1 つの属性を使用すると、LINQ プロバイダーでは UDO 実装を含む実際のクラスを参照できます。

[CepUserDefinedOperator(typeof(SampleUDO))]
public static OutputEventType MyUDO(this CepWindow<InputEventType> window)
{
    throw CepUtility.DoNotCall();
}

この UDO は、次のように使用できます。

var newstream = from w in inputStream.Snapshot()
                select w.MyUDO();

次の例では、SampleUDOwithConfig というクラスに含まれる実装を参照しながら、構成構造を持つ UDO の拡張メソッドと使用方法を示します。

[CepUserDefinedOperator(typeof(SampleUDOwithConfig))]
public static OutputEventType MyUDO(this CepWindow<InputEventType> window, UDOConfig config)
{
    throw CepUtility.DoNotCall();
}

var newstream = from w in inputStream.SnapshotWindow()
                select w.MyUDO(new UDOConfig());

カルチャ固有のイベント フィールド プロパティ

UDO、UDA、UDF などの拡張は、型システムを備えた CEP ドメインと .Net CLR の間におけるインターフェイスと見なすことができます。アプリケーションによっては、このインターフェイスを介してカルチャ情報を渡せるようにすることが適している場合があります。UDA と UDO では、イベント フィールドのカルチャ プロパティの検査や設定を可能にする追加のインターフェイス、IDeclareEventProperties を拡張の作成者が実装できます。このインターフェイスを実装するには、次の例に示すように、フィールドにカルチャ情報を格納できる CepEventType のオブジェクトを返す関数 DeclareEventProperties を指定する必要があります。

public class SampleUDO : CepOperator<Input, Output>, IDeclareEventProperties
{
    public override IEnumerable<Output> GenerateOutput(IEnumerable<Input> payloads)
    {
        ...
    }

    public CepEventType DeclareEventProperties(CepEventType outputEventType)
    {
        // assuming string field 'loc' in type Input
        // assuming string fields 'firstName' and 'location' in type Output
        outputEventType.Fields["firstName"].CultureInfo = new System.Globalization.CultureInfo("zh-CN");
        outputEventType.Fields["location"].CultureInfo = base.InputEventType.Fields["loc"].CultureInfo;
        return outputEventType;
    }
}

この例の UDO では、型 Input の入力イベントを使用し、型 Output のイベントを生成します。型 Output には、UDO の作成者が特定のカルチャ情報で明示的に注釈を付ける文字列フィールドが含まれています。zh-CN というカルチャは出力フィールド firstName に適用され、出力フィールド location は、UDO の入力イベント タイプのフィールド loc に関連付けられている同じカルチャで注釈が付けられます。実行時に UDO が生成するすべてのイベントについて、イベントが UDO の出力ストリームに挿入される前に、これらのカルチャがそのフィールドに適用されます。

ユーザー定義の集計用にも同じインターフェイスがあります。集計の戻り値は 1 つだけなので、そのようなフィールドにカルチャ固有の情報を適用するために、IDeclareEventProperties インターフェイスは 1 つのフィールドで戻り値を CepEventType にラップして、そのフィールドに CEP 固有のプロパティで注釈を付ける方法を提供します。

public class MyUDA : CepAggregate<Input, string>, IDeclareEventProperties
{
    public override string GenerateOutput(IEnumerable<Input> events)
    {
        ...
    }

    public CepEventType DeclareEventProperties(CepEventType outputEventType)
    {
        outputEventType.FieldsByOrdinal[0].CultureInfo = new System.Globalization.CultureInfo("zh-CN");
        return outputEventType;
    }
}

ここでは、UDA の作成者がそのフィールドに CultureInfo プロパティを設定できるように、集計の結果を表す文字列は CepEventType にラップされます。このカルチャ情報は、UDA が使用される LINQ クエリで集計結果を受け取る実際のイベント フィールドに反映されます。

関連項目

概念

イベント ウィンドウの使用

その他の技術情報

LINQ でのクエリ テンプレートの記述