时间戳修改

本主题中的示例演示如何使用运算符修改事件的时间戳。通过更改事件时间戳,您可以更改事件对后续操作(如联接或对窗口的聚合等)的影响。以下 LINQ 扩展方法演示了此功能。

改变事件时间

ShiftEventTime() 运算符根据指定的表达式更改流中每个事件的开始时间。

以下示例将流中每个事件的时间往后推 15 分钟。

// shift events by 15 minutes into the future.
var shifted = inputStream.ShiftEventTime(e => TimeSpan.FromMinutes(15)); 

以下示例将流中每个事件的时间往前推 1 小时。

// shift events by 1 hour into the past.
var shifted = inputStream.ShiftEventTime(e => TimeSpan.FromHours(-1));

用于指定时间推移的表达式可以引用当前事件的开始时间,而不引用其结束时间或负载。此推移不影响事件的生存期或负载。

对负无穷大的时间值进行编码时要考虑值 DateTime.MinValue。如果事件开始时间具有此值并且在指定的表达式中引用该开始时间(而不是某个常量),则不对该表达式求值,开始时间仍将为 DateTime.MinValue。如果不是这样,则将在运行时对表达式求值,其结果仍可能导致溢出异常。

请注意,指定的时间推移也适用于通过此运算符传递的 CTI 事件,因为 ShiftEventTime 影响流中所有事件的开始时间。

更改事件的持续时间

AlterEventDuration() 运算符可更改事件的生存期。事件生存期指定事件保持有效的时间段。持续时间定义为事件上的一个函数,因此它可以根据事件的开始时间、结束时间或负载计算得出。

以下示例将事件的持续时间设置为 1 小时。

// set event duration to 1 hour.
var onehour = inputStream.AlterEventDuration(e => TimeSpan.FromHours(1));

以下示例将事件的持续时间设置为其当前生存期的两倍。

// double event duration. 
var doubled = inputStream.AlterEventDuration(e => (e.EndTime - e.StartTime) * 2);

对正无穷大的时间值进行编码时要考虑值 DateTime.MaxValue。如果事件结束时间具有此值并且在指定的表达式中引用该结束时间,则不对该表达式求值,结束时间仍将为 DateTime.MaxValue。

同时更改事件的位移和持续时间

为了便于表述,AlterEventLifetime() 运算符将 AlterEventDuration 和 ShiftEventTime 函数合二为一。

AlterEventLifeTime() 方法的第一个参数指定新的开始时间戳,并可以引用当前事件的开始时间。此参数必须指定为 UTC 时间。第二个参数指定新的生存期,它可以引用当前事件的开始时间、结束时间和负载字段。

以下示例在将新生存期指定为第二个参数时,将事件时间往前推 1 分钟,但保持事件的结束时间不变(通过让原始生存期增加一分钟)。

// shift event 1 minute into the past, but leave the end time (event duration) unchanged.
var newStream = inputStream.AlterEventLifetime(e => e.StartTime - TimeSpan.FromMinutes(1),
                                               e => e.EndTime - e.StartTime + TimeSpan.FromMinutes(1));]

请注意,指定的开始时间推移也适用于通过此运算符传递的 CTI 事件。

另请参阅本主题上文中关于 DateTime.MinValue 和 DateTime.MaxValue 的备注。

将流转换为点事件流

ToPointEventStream 运算符是一个提供便利的函数,用于将边缘和间隔事件转换为点事件(通过将事件的生存期修改为事件开始时间之前的一个时钟周期),如以下示例中所示。

var pointStream = inputStream.ToPointEventStream();

在间隔事件转换为点事件时,只保留事件的开始时间。

剪辑事件的持续时间

ClipEventDuration 运算符采用两个流作为参数,并且根据第二个流中下一匹配事件的开始时间更改第一个流中每个事件的生存期。

到现在为止,我们已看到允许您按固定时间范围更改事件的生存期的运算符。ClipEventDuration 运算符提供“针对其他事件”调整事件生存期的非常灵活的方法。通常,对一个流指定该运算符,并且该运算符取另一个流作为参数以及匹配的条件。该运算符将第一个流中每个事件的生存期剪辑到满足匹配条件的其他流中“下一个”事件(根据应用程序时间)的开始时间。

例如,假定两个流 stream1 和 stream2,它们承载含负载字段“Id”的事件。下面的语句将 stream1 中的所有事件剪辑到 stream2 中具有相同“Id”值的下一事件:

var clipped = stream1.ClipEventDuration(stream2, (e1, e2) => e1.Id == e2.Id);

匹配条件作为对两个输入负载的表达式给出。在下图中说明此语句的语义:

ClipEventDuration 的语义

该图显示 stream1 中 Id = A 的第一个事件是如何剪辑到 stream2 中 Id = A 的下一个事件的。stream1 中 Id = B 的其他事件不剪辑,因为 stream2 中的下一个匹配事件仅在 stream1 中该事件结束后发生。

该剪辑行为适合于很广范围的应用程序。它可以满足的一个常见要求就是将点流转换为连续间隔流(也称作“信号”)。

点到信号转换

在这个例子中,您首先需要扩展所有点事件,以便它们实际到达下一个事件。换言之,您需要应用一个超时,该超时确定一个事件要持续到下一个事件发生需要多长的时间。此超时可以是无限或有限的时间范围。现在假定一个 60 秒的超时:

var extended = input.AlterEventDuration(e => TimeSpan.FromSeconds(60));

准备好后,我们可以使用 ClipEventDuration 运算符,并且将流本身作为其参数提供。这将导致每个事件被剪辑到同一流中的下一事件,并且创建间隔事件的连续系列。因为只有第二个流的开始时间对于该剪辑操作有意义,所以,我们也可以使用原始点流:

var signal = extended.ClipEventDuration(input, (e1, e2) => true);

在这里,匹配条件的求值结果始终为 true,并且假定我们正在查看的是单个逻辑流,也就是说,该流中的所有事件都与单个数据源相关联。

下图说明通过 ClipEventDuration 运算符执行点到信号转换的影响:

具有 ClipEventDuration 的点到信号转换

两个 LINQ 语句可以合并成单个语句:

var signal = input.AlterEventDuration(e => TimeSpan.FromSeconds(60)).ClipEventDuration(input, (e1, e2) => true);

如果该流包含多个逻辑流(例如,来自多个设备的度量值或多个股票的值),则相应键(设备 ID 或股票代码)将需要在布尔表达式中匹配:

var signal = input.AlterEventDuration(e => TimeSpan.FromSeconds(60)).ClipEventDuration(input, (e1, e2) => e1.Symbol == e2.Symbol);

创建会话

针对 ClipEventDuration 的另一个用例是创建会话事件,以便给在此类会话期间发生的事件添加批注。现在假定以下事件架构,并且描述某些用户交互的事件:

public class EventType
{
    public int UserId;
    public string Type;
    public DateTime Time;
    public byte[] data;
};

在这个示例中,负载字段 Type 可以是“start”、“end”或“other”,分别描述用户会话的开始、会话的结束以及会话期间的用户事件。字段 Time 包含交互操作的时间戳,而 data 包含进一步的信息。该任务是使用该事件发生期间会话的开始时间给每个事件添加批注。此外,我们假定每个会话在 10 分钟后超时。

以下关系图显示了此应用场景中示例事件的系列:

使用 ClipEventDuration 创建会话事件

首先,根据超时的持续时间扩展将应用于“start”类型的所有事件:

var sessionStarts = from e in input
                    where e.Type == “start”
                    select e;
var sessionStartsExt = sessionStarts.AlterEventDuration(e => TimeSpan.FromMinutes(10));

接下来,对于每个用户 ID,这些会话事件需要剪辑到其相应末尾:

var sessionEnds = from e in input
                  where e.Type == “end”
                  select e;
var sessions = sessionStartsExt.ClipEventDuration(sessionEnds, (e1, e2) => e1.UserId == e2.UserId);

下面的关系图说明这些语句:

使用 ClipEventDuration 剪切会话事件

现在,这些会话事件可以联接到其余的事件:

var sessionActivity = from e in input
                      where e.Type == “other”
                      select e;
var annotated = from s1 in sessions
                join s2 in sessionActivity
                on s1.UserId equals s2.UserId
                select new {
                    s2.UserId,
                    s2.Type,
                    s2.Time,
                    s2.Data,
                    SessionStart = s1.Time
                }

在该联接中,我们能够引用 sessionActivity 事件以及会话事件的字段,以便我们可以汇集加批注的 sessionActivity 事件,并且将会话开始时间拖入每个 sessionActivity 事件:

将会话事件联接到其他事件

因为该联接条件与 UserId 等效,所以对于 UserId=X 的这个特定会话,不考虑 sessionActivity 中 UserId=Y 的事件。

这些 LINQ 语句可以压缩成一个更简洁的集合:

var sessions = input
                 .Where(e => e.Type == “start”)
                 .AlterEventDuration(e => TimeSpan.FromMinutes(10))
                 .ClipEventDuration(input.Where(e => e.Type == “end”), (e1, e2) => e1.UserId == e2.UserId);
var annotated = from s1 in sessions
                join s2 in input.Where(e => e.Type == “other”)
                on s1.UserId equals s2.UserId
                select new {
                    s2.UserId,
                    s2.Type,
                    s2.Time,
                    s2.Data,
                    SessionStart = s1.Time
                }

请参阅

概念

StreamInsight 服务器概念