聚合

本主题中的示例演示如何针对事件窗口执行聚合操作。 支持的聚合函数如下所示:

  • 数值负载字段的平均值 (avg)。

  • 负载字段的总和 (sum)。

  • 负载字段的最小值 (min)。

  • 负载字段的最大值 (max)。

  • 事件计数 (count)。

聚合是基于集的操作。 即:聚合针对数据子集执行某种计算。 这些子集被指定为事件窗口(沿时间线对事件进行分组)。 按照这种定义,只能将聚合应用到窗口而不是普通事件流。 聚合作为 IQWindowedStreamable<T> 的扩展方法提供。 有关窗口的详细信息,请参阅使用事件窗口

注意注意

对于 CEPStream<T> 流,聚合是对 CepWindowStream<T> 的扩展。 有关开发输入适配器和输出适配器的详细信息,请参阅输入和输出适配器(旧模型)。 本主题中的讨论适用于这两种类型的流。

使用与投影操作相同的语法,聚合操作的结果将成为标量值负载字段。 时段内的聚合是按递增方式计算的。 也就是说,当某个事件进入或离开聚合窗口时,之前的聚合结果会更新。 请注意,这可能会影响数字精度。

avg、sum、min 和 max 全部采用一个输入参数,该参数表示要聚合的值。 通常,这是对事件字段的引用。 该输入参数作为 lambda 表达式提供,如下例所示。 count 聚合对窗口中的全部事件计数,因此不具有参数。

对 Null 值的处理与 Transact-SQL 中的类似函数是一致的:

  • 在 sum 中,Null 视作 0(零)。

  • 在 min 中,任何值都小于 Null。

  • 在 max 中,任何值都大于 Null。

  • 在 TopK 中,在根据负载字段对事件进行排名时,Null 始终小于任何值。

最小值和最大值函数可用于实现比较器(例如数值)、词典(字符串)或临时(日期时间)类型的任何类型。

示例

下面的示例说明如何将聚合函数应用于在跳跃窗口快照窗口中定义的事件。 请注意在本版本中,计数窗口不能用于内置聚合或 TopK。

A.针对跳跃窗口的聚合

跳跃窗口随时间推移按一定间隔和频率进行自我重复。 例如,每五分钟就重新计算一次的一小时内的聚合事件数据就对应一个窗口大小为 1 小时、跳跃大小为 5 分钟的跳跃窗口,如下例中所示。

// Assuming the following input event type for inputStream:
public class MyPayload 
{ 
    public int i; 
    public float f; 
}
var avgHourly = from win in inputStream.HoppingWindow(TimeSpan.FromHours(1), TimeSpan.FromMinutes(5))
                  select new { hourlyavg = win.Avg(e => e.f) }; 

该窗口运算符应用于输入流,从而生成 IQWindowedStreamable<T>。 该流中的每个元素 win 都代表一个包含事件的窗口。

以下示例统计每个小时内的事件数目。 它使用翻转窗口。 这是其跳跃大小等于窗口大小的跳跃窗口的快捷方式。

var countHourly = from win in hourStream.TumblingWindow(TimeSpan.FromHours(1))
                  select new { count = win.Count() };

B.针对快照窗口的聚合

在以下示例中,from 子句对流 inputStream 应用快照窗口。 在本例中,Sum 聚合结果被赋值给负载字段 e.i,Avg 聚合结果基于负载字段 e.f 计算。 此外,还使用了 count 聚合。 该示例还演示了如何在同一语句中组合多个聚合。 所有聚合都在同一窗口内计算。

// Assuming the following input event type for inputStream:
public class MyPayload 
{ 
    public int i; 
    public float f; 
}

// Multiple aggregations over a snapshot window
var snapshotAgg = from w in inputStream.Snapshot()
                  select new { sum = w.Sum(e => e.i),
                               avg = w.Avg(e => e.f),
                               count = w.Count() };

您可以在复杂表达式中嵌入聚合,并且一个表达式可以包含多个聚合,如下例所示。

var result = from w in inputStream.Snapshot()
             select new { ratio = w.Sum(e => e.i) / w.Sum(e => e.f) };

请参阅

概念

时间戳修改

使用事件窗口

跳跃窗口

计数窗口

快照窗口