回復性 StreamInsight アプリケーションの作成

このトピックでは、回復性のある StreamInsight アプリケーションを作成する手順について説明します。

回復性は、StreamInsight の Premium Edition でのみ使用できます。詳細については、「StreamInsight エディションの選択」を参照してください。

再生と重複除外を含む回復性アプリケーションのエンド ツー エンド サンプルについては、Codeplex の StreamInsight サンプル ページのチェックポイント サンプルを参照してください。

このトピックの内容

  1. 手順 1. 回復性サーバーの構成

  2. 手順 2. 回復性クエリの定義

  3. 手順 3. チェックポイントのキャプチャ

  4. 手順 4. 入力アダプターでのイベントの再生

  5. 手順 5. 出力アダプターでの重複の削除

  6. 手順 6. 障害からの復旧

  7. 復旧の無効化を伴わないシャットダウン

  8. 使用例

手順 1. 回復性サーバーの構成

必要な設定

回復性サーバーを構成するには、サーバーの作成時に次の構成設定の値を指定します。

  • メタデータ ストア。サーバーのメタデータを格納するには SQL Server Compact を使用する必要があります。メタデータはメモリ内には格納できません。

  • ログ パス。この設定により、回復性クエリのチェックポイント データの格納場所が決定します。パスの既定値は、StreamInsight プロセスの作業ディレクトリです。関連する設定 CreateLogPathIfMissing により、指定されたディレクトリが存在しない場合にそれを作成するかどうかが決定します。

サーバーで回復性を構成すると、チェックポイントをキャプチャできるようになりますが、チェックポイントがキャプチャされる原因にはなりません。チェックポイントの呼び出しの詳細については、「手順 3. チェックポイントのキャプチャ」を参照してください。

チェックポイント ログ パスの管理

  • チェックポイント ファイルの無許可の読み取りや改ざんを防ぐため、それらのファイルを格納しているフォルダーに対する権限が、信頼できるエンティティのみがアクセスできるように設定されていることを確認します。

  • 各 StreamInsight インスタンスには、独自のログ パスが必要です。

  • StreamInsight をホストするプロセスが、指定のフォルダーに対する書き込みおよび読み取りアクセス権を持っていることを確認してください。

  • フォルダーの内容は編集しないでください。StreamInsight では、不要になったチェックポイント ファイルは削除されます。

アウトプロセス サーバー

クライアントが Server.Connect を呼び出して接続するアウトプロセス サーバーの場合、回復性の構成は、サーバーを準備するユーザーが行います。アウトプロセス サーバーで回復性が構成されている場合、クライアントは構成どおりそれを使用できます。サーバーで回復性が構成されていない場合は、クライアントは回復性機能を使用できません。

回復性オプションの指定方法

回復性設定を指定するには、次のいずれかの方法を使用します。

  • Server.Create を呼び出すときに回復性の構成を指定することで、プログラムによって設定を指定します。

  • アプリケーション構成ファイルで宣言することによって設定を指定します。

    • この構成ファイルは、インプロセス サーバーでは app.config ファイルになります。

    • アウトプロセス サーバーでは StreamInsightHost.exe.config ファイルになります。この構成ファイルは StreamInsight インストール フォルダーの Host フォルダーにあります。

これら両方の方法を使用する場合、API 呼び出しで指定する設定が構成ファイルの設定をオーバーライドします。

プログラムを使用した回復性サーバーの作成

次の例は、プログラムによって回復性インプロセス サーバーを作成する方法を示しています。さらに詳しい例については、「例」を参照してください。Server.Create を呼び出したときにチェックポイントで障害が発生する原因となる例外をすべて検出するようにしてください。

SqlCeMetadataProviderConfiguration metadataConfig = new SqlCeMetadataProviderConfiguration();
metadataConfig.CreateDataSourceIfMissing = true;
metadataConfig.DataSource = "C:\\CepMetadata.sdf";

CheckpointConfiguration recoveryConfig = new CheckpointConfiguration();
recoveryConfig.CreateLogPathIfMissing = true;
recoveryConfig.LogPath = "C:\\CepLogPath";

using (EmbeddedServer server = 
    Server.Create("Default", metadataConfig, recoveryConfig))

宣言による回復性サーバーの作成

次の例は、構成ファイルを使用して宣言により回復性サーバーを作成する方法を示しています。

<?xml version="1.0" encoding="utf-8"?>
<configuration>
…
    <appSettings>
            <add key="InstanceName" value="Default"/>
            <add key="CreateSqlCeMetadataFileIfMissing" value="true"/>
            <add key="SQLCEMetadataFile" value="CepMetadata.sdf"/>
            <add key="CheckpointEnabled" value="true"/>
            <add key="CheckpointLogPath" value="CepLogPath"/>
            <add key="CreateCheckpointLogPathIfMissing" value="true"/>
    </appSettings>
    <runtime>
        <gcServer enabled="true"/>
    </runtime>
</configuration>

トップに戻る

手順 2. 回復性クエリの定義

回復性クエリを作成するには、コードに次の手順を含めます。

  1. 新しいクエリを作成する前に、メタデータにクエリが既に存在するかどうかを確認します。クエリが既に存在する場合は、アプリケーションが障害から復旧したことを意味します。コードによってクエリを作成し直す代わりに、クエリを再開する必要があります。

  2. メタデータにクエリが存在しない場合は、それを作成し、ToQuery メソッドの IsResilient パラメーターに true を指定することにより、回復性クエリとして定義します。IsResilient パラメーターを設定した Application.CreateQuery メソッドを呼び出すこともできます。

クエリで回復性を構成すると、チェックポイントをキャプチャできるようになりますが、チェックポイントがキャプチャされる原因にはなりません。チェックポイントの呼び出しの詳細については、「手順 3. チェックポイントのキャプチャ」を参照してください。

回復性クエリの定義の例

さらに詳しい例については、「例」を参照してください。

Query query = application.CreateQuery(
                     "TrafficSensorQuery",
                     "Minute average count, filtered by location threshold",
                     queryBinder,
                     true);

トップに戻る

手順 3. チェックポイントのキャプチャ

クエリが実行状態になったら、チェックポイントを定期的にキャプチャし、クエリの状態を記録します。

チェックポイントをサポートする API メソッドは、非同期操作の典型的なパターンに従っています。

  1. チェックポイントを呼び出すには、BeginCheckpoint メソッドを呼び出します。オプションの AsyncCallback を指定すると、チェックポイントが完了したときにそれが呼び出されます。BeginCheckpoint の呼び出しで返された IAsyncResult は、このチェックポイント要求を識別し、後で EndCheckpoint または CancelCheckpoint の呼び出しで使用できます。

    /// <summary>
    /// Take an asynchronous checkpoint for the query.
    /// </summary>
    /// <param name="query">The query to checkpoint.</param>
    /// <param name="asyncCallback">An optional asynchronous callback, to be called when the checkpoint is complete.</param>
    /// <param name="asyncState">A user-provided object that distinguishes this particular asynchronous checkpoint request from other requests.</param>
    /// <returns></returns>
    IAsyncResult BeginCheckpoint(
         Query query, 
         AsyncCallback asyncCallback, 
         Object asyncState);
    
  2. EndCheckpoint メソッドは、チェックポイント操作が完了するまで、ブロックします。チェックポイント操作が成功すると、呼び出しにより true が返されます。エラーが発生すると、例外が発生します。

    /// <summary>
    /// Waits for the pending asynchronous checkpoint request to complete.
    /// </summary>
    /// <param name="asyncResult">The reference to the pending asynchronous request to finish.</param>
    /// <returns>True if the checkpoint succeeded, false if it was canceled.</returns>
    bool EndCheckpoint(
         IAsyncResult asyncResult);
    
  3. また、CancelCheckpoint を呼び出してチェックポイント処理をキャンセルすることもできます。CancelCheckpoint の呼び出しが成功した場合、その後の EndCheckpoint の呼び出しでは、false が返されます。

    /// <summary>
    /// Cancels the pending asynchronous checkpoint request.
    /// </summary>
    /// <param name="asyncResult">The asyncResult handle identifying the call.</param>
    void CancelCheckpoint(
         IAsyncResult asyncResult);
    

この非同期パターンは、次の 3 とおりの方法で使用できます。

  • BeginCheckpoint の呼び出しの後に EndCheckpoint を呼び出すことができます。その後、EndCheckpoint は、チェックポイント操作が完了するまでブロックし、結果 (または例外) を返します。このパターンでは、通常 asyncCallback と asyncState は使用されません。

  • BeginCheckpoint を呼び出すと、ユーザーは返された IAsyncResult の IsCompleted プロパティをポーリングできます。IsCompleted が true の場合、EndCheckpoint を呼び出して結果を取得することができます。このパターンでは、通常 asyncCallback と asyncState は使用されません。

  • BeginCheckpoint は、コールバック メソッドと共に呼び出すことができます。この場合、asyncState を使用して呼び出しを識別し、コールバック メソッドに必要な情報をすべて返すことができます。コールバックが実行されると、EndCheckpoint が呼び出され、結果が取得されます。

EndCheckpoint メソッドは、使用したパターンがどれかにかかわらず、チェックポイントがキャンセルされた場合でも、呼び出す必要があります。このメソッドは、呼び出しからユーザーが戻り値を取得する唯一の方法であると共に、StreamInsight が呼び出しの完了を認識するための唯一の方法でもあります。EndCheckpoint を呼び出すまでは次のチェックポイントを開始できません。

チェックポイント処理で発生するエラーにより、関連するクエリが停止することも影響を受けることもありません。チェックポイント操作の進行中にクエリを停止すると、チェックポイントはキャンセルされます。

トップに戻る

手順 4. 入力アダプターでのイベントの再生

復旧の一環としてイベントの再生をサポートするには、入力アダプター ファクトリが IHighWaterMarkInputAdapterFactory インターフェイスまたは IHighWaterMarkTypedInputAdapterFactory インターフェイスを実装する必要があります。その後、アダプター ファクトリの Create メソッドの呼び出しにより、再生するイベントをアダプターが識別するための高ウォーターマークが提供されます。

完全な出力を確保するには、高ウォーター マークが示す時点の後で発生した物理ストリーム内のすべてのイベントがすべての入力アダプターで再生される必要があります。

トップに戻る

手順 5. 出力アダプターでの重複の削除

復旧の一環として重複の削除をサポートするには、出力アダプター ファクトリが IHighWaterMarkOutputAdapterFactory インターフェイスまたは IHighWaterMarkTypedOutputAdapterFactory インターフェイスを実装する必要があります。その後、アダプター ファクトリの Create メソッドの呼び出しにより、重複値をアダプターが識別するための高ウォーターマークとオフセット値が提供されます。チェックポイントに対応する出力ストリームがストリームの任意のポイントにある可能性があるため、このオフセットは必須です。

クエリが初めて開始されると、アダプター ファクトリの Create メソッドが高ウォーターマークとオフセットなしで呼び出されます。サーバーがクエリのチェックポイントを一度もキャプチャしていない場合は、アダプター ファクトリの Create メソッドが DateTime.MinValue の高ウォーターマークと 0 (ゼロ) のオフセットで呼び出されます。

クエリが適切に再生されると、最後のチェックポイントがキャプチャされた後、そして停止の前に生成されたイベントは、再起動時に再び生成されます。これらは重複しているので、出力アダプターで削除される必要があります。これらの削除方法は、出力アダプターによって異なります。元のコピーが破棄されたり、重複コピーが無視されることがあります。

同等の出力を確保するには、すべての入力アダプターで入力イベントが正しく再生され、物理ストリーム内にある停止前に発生した重複イベント、および高ウォーター マーク オフセットで指定された時点の後で発生した重複イベントがすべての出力アダプターで削除される必要があります。

トップに戻る

手順 6. 障害からの復旧

サーバーでは、起動時に自動的に復旧が実行され、すべてのクエリが一貫した状態になります。これは非同期操作であり、その結果 Server.Create の呼び出しは復旧が完了する前に返されます。

  • 非回復性クエリは停止状態になります。この動作は変更されません。

  • 回復性クエリは初期化中状態になります。その後、サーバーは保存されたチェックポイント情報を読み込みます。

この時点で Start を呼び出し、クエリを再開できます。回復性クエリは、初期化が完了すると同時に再開されます。

障害から復旧するには、スタートアップ コードは、以下の手順を実行する必要があります。

  1. メタデータから一連のアプリケーション クエリを取得します。

  2. クエリごとに、メタデータにそのクエリが既に存在するかどうかを確認します。

    1. クエリが既に存在する場合はそれを再開します。

    2. メタデータにクエリが存在しない場合は、それを作成し、前の「手順 2. 回復性クエリの定義」の説明に従って、回復性クエリとして定義します。

復旧の最中に問題が発生した場合は、回復性なしでサーバーを再起動できます。

トップに戻る

復旧の無効化を伴わないシャットダウン

Server の Dispose メソッドを呼び出すことで、復旧を無効にせずにサーバーをシャットダウンできます。

  • 非回復性クエリは停止状態になります。

  • 回復性クエリは中断状態になります。サーバーを再起動すると、中断したクエリの状態の復旧が試みられます。この動作を防ぐには、シャットダウンの前にクエリを停止します。

この方法でサーバーをシャットダウンすると、非回復性クエリと回復性クエリの両方のメタデータが保存されます。

トップに戻る

使用例

再生と重複除外を含む回復性アプリケーションのエンド ツー エンド サンプルについては、Codeplex の StreamInsight サンプル ページのチェックポイント サンプルを参照してください。

トップに戻る

明示的な開発モデルでの回復性クエリの定義

namespace StreamInsight.Samples.TrafficJoinQuery
{
    using...

    internal class EmbeddedCepServer
    {
        internal static void Main()
        {
            // SQL CE was available as an optional metadata provider in v1.1
            // For the server to support recovery, this becomes mandatory
            // A log path is also a mandatory requirement.
            SqlCeMetadataProviderConfiguration metadataConfig = new
               SqlCeMetadataProviderConfiguration();
            metadataConfig.CreateDataSourceIfMissing = true;
            metadataConfig.DataSource = "C:\\CepMetadata.sdf";

            ServerRecoveryConfiguration recoveryConfig = new ServerRecoveryConfiguration();
            recoveryConfig.CreateLogPathIfMissing = true;
            recoveryConfig.LogPath = "C:\\CepLogPath";


            using (EmbeddedServer server = Server.Create(
                                            "Default", metadataConfig, recoveryConfig))
            {
                try
                {
                    Application application = server.CreateApplication("TrafficJoinSample");

                    QueryTemplate queryTemplate = CreateQueryTemplate(application);

                    InputAdapter csvInputAdapter =     
                                           application.CreateInputAdapter<TextFileReaderFactory>(
                                           "CSV Input", "Reading tuples from a CSV file");
                    OutputAdapter csvOutputAdapter =
                                          application.CreateOutputAdapter<TextFileWriterFactory>(
                                          "CSV Output", "Writing result events to a CSV file");

                    // bind query to event producers and consumers
                    QueryBinder queryBinder = BindQuery(
                                              csvInputAdapter, csvOutputAdapter, queryTemplate);

                    // Create bound query that can be run
                    Console.WriteLine("Registering bound query");
                    Query query = application.CreateQuery(
                                    "TrafficSensorQuery",
                                    "Minute average count, filtered by location threshold",
                                    queryBinder,
                                    true);   // v1.2 addition - Specify the query as resilient

                    // Start the query
                    // v1.2 has additional semantics during recovery

                    query.Start();

                    // submit a checkpoint request

                    // query.Stop();
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                    Console.ReadLine();
                }
            }

            Console.WriteLine("\npress enter to exit application");
            Console.ReadLine();
        }

チェックポイント - コールバック ランデブー モデル

namespace StreamInsight.Samples.TrafficJoinQuery
{
    using...

    internal class EmbeddedCepServer
    {
        internal static void Main()
        {
                        // Same code through query start …
            {
                try
                {
                    // Start the query
                    query.Start();

                    // submit a checkpoint request
                    IAsyncResult result = server.BeginCheckpoint(query,
                        r => {
                            if (server.EndCheckpoint(r))
                            {
                                // the checkpoint succeeded
                            }
                            else
                            {
                                // the checkpoint was canceled
                            }
                        },
                        null);
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                    Console.ReadLine();
                }
            }

            Console.WriteLine("\npress enter to exit application");
            Console.ReadLine();
        }

関連項目

概念

StreamInsight の回復性

回復性 StreamInsight アプリケーションの作成

回復性 StreamInsight アプリケーションの監視

回復性 StreamInsight アプリケーションのトラブルシューティング