Ejemplo de StreamInsight: servidor, exponer un servidor insertado

 

Este ejemplo muestra el proceso de crear un servidor de StreamInsight insertado y exponerlo para que los programas cliente lo usen como servidor remoto. Además de crear el servidor y hacer que esté disponible, este ejemplo funciona como un cliente en sí mismo mediante la creación de un origen y una consulta, el enlace a un receptor y la ejecución del enlace como un proceso. Para obtener más información acerca de las entidades StreamInsight, vea Conceptos de StreamInsight.

Las entidades creadas en este ejemplo están diseñadas para que se usen en otros ejemplos de esta sección. Para usar los ejemplos de esta sección conjuntamente, realice lo siguiente:

  1. Ejecute este ejemplo de servidor

  2. Ejecute uno o ambos ejemplos de cliente:

Paso a paso

En general, un cliente StreamInsight típico sigue estos pasos básicos:

  • Crear una instancia de servidor StreamInsight

  • Crear u obtener una aplicación StreamInsight

  • Definir u obtener un origen

  • Crear una consulta del origen

  • Definir u obtener un receptor

  • Enlazar y ejecutar la consulta y el receptor

En este ejemplo, el programa crea todas las entidades que necesita y las implementa en el servidor para que las usen otros clientes.

Crear la instancia de servidor

El proceso de crear un programa de StreamInsight se inicia con la creación de instancias de una instancia del servidor de StreamInsight. En este ejemplo, el servidor está insertado en el programa.

server = Server.Create("Default");  

Se debe crear un servidor mediante un nombre de instancia registrado en el equipo a través del proceso de configuración de StreamInsight (en este ejemplo, Default). Para obtener más información, vea Instalación (StreamInsight).

A continuación, se expone un extremo para el servidor insertado de modo que los programas StreamInsight cliente puedan conectarse y usarlo como servidor de StreamInsight remoto.

  
var host = new ServiceHost(server.CreateManagementService());  
host.AddServiceEndpoint(typeof(IManagementService), new WSHttpBinding(SecurityMode.Message), "https://localhost/MyStreamInsightServer");  
  

Los programas cliente también pueden usar el servicio Windows de host de StreamInsight que se crea al instalar StreamInsight. Para obtener más información acerca de las opciones disponibles para conectar a un servidor de StreamInsight, vea Publicar en el servidor de StreamInsight y conectarse a él.

Crear la aplicación

Una aplicación representa una unidad de ámbito en el servidor. Todas las demás entidades se crean en la aplicación.

var myApp = server.CreateApplication("serverApp");  

Definir e implementar un origen

A continuación, se define y se implementa un origen de entrada en el servidor con un nombre para que lo puedan usar otros clientes StreamInsight. En este ejemplo, los datos son un flujo temporal simple de eventos de punto generados cada segundo.

  
var mySource = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);  
mySource.Deploy("serverSource");  
  

Crear una consulta del origen

A continuación, cree una consulta del origen de entrada. La consulta usa LINQ como el lenguaje de especificación de consulta. En este ejemplo, la consulta devuelve el valor de todos los eventos con numeración par.

  
var myQuery = from e in mySource  
              where e % 2 == 0  
              select e;  
  

Técnicamente, esta definición se convierte en un operador de filtro que quita todos los eventos de la secuencia que no cumplen el predicado del filtro (where e % 2 == 0) y devuelve el valor de evento. Para obtener más información sobre los operadores de consulta de LINQ, vea Usar LINQ de StreamInsight.

Definir e implementar un receptor

A continuación, se crea un receptor de salida que se puede enlazar a la consulta y procesar la secuencia resultante. En este ejemplo, se crea una función sencilla que simplemente escribe los valores de flujo en la consola.

var mySink = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Server..: {0}", x)));  

A continuación, se implemente el receptor en el servidor con un nombre.

mySink.Deploy("serverSink");  

Enlazar y ejecutar la consulta y el receptor

En este punto, la consulta observable se puede enlazar al receptor de salida del observador y, a continuación, se ejecuta en un proceso en el servidor.

var proc = myQuery.Bind(mySink).Run("serverProcess");  

En el ejemplo completo siguiente, el proceso sigue ejecutándose hasta que el usuario lo detiene al escribir en la consola.

Ejemplo completo

El siguiente ejemplo combina los componentes descritos anteriormente para crear una aplicación completa. Por simplicidad, este ejemplo no comprueba las posibles condiciones de error.

  
using System;  
using System.ServiceModel;  
using Microsoft.ComplexEventProcessing;  
using Microsoft.ComplexEventProcessing.Linq;  
using Microsoft.ComplexEventProcessing.ManagementService;  
using System.Reactive;  
using System.Reactive.Linq;  
  
namespace StreamInsight21_example_Server  
  
    /* This example:  
     * creates an embedded server instance and makes it available to other clients  
     * defines, deploys, binds, and runs a simple source, query, and sink  
     * waits for the user to stop the server  
     */  
{  
    class Program  
    {  
        static void Main(string[] args)  
        {  
            // Create an embedded StreamInsight server  
            using (var server = Server.Create("Default"))  
            {  
                // Create a local end point for the server embedded in this program  
                var host = new ServiceHost(server.CreateManagementService());  
                host.AddServiceEndpoint(typeof(IManagementService), new WSHttpBinding(SecurityMode.Message), "https://localhost/MyStreamInsightServer");  
                host.Open();  
  
                /* The following entities will be defined and available in the server for other clients:  
                 * serverApp  
                 * serverSource  
                 * serverSink  
                 * serverProcess  
                 */  
  
                // CREATE a StreamInsight APPLICATION in the server  
                var myApp = server.CreateApplication("serverApp");  
  
                // DEFINE a simple SOURCE (returns a point event every second)  
                var mySource = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);  
  
                // DEPLOY the source to the server for clients to use  
                mySource.Deploy("serverSource");  
  
                // Compose a QUERY over the source (return every even-numbered event)  
                var myQuery = from e in mySource  
                              where e % 2 == 0  
                              select e;  
  
                // DEFINE a simple observer SINK (writes the value to the server console)  
                var mySink = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Server..: {0}", x)));  
  
                // DEPLOY the sink to the server for clients to use  
                mySink.Deploy("serverSink");  
  
                // BIND the query to the sink and RUN it  
                using (var proc = myQuery.Bind(mySink).Run("serverProcess"))  
                {  
                    // Wait for the user stops the server  
                    Console.WriteLine("----------------------------------------------------------------");  
                    Console.WriteLine("MyStreamInsightServer is running, press Enter to stop the server");  
                    Console.WriteLine("----------------------------------------------------------------");  
                    Console.WriteLine(" ");  
                    Console.ReadLine();  
                }  
                host.Close();  
            }  
        }  
    }  
}  
  

Vea también

Ejemplos de StreamInsight
Conceptos de StreamInsight