Export (0) Print
Expand All

Developing a Custom Source Component

SQL Server Integration Services gives developers the ability to write source components that can connect to custom data sources and supply data from those sources to other components in a data flow task. The ability to create custom sources is valuable when you must connect to data sources that cannot be accessed by using one of the existing Integration Services sources.

Source components have one or more outputs and zero inputs. At design time, source components are used to create and configure connections, read column metadata from the external data source, and configure the source's output columns based on the external data source. During execution they connect to the external data source and add rows to an output buffer. The data flow task then provides this buffer of data rows to downstream components.

For a sample source component, see the the Integration Services samples on Codeplex. For a general overview of data flow component development, see Developing a Custom Data Flow Component.

Implementing the design-time functionality of a source component involves specifying a connection to an external data source, adding and configuring output columns that reflect the data source, and validating that the component is ready to execute. By definition, a source component has zero inputs and one or more asynchronous outputs.

Creating the Component

Source components connect to external data sources by using ConnectionManager objects defined in a package. They indicate their requirement for a connection manager by adding an element to the RuntimeConnectionCollection collection of the ComponentMetaData property. This collection serves two purposes—to hold references to connection managers in the package used by the component, and to advertise the need for a connection manager to the designer. When an IDTSRuntimeConnection100 has been added to the collection, the Advanced Editor displays the Connection Properties tab, which lets users select or create a connection in the package.

The following code example shows an implementation of ProvideComponentProperties that adds an output, and adds a IDTSRuntimeConnection100 object to the RuntimeConnectionCollection.

using System;
using System.Collections;
using System.Data;
using System.Data.SqlClient;
using System.Data.OleDb;
using Microsoft.SqlServer.Dts.Runtime;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;

namespace Microsoft.Samples.SqlServer.Dts
{
    [DtsPipelineComponent(DisplayName = "MySourceComponent",ComponentType = ComponentType.SourceAdapter)]
    public class MyComponent : PipelineComponent
    {
        public override void ProvideComponentProperties()
        {
            // Reset the component.
            base.RemoveAllInputsOutputsAndCustomProperties();
            ComponentMetaData.RuntimeConnectionCollection.RemoveAll();

            IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
            output.Name = "Output";

            IDTSRuntimeConnection100 connection = ComponentMetaData.RuntimeConnectionCollection.New();
            connection.Name = "ADO.NET";
        }

Connecting to an External Data Source

After a connection has been added to the RuntimeConnectionCollection, you override the AcquireConnections method to establish a connection to the external data source. This method is called during both design and execution time. The component should establish a connection to the connection manager specified by the run-time connection, and subsequently, to the external data source.

After the connection is established, it should be cached internally by the component and released when the ReleaseConnections method is called. The ReleaseConnections method is called at design and execution time, like the AcquireConnections method. Developers override this method, and release the connection established by the component during AcquireConnections.

The following code example shows a component that connects to an ADO.NET connection in the AcquireConnections method and closes the connection in the ReleaseConnections method.

private SqlConnection sqlConnection;

public override void AcquireConnections(object transaction)
{
    if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)
    {
        ConnectionManager cm = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);
        ConnectionManagerAdoNet cmado = cm.InnerObject as ConnectionManagerAdoNet;

        if (cmado == null)
            throw new Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.");

        sqlConnection = cmado.AcquireConnection(transaction) as SqlConnection;
        sqlConnection.Open();
    }
}

public override void ReleaseConnections()
{
    if (sqlConnection != null && sqlConnection.State != ConnectionState.Closed)
        sqlConnection.Close();
}

Creating and Configuring Output Columns

The output columns of a source component reflect the columns from the external data source that the component adds to the data flow during execution. At design time, you add output columns after the component has been configured to connect to an external data source. The design-time method that a component uses to add the columns to its output collection can vary based on the needs of the component, although they should not be added during Validate or AcquireConnections. For example, a component that contains a SQL statement in a custom property that controls the data set for the component may add its output columns during the SetComponentProperty method. The component checks to see whether it has a cached connection, and, if it does, connects to the data source and generates its output columns.

After an output column has been created, set its data type properties by calling the SetDataTypeProperties method. This method is necessary because the DataType, Length, Precision, and CodePage properties are read-only and each property is dependent on the settings of the other. This method enforces the need for these values to be set consistently, and the data flow task validates that they are set correctly.

The DataType of the column determines the values that are set for the other properties. The following table shows the requirements on the dependent properties for each DataType. The data types not listed have their dependent properties set to zero.

DataType

Length

Scale

Precision

CodePage

DT_DECIMAL

0

Greater than 0 and less than or equal to 28.

0

0

DT_CY

0

0

0

0

DT_NUMERIC

0

Greater than 0 and less than or equal to 28, and less than Precision.

Greater than or equal to 1 and less than or equal to 38.

0

DT_BYTES

Greater than 0.

0

0

0

DT_STR

Greater than 0 and less than 8000.

0

0

Not 0, and a valid code page.

DT_WSTR

Greater than 0 and less than 4000.

0

0

0

Because the restrictions on the data type properties are based on the data type of the output column, you must choose the correct SSIS data type when you work with managed types. The base class provides three helper methods, ConvertBufferDataTypeToFitManaged, BufferTypeToDataRecordType, and DataRecordTypeToBufferType, to assist managed component developers in selecting an SSIS data type given a managed type. These methods convert managed data types to SSIS data types, and vice versa.

The following code example shows how the output column collection of a component is populated based on the schema of a table. The helper methods of the base class are used to set the data type of the column, and the dependent properties are set based on the data type.

SqlCommand sqlCommand;

private void CreateColumnsFromDataTable()
{
    // Get the output.
    IDTSOutput100 output = ComponentMetaData.OutputCollection[0];

    // Start clean, and remove the columns from both collections.
    output.OutputColumnCollection.RemoveAll();
    output.ExternalMetadataColumnCollection.RemoveAll();

    this.sqlCommand = sqlConnection.CreateCommand();
    this.sqlCommand.CommandType = CommandType.Text;
    this.sqlCommand.CommandText = (string)ComponentMetaData.CustomPropertyCollection["SqlStatement"].Value;
    SqlDataReader schemaReader = this.sqlCommand.ExecuteReader(CommandBehavior.SchemaOnly);
    DataTable dataTable = schemaReader.GetSchemaTable();

    // Walk the columns in the schema, 
    // and for each data column create an output column and an external metadata column.
    foreach (DataRow row in dataTable.Rows)
    {
        IDTSOutputColumn100 outColumn = output.OutputColumnCollection.New();
        IDTSExternalMetadataColumn100 exColumn = output.ExternalMetadataColumnCollection.New();

        // Set column data type properties.
        bool isLong = false;
        DataType dt = DataRecordTypeToBufferType((Type)row["DataType"]);
        dt = ConvertBufferDataTypeToFitManaged(dt, ref isLong);
        int length = 0;
        int precision = (short)row["NumericPrecision"];
        int scale = (short)row["NumericScale"];
        int codepage = dataTable.Locale.TextInfo.ANSICodePage;

        switch (dt)
        {
            // The length cannot be zero, and the code page property must contain a valid code page.
            case DataType.DT_STR:
            case DataType.DT_TEXT:
                length = precision;
                precision = 0;
                scale = 0;
                break;

            case DataType.DT_WSTR:
                length = precision;
                codepage = 0;
                scale = 0;
                precision = 0;
                break;

            case DataType.DT_BYTES:
                precision = 0;
                scale = 0;
                codepage = 0;
                break;

            case DataType.DT_NUMERIC:
                length = 0;
                codepage = 0;

                if (precision > 38)
                    precision = 38;

                if (scale > 6)
                    scale = 6;
                break;

            case DataType.DT_DECIMAL:
                length = 0;
                precision = 0;
                codepage = 0;
                break;

            default:
                length = 0;
                precision = 0;
                codepage = 0;
                scale = 0;
                break;

        }

        // Set the properties of the output column.
        outColumn.Name = (string)row["ColumnName"];
        outColumn.SetDataTypeProperties(dt, length, precision, scale, codepage);
    }
}

Validating the Component

You should validate a source component and verify that the columns defined in its output column collections match the columns at the external data source. Sometimes, verifying the output columns against the external data source can be impossible, such as in a disconnected state, or when it is preferable to avoid lengthy round trips to the server. In these situations, the columns in the output can still be validated by using the ExternalMetadataColumnCollection of the output object. For more information, see Validating a Data Flow Component.

This collection exists on both input and output objects and you can populate it with the columns from the external data source. You can use this collection to validate the output columns when SSIS Designer is offline, when the component is disconnected, or when the ValidateExternalMetadata property is false. The collection should be first populated at the same time that the output columns are created. Adding external metadata columns to the collection is relatively easy because the external metadata column should initially match the output column. The data type properties of the column should have already been set correctly, and the properties can be copied directly to the IDTSExternalMetadataColumn100 object.

The following sample code adds an external metadata column that is based on a newly created output column. It assumes that the output column has already been created.

private void CreateExternalMetaDataColumn(IDTSOutput100 output, IDTSOutputColumn100 outputColumn)
{
    
    // Set the properties of the external metadata column.
    IDTSExternalMetadataColumn100 externalColumn = output.ExternalMetadataColumnCollection.New();
    externalColumn.Name = outputColumn.Name;
    externalColumn.Precision = outputColumn.Precision;
    externalColumn.Length = outputColumn.Length;
    externalColumn.DataType = outputColumn.DataType;
    externalColumn.Scale = outputColumn.Scale;

    // Map the external column to the output column.
    outputColumn.ExternalMetadataColumnID = externalColumn.ID;

}

During execution, components add rows to output buffers that are created by the data flow task and provided to the component in PrimeOutput. Called once for source components, the method receives an output buffer for each IDTSOutput100 of the component that is connected to a downstream component.

Locating Columns in the Buffer

The output buffer for a component contains the columns defined by the component and any columns added to the output of a downstream component. For example, if a source component provides three columns in its output, and the next component adds a fourth output column, the output buffer provided for use by the source component contains these four columns.

The order of the columns in a buffer row is not defined by the index of the output column in the output column collection. An output column can only be accurately located in a buffer row by using the FindColumnByLineageID method of the BufferManager. This method locates the column with the specified lineage ID in the specified buffer, and returns its location in the row. The indexes of the output columns are typically located in the PreExecute method, and stored for use during PrimeOutput.

The following code example finds the location of the output columns in the output buffer during a call to PreExecute, and stores them in an internal structure. The name of the column is also stored in the structure and is used in the code example for the PrimeOutput method in the next section of this topic.

ArrayList columnInformation;

private struct ColumnInfo
{
    public int BufferColumnIndex;
    public string ColumnName;
}

public override void PreExecute()
{
    this.columnInformation = new ArrayList();
    IDTSOutput100 output = ComponentMetaData.OutputCollection[0];

    foreach (IDTSOutputColumn100 col in output.OutputColumnCollection)
    {
        ColumnInfo ci = new ColumnInfo();
        ci.BufferColumnIndex = BufferManager.FindColumnByLineageID(output.Buffer, col.LineageID);
        ci.ColumnName = col.Name;
        columnInformation.Add(ci);
    }
}

Processing Rows

Rows are added to the output buffer by calling the AddRow method, which creates a new buffer row with empty values in its columns. The component then assigns values to the individual columns. The output buffers provided to a component are created and monitored by the data flow task. As they become full, the rows in the buffer are moved to the next component. There is no way to determine when a batch of rows has been sent to the next component because the movement of rows by the data flow task is transparent to the component developer, and the RowCount property is always zero on output buffers. When a source component is finished adding rows to its output buffer, it notifies the data flow task by calling the SetEndOfRowset method of the PipelineBuffer, and the remaining rows in the buffer are passed to the next component.

While the source component reads rows from the external data source, you may want to update the "Rows read" or "BLOB bytes read" performance counters by calling the IncrementPipelinePerfCounter method. For more information, see Monitoring the Performance of the Data Flow Engine.

The following code example shows a component that adds rows to an output buffer in PrimeOutput. The indexes of the output columns in the buffer were located using PreExecute in the previous code example.

public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
{
    IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
    PipelineBuffer buffer = buffers[0];

    SqlDataReader dataReader = sqlCommand.ExecuteReader();

    // Loop over the rows in the DataReader, 
    // and add them to the output buffer.
    while (dataReader.Read())
    {
        // Add a row to the output buffer.
        buffer.AddRow();

        for (int x = 0; x < columnInformation.Count; x++)
        {
            ColumnInfo ci = (ColumnInfo)columnInformation[x];
            int ordinal = dataReader.GetOrdinal(ci.ColumnName);

            if (dataReader.IsDBNull(ordinal))
                buffer.SetNull(ci.BufferColumnIndex);
            else
            {
                buffer[ci.BufferColumnIndex] = dataReader[ci.ColumnName];
            }
        }
    }
    buffer.SetEndOfRowset();
}

The following sample shows a simple source component that uses a File connection manager to load the binary contents of files into the data flow. This sample does not demonstrate all the methods and functionality discussed in this topic. It demonstrates the important methods that every custom source component must override, but does not contain code for design-time validation. For a more complete sample source component, see the Readme_ADO Source Component Sample.

using System;
using System.IO;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;

namespace BlobSrc
{
  [DtsPipelineComponent(DisplayName = "BLOB Inserter Source", Description = "Inserts files into the data flow as BLOBs")]
  public class BlobSrc : PipelineComponent
  {
    IDTSConnectionManager100 m_ConnMgr;
    int m_FileNameColumnIndex = -1;
    int m_FileBlobColumnIndex = -1;

    public override void ProvideComponentProperties()
    {
      IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
      output.Name = "BLOB File Inserter Output";

      IDTSOutputColumn100 column = output.OutputColumnCollection.New();
      column.Name = "FileName";
      column.SetDataTypeProperties(DataType.DT_WSTR, 256, 0, 0, 0);

      column = output.OutputColumnCollection.New();
      column.Name = "FileBLOB";
      column.SetDataTypeProperties(DataType.DT_IMAGE, 0, 0, 0, 0);

      IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection.New();
      conn.Name = "FileConnection";
    }

    public override void AcquireConnections(object transaction)
    {
      IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection[0];
      m_ConnMgr = conn.ConnectionManager;
    }

    public override void ReleaseConnections()
    {
      m_ConnMgr = null;
    }

    public override void PreExecute()
    {
      IDTSOutput100 output = ComponentMetaData.OutputCollection[0];

      m_FileNameColumnIndex = (int)BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[0].LineageID);
      m_FileBlobColumnIndex = (int)BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[1].LineageID);
    }

    public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
    {
      string strFileName = (string)m_ConnMgr.AcquireConnection(null);

      while (strFileName != null)
      {
        buffers[0].AddRow();

        buffers[0].SetString(m_FileNameColumnIndex, strFileName);

        FileInfo fileInfo = new FileInfo(strFileName);
        byte[] fileData = new byte[fileInfo.Length];
        FileStream fs = new FileStream(strFileName, FileMode.Open, FileAccess.Read, FileShare.Read);
        fs.Read(fileData, 0, fileData.Length);

        buffers[0].AddBlobData(m_FileBlobColumnIndex, fileData);

        strFileName = (string)m_ConnMgr.AcquireConnection(null);
      }

      buffers[0].SetEndOfRowset();
    }
  }
}
Integration Services icon (small) Stay Up to Date with Integration Services

For the latest downloads, articles, samples, and videos from Microsoft, as well as selected solutions from the community, visit the Integration Services page on MSDN or TechNet:

For automatic notification of these updates, subscribe to the RSS feeds available on the page.

Was this page helpful?
(1500 characters remaining)
Thank you for your feedback

Community Additions

ADD
Show:
© 2014 Microsoft