Ejemplo de StreamInsight: cliente B, crear un sujeto

Este ejemplo muestra el proceso de crear un cliente de StreamInsight que usa un servidor remoto y entidades definidas en el servidor. Este ejemplo en concreto muestra cómo crear un sujeto que está enlazado a varios orígenes y receptores. El sujeto acepta los datos a medida que llegan del flujo de origen y lo entrega en ambos receptores. Para obtener más información acerca de las entidades StreamInsight, vea Conceptos de StreamInsight.

Este ejemplo usa el servidor remoto y las entidades creadas en el ejemplo de servidor de esta sección. Para usar los ejemplos de esta sección conjuntamente, realice lo siguiente:

  1. Ejecute el ejemplo de servidor Ejemplo de StreamInsight: servidor, exponer un servidor insertado

  2. Ejecute uno o ambos ejemplos de cliente:

Paso a paso

En general, un cliente StreamInsight típico sigue estos pasos básicos:

  • Crear una instancia de servidor StreamInsight

  • Crear u obtener una aplicación StreamInsight

  • Definir u obtener un origen

  • Crear una consulta del origen

  • Definir u obtener un receptor

  • Enlazar y ejecutar la consulta y el receptor

En este ejemplo, el cliente obtiene una aplicación y las entidades existentes del servidor, pero el programa también crea un origen, un receptor y un sujeto adicionales.

Conectar al servidor

El proceso de crear un programa cliente de StreamInsight se inicia con la creación de instancias de una instancia del servidor de StreamInsight. En este ejemplo, el cliente se conecta a un servidor remoto denominado “MyStreamInsightServer”.

var server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/MyStreamInsightServer"))

Para obtener más información acerca de las opciones disponibles para conectar a un servidor de StreamInsight, vea Publicar en el servidor de StreamInsight y conectarse a él.

Obtener la aplicación de servidor

En este ejemplo, el cliente usa la aplicación StreamInsight que se ha creado en el servidor remoto. Todas las entidades de servidor que usará este cliente se han definido dentro de esta aplicación y las nuevas entidades creadas se crearán dentro de la misma aplicación.

myApp = server.Applications["serverApp"];

Obtener el origen de servidor y definir un nuevo origen

A continuación, obtenga el origen que se ha definido en el servidor y defina un nuevo origen. En este ejemplo, los datos de este segundo origen son un flujo temporal simple de eventos de punto generados cada segundo.

var mySource = myApp.GetObservable<int>("serverSource");
var mySourceB = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);

Crear consultas de los orígenes

A continuación, cree consultas de los dos orígenes. En este ejemplo, la primera consulta recupera los valores de datos pares del origen de servidor y devuelve el valor + 2000. La segunda consulta recupera los valores de datos impares del segundo origen y devuelve el valor + 3000.

var myQuery = from e in mySource
              where e % 2 == 0
              select e + 2000;
var myQueryB = from e in mySourceB
               where e % 2 == 1
               select e + 3000;

Obtener el receptor de servidor y definir un nuevo receptor

A continuación, obtenga el receptor que se ha definido en el servidor y defina un nuevo receptor. En este ejemplo, el segundo receptor es una función sencilla que simplemente escribe los valores de flujo en la consola.

var mySink = myApp.GetObserver<int>("serverSink");
var mySinkB = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Client_B: {0}", x)));

Crear un sujeto

A continuación, cree un sujeto. Un sujeto es un objeto en el servidor que se puede enlazar tanto a orígenes como a receptores, que consume datos de los orígenes y que entrega datos a los receptores.

var mySubject = myApp.CreateSubject<long,long>("serverSubject_Client_B", () => new Subject<long>());

Enlazar y ejecutar las consultas y los receptores

Ahora, enlace el sujeto a cada uno de los receptores: el definido anteriormente en el servidor y el definido por este cliente. Cada enlace se ejecuta como un proceso independiente. Cuando el sujeto está enlazado al receptor, no fluyen datos porque no se ha enlazado ningún origen. Si el origen se ha enlazado al sujeto antes de que se enlazara un receptor, los datos empezarían a fluir inmediatamente hacia el sujeto y se perderían.

var procB1 = mySubject.Bind(mySink).Run("serverProcess_Client_B_1");
var procB2 = mySubject.Bind(mySinkB).Run("serverProcess_Client_B_2");

A continuación, enlace los sujetos a las consultas. Cuando estos procesos se ejecutan, los datos empiezan a fluir desde los orígenes a través de las consultas a cada uno de los receptores.

var procB3 = myQuery.Bind(mySubject).Run("serverProcess_Client_B_3");
var procB4 = myQueryB.Bind(mySubject).Run("serverProcess_Client_B_4");

Este cliente ahora tiene definida una serie de entidades en el servidor:

  • serverSubject_Client_B

  • serverProcess_Client_B_1

  • serverProcess_Client_B_2

  • serverProcess_Client_B_3

  • serverProcess_Client_B_4

Ejemplo completo

El siguiente ejemplo combina los componentes descritos anteriormente para crear una aplicación completa. Por simplicidad, este ejemplo no comprueba las posibles condiciones de error, sino que supone que el servidor del ejemplo, Ejemplo de StreamInsight: servidor, exponer un servidor insertado, está en ejecución y que se han creado las entidades esperadas.

using System;
using System.ServiceModel;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Linq;
using Microsoft.ComplexEventProcessing.ManagementService;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace StreamInsight21_example_Client_B
    /* This example:
     * connects to a remote server
     * gets the app and source defined in the server
     * defines a second source
     * creates simple queries over the 2 sources
     * gets the sink defined in the server
     * defines a second sink
     * binds and runs the subject to both sinks
     * binds and runs the subject to both queries
     * waits for the user to stop the program
     */
{
    class Program
    {
        static void Main(string[] args)
        {
            // Connect to the StreamInsight server
            using (var server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/MyStreamInsightServer")))
            {
                /* The following entities are expected to be defined in the server:
                 * serverApp
                 * serverSource
                 * serverSink
                 */
                /* The following entities will be defined in the server by this client:
                 * serverSubject_Client_B
                 * serverProcess_Client_B_1
                 * serverProcess_Client_B_2
                 * serverProcess_Client_B_3
                 * serverProcess_Client_B_4
                 */

                // Get the existing StreamInsight APPLICATION
                var myApp = server.Applications["serverApp"];
                   
                // GET the SOURCE from the server
                var mySource = myApp.GetStreamable<long>("serverSource");

                // DEFINE a second SOURCE (returns a point event every second)
                var mySourceB = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);

                // COMPOSE a QUERY on the server source (return every even-numbered item + 2000)
                var myQuery = from e in mySource
                              where e % 2 == 0
                              select e + 2000;

                // COMPOSE a QUERY on the second source (return every odd-numbered item + 3000)
                var myQueryB = from e in mySourceB
                               where e % 2 == 1
                               select e + 3000;
                    
                // GET the SINK from the server
                var mySink = myApp.GetObserver<long>("serverSink");

                // DEFINE a second SINK
                var mySinkB = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Client_B: {0}", x)));

                // CREATE a SUBJECT
                var mySubject = myApp.CreateSubject<long,long>("serverSubject_Client_B", () => new Subject<long>());

                // BIND the SINKS to the SUBJECT
                var procB1 = mySubject.Bind(mySink).Run("serverProcess_Client_B_1");
                var procB2 = mySubject.Bind(mySinkB).Run("serverProcess_Client_B_2");
                
                // BIND the SOURCES to the SUBJECT
                var procB3 = myQuery.Bind(mySubject).Run("serverProcess_Client_B_3");
                var procB4 = myQueryB.Bind(mySubject).Run("serverProcess_Client_B_4");
                
                // Wait for the user to stop the program
                Console.WriteLine("----------------------------------------------------------------");
                Console.WriteLine("Client B is running, press Enter to exit the client");
                Console.WriteLine("----------------------------------------------------------------");
                Console.WriteLine(" ");
                Console.ReadLine();

                // Remove the entities we created
                myApp.Entities["serverSubject_Client_B"].Delete();
                procB1.Dispose();
                procB2.Dispose();
                procB3.Dispose();
                procB4.Dispose();
            }
        }
    }
}

Vea también

Otros recursos

Ejemplo de StreamInsight: servidor, exponer un servidor insertado

Ejemplos de StreamInsight

Conceptos de StreamInsight