创建输入和输出适配器
本主题提供使用 StreamInsight 平台为复杂事件处理 (CEP) 应用程序创建输入适配器和输出适配器所需的一般信息。适配器是将事件传递到 StreamInsight 服务器或从中传出的软件转换器。
了解事件流和控制
创建适配器时,需要了解事件如何通过 StreamInsight 服务器流动以及输入适配器和输出适配器如何控制此事件流,这一点很重要。如下图所示,从源到固定查询再到接收器的事件流是单向的。输入适配器从源读取事件,然后将其传递到查询。输入事件或从处理输入事件产生的新事件从查询中的一个运算符推送到下一个运算符。查询将已处理的事件传递到输出适配器,输出适配器再将事件传递到接收器。下图描绘了这样一种应用场景:某个 StreamInsight 查询绑定到 a1 和 a2 这两个输入适配器实例以及输出适配器实例 a4。
虽然从源到接收器的事件流是单向的,但在组件之间的某些交互点处针对事件检索和传输的流和执行控制却可以是双向的。这些交互点在图中显示为 READ、ENQUEUE、DEQUEUE 和 WRITE。
您的输入适配器实现应该使用特定于源设备(如文件或数据库)的访问机制执行 READ 操作,并且使用适配器 API 执行 ENQUEUE 操作。同样,输出适配器实现应该使用特定于接收器设备的访问机制执行 WRITE 操作,并且使用适配器 API 执行 DEQUEUE 操作。您必须根据适配器状态转换图(在本主题下文中说明)所指定的设计模式来实现 ENQUEUE 和 DEQUEUE 操作。
从事件流控制的角度来看,您可以想像事件从提供者“推送”到使用者(以从左到右的宽箭头表示),或想像由使用者向提供者“请求”事件(以钩状箭头表示)。在 READ 和 WRITE 交互点处,您的适配器实现可以采用推送或请求方法来控制事件流。对于这种交互,需要考虑以下几个因素:源或接收器能够处理的事件率、适配器对源或接收器的限制能力,以及您可以实现的任何缓冲功能。
对于以非常短的滞后时间推送出事件且难以限制的源设备,典型的做法是实现一个适配器,源设备可以将事件推送到该适配器中。这类设备的示例包括:传感器(机器驱动的事件)、股票行情显示系统和网络端口。对于滞后时间较长的设备(文件、数据库),请考虑由适配器向源请求数据的实现方案。同样,在输出端,对于能够以非常高的吞吐速度接受事件的设备,可以实现一个输出适配器,用该适配器将事件推送到设备中。对于吞吐率较低的输出设备,则可以采用这样的方式:每当该设备可以使用事件时就轮询适配器。
在 ENQUEUE 交互点处,StreamInsight 服务器支持推送模型。这意味着该适配器设计模式允许在任意时点让尽量多的事件(只要引擎可以使用)排入队列。在 DEQUEUE 交互点处,StreamInsight 服务器支持请求模型。这意味着该适配器设计模式期望以“引擎能够提供的最快速度”向服务器请求事件。
有鉴于此,StreamInsight 服务器的限制策略可以说是显而易见的。假定有一个没有任何阻塞操作的简单传递查询,StreamInsight 服务器可以从 ENQUEUE 交互点处的输入适配器使用事件的速率仅受到输出适配器可以从 DEQUEUE 交互点处的服务器使用事件的速率限制。StreamInsight 服务器在 ENQUEUE 期间向输入适配器推送回事件的程度取决于查询能够多快地释放输出,以及输出适配器能够多快地使用此输出。StreamInsight 提供了一整套全面的诊断视图,可帮助您衡量在上述每个交互点处的事件率。有关详细信息,请参阅监视 StreamInsight 服务器和查询。
适配器开发任务
使用以下核对清单开发适配器。
确定所需的适配器类型(输入或输出)。
输入适配器以提供传入事件所用的格式读取这些事件,并将此数据转换为 StreamInsight 服务器可以使用的格式。
输出适配器接收 StreamInsight 服务器处理过的事件,将事件转换为输出设备所需的格式,并将数据发送给该设备。
确定事件类型。
对于输入适配器,定义描述源提供的事件负载的事件类型。对于输出适配器,指定描述接收器使用的事件负载的事件类型。有关事件负载的详细信息,请参阅 StreamInsight 服务器概念。
您需要为始终生成或使用固定负载格式(事先了解字段数目和字段类型)的事件的源或接收器指定和生成类型化适配器。类型化适配器的主要优点是:创建要排入 StreamInsight 服务器队列中的事件的实现方案相对简单。由于字段类型是已知的,因此您可以使用 Visual Studio 的 IntelliSense(或其他集成开发环境中的等效功能)来填充这些字段。
如果源或接收器生成或使用不同的负载格式,则需指定和生成非类型化的适配器。非类型化适配器的主要优点是:在进行查询绑定时提供灵活的方式来指定事件类型,而不是将适配器实现绑定到特定的事件类型。与类型化适配器相比,非类型化适配器的实现更为常用。必须按以下方式编写非类型化输入适配器:可以根据查询绑定期间提供的配置参数确定每个字段的类型,一次填充一个字段,然后对事件进行排队。同样,非类型化输出适配器必须能够基于输出时提供的配置信息,从取消排队的事件中检索查询处理的结果。
特别需要注意的是,绑定到查询的适配器实例(无论是类型化还是非类型化的)总是发出包含一个特定类型的负载的事件。有关详细信息,请参阅创建事件类型。
确定事件模型。
确定输入和输出事件的事件模型。StreamInsight 支持三种事件模型:间隔、点和边缘。如果源提供固定事件模型的事件,则可以单为该事件模型设计输入适配器。同样,如果接收器需要特定模型的事件,则可以单为该事件模型设计输出适配器。不过,多数应用程序对于特定事件类型都可能需要所有事件模型。建议为每一事件模型生成类型化或非类型化的适配器。有关事件模型的详细信息,请参阅 StreamInsight 服务器概念。
输入和输出 AdapterFactory 类支持您将这些适配器打包在一起。在基于配置参数进行查询绑定时,可以实例化正确的适配器。
选择相应的适配器基类。
基于事件类型和事件模型,选择相应的适配器基类。类命名法遵循模式 [类型化][点 | 间隔 | 边缘][输入 | 输出]。非类型化适配器不具有类型化的前缀。
适配器类型
输入适配器基类
输出适配器基类
类型化点
TypedPointInputAdapter
TypedPointOutputAdapter
非类型化点
PointInputAdapter
PointOutputAdapter
类型化间隔
TypedIntervalInputAdapter
TypedIntervalOutputAdapter
非类型化间隔
IntervalInputAdapter
IntervalOutputAdapter
类型化边缘
TypedEdgeInputAdapter
TypedEdgeOutputAdapter
非类型化边缘
EdgeInputAdapter
EdgeOutputAdapter
有关详细信息,请参阅 Microsoft.ComplexEventProcessing.Adapters。
设计输入和输出 AdapterFactory 类。
AdapterFactory 是一个适配器容器类。必须实现一个工厂类。基工厂类按如下所示组织。
适配器类型
输入适配器基类
输出适配器基类
类型化
ITypedInputAdapterFactory
ITypedOutputAdapterFactory
非类型化
IInputAdapterFactory
IOutputAdapterFactory
支持弹性的类型化
IHighWaterMarkTypedInputAdapterFactory
IHighWaterMarkTypedOutputAdapterFactory
支持弹性的非类型化
IHighWaterMarkInputAdapterFactory
IHighWaterMarkOutputAdapterFactory
工厂类具有以下作用:
针对给定的设备类(CSV 文件、SQL Server 数据库、Web 服务器 Common Log Format(常用日志格式))或应用程序要求,在不同适配器实现之间支持资源共享,并帮助将配置参数传递到适配器构造函数。例如,某个应用程序可能要求使用全部三个事件模型(点、间隔和边缘)。一个工厂可以支持三个适配器实现,每个事件模型对应一个实现。又如,应用程序可能具有相同的事件源(如数据库表),但是基于执行的查询,源会从相同源中生成多个事件负载结构。在这种情况下,一个工厂可以支持多个适配器实现来处理每个负载结构。
它提供了从适配器到服务器运行时的网关。适配器开发人员必须在适配器类的适配器工厂中实现 Create() 和 Dispose() 方法。在查询启动和关闭期间,服务器会调用这些方法。
它为适配器访问运行时之前的配置信息提供了网关。这对非类型化适配器尤其重要,这些适配器必须从在查询绑定期间提供的配置参数中确定结构中每个字段的类型。可以在工厂类中定义配置结构,并将此配置结构通过 Create() 方法传递到适配器类的构造函数方法中。此配置结构使用 DataContractSerialization 进行序列化。除了这一点限制,对于如何在适配器构造函数中填充和使用此配置结构,开发方法为您提供了充分灵活的定义和使用方式。
它提供了一种生成当前时间增量 (CTI) 的方法,该方法不要求通过输入适配器对 CTI 进行显式排队。通过在适配器工厂类中实现 ITypedDeclareAdvanceTimePolicy(对于类型化适配器工厂),和 IDeclareAdvanceTimePolicy(对于非类型化适配器工厂)接口,用户可以指定 CTI 频率和时间戳。这简化了适配器代码,并且可以影响工厂通过其适配器实例生成的每个事件流。有关详细信息,请参阅 [AdvanceTimeSettingsClass]。
在弹性应用程序中,通过对输入适配器提供高水印以便用于缺失事件的重播,以及通过对输出适配器提供高水印和偏移值以便删除重复的事件,对弹性提供支持。有关详细信息,请参阅 StreamInsight 弹性。
生成和测试适配器。
将适配器作为 .NET 程序集编译和生成。针对一个简单的传递查询测试适配器的基本操作,该查询从输入适配器读取事件,并将其输出到输出适配器,而无需任何复杂查询处理。这将验证适配器能否在设备中执行读写操作,以及能否对事件进行排队和取消排队。
适配器的状态机
无论是输入适配器还是输出适配器,定义适配器与 StreamInsight 服务器之间的交互的状态机都是相同的。这一点很重要,因为状态机为您提供了前后一致的开发模型。下图演示了状态机。
此状态机的主要功能和工作要求如下:
Start() 和 Resume() 是由 StreamInsight 服务器调用的方法,您必须以适配器开发人员的身份实现这两种方法。此外,还必须实现适配器类的构造函数方法和 Dispose() 方法(从基类继承)。
相应地,您的适配器实现必须调用由适配器 SDK 提供的以下方法:
Enqueue(),用于输入适配器。这将返回值 EnqueueOperationResult.Success 或 EnqueueOperationResult.Full。
Dequeue(),用于输出适配器。这将返回值 DequeueOperationResult.Success 或 DequeueOperationResult.Empty。
Ready().这将返回布尔值 TRUE 或 FALSE。
Stopped().这将返回布尔值 TRUE 或 FALSE。
在管理员或查询开发人员通过 StreamInsight 服务器 API 中的方法停止执行查询时,该服务器将代表用户异步调用内部方法(表示为 StopQuery())。
当适配器处于以下状态之一时,Enqueue() 和 Dequeue() 分别返回状态 Full 和 Empty:
已挂起
正在停止
调用 Enqueue() 和 Dequeue() 时,如果适配器处于下列状态之一,则会引发异常:
已创建
已停止
调用 Ready() 时,如果适配器处于下列状态之一,则会引发异常:
已创建
正在运行
已停止
适配器在操作期间,将在其全部五个状态(已创建、正在运行、已挂起、正在停止和已停止)或部分状态间转换。状态转换发生在 StreamInsight 服务器调用 Start() 或 Resume() 之前,适配器调用 Enqueue()、Dequeue()、Ready() 和 Stopped() 之后。
StreamInsight 服务器和适配器从不共享同一线程。服务器总是在单独的工作线程上调用 Start() 或 Resume()。服务器以适配器的名义从操作系统线程池获取此线程。这表示 Start() 和 Resume() 方法完全能够按需灵活地使用工作线程(例如,用于衍生更多线程进行异步读/写)。故此,在通过此线程使用系统资源时,必须格外小心并采用最佳做法。
该 API 不需要在 Start() 和 Resume() 操作(线程)之间保持内在同步。服务器总在(且只在)适配器调用 Ready() 之后才调用 Resume()。但请注意,面向设备的读、写或缓冲事件任务(尤其在异步 I/O 情况下)可能需要进行同步。我们建议最好使用非阻塞 I/O。
如果适配器可以空闲,则应定期检查状态,以确定是否有停止要求。
适配器与服务器交互的生命周期
StreamInsight 服务器与适配器之间的握手过程始终是同步的。所以在服务器的执行过程中,适配器可随时检查其状态并采取相应的操作。适配器与 StreamInsight 服务器交互的生命周期包括下列操作,这些操作对应于上面的上图所演示的状态机。
已创建
启动查询后(通过在 StreamInsight 服务器 API 中发出相应的调用),某个适配器实例开始与 StreamInsight 服务器交互。
正在运行
服务器将适配器置于“正在运行”状态,然后对该适配器异步调用 Start(),并保证只调用这一次。当适配器处于“正在运行”状态时,该适配器可以将事件排入服务器队列或将事件从服务器队列中移出。
理想情况下,适配器大多时间都处于“正在运行”状态。建议的设计模式是:最好在单独的线程中从 Start() 方法调用读/写例程,然后从 Start() 例程返回,这样可快速释放工作线程。
读例程(例如,假定该例程名为 ProduceEvents())从源中读取事件,并调用 Enqueue() 将事件推送到服务器中。对于输出适配器,写例程(例如,假定该例程名为 ConsumeEvents())调用 Dequeue(),以向服务器请求事件并将这些事件写入接收器。
已挂起
当服务器无法接收入队事件或无法输出事件以移出队列时,输入或输出适配器将被置于“已挂起”状态。这将导致针对 Enqueue() 和 Dequeue() 的调用分别返回 FULL 和 EMPTY 状态。在“已挂起”状态中,可以执行日常管理操作,如保存从数据库中读取的最后一条记录的位置,或从文件中读取的最后一行的位置。在此可选部分的最后,必须调用 Ready() 方法以向服务器传达适配器已准备就绪,可以继续操作。如果该例程在与 Start() 本身相同的工作线程上执行,则必须自 Start() 例程本身返回。
为响应 Ready() 调用,服务器将适配器返回到“正在运行”状态,并始终在不同的工作线程上异步调用 Resume()。可以设计 Resume() 将最后一个失败的迭代排入队列或移出队列,然后调用 ProduceEvents() 或 ConsumeEvents()。此模式可持续操作,直到适配器转变为“已停止”或“正在停止”状态。
正在停止
处于“正在运行”或“已挂起”状态时,服务器可随时将适配器置于“正在停止”状态,以响应停止查询的异步请求。在此状态下,调用 Enqueue() 或 Dequeue() 也分别返回状态 FULL 或 EMPTY。
“正在停止”状态为适配器实现提供了一个临时区域,使其为停止做好适当准备。您可以执行适配器以释放其所获得的所有资源(线程、内存),然后调用 Stopped() 方法。只有调用此方法后,服务器才会停止适配器。
请注意,可通过异步方式使适配器转换为“正在停止”状态。适配器需要通过某种方法来检测其是否已进入“正在停止”状态。如上所述,该设计模式可供适配器在挂起时调用 Ready()。相应地,服务器再次调用 Resume() 方法,从而支持在 Resume() 方法中检测“正在停止”状态。作为最佳做法,我们建议在您的 Start() 和 Resume() 实现中将“正在停止”状态检查放入第一个代码块。
已停止
适配器代码可以在随时调用 Stopped()。这将使适配器转为“已停止”状态。我们建议您在设计时,最好先清理适配器所获得的资源,然后再调用 Stopped()。
重要提示 调用 Stopped() 方法失败将导致与查询关联的最后一页内存仍处于已分配状态。这可能导致轻微内存泄漏,并且如果一个进程中有很多查询启停循环,这些轻微的泄漏会随着时间推移而逐渐累积。
在“已停止”状态下,适配器无法引用任何 StreamInsight 服务器特定的构造或事件内存,也无法执行排队或取消排队操作。这些操作会引发异常。但是,操作系统和面向设备的清理操作可以继续。
示例
有关各种输入适配器和输出适配器以及适配器工厂的示例,请参阅 StreamInsight Samples(StreamInsight 示例)中提供的示例。