Agregados y operadores definidos por el usuario

Los ejemplos de este tema muestran cómo ampliar el conjunto de operaciones basadas en ventana de los operadores LINQ de StreamInsight mediante agregados definidos por el usuario (UDA) y operadores definidos por el usuario (UDO). Estas extensiones están definidas en ventanas de evento y devuelven cero o más eventos de resultado. Los agregados definidos por el usuario y los operadores definidos por el usuario se deben compilar en un ensamblado al que pueda tener acceso el servidor de StreamInsight de la misma forma en que se proporcionan y usan los adaptadores en tiempo de ejecución.

StreamInsight también ofrece operadores de flujo definidos por el usuario como otro mecanismo de extensibilidad. Los operadores de flujo definidos por el usuario se definen en el flujo de eventos directamente en vez de hacerlo en las ventanas de evento.

Funciones de agregado definidas por el usuario

Un agregado definido por el usuario se usa con la especificación de una ventana para agregar los eventos de la ventana y generar un valor de resultado único. Un UDA toma como entrada una ventana CEP (que es resultado de un operador de ventana de salto, instantánea o basada en recuento) que contiene un conjunto de eventos CEP y genera un valor devuelto único (un tipo CLR que se asigna a uno de los tipos primitivos de StreamInsight). Para obtener más información acerca de las ventanas, vea Utilizar ventanas de eventos.

Puede implementar agregados UDA más complejos en su funcionalidad que los agregados más sencillos similares a count, sum y average proporcionados por StreamInsight. Un ejemplo de este tipo, el cálculo de los promedios ponderados de tiempo, se trata en una sección posterior.

Operadores definidos por el usuario

Un operador definido por el usuario se usa con la especificación de una ventana para procesar los eventos de la ventana y generar un conjunto de uno o varios eventos resultantes. Un UDO toma como entrada una ventana CEP (que es el resultado de un operador de ventana de salto, instantánea o recuento) que contiene un conjunto de eventos CEP y genera un conjunto de eventos CEP o de cargas CEP.

Puede usar un operador UDO si necesita que el cálculo genere o afecte a eventos completos, incluidas sus marcas de tiempo, de cada ventana. Un ejemplo es el establecimiento de un campo de estado de un evento además de calcular una agregación, en que el estado depende del resultado de la agregación y de otro parámetro. Por ejemplo, un UDO podría generar un evento único para cada ventana que contiene un campo de carga con el resultado de la agregación y un campo de estado que indica si el resultado de la agregación infringió alguna restricción.

Dependencia de tiempo en UDA y UDO

Puede definir agregados UDA y operadores UDO dependientes o independientes del tiempo, en función de la elección de la clase base para implementar estos elementos.

Los UDA Y UDO independientes del tiempo no esperan que se les pasen eventos completos, incluidas sus marcas de tiempo. Solamente tienen en cuenta un conjunto de uno o varios campos de carga de los eventos de la ventana definida. Además, no se les pasan las horas de inicio y de finalización de la ventana.

A los UDA y UDO dependientes del tiempo se les pasa un conjunto de eventos para cada ventana, con sus marcas de tiempo y las horas de inicio y de finalización de la ventana. Si un UDA o UDO depende del tiempo es determinado por la clase base respectiva de la que deriva la implementación el autor del UDA o UDO.

Implementar agregados definidos por el usuario

El autor de un UDA tiene las siguientes responsabilidades:

  • Proporcionar la implementación del UDA.

  • Proporcionar el método de extensión para que LINQ permita a un creador de consultas utilizar el UDA.

Para implementar un UDA, el usuario deriva de la clase base adecuada: CepAggregate para los UDA que no dependen del tiempo o CepTimeSensitiveAggregate para los UDA que dependen del tiempo.

La derivación de clase requiere la creación de instancias de los parámetro de tipo de entrada y salida. El tipo de entrada representa la carga completa (si el UDA necesita ser capaz de examinar el conjunto completo de campos de carga en el transcurso de su cálculo) o un tipo CLR que se asigna a un tipo primitivo correspondiente en el sistema de tipos de StreamInsight (si un campo singleton es la entrada a UDA). El tipo de salida en ambos casos debe ser un tipo CLR que se asigna a un tipo primitivo correspondiente.

Además de los datos de evento, se puede pasar al constructor de la clase del UDA una estructura de configuración opcional en la hora de inicio de la consulta (si el autor del UDA lo desea). Si el autor del UDA proporciona este constructor, el motor lo llamará en tiempo de ejecución, con la configuración proporcionada por el autor de la llamada del UDA en LINQ.

Tanto los IDA que dependen del tiempo como los que no dependen del tiempo reciben las caracas como un conjunto no ordenado. En el caso de un UDA que depende del tiempo, las marcas de tiempo de los eventos están también asociadas a cada carga. Además se pasa al UDA un descriptor de ventana que define las horas de inicio y finalización de la ventana.

Ejemplos de agregados definidos por el usuario

En el siguiente ejemplo se implementa un UDA que no depende del tiempo. Espera un conjunto de campos de evento enteros. En esta implementación del ejemplo no se especifica la estructura de configuración opcional, por lo que la clase no necesita un constructor concreto.

public class Median : CepAggregate<int, int>
{
    public override int GenerateOutput(IEnumerable<int> eventData)
    {
        var sortedData = eventData.OrderBy(e => e.Payload);
        int medianIndex = (int)sortedData.Count() / 2;
        return sortedData.Count() % 2 == 0 ?
            (sortedData.ElementAt(medianIndex).Payload + sortedData.ElementAt(medianIndex - 1).Payload) / 2 :
            sortedData.ElementAt(medianIndex).Payload;
    }
}

Además de implementar el UDA, debe proporcionar un método de extensión para que LINQ permita al creador de la consulta utilizar el UDA. El método de extensión es una firma que permite al autor de la consulta utilizar el agregado y compilar la consulta. Mediante un atributo, el proveedor de LINQ de StreamInsight puede hacer referencia a la clase real que contiene la implementación del UDA, tal como se muestra en el siguiente ejemplo.

public static class MyUDAExtensionMethods
{
    [CepUserDefinedAggregate(typeof(Median))]
    public static int Med<InputT>(this CepWindow<InputT> window, Expression<Func<InputT, int>> map)
    {
           throw CepUtility.DoNotCall();
    }
}

Aquí, debe haber una implementación de un UDA a través de una clase Median, que implementa un UDA que funciona en un campo único de tipo int y devuelve un valor de tipo int. La expresión en la firma de la función representa la asignación del tipo de evento del flujo de entrada a un valor entero único. Observe que el método de extensión no se ejecutará nunca, de ahí CepUtility.DoNotCall() en su cuerpo. En función de esta especificación, el UDA se puede utilizar en LINQ, como se muestra en el siguiente ejemplo.

from w in s. TumblingWindow(TimeSpan.FromMinutes(10))
select new { f1 = w.Med(e => e.val) }

El argumento de expresión lambda asigna la carga del evento a un valor entero que será la entrada al UDA. En este caso, se calculará la mediana de los valores del campo de evento val para cada ventana.

A continuación, considere el ejemplo de un UDA que no depende del tiempo e incluye información de configuración. Espera una carga completa de tipo Trade como entrada y devuelve valores de tipo double. Este ejemplo también incluye el método de extensión correspondiente:

    public class Trade
    {
        public double Volume { get; set; }
        public double Price { get; set; }
    }

    public class Vwap : CepAggregate<Trade, double>
    {
        double weight;

        /// <summary>
        /// Constructor for parameterized UDA
        /// </summary>
        public Vwap(double w)
        {
            weight = w;
        }

        public override double GenerateOutput(IEnumerable<Trade> events)
        {
            double vwap = events.Sum(e => e.Price * e.Volume) / events.Sum(e => e.Volume);

            return vwap * weight;
        }
    }

    static public partial class UDAExtensionMethods
    {
        [CepUserDefinedAggregate(typeof(Vwap))]
        public static double vwap(this CepWindow<Trade> window, double w)
        {
            throw CepUtility.DoNotCall();
        }
    }

Dado que la carga completa será la entrada, el método de extensión no especifica ninguna expresión lambda. El único parámetro para UDA es el valor de la configuración (que aquí es de tipo double):

var result = from w in s.TumblingWindow(TimeSpan.FromMinutes(10))
             select new { f1 = w.vwap(2.5) }

A continuación, considere el ejemplo de un UDA que depende del tiempo e incluye información de configuración. El UDA es un promedio ponderado de tiempo con eventos de intervalo interpretados como una función de paso (es decir, cada intervalo es válido hasta el siguiente). De forma similar al ejemplo anterior, no espera la carga completa como entrada, sino solamente valores de tipo double.

Tenga en cuenta que, aunque las cargas de evento se reduzcan a valores double, el conjunto de entrada sigue definido como un conjunto de eventos de intervalo en lugar de un conjunto de cargas útiles, como era el caso en el UDA que no depende del tiempo. Esto resulta necesario para incluir las marcas de tiempo, ya que el UDA se ha especificado como dependiente del tiempo. Además, la ventana recibe la forma de un objeto WindowDescription, que tiene una propiedad de hora de inicio y de hora de finalización. Estas marcas de tiempo se especifican en hora UTC. Observe también que UdaConfig es una clase o estructura que se debe poder serializar mediante DataContractSerializer.

public class TimeWeightedAverage : CepTimeSensitiveAggregate<double, double>
{
    UdaConfig _udaConfig;
    public TimeWeightedAverage(UdaConfig udaConfig)
    {
        _udaConfig = udaConfig;
    }

    public override Output GenerateOutput(IEnumerable<IntervalEvent<double>> events,
                                          WindowDescriptor windowDescriptor)
    {
        double avg = 0;
        foreach (IntervalEvent<double> intervalEvent in events)
        {
            avg += intervalEvent.Payload * (intervalEvent.EndTime - 
                                            intervalEvent.StartTime).Ticks;
        }
        avg = avg / (windowDescriptor.EndTime - 
                     windowDescriptor.StartTime).Ticks;
        return avg * udaConfig.Multiplier;
    }
}

Donde UDAConfig es

public class UDAConfig
{
    public double Multiplier { get; set; }
}

Ahora el método de extensión también incluye la siguiente estructura de configuración:

[CepUserDefinedAggregate(typeof(TimeWeightedAverage))]
public static double twa<InputT>(this CepWindow<InputT> window, Expression<Func<InputT, double>> map, UdaConfig config)
{
    throw CepUtility.DoNotCall();
}

La configuración se convierte en otro parámetro del método de extensión:

var result = from w in s.TumblingWindow(TimeSpan.FromMinutes(10))
         select new w.TimeWeightedAverage (e => e.dval,
                            new UdaConfig(){ Multiplier = 5 }); 

Los ejemplos tratados hasta ahora consideran escenarios en los que el evento donde el evento tiene tipo. Es decir, los tipos de carga ya se conocen en el momento de la implementación del UDA. En el siguiente ejemplo se implementa un UDA que tiene un tipo de entrada genérico y en el que el tipo de entrada se pasa al UDA en tiempo de ejecución.

public class GenericInputUda<TInput> : CepAggregate<TInput, bool>
{
    public GenericInputUda(SampleUdaConfig config)
    {
        // ...
    }

    public override bool GenerateOutput(IEnumerable<TInput> payloads)
    {
        // ...
    }
}

Implementar operadores definidos por el usuario

El autor de un UDO tiene las siguientes responsabilidades:

  • Proporcionar la implementación del UDO.

  • Proporcionar el método de extensión para que LINQ permita a un creador de consultas utilizar el UDO.

Para implementar un UDO, el usuario deriva de la clase base adecuada: CepOperator para UDO que no dependen del tiempo o CepTimeSensitiveOperator. La derivación de clase requiere la creación de instancias del parámetro de tipo de entrada y salida. El tipo de entrada siempre representa la carga útil entera. El tipo de salida es un conjunto de cargas o un conjunto de eventos, dependiendo de la clase base seleccionada.

Además de los datos de evento, puede pasar al constructor de la clase del UDO una estructura de configuración opcional en la hora de inicio de la consulta (si el autor del UDO lo desea). Si el autor del UDO proporciona este constructor, el motor lo llamará en tiempo de ejecución, con la configuración proporcionada por el autor de la llamada del UDO en LINQ.

Tanto los UDO que dependen del tiempo como los que no dependen del tiempo reciben las cargas como un conjunto no ordenado. En el caso de un UDO que depende del tiempo, las marcas de tiempo de los eventos están también asociadas a cada carga. Además se pasa al UDO un descriptor de ventana que define las horas de inicio y finalización de la ventana.

Comportamiento de CTI en operadores definidos por el usuario

Los UDO cambiarán los incrementos de tiempo actual (CTI) de la siguiente manera: cuando una ventana todavía está "abierta", es decir, no se ha recibido ningún CTI con una marca de tiempo posterior a la hora de finalización de la ventana, todos los CTI que caigan dentro de la ventana se cambian a la hora de inicio de la ventana. Así se garantiza que el resultado del UDO, que podría contener marcas tiempo definidas por el usuario, puede cambiar mientras la ventana todavía esté abierta.

Implementaciones de ejemplo de operadores definidos por el usuario

En el siguiente ejemplo se implementa un UDA que no depende del tiempo sin información de configuración.

public class SampleUDO : CepOperator<Input, Output>
{
    public override IEnumerable<Output> GenerateOutput(IEnumerable<Input> payloads)
    {
        Output output = new Output();
        output.total = 0;
        output.status = "good";

        foreach (Input payload in payloads)
        {
            output.total += payload.Value;
            if (payload.Flag == 4)
            {
                output.status = "bad";
                break;
            }
        }
        List<Output> outputCollection = new List<Output>();
        outputCollection.Add(output);
        return outputCollection;
    }
}

En el siguiente ejemplo se muestra cómo cambiar la firma a un UDO que depende del tiempo y que acepta información de configuración.

public class GenericOutputUdo: CepTimeSensitiveOperator<InputEventType, TOutput>
{
    public GenericOutputUdo(SampleUdoConfig config)
    {
        ...
    }

    public override IEnumerable<IntervalEvent<TOutput>> GenerateOutput(
                             IEnumerable<IntervalEvent<InputEventType>> payloads,
                             WindowDescriptor windowDescriptor)
    {
        ...
    }
}

Ejemplos de métodos de extensión para operadores definidos por el usuario

Además de implementar el UDO, su autor debe proporcionar un método de extensión para que LINQ permita al creador de la consulta utilizar el UDO. El método de extensión es una firma que permite al autor de la consulta usar el operador y compilar la consulta. Mediante un atributo, el proveedor de LINQ puede hacer referencia a la clase real que contiene la implementación del UDO, como se muestra en el siguiente ejemplo.

[CepUserDefinedOperator(typeof(SampleUDO))]
public static OutputEventType MyUDO(this CepWindow<InputEventType> window)
{
    throw CepUtility.DoNotCall();
}

Este UDO se podría utilizar a partir de ahora de la siguiente manera.

var newstream = from w in inputStream.Snapshot()
                select w.MyUDO();

El siguiente ejemplo muestra el método de extensión y el uso de un UDO que tiene una estructura de configuración y hace referencia a una implementación incluida en una clase denominada SampleUDOwithConfig.

[CepUserDefinedOperator(typeof(SampleUDOwithConfig))]
public static OutputEventType MyUDO(this CepWindow<InputEventType> window, UDOConfig config)
{
    throw CepUtility.DoNotCall();
}

var newstream = from w in inputStream.SnapshotWindow()
                select w.MyUDO(new UDOConfig());

Propiedades de campo de evento específicas de la referencia cultural

Las extensiones como los UDO, UDA y UDF se pueden considerar como interfaces entre el dominio CEP con su sistema de tipos y .NET CLR. En algunas aplicaciones, resulta conveniente poder pasar información de la referencia cultural a través de esta interfaz. En los UDA y UDO, el autor de la extensión puede implementar una interfaz adicional, IDeclareEventProperties, que permite la inspección o la configuración de las propiedades de referencia cultural en los campos de evento. Para implementar esta interfaz, debe proporcionar una función DeclareEventProperties, que devuelve un objeto de CepEventType, que puede transportar información de referencia cultural para sus campos, tal como se muestra en el siguiente ejemplo:

public class SampleUDO : CepOperator<Input, Output>, IDeclareEventProperties
{
    public override IEnumerable<Output> GenerateOutput(IEnumerable<Input> payloads)
    {
        ...
    }

    public CepEventType DeclareEventProperties(CepEventType outputEventType)
    {
        // assuming string field 'loc' in type Input
        // assuming string fields 'firstName' and 'location' in type Output
        outputEventType.Fields["firstName"].CultureInfo = new System.Globalization.CultureInfo("zh-CN");
        outputEventType.Fields["location"].CultureInfo = base.InputEventType.Fields["loc"].CultureInfo;
        return outputEventType;
    }
}

Este UDO de ejemplo toma eventos de entrada de tipo Input y genera eventos de tipo Output. El tipo Output tiene campos de cadena en los que el autor del UDO desea incluir explícitamente una anotación con determinada información de referencia cultural. La referencia cultural denominada zh-CN se aplica al campo de salida firstName, mientras que el campo de salida location recibe la anotación con la misma referencia cultural asociada al campo loc en el tipo de evento de entrada del UDO. Para cada evento que genera el UDO en tiempo de ejecución, estas referencias culturales se aplican a sus campos antes de que el evento se inserte en el flujo de salida del UDO.

También existe la misma interfaz para los agregados definidos por el usuario. Puesto que los agregados solamente tienen un único valor devuelto, para aplicar información específica de la referencia cultural a este tipo de campo, la interfaz IDeclareEventProperties incluye el valor devuelto en CepEventType con un único campo; de este modo proporciona una manera de incluir una anotación en ese campo con propiedades de evento específicas de CEP.

public class MyUDA : CepAggregate<Input, string>, IDeclareEventProperties
{
    public override string GenerateOutput(IEnumerable<Input> events)
    {
        ...
    }

    public CepEventType DeclareEventProperties(CepEventType outputEventType)
    {
        outputEventType.FieldsByOrdinal[0].CultureInfo = new System.Globalization.CultureInfo("zh-CN");
        return outputEventType;
    }
}

Aquí, la cadena que representa el resultado del agregado se incluye en CepEventType, de forma que el autor del UDA puede establecer la propiedad CultureInfo en este campo. Esta información de referencia cultural se propagará al campo de evento real que recibe el resultado de la agregación en la consulta de LINQ, donde se utiliza el UDA.

Vea también

Conceptos

Utilizar ventanas de eventos

Otros recursos

Escribir plantillas de consulta en LINQ