Advancing Application Time

StreamInsight developers must balance the needs of data sources that may deliver out-of-order data against the requirement to produce results with low latency. Advancing application time more quickly reduces latency, but it also narrows the window in which late-arriving (that is, out-of-order) data can still be accepted. StreamInsight provides several ways to reason about and advance application time. This topic describes the levels and policies for advancing application time that can be set at the adapter level and in query bindings.

Understanding the Temporal Model

The temporal model of StreamInsight is based only on application time and never on system time. This means that all temporal operators refer to the timestamp of the events and never to the system clock of the host machine. As a result, applications must communicate their current application time to the StreamInsight server. Application time for a given application depends on many different aspects in the context of the application. Ultimately, it is the responsibility of the application developer to provide the appropriate application time to the StreamInsight server. The main considerations for application time are as follows:

  • Data sources

    When data sources communicate temporal information, that data can be used to identify the point in time that all events from the data source have been received. This point in time constitutes the current application time with respect to this data source. Note that different data sources may proceed at different speeds.

  • Out-of-order data

    With some data sources, events do not always arrive in the order of their timestamps. That is, the data is out of order. StreamInsight can accommodate out-of-order data and ensures that results do not depend on the order in which events arrive at the StreamInsight server. StreamInsight developers can advance application time with some slack to allow out-of-order events to trickle in for those data sources that have late-arriving events.

  • Result liveliness

    StreamInsight queries output results that are known to be accurate up to the current application time. This means that results emerge from StreamInsight queries as they are finalized by the progress of overall application time.

Current Time Increments (CTIs)

During query processing, application time is driven by current time increment (CTI) events. A CTI is a punctuation event that is a central component of the StreamInsight temporal model. CTIs are used to commit sequences of events and release computed results to the query output by asserting to the StreamInsight server that certain parts of the timeline will not change anymore. Hence, it is crucial to enqueue CTIs along with events into the input event stream in order to produce any result and to flush the state of stateful operators.

By enqueueing a CTI, the input promises not to produce any subsequent event that would influence the period before the CTI’s timestamp. This implies that, after a CTI has been enqueued in the input:

  • For events of shape Point, Interval, or Edge start: The event’s start time needs to be at or after the CTI.

  • For events of shape Edge end: The event’s end time needs to be at or after the CTI.

If these rules are violated, we speak of a CTI violation. Below, we describe how these violations are handled.
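The two rules above can be sketched as a small, self-contained check. This is only an illustrative model, not StreamInsight code: the `ModelShape` enum and the `CtiRules.ViolatesCti` helper are hypothetical names introduced here.

```csharp
using System;

// Hypothetical model types for illustration; these are not StreamInsight classes.
public enum ModelShape { Point, Interval, EdgeStart, EdgeEnd }

public static class CtiRules
{
    // Returns true when an event that is enqueued after a CTI with timestamp
    // `cti` breaks the rules listed above.
    public static bool ViolatesCti(ModelShape shape, DateTimeOffset start,
                                   DateTimeOffset end, DateTimeOffset cti)
    {
        // Edge end events are checked on their end time; all others on their start time.
        DateTimeOffset relevant = shape == ModelShape.EdgeEnd ? end : start;
        return relevant < cti; // "at or after the CTI" is valid
    }

    public static void Main()
    {
        var cti = new DateTimeOffset(2024, 1, 1, 12, 0, 0, TimeSpan.Zero);
        var before = cti.AddSeconds(-1);
        var after = cti.AddSeconds(1);

        Console.WriteLine(ViolatesCti(ModelShape.Point, before, before.AddTicks(1), cti)); // True
        Console.WriteLine(ViolatesCti(ModelShape.Interval, cti, after, cti));              // False
        Console.WriteLine(ViolatesCti(ModelShape.EdgeEnd, before, after, cti));            // False
    }
}
```

Note that an end edge whose start time lies before the CTI is still valid in this model, as long as its end time is at or after the CTI.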

There are three methods to insert CTIs into an input stream.

  1. Enqueue CTIs programmatically through the input adapter, analogous to enqueueing events.

  2. Generate CTIs declaratively with a given frequency. This can be specified through AdvanceTimeGenerationSettings in the adapter factory or as part of the query binding.

  3. Define a separate input stream as a CTI source. This can only be specified in the query binding.

Whenever method 2 or method 3 is used, a policy for CTI violations must be specified as well. The following sections describe AdvanceTimeGenerationSettings and the violation policies. Subsequent sections describe how to use advance time settings in the adapter factory as well as in the query binding.

CTI Generation

The generation of CTIs (described earlier in methods 2 and 3) has two dimensions:

  1. The generation frequency, which is specified either as a positive integer N or as a time span T. The generation frequency policy inserts a CTI after the occurrence of the event count (N) or time span (T).

  2. The timestamp of the generated CTIs, which is specified as a delay with respect to the last received event.

Additionally, you can use a Boolean flag to indicate whether a final CTI with a timestamp of positive infinity should be inserted when the query is shut down. This is used to flush all remaining events from the query's operators.

The CTI generation is defined through the class AdvanceTimeGenerationSettings, whose constructor takes the frequency, the delay, and the flag, as shown in the following example.

var atgs = new AdvanceTimeGenerationSettings(10, TimeSpan.FromSeconds(5), true);

This example instructs the engine to insert a CTI after every 10 events that come from the event source. The CTI carries a timestamp of the last event's time minus five seconds. This delay mechanism effectively implements a grace period so that the event source can enqueue late events without violating CTI semantics (as long as the events are never more than five seconds late). When the corresponding query is shut down, a CTI with infinite time will be enqueued.
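To make the interplay of frequency and delay concrete, the following self-contained sketch models count-based CTI generation outside of StreamInsight. The `CtiGenerationModel` class is hypothetical, and the rule that a generated CTI is skipped when it would not advance application time is an assumption of this model, not documented engine behavior.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of count-based CTI generation; not a StreamInsight class.
public static class CtiGenerationModel
{
    // Returns the CTI timestamps produced for `eventTimes` (in arrival order)
    // when a CTI is generated after every `everyN` events with the given delay.
    public static List<DateTimeOffset> GenerateCtis(
        IEnumerable<DateTimeOffset> eventTimes, int everyN, TimeSpan delay)
    {
        var ctis = new List<DateTimeOffset>();
        DateTimeOffset? lastCti = null;
        int count = 0;

        foreach (var eventTime in eventTimes)
        {
            count++;
            if (count % everyN != 0) continue;

            // The CTI timestamp is the last received event's time minus the delay.
            var candidate = eventTime - delay;

            // Assumption: a CTI that would not advance application time is skipped.
            if (lastCti == null || candidate > lastCti.Value)
            {
                ctis.Add(candidate);
                lastCti = candidate;
            }
        }
        return ctis;
    }

    public static void Main()
    {
        var t0 = new DateTimeOffset(2024, 1, 1, 0, 0, 0, TimeSpan.Zero);
        var events = new List<DateTimeOffset>();
        for (int i = 0; i < 25; i++) events.Add(t0.AddSeconds(i)); // one event per second

        // Frequency 10, delay 5 seconds, as in the example above.
        foreach (var cti in GenerateCtis(events, 10, TimeSpan.FromSeconds(5)))
            Console.WriteLine((cti - t0).TotalSeconds); // prints 4, then 14
    }
}
```

With one event per second, the tenth event (at second 9) yields a CTI at second 4, so any event that arrives up to five seconds late is still accepted.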

Note that when a frequency for CTI generation is specified through AdvanceTimeSettings, end edges are not taken into account. For edge events, only start edges are considered, whether the frequency is given as an event count or as a time span.

CTI Violation Policies

It is possible for an event source to violate CTI semantics by sending events with a timestamp earlier than that of an inserted CTI. The advance time settings allow you to specify a policy to handle such occurrences. The policy can have one of the following two values:

  • Drop

    Events that violate the inserted CTI are dropped and are not enqueued into the query.

  • Adjust

    Events that violate the inserted CTI are modified if their lifetime overlaps with the CTI timestamp. That is, the start timestamp of the events is set to the most recent CTI timestamp, such that those events become valid. If both start and end time of an event fall before the CTI timestamp, then the event is dropped.
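The difference between the two policies can be sketched with a small standalone model. The `ViolationPolicy`, `ModelEvent`, and `ViolationPolicyModel` names are hypothetical and only mirror the behavior described above. Treating an event whose end time equals the CTI as non-overlapping is an assumption of this sketch.

```csharp
using System;

// Hypothetical model types; ViolationPolicy only mirrors the two policy values.
public enum ViolationPolicy { Drop, Adjust }

public sealed class ModelEvent
{
    public DateTimeOffset Start;
    public DateTimeOffset End;
}

public static class ViolationPolicyModel
{
    // Returns the event to enqueue, or null when the event is dropped.
    public static ModelEvent Apply(ViolationPolicy policy, ModelEvent e, DateTimeOffset lastCti)
    {
        if (e.Start >= lastCti) return e;               // no violation, pass through
        if (policy == ViolationPolicy.Drop) return null;

        // Adjust: keep the event only if its lifetime reaches past the CTI.
        // Assumption: End == lastCti counts as "no overlap".
        if (e.End <= lastCti) return null;
        return new ModelEvent { Start = lastCti, End = e.End };
    }

    public static void Main()
    {
        var cti = new DateTimeOffset(2024, 1, 1, 12, 0, 0, TimeSpan.Zero);
        var late = new ModelEvent { Start = cti.AddSeconds(-2), End = cti.AddSeconds(3) };
        var stale = new ModelEvent { Start = cti.AddSeconds(-5), End = cti.AddSeconds(-4) };

        Console.WriteLine(Apply(ViolationPolicy.Drop, late, cti) == null);     // True
        Console.WriteLine(Apply(ViolationPolicy.Adjust, late, cti).Start == cti); // True
        Console.WriteLine(Apply(ViolationPolicy.Adjust, stale, cti) == null);  // True
    }
}
```

In this model, Adjust shortens a late interval event instead of losing it, while an event that lies entirely before the CTI is dropped under either policy.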

Adapter Advance Time Settings

Settings for advancing application time can be specified in the definition of the adapter factory. In the same way that the factory's Create() method is called whenever an adapter is instantiated, a corresponding method to define the advance time settings of the adapter instance is called. To do this, use the interface ITypedDeclareAdvanceTimeProperties for a typed adapter (or IDeclareAdvanceTimeProperties for an untyped adapter), as shown in the following example.

public class MyInputAdapterFactory : ITypedInputAdapterFactory<MyInputConfig>,
                                     ITypedDeclareAdvanceTimeProperties<MyInputConfig>

This interface requires the following method to be implemented as part of the factory.

public AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties<TPayload>(MyInputConfig configInfo, EventShape eventShape)
{
    var atgs = new AdvanceTimeGenerationSettings(10, TimeSpan.FromSeconds(0), true);
    var ats = new AdapterAdvanceTimeSettings(atgs, AdvanceTimePolicy.Drop);
    return ats;
}

The method DeclareAdvanceTimeProperties() is called for each newly instantiated adapter with the same configuration structure and event shape parameter specified in the corresponding Create() method call. This allows the adapter author to derive the correct CTI generation settings from the configuration information without requiring that the query writer and binder be aware of the specifics of the advance time settings.

The AdapterAdvanceTimeSettings constructor requires both the AdvanceTimeGenerationSettings object and the violation policy described earlier.
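As a sketch of how an adapter author might derive the settings from the configuration, the following fragment reads the frequency and the grace period from the configuration object. The properties `CtiFrequency` and `MaxLatenessSeconds` are hypothetical and are introduced here only for illustration. The fragment belongs inside the factory class and needs the StreamInsight assemblies to compile.

```csharp
// Hypothetical configuration class; the property names are illustrative only.
public class MyInputConfig
{
    public uint CtiFrequency { get; set; }          // number of events between generated CTIs
    public double MaxLatenessSeconds { get; set; }  // how late an event is expected to arrive
}

// Inside MyInputAdapterFactory:
public AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties<TPayload>(MyInputConfig configInfo, EventShape eventShape)
{
    // Use the expected lateness as the CTI delay (grace period).
    // Assumption: the frequency argument accepts an unsigned event count.
    var atgs = new AdvanceTimeGenerationSettings(
        configInfo.CtiFrequency,
        TimeSpan.FromSeconds(configInfo.MaxLatenessSeconds),
        true);

    // Events that are later than the grace period are adjusted rather than dropped.
    return new AdapterAdvanceTimeSettings(atgs, AdvanceTimePolicy.Adjust);
}
```

With this arrangement, the person who binds the query only supplies the configuration values and does not need to construct the advance time settings.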

CTI Generation in the Query Binding

Similar to the AdapterAdvanceTimeSettings, issuing CTIs can be declaratively specified in the query binding as shown in the following example. This allows the user who binds the query to define CTI application time behavior independently of the adapter implementation.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromSeconds(0), true);
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);

The AdvanceTimeSettings constructor takes the following three arguments:

  1. An AdvanceTimeGenerationSettings object

  2. An AdvanceTimeImportSettings object

  3. The violation policy

Note that either the Generation settings or the Import settings arguments can be set to null, but not both. In addition, they can be specified together. The next section introduces the AdvanceTimeImportSettings class.

The example above generates and inserts a CTI with every event, using the timestamp of that event (no delay). The AdvanceTimeSettings object can be passed as an optional last parameter to the CepStream.Create() method, as shown in the following example.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromSeconds(0), true);
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);

var inputstream = CepStream<MyPayloadType>.Create("inputStream",
                                                  typeof(MyInputAdapterFactory),
                                                  new MyConfiguration(),
                                                  EventShape.Point,
                                                  ats);

It can also be used in the query binder development model:

queryBinder.BindProducer<MyPayloadType>("filterInput",
                                        inputAdapter,
                                        new MyConfiguration(),
                                        EventShape.Point,
                                        ats);

Synchronizing with Another Stream

During query binding, CTIs can also be copied from another input stream of the query by using AdvanceTimeImportSettings, in addition to (or instead of) being generated based on a frequency. This feature enables the synchronization of two streams, as shown in the following example.

var dataStream = CepStream<DataType>.Create("dataStream",
                                            typeof(DataInputAdapterFactory),
                                            new MyDataAdapterConfiguration(),
                                            EventShape.Point);

var ats = new AdvanceTimeSettings(null, new AdvanceTimeImportSettings("dataStream"), AdvanceTimePolicy.Adjust);

var lookupStream = CepStream<LookupType>.Create("lookupStream",
                                                typeof(ReferenceInputAdapterFactory),
                                                new MyReferenceConfiguration(),
                                                EventShape.Edge,
                                                ats);

var joined = from eLeft in dataStream
             join eRight in lookupStream
             where ...

This example demonstrates a typical use case in which a "fast" data stream must be joined with a "slow" reference stream. The slow stream may be lookup data that changes much less frequently than the fast stream. To make the join produce output as fast as its fastest input, the slow input stream is synchronized to the fast stream by importing its CTIs. In this example, the application time of the fast stream is assumed to be handled in the adapter.
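Because the generation settings and the import settings can be specified together, a binding might combine both. The following fragment is a minimal sketch that uses only the constructors shown in this topic. The variable name `combined` is illustrative, and the frequency and delay values are arbitrary.

```csharp
// Generate a CTI after every 100 events of this stream, delayed by one second...
var atgs = new AdvanceTimeGenerationSettings(100, TimeSpan.FromSeconds(1), true);

// ...and additionally import the CTIs of the stream named "dataStream".
var combined = new AdvanceTimeSettings(atgs,
                                       new AdvanceTimeImportSettings("dataStream"),
                                       AdvanceTimePolicy.Adjust);
```

The stream name passed to AdvanceTimeImportSettings must match the name given to the imported stream in its Create() call.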

Result Liveliness

The delay parameter of the advance time generation settings specifies the timestamp of the inserted CTIs. It is important to understand the precise semantics of CTIs in the StreamInsight framework to achieve the desired effect for output liveliness. A CTI asserts to the engine that everything on the timeline strictly before the CTI's timestamp is committed. This has different implications for the liveliness of the result.

For example, consider an input stream of point events and a CTI generation setting of frequency 1 (every event) and delay 0. This produces a CTI with exactly the same timestamp as each point event. However, this means that the most recent point event will only be committed by the next CTI, because its timestamp is not strictly before the timestamp of its own CTI. To commit every point event as soon as it is issued by the adapter, the CTIs must be timestamped immediately after the point events. This translates to a negative delay of one tick, as shown in the following example.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromTicks(-1), true);
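The effect of the one-tick negative delay can be checked with a small standalone model of the "strictly before" rule. The `LivelinessModel` class is hypothetical. It assumes point events, a generation frequency of 1, and timestamps expressed in ticks.

```csharp
using System;
using System.Linq;

// Hypothetical model of the "strictly before the CTI" commit rule.
public static class LivelinessModel
{
    // Number of point events committed after all events and their generated
    // CTIs (one CTI per event, timestamp = event time minus delay) are processed.
    public static int CommittedCount(long[] eventTicks, long delayTicks)
    {
        if (eventTicks.Length == 0) return 0;

        // The most advanced CTI determines what is committed.
        long latestCti = eventTicks.Max(t => t - delayTicks);

        // Only events strictly before the CTI are committed.
        return eventTicks.Count(t => t < latestCti);
    }

    public static void Main()
    {
        long[] events = { 100, 200, 300 };

        Console.WriteLine(CommittedCount(events, 0));  // 2: the last event is still pending
        Console.WriteLine(CommittedCount(events, -1)); // 3: every event is committed
    }
}
```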

CTIs and Query Operators

CTIs are enqueued by the input adapter or injected as described above. They propagate through the query and are processed differently by certain operators. Join operators, for example, release their results only up to the older of the CTIs from either side. Union operators likewise release results up to the older of the most recent CTIs from either side. The entire query releases its result only up to the most recent CTI.

On the other hand, certain operators have an effect on CTI timestamps. Hopping windows pull back CTIs within a window to the beginning of the window, because the result of the operation on top of the window can change while events still fall into that window. The ShiftEventTime() and AlterEventLifetime() methods both change the start time of events, and the same transformation is applied to CTIs.
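The behavior of two-input operators such as join and union can be sketched as follows. The `BinaryOperatorClock` class is a hypothetical model, not an engine type. Producing no output CTI until both sides have delivered one is an assumption of this sketch.

```csharp
using System;

// Hypothetical model of how a two-input operator advances its output time.
public sealed class BinaryOperatorClock
{
    private long? latestLeft;   // most recent CTI seen on the left input (ticks)
    private long? latestRight;  // most recent CTI seen on the right input (ticks)

    public void OnLeftCti(long ticks)
    {
        if (latestLeft == null || ticks > latestLeft.Value) latestLeft = ticks;
    }

    public void OnRightCti(long ticks)
    {
        if (latestRight == null || ticks > latestRight.Value) latestRight = ticks;
    }

    // Results are released only up to the older of the two most recent CTIs.
    public long? OutputCti()
    {
        if (latestLeft == null || latestRight == null) return null;
        return Math.Min(latestLeft.Value, latestRight.Value);
    }

    public static void Main()
    {
        var clock = new BinaryOperatorClock();
        clock.OnLeftCti(1000);                          // fast stream advances
        Console.WriteLine(clock.OutputCti().HasValue);  // False
        clock.OnRightCti(50);                           // slow stream lags behind
        Console.WriteLine(clock.OutputCti());           // 50
        clock.OnLeftCti(2000);
        Console.WriteLine(clock.OutputCti());           // 50: still held back
        clock.OnRightCti(2000);                         // e.g. after importing the fast stream's CTIs
        Console.WriteLine(clock.OutputCti());           // 2000
    }
}
```

The model shows why a slow input holds back the output of a join until its own application time advances.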

See Also

Concepts

Creating Input and Output Adapters

StreamInsight Server Concepts

Change History

Updated content

Added the section "CTIs and Query Operators".

Added information in the section "CTI Generation" that end edges are not taken into account when specifying a CTI frequency through AdvanceTimeSettings.