Crear adaptadores de entrada y de salida

En este tema se proporciona la información general necesaria para crear adaptadores de entrada y de salida para la aplicación de procesamiento de eventos complejos (CEP) con la plataforma de StreamInsight. Los adaptadores son convertidores de software que entregan eventos en un servidor de StreamInsight o desde él.

Descripción del flujo y el control de eventos

Al crear adaptadores, es importante entender el flujo de eventos en el servidor de StreamInsight y la forma en que los adaptadores de entrada y de salida controlan este flujo. Como se muestra en la siguiente ilustración, el flujo de eventos es unidireccional desde el origen, a través de la consulta permanente y hasta el receptor. El adaptador de entrada lee los eventos de un origen y los entrega a la consulta. Los eventos de entrada, o nuevos eventos generados por el procesamiento de los eventos de entrada, se insertan desde un operador de la consulta al siguiente. La consulta entrega los eventos procesados al adaptador de salida, que a su vez los entrega al receptor. La ilustración describe un escenario en el que una consulta de StreamInsight está enlazada a dos instancias de adaptador de entrada a1 y a2, y a la instancia de adaptador de salida a4.

Flujo de eventos desde el adaptador de entrada al adaptador de salida

Si bien el flujo de eventos es unidireccional desde el origen hasta el receptor, el flujo y control de ejecución para la recuperación y transferencia de eventos en algunos puntos de interacción entre los componentes puede ser bidireccional. Estos puntos de interacción se indican en la ilustración como READ, ENQUEUE, DEQUEUE y WRITE.

La implementación del adaptador de entrada debe realizar la operación READ utilizando los mecanismos de acceso específicos del dispositivo de origen (como un archivo o una base de datos) y la operación ENQUEUE utilizando las API del adaptador. De igual forma, la implementación del adaptador de salida debe realizar la operación WRITE utilizando los mecanismos de acceso específicos del dispositivo receptor y la operación DEQUEUE utilizando las API del adaptador. Las operaciones ENQUEUE y DEQUEUE se deben implementar según un modelo de diseño especificado en un diagrama de transiciones de estado del adaptador, que se describe más adelante en este tema.

Desde una perspectiva de control de flujo de eventos, puede imaginar los eventos insertándose desde un proveedor a un consumidor (se indica mediante flechas de bloque de izquierda a derecha) o siendo extraído por el consumidor de un proveedor (se indica mediante las flechas en ángulo). En los puntos de interacción de READ y WRITE, la implementación del adaptador puede adoptar los enfoques de inserción o extracción para el control de flujo de eventos. Algunos de los factores que se deben tener en cuenta en esta interacción son las frecuencias de eventos de las que es capaz el origen o el receptor, la capacidad del adaptador para limitar el origen o el receptor y las capacidades de almacenamiento en búfer que se pueden implementar.

En los dispositivos de origen que bombean eventos con una latencia muy baja y que resultan difíciles de limitar, un enfoque típico consiste en implementar un adaptador donde el dispositivo de origen inserta los eventos en el adaptador. Algunos ejemplos de estos dispositivos son los sensores (eventos controlados por el equipo), los centros de cotización y los puertos de red. En los dispositivos con latencias más altas (archivos, bases de datos), puede realizar una implementación donde el adaptador extraiga los datos del origen. De igual forma, en el lado de la salida, el adaptador de salida de un dispositivo que puede aceptar eventos con un rendimiento muy alto se puede implementar para insertar eventos en el dispositivo. Los dispositivos de salida más lentos pueden adoptar un enfoque donde el dispositivo sondee el adaptador cada vez que esté listo para utilizar eventos.

En el punto de interacción de ENQUEUE, el servidor de StreamInsightadmite un modelo de inserción. Esto significa que el modelo de diseño del adaptador permite poner en cola tantos eventos como pueda utilizar el motor en un momento determinado. En el punto de interacción de DEQUEUE, el servidor de StreamInsightadmite un modelo de extracción. Esto significa que el modelo de diseño del adaptador espera que se extraigan los eventos del servidor tan rápido como el motor pueda hacerlo.

Partiendo de esto, la directiva de limitación del servidor de StreamInsight es muy sencilla. Suponiendo una consulta de paso a través simple sin operaciones de bloqueo, la frecuencia con la que un servidor de StreamInsight puede utilizar eventos de un adaptador de entrada en el punto de interacción de ENQUEUE sola está limitada por la frecuencia con la que el adaptador de salida puede utilizar los eventos del servidor en el punto de interacción de DEQUEUE. El punto hasta el que el servidor de StreamInsight inserta los datos de nuevo en el adaptador de entrada durante ENQUEUE viene determinado por la rapidez con que la consulta puede proporcionar el resultado y la rapidez con que el adaptador de salida puede utilizar este resultado. StreamInsight proporciona un amplio conjunto de vistas de diagnóstico que ayudan a medir las frecuencias de eventos en cada uno de estos puntos de interacción. Para obtener más información, vea Supervisar el servidor y las consultas de StreamInsight.

Tareas de programación de adaptadores

Utilice la siguiente lista de comprobación para programar el adaptador.

  • Determinar el tipo de adaptador (entrada o salida) que necesita.

    Un adaptador de entrada lee los eventos de entrada en el formato en que se proporcionan y transforma estos datos a un formato compatible con el servidor de StreamInsight.

    Un adaptador de salida recibe eventos procesados por el servidor de StreamInsight, los transforma a un formato compatible con el dispositivo de salida y envía los datos a dicho dispositivo.

  • Determinar el tipo de evento.

    En un adaptador de entrada, defina el tipo de evento que describe la carga de los eventos que proporciona el origen. En un adaptador de salida, especifique el tipo de evento que describe la carga de eventos que usa el receptor. Para obtener más información sobre las cargas de eventos, vea Conceptos de servidor de StreamInsight.

    Especifique y genere un adaptador con tipo para un origen o receptor que siempre genere o utilice eventos de un formato de carga fijo en los que el número de campos y sus tipos se conocen por adelantado. La ventaja principal del adaptador con tipo es que la implementación de la creación de eventos para poner en la cola del servidor de StreamInsight es relativamente fácil. Dado que ya se conocen los tipos de campo, puede utilizar IntelliSense en Visual Studio (o una característica equivalente en otro entorno de desarrollo integrado) para rellenar los campos.

    Especifique y genere un adaptador sin tipo si el origen o el receptor genera o utiliza diferentes formatos de carga. La ventaja principal de un adaptador sin tipo se encuentra en la flexibilidad que proporciona para especificar el tipo de evento en el momento del enlace de consultas, en lugar de unir la implementación del adaptador a un tipo de evento concreto. Si se compara con el adaptador con tipo, la implementación del adaptador sin tipo resulta más compleja. El adaptador de entrada sin tipo se debe escribir de manera que se pueda determinar el tipo de cada campo a partir de los parámetros de configuración que se proporcionan durante el tiempo de enlace de consultas, rellenar los campos de uno en uno y, a continuación, poner el evento en cola. De igual forma, el adaptador de salida sin tipo debe poder recuperar el resultado del procesamiento de la consulta de un evento sacado de la cola en función de la información de configuración proporcionada a la salida.

    Es importante tener en cuenta que una instancia de un adaptador (con o sin tipo) que se enlaza a la consulta siempre emite eventos que contienen cargas de un tipo específico. Para obtener más información, vea Crear tipos de evento.

  • Determinar el modelo de evento.

    Determine el modelo de evento para los eventos de entrada y salida. StreamInsight admite tres modelos de evento: punto, intervalo y perimetral. Si el origen proporciona eventos de un modelo de evento fijo, puede diseñar un adaptador de entrada exclusivamente para dicho modelo de evento. De la misma forma, si el receptor requiere eventos de un modelo determinado, puede diseñar un adaptador de salida exclusivamente para dicho modelo de evento. Sin embargo, la mayoría de las aplicaciones pueden necesitar todos los modelos de evento para un tipo de evento determinado. Se recomienda crear un adaptador con tipo o un adaptador sin tipo para cada modelo de evento. Para obtener más información sobre los modelos de evento, vea Conceptos de servidor de StreamInsight.

    Las clases AdapterFactory de entrada y salida permiten empaquetar juntos estos adaptadores. Se puede crear la instancia del adaptador correcto en el momento del enlace de consultas, en función de los parámetros de configuración.

  • Elegir la clase base de adaptador correspondiente.

    En función del tipo y el modelo de evento, seleccione la clase base de adaptador adecuada. La nomenclatura de clase sigue el modelo [Typed][Point | Interval | Edge][Input | Output]. Los adaptadores sin tipo no tienen el prefijo Typed.

    Tipo de adaptador

    Clase base de adaptador de entrada

    Clase base de adaptador de salida

    Punto con tipo

    TypedPointInputAdapter

    TypedPointOutputAdapter

    Punto sin tipo

    PointInputAdapter

    PointOutputAdapter

    Intervalo con tipo

    TypedIntervalInputAdapter

    TypedIntervalOutputAdapter

    Intervalo sin tipo

    IntervalInputAdapter

    IntervalOutputAdapter

    Perimetral con tipo

    TypedEdgeInputAdapter

    TypedEdgeOutputAdapter

    Perimetral sin tipo

    EdgeInputAdapter

    EdgeOutputAdapter

    Para obtener más información, vea Microsoft.ComplexEventProcessing.Adapters.

  • Diseñar las clases AdapterFactory de entrada y salida.

    AdapterFactory es una clase contenedora para adaptadores. Debe implementar una clase de generador. Las clases de generador base se organizan tal como se muestra a continuación.

    Tipo de adaptador

    Clase base de adaptador de entrada

    Clase base de adaptador de salida

    Con tipo

    ITypedInputAdapterFactory

    ITypedOutputAdapterFactory

    Sin tipo

    IInputAdapterFactory

    IOutputAdapterFactory

    Con tipo de compatibilidad con resistencia

    IHighWaterMarkTypedInputAdapterFactory

    IHighWaterMarkTypedOutputAdapterFactory

    Sin tipo de compatibilidad con resistencia

    IHighWaterMarkInputAdapterFactory

    IHighWaterMarkOutputAdapterFactory

    La clase de generador sirve para los siguientes objetivos:

    • Habilita el uso compartido de recursos entre distintas implementaciones del adaptador para una clase determinada de dispositivos (archivo CSV, base de datos de SQL Server, formato de registro común del servidor web) o requisito de aplicación, y hace que sea más fácil pasar parámetros de configuración al constructor del adaptador. Por ejemplo, una aplicación puede requerir los tres modelos de evento (punto, intervalo y perimetral). Un único generador puede admitir tres implementaciones del adaptador, una para cada modelo de evento. Otro ejemplo es el de una aplicación que tiene el mismo origen de evento que una tabla de base de datos, pero el origen genera varias estructuras de carga útil de evento desde el mismo origen en función de las consultas que se ejecutan. En este caso, un único generador puede admitir implementaciones del adaptador para controlar cada estructura de carga.

    • Proporciona al adaptador una puerta de enlace al motor en tiempo de ejecución del servidor. El desarrollador de software de adaptadores debe implementar los métodos Dispose() y Create() en el generador del adaptador para la clase de adaptador. El servidor invoca estos métodos durante el inicio y el cierre de la consulta.

    • Proporciona al adaptador una puerta de enlace a la información de configuración previa al tiempo de ejecución. Esto tiene especial importancia para los adaptadores sin tipo, que deben determinar el tipo de cada campo de la estructura a partir de los parámetros de configuración proporcionados durante el tiempo de enlace de consulta. Puede definir la estructura de configuración en la clase de generador y pasarla a través del método Create() al método de constructor de la clase del adaptador. Esta estructura de configuración se serializa mediante DataContractSerialization. Aparte de esta restricción, la metodología de desarrollo proporciona una flexibilidad completa en la definición y el uso de esta estructura de configuración, en cuanto a la forma de rellenarla y usarla en el constructor del adaptador.

    • Proporciona una manera de generar incrementos de tiempo actual (CTI) sin ponerlos explícitamente en cola a través del adaptador de entrada. Mediante la implementación de ITypedDeclareAdvanceTimePolicy (para un generador de adaptador con tipo) e IDeclareAdvanceTimePolicy (para un generador de adaptador sin tipo) en la clase de generador de adaptadores, el usuario puede especificar la frecuencia de CTI y marcas de tiempo. Así se simplifica el código del adaptador y puede afectar a todos los flujos de eventos que el generador crea a través de sus instancias de adaptador. Para obtener más información, vea [AdvanceTimeSettingsClass].

    • En las aplicaciones resistentes, admite la resistencia al proporcionar el límite máximo al adaptador de entrada para la reproducción de eventos perdidos y al proporcionar el límite máximo y el desplazamiento al adaptador de salida para la eliminación de eventos duplicados. Para obtener más información, vea Resistencia de StreamInsight.

  • Compilar y probar el adaptador.

    Compile y genere el adaptador como un ensamblado .NET. Pruebe el adaptador con operaciones básicas en una consulta sencilla de paso a través que lea eventos de un adaptador de entrada y los envíe al adaptador de salida sin ningún procesamiento de consulta complejo. Así comprobará que el adaptador lee y escribe en los dispositivos, y puede poner eventos en cola y sacarlos de ella.

Máquina de estados del adaptador

La máquina de estados que define la interacción entre un adaptador y el servidor de StreamInsight es la misma tanto para los adaptadores de entrada como de salida. Esto resulta significativo porque la máquina de estados proporciona un modelo de desarrollo uniforme. La máquina de estados se muestra en la siguiente ilustración.

Diagrama de estado en cola y fuera de la cola del adaptador

A continuación se indican las características y los requisitos principales para conseguir que esta máquina de estados funcione:

  • Start() y Resume() son métodos a los que llama el servidor de StreamInsight y que debe implementar el desarrollador de adaptadores. Además, también se debe implementar el método de constructor para la clase de adaptador y el método Dispose(), que se hereda de la clase base.

  • A su vez, la implementación del adaptador debe llamar a los siguientes métodos proporcionados por el SDK de adaptadores:

    • Enqueue() para el adaptador de entrada. Devuelve los valores EnqueueOperationResult.Success o EnqueueOperationResult.Full.

    • Dequeue() para el adaptador de salida. Devuelve los valores DequeueOperationResult.Success o DequeueOperationResult.Empty.

    • Ready(). Devuelve un valor booleano TRUE o FALSE.

    • Stopped(). Devuelve un valor booleano TRUE o FALSE.

  • El servidor de StreamInsight llama de forma asincrónica al método interno (se indica como StopQuery()) en nombre del usuario cuando un administrador o un desarrollador de consultas detiene la ejecución de la consulta a través de métodos en la API del servidor.

  • Las llamadas a Enqueue() y Dequeue() devuelven el estado Full y Empty respectivamente cuando el adaptador está en uno de los siguientes estados:

    • Suspendido

    • Deteniéndose

  • Las llamadas a Enqueue() y Dequeue() generan una excepción cuando el adaptador está en uno de los siguientes estados:

    • Creada

    • Detenido

  • Las llamadas a Ready() generan una excepción cuando el adaptador está en uno de los siguientes estados:

    • Creada

    • En ejecución

    • Detenido

  • Un adaptador recorre alguno de los cinco estados o todos ellos (creado, en ejecución, suspendido, deteniéndose o detenido) mientras está en funcionamiento. Una transición de estado tiene lugar antes de que el servidor de StreamInsight llame a Start() o Resume() y después de que el adaptador llame a Enqueue(), Dequeue(), Ready() y Stopped().

  • El servidor de StreamInsight y el adaptador nunca comparten el mismo subproceso. El servidor siempre llama a Start() o Resume() en un subproceso de trabajo independiente. El servidor obtiene este subproceso de un grupo de subprocesos de sistema operativo en nombre del adaptador. Esto implica que los métodos Start() y Resume() cuentan con toda la potencia y flexibilidad necesarias para utilizar el subproceso de trabajo según sea necesario (por ejemplo, para crear más subprocesos para lecturas o escrituras asincrónicas). Por tanto, debe extremar las precauciones y seguir las prácticas recomendadas al usar los recursos del sistema desde este subproceso.

  • La API hace que sea innecesaria la sincronización inherente entre las operaciones (subprocesos) Start() y Resume(). El servidor siempre llama a Resume() después (y solo después) de que el adaptador haya llamado a Ready(). Con todo, se debe tener en cuenta que la sincronización podría ser necesaria para las tareas de dispositivo de lectura, escritura o almacenamiento en búfer de eventos, sobre todo en escenarios de E/S asincrónicos. Lo más recomendable es usar E/S que no sea de bloqueo.

  • Si el adaptador puede estar inactivo, el adaptador debe comprobar periódicamente el estado para determinar si se le ha solicitado que se detenga.

Vigencia de la interacción entre el adaptador y el servidor

El protocolo de enlace entre el servidor de StreamInsight y el adaptador es siempre sincrónico. Así, el adaptador puede comprobar su estado en cualquier punto de su ejecución y reaccionar en función de él. La vigencia de la interacción entre el adaptador y el servidor de StreamInsight se compone de las siguientes operaciones, que corresponden a la máquina de estados mostrada en la ilustración anterior.

  • Creado

    Una instancia de adaptador comienza a interactuar con el servidor de StreamInsight cuando se inicia la consulta (realizando una llamada correspondiente en la API del servidor de StreamInsight.

  • En ejecución

    El servidor asigna al adaptador el estado En ejecución y llama a Start() en el adaptador de forma asincrónica; garantiza que esta llamada solo se realiza una vez. Cuando el adaptador está en el estado En ejecución, el adaptador puede poner o quitar eventos de la cola del servidor.

    Se espera que el adaptador esté en el estado En ejecución la mayor parte del tiempo. El modelo de diseño recomendado consiste en invocar la rutina de lector o escritor, preferiblemente en un subproceso independiente, desde el método Start() y volver de la rutina de Start(), abandonando rápidamente así el subproceso de trabajo.

    La rutina del lector (supongamos que se denomina ProduceEvents() como ejemplo) lee los eventos del origen y llama a Enqueue() para insertar los eventos en el servidor. En el caso de un adaptador de salida, una rutina de escritor (supongamos que se denomina ConsumeEvents() como ejemplo) llama a Dequeue() para extraer eventos del servidor y los escribe en el receptor.

  • Suspendido

    Cuando el servidor no puede recibir un evento en cola o generar un evento para quitarlo de la cola, el adaptador de entrada o de salida pasa al estado Suspendido. Esto hace que las invocaciones de Enqueue() y Dequeue() devuelvan los estados FULL y EMPTY respectivamente. En el estado Suspendido, puede implementar operaciones de mantenimiento como guardar la posición del último registro leído de la base de datos o la línea del archivo. Al final de esta sección opcional, debe invocar el método Ready() para comunicar al servidor que el adaptador está listo para su reanudación. Si la rutina se está ejecutando en el mismo subproceso de trabajo que Start(), debe volver desde la propia rutina Start().

  • Como respuesta a una invocación a Ready(), el servidor devuelve el adaptador al estado En ejecución y siempre llama de forma asincrónica a Resume() en un subproceso de trabajo diferente. Puede diseñar Resume() para que ponga en cola o quite de la cola la última iteración con error y, a continuación, llamar a ProduceEvents() o ConsumeEvents(). Este modelo puede continuar hasta que el adaptador pase a un estado Detenido o Deteniéndose.

  • Deteniéndose

    En cualquier punto de los estados En ejecución o Suspendido, el servidor puede pasar el adaptador a un estado Deteniéndose como respuesta a una solicitud asincrónica para detener la consulta. En este estado, la invocación de Enqueue() o Dequeue() también devuelve los estados FULL o EMPTY respectivamente.

    El estado Deteniéndose proporciona a la implementación del adaptador una área de almacenamiento provisional donde prepararse correctamente para la detención. Puede implementar el adaptador para renunciar a todos los recursos que ha obtenido (subprocesos, memoria) y, a continuación, invocar el método Stopped(). Si no se llama a este método, el servidor no detendrá el adaptador.

    Tenga en cuenta que el adaptador puede pasar a un estado Deteniéndose de forma asincrónica. El adaptador requiere algunos recursos para detectar que ha entrado en el estado Deteniéndose. Tal y como se ha indicado anteriormente, el modelo de diseño consiste en que el adaptador invoque Ready() cuando está suspendido. Como respuesta, el servidor invoca el método Resume() una vez más, habilitando así la detección del estado Deteniéndose en el método Resume(). Se recomienda incluir la comprobación del estado Deteniéndose como primer bloque de código en la implementación de Start() y Resume().

  • Detenido

    El código del adaptador puede llamar a Stopped() en cualquier punto. De esta forma, el adaptador se pone en el estado de detenido. Se recomienda limpiar los recursos que el adaptador obtuvo antes de llamar a Stopped().

    Nota importanteImportante

    Si no se llama al método Stopped(), la última página de memoria asociada a la consulta sigue estando asignada. Esto puede producir pequeñas pérdidas de memoria que se pueden acumular con el tiempo si hay muchos ciclos de inicio y detención de consultas en un proceso.

    En el estado Detenido, el adaptador no puede hacer referencia a construcciones específicas del servidor de StreamInsight ni a la memoria de eventos, ni realizar operaciones de puesta en cola o eliminación de la cola. Estas acciones producirán una excepción. Sin embargo, las actividades de limpieza en el dispositivo y el sistema operativo pueden continuar.

Ejemplos

Para obtener ejemplos de diferentes adaptadores y generadores de adaptadores de entrada y salida, vea los ejemplos disponibles en StreamInsight Samples.

Vea también

Conceptos

Conceptos de servidor de StreamInsight

Arquitectura del servidor de StreamInsight