Ejemplo integral de StreamInsight

En este tema se describen los diferentes componentes y pasos relacionados con la creación de una aplicación de StreamInsight y se incluye un ejemplo integral de una aplicación. Una aplicación de StreamInsight combina orígenes de eventos, receptores de eventos y consultas para implementar un escenario de procesamiento de eventos complejos. LA API de StreamInsight ofrece diferentes interfaces para admitir diversos niveles de control y complejidad en la creación y el mantenimiento de aplicaciones de procesamiento de eventos. 

La unidad más pequeña en la implementación de una aplicación es una consulta, que se puede iniciar y detener. En la siguiente ilustración se muestra una manera de crear una consulta. Los orígenes de eventos se representan por medio de un adaptador de entrada. El adaptador aporta un flujo de eventos al árbol de operadores, que representa la lógica de consulta deseada, especificada por el diseñador bajo la forma de una plantilla de consulta. A continuación, el flujo de eventos procesado conduce a un receptor de eventos, normalmente un adaptador de salida.

Consulta con adaptadores de entrada y de salida

Los desarrolladores que no estén familiarizados con la terminología del procesamiento de eventos complejos deben leer Conceptos de servidor de StreamInsight y Arquitectura del servidor de StreamInsight.

Proceso de la aplicación

En esta sección se analiza la experiencia típica de creación de una aplicación integral.

Crear instancias de una instancia del servidor y una aplicación

El proceso se inicia con la creación de instancias de una instancia del servidor de StreamInsight y una aplicación.

server = Server.Create(”MyInstance”);
Application myApp = server.CreateApplication("MyApp");

Se debe crear un servidor con un nombre de instancia registrado en el equipo a través del proceso de configuración de StreamInsight (en el ejemplo anterior, MyInstance). Para obtener más información, vea Instalación (StreamInsight).

Una aplicación representa una unidad de ámbito en el servidor que contiene otras entidades de metadatos.

En el ejemplo anterior se crea una instancia del servidor en el mismo proceso. Sin embargo, otra implementación común consiste en conectarse a un servidor remoto y trabajar en una aplicación existente en él. En el siguiente ejemplo se muestra cómo conectarse a un servidor remoto y tener acceso a una aplicación existente.

server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/StreamInsight/MyInstance"));
Application myApp = server.Applications["ExistingApp"];

Para obtener más información acerca de los servidores local y remoto, vea Publicar en el servidor de StreamInsight y conectarse a él.

Crear un flujo de entrada

A continuación, se crea un flujo de entrada en una implementación del adaptador existente. Para ser precisos, se debe especificar el generador de adaptadores, tal como se muestra en el siguiente ejemplo.

var inputstream = CepStream<MyDataType>.Create("inputStream",
                                               typeof(MyInputAdapterFactory),
                                               new InputAdapterConfig { someFlag = true },
                                               EventShape.Point);

Así se crea un objeto CepStream, que representa un flujo de eventos, que se genera (una vez iniciada la consulta) en un adaptador del que se han creado instancias mediante la clase de generador determinada. El flujo recibe un nombre que se puede utilizar más adelante para recuperar diagnósticos específicos del flujo. Además, se proporciona una instancia de la estructura de configuración para el generador de adaptadores. La estructura de configuración pasa información específica en tiempo de ejecución al generador así como la forma de evento deseada (modelo de evento). Para obtener más información sobre cómo el generador utiliza estos parámetros, vea Crear adaptadores de entrada y de salida.

Definir la consulta

El objeto CepStream se utiliza como base para la definición de la lógica de consulta real. La consulta utiliza LINQ como el lenguaje de especificación de consulta:

var filtered = from e in inputstream
               where e.Value > 95
               select e;

En este ejemplo, suponemos que la clase o estructura denominada MyDataType definida en el ejemplo anterior para crear el objeto de flujo de entrada contiene un campo denominado Value. Esta definición se convierte en un operador de filtro que quita todos los eventos del flujo que no cumplen el predicado del filtro where e.Value > 95. Para obtener más información sobre los operadores de consulta de LINQ, vea Escribir plantillas de consulta en LINQ.

Crear un adaptador de salida

En este punto, el tipo de la variable filtered todavía es CepStream. Esto permite convertir el flujo en una consulta que se puede iniciar. Para generar una instancia de consulta que se pueda iniciar, se debe especificar un adaptador de salida, tal como se muestra en el siguiente ejemplo.

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             typeof(MyOutputAdapterFactory),
                             new OutputAdapterConfig { someString = "foo" },
                             EventShape.Point,
                             StreamEventOrder.FullyOrdered);

De la misma forma que en el flujo de entrada, el adaptador de salida requiere la especificación de un generador de adaptadores de salida, un objeto de configuración, la forma deseada del flujo de salida y la clasificación temporal.

La especificación de la forma del evento garantiza la forma de evento correspondiente en el resultado de la consulta:

  1. EventShape.Point: cualquier duración de evento del resultado se reduce a un evento de punto.

  2. EventShape.Interval: cualquier evento del resultado se interpreta como un evento de intervalo. Es decir, solo se genera el resultado si un evento de incremento de tiempo actual (CTI) confirma su duración completa.

  3. EventShape.Edge: cualquier evento del resultado se interpretará como un evento perimetral. Es decir, su hora de inicio se genera como borde de inicio y su hora de finalización como borde final correspondiente.

El parámetro de orden de eventos del flujo afecta a la agilidad de los flujos de salida de eventos de intervalo. FullyOrdered implica que los eventos de intervalo siempre se generan según el orden de sus horas de inicio, en tanto que ChainOrdered genera una secuencia de salida ordenada por las horas de finalización del intervalo.

Además, se debe proporcionar un objeto de aplicación como el primer parámetro, que ahora contiene la consulta, y un nombre y descripción de la consulta, que identifican de forma más completa esta consulta en el almacén de metadatos.

Iniciar la consulta

El último paso es iniciar la consulta. En este ejemplo, una pulsación de tecla proporcionada por el usuario detiene la consulta.

query.Start();

Console.ReadLine();

query.Stop();

Este ejemplo integral muestra cómo utilizar un enlace implícito de un origen de eventos con una plantilla de consulta a través de sobrecargas de CepStream.Create() y ToQuery() para crear rápidamente una consulta activa. Para un control más explícito sobre el enlace de los objetos CEP, vea Utilizar el enlazador de consultas.

Ejemplo completo

El siguiente ejemplo combina los componentes descritos anteriormente para crear una aplicación completa.

Server server = null;

using (Server server = Server.Create(”MyInstance”))
{
    try
    {
        Application myApp = server.CreateApplication("MyApp");

        var inputstream = CepStream<MyDataType>.Create("inputStream",
                                                       typeof(MyInputAdapterFactory),
                                                       new InputAdapterConfig { someFlag = true },
                                                       EventShape.Point);

        var filtered = from e in inputstream
                       where e.Value > 95
                       select e;

        var query = filtered.ToQuery(myApp,
                                     "filterQuery",
                                     "Filter out Values over 95",
                                     typeof(MyOutputAdapterFactory),
                                     new OutputAdapterConfig { someString = "foo" },
                                     EventShape.Point,
                                     StreamEventOrder.FullyOrdered);

        query.Start();
        Console.ReadLine();
        query.Stop();
    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
    }
}

Vea también

Conceptos

Utilizar el enlazador de consultas