End-to-End Example of an Enumerable Event Source and Event Sink (StreamInsight)

This simple end-to-end example demonstrates the use of an event source and an event sink that implement the IEnumerable interface to create a complete StreamInsight application:

  1. Step 1 - Provide an observable or enumerable event source

  2. Step 2 - Transform the input to a stream and describe the temporal characteristics of the stream

  3. Step 3 - Write a query

  4. Step 4 - Convert the query output to an observable or enumerable event sink

  5. Step 5 - Consume the output

This example uses StreamInsight in combination with SQL Server and the ADO.NET Entity Framework to answer a time-related query over historical data from the Northwind sample database. The query finds time intervals during which more than 3 orders were active within a region.

This example uses an IEnumerable event source. The steps for using an event source that implements IObservable are similar. However, an observable output pushes data to the observer, so the consumer does not have to pull the data as it does when calling foreach over an enumerable source.
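The difference between pulling and pushing can be sketched with the base .NET interfaces alone. The following is a minimal illustration, not StreamInsight code: the names PullVersusPush, ArraySource, and CollectingObserver are hypothetical and exist only for this sketch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types for illustration only; none of them belong to StreamInsight.
public static class PullVersusPush
{
    // Pull: the consumer drives the iteration by calling foreach.
    public static List<int> ConsumeByPulling(IEnumerable<int> source)
    {
        var seen = new List<int>();
        foreach (int item in source)
        {
            seen.Add(item);
        }
        return seen;
    }

    // Push: the source drives the iteration by calling OnNext on the observer.
    public sealed class ArraySource : IObservable<int>
    {
        private readonly int[] items;

        public ArraySource(params int[] items) { this.items = items; }

        public IDisposable Subscribe(IObserver<int> observer)
        {
            foreach (int item in items)
            {
                observer.OnNext(item);
            }
            observer.OnCompleted();
            return new NoOpSubscription();
        }

        private sealed class NoOpSubscription : IDisposable
        {
            public void Dispose() { }
        }
    }

    // An observer that simply records what was pushed to it.
    public sealed class CollectingObserver : IObserver<int>
    {
        public List<int> Seen { get; } = new List<int>();
        public bool Completed { get; private set; }

        public void OnNext(int value) { Seen.Add(value); }
        public void OnError(Exception error) { throw error; }
        public void OnCompleted() { Completed = true; }
    }

    public static void Main()
    {
        List<int> pulled = ConsumeByPulling(new[] { 1, 2, 3 });

        var observer = new CollectingObserver();
        using (new ArraySource(1, 2, 3).Subscribe(observer)) { }

        Console.WriteLine(string.Join(",", pulled));
        Console.WriteLine(string.Join(",", observer.Seen));
    }
}
```

Both consumers end up with the same items. What differs is who controls the pace: the foreach loop in the pull case, the source in the push case.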

Step 1 - Provide an observable or enumerable event source

First, define the source data for the query by issuing a LINQ to Entities query over the Northwind database. The result (databaseQuery) by default implements the IEnumerable interface.

// Connect to the Northwind sample database on SQL Server. Use the default Entity Model
// generated by the Entity Framework for the Northwind sample database.
using (NorthwindEntities northwind = new NorthwindEntities())
{
    // Query all Orders where there is a known order date, ship date and ship region.
    var databaseQuery = from o in northwind.Orders
                        where o.OrderDate.HasValue && o.ShippedDate.HasValue && o.ShipRegion != null
                        orderby o.OrderDate.Value
                        select o;
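
    // The code for Steps 2 through 5 goes here, inside the using block, so that
    // databaseQuery stays in scope and the context is still open when the results are enumerated.
}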

Step 2 - Transform the input to a stream and describe the temporal characteristics of the stream

Next, transform the result of the query into a stream of interval events:

// Transform the query results into a stream of interval events whose start and end 
// times are defined by the order and ship timestamps. Keep track of the shipping region.
var streamSource = databaseQuery
    .ToStream(application, AdvanceTimeSettings.IncreasingStartTime, 
        o => IntervalEvent.CreateInsert(
            o.OrderDate.Value,
            o.ShippedDate.Value, 
            new { o.ShipRegion }));

This code uses a helper on the AdvanceTimeSettings class (IncreasingStartTime) to insert CTI (current time increment) events after each event with a delay of 0 (zero). Alternatively, you can use StrictlyIncreasingStartTime to specify a delay of -1 tick (thus placing the CTI one tick after the event’s start time), or UnorderedTimestamps to specify a custom value for the delay.
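The effect of the delay value can be worked through with plain date arithmetic. The sketch below assumes the relationship implied by the description above, namely that the CTI timestamp equals the event start time minus the delay. CtiDelaySketch and CtiTimestamp are hypothetical names, and the code does not call the StreamInsight API.

```csharp
using System;

// Hypothetical helper for illustration only; not part of StreamInsight.
public static class CtiDelaySketch
{
    // Assumption: CTI timestamp = event start time - delay.
    public static DateTimeOffset CtiTimestamp(DateTimeOffset eventStart, TimeSpan delay)
    {
        return eventStart - delay;
    }

    public static void Main()
    {
        var start = new DateTimeOffset(1997, 1, 1, 0, 0, 0, TimeSpan.Zero);

        // Delay of 0: the CTI carries the same timestamp as the event start.
        Console.WriteLine((CtiTimestamp(start, TimeSpan.Zero) - start).Ticks);        // prints 0

        // Delay of -1 tick: the CTI lands one tick after the event start.
        Console.WriteLine((CtiTimestamp(start, TimeSpan.FromTicks(-1)) - start).Ticks); // prints 1

        // A positive custom delay places the CTI before the event start.
        Console.WriteLine(CtiTimestamp(start, TimeSpan.FromMinutes(5)) < start);
    }
}
```

Under this assumption, a positive delay leaves room for later events whose start times are slightly earlier than the most recent one, which is the situation a custom delay is meant to handle.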

Then, the CreateInsert method of the IntervalEvent class converts the source data to a stream of events by providing the OrderDate as the start time of the interval, the ShippedDate as the end time, and the ShipRegion as the payload of the event.

An analogous CreateInsert method is provided by the PointEvent class, while the EdgeEvent class has CreateStart and CreateEnd methods. All three event classes have a CreateCti method for inserting CTI events procedurally, rather than through the declarative use of AdvanceTimeSettings.

Step 3 - Write a query

Next, write the time-aware StreamInsight query that is appropriate for the incoming stream of events:

// Find time intervals during which more than 3 orders are in process within a region.
var streamQuery = from o in streamSource
                  group o by o.ShipRegion into g
                  from window in g.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                  select new { OrderCount = window.Count(), ShipRegion = g.Key } into agg
                  where agg.OrderCount > 3
                  select agg;

This query groups the events in each time interval by region, then selects only those intervals where more than 3 events were active. It projects the results into a new stream with an entirely different payload that includes the count of active orders and the identifier for the ShipRegion.
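To make the snapshot logic concrete, the following sketch reproduces the same calculation over in-memory data with ordinary LINQ to Objects. It is only an approximation of what the StreamInsight engine computes: the Order and Snapshot classes and the BusyIntervals method are hypothetical, the sample data is invented, and each order is assumed to start strictly before it ends.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory approximation; this does not use the StreamInsight engine.
public static class SnapshotCountSketch
{
    public sealed class Order
    {
        public string Region;
        public DateTime Start; // order date
        public DateTime End;   // ship date
    }

    public sealed class Snapshot
    {
        public string Region;
        public DateTime Start;
        public DateTime End;
        public int OrderCount;
    }

    public static List<Snapshot> BusyIntervals(IEnumerable<Order> orders, int threshold)
    {
        var result = new List<Snapshot>();
        foreach (var region in orders.GroupBy(o => o.Region))
        {
            // Every start or end time is a point where the set of active orders can change.
            List<DateTime> edges = region
                .SelectMany(o => new[] { o.Start, o.End })
                .Distinct()
                .OrderBy(t => t)
                .ToList();

            // Each pair of neighboring edges delimits one snapshot.
            for (int i = 0; i + 1 < edges.Count; i++)
            {
                DateTime windowStart = edges[i];
                DateTime windowEnd = edges[i + 1];
                int active = region.Count(o => o.Start <= windowStart && o.End >= windowEnd);
                if (active > threshold)
                {
                    result.Add(new Snapshot
                    {
                        Region = region.Key,
                        Start = windowStart,
                        End = windowEnd,
                        OrderCount = active
                    });
                }
            }
        }
        return result;
    }

    private static DateTime Day(int day) { return new DateTime(1997, 1, day); }

    public static void Main()
    {
        // Invented sample data: four overlapping orders in one region, one order in another.
        var orders = new List<Order>
        {
            new Order { Region = "WA", Start = Day(1), End = Day(5) },
            new Order { Region = "WA", Start = Day(2), End = Day(6) },
            new Order { Region = "WA", Start = Day(3), End = Day(7) },
            new Order { Region = "WA", Start = Day(4), End = Day(8) },
            new Order { Region = "OR", Start = Day(1), End = Day(8) },
        };

        foreach (Snapshot s in BusyIntervals(orders, 3))
        {
            Console.WriteLine("{0}: {1} orders between {2:d} and {3:d}",
                s.Region, s.OrderCount, s.Start, s.End);
        }
    }
}
```

With this sample data, only the snapshot from January 4 to January 5 in the WA region has four active orders. The neighboring snapshots have three and are filtered out, just as the where clause in the StreamInsight query discards them.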

Step 4 - Convert the query output to an observable or enumerable event sink

Next, transform the output stream from the query into an enumerable result:

// Convert temporal query results into an enumerable result of interval events. This example
// filters out CTI events, and projects the relevant portions of the interval event.
var results = from intervalEvent in streamQuery.ToIntervalEnumerable()
              where intervalEvent.EventKind != EventKind.Cti
              select new 
              { 
                  intervalEvent.StartTime, 
                  intervalEvent.EndTime, 
                  intervalEvent.Payload.OrderCount,
                  intervalEvent.Payload.ShipRegion,
              };

This query filters out the CTI events and projects only the insert events into an enumerable stream of interval events. A new anonymous type with four fields contains the start time, the end time, and the payload of each event.

In addition to the ToIntervalEnumerable method, related extension methods include:

  • ToPointEnumerable and ToEdgeEnumerable

  • ToPointObservable, ToIntervalObservable, and ToEdgeObservable

These methods return ICepEnumerable or ICepObservable interfaces that extend the base IEnumerable and IObservable interfaces by providing a query name and query description to identify the query for management and debugging purposes.

The ICepEnumerable and ICepObservable interfaces also provide helper methods that filter output events through selection (Where) or projection (Select). For example:

// Here, result is a placeholder for a StreamInsight query stream.
var observableOutput = result
    .ToPointObservable()
    .Where(e => e.EventKind != EventKind.Cti)
    .Select(e => e.Payload);

Step 5 - Consume the output

Finally, consume the results of the query. Note that due to the deferred evaluation model of the typical LINQ provider, queries are not evaluated until the consumer begins to enumerate or observe the results:

// Enumerating the results triggers the underlying SQL Server and StreamInsight queries.
foreach (var activeInterval in results)
{
    Console.WriteLine("Between {0} and {1}, {2} orders were active in the '{3}' region.", 
        activeInterval.StartTime, 
        activeInterval.EndTime, 
        activeInterval.OrderCount,
        activeInterval.ShipRegion);
}
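Deferred evaluation can be observed with plain LINQ to Objects. The sketch below is an illustration under simple assumptions and does not involve SQL Server or StreamInsight: DeferredEvaluationSketch, Source, and the SourceReads counter are hypothetical names used only to show when the source is actually read.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical demonstration of deferred LINQ evaluation; not StreamInsight code.
public static class DeferredEvaluationSketch
{
    // Counts how many items the source has produced so far.
    public static int SourceReads;

    public static IEnumerable<int> Source()
    {
        for (int i = 1; i <= 5; i++)
        {
            SourceReads++;
            yield return i;
        }
    }

    public static void Main()
    {
        // Defining the query reads nothing from the source.
        IEnumerable<int> query = Source().Where(n => n % 2 == 1).Select(n => n * 10);
        Console.WriteLine(SourceReads); // prints 0

        // Enumerating the query is what pulls the items through the pipeline.
        foreach (int value in query)
        {
            Console.WriteLine(value); // prints 10, 30, 50 on separate lines
        }
        Console.WriteLine(SourceReads); // prints 5
    }
}
```

The same principle applies to the chain in this example: nothing runs against the database or the StreamInsight engine until the foreach loop starts enumerating results.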