Share via


Opérateurs de flux de données définis par l'utilisateur

Un opérateur de flux de données défini par l'utilisateur vous permet de personnaliser le traitement des flux d'événements.

Mode d'utilisation

Dans une requête, utilisez la méthode d'extension Scan de CepStream pour appeler un opérateur de flux de données défini par l'utilisateur. Définissez le flux d'entrée et l'état initial de l'opérateur, comme dans l'exemple ci-dessous.

var query = input.Scan(new SmoothingOperator(0.7));

L'auteur de l'opérateur dérive une nouvelle classe des classes abstraites CepPointStreamOperator ou CepEdgeStreamOperator. Le nouveau type comprend la machine à états de l'opérateur. L'opérateur d'analyse appelle le constructeur de ce type de classe afin d'établir l'état initial de l'opérateur.

Caractéristiques d'un opérateur de flux de données défini par l'utilisateur

Un opérateur de flux de données défini par l'utilisateur vous permet d'interagir avec le traitement des flux d'événements à l'aide de procédures. Ainsi, il représente une limite entre le moteur de traitement d'événements StreamInsight et le CLR, tout comme les adaptateurs et les agrégats ou opérateurs définis par l'utilisateur. Dans ces cas de figure, le moteur et le développeur remplissent un contrat au sujet des propriétés temporelles d'un flux de données. Le moteur StreamInsight garantit à l'opérateur de flux de données défini par l'utilisateur les éléments suivants, afin de se comporter de manière déterministe pour une séquence spécifique d'événements d'entrée de requêtes :

  • Un opérateur de flux de données défini par l'utilisateur reçoit les événements classés selon l'heure de synchronisation (heure de début pour les événements point et session de début, heure de fin pour les événements session de fin). Les événements intervalle ne sont pas pris en charge car ils ne classent pas directement les événements en fonction de l'heure de synchronisation, chaque événement indiquant deux points dans le temps : un début et une fin.

  • Seuls les événements insertion sont transmis à un opérateur de flux de données défini par l'utilisateur. Bien que les événements CTI (incrément de temps réel) du flux de données entrant soient transparents pour l'opérateur de flux de données défini par l'utilisateur, ce sont eux qui définissent comment ce dernier perçoit le passage du temps (voir NextCti ci-dessous).

  • StreamInsight peut désactiver un opérateur de flux de données si ce dernier l'autorise (voir IsEmpty ci-dessous). Un opérateur de flux de données défini par l'utilisateur désactivé peut également être recyclé par StreamInsight.

  • Chaque événement insertion entraîne l'appel de ProcessEvent, suivi de l'interrogation des propriétés de NextCti et de IsEmpty.

Entrée et sortie d'un opérateur de flux de données défini par l'utilisateur

Un opérateur de flux de données défini par l'utilisateur traite un seul événement insertion à la fois. Il peut produire 0-* événements de sortie pour chaque événement d'entrée. L'opérateur peut également mettre à jour son état interne suite à une entrée. Un événement d'entrée consiste soit en un CTI généré par une requête de l'opérateur pour indiquer le passage du temps, soit en une insertion. Les entrées sont horodatées.

En revanche, un événement de sortie consiste uniquement en une charge utile d'événement. Les événements de sortie ne peuvent pas être horodatés et les CTI ne peuvent pas être injectés dans le flux de sortie. Les événements de sortie sont générés comme des événements point dont les horodateurs se basent sur ceux des événements d'entrée correspondants.

Gestion du temps dans un opérateur de flux de données défini par l'utilisateur

Lorsque vous créez un nouvel opérateur de flux de données défini par l'utilisateur, votre code ne sert qu'à traiter la charge utile des événements. Le temps est entièrement pris en charge par StreamInsight. Les événements d'entrée sont reçus dans l'ordre. Les horodateurs de chaque événement de sortie se basent sur celui de l'événement d'entrée correspondant. Par exemple, si un événement fin de session déclenche un événement de sortie, ce dernier reçoit l'horodateur de l'événement fin de session. Par conséquent, l'opérateur peut être influencé par le temps, mais n'a aucun moyen de le contrôler.

L'opérateur de flux de données défini par l'utilisateur ne reçoit pas directement les CTI depuis le flux d'entrée dans sa méthode ProcessEvent(), mais peut réagir au passage du temps via la propriété NextCti. Cette propriété est interrogée par le moteur après chaque appel de ProcessEvent(). L'opérateur de flux de données défini par l'utilisateur peut retourner un horodateur indiquant le prochain horodateur CTI qu'il s'apprête à recevoir dans ProcessEvent() sous forme d'appel.

Seuls les CTI demandés lors de la configuration de la propriété NextCti seront transmis à ProcessEvent. Ces CTI se propagent uniquement dans l'opérateur de flux de données défini par l'utilisateur.

Implémentation d'un opérateur de flux de données défini par l'utilisateur

Pour créer un nouvel opérateur de flux de données défini par l'utilisateur, dérivez une nouvelle classe des classes abstraites de base CepPointStreamOperator ou CepEdgeStreamOperator.

  • L'opérateur considère les événements d'entrée comme des événements point lorsque la dérivation est effectuée depuis une classe abstraite de base CepPointStreamOperator. Toutefois, il ne s'agit pas d'une erreur si les événements ne sont pas en réalité des événements point. L'opérateur détecte uniquement leur heure de début.

  • L'opérateur détecte à la fois les sessions de début et de fin des événements d'entrée lorsque la dérivation est effectuée depuis une classe abstraite de base CepEdgeStreamOperator.

Les propriétés et les méthodes suivantes sont remplacées dans la classe dérivée :

  • Méthode ProcessEvent. Génère une sortie et met à jour l'état interne de l'opérateur en réponse à chaque événement d'entrée. ProcessEvent reçoit un événement d'entrée et peut retourner zéro charge utile de sortie ou plus.

  • Propriété IsEmpty. Indique si l'état interne de l'opérateur est vide. Si l'état est défini sur True, le moteur de requête StreamInsight peut ignorer l'instance de l'opérateur pour réduire l'utilisation de la mémoire.

  • Méthode NextCti (facultative). Indique le prochain point dans le temps auquel un événement CTI sera envoyé à l'opérateur. Le remplacement de cette propriété permet à l'opérateur défini par l'utilisateur de produire une sortie à un point spécifique dans le futur ou d'indiquer que son état interne est vide à l'issue d'un intervalle de temps d'application donné.

La classe dérivée doit également mettre en œuvre la sérialisation WCF. Pour plus d'informations sur la façon de créer un contrat de données de base pour une classe ou une structure, cliquez icihttps://go.microsoft.com/fwlink/?LinkId=215760.

Interaction entre le moteur StreamInsight et l'opérateur

La méthode ProcessEvent est appelée en même temps que les événements dans l'ordre de synchronisation à chaque instance de l'opérateur. L'heure de synchronisation des événements point ou des CTI correspond à l'heure de début valide. L'heure de synchronisation des événements session correspond à l'heure de début valide pour les sessions de début et à l'heure de fin valide pour les sessions de fin.

Les propriétés IsEmpty et NextCti sont interrogées après chaque appel de la méthode ProcessEvent.

Lorsque l'opérateur remplace NextCti, le moteur s'assure que le prochain événement traité par l'opérateur est un événement insertion dont l'heure de synchronisation est inférieure à la valeur de NextCti ou un CTI dont l'heure de début correspond à la valeur de NextCti. Les valeurs NextCti inférieures ou égales à l'heure de synchronisation du dernier événement traité retournées par l'opérateur sont ignorées. La propriété NextCti permet à l'opérateur de « convertir » la progression du temps dans le flux d'entrée en CTI internes pour qu'elle corresponde à son propre rythme, et d'y réagir en conséquence.

Seuls les événements insertion peuvent activer les opérateurs. Les CTI ne peuvent pas être à l'origine d'une activation. La désactivation d'un opérateur intervient lorsque la méthode IsEmpty retourne la valeur True.

Le moteur peut à tout moment choisir de sérialiser l'opérateur et lui communiquer ses références. Lorsqu'ensuite l'opérateur est désérialisé, celui-ci devrait reprendre là où il s'est arrêté.

Exemples d'opérateurs de flux de données définis par l'utilisateur

Lissage exponentiel

Cet opérateur de flux de données défini par l'opérateur traite un flux d'événements point comme une séquence de valeurs et applique un lissage exponentiel. Notez qu'une référence à System.Runtime.Serialization est nécessaire.

namespace UdsoExamples
{
    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Extensibility;
 
/// <summary>
/// Implements exponential smoothing.
/// </summary>
[DataContract]
public sealed class SmoothingOperator : CepPointStreamOperator<double, double>
{
    [DataMember]
    readonly double _smoothingFactor;

    [DataMember]
    double? _previousValue;

    public SmoothingOperator(double smoothingFactor)
    {
        _smoothingFactor = smoothingFactor;
    }

    public override IEnumerable<double> ProcessEvent(PointEvent<double> inputEvent)
    {
        // The result is a function of the previous result and the current input value.
        _previousValue = _previousValue.HasValue
            ? (1.0 - _smoothingFactor) * _previousValue.Value + _smoothingFactor * inputEvent.Payload
            : inputEvent.Payload;

        yield return _previousValue.Value;
    }

    public override bool IsEmpty
    {
        get { return false; }
    }

Correspondance de modèle

IsEmpty et NextCti peuvent également être utilisés autrement, comme l'illustre l'exemple de correspondance de modèle suivant. Dans cet exemple, l'opérateur recherche un événement dont la valeur est de 1.0 et qui ne soit pas suivi par un événement de 2.0 dans les trente secondes qui suivent. (Cet exemple a pour but d'illustrer des concepts utiles pour les opérateurs de flux de données définis par l'utilisateur. En pratique, ce modèle est assez simple pour être mis en œuvre à l'aide des opérateurs intégrés dans StreamInsight.)

Dans l'exemple précédent, NextCti servait à contrôler la durée de vie d'un opérateur. NextCti est également utilisé à cette fin dans notre exemple. Ici, toutefois, NextCti génère également des sorties à mesure que le temps passe.

namespace UdsoExamples
{
    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Extensibility;
 
/// <summary>
/// Indicates when an event with value '1' is followed by an event with value '2'
/// within thirty seconds.
/// </summary>
[DataContract]
public sealed class SimplePatternMatcher : CepPointStreamOperator<int, DateTime>
{
    [DataMember]
    DateTimeOffset? _nextCti;

    [DataMember]
    // Tracks timestamps for all events with value '1'.
    readonly Queue<DateTimeOffset> _active = new Queue<DateTimeOffset>();

    public override bool IsEmpty
    {
        // The operator is empty when we are not tracking any events with value '1'.
        get { return _active.Count == 0; }
    }

    public override DateTimeOffset? NextCti
    {
        get { return _nextCti; }
    }

    public override IEnumerable<DateTime> ProcessEvent(PointEvent<int> inputEvent)
    {
        // Produce output in response to the passage of time. Any active '1' event
        // not matched by a '2' event within thirty seconds matches the pattern.
        while (_active.Count > 0 && _active.Peek().AddSeconds(30) <= inputEvent.StartTime)
        {
            yield return _active.Dequeue().UtcDateTime;
        }

        // Update operator state based on new input event.
        if (inputEvent.EventKind == EventKind.Insert)
        {
            if (inputEvent.Payload == 1)
                _active.Enqueue(inputEvent.StartTime);
            else if (inputEvent.Payload == 2)
                _active.Clear();

        }

        // Schedule wake-up after thirty seconds so that we can produce output
        // if needed.
        if (_active.Count > 0)
        {
            _nextCti = _active.Peek().AddSeconds(30);
        }
    }
}
}

Définition d'une méthode d'assistance facilitant l'utilisation

Une requête peut être utilisée afin de simplifier l'utilisation de l'opérateur. Par exemple, l'auteur de la requête trouvera plus simple d'écrire input.Smooth(0.5) plutôt que input.Scan(new SmoothingOperator(0.5)).

Vous pouvez activer ce modèle simplifié en créant une méthode d'extension personnalisée similaire à celle-ci :

        static CepStream<EventType1> Smooth(this CepStream<EventType1> source, double smoothingFactor)
        {
            if (null == smoothingFactor)
            {
                throw new ArgumentNullException("source");
            }

            return source.Scan(new SmoothingOperator(smoothingFactor));
        }