Création d'applications StreamInsight résistantes

Cette rubrique indique les opérations permettant de créer une application StreamInsight résistante.

La résistance est disponible uniquement dans l'édition Premium de StreamInsight. Pour plus d'informations, voir Choix d'une édition StreamInsight.

Pour visualiser des exemples de code de bout en bout complets d'une application résistante qui comprend des fonctions de relecture et de déduplication, consultez l'exemple de point de contrôle de la page d'exemples StreamInsight sur le site Codeplex.

Dans cette rubrique

  1. Étape 1 : Configuration d'un serveur résistant

  2. Étape 2 : Définition d'une requête résistante

  3. Étape 3 : Capture des points de contrôle

  4. Étape 4 : Relecture d'événements dans l'adaptateur d'entrée

  5. Étape 5 : Élimination des doublons dans l'adaptateur de sortie

  6. Étape 6 : Récupération après échec

  7. Arrêt sans désactivation de la récupération

  8. Exemples

Étape 1 : Configuration d'un serveur résistant

Paramètres requis

Pour configurer un serveur résistant, indiquez des valeurs pour les paramètres de configuration suivants lorsque vous créez le serveur.

  • Magasin de métadonnées. Vous devez utiliser SQL Server Compact pour stocker les métadonnées du serveur. Les métadonnées ne peuvent pas être stockées en mémoire.

  • Chemin d'accès du journal. Ce paramètre détermine l'emplacement de stockage des données du point de contrôle pour les requêtes résistantes. Par défaut, la valeur du chemin d'accès est le répertoire de travail du processus StreamInsight. Un paramètre connexe, CreateLogPathIfMissing, détermine si ce répertoire doit être créé lorsqu'il n'existe pas.

Configurer un serveur pour la résistance rend la capture de points de contrôle possible, mais n'entraîne pas la capture des points de contrôle. Pour plus d'information sur l'appel des points de contrôle, voir Étape 3 : Capture des points de contrôle.

Gestion du chemin d'accès au journal des points de contrôle

  • Pour éviter la lecture ou la falsification non autorisée des fichiers de points de contrôle, assurez-vous que les autorisations du dossier qui les contient sont définies de sorte que seules les entités approuvées y ont accès.

  • Chaque instance de StreamInsight doit posséder son propre chemin d'accès.

  • Vérifiez que le processus qui héberge StreamInsight bénéficie d'un accès en écriture au dossier spécifié.

  • Ne modifiez pas le contenu du dossier. StreamInsight supprime les fichiers de points de contrôle lorsqu'ils deviennent inutiles.

Serveurs extra-processus

En cas de serveur extra-processus auquel le client se connecte en appelant Server.Connect, la configuration de résistance est fournie par la personne qui configure le serveur. Si le serveur extra-processus possède une configuration de résistance, le client peut alors l'utiliser en tant que serveur configuré. Si le serveur ne possède pas de configuration de résistance, le client ne peut pas utiliser les fonctions de résistance.

Méthodes de spécification des options de résistance

Pour définir les paramètres de résistance, procédez selon l'une des méthodes suivantes :

  • Définissez les paramètres par programme en indiquant la configuration de résistance lorsque vous appelez Server.Create.

  • Définissez les paramètres de façon déclarative dans le fichier de configuration de l'application.

    • Pour un serveur intra-processus, il s'agit du fichier app.config.

    • Pour un serveur extra-processus, il s'agit du fichier StreamInsightHost.exe.config qui se trouve dans le dossier Host sous le dossier d'installation de StreamInsight.

Si vous utilisez les deux méthodes, les paramètres que vous indiquez dans l'appel d'API remplacent ceux du fichier de configuration.

Création d'un serveur résistant par programme

L'exemple suivant indique comment créer un serveur intra-processus résistant par programme. Pour des exemples plus détaillés, voir Exemples. Essayez d'intercepter les exceptions qui entraînent la réalisation des points de contrôle lorsque vous appelez Server.Create.

SqlCeMetadataProviderConfiguration metadataConfig = new SqlCeMetadataProviderConfiguration();
metadataConfig.CreateDataSourceIfMissing = true;
metadataConfig.DataSource = "C:\\CepMetadata.sdf";

CheckpointConfiguration recoveryConfig = new CheckpointConfiguration();
recoveryConfig.CreateLogPathIfMissing = true;
recoveryConfig.LogPath = "C:\\CepLogPath";

using (EmbeddedServer server = 
    Server.Create("Default", metadataConfig, recoveryConfig))

Création d'un serveur résistant de façon déclarative

L'exemple suivant indique comment créer un serveur résistant de façon déclarative en utilisant un fichier de configuration.

<?xml version="1.0" encoding="utf-8"?>
<configuration>
…
    <appSettings>
            <add key="InstanceName" value="Default"/>
            <add key="CreateSqlCeMetadataFileIfMissing" value="true"/>
            <add key="SQLCEMetadataFile" value="CepMetadata.sdf"/>
            <add key="CheckpointEnabled" value="true"/>
            <add key="CheckpointLogPath" value="CepLogPath"/>
            <add key="CreateCheckpointLogPathIfMissing" value="true"/>
    </appSettings>
    <runtime>
        <gcServer enabled="true"/>
    </runtime>
</configuration>

HAUT

Étape 2 : Définition d'une requête résistante

Pour créer une requête résistante, ajoutez les étapes suivantes dans votre code.

  1. Avant de créer une nouvelle requête, vérifiez si elle existe déjà dans les métadonnées. Si tel est le cas, cela signifie que l'application a récupéré suite à une erreur. Votre code doit relancer la requête plutôt que la recréer.

  2. Si la requête n'existe pas dans les métadonnées, créez-la, puis configurez-la en tant que requête résistante. Pour ce faire, indiquez true pour le paramètre IsResilient de la méthode ToQuery. Vous avez également la possibilité d'appeler la méthode Application.CreateQuery avec le paramètre IsResilient.

Configurer une requête pour la résistance rend la capture de points de contrôle possible, mais n'entraîne pas la capture des points de contrôle. Pour plus d'information sur l'appel des points de contrôle, voir Étape 3 : Capture des points de contrôle.

Exemple de définition d'une requête résistante

Pour des exemples plus détaillés, voir Exemples.

Query query = application.CreateQuery(
                     "TrafficSensorQuery",
                     "Minute average count, filtered by location threshold",
                     queryBinder,
                     true);

HAUT

Étape 3 : Capture des points de contrôle

Une fois la ou les requêtes exécutées, capturez régulièrement les points de contrôle afin d'enregistrer l'état des requêtes.

Les méthodes d'API qui prennent en charge la réalisation des points de contrôle suivent le schéma classique d'une opération asynchrone.

  1. Pour lancer un point de contrôle, appelez la méthode BeginCheckpoint. Si vous indiquez le mot clé facultatif AsyncCallback, il sera appelé une fois le point de contrôle terminé. IAsyncResult qui est renvoyé de l'appel vers la méthode BeginCheckpoint identifie la demande de point de contrôle et peut être utilisé ultérieurement dans les appels à la méthode EndCheckpoint ou CancelCheckpoint.

    /// <summary>
    /// Take an asynchronous checkpoint for the query.
    /// </summary>
    /// <param name="query">The query to checkpoint.</param>
    /// <param name="asyncCallback">An optional asynchronous callback, to be called when the checkpoint is complete.</param>
    /// <param name="asyncState">A user-provided object that distinguishes this particular asynchronous checkpoint request from other requests.</param>
    /// <returns></returns>
    IAsyncResult BeginCheckpoint(
         Query query, 
         AsyncCallback asyncCallback, 
         Object asyncState);
    
  2. La méthode EndCheckpoint est bloquée jusqu'à ce que le point de contrôle terminé. Si le point de contrôle est réalisé correctement, l'appel renvoie la valeur true. Si une erreur se produit, l'appel génère une exception.

    /// <summary>
    /// Waits for the pending asynchronous checkpoint request to complete.
    /// </summary>
    /// <param name="asyncResult">The reference to the pending asynchronous request to finish.</param>
    /// <returns>True if the checkpoint succeeded, false if it was canceled.</returns>
    bool EndCheckpoint(
         IAsyncResult asyncResult);
    
  3. Vous pouvez aussi appeler la méthode CancelCheckpoint pour annuler la réalisation du point de contrôle. Lorsque l'appel à la méthode CancelCheckpoint est correctement effectué, l'appel suivant à la méthode EndCheckpoint renvoie la valeur false.

    /// <summary>
    /// Cancels the pending asynchronous checkpoint request.
    /// </summary>
    /// <param name="asyncResult">The asyncResult handle identifying the call.</param>
    void CancelCheckpoint(
         IAsyncResult asyncResult);
    

Ce schéma asynchrone peut être utilisé de trois manières différentes :

  • L'appel à la méthode BeginCheckpoint peut être suivi d'un appel à la méthode EndCheckpoint. EndCheckpoint se bloque alors jusqu'à la fin de point de contrôle et renvoie ensuite le résultat (ou une exception). Dans ce schéma, asyncCallback et asyncState ne sont en principe pas utilisés.

  • Vous pouvez appeler la méthode BeginCheckpoint, puis interroger la propriété IsCompleted de la valeur IAsyncResult renvoyée. Lorsque la propriété IsCompleted a la valeur true, EndCheckpoint peut être appelée pour l'extraction du résultat. Dans ce schéma, asyncCallback et asyncState ne sont en principe pas utilisés.

  • BeginCheckpoint peut être appelée à l'aide d'une méthode de rappel. Dans ce cas, asyncState permet d'identifier l'appel et de renvoyer toutes les informations nécessaires à la méthode de rappel. Lorsque le rappel est exécuté, il appelle EndCheckpoint pour extraire le résultat.

La méthode EndCheckpoint doit être appelée, quel que soit le modèle utilisé et même si le point de contrôle est annulé. Cette méthode est la seule qui permet à l'utilisateur d'obtenir une valeur de l'appel et représente le seul moyen pour StreamInsight de savoir que l'appel est terminé. Vous ne pouvez pas lancer un autre point de contrôle tant que vous n'avez pas appelé EndCheckpoint.

Les erreurs qui surviennent au cours de la réalisation des points de contrôle n'arrêtent ni n'affectent les requêtes associées. Si vous arrêtez une requête alors qu'un point de contrôle est en cours, celui-ci est annulé.

HAUT

Étape 4 : Relecture d'événements dans l'adaptateur d'entrée

Pour prendre en charge la relecture d'événements dans le cadre de la récupération, la fabrique d'adaptateurs d'entrée doit implémenter l'interface IHighWaterMarkInputAdapterFactory ou IHighWaterMarkTypedInputAdapterFactory. L'appel à la méthode Create de la fabrique d'adaptateur transmet ensuite la limite supérieure à l'adaptateur afin que ce dernier puisse identifier les événements à relire.

Pour signifier à la sortie que la procédure est terminée, tous les adaptateurs d'entrée doivent relire dans le flux physique tous les événements qui ont lieu au niveau de la position indiquée par la limite supérieure ou après celle-ci.

HAUT

Étape 5 : Élimination des doublons dans l'adaptateur de sortie

Pour prendre en charge la suppression de doublons dans le cadre de la récupération, la fabrique d'adaptateurs de sortie doit implémenter l'interface IHighWaterMarkOutputAdapterFactory ou IHighWaterMarkTypedOutputAdapterFactory. L'appel à la méthode Create de la fabrique d'adaptateur transmet ensuite la limite supérieure et la valeur du décalage à l'adaptateur afin que ce dernier puisse identifier les doublons. Ce décalage est nécessaire car le point de contrôle peut se trouver n'importe où dans le flux de sortie.

Lors du premier démarrage de la requête, la méthode Create de la fabrique d'adaptateur est appelée sans limite supérieure ni décalage. Si le serveur n'a pas encore capturé de point de contrôle pour la requête, la méthode Create de la fabrique d'adaptateur est appelée avec une limite supérieure DateTime.MinValue et un décalage égal à 0 (zéro).

Si la requête est relue correctement, les événements produits après la capture du dernier point de contrôle mais avant la défaillance seront générés au redémarrage. Ce sont les doublons que l'adaptateur de sortie doit supprimer. L'adaptateur de sortie décide de la méthode de suppression : la copie d'origine peut être abandonnée ou il peut se contenter d'ignorer les doublons.

Pour signifier à la sortie qu'il existe des équivalents, tous les adaptateurs d'entrée doivent relire correctement dans le flux physique les événements d'entrée, et les adaptateurs de sortie doivent supprimer tous les doublons d'événements qui se sont produits avant la défaillance et qui ont lieu au niveau de la position indiquée par le décalage de limite supérieure ou après celui-ci.

HAUT

Étape 6 : Récupération après échec

Le serveur effectue automatiquement une récupération au démarrage et place toutes les requêtes dans un état cohérent. Il s'agit d'une opération asynchrone. Par conséquent, l'appel à la méthode Server.Create renvoie des résultat avant la fin de la récupération.

  • Les requêtes non résistantes sont placées dans l'état Arrêté. Ce comportement n'a pas changé.

  • Les requêtes résistantes sont placées dans l'état Initialisation. Le serveur charge ensuite les informations du point de contrôle enregistrées.

Vous pouvez appeler la méthode Start à ce stade pour redémarrer les requêtes. Les requêtes résistantes seront redémarrées dès la fin de l'initialisation.

Pour effectuer une récupération après défaillance, le code de démarrage doit réaliser les étapes suivantes :

  1. Extraire la liste des requêtes de l'application à partir des métadonnées.

  2. Pour chaque requête, vérifiez si elle existe déjà dans les métadonnées.

    1. Si elle existe, redémarrez-la.

    2. Si la requête n'existe pas dans les métadonnées, créez-la, puis configurez-la en tant que requête résistante comme indiqué à l'Étape 2 : Définition d'une requête résistante.

Si un problème survient au cours de la récupération, vous pouvez redémarrer le serveur sans résistance.

HAUT

Arrêt sans désactivation de la récupération

Vous pouvez arrêter le serveur sans désactiver la récupération. Pour ce faire, appelez la méthode Dispose de Server.

  • Les requêtes non résistantes sont arrêtées.

  • Les requêtes résistantes sont suspendues. Lorsque vous redémarrez le serveur, celui-ci tente de récupérer l'état des requêtes suspendues. Pour empêcher ce comportement, arrêtez les requêtes avant d'arrêter le serveur.

Les métadonnées des requêtes résistantes et non résistances sont conservées lorsque vous arrêtez le serveur de cette manière.

HAUT

Exemples

Pour visualiser des exemples de code de bout en bout complets d'une application résistante qui comprend des fonctions de relecture et de déduplication, consultez l'exemple de point de contrôle de la page d'exemples StreamInsight sur le site Codeplex.

HAUT

Définition d'une requête résistante à l'aide d'un modèle de développement explicite

namespace StreamInsight.Samples.TrafficJoinQuery
{
    using...

    internal class EmbeddedCepServer
    {
        internal static void Main()
        {
            // SQL CE was available as an optional metadata provider in v1.1
            // For the server to support recovery, this becomes mandatory
            // A log path is also a mandatory requirement.
            SqlCeMetadataProviderConfiguration metadataConfig = new
               SqlCeMetadataProviderConfiguration();
            metadataConfig.CreateDataSourceIfMissing = true;
            metadataConfig.DataSource = "C:\\CepMetadata.sdf";

            ServerRecoveryConfiguration recoveryConfig = new ServerRecoveryConfiguration();
            recoveryConfig.CreateLogPathIfMissing = true;
            recoveryConfig.LogPath = "C:\\CepLogPath";


            using (EmbeddedServer server = Server.Create(
                                            "Default", metadataConfig, recoveryConfig))
            {
                try
                {
                    Application application = server.CreateApplication("TrafficJoinSample");

                    QueryTemplate queryTemplate = CreateQueryTemplate(application);

                    InputAdapter csvInputAdapter =     
                                           application.CreateInputAdapter<TextFileReaderFactory>(
                                           "CSV Input", "Reading tuples from a CSV file");
                    OutputAdapter csvOutputAdapter =
                                          application.CreateOutputAdapter<TextFileWriterFactory>(
                                          "CSV Output", "Writing result events to a CSV file");

                    // bind query to event producers and consumers
                    QueryBinder queryBinder = BindQuery(
                                              csvInputAdapter, csvOutputAdapter, queryTemplate);

                    // Create bound query that can be run
                    Console.WriteLine("Registering bound query");
                    Query query = application.CreateQuery(
                                    "TrafficSensorQuery",
                                    "Minute average count, filtered by location threshold",
                                    queryBinder,
                                    true);   // v1.2 addition - Specify the query as resilient

                    // Start the query
                    // v1.2 has additional semantics during recovery

                    query.Start();

                    // submit a checkpoint request

                    // query.Stop();
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                    Console.ReadLine();
                }
            }

            Console.WriteLine("\npress enter to exit application");
            Console.ReadLine();
        }

Réalisation de points de contrôle : modèle de rappel Rendezvous

namespace StreamInsight.Samples.TrafficJoinQuery
{
    using...

    internal class EmbeddedCepServer
    {
        internal static void Main()
        {
                        // Same code through query start …
            {
                try
                {
                    // Start the query
                    query.Start();

                    // submit a checkpoint request
                    IAsyncResult result = server.BeginCheckpoint(query,
                        r => {
                            if (server.EndCheckpoint(r))
                            {
                                // the checkpoint succeeded
                            }
                            else
                            {
                                // the checkpoint was canceled
                            }
                        },
                        null);
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                    Console.ReadLine();
                }
            }

            Console.WriteLine("\npress enter to exit application");
            Console.ReadLine();
        }

Voir aussi

Concepts

Résistance de StreamInsight

Création d'applications StreamInsight résistantes

Contrôle des applications StreamInsight résistantes

Résolution des problèmes liés aux applications StreamInsight résistantes