Building Resilient StreamInsight Applications

This topic describes the steps to create a resilient StreamInsight application.

Resiliency is available only in the Premium edition of StreamInsight. For more information, see Choosing a StreamInsight Edition.

For an end-to-end code sample of a resilient application that includes replay and deduplication, see the Checkpointing Sample on the StreamInsight Samples page on CodePlex.

In This Topic

  1. Step 1. Configuring a Resilient Server

  2. Step 2. Defining a Resilient Query

  3. Step 3. Capturing Checkpoints

  4. Step 4. Replaying Events in the Input Adapter

  5. Step 5. Eliminating Duplicates in the Output Adapter

  6. Step 6. Recovering from failure

  7. Shutting down without disabling recovery

  8. Examples

Step 1. Configuring a Resilient Server

Required settings

To configure a resilient server, provide values for the following configuration settings when you create the server:

  • A metadata store. You must use SQL Server Compact to store metadata for the server; metadata cannot be stored in memory.

  • A log path. This setting determines where the checkpoint data is stored for resilient queries. The default value of the path is the working directory of the StreamInsight process. A related setting, CreateLogPathIfMissing, determines whether to create the specified directory if it does not exist.

Configuring a server for resiliency makes it possible to capture checkpoints, but does not cause checkpoints to be captured. For information about invoking checkpoints, see Step 3. Capturing Checkpoints.

Managing the checkpoint log path

  • In order to avoid unauthorized reading or tampering with the checkpoint files, ensure that the permissions of the containing folder are set so that only trusted entities have access.

  • Each instance of StreamInsight should have its own log path.

  • Ensure that the process hosting StreamInsight has read and write access to the specified folder.

  • Do not edit the contents of the folder. StreamInsight deletes checkpoint files when they are no longer needed.
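The guidance that each instance should have its own log path can be applied before the server is created. The following is a minimal sketch and is not part of the StreamInsight API: the CheckpointLogPaths class, the ForInstance method, and the idea of a shared root folder are hypothetical names chosen for illustration. The sketch derives one folder per instance name so that two instances never share a log path.

```csharp
using System;
using System.IO;

// Hypothetical helper (not part of StreamInsight): derives one checkpoint
// log folder per server instance under a common root folder.
public static class CheckpointLogPaths
{
    public static string ForInstance(string rootFolder, string instanceName)
    {
        if (string.IsNullOrWhiteSpace(rootFolder))
        {
            throw new ArgumentException("A root folder is required.", "rootFolder");
        }

        if (string.IsNullOrWhiteSpace(instanceName))
        {
            throw new ArgumentException("An instance name is required.", "instanceName");
        }

        // Reject names that cannot be used as a single folder name.
        if (instanceName.IndexOfAny(Path.GetInvalidFileNameChars()) >= 0)
        {
            throw new ArgumentException(
                "The instance name cannot be used as a folder name.", "instanceName");
        }

        return Path.Combine(rootFolder, instanceName);
    }
}
```

The returned value could then be assigned to the log path setting, for example CheckpointLogPaths.ForInstance("C:\\CepLogPath", "Default"). Restricting the folder permissions to trusted entities remains a separate administrative step that this sketch does not cover.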

Out-of-process servers

In the case of an out-of-process server, to which the client connects by calling Server.Connect, the resiliency configuration is provided by the person who provisions the server. If the out-of-process server has a resiliency configuration, then the client can use it as configured; if the server does not have a resiliency configuration, then the client cannot use resiliency features.

Methods for specifying resiliency options

You can specify the resiliency settings by one of the following methods:

  • Specify the settings programmatically by providing the resiliency configuration when you call Server.Create.

  • Specify the settings declaratively in the application configuration file.

    • For an in-process server, this is the app.config file.

    • For an out-of-process server, this is the StreamInsightHost.exe.config file, which can be found in the Host folder under the StreamInsight installation folder.

If you use both methods, then the settings that you specify in the API call override the settings in the configuration file.

Creating a resilient server programmatically

The following example shows how to create a resilient in-process server programmatically. For more detailed examples, see Examples. Wrap the call to Server.Create in exception handling so that you catch any configuration errors that would cause checkpointing to fail.

SqlCeMetadataProviderConfiguration metadataConfig = new SqlCeMetadataProviderConfiguration();
metadataConfig.CreateDataSourceIfMissing = true;
metadataConfig.DataSource = "C:\\CepMetadata.sdf";

CheckpointConfiguration recoveryConfig = new CheckpointConfiguration();
recoveryConfig.CreateLogPathIfMissing = true;
recoveryConfig.LogPath = "C:\\CepLogPath";

using (EmbeddedServer server =
    Server.Create("Default", metadataConfig, recoveryConfig))
{
    // Create the application and its queries here.
}

Creating a resilient server declaratively

The following example shows how to create a resilient server declaratively by using a configuration file.

<?xml version="1.0" encoding="utf-8"?>
<configuration>
…
    <appSettings>
        <add key="InstanceName" value="Default"/>
        <add key="CreateSqlCeMetadataFileIfMissing" value="true"/>
        <add key="SQLCEMetadataFile" value="CepMetadata.sdf"/>
        <add key="CheckpointEnabled" value="true"/>
        <add key="CheckpointLogPath" value="CepLogPath"/>
        <add key="CreateCheckpointLogPathIfMissing" value="true"/>
    </appSettings>
    <runtime>
        <gcServer enabled="true"/>
    </runtime>
</configuration>

Step 2. Defining a Resilient Query

To create a resilient query, include the following steps in your code.

  1. Before creating a new query, check to see whether the query already exists in the metadata. If the query already exists, this indicates that the application has recovered from a failure. Your code must restart the query rather than recreate it.

  2. If the query does not exist in the metadata, create it and define it as resilient by specifying true for the IsResilient parameter of the ToQuery method. You can also call the Application.CreateQuery method with the IsResilient parameter.

Configuring a query for resiliency makes it possible to capture checkpoints, but does not cause checkpoints to be captured. For information about invoking checkpoints, see Step 3. Capturing Checkpoints.

Example of defining a resilient query

For more detailed examples, see Examples.

Query query = application.CreateQuery(
                     "TrafficSensorQuery",
                     "Minute average count, filtered by location threshold",
                     queryBinder,
                     true);

Step 3. Capturing Checkpoints

After the query or queries are running, capture checkpoints periodically to record the state of the queries.

The API methods that support checkpointing follow the typical pattern for an asynchronous operation.

  1. To invoke a checkpoint, call the BeginCheckpoint method. If you provide the optional AsyncCallback, it is called when the checkpoint is complete. The IAsyncResult returned from the call to BeginCheckpoint identifies this checkpoint request and can be used later in calls to EndCheckpoint or CancelCheckpoint.

    /// <summary>
    /// Take an asynchronous checkpoint for the query.
    /// </summary>
    /// <param name="query">The query to checkpoint.</param>
    /// <param name="asyncCallback">An optional asynchronous callback, to be called when the checkpoint is complete.</param>
    /// <param name="asyncState">A user-provided object that distinguishes this particular asynchronous checkpoint request from other requests.</param>
    /// <returns>An IAsyncResult that identifies this checkpoint request.</returns>
    IAsyncResult BeginCheckpoint(
         Query query, 
         AsyncCallback asyncCallback, 
         Object asyncState);
    
  2. The EndCheckpoint method blocks until the checkpoint operation is complete. If the checkpoint operation succeeds, the call returns true; if errors occur, the call raises an exception.

    /// <summary>
    /// Waits for the pending asynchronous checkpoint request to complete.
    /// </summary>
    /// <param name="asyncResult">The reference to the pending asynchronous request to finish.</param>
    /// <returns>True if the checkpoint succeeded, false if it was canceled.</returns>
    bool EndCheckpoint(
         IAsyncResult asyncResult);
    
  3. You can also call CancelCheckpoint to cancel the checkpointing process. When the call to CancelCheckpoint succeeds, the subsequent call to EndCheckpoint returns false.

    /// <summary>
    /// Cancels the pending asynchronous checkpoint request.
    /// </summary>
    /// <param name="asyncResult">The asyncResult handle identifying the call.</param>
    void CancelCheckpoint(
         IAsyncResult asyncResult);
    

This asynchronous pattern can be used in three different ways:

  • A call to BeginCheckpoint can be followed by a call to EndCheckpoint. EndCheckpoint then blocks until the checkpoint operation is complete, and then returns the result (or exception). In this pattern, the asyncCallback and asyncState are typically not used.

  • BeginCheckpoint can be called, and the user can then poll the IsCompleted property of the returned IAsyncResult. When IsCompleted is true, EndCheckpoint can be called to retrieve the result. In this pattern, the asyncCallback and asyncState are typically not used.

  • BeginCheckpoint can be called with a callback method. In this case, the asyncState can be used to identify the call and return any necessary information to the callback method. When the callback runs, it calls EndCheckpoint to retrieve the result.

The EndCheckpoint method must be called, regardless of which pattern is used, and even when the checkpoint is canceled. This method is the only way for the user to get a return value from the call, and the only way for StreamInsight to know that the call is complete. You cannot begin another checkpoint until you have called EndCheckpoint.

Errors that occur in the checkpointing process do not stop or affect the associated queries. If you stop a query while a checkpoint operation is in progress, the checkpoint is canceled.
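The first two usage patterns, and the rule that each checkpoint must be ended before the next one begins, can be sketched as follows. This is an illustrative helper and not part of StreamInsight: the CheckpointPatterns class and its method names are hypothetical, and the two delegates stand in for the BeginCheckpoint and EndCheckpoint calls so that the sketch does not depend on a running server.

```csharp
using System;
using System.Threading;

// Hypothetical helper (not part of StreamInsight). The "begin" delegate stands
// in for BeginCheckpoint with the query already bound; "end" stands in for
// EndCheckpoint.
public static class CheckpointPatterns
{
    // Pattern 1: begin the checkpoint, then block in the end call.
    public static bool TakeBlocking(
        Func<AsyncCallback, object, IAsyncResult> begin,
        Func<IAsyncResult, bool> end)
    {
        IAsyncResult pending = begin(null, null); // no callback, no state
        return end(pending); // true = succeeded, false = canceled
    }

    // Pattern 2: begin the checkpoint, poll IsCompleted, then call end.
    public static bool TakePolling(
        Func<AsyncCallback, object, IAsyncResult> begin,
        Func<IAsyncResult, bool> end,
        TimeSpan pollInterval)
    {
        IAsyncResult pending = begin(null, null);
        while (!pending.IsCompleted)
        {
            Thread.Sleep(pollInterval);
        }

        return end(pending); // still required after completion
    }

    // Takes "count" checkpoints in sequence, pausing between them. Each
    // checkpoint is ended before the next one begins. Returns how many
    // succeeded; an exception from "end" propagates to the caller.
    public static int TakePeriodically(
        Func<AsyncCallback, object, IAsyncResult> begin,
        Func<IAsyncResult, bool> end,
        TimeSpan interval,
        int count)
    {
        int succeeded = 0;
        for (int i = 0; i < count; i++)
        {
            if (i > 0)
            {
                Thread.Sleep(interval);
            }

            if (TakeBlocking(begin, end))
            {
                succeeded++;
            }
        }

        return succeeded;
    }
}
```

Given a server and a running query, and assuming the method signatures shown earlier in this step, a call might look like CheckpointPatterns.TakeBlocking((callback, state) => server.BeginCheckpoint(query, callback, state), server.EndCheckpoint). Passing delegates rather than the server itself is only a design choice of this sketch; it keeps the helper independent of the StreamInsight assemblies.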

Step 4. Replaying Events in the Input Adapter

To support the replay of events as part of recovery, the input adapter factory must implement either the IHighWaterMarkInputAdapterFactory or the IHighWaterMarkTypedInputAdapterFactory interface. Then the call to the Create method of the adapter factory supplies the high-water mark that helps the adapter to identify the events to replay.

To ensure output that is complete, all input adapters must replay all the events in the physical stream that occur at or after the position indicated by the high-water mark.
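The replay rule can be sketched as a selection over the events that the adapter is able to read again. This is a minimal sketch under stated assumptions: the ReplaySelection class and the FromHighWaterMark method are hypothetical names, the high-water mark is assumed to be comparable to an event timestamp of type DateTimeOffset, and the source is assumed to return events in their original physical order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of StreamInsight): selects the events that an
// input adapter would replay for a given high-water mark.
public static class ReplaySelection
{
    // Returns every event from the first one whose timestamp is at or after
    // the high-water mark through the end of the stream. When no high-water
    // mark is supplied (first start), all events are returned.
    public static IEnumerable<TEvent> FromHighWaterMark<TEvent>(
        IEnumerable<TEvent> physicalStream,
        Func<TEvent, DateTimeOffset> timestampOf,
        DateTimeOffset? highWaterMark)
    {
        if (physicalStream == null)
        {
            throw new ArgumentNullException("physicalStream");
        }

        if (timestampOf == null)
        {
            throw new ArgumentNullException("timestampOf");
        }

        if (!highWaterMark.HasValue)
        {
            return physicalStream;
        }

        DateTimeOffset mark = highWaterMark.Value;

        // SkipWhile is positional: once the first qualifying event is found,
        // everything after it is replayed regardless of its timestamp.
        return physicalStream.SkipWhile(e => timestampOf(e) < mark);
    }
}
```

The sketch selects by position rather than filtering each event individually, which follows the wording "at or after the position indicated by the high-water mark". Whether a particular source can be read again from that position depends on the source itself.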

Step 5. Eliminating Duplicates in the Output Adapter

To support the elimination of duplicates as part of recovery, the output adapter factory must implement either the IHighWaterMarkOutputAdapterFactory or the IHighWaterMarkTypedOutputAdapterFactory interface. Then the call to the Create method of the adapter factory supplies the high-water mark and the offset value that help the adapter to identify duplicate values. This offset is necessary because the location in the output stream corresponding to the checkpoint may fall at any point in the stream.

When the query is started for the first time, the adapter factory's Create method is called without the high-water mark and offset. If the server has not yet captured any checkpoints for the query, the adapter factory's Create method is called with a high-water mark of DateTime.MinValue and an offset of 0 (zero).

If a query is properly replayed, then any events that were produced after the last checkpoint was captured but before the outage will be produced again upon restart. These are the duplicates that the output adapter must remove. How these are removed is up to the output adapter: the original copies can be abandoned, or the duplicate copies can be ignored.

To ensure output that is equivalent, all input adapters must properly replay input events, and all output adapters must remove all the duplicate events in the physical stream that occurred before the outage and that occur at or after the position indicated by the high-water mark and offset.
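One way to remove duplicates is to abandon the original copies, that is, to discard the previously written output that follows the checkpoint position. The following is a minimal sketch of that calculation and not the StreamInsight implementation. The OutputDeduplication class and the CountToKeep method are hypothetical names. The sketch also assumes that the offset is the number of output events between the first event at or after the high-water mark and the checkpoint position; this topic does not spell out that definition, so verify it before relying on the sketch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of StreamInsight): decides how much of the
// previously written output survives a restart.
public static class OutputDeduplication
{
    // Returns the number of previously written events to keep. Events after
    // that position are treated as duplicates that the restarted query will
    // produce again, so the adapter can abandon them.
    // ASSUMPTION: checkpoint position = index of the first event at or after
    // the high-water mark, plus the offset.
    public static int CountToKeep(
        IList<DateTimeOffset> writtenTimestamps,
        DateTimeOffset highWaterMark,
        int offset)
    {
        if (writtenTimestamps == null)
        {
            throw new ArgumentNullException("writtenTimestamps");
        }

        if (offset < 0)
        {
            throw new ArgumentOutOfRangeException("offset");
        }

        // Find the first event at or after the high-water mark.
        int markIndex = 0;
        while (markIndex < writtenTimestamps.Count
            && writtenTimestamps[markIndex] < highWaterMark)
        {
            markIndex++;
        }

        // Never keep more events than were actually written.
        long position = (long)markIndex + offset;
        return (int)Math.Min(position, writtenTimestamps.Count);
    }
}
```

Under this assumption, a high-water mark of DateTimeOffset.MinValue with an offset of 0 keeps nothing, which matches the case in which no checkpoint was captured and the query produces all of its output again. The alternative approach, ignoring the duplicate copies as they arrive, would use the same position to count how many incoming events to skip.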

Step 6. Recovering from failure

The server automatically performs recovery on startup and brings all queries into a consistent state. This is an asynchronous operation; as a result, the call to Server.Create returns before the recovery has finished.

  • Non-resilient queries are put into the Stopped state. This behavior has not changed.

  • Resilient queries are put into the Initializing state. Then the server loads the saved checkpoint information.

You can call Start at this point to restart queries. Resilient queries will be restarted as soon as the initialization is complete.

The startup code must perform the following steps to recover from failure:

  1. Retrieve the list of the application's queries from the metadata.

  2. For each query, check to see whether the query already exists in the metadata.

    1. If the query already exists, restart it.

    2. If the query does not exist in the metadata, create it and define it as resilient, as described above under Step 2. Defining a Resilient Query.
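The restart-or-create decision in these steps can be sketched as a small helper. This is an illustration only: the QueryStartup class and the GetOrCreate method are hypothetical names, and the sketch assumes that the queries registered in the metadata can be looked up by name through a dictionary. The helper is generic so that it does not depend on the StreamInsight assemblies.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of StreamInsight): returns the registered
// query when one exists, and otherwise asks the caller to create it.
public static class QueryStartup
{
    // "registered" is the name-to-query lookup read from the metadata.
    // "create" is called only when the name is not registered; it is expected
    // to create the query as resilient and register it in the metadata.
    // "recovered" reports whether the query already existed, which indicates
    // that the application is restarting after a failure or shutdown.
    public static TQuery GetOrCreate<TQuery>(
        IDictionary<string, TQuery> registered,
        string name,
        Func<TQuery> create,
        out bool recovered)
    {
        if (registered == null)
        {
            throw new ArgumentNullException("registered");
        }

        if (create == null)
        {
            throw new ArgumentNullException("create");
        }

        TQuery query;
        recovered = registered.TryGetValue(name, out query);
        if (!recovered)
        {
            query = create();
        }

        return query;
    }
}
```

In either case, the startup code then starts the returned query. For a query that already existed, the create delegate is never invoked, so the query is restarted rather than recreated.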

If a problem occurs during recovery itself, you can restart the server without resiliency.

Shutting down without disabling recovery

You can shut down the server without disabling recovery by calling the Dispose method of the Server.

  • Non-resilient queries are stopped.

  • Resilient queries are suspended. When you restart the server, the server tries to recover the state of the suspended queries. To prevent this behavior, stop the queries before shutting down.

The metadata for both non-resilient and resilient queries is preserved when you shut down the server in this way.

Examples

For an end-to-end code sample of a resilient application that includes replay and deduplication, see the Checkpointing Sample on the StreamInsight Samples page on CodePlex.

Defining a resilient query with the explicit development model

namespace StreamInsight.Samples.TrafficJoinQuery
{
    using...

    internal class EmbeddedCepServer
    {
        internal static void Main()
        {
            // SQL CE was available as an optional metadata provider in v1.1
            // For the server to support recovery, this becomes mandatory
            // A log path is also a mandatory requirement.
            SqlCeMetadataProviderConfiguration metadataConfig = new
               SqlCeMetadataProviderConfiguration();
            metadataConfig.CreateDataSourceIfMissing = true;
            metadataConfig.DataSource = "C:\\CepMetadata.sdf";

            CheckpointConfiguration recoveryConfig = new CheckpointConfiguration();
            recoveryConfig.CreateLogPathIfMissing = true;
            recoveryConfig.LogPath = "C:\\CepLogPath";


            using (EmbeddedServer server = Server.Create(
                                            "Default", metadataConfig, recoveryConfig))
            {
                try
                {
                    Application application = server.CreateApplication("TrafficJoinSample");

                    QueryTemplate queryTemplate = CreateQueryTemplate(application);

                    InputAdapter csvInputAdapter =     
                                           application.CreateInputAdapter<TextFileReaderFactory>(
                                           "CSV Input", "Reading tuples from a CSV file");
                    OutputAdapter csvOutputAdapter =
                                          application.CreateOutputAdapter<TextFileWriterFactory>(
                                          "CSV Output", "Writing result events to a CSV file");

                    // bind query to event producers and consumers
                    QueryBinder queryBinder = BindQuery(
                                              csvInputAdapter, csvOutputAdapter, queryTemplate);

                    // Create bound query that can be run
                    Console.WriteLine("Registering bound query");
                    Query query = application.CreateQuery(
                                    "TrafficSensorQuery",
                                    "Minute average count, filtered by location threshold",
                                    queryBinder,
                                    true);   // v1.2 addition - Specify the query as resilient

                    // Start the query
                    // v1.2 has additional semantics during recovery

                    query.Start();

                    // submit a checkpoint request

                    // query.Stop();
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                    Console.ReadLine();
                }
            }

            Console.WriteLine("\npress enter to exit application");
            Console.ReadLine();
        }

        // The CreateQueryTemplate and BindQuery helper methods are omitted here.
    }
}

Checkpointing - the Callback Rendezvous Model

namespace StreamInsight.Samples.TrafficJoinQuery
{
    using...

    internal class EmbeddedCepServer
    {
        internal static void Main()
        {
            // Same code through query start …
            {
                try
                {
                    // Start the query
                    query.Start();

                    // submit a checkpoint request
                    IAsyncResult result = server.BeginCheckpoint(query,
                        r => {
                            if (server.EndCheckpoint(r))
                            {
                                // the checkpoint succeeded
                            }
                            else
                            {
                                // the checkpoint was canceled
                            }
                        },
                        null);
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                    Console.ReadLine();
                }
            }

            Console.WriteLine("\npress enter to exit application");
            Console.ReadLine();
        }
    }
}

See Also

Concepts

StreamInsight Resiliency

Monitoring Resilient StreamInsight Applications

Troubleshooting Resilient StreamInsight Applications