Erstellen stabiler StreamInsight-Anwendungen

Dieses Thema beschreibt die Schritte zum Erstellen einer stabilen StreamInsight-Anwendung.

Stabilität ist nur in der Premium Edition von StreamInsight verfügbar. Weitere Informationen finden Sie unter Auswählen einer StreamInsight-Edition.

Ein End-to-End-Codebeispiel einer stabilen Anwendung, die Wiedergaben und Deduplizierung verwendet, bietet das Beispiel zur Prüfpunktausführung (Checkpointing) auf der StreamInsight-Beispielseite von Codeplex.

In diesem Thema

  1. Schritt 1. Konfigurieren eines stabilen Servers

  2. Schritt 2. Definieren einer stabilen Abfrage

  3. Schritt 3. Erfassen von Prüfpunkten

  4. Schritt 4. Wiedergeben von Ereignissen im Eingabeadapter

  5. Schritt 5. Entfernen von Duplikaten im Ausgabeadapter

  6. Schritt 6. Wiederherstellung nach einem Fehler

  7. Herunterfahren ohne Deaktivieren der Wiederherstellung

  8. Beispiele

Schritt 1. Konfigurieren eines stabilen Servers

Erforderliche Einstellungen

Geben Sie zum Konfigurieren eines stabilen Servers Werte für die folgenden Konfigurationseinstellungen an, wenn Sie den Server erstellen:

  • Einen Metadatenspeicher. Sie müssen SQL Server Compact verwenden, um Metadaten für den Server zu speichern. Metadaten können nicht im Arbeitsspeicher gespeichert werden.

  • Einen Protokollpfad. Diese Einstellung legt fest, wo die Prüfpunktdaten für stabile Abfragen gespeichert werden. Der Standardwert ist das Arbeitsverzeichnis des StreamInsight-Prozesses. Die zugehörige Einstellung CreateLogPathIfMissing gibt an, ob das angegebene Verzeichnis erstellt werden soll, falls es nicht vorhanden ist.

Die Konfiguration eines Servers für Stabilität ermöglicht die Erfassung von Prüfpunkten, aktiviert die Erfassung aber noch nicht. Informationen zum Auslösen von Prüfpunkten finden Sie unter Schritt 3. Erfassen von Prüfpunkten.

Verwalten des Protokollpfads für Prüfpunkte

  • Um nicht autorisierte Lesezugriffe oder Änderungen an den Prüfpunktdateien zu verhindern, müssen Sie sicherstellen, dass die Berechtigungen des enthaltenden Ordners so festgelegt sind, dass nur vertrauenswürdige Entitäten Zugriff haben.

  • Jede Instanz von StreamInsight sollte einen eigenen Protokollpfad verwenden.

  • Stellen Sie sicher, dass der Prozess, der StreamInsight hostet, Lese- und Schreibzugriff auf den angegebenen Ordner hat.

  • Bearbeiten Sie den Inhalt des Ordners nicht. StreamInsight löscht Prüfpunktdateien, wenn sie nicht mehr benötigt werden.

Prozessexterne Server

Bei prozessexternen Servern, mit denen der Client über einen Aufruf von Server.Connect eine Verbindung herstellt, wird die Stabilitätskonfiguration von der Person bereitgestellt, die den Server eingerichtet hat. Wenn der prozessexterne Server über eine Stabilitätskonfiguration verfügt, kann der Client diese Konfiguration verwenden. Wenn der Server keine Stabilitätskonfiguration beinhaltet, kann der Client keine Stabilitätsfunktionen verwenden.

Methoden zur Angabe von Stabilitätsoptionen

Sie können die Stabilitätseinstellungen mit einer der folgenden Methoden angeben:

  • Geben Sie die Einstellungen programmgesteuert an, indem Sie die Stabilitätskonfiguration im Server.Create-Aufruf bereitstellen.

  • Deklarieren Sie die Einstellungen in der Konfigurationsdatei der Anwendung.

    • Bei einem prozessinternen Server ist dies die Datei app.config.

    • Bei einem prozessexternen Server ist dies die Datei StreamInsightHost.exe.config, die sich im StreamInsight-Installationsordner und dort im Ordner Host befindet.

Wenn Sie beide Methoden verwenden, überschreiben die Einstellungen, die Sie im API-Aufruf angeben, die Einstellungen in der Konfigurationsdatei.

Programmgesteuertes Erstellen eines stabilen Servers

Das folgende Beispiel zeigt, wie programmgesteuert ein prozessinterner stabiler Server erstellt werden kann. Weitere ausführliche Beispiele finden Sie unter Beispiele. Versuchen Sie, alle Ausnahmen abzufangen, die zu einem Fehler bei der Prüfpunktausführung führen würden, wenn Sie Server.Create aufrufen.

SqlCeMetadataProviderConfiguration metadataConfig = new SqlCeMetadataProviderConfiguration();
metadataConfig.CreateDataSourceIfMissing = true;
metadataConfig.DataSource = "C:\\CepMetadata.sdf";

CheckpointConfiguration recoveryConfig = new CheckpointConfiguration();
recoveryConfig.CreateLogPathIfMissing = true;
recoveryConfig.LogPath = "C:\\CepLogPath";

using (EmbeddedServer server = 
    Server.Create("Default", metadataConfig, recoveryConfig))

Erstellen eines stabilen Servers über eine Deklaration

Das folgende Beispiel zeigt, wie ein stabiler Server durch Deklaration in einer Konfigurationsdatei erstellt werden kann.

<?xml version="1.0" encoding="utf-8"?>
<configuration>
…
    <appSettings>
            <add key="InstanceName" value="Default"/>
            <add key="CreateSqlCeMetadataFileIfMissing" value="true"/>
            <add key="SQLCEMetadataFile" value="CepMetadata.sdf"/>
            <add key="CheckpointEnabled" value="true"/>
            <add key="CheckpointLogPath" value="CepLogPath"/>
            <add key="CreateCheckpointLogPathIfMissing" value="true"/>
    </appSettings>
    <runtime>
        <gcServer enabled="true"/>
    </runtime>
</configuration>

OBEN

Schritt 2. Definieren einer stabilen Abfrage

Führen Sie zum Erstellen einer stabilen Abfrage folgende Schritte im Code aus.

  1. Prüfen Sie vor dem Erstellen einer neuen Abfrage, ob die Abfrage bereits in den Metadaten vorhanden ist. Wenn die Abfrage bereits vorhanden ist, deutet dies darauf hin, dass für die Anwendung eine Wiederherstellung nach einem Fehler durchgeführt wurde. Die Abfrage darf dann nicht neu erstellt, sondern muss im Code neu gestartet werden.

  2. Wenn die Abfrage nicht in den Metadaten vorhanden ist, erstellen Sie sie, und definieren Sie die Abfrage als stabil, indem Sie true im IsResilient-Parameter der ToQuery-Methode angeben. Sie können auch die Application.CreateQuery-Methode mit dem IsResilient-Parameter aufrufen.

Die Konfiguration einer Abfrage als stabil ermöglicht die Erfassung von Prüfpunkten, aktiviert die Erfassung aber noch nicht. Informationen zum Auslösen von Prüfpunkten finden Sie unter Schritt 3. Erfassen von Prüfpunkten.

Beispiel für die Definition einer stabilen Abfrage

Weitere ausführliche Beispiele finden Sie unter Beispiele.

Query query = application.CreateQuery(
                     "TrafficSensorQuery",
                     "Minute average count, filtered by location threshold",
                     queryBinder,
                     true);

OBEN

Schritt 3. Erfassen von Prüfpunkten

Sobald die Abfrage bzw. die Abfragen ausgeführt werden, können Sie periodisch Prüfpunkte erfassen, um den Status der Abfragen aufzuzeichnen.

Die API-Methoden, die die Prüfpunktausführung unterstützen, folgen dem üblichen Muster für asynchrone Vorgänge.

  1. Rufen Sie zum Aufrufen eines Prüfpunkts die BeginCheckpoint-Methode auf. Wenn Sie den optionalen AsyncCallback angeben, wird der Rückruf aufgerufen, sobald der Prüfpunkt abgeschlossen wurde. Das IAsyncResult, das vom BeginCheckpoint-Aufruf zurückgegeben wird, identifiziert diese Prüfpunktanforderung und kann später in Aufrufen von EndCheckpoint oder CancelCheckpoint verwendet werden.

    /// <summary>
    /// Take an asynchronous checkpoint for the query.
    /// </summary>
    /// <param name="query">The query to checkpoint.</param>
    /// <param name="asyncCallback">An optional asynchronous callback, to be called when the checkpoint is complete.</param>
    /// <param name="asyncState">A user-provided object that distinguishes this particular asynchronous checkpoint request from other requests.</param>
    /// <returns></returns>
    IAsyncResult BeginCheckpoint(
         Query query, 
         AsyncCallback asyncCallback, 
         Object asyncState);
    
  2. Die EndCheckpoint-Methode blockiert, bis der Prüfpunktvorgang abgeschlossen ist. Wenn der Prüfpunktvorgang erfolgreich ist, gibt der Aufruf true zurück. Wenn Fehler auftreten, löst der Aufruf eine Ausnahme aus.

    /// <summary>
    /// Waits for the pending asynchronous checkpoint request to complete.
    /// </summary>
    /// <param name="asyncResult">The reference to the pending asynchronous request to finish.</param>
    /// <returns>True if the checkpoint succeeded, false if it was canceled.</returns>
    bool EndCheckpoint(
         IAsyncResult asyncResult);
    
  3. Sie können auch CancelCheckpoint aufrufen, um den Prüfpunktausführungsprozess abzubrechen. Wenn der CancelCheckpoint-Aufruf erfolgreich ist, gibt ein anschließender Aufruf von EndCheckpoint den Wert false zurück.

    /// <summary>
    /// Cancels the pending asynchronous checkpoint request.
    /// </summary>
    /// <param name="asyncResult">The asyncResult handle identifying the call.</param>
    void CancelCheckpoint(
         IAsyncResult asyncResult);
    

Dieses asynchrone Muster kann auf drei verschiedene Arten verwendet werden:

  • Auf einen Aufruf von BeginCheckpoint kann ein Aufruf von EndCheckpoint folgen. EndCheckpoint blockiert in diesem Fall, bis der Prüfpunktvorgang abgeschlossen ist, und gibt anschließend das Ergebnis (bzw. die Ausnahme) zurück. Bei diesem Muster werden asyncCallback und asyncState üblicherweise nicht verwendet.

  • BeginCheckpoint kann aufgerufen werden. Der Benutzer kann anschließend die IsCompleted-Eigenschaft des zurückgegebenen IAsyncResult abrufen. Wenn IsCompleted den Wert true hat, kann EndCheckpoint aufgerufen werden, um das Ergebnis abzurufen. Bei diesem Muster werden asyncCallback und asyncState üblicherweise nicht verwendet.

  • BeginCheckpoint kann mit einer Rückrufmethode aufgerufen werden. In diesem Fall kann asyncState verwendet werden, um den Aufruf zu identifizieren und alle benötigten Informationen an die Rückrufmethode zurückzugeben. Sobald der Rückruf ausgeführt wird, ruft er EndCheckpoint auf, um das Ergebnis abzurufen.

Die EndCheckpoint-Methode muss aufgerufen werden, unabhängig davon, welches Muster verwendet wird, selbst dann, wenn der Prüfpunkt abgebrochen wird. Diese Methode ist die einzige Möglichkeit für den Benutzer, einen Rückgabewert vom Aufruf zu erhalten, und die einzige Möglichkeit für StreamInsight festzustellen, ob der Aufruf abgeschlossen ist. Sie können keinen anderen Prüfpunkt beginnen, bis EndCheckpoint aufgerufen wurde.

Fehler, die während des Prüfpunktausführungsprozesses auftreten, beenden und beeinflussen nicht die zugehörigen Abfragen. Wenn Sie eine Abfrage beenden, während ein Prüfpunktvorgang ausgeführt wird, wird der Prüfpunkt abgebrochen.

OBEN

Schritt 4. Wiedergeben von Ereignissen im Eingabeadapter

Um die Wiedergabe von Ereignissen als Teil der Wiederherstellung zu unterstützen, muss die Eingabeadapterfactory entweder die IHighWaterMarkInputAdapterFactory-Schnittstelle oder die IHighWaterMarkTypedInputAdapterFactory-Schnittstelle implementieren. Der Aufruf der Create-Methode der Adapterfactory stellt dann die Obergrenzenmarkierung bereit, mit der der Adapter die wiederzugebenden Ereignisse identifizieren kann.

Um sicherzustellen, dass die Ausgabe vollständig ist, müssen alle Eingabeadapter alle Ereignisse im physischen Datenstrom wiedergeben, die bei oder nach der Position aufgetreten sind, die durch die Obergrenzenmarkierung angegeben wird.

OBEN

Schritt 5. Entfernen von Duplikaten im Ausgabeadapter

Um das Entfernen von Duplikaten als Teil der Wiederherstellung zu unterstützen, muss die Ausgabeadapterfactory entweder die IHighWaterMarkOutputAdapterFactory-Schnittstelle oder die IHighWaterMarkTypedOutputAdapterFactory-Schnittstelle implementieren. Der Aufruf der Create-Methode der Adapterfactory stellt dann die Obergrenzenmarkierung und den Offsetwert bereit, mit denen der Adapter die doppelten Werte identifizieren kann. Dieser Offset ist erforderlich, da die Position im Ausgabedatenstrom, die dem Prüfpunkt entspricht, auf jeden Punkt im Datenstrom fallen kann.

Wenn die Abfrage zum ersten Mal gestartet wird, wird die Create-Methode der Adapterfactory ohne die Obergrenzenmarkierung und ohne den Offsetwert aufgerufen. Wenn der Server bisher noch keine Prüfpunkte für die Abfrage erfasst hat, wird die Create-Methode der Adapterfactory mit der Obergrenzenmarkierung DateTime.MinValue und dem Offset 0 (null) aufgerufen.

Wenn eine Abfrage ordnungsgemäß wiedergegeben wird, werden alle Ereignisse, die nach Erfassung des letzten Prüfpunkts, aber vor dem Ausfall erzeugt wurden, beim Neustart erneut erzeugt. Diese Duplikate muss der Ausgabeadapter entfernen. Die Art der Entfernung hängt vom Ausgabeadapter ab: Die ursprünglichen Kopien können verworfen, oder die doppelten Kopien können ignoriert werden.

Um sicherzustellen, dass die Ausgabe äquivalent ist, müssen alle Eingabeadapter Eingabeereignisse ordnungsgemäß wiedergeben, und alle Ausgabeadapter müssen alle doppelten Ereignisse im physischen Datenstrom entfernen, die vor dem Ausfall und bei oder nach der Position aufgetreten sind, die durch den Offset der Obergrenzenmarkierung angegeben ist.

OBEN

Schritt 6. Wiederherstellung nach einem Fehler

Der Server führt beim Start automatisch eine Wiederherstellung durch und versetzt alle Abfragen in einen konsistenten Status. Dies ist ein asynchroner Vorgang. Daher kehrt der Aufruf von Server.Create zurück, bevor die Wiederherstellung abgeschlossen ist.

  • Nicht stabile Abfragen werden auf den Status "Beendet" gesetzt. Dieses Verhalten hat sich nicht geändert.

  • Stabile Abfragen werden auf den Status "Initialisierung" gesetzt. Anschließend lädt der Server die gespeicherten Prüfpunktinformationen.

Ab diesem Zeitpunkt können Sie Start aufrufen, um die Abfragen neu zu starten. Stabile Abfragen werden neu gestartet, sobald die Initialisierung abgeschlossen ist.

Der Startcode muss für die Wiederherstellung nach einem Fehler folgende Schritte ausführen:

  1. Die Liste mit den Abfragen für die Anwendung muss aus den Metadaten abgerufen werden.

  2. Für jede Abfrage muss geprüft werden, ob die Abfrage bereits in den Metadaten vorhanden ist.

    1. Wenn die Abfrage bereits vorhanden ist, wird sie neu gestartet.

    2. Wenn die Abfrage nicht in den Metadaten vorhanden ist, muss sie erstellt und als stabil definiert werden, wie unter Schritt 2. Definieren einer stabilen Abfrage beschrieben.

Wenn während der Wiederherstellung selbst ein Problem auftritt, können Sie den Server ohne Stabilität neu starten.

OBEN

Herunterfahren ohne Deaktivieren der Wiederherstellung

Sie können den Server herunterfahren, ohne die Wiederherstellung zu deaktivieren, indem Sie die Dispose-Methode des Server aufrufen.

  • Nicht stabile Abfragen werden beendet.

  • Stabile Abfragen werden angehalten. Sobald Sie den Server neu starten, wird der Server versuchen, den Status der angehaltenen Abfragen wiederherzustellen. Um dieses Verhalten zu verhindern, beenden Sie die Abfragen vor dem Herunterfahren.

Die Metadaten für sowohl nicht stabile als auch für stabile Abfragen werden bei diesem Herunterfahren des Servers beibehalten.

OBEN

Beispiele

Ein End-to-End-Codebeispiel einer stabilen Anwendung, die Wiedergaben und Deduplizierung verwendet, bietet das Beispiel zur Prüfpunktausführung (Checkpointing) auf der StreamInsight-Beispielseite von Codeplex.

OBEN

Definieren einer stabilen Abfrage mit dem expliziten Entwicklungsmodell

namespace StreamInsight.Samples.TrafficJoinQuery
{
    using...

    internal class EmbeddedCepServer
    {
        internal static void Main()
        {
            // SQL CE was available as an optional metadata provider in v1.1
            // For the server to support recovery, this becomes mandatory
            // A log path is also a mandatory requirement.
            SqlCeMetadataProviderConfiguration metadataConfig = new
               SqlCeMetadataProviderConfiguration();
            metadataConfig.CreateDataSourceIfMissing = true;
            metadataConfig.DataSource = "C:\\CepMetadata.sdf";

            ServerRecoveryConfiguration recoveryConfig = new ServerRecoveryConfiguration();
            recoveryConfig.CreateLogPathIfMissing = true;
            recoveryConfig.LogPath = "C:\\CepLogPath";


            using (EmbeddedServer server = Server.Create(
                                            "Default", metadataConfig, recoveryConfig))
            {
                try
                {
                    Application application = server.CreateApplication("TrafficJoinSample");

                    QueryTemplate queryTemplate = CreateQueryTemplate(application);

                    InputAdapter csvInputAdapter =     
                                           application.CreateInputAdapter<TextFileReaderFactory>(
                                           "CSV Input", "Reading tuples from a CSV file");
                    OutputAdapter csvOutputAdapter =
                                          application.CreateOutputAdapter<TextFileWriterFactory>(
                                          "CSV Output", "Writing result events to a CSV file");

                    // bind query to event producers and consumers
                    QueryBinder queryBinder = BindQuery(
                                              csvInputAdapter, csvOutputAdapter, queryTemplate);

                    // Create bound query that can be run
                    Console.WriteLine("Registering bound query");
                    Query query = application.CreateQuery(
                                    "TrafficSensorQuery",
                                    "Minute average count, filtered by location threshold",
                                    queryBinder,
                                    true);   // v1.2 addition - Specify the query as resilient

                    // Start the query
                    // v1.2 has additional semantics during recovery

                    query.Start();

                    // submit a checkpoint request

                    // query.Stop();
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                    Console.ReadLine();
                }
            }

            Console.WriteLine("\npress enter to exit application");
            Console.ReadLine();
        }

Prüfpunktausführung – das Rückruf-Rendezvous-Modell

namespace StreamInsight.Samples.TrafficJoinQuery
{
    using...

    internal class EmbeddedCepServer
    {
        internal static void Main()
        {
                        // Same code through query start …
            {
                try
                {
                    // Start the query
                    query.Start();

                    // submit a checkpoint request
                    IAsyncResult result = server.BeginCheckpoint(query,
                        r => {
                            if (server.EndCheckpoint(r))
                            {
                                // the checkpoint succeeded
                            }
                            else
                            {
                                // the checkpoint was canceled
                            }
                        },
                        null);
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                    Console.ReadLine();
                }
            }

            Console.WriteLine("\npress enter to exit application");
            Console.ReadLine();
        }

Siehe auch

Konzepte

StreamInsight-Stabilität

Erstellen stabiler StreamInsight-Anwendungen

Überwachen stabiler StreamInsight-Anwendungen

Problembehandlung für stabile StreamInsight-Anwendungen