Building Resilient StreamInsight Applications

 

This topic describes the steps to create a resilient StreamInsight application.

Resiliency is available only in the Premium edition of StreamInsight. For more information, see Choosing a StreamInsight Edition.

For an end-to-end code sample of a resilient application that includes replay and deduplication, see the StreamInsight 2.1 Checkpointing Sample on the StreamInsight Samples page on Codeplex.

Step 1. Configuring a Resilient Server

Required settings

To configure a resilient server, provide values for the following configuration settings when you create the server:

  • A metadata store. You must use SQL Server Compact Edition to store metadata for the server; metadata cannot be stored in memory for resilient servers.

  • A log path. This setting determines where the checkpoint data is stored for resilient queries. The default value of the path is the working directory of the StreamInsight process. A related setting, CreateLogPathIfMissing, determines whether to create the specified directory if it does not exist.

Configuring a server for resiliency makes it possible to capture checkpoints, but does not cause checkpoints to be captured. For information about invoking checkpoints, see Step 3. Capturing Checkpoints.

Managing the checkpoint log path

  • To prevent unauthorized reading of, or tampering with, the checkpoint files, set the permissions of the containing folder so that only trusted entities have access.

  • Each instance of StreamInsight should have its own log path.

  • Ensure that the process hosting StreamInsight has read and write access to the specified folder.

  • Do not edit the contents of the folder. StreamInsight deletes checkpoint files when they are no longer needed.

Out-of-process servers

For an out-of-process server, to which the client connects by calling Server.Connect, the person who provisions the server supplies the resiliency configuration. If the server has a resiliency configuration, the client uses it as configured; if the server does not have one, the client cannot use resiliency features.

Methods for specifying resiliency options

You can specify the resiliency settings by one of the following methods:

  • Specify the settings programmatically by providing the resiliency configuration as parameters to the Server.Create method.

  • Specify the settings declaratively in the application configuration file.

    • For an in-process server, this is the app.config file.

    • For an out-of-process server, this is the StreamInsightHost.exe.config file, which can be found in the Host folder under the StreamInsight installation folder.

If you use both methods, then the settings that you specify in the API call override the settings in the configuration file.

Creating a resilient server programmatically

The following example shows how to create a resilient in-process server programmatically. For more detailed examples, see Examples. When you call Server.Create, catch any exceptions that would cause checkpointing to fail.

SqlCeMetadataProviderConfiguration metadataConfig = new SqlCeMetadataProviderConfiguration();  
metadataConfig.CreateDataSourceIfMissing = true;  
metadataConfig.DataSource = "C:\\CepMetadata.sdf";  
  
CheckpointConfiguration recoveryConfig = new CheckpointConfiguration();  
recoveryConfig.CreateLogPathIfMissing = true;  
recoveryConfig.LogPath = "C:\\CepLogPath";  
  
using (var server = Server.Create("Default", metadataConfig, recoveryConfig))
{
    // Create applications and resilient processes here.
}

Creating a resilient server declaratively

The following example shows how to create a resilient server declaratively by using a configuration file.

  
<?xml version="1.0" encoding="utf-8"?>  
<configuration>  
…  
  
    <appSettings>
        <add key="InstanceName" value="Default"/>
        <add key="CreateSqlCeMetadataFileIfMissing" value="true"/>
        <add key="SQLCEMetadataFile" value="CepMetadata.sdf"/>
        <add key="CheckpointEnabled" value="true"/>
        <add key="CheckpointLogPath" value="CepLogPath"/>
        <add key="CreateCheckpointLogPathIfMissing" value="true"/>
    </appSettings>
    <runtime>  
        <gcServer enabled="true"/>  
    </runtime>  
</configuration>  

Step 2. Defining a Resilient Process

In StreamInsight, you create a process when you run a binding (call the Run() method on IRemoteBinding or IRemoteStreamableBinding objects). StreamInsight supports two types of processes: resilient (checkpointable) and non-resilient (non-checkpointable). StreamInsight imposes no restrictions on creating non-resilient processes; they can be created for temporal and non-temporal queries, and they can be hosted either on resilient or non-resilient servers. Resilient processes, however, can be created only on resilient servers and only for Streamable (temporal) bindings.

To create a resilient process from an IRemoteStreamableBinding, call the RunCheckpointable(string id) method. Although the process id parameter is optional for the Run() method, it is mandatory for the RunCheckpointable() method.

Typically, you should include the following steps in your code.

  1. Before creating a new process, check to see whether the process already exists in the metadata. If the process already exists, this indicates that the application has recovered from a failure; your code must resume the process rather than recreate it.

  2. If the process does not exist in the metadata, you can define its binding and create the process by calling RunCheckpointable().

By creating a resilient process, you make it possible for the process to capture checkpoints, but this does not cause checkpoints to be captured. For information about invoking checkpoints, see Step 3. Capturing Checkpoints.

The following is an example of how to define a resilient process. For more detailed examples, see Examples.

  
IRemoteStreamableBinding binding = …  
CepCheckpointableProcess process = binding.RunCheckpointable("MyProcess");
  

Step 3. Capturing Checkpoints

After the process enters the "Running" state, you can begin to capture checkpoints to record the state of the running process.

The API methods that support checkpointing follow the typical .NET Task pattern used for long-running asynchronous operations. For more information on .NET Tasks, see Task Class.

  1. To start a new checkpoint for a running checkpointable process, invoke the CheckpointAsync() method on the process object. CheckpointAsync() returns a Task object that represents the checkpoint operation until it completes. You can optionally pass a CancellationToken to CheckpointAsync(); the token is associated with the returned Task and can be used to cancel the checkpoint if needed.

    /// <summary>  
    /// Take an asynchronous checkpoint for the process.  
    /// </summary>  
    /// <param name="process">Checkpointable process</param>  
    /// <param name="CancellationToken">An optional CancellationToken, to be used to cancel the current checkpoint if needed</param>  
    /// <returns>Task object to process required checkpoint</returns>  
    CancellationTokenSource cancel = new CancellationTokenSource();  
    Task checkpoint = process.CheckpointAsync(cancel.Token);  
    
  2. As with any .NET Task, wait on the checkpoint Task until it completes, times out, is cancelled, or fails with an error.

    /// <summary>  
    /// Waits for the pending asynchronous checkpoint request to complete.  
    /// </summary>  
    /// <param name="timeout">TimeSpan object used to specify the maximum waiting time for the checkpoint to complete.</param>  
    /// <returns>True if the checkpoint succeeded, false if it timed out.</returns>  
    bool status = checkpoint.Wait(TimeSpan.FromMinutes(1));  
    
  3. You can cancel a checkpoint request by invoking the Cancel() method on the CancellationTokenSource object whose token was passed to the initial call to CheckpointAsync(). In keeping with the typical Task pattern, when a checkpoint request is cancelled, expect an AggregateException for task cancellation when you wait on the Task object.

    /// <summary>  
    /// Cancels the current asynchronous checkpoint request.  
    /// </summary>  
    cancel.Cancel();  
    

This asynchronous Task pattern can be used in two different ways:

  • Follow a call to process.CheckpointAsync with a call to Task.Wait() which blocks the current thread until the checkpoint operation is complete, and then returns the result (or exception).

  • Call process.CheckpointAsync, and then poll the IsCompleted or Status property of the returned Task. When IsCompleted is true, call Task.Wait() to retrieve the result of the checkpoint request.

The Task.Wait() method must be called, or the Task.Exception property must be accessed, regardless of which pattern is used, and even when the checkpoint is canceled; otherwise any exception raised by the checkpoint goes unobserved.

Errors that occur in the checkpointing process do not stop or affect the associated processes. If you dispose a process while a checkpoint operation is in progress, the checkpoint is canceled.
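
The wait-and-observe pattern described above can be sketched with plain .NET tasks. This is only an illustration: FakeCheckpointAsync is a hypothetical stand-in for process.CheckpointAsync (it simply delays and honors the token), and the class, method names, and result strings are assumptions made for this sketch rather than part of the StreamInsight API.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class CheckpointWaitSketch
{
    // Hypothetical stand-in for process.CheckpointAsync(token):
    // a task that takes `work` to finish and honors cancellation.
    public static Task FakeCheckpointAsync(TimeSpan work, CancellationToken token)
    {
        return Task.Delay(work, token);
    }

    // Waits on the task and reports its outcome as
    // "completed", "timed out", "cancelled", or "failed".
    public static string WaitForCheckpoint(Task checkpoint, TimeSpan timeout)
    {
        try
        {
            // Wait returns false when the timeout elapses first.
            return checkpoint.Wait(timeout) ? "completed" : "timed out";
        }
        catch (AggregateException e)
        {
            // Cancellation surfaces as a TaskCanceledException (an
            // OperationCanceledException) wrapped in the AggregateException.
            bool cancelled = e.InnerExceptions.All(x => x is OperationCanceledException);
            return cancelled ? "cancelled" : "failed";
        }
    }
}
```

Note that a wait that times out leaves the task running; the caller still has to wait on it again, or read its Exception property, once it finishes.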

Step 4. Replaying Events in the Source

To support the replay of events as part of recovery in enumerable and observable sources, StreamInsight supports sequence definition overloads that accept a high-water mark as a parameter at recovery time. To ensure output that is complete, all sources must replay all the events in the physical stream that occur at or after the position indicated by the high-water mark.

For example, assume the following enumerable source.

  
var stream = app.DefineEnumerable(()=>  
  Enumerable.Range(0, 1000)  
 .Select(i => PointEvent<long>.CreateInsert(startTime.AddSeconds(i),i)))  
 .ToStreamable(AdvanceTimeSettings.IncreasingStartTime);  

This source can be modified as follows to use the high-water mark parameter.

  
// Allow the source to be replayable  
var replayableStream = app.DefineEnumerable((DateTimeOffset? hwm) =>  
  Enumerable.Range(0, 1000)  
 .Select(i => PointEvent<long>.CreateInsert(startTime.AddSeconds(i),i))  
 .Where(e => !hwm.HasValue || e.StartTime >= hwm.Value))  
 .ToStreamable(AdvanceTimeSettings.IncreasingStartTime);  

The following is a similar example of an observable source.

  
var stream = app.DefineObservable(()=>  
  Observable.Range(0, 1000)  
 .Select(i => PointEvent<long>.CreateInsert(startTime.AddSeconds(i),i)))  
 .ToStreamable(AdvanceTimeSettings.IncreasingStartTime);  

The source can be modified to use the high-water mark.

  
// To allow the source shown above to be replayable, we change the  
// definition as shown below  
var replayableStream = app.DefineObservable((DateTimeOffset? hwm) =>  
  Observable.Range(0, 1000)  
 .Select(i => PointEvent<long>.CreateInsert(startTime.AddSeconds(i),i))  
 .Where(e => !hwm.HasValue || e.StartTime >= hwm.Value))  
 .ToStreamable(AdvanceTimeSettings.IncreasingStartTime);  
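
To see which events the Where filter in the replayable sources retains, the following sketch applies the same predicate to plain timestamps, without any StreamInsight types. The ReplaySketch class and its method names are hypothetical and exist only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ReplaySketch
{
    // Hypothetical input: one start time per second, like the sources above.
    public static IEnumerable<DateTimeOffset> SampleStartTimes(DateTimeOffset startTime, int count)
    {
        return Enumerable.Range(0, count).Select(i => startTime.AddSeconds(i));
    }

    // Returns the start times a source would produce for a given high-water mark.
    // A null mark means there is nothing to recover, so every event is produced.
    public static List<DateTimeOffset> Replay(IEnumerable<DateTimeOffset> startTimes, DateTimeOffset? hwm)
    {
        return startTimes.Where(t => !hwm.HasValue || t >= hwm.Value).ToList();
    }
}
```

With 1000 one-second events and a high-water mark 990 seconds after the first event, Replay returns the last ten start times, including the one that falls exactly on the mark. With a null mark it returns all 1000.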

Note


To support the replay of events as part of recovery in an input adapter, the input adapter factory must implement either the IHighWaterMarkInputAdapterFactory or the IHighWaterMarkTypedInputAdapterFactory interface. Then the call to the Create method of the adapter factory supplies the high-water mark that helps the adapter to identify the events to replay.

Step 5. Eliminating Duplicates in the Sink

To support the elimination of duplicates as part of recovery in an observer sink, StreamInsight supports an observer definition overload that accepts a high-water mark and an offset as parameters; together they help the sink to identify duplicate values. The offset is necessary because the location in the output stream that corresponds to the checkpoint may fall at any point in the stream. At recovery time, StreamInsight populates these values based on the last available checkpoint.

If a query is properly replayed, then any events that were produced after the last checkpoint was captured but before the outage will be produced again upon restart. These are the duplicates that the sink must remove. How these are removed is up to the sink: the original copies can be abandoned, or the duplicate copies can be ignored.

To ensure output that is equivalent, all sources must properly replay input events, and all sinks must remove all the duplicate events in the physical stream that occurred before the outage and that occur at or after the position indicated by the high-water mark offset.

For example, assume the following observer sink.

  
var sink = app.DefineObserver( ()=>  
              Observer.Create<IntervalEvent<long>>(v => output.Add(v))  
           );  

This sink can be modified as follows to remove duplicates after failure.

  
Func<DateTimeOffset?, int, List<IntervalEvent<long>>, IObserver<IntervalEvent<long>>> observer =
  delegate(DateTimeOffset? hwm, int offset, List<IntervalEvent<long>> lst)
{
  if (hwm.HasValue)
  {
    // Count the events produced after the checkpoint position: skip to the first
    // event at or after the high-water mark, then skip `offset` more events.
    var forget = lst.SkipWhile(e => e.StartTime < hwm.Value)
                    .Skip(offset).Count();
    // Drop those events; they are produced again after recovery.
    lst.RemoveRange(lst.Count - forget, forget);
  }
  return Observer.Create<IntervalEvent<long>>(v => lst.Add(v));
};
var sink = app.DefineObserver( (DateTimeOffset? hwm, int offset) =>
             observer(hwm, offset, output)
           );
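
The trimming arithmetic in the sink above can be traced on a plain list of start times. The following is a minimal sketch that assumes the output list is ordered by start time; DedupSketch and TrimToCheckpoint are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DedupSketch
{
    // Removes every entry that lies after the checkpoint position.
    // The position is found by skipping to the first entry at or after
    // the high-water mark and then skipping `offset` further entries.
    public static void TrimToCheckpoint(List<DateTimeOffset> output, DateTimeOffset? hwm, int offset)
    {
        if (!hwm.HasValue)
        {
            return; // no high-water mark: nothing to trim
        }
        int forget = output.SkipWhile(t => t < hwm.Value).Skip(offset).Count();
        output.RemoveRange(output.Count - forget, forget);
    }
}
```

For example, with five entries at seconds 0 through 4, a high-water mark at second 2, and an offset of 1, the entries at seconds 3 and 4 are removed and the first three are kept. The removed entries are the ones the query produces again after recovery.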

Note


To support the elimination of duplicates as part of recovery in an output adapter, the output adapter factory must implement either the IHighWaterMarkOutputAdapterFactory or the IHighWaterMarkTypedOutputAdapterFactory interface. Then the call to the Create method of the adapter factory supplies the high-water mark and the offset value. When the query is started for the first time, then the adapter factory's Create method is called without the high-water mark and offset. If the server has not yet captured any checkpoints for the query, then the adapter factory's Create method is called with a high-water mark of DateTime.MinValue and an offset of 0 (zero).

Step 6. Recovering from Failure

The server automatically performs recovery on startup and brings all queries and processes into a consistent state. This is an asynchronous operation; as a result, the call to Server.Create returns before the recovery has finished.

  • Queries of non-resilient processes are put into the Stopped state. This behavior has not changed.

  • Queries of resilient processes are put into the Initializing state. Then the server loads the state of each query using the most recent checkpoint available in the server log path.

You can call Start at this point to restart queries or call Resume to resume a process. Queries of resilient processes will be restarted as soon as the initialization is complete.

The startup code must perform the following steps to recover from failure:

  1. Retrieve the list of the application's queries and processes from the metadata.

  2. For each query or process, check to see whether it already exists in the metadata.

    1. If it already exists, restart it.

    2. If it does not exist in the metadata, create it and define it as resilient, as described above under Step 2. Defining a Resilient Process.

If a problem occurs during recovery itself, you can restart the server without resiliency.

Shutting down without disabling recovery

You can shut down the server without disabling recovery by invoking the Dispose() method on the Server object.

  • Queries of non-resilient processes are stopped.

  • Queries of resilient processes are suspended. When you restart the server, the server tries to recover the state of suspended queries; once the state of a suspended query is recovered, it can be resumed. StreamInsight resilient servers allow queries to be started and processes to be resumed even before the underlying queries complete recovery.

The metadata for both non-resilient and resilient processes is preserved when you shut down the server in this way.

Examples

For an end-to-end code sample of a resilient application that includes replay and de-duplication, see the StreamInsight 2.1 Checkpointing Sample on the StreamInsight Samples page on Codeplex.

Creating a Resilient Server

  
private static Server CreateResilientServer()  
{  
  var mdConfig = new SqlCeMetadataProviderConfiguration {  
                       DataSource = MetadataFileName,  
                       CreateDataSourceIfMissing = true  
                     };  
  var recoveryConfig = new CheckpointConfiguration {  
                         LogPath = LogSubdirectoryName,  
                         CreateLogPathIfMissing = true  
                      };  
  return Server.Create(InstanceName, mdConfig, recoveryConfig);  
}  

Defining a Remote Streamable Binding with Replayable Input Adapter and De-duplication Output Adapter

  
private static IRemoteStreamableBinding CreateBinding  
       ( Application app, List<IntervalEvent<long>> output )  
{  
 DateTimeOffset startTime = new DateTimeOffset(2012,1,1,12,0,0,TimeSpan.Zero);  
 IQStreamable<long> stream1 = app.DefineEnumerable((DateTimeOffset? hwm) =>  
       Enumerable.Range(0,10000)  
       .Select(i => PointEvent<long>.CreateInsert(startTime.AddSeconds(i), i))  
       .Where(e => !hwm.HasValue || e.StartTime >= hwm.Value))  
       .ToStreamable(AdvanceTimeSettings.IncreasingStartTime);  
  
 IQStreamable<long> stream2 = app.DefineObservable((DateTimeOffset? hwm) =>  
       Observable.Interval(TimeSpan.FromMilliseconds(5))  
       .Select(i => PointEvent<long>.CreateInsert(startTime.AddSeconds(i),i))  
       .Where(e => !hwm.HasValue || e.StartTime >= hwm.Value))  
       .ToStreamable(AdvanceTimeSettings.StrictlyIncreasingStartTime);  
  
 IQStreamable<long> query = from win in stream1.Union(stream2).HoppingWindow(  
                                          TimeSpan.FromSeconds(10),  
                                          TimeSpan.FromSeconds(5))  
                            from x in (from e in win   
                                       orderby e descending  
                                       select e).Take(100)  
                            select x;  
 var observable = query.ToIntervalObservable(StreamEventOrder.FullyOrdered);  
 var observerCreator = app.DefineObserver((DateTimeOffset? hwm, int offset) =>  
                                DefineDedupObserver(hwm, offset, output));  
 return query.Bind(observerCreator);  
}  
  
private static IObserver<IntervalEvent<long>> DefineDedupObserver  
  ( DateTimeOffset? hwm, int offset, List<IntervalEvent<long>> output)  
{  
   Action<IntervalEvent<long>> onNextHandler = delegate(IntervalEvent<long> v)  
   {  
      Console.WriteLine("Head: OnNext: {0}", v.EventKind == EventKind.Cti ?  
        v.StartTime.ToString("o") :   
        "Payload:" + v.Payload + " ST: " + v.StartTime + " ET: " + v.EndTime);  
      output.Add(v);  
    };  
  
    if (hwm.HasValue)  
    {  
      var forget = output.SkipWhile(e => e.StartTime < hwm.Value)  
                         .Skip(offset).Count();  
      output.RemoveRange(output.Count() - forget, forget);  
    }  
  
    return Observer.Create<IntervalEvent<long>>(  
                 onNextHandler,  
                 e => Console.WriteLine("Head: Error: {0}", e),  
                 () => Console.WriteLine("Head: Completed")  
                );  
}  

Continuous Checkpointing Thread

  
private static Thread CheckpointingThread  
  (CepCheckpointableProcess process, Func<bool> abortThread)  
{  
  Thread t = new Thread(delegate()  
  {  
    int checkpoints = 0;  
    var rnd = new Random();  
    bool cancelRequested = false;  
    var server = process.Application.Server;  
    var queryUri = server.Enumerate(new Uri(process.Name + "/Query")).First();  
    while (true)  
    {  
      try
      {
        var cancel = new CancellationTokenSource();
        cancelRequested = false;   // reset for this iteration
        var dv = server.GetDiagnosticView(queryUri);
        var state = (string)dv[DiagnosticViewProperty.QueryState];
        if (state == "Running")
        {
          Task checkpointTask = process.CheckpointAsync(cancel.Token);
          // Cancel roughly one in five checkpoints to exercise cancellation.
          if (rnd.Next() % 5 == 0)
          {
            Thread.Sleep(rnd.Next() % 10);
            cancel.Cancel();
            cancelRequested = true;
          }
          bool checkpointStatus = checkpointTask.Wait(TimeSpan.FromMinutes(1));
          if (checkpointStatus)
            Console.WriteLine("Checkpoint #{0} completed ", ++checkpoints);
          else
            Console.WriteLine("Checkpoint #{0} timed out !", ++checkpoints);
        }
        if (abortThread()) break;
        Thread.Sleep(500);
      }
      catch (AggregateException e)  
      {  
       if (cancelRequested)  
       {  
         Console.WriteLine("Checkpoint #{0} was cancelled !", ++checkpoints);  
         cancelRequested = false;  
       }  
       else  
         Console.WriteLine(e);  
      }  
    }  
    });  
    t.Start();  
    return t;  
}  

Checkpointing and Recovery

  
private static void Main(string[] args)  
{  
 List<IntervalEvent<long>> output = new List<IntervalEvent<long>>();  
 bool abortCheckpointing = false;  
 for (int i = 0; i < 2; i++)  
 {  
  using (Server server = CreateResilientServer())  
  {  
    if (server.Applications.ContainsKey(ApplicationName))  
     Console.WriteLine("Connecting to Application '" + ApplicationName + "'");  
    else {  
      Console.WriteLine("Creating Application '" + ApplicationName + "'");  
      server.CreateApplication(ApplicationName);  
    }  
    Application app = server.Applications[ApplicationName];  
    CepCheckpointableProcess process = null;  
    if (app.CheckpointableProcesses.ContainsKey(ProcessName))  
    {  
      Console.WriteLine("Resuming Process '" + ProcessName + "'");  
      process = app.CheckpointableProcesses[ProcessName];  
      process.Resume();  
    } else {  
      var hydra = CreateBinding(app, output);  
      Console.WriteLine("Creating Process '" + ProcessName + "'");  
      process = hydra.RunCheckpointable(ProcessName);  
    }  
    Console.WriteLine("*** Press <enter> to end. ***");  
    // Take checkpoints only in first round  
    if (i == 0)  
    {  
      var t = CheckpointingThread(process, () => abortCheckpointing);  
      Console.ReadLine();  
      abortCheckpointing = true;  
      t.Join();   // block until the checkpointing thread exits
    }  
    else  
      Console.ReadLine();  
    Console.WriteLine("*** Disposing Server ***");  
  }  
 }  
 Console.WriteLine("*** Exiting. ***");  
}  

See Also

StreamInsight Resiliency
Monitoring Resilient StreamInsight Applications
Troubleshooting Resilient StreamInsight Applications