Group and Apply

The examples in this topic demonstrate how to partition events into event groups by using the LINQ "group by" functionality. Aggregations and other operations can then be performed on the event groups, so that each group is computed separately. The set of operations applied to each group is called the apply branch. The apply branch can be provided implicitly within a single group-and-apply statement or, if it contains a more complex subquery, as a separate LINQ statement. Note that apply branches are closed within the group-and-apply construct; for instance, it is not possible to join the grouped stream with a stream from outside the grouping.

Note


Only default window policies are supported in StreamInsight 2.1 and later, and the examples in this section reflect this. In version 2.0 and earlier, you could specify an output policy explicitly, such as SnapshotWindowOutputPolicy.Clip, as shown in the following example:

// version 2.0 and earlier:  
var avgCount = from v in inputStream  
               group v by v.i % 4 into eachGroup  
               from window in eachGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)  
               select new { avgNumber = window.Avg(e => e.j) };  

Examples

The following example groups events by the specified modulo function. It then applies a snapshot window to each group and computes the average over a payload column on each group separately. Thus, the apply branch consists of the window and the aggregation.

// Assuming the following input event type for inputStream:  
public class MyPayload   
{  
    public int i;   
    public float j;   
}  
  
var avgCount = from v in inputStream  
               group v by v.i % 4 into eachGroup  
               from window in eachGroup.SnapshotWindow()  
               select new { avgNumber = window.Avg(e => e.j) };  
  

The previous example produces a stream with a single payload field, which contains the average of the field j, computed per group and per snapshot window.
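
To make the per-group computation concrete, the following is a minimal sketch that uses plain LINQ to Objects on an in-memory list rather than StreamInsight. It is only an analogy: there are no timestamps or snapshot windows, so each group yields exactly one average. The class GroupAverageSketch and the method AveragePerGroup are hypothetical names introduced for illustration, and the grouping key is returned only to label the output.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class MyPayload
{
    public int i;
    public float j;
}

// Hypothetical helper, not part of StreamInsight.
public static class GroupAverageSketch
{
    // Groups by i % 4 and averages j within each group (no windows involved).
    public static List<(int GroupId, float Avg)> AveragePerGroup(IEnumerable<MyPayload> events)
    {
        return (from v in events
                group v by v.i % 4 into eachGroup
                orderby eachGroup.Key
                select (GroupId: eachGroup.Key,
                        Avg: eachGroup.Average(e => e.j))).ToList();
    }

    public static void Main()
    {
        var events = new[]
        {
            new MyPayload { i = 0, j = 1.0f },
            new MyPayload { i = 4, j = 3.0f },   // same group as i = 0, because 4 % 4 == 0
            new MyPayload { i = 1, j = 10.0f },
        };

        // Group 0 averages the values 1 and 3; group 1 contains only the value 10.
        foreach (var r in AveragePerGroup(events))
            Console.WriteLine($"group {r.GroupId}: average {r.Avg}");
    }
}
```

In the StreamInsight query, the same grouping applies, but the average is recomputed for every snapshot window within each group instead of once per group.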

You can also group a projection of the original type in a "group by" clause, as shown in the following example.

// Assuming the events in source expose the payload fields Value and SourceId:
var result = from e in source.AlterEventDuration(e => TimeSpan.FromMinutes(10))
             group new { myVal = e.Value * 10 } by e.SourceId into g
             from win in g.SnapshotWindow()
             select new
             {
                 avg = win.Avg(e => e.myVal)
             };

Usually, the grouping key should be retained so that the aggregation result can be associated with the respective group. The next example shows how to retrieve the grouping key.

var avgCount = from v in inputStream  
               group v by v.i % 4 into eachGroup  
               from window in eachGroup.SnapshotWindow()  
               select new { avgNumber = window.Avg(e => e.j),
                   groupId = eachGroup.Key };

It is possible to group on several keys, so that each unique key combination in the input stream results in a separate group. In this case, the grouping keys must be contained in an anonymous type definition so that they can be retrieved explicitly in the final projection. Note that all grouping fields must be referenced. The following example groups events by two event payload fields and assigns a new key name to one of them.

// Assuming the following input event type for inputStream:  
public class MyPayload   
{  
    public int section;   
    public string category;   
    public float value;   
}  
  
var avgCount = from v in inputStream  
               group v by new { sec = v.section, v.category } into eachGroup  
               from window in eachGroup.SnapshotWindow()  
               select new { avgNumber = window.Avg(e => e.value),   
                   section = eachGroup.Key.sec,  
                   category = eachGroup.Key.category };  
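
The composite-key behavior can be tried outside StreamInsight with plain LINQ to Objects. The sketch below is an analogy under that assumption: it has no windows, so each unique (section, category) combination produces a single average. CompositeKeySketch and AveragePerKey are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class MyPayload
{
    public int section;
    public string category;
    public float value;
}

// Hypothetical helper, not part of StreamInsight.
public static class CompositeKeySketch
{
    // One result per unique (section, category) pair; the key fields are
    // read back from eachGroup.Key, as in the StreamInsight example.
    public static List<(int Section, string Category, float Avg)> AveragePerKey(
        IEnumerable<MyPayload> events)
    {
        return (from v in events
                group v by new { sec = v.section, v.category } into eachGroup
                orderby eachGroup.Key.sec, eachGroup.Key.category
                select (Section: eachGroup.Key.sec,
                        Category: eachGroup.Key.category,
                        Avg: eachGroup.Average(e => e.value))).ToList();
    }

    public static void Main()
    {
        var events = new[]
        {
            new MyPayload { section = 1, category = "A", value = 2.0f },
            new MyPayload { section = 1, category = "A", value = 4.0f },
            new MyPayload { section = 1, category = "B", value = 10.0f },
            new MyPayload { section = 2, category = "A", value = 7.0f },
        };

        // Three groups result: (1, A), (1, B), and (2, A).
        foreach (var r in AveragePerKey(events))
            Console.WriteLine($"section {r.Section}, category {r.Category}: {r.Avg}");
    }
}
```

Anonymous types compare by value, which is why two events with equal section and category fall into the same group.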

The apply branch can be more complex, containing a series of operations, as shown in the example below.

// Assuming the following input event type for source:
public class MyPayload   
{  
    public int section;   
    public string category;   
    public float value;   
}  
  
var result = from s in source
             group s by s.section into sg
             from e in
                 (from e in sg
                  group e by e.category into cg
                  from win in cg.TumblingWindow(TimeSpan.FromMinutes(5))
                  select new { cat = cg.Key, top = win.Max(e => e.value) })
             select new { sec = sg.Key, e.cat, e.top };
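
The shape of this nested query (an outer grouping whose apply branch is itself a grouped subquery) can be sketched with plain LINQ to Objects. This is an analogy only: the tumbling window is omitted, so the maximum is taken over all events of a category within a section. NestedGroupSketch and TopPerSectionAndCategory are hypothetical names introduced for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class MyPayload
{
    public int section;
    public string category;
    public float value;
}

// Hypothetical helper, not part of StreamInsight.
public static class NestedGroupSketch
{
    // Outer grouping by section; the subquery in parentheses plays the role
    // of the apply branch and groups each section's events by category.
    public static List<(int Sec, string Cat, float Top)> TopPerSectionAndCategory(
        IEnumerable<MyPayload> source)
    {
        return (from s in source
                group s by s.section into sg
                from e in
                    (from p in sg
                     group p by p.category into cg
                     select new { cat = cg.Key, top = cg.Max(x => x.value) })
                orderby sg.Key, e.cat
                select (Sec: sg.Key, Cat: e.cat, Top: e.top)).ToList();
    }

    public static void Main()
    {
        var source = new[]
        {
            new MyPayload { section = 1, category = "A", value = 2.0f },
            new MyPayload { section = 1, category = "A", value = 5.0f },
            new MyPayload { section = 1, category = "B", value = 1.0f },
            new MyPayload { section = 2, category = "A", value = 9.0f },
        };

        foreach (var r in TopPerSectionAndCategory(source))
            Console.WriteLine($"section {r.Sec}, category {r.Cat}: top {r.Top}");
    }
}
```

Note that the subquery reads only from sg, the group handed to it by the outer query, which mirrors the rule that an apply branch is closed within its grouping.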

The following example assumes a stream of power meter readings that contains the data of several meters. It annotates each reading with the average over the last 10 minutes for the same meter. The query first groups the incoming data by meter ID. Within each group, the average over 10 minutes is computed and joined to the original meter events.

// Assuming the following input event type for sensorStream:  
public class MeterReading  
{  
    public string meterId;   
    public float usage;   
}  
  
var resultB = from s in sensorStream  
              group s by s.meterId into g  
              from e in  
                  (from left in g  
                  from right in  
                      (from win in g  
                          .AlterEventDuration(e => TimeSpan.FromMinutes(10))  
                          .SnapshotWindow()  
                      select new { avg = win.Avg(e => e.usage) })  
                  select new { right.avg, left.usage })  
              select new { slidingAvg = e.avg, e.usage, g.Key };  
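
The "annotate each event with its group's aggregate" pattern can be approximated with plain LINQ to Objects, as in the sketch below. It is an analogy with one important simplification: the average is taken over all readings of a meter rather than over the last 10 minutes, because in-memory collections carry no event time. MeterAnnotationSketch and AnnotateWithAverage are hypothetical names used only here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class MeterReading
{
    public string meterId;
    public float usage;
}

// Hypothetical helper, not part of StreamInsight.
public static class MeterAnnotationSketch
{
    // Computes one average per meter and attaches it to every reading of
    // that meter. Unlike the StreamInsight query, no time window is applied.
    public static List<(string MeterId, float Usage, float Avg)> AnnotateWithAverage(
        IEnumerable<MeterReading> readings)
    {
        return (from s in readings
                group s by s.meterId into g
                let avg = g.Average(r => r.usage)
                from reading in g
                orderby g.Key
                select (MeterId: g.Key, Usage: reading.usage, Avg: avg)).ToList();
    }

    public static void Main()
    {
        var readings = new[]
        {
            new MeterReading { meterId = "m1", usage = 2.0f },
            new MeterReading { meterId = "m2", usage = 10.0f },
            new MeterReading { meterId = "m1", usage = 4.0f },
        };

        // Both m1 readings carry the m1 average; the m2 reading carries its own value.
        foreach (var r in AnnotateWithAverage(readings))
            Console.WriteLine($"{r.MeterId}: usage {r.Usage}, average {r.Avg}");
    }
}
```

As in the StreamInsight query, both the individual readings and the aggregate come from the same group g, so no stream from outside the grouping is needed.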

As mentioned above, the apply branch is closed: the function that represents it can operate only on its own input, the grouped stream (applyIn), and cannot incorporate any other incoming stream.

See Also

StreamInsight Server Concepts
Using Event Windows
Snapshot Windows