IROWSETFASTLOAD 및 ISEQUENTIALSTREAM을 사용하여 BLOB 데이터를 SQL Server로 보내기(OLE DB)

이 예제는 IRowsetFastLoad를 사용하여 다양한 길이의 BLOB 데이터를 행별로 스트리밍하는 방법을 보여 줍니다.

기본적으로 이 예제는 인라인 바인딩을 사용하여 다양한 길이의 BLOB 데이터를 행별로 보내기 위해 IRowsetFastLoad를 사용하는 방법을 보여 줍니다. 인라인 BLOB 데이터는 가용 메모리를 초과해서는 안 됩니다. 이 방법은 BLOB 데이터가 수 MB 이하일 때 추가적인 스트림 오버헤드가 없기 때문에 최상으로 수행됩니다. 수 MB 이상의 데이터인 경우, 특히 하나의 블록으로 사용할 수 없는 데이터인 경우 스트리밍이 더 나은 성능을 제공합니다.

#define USE_ISEQSTREAM 주석 처리를 제거한 경우 원본 코드에서 이 예제는 ISequentialStream을 사용합니다. 스트림 구현은 예제에 정의되어 있으며 MAX_BLOB만 변경하면 크기에 관계 없이 BLOB 데이터를 보낼 수 있습니다. 따라서 스트림 데이터는 메모리 크기에 맞거나 하나의 블록 크기일 필요가 없습니다. IRowsetFastLoad::InsertRow를 사용하여 이 공급자를 호출합니다. IRowsetFastLoad::InsertRow를 사용하는 포인터를 스트림에서 읽을 수 있는 데이터의 양과 함께 데이터 버퍼(rgBinding.obValue 오프셋)에 있는 스트림 구현으로 전달합니다. 일부 공급자는 바인딩이 발생할 때 데이터의 길이를 반드시 알 필요가 없습니다. 이런 경우 바인딩에서 길이가 생략될 수 있습니다.

이 예제는 공급자에 데이터를 쓰는 데 공급자의 스트림 인터페이스를 사용하지 않습니다. 대신 공급자가 데이터를 읽는 데 소모하는 스트림 개체로 포인터를 전달합니다. 일반적으로 Microsoft 공급자(SQLOLEDB 및 SQLNCLI)는 모든 데이터가 처리될 때까지 개체에서 1024바이트 청크 형식으로 데이터를 읽습니다. SQLOLEDB나 SQLNCLI 모두 소비자가 공급자의 스트림 개체에 데이터를 쓰도록 허용하기 위한 일체의 구현을 갖고 있지 않습니다. 공급자의 스트림 개체를 통해서 보낼 수 있는 데이터는 길이가 0인 데이터뿐입니다.

소비자가 구현한 ISequentialStream 개체는 행 집합 데이터(IRowsetChange::InsertRow, IRowsetChange::SetData)와 함께 사용하거나 매개 변수를 DBTYPE_IUNKNOWN으로 바인딩하여 매개 변수와 함께 사용할 수 있습니다.

DBTYPE_IUNKNOWN은 바인딩에 데이터 형식으로 지정되었으므로 열이나 대상 매개 변수의 유형과 일치해야 합니다. 행 집합 인터페이스에서 ISequentialStream을 통해 데이터를 보낼 때 변환은 불가능합니다. 매개 변수의 경우 ICommandWithParameters::SetParameterInfo의 사용을 피하고 다른 유형을 지정하여 변환을 강제 실행하도록 합니다. 이 경우 공급자는 모든 BLOB 데이터를 SQL Server로 보내기 전에 로컬에서 데이터를 캐싱하고 변환해야 할 수 있습니다. 큰 BLOB을 로컬에서 캐싱하고 변환하면 성능이 저하됩니다.

자세한 내용은 BLOB 및 OLE 개체를 참조하십시오.

보안 정보보안 정보

가능하면 Windows 인증을 사용하십시오. Windows 인증을 사용할 수 없으면 런타임에 사용자에게 자격 증명을 입력하라는 메시지를 표시합니다. 자격 증명은 파일에 저장하지 않는 것이 좋습니다. 자격 증명을 유지하려면 Win32 crypto API를 사용하여 자격 증명을 암호화해야 합니다.

첫 번째(Transact-SQL) 코드 목록을 실행하여 응용 프로그램에서 사용하는 테이블을 만듭니다.

ole32.lib oleaut32.lib를 사용하여 컴파일하고 다음 C++ 코드 목록을 실행합니다. 이 응용 프로그램은 컴퓨터의 기본 SQL Server 인스턴스에 연결됩니다. 일부 Windows 운영 체제에서는 (localhost) 또는 (local)을 해당 SQL Server 인스턴스의 이름으로 변경해야 합니다. 명명된 인스턴스에 연결하려면 연결 문자열을 L"(local)"에서 L"(local)\\name"으로 변경합니다. 여기서 name은 명명된 인스턴스입니다. 기본적으로 SQL Server Express는 명명된 인스턴스에 설치됩니다. INCLUDE 환경 변수에 sqlncli.h가 들어 있는 디렉터리를 포함해야 합니다.

세 번째(Transact-SQL) 코드 목록을 실행하여 응용 프로그램에서 사용하는 테이블을 삭제합니다.

use master
create table fltest(col1 int, col2 int, col3 image)

// compile with: ole32.lib oleaut32.lib
#include <windows.h>

#define DBINITCONSTANTS   // Must be defined to initialize constants in oledb.h
#define INITGUID              

#include <sqloledb.h>
#include <oledb.h>
#include <msdasc.h>
#include <stdio.h>
#include <stdlib.h>
#include <conio.h>

#define MAX_BLOB  200   // For stream binding this can be any size, but for inline it must fit in memory
#define MAX_ROWS  100

#define SAFE_RELEASE(p) { \
   if (p) { \
      (p)->Release(); \
      (p)=NULL; \
   } \
}

#ifdef USE_ISEQSTREAM
// ISequentialStream implementation for streaming data
class MySequentialStream : public ISequentialStream {

private:
   ULONG m_ulRefCount;
   ULONG m_ulBufSize;
   ULONG m_ulReadSize;
   ULONG m_ulBytesLeft;
   ULONG m_ulReadPos;
   BYTE * m_pSrcData;
   BYTE * m_pReadPtr;
   BOOL m_fWasRead;

public:

   MySequentialStream() {
      m_ulRefCount = 1;
      m_ulBufSize = 0;
      m_ulReadSize = 0;
      m_ulBytesLeft = 0;
      m_ulReadPos = 0;
      m_pSrcData = NULL;
      m_pReadPtr = NULL;
      m_fWasRead = FALSE;
   }

   ~MySequentialStream() {}

   virtual ULONG STDMETHODCALLTYPE AddRef() {
      return ++m_ulRefCount;
   }

   virtual ULONG STDMETHODCALLTYPE Release() {
      --m_ulRefCount;
      if (m_ulRefCount == 0) {
         delete this;
         return 0;
      }
      return m_ulRefCount;
   }

   virtual HRESULT STDMETHODCALLTYPE QueryInterface(REFIID riid, void ** ppvObj) {
      if (!ppvObj)
         return E_INVALIDARG;
      else
         *ppvObj = NULL;

      if (riid != IID_ISequentialStream && riid != IID_IUnknown)
         return E_NOINTERFACE;

      AddRef();
      *ppvObj = this;
      return S_OK;
   }

   HRESULT Init(const void * pSrcData, const ULONG ulBufSize, const ULONG ulReadSize) {
      if (NULL == pSrcData)
         return E_INVALIDARG;

      // Data length must be non-zero
      if (0 == ulBufSize)
         return E_INVALIDARG;

      m_ulBufSize = ulBufSize;
      m_ulReadSize = ulReadSize;
      m_pSrcData = (BYTE *)pSrcData;
      m_pReadPtr = m_pSrcData;
      m_ulBytesLeft = m_ulReadSize;
      m_ulReadPos = 0;
      m_fWasRead = FALSE;

      return S_OK;
   }

   // Can't write data to SQL Server providers (SQLOLEDB/SQLNCLI).  Instead, they read from our object.
   virtual HRESULT STDMETHODCALLTYPE Write(const void *, ULONG, ULONG * ) {
      return E_NOTIMPL;
   }

   // This implementation simply copies data from the source buffer in whatever size requested.
   // But you can do anything here such as reading from a file, reading from a different rowset, stream, etc.
   virtual HRESULT STDMETHODCALLTYPE Read(void * pv, ULONG cb, ULONG * pcbRead) {
      ULONG ulBytesWritten = 0;
      ULONG ulCBToWrite = cb;
      ULONG ulCBToCopy;
      BYTE * pvb = (BYTE *)pv;

      m_fWasRead = TRUE;

      if (NULL == m_pSrcData)
         return E_FAIL;

      if (NULL == pv)
         return STG_E_INVALIDPOINTER;

      while (ulBytesWritten < ulCBToWrite && m_ulBytesLeft) {
         // Make sure we don't write more than our max read size or the size they asked for
         ulCBToCopy = min(m_ulBytesLeft, cb);

         // Make sure we don't read past the end of the internal buffer
         ulCBToCopy = min(m_ulBufSize - m_ulReadPos, ulCBToCopy);

         memcpy(pvb, m_pReadPtr + m_ulReadPos, ulCBToCopy);
         pvb += ulCBToCopy;
         ulBytesWritten += ulCBToCopy;
         m_ulBytesLeft -= ulCBToCopy;
         cb -= ulCBToCopy;

         // Wrap reads around the src buffer
         m_ulReadPos += ulCBToCopy;
         if (m_ulReadPos >= m_ulBufSize)
            m_ulReadPos = 0;
      }

      if (pcbRead)
         *pcbRead = ulBytesWritten;

      return S_OK;
   }
};

#endif // USE_ISEQSTREAM

HRESULT SetFastLoadProperty(IDBInitialize * pIDBInitialize) {
   HRESULT hr = S_OK;
   IDBProperties * pIDBProps = NULL;
   DBPROP rgProps[1];
   DBPROPSET PropSet;

   VariantInit(&rgProps[0].vValue);

   rgProps[0].dwOptions = DBPROPOPTIONS_REQUIRED;
   rgProps[0].colid = DB_NULLID;
   rgProps[0].vValue.vt = VT_BOOL;
   rgProps[0].dwPropertyID = SSPROP_ENABLEFASTLOAD;

   rgProps[0].vValue.boolVal = VARIANT_TRUE;

   PropSet.rgProperties = rgProps;
   PropSet.cProperties = 1;
   PropSet.guidPropertySet = DBPROPSET_SQLSERVERDATASOURCE;

   if (SUCCEEDED(hr = pIDBInitialize->QueryInterface(IID_IDBProperties, (LPVOID *)&pIDBProps))) {
      hr = pIDBProps->SetProperties(1, &PropSet);
   }

   VariantClear(&rgProps[0].vValue); 

   if (pIDBProps)
      pIDBProps->Release();

   return hr;
}

void wmain() {
   // Setup the initialization options
   ULONG cProperties = 0;
   DBPROP rgProperties[10];
   ULONG cPropSets = 0;
   DBPROPSET rgPropSets[1];
   LPWSTR pwszProgID = L"SQLOLEDB";
   LPWSTR pwszDataSource = NULL;
   LPWSTR pwszUserID = NULL;
   LPWSTR pwszPassword = NULL;
   LPWSTR pwszProviderString = L"server=(local);trusted_connection=yes;";

   IDBInitialize * pIDBInitialize = NULL;
   IDBCreateSession * pIDBCrtSess = NULL;
   IOpenRowset * pIOpenRowset = NULL;
   IDBCreateCommand * pIDBCrtCmd = NULL;
   ICommandText * pICmdText = NULL;
   IAccessor * pIAccessor = NULL;
   IRowsetFastLoad * pIRowsetFastLoad = NULL;
   IDBProperties * pIDBProperties = NULL;
   DBBINDING rgBinding[3];
   DBBINDSTATUS rgStatus[3];
   ULONG ulOffset = 0;
   HACCESSOR hAcc = DB_NULL_HACCESSOR;
   BYTE * pData = NULL;
   ULONG iRow = 0;
   LPWSTR pwszTableName = L"fltest";
   DBID TableID;

   HRESULT hr;

#ifdef USE_ISEQSTREAM
   BYTE bSrcBuf[1024];   // A buffer to hold our data for streaming
   memset((void *)&bSrcBuf, 0xAB, sizeof(bSrcBuf));   // Stream data value 0xAB
   MySequentialStream * pMySeqStream = new MySequentialStream();
   DBOBJECT MyObject = {STGM_READ, IID_ISequentialStream};   // NULL pObject implies STGM_READ and IID_IUnknown, but not recommended
#endif

   memset(rgBinding, 0, ( sizeof(rgBinding) / sizeof(rgBinding[0])) * sizeof(DBBINDING) );
   TableID.eKind = DBKIND_NAME;
   TableID.uName.pwszName = pwszTableName;

   // Col1
   rgBinding[0].iOrdinal = 1;
   rgBinding[0].wType = DBTYPE_I4;
   rgBinding[0].obStatus = ulOffset;
   ulOffset+=sizeof(DBSTATUS);
   rgBinding[0].obLength = ulOffset;
   ulOffset+=sizeof(DBLENGTH);
   rgBinding[0].obValue = ulOffset;
   ulOffset += sizeof(LONG);
   rgBinding[0].cbMaxLen = sizeof(LONG);
   rgBinding[0].dwPart = DBPART_VALUE | DBPART_STATUS | DBPART_LENGTH;
   rgBinding[0].eParamIO = DBPARAMIO_NOTPARAM;
   rgBinding[0].dwMemOwner = DBMEMOWNER_CLIENTOWNED;

   //Col2
   rgBinding[1].iOrdinal = 2;
   rgBinding[1].wType = DBTYPE_I4;
   rgBinding[1].obStatus = ulOffset;
   ulOffset+=sizeof(DBSTATUS);
   rgBinding[1].obLength = ulOffset;
   ulOffset+=sizeof(DBLENGTH);
   rgBinding[1].obValue = ulOffset;
   ulOffset += sizeof(LONG);
   rgBinding[1].cbMaxLen = sizeof(LONG);
   rgBinding[1].dwPart = DBPART_VALUE | DBPART_STATUS | DBPART_LENGTH;
   rgBinding[1].eParamIO = DBPARAMIO_NOTPARAM;
   rgBinding[1].dwMemOwner = DBMEMOWNER_CLIENTOWNED;

   //Col3
   rgBinding[2].iOrdinal = 3;
   rgBinding[2].obStatus = ulOffset;
   ulOffset+=sizeof(DBSTATUS);
   rgBinding[2].obLength = ulOffset;
   ulOffset+=sizeof(DBLENGTH);
   rgBinding[2].obValue = ulOffset;
   rgBinding[2].dwPart = DBPART_VALUE | DBPART_STATUS | DBPART_LENGTH;   // DBPART_LENGTH not needed for providers that don't require length
   rgBinding[2].eParamIO = DBPARAMIO_NOTPARAM;
   rgBinding[2].dwMemOwner = DBMEMOWNER_CLIENTOWNED;

#ifdef USE_ISEQSTREAM
   rgBinding[2].wType = DBTYPE_IUNKNOWN;
   ulOffset += sizeof(ISequentialStream *);   // Technically should be sizeof(MySequentialStream *), but who's counting?
   rgBinding[2].cbMaxLen = sizeof(ISequentialStream *);
   rgBinding[2].pObject = &MyObject;
#else
   rgBinding[2].wType = DBTYPE_BYTES;
   ulOffset += MAX_BLOB;
   rgBinding[2].cbMaxLen = MAX_BLOB;
#endif

   // Set init props
   for ( ULONG i = 0 ; i < sizeof(rgProperties) / sizeof(rgProperties[0]) ; i++ )
      VariantInit(&rgProperties[i].vValue);

   // Obtain the provider's clsid
   CLSID clsidProv;
   hr = CLSIDFromProgID(pwszProgID, &clsidProv);

   // Get our initial connection
   CoInitialize(NULL);

   if (SUCCEEDED(hr))
      hr = CoCreateInstance(clsidProv, NULL, CLSCTX_ALL, IID_IDBInitialize,(void **)&pIDBInitialize);

   if (SUCCEEDED(hr))
      hr = pIDBInitialize->QueryInterface(IID_IDBProperties, (void **)&pIDBProperties);

   // DBPROP_INIT_DATASOURCE
   if (pwszDataSource) {
      rgProperties[cProperties].dwPropertyID    = DBPROP_INIT_DATASOURCE;
      rgProperties[cProperties].dwOptions       = DBPROPOPTIONS_REQUIRED;
      rgProperties[cProperties].dwStatus        = DBPROPSTATUS_OK;
      rgProperties[cProperties].colid           = DB_NULLID;
      rgProperties[cProperties].vValue.vt       = VT_BSTR;
      V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszDataSource);               
      cProperties++;
   }

   // DBPROP_AUTH_USERID
   if (pwszUserID) {
      rgProperties[cProperties].dwPropertyID    = DBPROP_AUTH_USERID;
      rgProperties[cProperties].dwOptions       = DBPROPOPTIONS_REQUIRED;
      rgProperties[cProperties].dwStatus        = DBPROPSTATUS_OK;
      rgProperties[cProperties].colid           = DB_NULLID;
      rgProperties[cProperties].vValue.vt       = VT_BSTR;
      V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszUserID);
      cProperties++;
   }

   // DBPROP_AUTH_PASSWORD
   if (pwszPassword) {
      rgProperties[cProperties].dwPropertyID    = DBPROP_AUTH_PASSWORD;
      rgProperties[cProperties].dwOptions       = DBPROPOPTIONS_REQUIRED;
      rgProperties[cProperties].dwStatus        = DBPROPSTATUS_OK;
      rgProperties[cProperties].colid           = DB_NULLID;
      rgProperties[cProperties].vValue.vt       = VT_BSTR;
      V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszPassword);
      cProperties++;
   }

   // DBPROP_INIT_PROVIDERSTRING
   if (pwszProviderString) {
      rgProperties[cProperties].dwPropertyID    = DBPROP_INIT_PROVIDERSTRING;
      rgProperties[cProperties].dwOptions       = DBPROPOPTIONS_REQUIRED;
      rgProperties[cProperties].dwStatus        = DBPROPSTATUS_OK;
      rgProperties[cProperties].colid           = DB_NULLID;
      rgProperties[cProperties].vValue.vt       = VT_BSTR;
      V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszProviderString);
      cProperties++;
   }

   if (cProperties) {
      rgPropSets[cPropSets].cProperties = cProperties;
      rgPropSets[cPropSets].rgProperties = rgProperties;
      rgPropSets[cPropSets].guidPropertySet = DBPROPSET_DBINIT;
      cPropSets++;
   }

   // Initialize
   if (SUCCEEDED(hr))
      hr = pIDBProperties->SetProperties(cPropSets, rgPropSets);

   if (SUCCEEDED(hr))
      hr = pIDBInitialize->Initialize();

   if (SUCCEEDED(hr)) {
      printf("\tConnected!\r\n");
   }
   else
      printf("Unable to connect\r\n");

   // Set fastload prop
   if (SUCCEEDED(hr))
      hr = SetFastLoadProperty(pIDBInitialize);

   if (SUCCEEDED(hr))
      hr = pIDBInitialize->QueryInterface(IID_IDBCreateSession, (void **)&pIDBCrtSess);

   if (SUCCEEDED(hr))
      hr = pIDBCrtSess->CreateSession(NULL, IID_IOpenRowset, (IUnknown **)&pIOpenRowset);

   if (SUCCEEDED(hr))
      hr = pIOpenRowset->OpenRowset(NULL, &TableID, NULL, IID_IRowsetFastLoad, 0, NULL, (IUnknown **)&pIRowsetFastLoad);

   if (SUCCEEDED(hr))
      hr = pIRowsetFastLoad->QueryInterface(IID_IAccessor, (void **)&pIAccessor);

   if (SUCCEEDED(hr))
      hr = pIAccessor->CreateAccessor(DBACCESSOR_ROWDATA, 3, rgBinding, ulOffset, &hAcc, (DBBINDSTATUS *)&rgStatus);

   if (SUCCEEDED(hr)) {
      pData = (BYTE *)malloc(ulOffset);

      for (iRow = 0 ; iRow < MAX_ROWS ; iRow++) {
         // Column 1 data        
         *(DBSTATUS *)(pData + rgBinding[0].obStatus) = DBSTATUS_S_OK;
         *(DBLENGTH *)(pData + rgBinding[0].obLength) = 1234567;   // Ignored for I4 data
         *(LONG *)(pData + rgBinding[0].obValue) = iRow;

         // Column 2 data        
         *(DBSTATUS *)(pData + rgBinding[1].obStatus) = DBSTATUS_S_OK;
         *(DBLENGTH *)(pData + rgBinding[1].obLength) = 1234567;   // Ignored for I4 data
         *(LONG *)(pData + rgBinding[1].obValue) = iRow + 1;

         // Column 3 data        
         *(DBSTATUS *)(pData + rgBinding[2].obStatus) = DBSTATUS_S_OK;
         *(DBLENGTH *)(pData + rgBinding[2].obLength) = MAX_BLOB/(iRow + 1);   // Not needed for providers that don't require length
#ifdef USE_ISEQSTREAM
         // DBLENGTH is used to tell the provider how much BLOB data to expect from the stream, not required
         // if provider supports sending data without length
         *(ISequentialStream **)(pData+rgBinding[2].obValue) = (ISequentialStream *)pMySeqStream; 
         pMySeqStream->Init((void *)&bSrcBuf, sizeof(bSrcBuf), MAX_BLOB / (iRow + 1));   // Here we set the size we will let the provider read
         pMySeqStream->AddRef();   // The provider releases the object, so we addref it so it doesn't get destructed
#else
         memset(pData + rgBinding[2].obValue, 0, MAX_BLOB);   // Not strictly necessary
         memset(pData + rgBinding[2].obValue, 0x23, MAX_BLOB / (iRow + 1)); 
#endif
         if (SUCCEEDED(hr))
            hr = pIRowsetFastLoad->InsertRow(hAcc, pData);
      }
   }

   if (SUCCEEDED(hr))
      hr = pIRowsetFastLoad->Commit(TRUE);

   if (hAcc)
      pIAccessor->ReleaseAccessor(hAcc, NULL);

   SAFE_RELEASE(pIDBInitialize);
   SAFE_RELEASE(pIDBCrtSess);
   SAFE_RELEASE(pIOpenRowset);
   SAFE_RELEASE(pIDBCrtCmd);
   SAFE_RELEASE(pICmdText);
   SAFE_RELEASE(pIAccessor);
   SAFE_RELEASE(pIRowsetFastLoad);
   SAFE_RELEASE(pIDBProperties);
#ifdef USE_ISEQSTREAM
   SAFE_RELEASE(pMySeqStream);
#endif

   if (pData)
      free(pData);

   CoUninitialize();
}

use master
drop table fltest