数据流任务

数据流任务封装数据流引擎,该引擎在源和目标之间移动数据,使用户可以在移动数据时转换、清除和修改数据。将数据流任务添加到包控制流使得包可以提取、转换和加载数据。

数据流至少由一个数据流组件组成,但通常由一组已连接的数据流组件组成:提取数据的源;修改、路由或汇总数据的转换;加载数据的目标。组件在数据流中由路径连接。每个路径都指定分别作为路径起点和终点的两个组件。有关详细信息,请参阅数据流元素

运行时,数据流任务从数据流生成执行计划,而数据流引擎执行该计划。可以创建无数据流的数据流任务,但任务只有在其包含至少一个数据流时才执行。

以下关系图显示含有一个数据流的数据流任务。

数据流

数据流任务可以包含多个数据流。如果任务复制多组数据,而且复制数据的顺序并不重要,则可以更方便地在数据流任务中包含多个数据流。例如,可以创建五个数据流,每个数据流都将数据从平面文件复制到数据仓库星形架构中的不同维度表中。

但是,如果一个数据流任务中存在多个数据流,则由数据流引擎决定执行顺序。因此,当顺序很重要时包应该使用多个数据流任务,每个任务包含一个数据流。然后可以应用优先约束来控制任务的执行顺序。

以下关系图显示具有多个数据流的数据流任务。

数据流

包可以包含多个数据流任务,复杂的包经常如此。例如,如果包需要按指定顺序运行数据流,或需要在数据流之间执行其他任务,则必须对每个数据流使用单独的数据流任务。

数据流任务也管理错误流。在运行时,行级错误可能在数据流组件转换数据、执行查找或计算表达式时发生。例如,值为字符串的数据列不能转换为整数,或尝试除以零的表达式。两种操作都导致错误,而包含错误的行可以用错误流分别进行处理。有关在包数据流中如何使用错误流的详细信息,请参阅处理数据流中的错误

若要从文本文件向 SQL Server 数据库大容量插入数据,可以使用大容量插入任务代替数据流任务和数据流。但是,大容量插入任务无法转换数据。有关详细信息,请参阅大容量插入任务

对数据流元素使用属性表达式

很多数据流组件(源、转换和目标)都支持使用属性表达式来表示它们的某些属性。属性表达式是在加载包时用于替换属性值的表达式。在运行时,包使用更新后的属性值。表达式是使用 Integration Services 表达式语法生成的,并且可以包括 Integration Services 函数、运算符、标识符和变量。有关详细信息,请参阅Integration Services 表达式参考在包中使用表达式在包中使用属性表达式

如果在 Business Intelligence Development Studio 中构造包,则支持属性表达式的任何数据流组件的属性将显示在它们所属的数据流任务上。若要添加、更改或删除数据流组件的属性表达式,请单击数据流任务,然后使用任务的“属性”窗口或编辑器来添加、更改或删除属性表达式。数据流任务自身的属性表达式也在“属性”窗口中进行管理。

如果数据流包含任何使用表达式的组件,则表达式也显示在“属性”窗口中。若要查看表达式,请选择该组件所属的数据流任务。可以按类别或者字母顺序查看属性。如果在“属性”窗口中使用“按分类顺序”视图,任何没有用于特定属性的表达式将显示在**“杂项”**类别中。如果使用字母顺序视图,则按数据流组件的名称顺序列出表达式。

日志项

Integration Services 提供了可用于所有任务的一组日志事件。Integration Services 还提供了可用于多个任务的自定义日志条目。有关详细信息,请参阅在包中实现日志记录日志记录的自定义消息。数据流任务包括下列自定义日志项:

日志项

说明

BufferSizeTuning

指示数据流任务更改了缓冲区的大小。日志条目描述了大小更改的原因,并列出了临时的新缓冲区大小。

OnPipelinePostEndOfRowset

表示组件已经给出它的行集结束信号,该信号由对 ProcessInput 方法的最后调用设置。对于数据流中处理输入的每个组件,都会写入一项。该项包括组件的名称。

OnPipelinePostPrimeOutput

指示组件已经完成它对 PrimeOutput 方法的最后一次调用。取决于数据流,可能写入多个日志条目。如果组件是源组件,此日志条目表示该组件已经完成对行的处理。

OnPipelinePreEndOfRowset

指示组件将要接收它的行集结束信号,该信号由对 ProcessInput 方法的最后一次调用设置。对于数据流中处理输入的每个组件,都会写入一项。该项包括组件的名称。

OnPipelinePrePrimeOutput

指示组件将从 PrimeOutput 方法接收它的调用。取决于数据流,可能写入多个日志条目。

OnPipelineRowsSent

报告对 ProcessInput 方法的调用为组件输入所提供的行数。此日志条目包括组件名。

PipelineBufferLeak

提供在缓冲区管理器退出之后使缓冲区保持活动状态的任何组件的相关信息。如果缓冲区仍保持活动状态,则没有释放缓冲区资源并且可能导致内存泄漏。日志条目提供组件的名称和缓冲区的 ID。

PipelineComponentTime

报告组件在其五个主要处理步骤(Validate、PreExecute、PostExecute、ProcessInput 和 ProcessOutput)中的每个步骤所用的时间长短(毫秒)。

PipelineExecutionPlan

报告数据流的执行计划。执行计划提供有关缓冲区将如何发送到组件的信息。此信息与 PipelineExecutionTrees 日志条目组合,一起描述在数据流任务中所发生的事情。

PipelineExecutionTrees

报告数据流中的布局的执行树。数据流引擎的计划程序使用这些树生成数据流的执行计划。

PipelineInitialization

提供有关任务的初始化信息。此信息包括要用来临时存储 BLOB 数据、默认缓冲区大小和缓冲区行数的目录。取决于数据流任务的配置,可能写入多个日志条目。

每次运行包时,这些日志条目都提供有关数据流任务执行的丰富信息。重复运行包时,可以捕获信息,这些信息随着时间的变化提供有关任务所执行的处理、可能影响性能的问题以及任务处理的数据量的重要历史信息。

有关如何使用这些日志条目监视和改进数据流的性能的详细信息,请参阅下列主题之一:

来自数据流任务的示例消息

下表列出了非常简单的包的日志条目示例消息。该包使用 OLE DB 源来从表中提取数据,使用排序转换来排序数据,并使用 OLE DB 目标将数据写入其他表。

日志条目

消息

BufferSizeTuning

缓冲区类型 0 中的行会使缓冲区大小大于配置的最大值。在此类型的缓冲区中将只有 9637 行。

缓冲区类型 2 中的行会使缓冲区大小大于配置的最大值。在此类型的缓冲区中将只有 9497 行。

缓冲区类型 3 中的行会使缓冲区大小大于配置的最大值。在此类型的缓冲区中将只有 9497 行。

OnPipelinePostEndOfRowset

将向某个组件发送行集结束信号。: 1180 : 排序 : 1181 : 对输入进行排序

将向某个组件发送行集结束信号。: 1291 : OLE DB 目标 : 1304 : OLE DB 目标输入

OnPipelinePostPrimeOutput

某个组件已经从其 PrimeOutput 调用返回。: 1180 : 排序

某个组件已经从其 PrimeOutput 调用返回。: 1 : OLE DB 源

OnPipelinePreEndOfRowset

某个组件已经处理完它的所有行。: 1180 : 排序 : 1181 : 对输入进行排序

某个组件已经处理完它的所有行。: 1291 : OLE DB 目标 : 1304 : OLE DB 目标输入

OnPipelinePrePrimeOutput

将对某个组件调用 PrimeOutput。: 1180 : 排序

将对某个组件调用 PrimeOutput。: 1 : OLE DB 源

OnPipelineRowsSent

已经为数据流组件提供了若干行作为输入。: : 1185 : OLE DB 源输出 : 1180 : 排序 : 1181 : 对输入进行排序 : 76

已经为数据流组件提供了若干行作为输入。: : 1308 : 对输出进行排序 : 1291 : OLE DB 目标 : 1304 : OLE DB 目标输入 : 76

PipelineComponentTime

“Calculate LineItemTotalCost”组件 (3522) 在 ProcessInput 中耗时 356 毫秒。

“Sum Quantity and LineItemTotalCost”组件 (3619) 在 ProcessInput 中耗时 79 毫秒。

“Calculate Average Cost”组件 (3662) 在 ProcessInput 中耗时 16 毫秒。

“Sort by ProductID”组件 (3717) 在 ProcessInput 中耗时 125 毫秒。

“Load Data”组件 (3773) 在 ProcessInput 中耗时 0 毫秒。

“Extract Data”组件 (3869) 在“OLE DB Source Output”输出 (3879) 上的 PrimeOutput 填充缓冲区中耗时 688 毫秒。

“Sum Quantity and LineItemTotalCost”组件 (3619) 在“Aggregate Output 1”输出 (3621) 上的 PrimeOutput 填充缓冲区中耗时 141 毫秒。

“Sort by ProductID”组件 (3717) 在“Sort Output”输出 (3719) 上的 PrimeOutput 填充缓冲区中耗时 16 毫秒。

PipelineExecutionPlan

SourceThread0

驱动器:1

影响: 1180 1291

输出工作列表

为输出 ID 11 执行类型 1 的 CreatePrimeBuffer。

SetBufferListener:“WorkThread0”,针对输入 ID 1181

为输出 ID 12 执行类型 3 的 CreatePrimeBuffer。

对组件 "OLE DB Source" (1) 的 CallPrimeOutput

结束输出工作列表

结束 SourceThread0

WorkThread0

驱动器:1180

影响: 1180 1291

输入工作列表,输入 ID 1181 (期望 1 EOR)

针对视图类型 2 对组件 "Sort" (1180) 的输入 ID 1181 执行的 CallProcessInput

结束输入 1181 的输入工作列表

输出工作列表

为输出 ID 1182 执行类型 4 的 CreatePrimeBuffer。

SetBufferListener:“WorkThread1”,针对输入 ID 1304

对组件 "Sort" (1180) 执行的 CallPrimeOutput

结束输出工作列表

结束 WorkThread0

WorkThread1

驱动器:1291

影响:1291

输入工作列表,输入 ID 1304 (期望 1 EOR)

针对视图类型 5 对组件 "OLE DB Destination" (1291) 的输入 ID 1304 执行的 CallProcessInput

结束输入 1304 的输入工作列表

输出工作列表

结束输出工作列表

结束 WorkThread1

PipelineExecutionTrees

开始执行树 0

输出 "OLE DB 源输出" (11)

输入 "对输入进行排序" (1181)

结束执行树 0

开始执行树 1

输出 "OLE DB 源错误输出" (12)

结束执行树 1

开始执行树 2

输出 "对输出进行排序" (1182)

输入 "OLE DB 目标输入" (1304)

输出 "OLE DB 目标错误输出" (1305)

结束执行树 2

PipelineInitialization

未提供临时 BLOB 数据存储位置。缓冲区管理器将考虑 TEMP 和 TMP 环境变量中的目录。

默认缓冲区大小为 10485760 个字节。

默认情况下,缓冲区将具有 10000 行。

由于数据流的 RunInOptimizedMode 属性已设置为 false,因此数据流不会删除未使用的组件。

许多日志事件都写入多个项,并且多个日志条目的消息都包含复杂的数据。为了便于理解和交流复杂消息的内容,可以对消息文本进行分析。根据日志的位置,可以使用 Transact-SQL 语句或脚本组件用列或其他更有用的格式来分离复杂文本。

例如,下表包含消息“已经为数据流组件提供了若干行作为输入。: : 1185 : OLE DB 源输出 : 1180 : 排序 : 1181 : 对输入进行排序 : 76”,并解析为列。该消息由 OnPipelineRowsSent 事件在将行从 OLE DB 源发送到排序转换时写入。

说明

PathID

OLE DB 源和排序转换之间的路径中的 ID 属性值。

1185

PathName

路径的 Name 属性值。

OLE DB 源输出

ComponentID

排序转换的 ID 属性值。

1180

ComponentName

排序转换的 Name 属性值。

排序

InputID

排序转换的输入的 ID 属性值。

1181

InputName

排序转换的输入的 Name 属性值。

对输入进行排序

RowsSent

向排序转换的输入发送的行数。

76

以下源拥有可以使用属性表达式来更新的属性:

有关详细信息,请参阅源自定义属性

转换

目标

以下目标拥有可以使用属性表达式来更新的属性:

有关详细信息,请参阅目标自定义属性

配置数据流任务

可以在**“属性”**窗口中或以编程方式设置属性。

有关如何在**“属性”**窗口中设置这些属性的详细信息,请单击下列主题:

以编程方式配置数据流任务

有关以编程方式向包中添加数据流任务并设置数据流属性的详细信息,请单击下列主题:

Integration Services 图标(小) 使 Integration Services 保持最新

若要从 Microsoft 获得最新的下载内容、文章、示例和视频,以及从社区获得所选解决方案,请访问 MSDN 或 TechNet 上的 Integration Services 页:

若要获得有关这些更新的自动通知,请订阅该页上提供的 RSS 源。