Operadores de flujo definidos por el usuario

Un operador de flujo definido por el usuario permite definir el procesamiento personalizado de los flujos de eventos.

Patrón de uso

En una consulta, se llama a un operador de flujo definido por el usuario mediante el método de extensión Scan de CepStream. Se proporciona el flujo de entrada y el estado inicial del operador, como en el siguiente ejemplo.

var query = input.Scan(new SmoothingOperator(0.7));

El autor del operador deriva una nueva clase de los tipos CepPointStreamOperator o CepEdgeStreamOperator abstractos. El nuevo tipo encapsula la máquina de estados del operador. Una llamada al constructor de este tipo se pasa al operador de examen para establecer el estado inicial del operador.

Características de un operador de flujo definido por el usuario

Un operador de flujo definido por el usuario permite al usuario interactuar con el flujo de eventos de un modo procedimental. Como tal, representa un límite entre el motor de procesamiento de eventos de StreamInsight y CLR, similar a los adaptadores y los operadores o agregados definidos por el usuario. En todos estos casos, el motor y el desarrollador de software cumplen un contrato acerca de las propiedades temporales de un flujo. El motor de StreamInsight garantiza lo siguiente al operador de flujo definido por el usuario para que se comporte de forma determinista en una secuencia específicas de eventos de entrada de consulta:

  • Se garantiza que un operador de flujo definido por el usuario recibirá los eventos pedidos por su hora de sincronización (hora de inicio para los eventos de punto y perimetrales iniciales, y hora de finalización para los eventos perimetrales finales). Los eventos de intervalo no se admiten porque no tienen una representación sencilla ordenada por horas de sincronización, ya que cada evento indica dos puntos en el tiempo: un inicio y un final.

  • Solo se pasan eventos de inserción a un operador de flujo definido por el usuario. Los eventos de incremento de tiempo actual (CTI) del flujo de entrada son transparentes para el operador de flujo definido por el usuario, pero siguen determinando el modo en que este operador percibe el paso del tiempo (vea NextCti más adelante).

  • StreamInsight puede desactivar un operador de flujo definido por el usuario en función de si lo permite (vea IsEmpty más adelante). StreamInsight puede reciclar un operador de flujo definido por el usuario desactivado.

  • Cada evento de inserción provoca una llamada a ProcessEvent, seguida del sondeo de las propiedades NextCti e IsEmpty.

Entrada y salida de un operador de flujo definido por el usuario

Un operador de flujo definido por el usuario procesa un evento cada vez. En respuesta a cada evento de entrada, puede producir 0-* eventos de salida. El operador también puede actualizar su estado interno como respuesta a una entrada. Un evento de entrada puede ser un CTI, generado en la solicitud del operador para indicar el paso del tiempo, o una inserción. Las entradas se anotan temporalmente.

Por el contrario, un evento de salida es simplemente una carga de evento. No hay ninguna oportunidad de aplicar una marca de tiempo a los eventos de salida ni insertar CTI en el flujo de salida. Los eventos de salida se generan como eventos de punto, con las marcas de tiempo que están basadas en las marcas de tiempo de los eventos de entrada correspondientes.

Controlar el tiempo en un operador de flujo definido por el usuario

Al crear un nuevo operador de flujo definido por el usuario, el código solo tiene que procesar la carga de los eventos. StreamInsight controla el tiempo de forma exclusiva. Los eventos de entrada se reciben en orden. La marca de tiempo de cada evento de salida se basa en la marca de tiempo del evento de entrada correspondiente. Por ejemplo, si un evento final perimetral desencadena un evento de salida, dicho evento de salida recibe la marca de tiempo del evento final perimetral. Por lo tanto, el operador puede estar influido por el tiempo, pero no puede controlarlo.

El operador de flujo definido por el usuario no recibe CTI directamente del flujo de entrada en su método ProcessEvent(), pero tiene la capacidad de reaccionar ante el paso del tiempo mediante la propiedad NextCti. El motor sondea esta propiedad después de cada llamada a ProcessEvent(). El operador de flujo definido por el usuario puede devolver una marca de tiempo que indique la siguiente marca de tiempo de CTI que recibirá como una llamada a ProcessEvent().

Solo los CTI que se hayan solicitado mediante el establecimiento de la propiedad NextCti se pasarán a ProcessEvent. Estos CTI no se propagarán fuera del operador de flujo definido por el usuario.

Implementar un operador de flujo definido por el usuario

Para crear un nuevo operador de flujo definido por el usuario, derive una nueva clase de las clases base CepPointStreamOperator o CepEdgeStreamOperator abstractas.

  • Si realiza la derivación de la clase base CepPointStreamOperator abstracta, el operador considera los eventos de entrada como eventos de punto. No obstante, no es un error si los eventos no son de hecho eventos de punto. El operador solo ve sus horas de inicio.

  • Si realiza la derivación de la clase base CepEdgeStreamOperator abstracta, el operador ve los perímetros iniciales y finales de los eventos de entrada.

En la clase derivada, se reemplazan las siguientes propiedades y métodos:

  • Método ProcessEvent. Genera la salida y actualiza el estado interno del operador en respuesta a cada evento de entrada. ProcessEvent recibe un evento de entrada y puede devolver cero o más cargas de salida.

  • Propiedad IsEmpty. Indica si el estado interno del operador está vacío. Cuando es true, el motor de consultas de StreamInsight puede descartar la instancia del operador para reducir el consumo de memoria.

  • Opcionalmente, el método NextCti. Indica el siguiente momento en el que se enviará un evento CTI al operador. La invalidación de esta propiedad permite al operador definido por el usuario producir la salida en un momento específico del futuro o indicar que su estado interno está vacío una vez transcurrido un determinado intervalo de tiempo de aplicación.

La clase derivada también debe implementar la serialización WCF. Para obtener más información, vea Cómo: crear un contrato de datos básico para una clase o una estructura

Cómo interactúa el motor de StreamInsight con el operador

Por cada instancia del operador, se llama al método ProcessEvent con eventos por hora de sincronización. Para un evento de punto o un CTI, la hora de sincronización es una hora de inicio válida. Para un evento perimetral, la hora de sincronización es la hora de inicio válida para los perímetros iniciales o la hora de finalización válida para los perímetros finales.

Después de cada llamada al método ProcessEvent, se sondean las propiedades IsEmpty y NextCti.

Cuando el operador invalida NextCti, el motor garantiza que el siguiente evento procesado por el operador será un evento de inserción con una hora de sincronización menor que el valor de NextCti o un CTI con el valor de NextCti como su hora de inicio. Si el operador devuelve un valor NextCti que sea menor o igual que la hora de sincronización del último evento procesado, se omite. La propiedad NextCti permite que el operador “traduzca” el transcurso del tiempo en el flujo de entrada a su propio ritmo (en forma de estos CTI internos) y, a continuación, que reacciones a dicho transcurso.

Los operadores se activan como respuesta a eventos de inserción únicamente. Los CTI no desencadenan la activación. Un operador está desactivado cuando devuelve true en IsEmpty.

En cualquier momento, el motor puede optar por serializar el operador y liberarle su referencia. Cuando el operador se deserializa posteriormente, se espera que se retome donde se dejó.

Ejemplos de operadores de flujo definidos por el usuario

Suavizado exponencial

Este operador de flujo definido por el usuario trata un flujo de eventos de punto como una secuencia de valores y aplica un suavizado exponencial. Tenga en cuenta que hace falta una referencia a System.Runtime.Serialization.

namespace UdsoExamples
{
    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Extensibility;
 
/// <summary>
/// Implements exponential smoothing.
/// </summary>
[DataContract]
public sealed class SmoothingOperator : CepPointStreamOperator<double, double>
{
    [DataMember]
    readonly double _smoothingFactor;

    [DataMember]
    double? _previousValue;

    public SmoothingOperator(double smoothingFactor)
    {
        _smoothingFactor = smoothingFactor;
    }

    public override IEnumerable<double> ProcessEvent(PointEvent<double> inputEvent)
    {
        // The result is a function of the previous result and the current input value.
        _previousValue = _previousValue.HasValue
            ? (1.0 - _smoothingFactor) * _previousValue.Value + _smoothingFactor * inputEvent.Payload
            : inputEvent.Payload;

        yield return _previousValue.Value;
    }

    public override bool IsEmpty
    {
        get { return false; }
    }

Coincidencia de patrón

Este simple ejemplo de coincidencia de patrón ilustra un uso alternativo de IsEmpty y NextCti. En este ejemplo, el operador busca un evento con el valor 1.0 al que no le siga un evento con valor 2.0 en el plazo de treinta segundos. (Este ejemplo se proporciona para ilustrar conceptos útiles en los operadores de flujo definidos por el usuario. En una aplicación real, este patrón es lo suficientemente simple como para implementarse con los operadores integrados de StreamInsight).

El ejemplo anterior usó NextCti para controlar la duración de un operador. Este ejemplo también usa NextCti para este propósito, pero también usa NextCti para producir la salida en respuesta al paso del tiempo.

namespace UdsoExamples
{
    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Extensibility;
 
/// <summary>
/// Indicates when an event with value '1' is followed by an event with value '2'
/// within thirty seconds.
/// </summary>
[DataContract]
public sealed class SimplePatternMatcher : CepPointStreamOperator<int, DateTime>
{
    [DataMember]
    DateTimeOffset? _nextCti;

    [DataMember]
    // Tracks timestamps for all events with value '1'.
    readonly Queue<DateTimeOffset> _active = new Queue<DateTimeOffset>();

    public override bool IsEmpty
    {
        // The operator is empty when we are not tracking any events with value '1'.
        get { return _active.Count == 0; }
    }

    public override DateTimeOffset? NextCti
    {
        get { return _nextCti; }
    }

    public override IEnumerable<DateTime> ProcessEvent(PointEvent<int> inputEvent)
    {
        // Produce output in response to the passage of time. Any active '1' event
        // not matched by a '2' event within thirty seconds matches the pattern.
        while (_active.Count > 0 && _active.Peek().AddSeconds(30) <= inputEvent.StartTime)
        {
            yield return _active.Dequeue().UtcDateTime;
        }

        // Update operator state based on new input event.
        if (inputEvent.EventKind == EventKind.Insert)
        {
            if (inputEvent.Payload == 1)
                _active.Enqueue(inputEvent.StartTime);
            else if (inputEvent.Payload == 2)
                _active.Clear();

        }

        // Schedule wake-up after thirty seconds so that we can produce output
        // if needed.
        if (_active.Count > 0)
        {
            _nextCti = _active.Peek().AddSeconds(30);
        }
    }
}
}

Definir un método auxiliar para simplificar el uso

Tal vez desee simplificar el uso del operador en una consulta. Por ejemplo, sería más cómodo para el autor de la consulta escribir input.Smooth(0.5) que input.Scan(new SmoothingOperator(0.5)).

Puede habilitar este patrón simplificado mediante la creación de un método de extensión personalizado como el siguiente:

        static CepStream<EventType1> Smooth(this CepStream<EventType1> source, double smoothingFactor)
        {
            if (null == smoothingFactor)
            {
                throw new ArgumentNullException("source");
            }

            return source.Scan(new SmoothingOperator(smoothingFactor));
        }