Création d'adaptateurs d'entrée et de sortie

Cette rubrique fournit les informations générales dont vous avez besoin pour créer des adaptateurs d'entrée et de sortie pour votre application CEP à l'aide de la plateforme StreamInsight. Les adaptateurs sont des transformateurs de logiciel qui remettent des événements dans ou hors d'un serveur StreamInsight.

Fonctionnement et contrôle de flux d'événements

Lors de la création d'adaptateurs, il est important de comprendre le fonctionnement d'un flux d'événements sur le serveur StreamInsight et de savoir comment les adaptateurs d'entrée et de sortie contrôlent ce flux. Comme indiqué dans l'illustration suivante, le flux d'événements est unidirectionnel de la source, via la requête active, jusqu'au récepteur. Les événements sont lus à partir d'une source par l'adaptateur d'entrée, lequel les remet ensuite à la requête. Les événements d'entrée, ou les nouveaux événements résultant du traitement des événements d'entrée, sont acheminés d'un opérateur au suivant dans la requête. La requête remet les événements traités à l'adaptateur de sortie, qui remet les événements au récepteur. L'illustration représente un scénario dans lequel une requête StreamInsight est liée à deux instances d'adaptateur d'entrée a1 et a2, et à une instance d'adaptateur de sortie a4.

Flux d'événements de l'adaptateur d'entrée vers l'adaptateur de sortie

Si le flux d'événements est unidirectionnel de la source au récepteur, le contrôle de flux et d'exécution pour la récupération et le transfert d'événements à certains points d'interaction entre les composants peut être bidirectionnel. Ces points d'interaction s'affichent en tant que READ, ENQUEUE, DEQUEUE et WRITE dans l'illustration.

L'implémentation de l'adaptateur d'entrée doit effectuer l'opération READ à l'aide des mécanismes d'accès spécifiques au périphérique source (tel qu'un fichier ou une base de données) et effectuer l'opération ENQUEUE à l'aide des API de l'adaptateur. De la même façon, l'implémentation de l'adaptateur de sortie doit effectuer l'opération WRITE à l'aide des mécanismes d'accès spécifiques au périphérique récepteur et effectue l'opération DEQUEUE à l'aide des API de l'adaptateur. Vous devez implémenter les opérations DEQUEUE et ENQUEUE selon un modèle de conception spécifié par un diagramme de transition d'états d'adaptateur, décrit plus loin dans cette rubrique.

En ce qui concerne le contrôle de flux d'événements, vous pouvez imaginer que les événements sont émis d'un fournisseur à un consommateur (flèches pleines de gauche à droite) ou bien sont extraits par le consommateur depuis un fournisseur (flèches à crochet). Aux points d'interaction READ et WRITE, l'implémentation de l'adaptateur peut adopter les transmission de type « push » (émission) ou « pull » (extraction) pour le contrôle de flux d'événements. Certains facteurs doivent être considérés pour cette interaction, notamment les taux d'événements que la source ou le récepteur peut prendre en charge, la capacité de votre adaptateur d'accélérer la source ou le récepteur, et toutes les capacités de mise en mémoire tampon que vous pouvez implémenter.

Pour les périphériques source qui pompent des événements à une latence très basse et sont difficiles à accélérer, l'approche typique consiste à implémenter un adaptateur où le périphérique source émet des événements dans l'adaptateur. Les capteurs (événements machine), les téléscripteurs et les ports réseau sont quelques exemples de tels périphériques. Pour les périphériques avec des latences plus hautes (fichiers, bases de données), considérez une implémentation où l'adaptateur extrait les données de la source. De la même façon, côté sortie, un adaptateur de sortie pour un périphérique qui peut accepter des événements à débit très élevé peut être implémenté pour émettre des événements dans le périphérique. Pour les périphériques de sortie plus lents vous pouvez adopter une approche où le périphérique appelle l'adaptateur chaque fois qu'il est prêt à consommer des événements.

Au point d'interaction ENQUEUE, le serveur StreamInsight prend en charge un modèle d'émission. Cela signifie que le modèle de conception de l'adaptateur vous autorise à mettre en file d'attente autant d'événements que le moteur peut consommer, à tout moment. Au point d'interaction DEQUEUE, le serveur StreamInsight prend en charge un modèle d'extraction. Cela signifie que le modèle de conception de l'adaptateur vous permet d'extraire des événements du serveur aussi rapidement que le permet le moteur.

Comme vous pouvez le constater, la stratégie d'accélération du serveur StreamInsight est très simple. Supposons une requête directe simple sans opérations de blocage. Le taux auquel le serveur StreamInsight peut consommer des événements provenant d'un adaptateur d'entrée au point d'interaction ENQUEUE est limité uniquement par le taux auquel l'adaptateur de sortie peut consommer des événements provenant du serveur au point d'interaction DEQUEUE. La vitesse à laquelle le serveur StreamInsight émet sur l'adaptateur d'entrée pendant ENQUEUE est déterminée par la rapidité de la requête à libérer la sortie, et par la rapidité de l'adaptateur de sortie à consommer cette sortie. StreamInsight fournit un ensemble complet de vues de diagnostic qui vous aident à mesurer les taux d'événements au niveau de chacun de ces points d'interaction. Pour plus d'informations, consultez Contrôle du serveur et des requêtes StreamInsight.

Tâches de développement d'adaptateur

Utilisez la liste de contrôle suivante pour développer l'adaptateur.

  • Déterminez le type d'adaptateur (entrée ou sortie) requis.

    Un adaptateur d'entrée lit les événements entrants dans le format dans lequel ils sont fournis, puis convertit ces données à un format que le serveur StreamInsight peut consommer.

    Un adaptateur de sortie reçoit les événements traités par le serveur StreamInsight, convertit ces événements à un format attendu par le périphérique de sortie, puis émet les données vers ce périphérique.

  • Déterminez le type d'événement.

    Pour un adaptateur d'entrée, définissez le type d'événement qui décrit la charge utile des événements fournis par la source. Pour un adaptateur de sortie, spécifiez le type d'événement qui décrit la charge utile des événements consommés par le récepteur. Pour plus d'informations sur les charges utiles d'événements, consultez Concepts du serveur StreamInsight.

    Vous spécifiez et générez un adaptateur typé pour une source ou un récepteur qui produit ou consomme toujours les événements d'un format de charge utile fixe, dans lequel le nombre et le type de champs sont connus d'avance. L'avantage principal de l'adaptateur typé est que l'implémentation de la création d'événements pour la mise en file d'attente dans le serveur StreamInsight est relativement facile. Étant donné que les types de champ sont déjà connus, vous pouvez utiliser IntelliSense dans Visual Studio (ou une fonctionnalité équivalente dans un autre environnement de développement intégré) pour remplir les champs.

    Vous spécifiez et générez un adaptateur non typé si la source ou le récepteur produit ou consomme des formats de charge utile différents. L'avantage principal d'un adaptateur non typé est la flexibilité qu'il autorise pour spécifier le type d'événement au moment de la liaison de la requête, ce qui évite de lier l'implémentation de l'adaptateur à un type d'événement spécifique. Comparée à celle de l'adaptateur typé, l'implémentation de l'adaptateur non typé est plus impliquée. L'adaptateur d'entrée non typé doit être écrit de façon à ce qu'il soit en mesure de déterminer le type de chaque champ à partir des paramètres de configuration fournis lors de la liaison de la requête, de remplir les champs un par un, puis de mettre en file d'attente l'événement. De la même façon, l'adaptateur de sortie non typé doit être en mesure de récupérer le résultat du traitement des requêtes d'un événement retiré de la file d'attente, en fonction des informations de configuration fournies à la sortie.

    Il est important de noter qu'une instance d'adaptateur (typé ou non typé) liée à la requête émet toujours des événements qui contiennent des charges utiles d'un type spécifique. Pour plus d'informations, consultez Création de types d'événements.

  • Déterminez le modèle de l'événement.

    Déterminez le modèle de l'événement pour les événements de sortie et entrée. StreamInsight prend en charge trois modèles d'événement : point, intervalle et session. Si la source fournit des événements ayant un modèle d'événement fixe, vous pouvez concevoir un adaptateur d'entrée pour ce modèle d'événement seul. De la même façon, si le récepteur requiert des événements d'un modèle particulier, vous pouvez concevoir un adaptateur de sortie pour ce modèle d'événement seul. Toutefois, dans la plupart des applications, tous les modèles d'événement sont nécessaires à un type d'événement particulier. Nous vous recommandons de générer un adaptateur typé ou non typé pour chacun des modèles d'événement. Pour plus d'informations sur les modèles d'événement, voir Concepts du serveur StreamInsight.

    Les classes AdapterFactory d'entrée et de sortie vous permettent d'assembler ces adaptateurs. L'adaptateur correct peut être instancié au moment de la liaison de la requête selon paramètres de configuration.

  • Choisissez la classe de base d'adaptateur correspondante.

    Selon le type et le modèle d'événement, sélectionnez la classe de base d'adaptateur appropriée. La nomenclature de la classe suit le modèle [Typé] [Point | Intervalle | Session] [Entrée | Sortie]. Les adaptateurs non typés n'ont pas le préfixe typé.

    Type d'adaptateur

    Classe de base d'adaptateur d'entrée

    Classe de base d'adaptateur de sortie

    Point typé

    TypedPointInputAdapter

    TypedPointOutputAdapter

    Point non typé

    PointInputAdapter

    PointOutputAdapter

    Intervalle typé

    TypedIntervalInputAdapter

    TypedIntervalOutputAdapter

    Intervalle non typé

    IntervalInputAdapter

    IntervalOutputAdapter

    Session typée

    TypedEdgeInputAdapter

    TypedEdgeOutputAdapter

    Session non typée

    EdgeInputAdapter

    EdgeOutputAdapter

    Pour plus d'informations, voir Microsoft.ComplexEventProcessing.Adapters.

  • Concevez les classes AdapterFactory d'entrée et de sortie.

    Une classe AdapterFactory est une classe de conteneur pour les adaptateurs. Vous devez implémenter une classe de fabrique. Les classes de fabrique de base sont organisées comme indiqué ci-dessous.

    Type d'adaptateur

    Classe de base d'adaptateur d'entrée

    Classe de base d'adaptateur de sortie

    Typé

    ITypedInputAdapterFactory

    ITypedOutputAdapterFactory

    Non typé

    IInputAdapterFactory

    IOutputAdapterFactory

    Typé avec prise en charge de la résistance

    IHighWaterMarkTypedInputAdapterFactory

    IHighWaterMarkTypedOutputAdapterFactory

    Non typé avec prise en charge de la résistance

    IHighWaterMarkInputAdapterFactory

    IHighWaterMarkOutputAdapterFactory

    La classe de fabrique a les fonctions suivantes :

    • Elle permet le partage de ressources entre différentes implémentations d'adaptateur pour une classe de dispositifs donnée (fichier CSV, base de données SQL Server, serveur web Common Log Format) ou une condition d'application donnée, et facilite l'envoi des paramètres de configuration au constructeur de l'adaptateur. Par exemple, une application peut requérir les trois modèles d'événement (point, intervalle et session) à la fois. Une fabrique unique peut prendre en charge trois implémentations d'adaptateur, une pour chaque modèle d'événement. L'application peut aussi avoir la même source d'événement, par exemple une table de base de données, mais cette source génère plusieurs structures de charge utile d'événement de la même source, en fonction des requêtes exécutées. Dans ce cas, une même fabrique peut prendre en charge diverses implémentations d'adaptateur pour gérer chaque structure de charge utile.

    • Elle fournit une passerelle entre l'adaptateur et l'exécution du serveur. Le développeur d'adaptateurs doit implémenter les méthodes Create() et Dispose() dans la fabrique d'adaptateurs pour la classe d'adaptateur. Le serveur appelle ces méthodes lors du démarrage ou de l'arrêt d'une requête.

    • Elle fournit une passerelle entre l'adaptateur et les informations de configuration avant l'exécution. Cela est particulièrement important pour les adaptateurs non typés, qui doivent déterminer le type de chaque champ de la structure à partir des paramètres de configuration fournis lors de la liaison de la requête. Vous pouvez définir la structure de configuration dans la classe de fabrique et passer cette structure de configuration via la méthode Create() à la méthode du constructeur de votre classe d'adaptateur. Cette structure de configuration est sérialisée à l'aide de DataContractSerialization. Cette contrainte mise à part, la méthodologie de développement vous donne une flexibilité totale pour la définition et l'utilisation de cette structure de configuration, en effet vous déterminez librement la façon dont la configuration est remplie et utilisée dans le constructeur d'adaptateur.

    • Elle offre un moyen de générer des CTI, sans les mettre explicitement en file d'attente via l'adaptateur d'entrée. En implémentant l'interface ITypedDeclareAdvanceTimePolicy (pour une fabrique d'adaptateurs typée) et IDeclareAdvanceTimePolicy (pour une fabrique d'adaptateurs non typée) dans la classe de fabrique d'adaptateurs, l'utilisateur peut spécifier la fréquence et les horodateurs CTI. Cela simplifie le code de l'adaptateur et peut affecter chaque flux d'événements généré par la fabrique via ses instances d'adaptateur. Pour plus d'informations, voir [AdvanceTimeSettingsClass].

    • Dans les applications résistantes, elle prend en charge la résistance en indiquant à l'adaptateur d'entrée la limite supérieure de relecture des événements manqués, et en signalant à l'adaptateur de sortie la limite supérieure et le décalage en ce qui concerne la suppression des doublons d'événements. Pour plus d'informations, voir Résistance de StreamInsight.

  • Générez et testez l'adaptateur.

    Compilez et générez l'adaptateur comme un assembly .NET. Testez l'adaptateur en effectuant des opérations de base à l'aide d'une requête relais simple qui lit les événements d'un adaptateur d'entrée et les dirige vers l'adaptateur de sortie, sans traitement de requête complexe. Cela permet de vérifier que l'adaptateur lit et écrit à partir/vers des périphériques et qu'il est capable de mettre en file d'attente ou d'enlever de la file d'attente des événements.

Machine à états de l'adaptateur

La machine à états qui définit l'interaction entre un adaptateur et le serveur StreamInsight est la même pour les adaptateurs d'entrée et de sortie. C'est important parce que la machine à états vous fournit un modèle de développement cohérent. La machine à états est affichée dans l'illustration suivante.

Diagramme d'état de mise en file d'attente et de suppression de la file d'attente par l'adaptateur

Les fonctionnalités principales et les conditions de fonctionnement de la machine à états sont décrits ci-après :

  • Start() et Resume() sont les méthodes appelées par le serveur StreamInsight et doivent être implémentées comme le développeur d'adaptateurs. De plus, vous devez également implémenter la méthode de constructeur pour votre classe d'adaptateur et la méthode Dispose(), héritée de la classe de base.

  • Ensuite, l'implémentation doit appeler les méthodes suivantes fournies dans le kit de développement logiciel (SDK) de l'adaptateur :

    • Enqueue() pour l'adaptateur d'entrée. Cette méthode retourne la valeur EnqueueOperationResult.Success ou EnqueueOperationResult.Full.

    • Dequeue() pour l'adaptateur de sortie. Cette méthode retourne la valeur DequeueOperationResult.Success ou DequeueOperationResult.Empty.

    • Ready(). Cette méthode retourne une valeur booléenne TRUE ou FALSE.

    • Stopped(). Cette méthode retourne une valeur booléenne TRUE ou FALSE.

  • Le serveur StreamInsight appelle la méthode interne de façon asynchrone (dénotée comme StopQuery()) pour le compte de l'utilisateur lorsqu'un administrateur ou un développeur de requête arrête l'exécution de la requête via des méthodes dans l'API du serveur.

  • Les appels de Enqueue() et Dequeue() retournent respectivement l'état Full et Empty lorsque l'adaptateur est dans l'un des états suivants :

    • Suspendu

    • Arrêt en cours

  • Les appels de Enqueue() ou Dequeue() provoquent une exception à lever lorsque l'adaptateur est dans l'un des états suivants :

    • Créé

    • Arrêté

  • Les appels de Ready() provoquent une exception à lever lorsque l'adaptateur est dans l'un des états suivants :

    • Créé

    • En cours d'exécution

    • Arrêté

  • Un adaptateur passe par une partie ou l'ensemble des cinq états (Créé, En cours d'exécution, Suspendu, Arrêt en cours et Arrêté) pendant son fonctionnement. Une transition d'état se produit avant que le serveur StreamInsight n'appelle Start() ou Resume() et après que l'adaptateur a appelé Enqueue(), Dequeue(), Ready() et Stopped().

  • Le serveur StreamInsight et l'adaptateur ne partagent jamais le même thread. Le serveur appelle toujours Start() ou Resume() dans un thread de travail distinct. Le serveur obtient ce thread d'un pool de threads de système d'exploitation pour le compte de l'adaptateur. Pour cela, les méthodes Start() et Resume() doivent pouvoir utiliser avec flexibilité le thread de travail autant que nécessaire (par exemple, pour engendrer plus de threads pour les lectures ou les écritures asynchrones). Face à cette situation, vous devez être attentif et appliquer les meilleures pratiques dans votre utilisation des ressources système de ce thread.

  • L'API élimine le besoin de synchronisation inhérente entre les opérations (threads) Start() et Resume(). Le serveur appelle toujours Resume() après que (et uniquement après que) Ready() a été appelée par l'adaptateur. Toutefois, notez que la synchronisation peut être requise pour les tâches de lecture, d'écriture ou de mise en mémoire tampon d'événements de périphérique, surtout dans des scénarios d'E/S asynchrones. Nous vous conseillons d'utiliser des E/S non bloquantes.

  • Si l'adaptateur peut être inactif, il doit vérifier périodiquement l'état pour déterminer si son arrêt a été demandé.

Cycle de vie de l'interaction entre l'adaptateur et le serveur

La négociation entre le serveur StreamInsight et l'adaptateur est toujours synchrone. Par conséquent, à tout point de son exécution, l'adaptateur peut vérifier son état et réagir en conséquence. Le cycle de vie de l'interaction entre l'adaptateur et le serveur StreamInsight comprend les opérations suivantes, qui correspondent à la machine à états affichée dans l'illustration précédente.

  • Créé

    Une instance d'adaptateur commence à interagir avec le serveur StreamInsight lorsque la requête est démarrée (en passant un appel correspondant dans l'API du serveur StreamInsight).

  • Exécution en cours

    Le serveur passe l'adaptateur à l'état Exécution en cours, appelle de façon asynchrone Start() sur l'adaptateur et autorise cet appel une seule fois. Lorsque l'adaptateur est en cours d'exécution, il peut mettre en file d'attente ou retirer de la file d'attente des événements dans ou depuis le serveur.

    Idéalement, l'adaptateur reste en cours d'exécution la plupart du temps. Le modèle de conception recommandé consiste à appeler le lecteur ou la routine de l'enregistreur, de préférence dans un thread séparé, à partir de la méthode Start(), et à retourner la routine Start(), en désaffectant ainsi rapidement le thread de travail.

    La routine du lecteur (prenons ProduceEvents () comme exemple) lit des événements dans la source et appelle Enqueue() pour émettre des événements dans le serveur. Dans le cas d'un adaptateur de sortie, une routine d'enregistreur (prenons ConsumeEvents () comme exemple) appelle Dequeue() pour extraire des événements du serveur et les écrire dans le récepteur.

  • Suspendu

    Lorsque le serveur ne peut pas recevoir un événement mis en file d'attente, ou retirer un événement de la file d'attente, l'adaptateur d'entrée ou de sortie passe à l'état suspendu. Les appels de Enqueue() et Dequeue() retournent par conséquent à l'état FULL et EMPTY respectivement. À l'état suspendu, vous pouvez implémenter des opérations de service telles qu'enregistrer la position du dernier enregistrement de lecture depuis la base de données ou de la ligne depuis le fichier. À la fin de cette section facultative, vous devez appeler la méthode Ready() pour indiquer au serveur que l'adaptateur est prêt pour la reprise. Si la routine s'exécute sur le même thread de travail que Start() lui-même, vous devez retourner la routine Start() elle-même.

  • En réponse à un appel Ready(), le serveur retourne l'adaptateur à l'état d'exécution et appelle toujours Resume() de façon asynchrone sur un thread de travail différent. Vous pouvez concevoir Resume() pour qu'il mette ou retire de la file d'attente la dernière itération non réussie, puis vous pouvez appeler ProduceEvents() ou ConsumeEvents(). Ce modèle peut être répété tant que l'adaptateur passe à l'état Arrêté ou Arrêt en cours.

  • Arrêt en cours

    À tout point de l'état En cours d'exécution ou Suspendu, le serveur peut placer l'adaptateur à l'état Arrêt en cours en réponse à une demande asynchrone d'arrêter la requête. Dans cet état, l'appel de Enqueue() ou Dequeue() retourne aussi l'état FULL ou EMPTY, respectivement.

    L'état Arrêt en cours fournit à l'implémentation de l'adaptateur une zone de transit pour préparer correctement l'arrêt. Vous pouvez implémenter l'adaptateur pour désaffecter toutes les ressources qu'il a obtenues (threads, mémoire) puis, vous pouvez appeler la méthode Stopped(). Tant que cette méthode n'est pas appelée, le serveur n'arrête pas l'adaptateur.

    Notez que l'adaptateur peut être passé de façon asynchrone à l'état Arrêt en cours. L'adaptateur doit pouvoir détecter qu'il est dans l'état Arrêt en cours. Comme expliqué plus haut, le modèle de conception de l'adaptateur lui indique d'appeler Ready() lorsqu'il est suspendu. En réponse, le serveur appelle la méthode Resume() une fois de plus, permettant de cette façon la détection de l'état Arrêt en cours dans la méthode Resume(). Comme meilleure pratique, nous vous recommandons de placer le contrôle de l'état Arrêt en cours comme premier bloc de code dans votre implémentation Start() et Resume().

  • Arrêté

    Le code d'adaptateur peut appeler Stopped() à tout moment. Cela met l'adaptateur dans l'état Arrêté. Nous vous conseillons de nettoyer les ressources que l'adaptateur a obtenues avant d'appeler Stopped().

    Important

    En cas d'échec lors de l'appel de la méthode Stopped(), la dernière page de mémoire associée à la requête reste allouée. Cela peut provoquer de petites fuites de mémoire qui peuvent s'accumuler au fil du temps s'il y a de nombreux cycles de démarrage et d'arrêt de requête dans un processus.

    Dans l'état Arrêté, l'adaptateur ne peut pas faire référence à des constructions spécifiques à un serveur StreamInsight ou à une mémoire d'événements, ni effectuer des opérations de mise en file d'attente ou de suppression de la file d'attente. De telles actions lèvent une exception. Toutefois, les activités de nettoyage de périphérique et du système d'exploitation peuvent continuer.

Exemples

Pour obtenir des exemples d'adaptateurs de sortie et d'entrée et de fabriques d'adaptateurs, consultez la page StreamInsight Samples (Exemples StreamInsight).

Voir aussi

Concepts

Concepts du serveur StreamInsight

Architecture du serveur StreamInsight