Creazione di query in fase di esecuzione

La composizione di query di StreamInsight in fase di esecuzione offre flessibilità di query, riutilizzabilità, utilizzo efficiente delle risorse e semplicità di manutenzione. È possibile eseguire le operazioni seguenti:

  • Fornire il risultato di una query alle altre query nello stesso server.

  • Utilizzare l'output di altre query in esecuzione, analogamente a come si utilizzano gli eventi da un adattatore di input.

Due query composte, ad esempio una query 1 che viene inserita in una query 2, vengono eseguite in isolamento. Un errore della query 1 non influisce sullo stato della query 2 e viceversa. È possibile avviare e arrestare la query 1 e la query 2 in modo indipendente. È ad esempio possibile arrestare la query 1, sostituirla con una query diversa e avviarla nuovamente.

In questo argomento vengono descritti diversi casi di utilizzo ed esempi di composizione dinamica di query in fase di esecuzione.

Riutilizzo dell'output di una query esistente

Un caso di utilizzo comune per più query è rappresentato dalla necessità di progettare e distribuire una query primaria che pre-elabori i dati e li invii a un adattatore di output, mentre altre query utilizzano il risultato di questa query e inviano i propri risultati agli altri adattatori di output. Questo scenario è illustrato nella figura seguente.

La query 2 utilizza i dati della query 1

Nell'esempio seguente viene rappresentata una query, creata in un'applicazione myApp esistente in un server StreamInsight.

    var inputstream = CepStream<MyDataType>.Create("inputStream",
                                                   typeof(MyInputAdapterFactory),
                                                   new InputAdapterConfig { someFlag = true },
                                                   EventShape.Point);

    var filtered = from e in inputstream
                   where e.Value > 95
                   select e;

    var query = filtered.ToQuery(myApp,
                                 "filterQuery",
                                 "Filter out Values over 95",
                                 typeof(MyOutputAdapterFactory),
                                 new OutputAdapterConfig { someString = "foo" },
                                 EventShape.Point,
                                 StreamEventOrder.FullyOrdered);

    query.Start();

Per trasmettere il flusso dei risultati di questa query in una seconda query, viene utilizzato il metodo Query.ToStream(). Il tipo che corrisponde al payload di output della query primaria viene specificato come parametro generico, come illustrato nell'esempio seguente.

var filteredStream = query.ToStream<MyDataType>();

var validated = from e in filteredStream
                select new
                {
                    SourceId = e.SourceId,
                    Value = e.Value,
                    Status = e.Value > 75 ? false : true
                };

var validationQuery = validated.ToQuery("validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

validationQuery.Start();

In questo esempio viene eseguito l'accesso al flusso di output della query primaria e viene applicato un operatore di proiezione per introdurre un nuovo campo denominato Status. La seconda chiamata a ToQuery() non richiede più un oggetto applicazione perché può essere derivato dalla query primaria.

Il metodo ToStream() accetta un oggetto AdvanceTimeSettings facoltativo se è necessario inserire CTI (Current Time Increment) in tale punto. L'inserimento di CTI può consentire di aumentare la dinamicità per determinate configurazioni di query.

Si noti che la modalità di creazione dell'oggetto query primario non è importante. Il modello precedente rappresenta un esempio di utilizzo dell'API CepStream.ToQuery(). Di seguito sono riportate le altre modalità possibili di creazione della query:

  • Tramite uno strumento di associazione di query. Ad esempio, myApp.CreateQuery("filterQuery", queryBinder, "description");.

  • Recuperandola tramite l'API del modello a oggetti dal server. Ad esempio, myApp.Queries["filterQuery"].

Output di query non associato

Nell'esempio precedente viene illustrato come riutilizzare il risultato di una query esistente in cui l'output è già associato a un adattatore di output. In alternativa, le query possono anche disporre di un flusso di output non associato, in modo che non venga prodotto alcun output a meno che il risultato non venga utilizzato da una o più delle altre query. Questo scenario è illustrato nella figura seguente.

La query 1 ha un flusso di query non associato

A tale scopo, viene utilizzato un overload di CepStream.ToQuery() che non richiede un adattatore:

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             EventShape.Point, 
                             StreamEventOrder.FullyOrdered);

Questa query può essere avviata. Una seconda query può quindi utilizzare il flusso di risultati in un secondo momento specificandolo come illustrato nell'esempio precedente per la query validationQuery. In assenza di un consumer, i risultati della query primaria vengono rimossi.

Questo modello consente inoltre di trasmettere il flusso dei risultati di una query a più adattatori di output. Nel caso più semplice, è possibile eseguire questa operazione utilizzando query pass-through all'inizio di una query non associata, una per ogni adattatore di output (query 2 e 3 nella figura precedente).

Flussi pubblicati

Fino a questo momento, negli esempi è stato utilizzato l'oggetto query effettivo per creare un nuovo flusso di input per un'altra query. Per un'astrazione per gli oggetti sul lato client, è possibile utilizzare l'URI del flusso pubblicato come input per una o più altre query, come illustrato nella figura seguente.

Query che utilizzano un flusso pubblicato come input

Ogni query dispone di un URI (Uniform Resource Identifier) del flusso pubblicato predefinito, che è il nome stesso della query. È inoltre possibile assegnare in modo esplicito un nome di flusso pubblicato personalizzato alla query, tramite il membro appropriato della classe CepStream.

var query = filtered.ToPublishedStreamQuery(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),
                                             myApp,
                                             "filterQuery",
                                             "Filter out Values over 95",
                                             EventShape.Point,
                                             StreamEventOrder.FullyOrdered);

In questo modo viene creata una query con un output non associato, ma denominato in modo esplicito. Si noti che i nomi di flussi pubblicati devono seguire la convenzione "<application_name>/PublishedStream/<stream_name>".

A questo punto, un'altra query può fare riferimento a questo URI come flusso di input, come illustrato nell'esempio seguente.

var filterStream = CepStream<MyDataType>.Create(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),
                                                EventShape.Point);
var validated = from e in filterStream
                ...

Si noti che il consumer del flusso pubblicato deve specificare la forma dell'evento di input, che deve corrispondere alla forma di output della query di riferimento.

La connessione a una query primaria tramite un nome di flusso pubblicato è meno rigorosa rispetto alla connessione tramite l'oggetto query. Quando si definisce la query secondaria, è pertanto necessario che a un'applicazione venga fornito quanto segue:

var validationQuery = validated.ToQuery(myApp,
                                        "validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

Adattatori del flusso pubblicato

Quando si recuperano gli adattatori di una query composta (ad esempio, tramite Query.InputStreamBindings), si noterà che per la loro connessione vengono utilizzati adattatori incorporati speciali. La funzionalità di composizione di query tramite CepStream.ToQuery, Query.ToStream() e così via, come illustrato in precedenza, offre comode superfici nella parte iniziale di questi adattatori incorporati. Essi possono essere utilizzati anche in modo esplicito come adattatori comuni, con la propria struttura di configurazione che contiene il nome del flusso pubblicato, come illustrato nell'esempio seguente:

// primary query, with custom input and output adapters
var inputstream = CepStream<MyDataType>.Create("inputStream",
                                               typeof(MyInputAdapterFactory),
                                               new InputAdapterConfig { someFlag = true },
                                               EventShape.Point);

var filtered = from e in inputstream
               where e.Value > 95
               select e;

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             typeof(MyOutputAdapterFactory),
                             new OutputAdapterConfig { someString = "foo" },
                             EventShape.Point,
                             StreamEventOrder.FullyOrdered);

// secondary query, composed on top of the first one using the
// built-in published stream input adapter and the default published
// stream name of the primary query
var filterStream = CepStream<MyDataType>.Create("filteredStream",
                                                typeof(PublishedStreamAdapterFactory),
                                                new PublishedStreamInputAdapterConfiguration { PublishedStreamName = query.Name },
                                                EventShape.Point);

var validated = from e in filterStream
                ...

var validationQuery = validated.ToQuery(myApp,
                                        "validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

Nello stesso modo, una query può utilizzare l'adattatore di output del flusso pubblicato, che ha le stesse funzionalità di CepStream.toPublishedStreamQuery():

var filterQuery = filtered.ToQuery(myApp,
                                   "filterQuery",
                                   "desc",
                                   typeof(PublishedStreamAdapterFactory),
                                   new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1") },
                                   EventShape.Point,
                                   StreamEventOrder.FullyOrdered);

Utilizzo dello strumento di associazione di query

Il modello di sviluppo dello strumento di associazione di query consente il controllo completo sui vari oggetti di metadati di StreamInsight e separa in modo netto l'associazione e l'utilizzo della query dalla fase di progettazione del modello di query. Questo modello consente inoltre la composizione dinamica della query, sia sul lato dell'associazione di input che su quello dell'associazione di output. Per ulteriori informazioni, vedere Utilizzo dello strumento di associazione di query.

Associazione a un'altra query come input

Analogamente al modo in cui lo strumento di associazione di query può associare un modello di query a un adattatore di input come un producer di eventi, l'associazione può essere eseguita a una query esistente. Si presupponga che sia presente una query primaria (con output associato o non associato), come illustrato nel primo esempio.

var query = filtered.ToQuery(myApp, ...);

È quindi possibile utilizzare lo strumento di associazione di query come indicato di seguito, facendo riferimento alla query precedente nell'overload appropriato di BindProducer.

var newStream = CepStream<RawData>.Create("validationInput");
var validated = from e in newStream
                select new
                {
                    SourceId = e.SourceId,
                    Value = e.Value,
                    Status = e.Value > 75 ? false : true
                };
QueryTemplate validateQT = myApp.CreateQueryTemplate("validationLogic", "validates the Value field", validated);
QueryBinder queryBinder = new QueryBinder(validateQT);
queryBinder.BindProducer("validationInput", filterQuery);
queryBinder.AddConsumer(...);

In alternativa, lo strumento di associazione di query si può fare riferimento a un flusso pubblicato come un producer di eventi.

queryBinder.BindProducer("validationInput",
                         new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),
                         EventShape.Point);

Come nel caso della firma di Query.ToStream(), è possibile specificare un oggetto AdvanceTimeSettings facoltativo in BindProducer().

Associazione a un flusso pubblicato come output

Sul lato dell'output, lo strumento di associazione di query consente la trasmissione di un flusso in un flusso pubblicato definito in modo esplicito.

queryBinder.BindOutputToPublishedStream(new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

Non appena viene avviata la query basata su questo strumento di associazione di query, altre query possono eseguire l'associazione a un flusso pubblicato come descritto negli esempi precedenti e utilizzare gli eventi del risultato.

Associazione ad adattatori del flusso pubblicato

Nel modello dello strumento di associazione di query è inoltre possibile utilizzare adattatori del flusso pubblicato. Essi possono essere recuperati dall'oggetto applicazione e utilizzati in BindProducer e AddConsumer come adattatori comuni:

queryBinder.BindProducer("validationInput",
                         myApp.GetPublishedStreamInputAdapter(),
                         new PublishedStreamInputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered") },
                         EventShape.Point);
queryBinder.AddConsumer("validated",
                         myApp.GetPublishedStreamOutputAdapter(),
                         new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/validated") },
                         EventShape.Point,
                         StreamEventOrder.FullyOrdered);

Vedere anche

Concetti

Esempio end-to-end di StreamInsight

Avanzamento del tempo applicazione