Share via


列挙可能なイベント ソースとイベント シンクのエンド ツー エンドの例 (StreamInsight)

この単純なエンド ツー エンドの例は、IEnumerable インターフェイスを実装するイベント ソースとイベント シンクを使用して完全な StreamInsight アプリケーションを作成する方法を示しています。

  1. Step 1 - Provide an observable or enumerable event source

  2. Step 2 - Transform the input to a stream and describe the temporal characteristics

  3. Step 3 - Write a query

  4. Step 4 - Convert the query output to an observable or enumerable stream

  5. Step 5 - Consume the output

この例では、StreamInsight を SQL Server および ADO.NET Entity Framework と組み合わせて使用して、Northwind サンプル データベースの履歴データに対する時間関連のクエリ (特定の地域でアクティブだった注文の数が 3 件を超えていた期間を検索するクエリ) に答えます。

この例では、IEnumerable イベント ソースを使用します。IObservable を実装するイベント ソースを使用する手順も同様です。ただし、監視可能な出力ではデータがオブザーバーにプッシュされるため、コンシューマーが (列挙可能なソースに対して foreach を呼び出すときのように) データをプルする必要はありません。

ステップ 1 - 監視可能なイベント ソースまたは列挙可能なイベント ソースを提供する

まず、Northwind データベースに対する LINQ to Entities クエリを発行して、クエリのソース データを定義します。結果 (databaseQuery) は、既定で IEnumerable インターフェイスを実装します。

// Connect to the Northwind sample database on SQL Server. Use the default Entity Model
// generated by the Entity Framework for the Northwind sample database.
using (NorthwindEntities northwind = new NorthwindEntities())
{
    // Query all Orders where there is a known order date, ship date and ship region.
    var databaseQuery = from o in northwind.Orders
                        where o.OrderDate.HasValue && o.ShippedDate.HasValue && o.ShipRegion != null
                        orderby o.OrderDate.Value
                        select o;
}

ステップ 2 - 入力をストリームに変換し、ストリームの時間特性を記述する

次に、クエリの結果を期間イベントのストリームに変換します。

// Transform the query results into a stream of interval events whose start and end 
// times are defined by the order and ship timestamps. Keep track of the shipping region.
var streamSource = databaseQuery
    .ToStream(application, AdvanceTimeSettings.IncreasingStartTime, 
        o => IntervalEvent.CreateInsert(
            o.OrderDate.Value,
            o.ShippedDate.Value, 
            new { o.ShipRegion }));

このコードでは、AdvanceTimeSettings クラスのヘルパー (IncreasingStartTime) を使用して、各イベントの後に delay が 0 (ゼロ) の CTI イベントを挿入しています。StrictlyIncreasingStartTime を使用して -1 ティックの delay を指定したり (イベントの開始時刻の 1 ティック後に CTI が配置されます)、UnorderedTimestamps を使用して delay にカスタム値を指定したりすることもできます。

次に、IntervalEvent クラスの CreateInsert メソッドを使用して、ソース データをイベントのストリームに変換しています。開始時刻に OrderDate、期間の終了時刻に ShippedDate、イベントのペイロードに ShipRegion がそれぞれ指定されています。

PointEvent クラスにも同様の CreateInsert メソッドがあり、EdgeEvent クラスには CreateStart メソッドと CreateEnd メソッドがあります。また、3 つのイベント クラスのすべてに CreateCti メソッドがあります。このメソッドを使用すると、CTI イベントを手続き的に挿入できます (AdvanceTimeSettings を使用する場合は宣言的に挿入することになります)。

ステップ 3 - クエリを記述する

次に、受信するイベントのストリームに対応する、時間対応の StreamInsight クエリを記述します。

// Find time intervals during which more than 3 orders are in process within a region.
var streamQuery = from o in streamSource
                  group o by o.ShipRegion into g
                  from window in g.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                  select new { OrderCount = window.Count(), ShipRegion = g.Key } into agg
                  where agg.OrderCount > 3
                  select agg;

このクエリは、各期間のイベントを地域別に分類し、アクティブだったイベントの数が 3 件を超えていた期間のみを選択します。結果は、アクティブな注文の数と ShipRegion の識別子を含むまったく別のペイロードを持つ新しいストリームに射影されます。

クエリ テンプレートの記述の詳細については、以下を参照してください。

ステップ 4 - クエリの出力を監視可能なイベント シンクまたは列挙可能なイベント シンクに変換する

次に、クエリからの出力ストリームを列挙可能な結果に変換します。

// Convert temporal query results into an enumerable result of interval events. This example
// filters out CTI events, and projects the relevant portions of the interval event.
var results = from intervalEvent in streamQuery.ToIntervalEnumerable()
              where intervalEvent.EventKind != EventKind.CTI
              select new 
              { 
                  intervalEvent.StartTime, 
                  intervalEvent.EndTime, 
                  intervalEvent.Payload.OrderCount,
                  intervalEvent.Payload.ShipRegion,
              };

このクエリでは、CTI イベントをフィルターによって除外して、Insert イベントのみを期間イベントの列挙可能なストリームに射影しています。4 つのフィールドを持つ新しい匿名型にイベントのペイロードが格納されます。

ToIntervalEnumerable メソッド以外にも、次のような拡張メソッドがあります。

  • ToPointEnumerable およびToEdgeEnumerable

  • ToPointObservable、ToIntervalObservable、および ToEdgeObservable

これらのメソッドは、基本インターフェイスの IEnumerable および IObservable を拡張する ICepEnumerable インターフェイスや ICepObservable インターフェイスを返します。これらのインターフェイスでは、管理やデバッグのために、名前と説明を指定してクエリを識別できます。

ICepEnumerable インターフェイスや ICepObservable インターフェイスには、選択 (Where) または射影 (Select) によって出力イベントをフィルター処理するヘルパー メソッドも用意されています。次に例を示します。

observableOutput = result
    .ToPointObservable()
    .Where( e => e.EventKind != EventKind.Cti)
    .Select(e => e.Payload);

ステップ 5 - 出力を使用する

最後に、クエリの結果を使用します。一般的な LINQ プロバイダーでは遅延評価モデルが使用されているため、コンシューマーが結果の列挙または監視を開始するまでクエリは評価されません。

// Enumerating the results triggers the underlying SQL Server and StreamInsight queries.
foreach (var activeInterval in results)
{
    Console.WriteLine("Between {0} and {1}, {2} orders were active in the '{3}' region.", 
        activeInterval.StartTime, 
        activeInterval.EndTime, 
        activeInterval.OrderCount,
        activeInterval.ShipRegion);
}