Adelantar tiempo de aplicación

 

Los desarrolladores de software de StreamInsight deben encontrar el equilibrio entre las necesidades de los orígenes de datos que pueden tener datos desordenados y los requisitos de procesamiento de los eventos con gran agilidad. Si bien adelantar el tiempo de aplicación con más rapidez ayuda a reducir la latencia, reduce el espacio de tiempo disponible para los datos que llegan tarde; es decir, la capacidad de los datos de llegar desordenados. StreamInsight proporciona varias maneras de razonar con respecto al tiempo de aplicación. En este tema se describen los diferentes niveles y directivas para adelantar el tiempo de aplicación en el nivel del adaptador y con enlaces de consultas.

Descripción del modelo de tiempo

El modelo de tiempo de StreamInsight solamente se basa en el tiempo de aplicación y nunca en el tiempo de sistema. Esto significa que todos los operadores de tiempo hacen referencia a la marca de tiempo de los eventos y nunca al reloj de sistema del equipo host. Como resultado, las aplicaciones deben comunicar su tiempo de aplicación actual al servidor de StreamInsight. El tiempo de aplicación de una aplicación determinada depende de muchos aspectos diferentes en el contexto de la aplicación. En última instancia, es responsabilidad del desarrollador de la aplicación proporcionar la tiempo de aplicación adecuado al servidor de StreamInsight. A continuación se indican las consideraciones principales que se deben tener en cuenta sobre el tiempo de aplicación:

  • Orígenes de datos

    Cuando los orígenes de datos comunican información de tiempo, estos datos se pueden utilizar para identificar el momento en que se han recibido todos los eventos del origen de datos. Este momento constituye el tiempo de aplicación actual con respecto a este origen de datos. Tenga en cuenta que los distintos orígenes de datos pueden funcionar a diferentes velocidades.

  • Datos desordenados

    Con algunos orígenes de datos, los eventos no siempre llegan en el orden de sus marcas de tiempo. Es decir, los datos están desordenados. StreamInsight puede aceptar datos desordenados y garantiza que los resultados no dependen del orden en el que los eventos llegan al servidor de StreamInsight. Los desarrolladores de StreamInsight pueden adelantar el tiempo de aplicación con cierta demora para permitir la llegada de eventos desordenados en aquellos orígenes de datos que tienen eventos tardíos.

  • Agilidad del resultado

    Las consultas de StreamInsight generan resultados precisos hasta el tiempo de aplicación actual. Es decir, los resultados emergen de las consultas de StreamInsight a medida que finalizan en el progreso de tiempo de aplicación total.

Incrementos de tiempo actual (CTI)

Durante el procesamiento de la consulta, el tiempo de aplicación se controla mediante eventos de incremento de tiempo actual (CTI). Un evento CTI es un evento de puntuación que constituye un componente central del modelo de tiempo de StreamInsight. Los CTI se usan para confirmar las secuencias de eventos y enviar los resultados calculados al resultado de la consulta mediante la declaración al servidor de StreamInsight de que ciertas partes de la escala de tiempo ya no cambiarán más. Por tanto, es fundamental poner en cola los CTI junto con los eventos del flujo de eventos de entrada para generar los resultados y vaciar el estado de los operadores con estado.

Al poner en cola un CTI, la entrada garantiza que no producirá ningún evento posterior que pueda influir en el período anterior a la marca de tiempo del CTI. Esto implica que, después de poner en cola un CTI en la entrada:

  • Para que los eventos de forma Punto, Intervalo o Perímetro comiencen: la hora de inicio tiene que ser igual o posterior al CTI.

  • Para que los eventos de forma Perímetro terminen: la hora de finalización tiene que ser igual o posterior al CTI.

Si se infringen estas reglas, estaremos hablando de una infracción de CTI. A continuación se describe la forma de tratar estas infracciones.

Existen tres métodos para insertar los eventos CTI en un flujo de entrada.

  1. Poner en cola los CTI mediante programación a través del adaptador de entrada, de forma similar a cómo se ponen en cola los eventos.

  2. Generar eventos CTI mediante declaración con una frecuencia determinada. Esto se puede especificar a través de AdvanceTimeGenerationSettings en el generador de adaptadores o como parte del enlace de consultas.

  3. Definir un flujo de entrada independiente como origen de CTI. Este método solo se puede especificar en el enlace de consultas.

Cuando se implementan los métodos 2 y 3, también se debe implementar una directiva para las infracciones de CTI. En la siguiente sección se describen AdvanceTimeGenerationSettings y las directivas de infracción. En las secciones siguientes se describe cómo utilizar los valores de adelanto de tiempo en el generador de adaptadores así como en el enlace de consultas.

Generación de CTI

La generación de eventos CTI (descrita anteriormente en los métodos 2 y 3) tiene dos dimensiones:

  1. La frecuencia de la generación, que se especifica o como parámetro N entero positivo o como parámetro T del intervalo de tiempo. La directiva de frecuencia de generación inserta CTI después de la aparición del recuento de eventos (N) o intervalo de tiempo (T).

  2. La marca de tiempo de los CTI generados, que se especifica como retraso con respecto al último evento recibido.

Además, puede usar una marca booleana para indicar si se debe insertar un último CTI con una marca de tiempo de infinito positivo cuando se cierre la consulta. Esta opción se usa para vaciar todos los eventos restantes de los operadores de la consulta.

La generación CTI se define a través de la clase AdvanceTimeGenerationSettings, cuyo constructor toma la frecuencia, el retraso y la marca, tal como se muestra en el siguiente ejemplo.

var atgs = new AdvanceTimeGenerationSettings(10, TimeSpan.FromSeconds(5), true);  

En este ejemplo se indica al motor que inserte un CTI después de cada 10 eventos que procedan del origen del evento. El CTI lleva una marca de tiempo del tiempo del último evento menos 5 segundos. Este mecanismo de retraso implementa de forma efectiva un período de gracia para que el origen del evento pueda poner en cola eventos tardíos sin infringir la semántica de CTI (siempre que los eventos no se retrasen más de 5 segundos). Cuando se cierra la consulta correspondiente, se pone en cola un CTI con tiempo infinito.

Tenga en cuenta que al especificar una frecuencia para la generación CTI a través de AdvanceTimeSettings, no se tienen en cuenta los bordes finales. Tampoco se consideran cuando se usa una duración como una frecuencia. Solo los bordes iniciales están considerados en el caso de eventos perimetrales para frecuencia y duración.

Directivas de infracción de CTI

Es posible que un origen del evento infrinja la semántica de CTI al enviar eventos con una marca de tiempo anterior a los eventos CTI insertados. La configuración de adelanto de tiempo permite especificar una directiva para controlar estas situaciones. La directiva puede tener los dos valores siguientes:

  • Quitar

    Los eventos que infringen el CTI insertado se quitan y no se ponen en cola en la consulta.

  • Ajustar

    Los eventos que infringen el CTI insertado se modifican si su duración se superpone con la marca de tiempo del CTI. Es decir, la marca de tiempo de inicio de los eventos se establece en la marca de tiempo del CTI más reciente, de tal modo que estos eventos pasan a ser válidos. Si tanto la hora de inicio como la de finalización de un evento son anteriores a la marca de tiempo del CTI, se quita el evento.

Configuración de adelanto de tiempo del adaptador

Nota


Los adaptadores de entrada y de salida se presentaron en una versión anterior de StreamInsight. Aunque han quedado sustituidos por el modelo de desarrollo actual, siguen estando disponibles para los desarrolladores que realizan el mantenimiento de código heredado. Para obtener más información acerca del modelo de desarrollo actual, vea Guía del desarrollador de software (StreamInsight).

La configuración para adelantar el tiempo de aplicación se puede especificar en la definición del generador de adaptadores. Del mismo modo que se llama al método Create() del generador cuando se crean instancias de un adaptador, se llama a un método correspondiente para definir la configuración de tiempo de adelanto de la instancia de un adaptador. Para ello, se usa la interfaz ITypedDeclareAdvanceTimeProperties para un adaptador con tipo (o IDeclareAdvanceTimeProperties para un adaptador sin tipo), tal como se muestra en el siguiente ejemplo.

public class MyInputAdapterFactory : ITypedInputAdapterFactory<MyInputConfig>,  
                                     ITypedDeclareAdvanceTimeProperties<MyInputConfig>  

Esta interfaz requiere que se implemente el siguiente método como parte del generador.

public AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties<TPayload>(MyInputConfig configInfo, EventShape eventShape)  
{  
    var atgs = new AdvanceTimeGenerationSettings(10, TimeSpan.FromSeconds(0), true);  
    var ats = new AdapterAdvanceTimeSettings(atgs, AdvanceTimePolicy.Drop);  
    return ats;  
}  

Se llama al método DeclareAdvanceTimeProperties() en cada adaptador para el que se han creado instancias con la misma estructura de configuración y parámetro de forma de evento especificados en la llamada al método Create() correspondiente. Esto permite que el autor del adaptador derive la configuración correcta de generación de CTI a partir de la información de configuración, sin necesidad de que el escritor y el enlazador de la consulta conozcan los aspectos específicos de la configuración de adelanto de tiempo.

El constructor AdapterAdvanceTimeSettings necesita tanto el objeto AdvanceTimeGenerationSettings como la directiva de infracción descritos anteriormente.

Generación de CTI

De forma similar al objeto AdapterAdvanceTimeSettings, la generación de eventos CTI se puede especificar mediante declaración en el enlace de consultas, tal como se muestra en el siguiente ejemplo. Esto permite que el usuario que enlaza la consulta defina el comportamiento del tiempo de aplicación del CTI de forma independiente con respecto a la implementación del adaptador.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromSeconds(0), true);  
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);  

El constructor AdvanceTimeSettings toma los tres argumentos siguientes:

  1. El objeto AdvanceTimeGenerationSettings

  2. El objeto AdvanceTimeImportSettings

  3. La directiva de infracción

Observe que se pueden establecer en NULL los argumentos de la configuración de generación o de la configuración de importación, pero no ambos. Además, se pueden especificar juntos. En la sección siguiente se presenta la clase AdvanceTimeImportSettings.

En el ejemplo anterior se especifica la generación e inserción de un CTI con todos los eventos, con la marca de tiempo del evento (sin retraso). El objeto AdvanceTimeSettings se puede pasar como último parámetro opcional al método CepStream.Create() tal como se muestra en el siguiente ejemplo.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromSeconds(0), true);  
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);  
  
var inputstream = CepStream<MyPayloadType>.Create("inputStream",  
                                                  typeof(MyInputAdapterFactory),  
                                                  new MyConfiguration(),  
                                                  EventShape.Point,  
                                                  ats);  

También se puede utilizar en el modelo de desarrollo del enlazador de consultas:

queryBinder.BindProducer<MyPayloadType>("filterInput",  
                                        inputAdapter,  
                                        new MyConfiguration(),  
                                        EventShape.Point,  
                                        ats);  

Sincronizar con otro flujo

Cuando los CTI se utilizan durante el enlace de consultas, además de (o en lugar de) generarlos en función de una frecuencia, también se pueden copiar desde otro flujo de entrada a la consulta mediante AdvanceTimeImportSettings. Esta característica permite la sincronización de los dos flujos, tal como se muestra en el siguiente ejemplo.

var dataStream = CepStream<DataType>.Create("dataStream ",  
                                            typeof(DataInputAdapterFactory),  
                                            new MyDataAdapterConfiguration(),  
                                            EventShape.Point);  
  
var ats = new AdvanceTimeSettings(null, new AdvanceTimeImportSettings("dataStream"), AdvanceTimePolicy.Adjust);  
  
var lookupStream = CepStream<LookupType>.Create("lookupStream",  
                                                typeof(ReferenceInputAdapterFactory),  
                                                new MyReferenceConfiguration(),  
                                                EventShape.Edge,  
                                                ats);  
  
var joined = from eLeft in dataStream  
             join eRight in lookupStream  
             where ...  

En este ejemplo se muestra un caso de uso típico en el que un flujo de datos "rápido" debe combinarse con un flujo de referencia "lento". El flujo lento puede consistir en datos de búsqueda que cambian con mucha menos frecuencia que el flujo rápido. Para que la combinación genere la salida tan rápido como la entrada más rápida, el flujo de entrada lento se sincroniza con el flujo rápido mediante la importación de sus CTI. En este ejemplo, se considera que el control de tiempo de aplicación del flujo rápido tiene lugar en el adaptador.

Agilidad del resultado

El parámetro de retraso de la configuración de la generación de adelanto de tiempo especifica la marca de tiempo de los CTI insertados. Resulta importante comprender la semántica precisa de los CTI en el marco de StreamInsight para lograr el efecto deseado de agilidad de la salida. Un evento CTI declara al motor que todo lo que existe en la escala de tiempo estrictamente antes de la marca de tiempo del CTI está confirmado. Esto tiene diferentes implicaciones para la agilidad del resultado.

Por ejemplo, considere un flujo de entrada de eventos de punto y una configuración de generación de CTI de frecuencia 1 (todos los eventos) y un retraso 0. Esto genera eventos CTI con la misma marca de tiempo exacta que cada evento de punto. Sin embargo, esto significa que el último evento de punto solo se confirmará con el CTI siguiente, ya que su marca de tiempo no está estrictamente antes que el CTI correspondiente. Para confirmar cada evento de punto en cuanto lo emita el adaptador, se debe asignar la marca de tiempo a los CTI inmediatamente después de los eventos de punto. Esto se traduce en un retraso negativo de un tic, tal como se muestra en el siguiente ejemplo.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromTicks(-1), true);  

Operadores de consulta y CTI

Los CTI se insertan de la manera descrita anteriormente. Se propagan a través de la consulta y son procesados de manera diferente por ciertos operadores. Por ejemplo, los operadores de combinación liberan sus resultados hasta el CTI más antiguo de cualquiera de los lados. Los operadores de unión liberan el resultado más antiguo de los CTI más recientes por cualquier lado. La consulta completa solo liberará su resultado hasta el CTI más reciente.

Por otro lado, ciertos operadores tienen un efecto en marcas de tiempo de CTI. Las ventanas de salto sitúan los CTI al principio de la ventana porque el resultado de la operación en la parte superior de la ventana puede cambiar a medida que los eventos siguen llegando a dicha ventana. Los métodos AlterEventStartTime() y AlterEventLifeTime() cambian la hora de inicio de los eventos, y la misma transformación se aplicará a los CTI.

Vea también

Crear adaptadores de entrada y de salida
Conceptos de servidor de StreamInsight