Vorlauf der Anwendungszeit

StreamInsight-Entwickler müssen die Anforderungen von Datenquellen, die Daten außerhalb der Reihenfolge aufweisen können, mit den Anforderungen zur unmittelbaren Verarbeitung von Ereignissen ausgleichen. Ein schnellerer Vorlauf der Anwendungszeit verringert zwar die Wartezeit, er verringert jedoch auch das Zeitfenster für spät eintreffende Daten (d. h. die Fähigkeit, außerhalb der Reihenfolge eintreffende Daten zu verarbeiten). StreamInsight bietet verschiedene Möglichkeiten zum Berücksichtigen der Anwendungszeit. In diesem Thema werden die verschiedenen Ebenen und Richtlinien für einen Vorlauf der Anwendungszeit beschrieben, die auf Adapterebene und mit Abfragebindungen eingerichtet werden können.

Grundlegendes zum Zeitmodell

Das Zeitmodell von StreamInsight basiert ausschließlich auf der Anwendungszeit und niemals auf der Systemzeit. Dies bedeutet, dass alle Zeitoperatoren auf den Zeitstempel der Ereignisse und niemals auf die Systemuhr des Hostcomputers verweisen. Daher müssen Anwendungen ihre aktuelle Anwendungszeit dem StreamInsight-Server mitteilen. Die Anwendungszeit für eine angegebene Anwendung hängt von vielen unterschiedlichen Aspekten im Kontext der Anwendung ab. Letztlich liegt es in der Verantwortung des Anwendungsentwicklers, die entsprechende Anwendungszeit für den StreamInsight-Server bereitzustellen. Die wichtigsten Überlegungen für die Anwendungszeit lauten wie folgt:

  • Datenquellen

    Wenn Datenquellen Zeitinformationen übermitteln, kann anhand dieser Daten der Zeitpunkt bestimmt werden, zu dem alle Ereignisse aus der Datenquelle empfangen wurden. Dieser Zeitpunkt ist die aktuelle Anwendungszeit in Bezug auf diese Datenquelle. Beachten Sie, dass unterschiedliche Datenquellen möglicherweise in unterschiedlicher Geschwindigkeit ausgeführt werden.

  • Daten außerhalb der Reihenfolge

    Bei einigen Datenquellen treffen Ereignisse nicht immer in der Reihenfolge ihrer Zeitstempel ein. Dies bedeutet, dass die Daten außerhalb der Reihenfolge sind. StreamInsight kann Daten außerhalb der Reihenfolge behandeln und sicherstellen, dass Ergebnisse nicht von der Reihenfolge abhängen, in der Ereignisse auf dem StreamInsight-Server eintreffen. StreamInsight-Entwickler können für Datenquellen mit spät eintreffenden Ereignissen dem Vorlauf der Anwendungszeit Pufferzeit hinzufügen, damit Ereignisse außerhalb der Reihenfolge empfangen werden können.

  • Aktualität von Ergebnissen

    StreamInsight-Abfragen geben Ergebnisse aus, die bekanntermaßen bis zur aktuellen Anwendungszeit exakt sind. Dies bedeutet, dass StreamInsight-Abfragen Ergebnisse erzeugen, sobald sie durch den Fortschritt der Gesamtanwendungszeit abgeschlossen werden.

Aktuelle Zeitinkremente (Current Time Increment, CTI)

Die Anwendungszeit wird während der Abfrageverarbeitung durch CTI-Ereignisse gesteuert. Ein CTI ist ein Interpunktionsereignis, das eine zentrale Komponente des StreamInsight-Zeitmodells ist. CTIs werden verwendet, um einen Commit für Sequenzen von Ereignissen auszuführen und berechnete Ergebnisse für die Abfrageausgabe freizugeben, indem der StreamInsight-Server eine Bestätigung erhält, dass bestimmte Teile der Zeitachse nicht mehr geändert werden. Daher müssen CTIs unbedingt zusammen mit Ereignissen in die Warteschlange für den Eingabeereignis-Datenstrom eingereiht werden, um ein Ergebnis zu erzeugen und den Zustand zustandsbehafteter Operatoren zu leeren.

Durch das Einreihen eines CTI in eine Warteschlange wird gewährleistet, dass von der Eingabe kein darauf folgendes Ereignis erstellt wird, das sich auf den Zeitraum vor dem Zeitstempel des CTI auswirken würde. Nachdem ein CTI in die Warteschlange für die Eingabe eingereiht wurde, impliziert dies Folgendes:

  • Bei Ereignissen der Art "Punkt", "Intervall" oder "Edge Start" gilt: Die Startzeit des Ereignisses muss auf oder nach dem CTI liegen.

  • Bei Ereignissen der Art "Edge end" gilt: Die Endzeit des Ereignisses muss auf oder nach dem CTI liegen.

Werden diese Regeln gebrochen, handelt es sich um eine CTI-Verletzung. Im Folgenden wird beschrieben, wie Verletzungen behandelt werden.

Es gibt drei Methoden, um CTIs in einen Eingabedatenstrom einzufügen.

  1. Programmgesteuertes Einreihen von CTIs in die Warteschlange über den Eingabeadapter, analog zum Einreihen von Ereignissen in die Warteschlange.

  2. Deklaratives Generieren von CTIs mit einer angegebenen Frequenz. Dies kann durch AdvanceTimeGenerationSettings in der Adapterfactory oder als Teil der Abfragebindung angegeben werden.

  3. Definieren eines eigenen Eingabedatenstroms als CTI-Quelle. Dies kann nur in der Abfragebindung angegeben werden.

Wenn Methode 2 und 3 implementiert werden, muss immer auch eine Richtlinie für CTI-Verletzungen implementiert werden. Im folgenden Abschnitt werden AdvanceTimeGenerationSettings und die Verletzungsrichtlinien beschrieben. In nachfolgenden Abschnitten wird beschrieben, wie Vorlaufzeiteinstellungen in der Adapterfactory sowie in der Abfragebindung verwendet werden.

CTI-Generierung

Die Generierung von CTIs (weiter oben in Methode 2 und 3 beschrieben) weist zwei Dimensionen auf:

  1. Die Generierungsfrequenz, die entweder als positive Ganzzahl N oder als Zeitraum T angegeben wird. Die Richtlinie für die Generierungsfrequenz fügt eine CTI nach einem Vorkommen der Ereignisanzahl (N) oder des Zeitraums (T) ein.

  2. Der Zeitstempel der generierten CTIs, der als Verzögerung in Bezug auf das letzte empfangene Ereignis angegeben wird.

Darüber hinaus können Sie mit einem booleschen Flag angeben, ob ein abschließendes CTI mit einem Zeitstempel von positiv unendlich eingefügt werden soll, wenn die Abfrage beendet wird. Hiermit werden alle restlichen Ereignisse aus den Operatoren der Abfrage geleert.

Die CTI-Generierung wird mit der Klasse AdvanceTimeGenerationSettings definiert, deren Konstruktor die Frequenz, die Verzögerung und das Flag akzeptiert, wie im folgenden Beispiel gezeigt.

var atgs = new AdvanceTimeGenerationSettings(10, TimeSpan.FromSeconds(5), true);

In diesem Beispiel wird das Modul angewiesen, nach jeweils 10 Ereignissen aus der Ereignisquelle ein CTI einzufügen. Das CTI enthält einen Zeitstempel mit der Zeit des letzten Ereignisses minus fünf Sekunden. Dieser Verzögerungsmechanismus implementiert eine Toleranzperiode, damit die Ereignisquelle verspätete Ereignisse in die Warteschlange einreihen kann, ohne gegen CTI-Semantik zu verstoßen (solange die Verspätung der Ereignisse niemals mehr als fünf Sekunden beträgt). Wenn die entsprechende Abfrage beendet wird, wird ein CTI mit unendlicher Zeit in die Warteschlange eingereiht.

Beachten Sie, dass keine End-Edges berücksichtigt werden, wenn Sie eine Frequenz für die CTI-Generierung über AdvanceTimeSettings angeben. Sie werden ebenfalls nicht berücksichtigt, wenn Sie eine Dauer als Frequenz verwenden. Im Falle von Edge-Ereignissen werden für Frequenz und Dauer nur Start-Edges berücksichtigt.

CTI-Verletzungsrichtlinien

Eine Ereignisquelle kann gegen CTI-Semantik verstoßen, indem sie Ereignisse sendet, die einen früheren Zeitstempel als die eingefügten CTIs aufweisen. Die Vorlaufzeiteinstellungen ermöglichen das Angeben einer Richtlinie zum Behandeln solcher Vorkommnisse. Die Richtlinie kann über die folgenden beiden Werte verfügen:

  • Drop

    Ereignisse, die gegen das eingefügte CTI verstoßen, werden gelöscht und nicht in die Warteschlange für die Abfrage eingereiht.

  • Anpassen

    Ereignisse, die gegen das eingefügte CTI verstoßen, werden geändert, wenn sich ihre Lebensdauer mit dem CTI-Zeitstempel überschneidet. Das heißt, der Startzeitstempel der Ereignisse wird auf den letzten CTI-Zeitstempel festgelegt, durch den diese Ereignisse gültig sind. Wenn Start- und Endzeit eines Ereignisses vor der Zeit des CTI-Zeitstempels liegen, wird das Ereignis gelöscht.

Adaptervorlauf-Zeiteinstellungen

Einstellungen für den Vorlauf der Anwendungszeit können in der Definition der Adapterfactory angegeben werden. So wie bei jedem Instanziieren eines Adapters die Create()-Methode der Factory aufgerufen wird, wird eine entsprechende Methode zum Definieren der Vorlaufzeiteinstellungen der Adapterinstanz aufgerufen. Hierzu verwenden Sie die Schnittstelle ITypedDeclareAdvanceTimeProperties für einen typisierten Adapter (oder IDeclareAdvanceTimeProperties für einen nicht typisierten Adapter), wie im folgenden Beispiel gezeigt.

public class MyInputAdapterFactory : ITypedInputAdapterFactory<MyInputConfig>,
                                     ITypedDeclareAdvanceTimeProperties<MyInputConfig>

Für diese Schnittstelle muss die folgende Methode als Teil der Factory implementiert werden.

public AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties<TPayload>(MyInputConfig configInfo, EventShape eventShape)
{
    var atgs = new AdvanceTimeGenerationSettings(10, TimeSpan.FromSeconds(0), true);
    var ats = new AdapterAdvanceTimeSettings(atgs, AdvanceTimePolicy.Drop);
    return ats;
}

Die Methode DeclareAdvanceTimeProperties() wird für jeden neu instanziierten Adapter mit der Konfigurationsstruktur und dem Ereignisformparameter aufgerufen, die im entsprechenden Create()-Methodenaufruf angegeben sind. Dies ermöglicht es dem Adapterersteller, die richtigen CTI-Generierungseinstellungen aus den Konfigurationsinformationen abzuleiten, ohne dass der Abfrageschreiber und -binder die spezifischen Details der Vorlaufzeiteinstellungen kennen müssen.

Der AdapterAdvanceTimeSettings-Konstruktor erfordert sowohl das AdvanceTimeGenerationSettings-Objekt als auch die Verletzungsrichtlinie, die weiter oben beschrieben wurden.

CTI-Generierung in der Abfragebindung

Wie bei den AdapterAdvanceTimeSettings kann die Ausgabe von CTIs deklarativ in der Abfragebindung angegeben werden, wie im folgenden Beispiel gezeigt. Dies ermöglicht es dem Benutzer, der die Abfrage bindet, CTI-Anwendungszeitverhalten unabhängig von der Adapterimplementierung zu definieren.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromSeconds(0), true);
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);

Der AdvanceTimeSettings-Konstruktor akzeptiert die folgenden drei Argumente:

  1. Ein AdvanceTimeGenerationSettings-Objekt

  2. Ein AdvanceTimeImportSettings-Objekt

  3. Die Verletzungsrichtlinie

Beachten Sie, dass entweder das Argument für die Generierungseinstellungen oder für die Importeinstellungen auf NULL festgelegt werden kann, jedoch nicht beide Argumente. Sie können außerdem zusammen angegeben werden. Im nächsten Abschnitt wird die AdvanceTimeImportSettings-Klasse eingeführt.

Im obigen Beispiel wird angegeben, dass mit jedem Ereignis ein CTI mit dem Zeitstempel des Ereignisses (ohne Verzögerung) generiert und eingefügt werden soll. Das AdvanceTimeSettings -Objekt kann als optionaler letzter Parameter an die CepStream.Create() -Methode übergeben werden, wie im folgenden Beispiel gezeigt.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromSeconds(0), true);
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);

var inputstream = CepStream<MyPayloadType>.Create("inputStream",
                                                  typeof(MyInputAdapterFactory),
                                                  new MyConfiguration(),
                                                  EventShape.Point,
                                                  ats);

Es kann auch im Abfragebinderentwicklungsmodell verwendet werden:

queryBinder.BindProducer<MyPayloadType>("filterInput",
                                        inputAdapter,
                                        new MyConfiguration(),
                                        EventShape.Point,
                                        ats);

Synchronisieren mit einem anderen Datenstrom

Wenn CTIs während der Abfragebindung verwendet werden, können sie mit AdvanceTimeImportSettings aus einem anderen Eingabedatenstrom in die Abfrage kopiert werden, statt sie auf Grundlage einer Frequenz zu generieren (oder zusätzlich zu diesem Verfahren). Diese Funktion ermöglicht die Synchronisierung von zwei Datenströmen, wie im folgenden Beispiel gezeigt.

var dataStream = CepStream<DataType>.Create("dataStream ",
                                            typeof(DataInputAdapterFactory),
                                            new MyDataAdapterConfiguration(),
                                            EventShape.Point);

var ats = new AdvanceTimeSettings(null, new AdvanceTimeImportSettings("dataStream"), AdvanceTimePolicy.Adjust);

var lookupStream = CepStream<LookupType>.Create("lookupStream",
                                                typeof(ReferenceInputAdapterFactory),
                                                new MyReferenceConfiguration(),
                                                EventShape.Edge,
                                                ats);

var joined = from eLeft in dataStream
             join eRight in lookupStream
             where ...

In diesem Beispiel wird ein typischer Anwendungsfall veranschaulicht, in dem ein schneller Datenstrom mit einem langsamen Referenzdatenstrom verknüpft werden muss. Bei dem langsamen Datenstrom handelt es sich möglicherweise um Suchdaten, die sich viel seltener als der schnelle Datenstrom ändern. Damit der Join die Ausgabe so schnell wie die schnellste Eingabe erzeugt, wird der langsame Eingabedatenstrom durch Importieren der zugehörigen CTIs mit dem schnellen Datenstrom synchronisiert. In diesem Beispiel wird davon ausgegangen, dass die Anwendungszeitbehandlung des schnellen Datenstroms im Adapter erfolgt.

Aktualität von Ergebnissen

Der Verzögerungsparameter der Vorlaufzeitgenerierungseinstellungen gibt den Zeitstempel der eingefügten CTIs an. Die genaue Semantik von CTIs im StreamInsight-Framework muss verstanden werden, um den gewünschten Effekt für die Aktualität der Ausgabe zu erzielen. Ein CTI bestätigt dem Modul, dass für alle Elemente auf der Zeitachse, die sich eindeutig vor der Zeit des CTI-Zeitstempels befinden, ein Commit ausgeführt wird. Dies hat unterschiedliche Implikationen für die Aktualität des Ergebnisses zur Folge.

Angenommen, es sind ein Eingabedatenstrom von Punktereignissen sowie eine CTI-Generierungseinstellung mit der Frequenz 1 (jedes Ereignis) und der Verzögerung 0 vorhanden. Hierdurch werden CTIs erzeugt, bei denen jedes Punktereignis über genau den gleichen Zeitstempel verfügt. Dies bedeutet jedoch, dass für das letzte Punktereignis nur ein Commit mit dem nächsten CTI ausgeführt wird, weil sein Zeitstempel nicht eindeutig früher als das entsprechende CTI ist. Um für jedes Punktereignis einen Commit auszuführen, sobald es vom Adapter ausgegeben wird, müssen die CTIs sofort nach den Punktereignissen mit Zeitstempeln versehen werden. Dies führt zu einer negativen Verzögerung von einer Zeiteinheit, wie im folgenden Beispiel gezeigt.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromTicks(-1), true);

CTIs und Abfrageoperatoren

CTIs werden vom Eingabeadapter in die Warteschlange eingereiht oder wie oben beschrieben eingefügt. Sie durchlaufen die Abfrage und werden von bestimmten Operatoren unterschiedlich verarbeitet. Joinoperatoren beispielsweise geben ihre Ereignisse von beiden Seiten an das ältere CTI frei. Union-Operatoren geben das ältere Ergebnis der aktuellsten CTIs von beiden Seiten frei. Die gesamte Abfrage gibt ihre Ergebnisse nur bis zum aktuellen CTI frei.

Andererseits haben bestimmte Operatoren Auswirkungen auf CTI-Zeitstempel. Springende Fenster ziehen CTIs innerhalb eines Fensters zum Anfang des Fensters zurück, da sich das Ergebnis des Vorgangs über dem Fenster ändern kann, während die Ereignisse weiterhin innerhalb dieses Fensters liegen. Die ShiftEventTime()-Methode und die AlterEventLifeTime()-Methode ändern jeweils die Startzeit von Ereignissen. Dieselbe Transformation wird auf CTIs angewendet.

Siehe auch

Konzepte

Erstellen von Eingabe- und Ausgabeadaptern

StreamInsight-Serverkonzepte

Änderungsverlauf

Aktualisierter Inhalt

Der Abschnitt-"CTIs und Abfrageoperatoren" wurde hinzugefügt.

Im Abschnitt-"CTI-Generierung" wurde die Information hinzugefügt, dass End-Edges bei der Angabe einer CTI-Frequenz über AdvanceTimeSettings nicht berücksichtigt werden.