TopK

 

本主题中的示例演示如何使用 TopK 操作对窗口流中的事件进行排名,并且基于排名的升序或降序返回前 N 个事件。 TopK 通过 Take() 扩展方法进行指定,该方法需要IQOrderedWindow<T> 类型的流。 此类型在指定 orderby 子句后将立即生成。 orderby 子句必须对 IQWindowedStreamable<T> 流内的事件进行操作。 因此,必须对输入流应用快照或跳跃(翻转)窗口运算符。

System_CAPS_ICON_note.jpg 注意


对于 CEPStream<T> 流,Take() 扩展方法需要 CepOrderedStream<T> 类型的流。 本主题中的讨论适用于 IQStreamableCEPStream 流。

在具有重复值的情况下,TopK 将返回超过 N 个事件,因此在行为上始终是确定性的。

TopK 不使用基于计数的窗口。

TopK 作为对时间不敏感的运算符实现。 因此,输出事件的生存期将根据窗口的输出策略进行设置。

示例

下面的示例从为输入流 inputStream 定义的每个快照窗口获取排名最高的五个事件,并生成一个新事件流。 每个窗口中的事件按负载字段 e.f 中值的升序以及负载字段 e.i 中值的降序进行排序。

// Assuming the following input event type for inputStream:   
public class MyPayload   
{  
  public int f;   
  public int i;   
}  
  
// IQStreamable example:  
var topfive = (from window in inputStream.Snapshot()  
               from b in  
                   (from e in window  
                    orderby e.f ascending, e.i descending  
                    select e).Take(5)  
               select b;  
  
// CEPStream example:  
var topfive = (from window in inputStream.Snapshot()  
               from e in window  
               orderby e.f ascending, e.i descending  
               select e).Take(5);  
  

您可以在 Take 子句中使用投影,如下例所示。

  
// IQStreamable example:  
var result = (from win in source  
                       .TumblingWindow(TimeSpan.FromMinutes(10))  
                   from b in   
                       (from r in win  
                        orderby r.Value descending  
                        select new { scaled = r.Value / 10, r.SourceId }).Take(2, e => new  
                                              {  
                                                  ranking = e.Rank,  
                                                  e.Payload. scaled,  
                                                  e.Payload.SourceId  
                                              })  
                   select b;  
  
// CEPStream example:  
var result = (from win in source  
                       .TumblingWindow(TimeSpan.FromMinutes(10), HoppingWindowOutputPolicy.ClipToWindowEnd)  
                   from r in win  
                   orderby r.Value descending  
                   select new { scaled = r.Value / 10, r.SourceId }).Take(2, e => new  
                                         {  
                                             ranking = e.Rank,  
                                             e.Payload. scaled,  
                                             e.Payload.SourceId  
                                         });  
  

如果需要将实际排名结果提取到事件的负载中,将使用相应的 lambda 表达式。 随后可通过 Rank 属性访问排名,同时可通过 Payload 属性访问负载字段。

  
// IQStreamable example:  
var topthree = (from window in inputStream.HoppingWindow(TimeSpan.FromMinutes(10), TimeSpan.FromMinutes(2))  
               from b in  
                   (from e in window  
                    orderby e.f ascending, e.i descending  
                    select e).Take(3, e => new  
                    {  
                        ranking = e.Rank,  
                        f = e.Payload.f,  
                        i = e.Payload.i  
                    })  
               select b;  
  
  
// CEPStream example:  
var topthree = (from window in inputStream.HoppingWindow(TimeSpan.FromMinutes(10), TimeSpan.FromMinutes(2), WindowOutputPolicy.ClipToWindowEnd)  
               from e in window  
               orderby e.f ascending, e.i descending  
               select e).Take(3, e => new  
               {  
                   ranking = e.Rank,  
                   f = e.Payload.f,  
                   i = e.Payload.i  
               });  
  

下图显示了 TopK 结果的临时属性。 该示例使用具有 TopK 的跳跃窗口,这样,将从每个窗口中选出其负载字段 val 具有最高负载值的两个事件。

  
// IQStreamable example:  
var two = (from window in inputStream.HoppingWindow(TimeSpan.FromMinutes(30), TimeSpan.FromMinutes(10))  
               from b in  
                   (from e in window  
                    orderby e.val descending  
                    select e).Take(2)  
               select b;  
  
  
// CEPStream example:  
var two = (from window in inputStream.HoppingWindow(TimeSpan.FromMinutes(30), TimeSpan.FromMinutes(10), WindowOutputPolicy.ClipToWindowEnd)  
               from e in window  
               orderby e.val descending  
               select e).Take(2);  
  

橙色框表示这些窗口。 该图显示 TopK 是如何将所选负载的生存期设置为窗口大小的。 在图中,我们在输入流中采用点事件,并且显示其负载字段 val 的值。 请注意,ClipToWindowEnd 的指定输出策略将窗口时间范围分配给结果事件。

TopK

理解对于重复值的 TopK 的行为是十分重要的。 假定下面这个例子,其中,输入流包含重复值,并且计算针对翻转窗口的前 2 个值。 在这个例子中,使用 PointAlignToWindowEnd 的默认输出策略:

带有 PointAlignToWindowEnd 策略的 TopK 运算符

第一个窗口中的前 2 个值是 3 和 2。 其排名字段具有这些值之一的所有事件都将包含在结果中,在此示例中将包含三个事件。

请参阅

StreamInsight 服务器概念
使用事件窗口
跳跃窗口
快照窗口