Entwickeln von Datenflusskomponenten mit mehreren Eingaben

Eine Datenflusskomponente mit mehreren Eingaben verbraucht möglicherweise übermäßig viel Arbeitsspeicher, wenn die Eingaben unregelmäßig Daten erzeugen. Wenn Sie eine benutzerdefinierte Datenflusskomponente entwickeln, die zwei oder mehr Eingaben unterstützt, können Sie diese hohe Speicherauslastung verwalten, indem Sie die folgenden Elemente im Microsoft.SqlServer.Dts.Pipeline-Namespace verwenden:

  • Die DtsPipelineComponentAttribute.SupportsBackPressure-Eigenschaft der DtsPipelineComponentAttribute-Klasse. Legen Sie den Wert dieser Eigenschaft auf true fest, um den Code zu implementieren, der erforderlich ist, damit die benutzerdefinierte Datenflusskomponente unregelmäßige Datenflüsse verwaltet.

  • Die IsInputReady-Methode der PipelineComponent-Klasse. Sie müssen eine Implementierung dieser Methode bereitstellen, wenn Sie die SupportsBackPressure-Eigenschaft auf true festlegen. Wenn Sie keine Implementierung bereitstellen, löst das Datenflussmodul zur Laufzeit eine Ausnahme aus.

  • Die GetDependentInputs-Methode der PipelineComponent-Klasse. Sie müssen auch eine Implementierung dieser Methode bereitstellen, wenn Sie die SupportsBackPressure-Eigenschaft auf true festlegen und die benutzerdefinierte Komponente mehr als zwei Eingaben unterstützt. Wenn Sie keine Implementierung bereitstellen, löst das Datenflussmodul zur Laufzeit eine Ausnahme aus, wenn der Benutzer mehr als zwei Eingaben anfügt.

Zusammen ermöglichen diese Elemente es Ihnen, eine Lösung für hohe Speicherauslastungen zu entwickeln, die der von Microsoft entwickelten Lösung für die Transformationen für Zusammenführen und Zusammenführungsjoin ähnelt.

Festlegen der SupportsBackPressure-Eigenschaft

Der erste Schritt beim Implementieren einer besseren Speicherverwaltung für eine benutzerdefinierte Datenflusskomponente, die mehrere Eingaben unterstützt, besteht darin, den Wert der SupportsBackPressure-Eigenschaft im DtsPipelineComponentAttribute auf true festzulegen. Wenn der Wert von SupportsBackPressure true ist, ruft das Datenflussmodul die IsInputReady-Methode auf. Bei mehr als zwei Eingaben wird außerdem die GetDependentInputs-Methode zur Laufzeit aufgerufen.

Beispiel

Im folgenden Beispiel legt die Implementierung von DtsPipelineComponentAttribute den Wert von SupportsBackPressure auf true fest.

[DtsPipelineComponent(ComponentType = ComponentType.Transform,
        DisplayName = "Shuffler",
        Description = "Shuffle the rows from input.",
        SupportsBackPressure = true,
        LocalizationType = typeof(Localized),
        IconResource = "Microsoft.Samples.SqlServer.Dts.MIBPComponent.ico")
]
public class Shuffler : Microsoft.SqlServer.Dts.Pipeline.PipelineComponent
        {
          ...
        }

Implementieren der IsInputReady-Methode

Wenn Sie den Wert der SupportsBackPressure-Eigenschaft im DtsPipelineComponentAttribute-Objekt auf true festlegen, müssen Sie auch eine Implementierung für die IsInputReady-Methode der PipelineComponent-Klasse bereitstellen.

HinweisHinweis

Die Implementierung der IsInputReady-Methode sollte nicht die Implementierungen in der Basisklasse aufrufen. Die Standardimplementierung dieser Methode in der Basisklasse löst lediglich eine NotImplementedException aus.

Wenn Sie diese Methode implementieren, legen Sie den Status eines Elements im booleschen canProcess-Array für jede Eingabe der Komponente fest. (Die Eingaben werden von ihren ID-Werten im inputIDs-Array identifiziert.) Wenn Sie den Wert eines Elements im canProcess-Array für eine Eingabe auf true festlegen, ruft das Datenflussmodul die ProcessInput-Methode der Komponente auf und stellt für die angegebene Eingabe mehr Daten bereit.

Es sind mehr Upstreamdaten verfügbar, und der Wert des canProcess-Arrayelements muss für mindestens eine Eingabe immer true sein. Andernfalls wird die Verarbeitung angehalten.

Das Datenflussmodul ruft die IsInputReady-Methode vor dem Senden der einzelnen Datenpuffer auf, um zu bestimmen, welche Eingaben auf den Empfang weiterer Daten warten. Wenn der Rückgabewert angibt, dass eine Eingabe blockiert wird, speichert das Datenflussmodul vorübergehend zusätzliche Datenpuffer für diese Eingabe, statt sie an die Komponente zu senden.

HinweisHinweis

Die IsInputReady-Methode oder die GetDependentInputs-Methode rufen Sie nicht in eigenem Code auf. Das Datenflussmodul ruft diese Methoden und die anderen Methoden der PipelineComponent-Klasse auf, die Sie überschreiben, wenn das Datenflussmodul die Komponente ausführt.

Beispiel

Im folgenden Beispiel gibt die Implementierung der IsInputReady-Methode an, dass eine Eingabe auf den Empfang weiterer Daten wartet, wenn die folgenden Voraussetzungen erfüllt sind:

  • Mehr Upstreamdaten sind für die Eingabe verfügbar (!inputEOR).

  • Die Komponente verfügt derzeit nicht über Daten, die für die Eingabe in den Puffern verarbeitet werden können, die die Komponente bereits empfangen hat (inputBuffers[inputIndex].CurrentRow() == null).

Wenn eine Eingabe auf den Empfang weiterer Daten wartet, gibt die Datenflusskomponente dieses an, indem der Wert des Elements im canProcess-Array, das dieser Eingabe entspricht, auf true festgelegt wird.

Wenn die Komponente hingegen weiterhin verfügbare Daten aufweist, die für die Eingabe verarbeitet können, wird im Beispiel die Verarbeitung der Eingabe angehalten. Hierfür wird im Beispiel der Wert des Elements im canProcess-Array, das dieser Eingabe entspricht, auf false festgelegt.

public override void IsInputReady(int[] inputIDs, ref bool[] canProcess)
{
    for (int i = 0; i < inputIDs.Length; i++)
    {
        int inputIndex = ComponentMetaData.InputCollection.GetObjectIndexByID(inputIDs[i]);

        canProcess[i] = (inputBuffers[inputIndex].CurrentRow() == null)
            && !inputEOR[inputIndex];
    }
}

Im vorangehenden Beispiel wird das boolesche inputEOR-Array verwendet, um anzugeben, ob für die jeweiligen Eingaben weitere Upstreamdaten verfügbar sind. EOR im Namen des Arrays steht für "Ende des Rowsets" und bezieht sich auf die EndOfRowset-Eigenschaft von Datenflusspuffern. In einem Teil des Beispiels, der hier nicht enthalten ist, überprüft die ProcessInput-Methode den Wert der EndOfRowset-Eigenschaft für jeden empfangenen Datenpuffer. Wenn durch den Wert true angegeben wird, dass für eine Eingabe keine Upstreamdaten mehr verfügbar sind, wird im Beispiel der Wert des inputEOR-Arrayelements für diese Eingabe auf true festgelegt. In diesem Beispiel für die IsInputReady-Methode wird der Wert des entsprechenden Elements im canProcess-Array für eine Eingabe auf false festgelegt, wenn der Wert des inputEOR-Arrayelements angibt, dass für die Eingabe keine Upstreamdaten mehr verfügbar sind.

Implementieren der GetDependentInputs-Methode

Wenn die benutzerdefinierte Datenflusskomponente mehr als zwei Eingaben unterstützt, müssen Sie auch eine Implementierung für die GetDependentInputs-Methode der PipelineComponent-Klasse bereitstellen.

HinweisHinweis

Die Implementierung der GetDependentInputs-Methode sollte nicht die Implementierungen in der Basisklasse aufrufen. Die Standardimplementierung dieser Methode in der Basisklasse löst lediglich eine NotImplementedException aus.

Das Datenflussmodul ruft die GetDependentInputs-Methode nur auf, wenn der Benutzer mehr als zwei Eingaben an die Komponente anfügt. Wenn eine Komponente nur über zwei Eingaben verfügt und die IsInputReady-Methode angibt, dass eine Eingabe blockiert ist (canProcess = false), erkennt das Datenflussmodul, dass die andere Eingabe auf den Empfang weiterer Daten wartet. Wenn jedoch mehr als zwei Eingaben vorhanden sind und die IsInputReady-Methode angibt, dass eine Eingabe blockiert ist, identifiziert der zusätzliche Code in GetDependentInputs, welche Eingaben auf den Empfang weiterer Daten warten.

HinweisHinweis

Die IsInputReady-Methode oder die GetDependentInputs-Methode rufen Sie nicht in eigenem Code auf. Das Datenflussmodul ruft diese Methoden und die anderen Methoden der PipelineComponent-Klasse auf, die Sie überschreiben, wenn das Datenflussmodul die Komponente ausführt.

Beispiel

Wenn eine bestimmte Eingabe blockiert ist, gibt die folgende Implementierung der GetDependentInputs-Methode eine Auflistung der Eingaben zurück, die auf den Empfang weiterer Daten warten und daher die betreffende Eingabe blockieren. Die Komponente identifiziert die blockierenden Eingaben, indem überprüft wird, ob außer der blockierten Eingabe andere Eingaben derzeit in den Puffern, die die Komponente bereits empfangen hat, nicht über Daten verfügen, die verarbeitet werden können (inputBuffers[i].CurrentRow() == null). Die GetDependentInputs-Methode gibt dann die Auflistung der blockierenden Eingaben als Auflistung von Eingabe-IDs zurück.

        public override Collection<int> GetDependentInputs(int blockedInputID)
        {
            Collection<int> currentDependencies = new Collection<int>();
            for (int i = 0; i < ComponentMetaData.InputCollection.Count; i++)
            {
                if (ComponentMetaData.InputCollection[i].ID != blockedInputID
                    && inputBuffers[i].CurrentRow() == null)
                {
                    currentDependencies.Add(ComponentMetaData.InputCollection[i].ID);
                }
            }
            
            return currentDependencies;
        }