From f6a1a147e786e885ae43f0b744da14754bc1aecb Mon Sep 17 00:00:00 2001 From: Hiroshi Inoue Date: Fri, 19 May 2006 23:08:33 +0000 Subject: [PATCH] The first cut for MSDTC supprort and the version is now 8.2.0004. 1. Improve unknown-size estimation in case of the *longest* option. 2. Improve SQL_DEFAULT_PARAM handling. --- columninfo.h | 2 +- connection.h | 2 +- convert.c | 9 +- environ.c | 3 +- msdtc_enlist.cpp | 1074 ++++++++++++++++++++++++++++++++++++++++++++++ odbcapi.c | 6 +- pgapi30.c | 3 +- pgxalib.cpp | 579 +++++++++++++++++++++++++ version.h | 6 +- 9 files changed, 1670 insertions(+), 14 deletions(-) create mode 100755 msdtc_enlist.cpp create mode 100755 pgxalib.cpp diff --git a/columninfo.h b/columninfo.h index 85ced66..f1da6e3 100644 --- a/columninfo.h +++ b/columninfo.h @@ -17,7 +17,7 @@ struct ColumnInfoClass_ char **name; /* list of type names */ Oid *adtid; /* list of type ids */ Int2 *adtsize; /* list type sizes */ - Int2 *display_size; /* the display size (longest row) */ + Int4 *display_size; /* the display size (longest row) */ Int4 *atttypmod; /* the length of bpchar/varchar */ Oid *relid; /* list of relation ids */ Oid *attid; /* list of attribute ids */ diff --git a/connection.h b/connection.h index 8ae2108..c035b0e 100644 --- a/connection.h +++ b/connection.h @@ -519,7 +519,7 @@ enum { #ifdef WIN32 #ifdef _HANDLE_ENLIST_IN_DTC_ RETCODE EnlistInDtc(ConnectionClass *conn, void *pTra, int method); -RETCODE DtcOnDisconnect(ConnectionClass *, BOOL); +RETCODE DtcOnDisconnect(ConnectionClass *); #endif /* _HANDLE_ENLIST_IN_DTC_ */ #endif /* WIN32 */ diff --git a/convert.c b/convert.c index ee27273..f78d3d5 100644 --- a/convert.c +++ b/convert.c @@ -3110,19 +3110,20 @@ inolog("ipara=%x paramType=%d %d proc_return=%d\n", ipara, ipara ? ipara->paramT } req_bind = (0 != (FLGB_BUILDING_BIND_REQUEST & qb->flags)); - /* Handle DEFAULT_PARAM parameter data */ + /* Handle DEFAULT_PARAM parameter data. Should be NULL ? if (used == SQL_DEFAULT_PARAM) { return SQL_SUCCESS; - } + } */ if (req_bind) { npos = qb->npos; qb->npos += 4; } /* Handle NULL parameter data */ - if ((ipara && SQL_PARAM_OUTPUT == ipara->paramType) || - used == SQL_NULL_DATA) + if ((ipara && SQL_PARAM_OUTPUT == ipara->paramType) + || used == SQL_NULL_DATA + || used == SQL_DEFAULT_PARAM) { if (req_bind) { diff --git a/environ.c b/environ.c index 403223d..17627eb 100644 --- a/environ.c +++ b/environ.c @@ -332,8 +332,9 @@ PGAPI_ConnectError( HDBC hdbc, /* general error */ break; case CONN_UNSUPPORTED_OPTION: - pg_sqlstate_set(env, szSqlState, "IM001", "IM001"); + pg_sqlstate_set(env, szSqlState, "HYC00", "IM001"); /* driver does not support this function */ + break; case CONN_INVALID_ARGUMENT_NO: pg_sqlstate_set(env, szSqlState, "HY009", "S1009"); /* invalid argument value */ diff --git a/msdtc_enlist.cpp b/msdtc_enlist.cpp new file mode 100755 index 0000000..19656ed --- /dev/null +++ b/msdtc_enlist.cpp @@ -0,0 +1,1074 @@ +/*------ + * Module: msdtc_enlist.cpp + * + * Description: + * This module contains routines related to + * the enlistment in MSDTC. + * + *------- + */ + +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0400 +#endif /* _WIN32_WINNT */ + +#include +#include +/*#include */ +#include "connection.h" + +/*#define _SLEEP_FOR_TEST_*/ +#include +#include +#include +#include +#include +#ifndef WIN32 +#include +#endif /* WIN32 */ + +#include "qresult.h" +#include "dlg_specific.h" + +#include "pgapifunc.h" + +static CRITICAL_SECTION life_cs; +static CRITICAL_SECTION map_cs; +static class INIT_CRIT +{ +public: + INIT_CRIT() { + InitializeCriticalSection(&life_cs); + InitializeCriticalSection(&map_cs); + } + ~INIT_CRIT() { + DeleteCriticalSection(&life_cs); + DeleteCriticalSection(&map_cs); + } +} init_crit; +#define LIFELOCK_ACQUIRE EnterCriticalSection(&life_cs) +#define LIFELOCK_RELEASE LeaveCriticalSection(&life_cs) +#define MLOCK_ACQUIRE EnterCriticalSection(&map_cs) +#define MLOCK_RELEASE LeaveCriticalSection(&map_cs) + + +static const char *XidToText(const XID &xid, char *rtext) +{ + int glen = xid.gtrid_length, blen = xid.bqual_length; + int i, j; + + for (i = 0, j = 0; i < glen; i++, j += 2) + sprintf(rtext + j, "%02x", (unsigned char) xid.data[i]); + strcat(rtext, "-"); j++; + for (; i < glen + blen; i++, j += 2) + sprintf(rtext + j, "%02x", (unsigned char) xid.data[i]); + return rtext; +} + + +EXTERN_C extern HMODULE s_hModule; + +static char xalibpath[_MAX_PATH] = ""; +static char xalibname[_MAX_FNAME] = ""; +static LONG g_cComponents = 0; +static LONG g_cServerLocks = 0; + +static char *GetXaLibName() +{ + char dllpath[_MAX_PATH], drive[_MAX_DRIVE], dir[_MAX_DIR], + fname[_MAX_FNAME], ext[_MAX_EXT]; + if (!xalibpath[0]) + { + GetModuleFileName(s_hModule, dllpath, sizeof(dllpath)); + /* In Windows XP SP2, the dllname should be specified */ + /* instead of the dll path name, because it looks up */ + /* the HKLM\Microfost\MSDTC\XADLL\(dllname) registry */ + /* entry for security reason. */ + _splitpath(dllpath, drive, dir, fname, ext); + // snprintf(xalibname, sizeof(xalibname), "%s%s", fname, ext); + strcpy(xalibname, "pgxalib.dll"); + snprintf(xalibpath, sizeof(xalibpath), "%s%s%s", drive, dir, xalibname); + } + return xalibname; +} + +// +// ˆÈ‰º‚ÌITransactionResourceAsyncƒIƒuƒWƒFƒNƒg‚Í”CˆÓ‚̃XƒŒƒbƒh‚©‚ç +// Ž©—R‚ɃAƒNƒZƒX‰Â”\‚Ȃ悤‚ÉŽÀ‘•‚·‚éBŠeRequest‚ÌŒ‹‰Ê‚ð•Ô‚·‚½‚ß‚É +// Žg—p‚·‚éITransactionEnlistmentAsyncƒCƒ“ƒ^[ƒtƒFƒCƒX‚à‚»‚̂悤‚É +// ŽÀ‘•‚³‚ê‚Ä‚¢‚éi‚ÆŽv‚í‚ê‚éA‰º‹LŽQÆj‚̂ŌĂяo‚µ‚ÉCOM‚̃Aƒp[ +// ƒgƒƒ“ƒg‚ðˆÓޝ‚·‚é(CoMarshalInterThreadInterfaceInStream/CoGetIn +// terfaceAndReleaseStream‚ðŽg—p‚·‚éj•K—v‚͂Ȃ¢B +// ‚±‚ÌDLL“à‚ÅŽg—p‚·‚éITransactionResourceAsync‚ÆITransactionEnlist +// mentAsync‚̃Cƒ“ƒ^[ƒtƒFƒCƒXƒ|ƒCƒ“ƒ^[‚Í”CˆÓ‚̃XƒŒƒbƒh‚©‚ç’¼ÚŽg—p +// ‚·‚邱‚Æ‚ª‚Å‚«‚éB +// + +// OLE Transactions Standard +// +// OLE Transactions is the Microsoft interface standard for transaction +// management. Applications use OLE Transactions-compliant interfaces to +// initiate, commit, abort, and inquire about transactions. Resource +// managers use OLE Transactions-compliant interfaces to enlist in +// transactions, to propagate transactions to other resource managers, +// to propagate transactions from process to process or from system to +// system, and to participate in the two-phase commit protocol. +// +// The Microsoft DTC system implements most OLE Transactions-compliant +// objects, interfaces, and methods. Resource managers that wish to use +// OLE Transactions must implement some OLE Transactions-compliant objects, +// interfaces, and methods. +// +// The OLE Transactions specification is based on COM but it differs in the +// following respects: +// +// OLE Transactions objects cannot be created using the COM CoCreate APIs. +// References to OLE Transactions objects are always direct. Therefore, +// no proxies or stubs are created for inter-apartment, inter-process, +// or inter-node calls and OLE Transactions references cannot be marshaled +// using standard COM marshaling. +// All references to OLE Transactions objects and their sinks are completely +// free threaded and cannot rely upon COM concurrency control models. +// For example, you cannot pass a reference to an IResourceManagerSink +// interface on a single-threaded apartment and expect the callback to occur +// only on the same single-threaded apartment. + +/*#define _LOCK_DEBUG_ */ +class IAsyncPG : public ITransactionResourceAsync +{ + friend class AsyncThreads; +private: + IDtcToXaHelperSinglePipe *helper; + DWORD RMCookie; + ConnectionClass *conn; + ConnectionClass *xaconn; + LONG refcnt; + CRITICAL_SECTION as_spin; // to make this object Both + CRITICAL_SECTION as_exec; // to make this object Both + XID xid; + bool prepared; + HANDLE eThread[3]; + HRESULT prepare_result; + bool requestAccepted; + HRESULT commit_result; +#ifdef _LOCK_DEBUG_ + int spin_cnt; + int cs_cnt; +#endif /* _LOCK_DEBUG_ */ + +public: + enum { + PrepareExec = 0 + ,CommitExec + ,AbortExec + }; + + ITransactionEnlistmentAsync *enlist; + + HRESULT STDMETHODCALLTYPE QueryInterface(REFIID iid, void ** ppvObject); + ULONG STDMETHODCALLTYPE AddRef(void); + ULONG STDMETHODCALLTYPE Release(void); + + HRESULT STDMETHODCALLTYPE PrepareRequest(BOOL fRetaining, + DWORD grfRM, + BOOL fWantMoniker, + BOOL fSinglePhase); + HRESULT STDMETHODCALLTYPE CommitRequest(DWORD grfRM, XACTUOW * pNewUOW); + HRESULT STDMETHODCALLTYPE AbortRequest(BOID * pboidReason, + BOOL fRetaining, + XACTUOW * pNewUOW); + HRESULT STDMETHODCALLTYPE TMDown(void); + + IAsyncPG(); + ~IAsyncPG(); + void SetHelper(IDtcToXaHelperSinglePipe *pHelper, DWORD dwRMCookie) {helper = pHelper; RMCookie = dwRMCookie;} + + HRESULT RequestExec(DWORD type, HRESULT res); + HRESULT ReleaseConnection(void); + void SetConnection(ConnectionClass *sconn) {SLOCK_ACQUIRE(); conn = sconn; SLOCK_RELEASE();} + void SetXid(const XID *ixid) {SLOCK_ACQUIRE(); xid = *ixid; SLOCK_RELEASE();} +private: +#ifdef _LOCK_DEBUG_ + void SLOCK_ACQUIRE() {forcelog("SLOCK_ACQUIRE %d\n", spin_cnt); EnterCriticalSection(&as_spin); spin_cnt++;} + void SLOCK_RELEASE() {forcelog("SLOCK_RELEASE=%d\n", spin_cnt); LeaveCriticalSection(&as_spin); spin_cnt--;} +#else + void SLOCK_ACQUIRE() {EnterCriticalSection(&as_spin);} + void SLOCK_RELEASE() {LeaveCriticalSection(&as_spin);} +#endif /* _LOCK_DEBUG_ */ + void ELOCK_ACQUIRE() {EnterCriticalSection(&as_exec);} + void ELOCK_RELEASE() {LeaveCriticalSection(&as_exec);} + ConnectionClass *getLockedXAConn(void); + ConnectionClass *generateXAConn(bool spinAcquired); + void SetPrepareResult(HRESULT res) {SLOCK_ACQUIRE(); prepared = true; prepare_result = res; SLOCK_RELEASE();} + void SetDone(HRESULT); + void Reset_eThread(int idx) {SLOCK_ACQUIRE(); eThread[idx] = NULL; SLOCK_RELEASE();} + void Wait_pThread(bool slock_hold); + void Wait_cThread(bool slock_hold, bool once); +}; + +// +// For thread control. +// +class AsyncWait { +private: + IAsyncPG *obj; + DWORD type; + int waiting_count; +public: + AsyncWait(IAsyncPG *async, DWORD itype) : obj(async), type(itype), waiting_count(0) {} + AsyncWait(const AsyncWait &a_th) : obj(a_th.obj), type(a_th.type), waiting_count(a_th.waiting_count) {} + ~AsyncWait() {} + IAsyncPG *GetObj() const {return obj;} + DWORD GetType() const {return type;} + int WaitCount() const {return waiting_count;} + int StartWaiting() {return ++waiting_count;} + int StopWaiting() {return --waiting_count;} +}; +// +// List of threads invoked from IAsyncPG objects. +// +class AsyncThreads { +private: + static std::map th_list; +public: + static void insert(HANDLE, IAsyncPG *, DWORD); + static void CleanupThreads(DWORD millisecond); + static bool WaitThread(IAsyncPG *, DWORD type, DWORD millisecond); +}; + + +IAsyncPG::IAsyncPG(void) : helper(NULL), RMCookie(0), enlist(NULL), conn(NULL), xaconn(NULL), refcnt(1), prepared(false), requestAccepted(false) +{ + InterlockedIncrement(&g_cComponents); + InitializeCriticalSection(&as_spin); + InitializeCriticalSection(&as_exec); + eThread[0] = eThread[1] = eThread[2] = NULL; + memset(&xid, 0, sizeof(xid)); +#ifdef _LOCK_DEBUG_ + spin_cnt = 0; + cs_cnt = 0; +#endif /* _LOCK_DEBUG_ */ +} + +IAsyncPG::~IAsyncPG(void) +{ + ConnectionClass *fconn = NULL; + + if (conn) + { + conn->asdum = NULL; + conn = NULL; + } + if (xaconn) + { + fconn = xaconn; + xaconn->asdum = NULL; + xaconn = NULL; + } + SLOCK_RELEASE(); + LIFELOCK_RELEASE; + if (fconn) + PGAPI_FreeConnect((HDBC) fconn); + DeleteCriticalSection(&as_spin); + ELOCK_RELEASE(); + DeleteCriticalSection(&as_exec); + InterlockedDecrement(&g_cComponents); +} +HRESULT STDMETHODCALLTYPE IAsyncPG::QueryInterface(REFIID riid, void ** ppvObject) +{ +forcelog("%x QueryInterface called\n", this); + if (riid == IID_IUnknown || riid == IID_ITransactionResourceAsync) + { + *ppvObject = this; + AddRef(); + return S_OK; + } + *ppvObject = NULL; + return E_NOINTERFACE; +} +ULONG STDMETHODCALLTYPE IAsyncPG::AddRef(void) +{ + mylog("%x->AddRef called\n", this); + SLOCK_ACQUIRE(); + refcnt++; + SLOCK_RELEASE(); + return refcnt; +} +ULONG STDMETHODCALLTYPE IAsyncPG::Release(void) +{ + mylog("%x->Release called refcnt=%d\n", this, refcnt); + SLOCK_ACQUIRE(); + refcnt--; + if (refcnt <= 0) + { + SLOCK_RELEASE(); + ELOCK_ACQUIRE(); + LIFELOCK_ACQUIRE; + SLOCK_ACQUIRE(); + if (refcnt <=0) + { + mylog("delete %x\n", this); + delete this; + } + else + { + SLOCK_RELEASE(); + LIFELOCK_RELEASE; + ELOCK_RELEASE(); + } + } + else + SLOCK_RELEASE(); + return refcnt; +} + +void IAsyncPG::Wait_pThread(bool slock_hold) +{ + mylog("Wait_pThread %d in\n", slock_hold); + HANDLE wThread; + int wait_idx = PrepareExec; + bool th_found; + if (!slock_hold) + SLOCK_ACQUIRE(); + while (NULL != eThread[wait_idx]) + { + wThread = eThread[wait_idx]; + SLOCK_RELEASE(); + /* + if (WAIT_OBJECT_0 == WaitForSingleObject(wThread, 2000)) + { + SLOCK_ACQUIRE(); + if (eThread[wait_idx]) + { + CloseHandle(wThread); + eThread[wait_idx] = NULL; + } + } + else + SLOCK_ACQUIRE(); + */ + th_found = AsyncThreads::WaitThread(this, wait_idx, 2000); + SLOCK_ACQUIRE(); + if (th_found) + break; + } + if (!slock_hold) + SLOCK_RELEASE(); + mylog("Wait_pThread out\n"); +} + +void IAsyncPG::Wait_cThread(bool slock_hold, bool once) +{ + HANDLE wThread; + int wait_idx; + bool th_found; + + mylog("Wait_cThread %d,%d in\n", slock_hold, once); + if (!slock_hold) + SLOCK_ACQUIRE(); + if (NULL != eThread[CommitExec]) + wait_idx = CommitExec; + else + wait_idx = AbortExec; + while (NULL != eThread[wait_idx]) + { + wThread = eThread[wait_idx]; + SLOCK_RELEASE(); + /* + if (WAIT_OBJECT_0 == WaitForSingleObject(wThread, 2000)) + { + SLOCK_ACQUIRE(); + if (cThread) + { + CloseHandle(wThread); + cThread = NULL; + } + } + else + SLOCK_ACQUIRE(); + */ + th_found = AsyncThreads::WaitThread(this, wait_idx, 2000); + SLOCK_ACQUIRE(); + if (once || th_found) + break; + } + if (!slock_hold) + SLOCK_RELEASE(); + mylog("Wait_cThread out\n"); +} + +/* Processing Prepare/Commit Request */ +typedef +struct RequestPara { + DWORD type; + LPVOID lpr; + HRESULT res; +} RequestPara; + +void IAsyncPG::SetDone(HRESULT res) +{ + LIFELOCK_ACQUIRE; + SLOCK_ACQUIRE(); + prepared = false; + requestAccepted = true; + commit_result = res; + if (conn || xaconn) + { + if (conn) + { + conn->asdum = NULL; + conn = NULL; + } + SLOCK_RELEASE(); + LIFELOCK_RELEASE; + ELOCK_ACQUIRE(); + if (xaconn) + { + xaconn->asdum = NULL; + PGAPI_FreeConnect(xaconn); + xaconn = NULL; + } + ELOCK_RELEASE(); + } + else + { + SLOCK_RELEASE(); + LIFELOCK_RELEASE; + } + // Release(); /* unneccesary */ +} + +ConnectionClass *IAsyncPG::generateXAConn(bool spinAcquired) +{ + if (!spinAcquired) + SLOCK_ACQUIRE(); + if (prepared && !xaconn) + { + SLOCK_RELEASE(); + ELOCK_ACQUIRE(); + LIFELOCK_ACQUIRE; + SLOCK_ACQUIRE(); + if (prepared && !xaconn) + { + PGAPI_AllocConnect(conn->henv, (HDBC *) &xaconn); + memcpy(&xaconn->connInfo, &conn->connInfo, sizeof(ConnInfo)); + conn->asdum = NULL; + conn = NULL; + SLOCK_RELEASE(); + LIFELOCK_RELEASE; + CC_connect(xaconn, AUTH_REQ_OK, NULL); + } + else + { + SLOCK_RELEASE(); + LIFELOCK_RELEASE; + } + ELOCK_RELEASE(); + } + else + SLOCK_RELEASE(); + return xaconn; +} + +// +// [in] +// ELOCK is acquired. +// +// [out] +// If the return connection != NULL +// the lock for the connection is acquired. +// +ConnectionClass *IAsyncPG::getLockedXAConn() +{ + SLOCK_ACQUIRE(); + if (!xaconn && conn && !CC_is_in_trans(conn)) + { + if (TRY_ENTER_CONN_CS(conn)) + { + if (CC_is_in_trans(conn)) + { + LEAVE_CONN_CS(conn); + } + else + { + SLOCK_RELEASE(); + return conn; + } + } + } + generateXAConn(true); + if (xaconn) + ENTER_CONN_CS(xaconn); + return xaconn; +} + +HRESULT IAsyncPG::RequestExec(DWORD type, HRESULT res) +{ + HRESULT ret; + bool bReleaseEnlist = false; + ConnectionClass *econn; + QResultClass *qres; + char pgxid[258], cmd[512]; + + mylog("%x->RequestExec type=%d\n", this, type); + XidToText(xid, pgxid); +#ifdef _SLEEP_FOR_TEST_ + /*Sleep(2000);*/ +#endif /* _SLEEP_FOR_TEST_ */ + /* AddRef(); unnecessary */ + ELOCK_ACQUIRE(); + switch (type) + { + case PrepareExec: + if (XACT_S_SINGLEPHASE == res) + { + if (!CC_commit(conn)) + res = E_FAIL; + bReleaseEnlist = true; + } + else if (E_FAIL != res) + { + snprintf(cmd, sizeof(cmd), "PREPARE TRANSACTION '%s'", pgxid); + qres = CC_send_query(conn, cmd, NULL, 0, NULL); + if (!QR_command_maybe_successful(qres)) + res = E_FAIL; + QR_Destructor(qres); + } + ret = enlist->PrepareRequestDone(res, NULL, NULL); + SetPrepareResult(res); + break; + case CommitExec: + Wait_pThread(false); + if (E_FAIL != res) + { + econn = getLockedXAConn(); + if (econn) + { + snprintf(cmd, sizeof(cmd), "COMMIT PREPARED '%s'", pgxid); + qres = CC_send_query(econn, cmd, NULL, 0, NULL); + if (!QR_command_maybe_successful(qres)) + res = E_FAIL; + QR_Destructor(qres); + LEAVE_CONN_CS(econn); + } + } + SetDone(res); + ret = enlist->CommitRequestDone(res); + bReleaseEnlist = true; + break; + case AbortExec: + Wait_pThread(false); + if (prepared) + { + econn = getLockedXAConn(); + if (econn) + { + snprintf(cmd, sizeof(cmd), "ROLLBACK PREPARED '%s'", pgxid); + qres = CC_send_query(econn, cmd, NULL, 0, NULL); + if (!QR_command_maybe_successful(qres)) + res = E_FAIL; + QR_Destructor(qres); + LEAVE_CONN_CS(econn); + } + } + SetDone(res); + ret = enlist->AbortRequestDone(res); + bReleaseEnlist = true; + break; + default: + ret = -1; + } + if (bReleaseEnlist) + { + helper->ReleaseRMCookie(RMCookie, TRUE); + enlist->Release(); + } + ELOCK_RELEASE(); + /* Release(); unnecessary */ + mylog("%x->Done ret=%d\n", this, ret); + return ret; +} + +HRESULT IAsyncPG::ReleaseConnection(void) +{ + mylog("%x->ReleaseConnection\n", this); + ConnectionClass *iconn; + bool done = false; + + SLOCK_ACQUIRE(); + if (iconn = conn) + { + Wait_pThread(true); + if (NULL != eThread[CommitExec] || NULL != eThread[AbortExec] || requestAccepted) + { + if (prepared) + { + Wait_cThread(true, true); + if (!prepared) + done = true; + } + else + done = true; + if (done) + Wait_cThread(true, false); + } + if (conn && CONN_CONNECTED == conn->status && !done) + { + generateXAConn(true); + } + else + SLOCK_RELEASE(); + } + else + SLOCK_RELEASE(); + mylog("%x->ReleaseConnection exit\n", this); + return SQL_SUCCESS; +} + +EXTERN_C static unsigned WINAPI DtcRequestExec(LPVOID para); +HRESULT STDMETHODCALLTYPE IAsyncPG::PrepareRequest(BOOL fRetaining, DWORD grfRM, + BOOL fWantMoniker, BOOL fSinglePhase) +{ + HRESULT ret, res; + RequestPara *reqp; + + mylog("%x PrepareRequest called grhRM=%d enl=%x\n", this, grfRM, enlist); + SLOCK_ACQUIRE(); + if (0 != CC_get_errornumber(conn)) + res = ret = E_FAIL; + else + { + ret = S_OK; + if (fSinglePhase) + { + res = XACT_S_SINGLEPHASE; + mylog("XACT is singlePhase\n"); + } + else + res = S_OK; + } + SLOCK_RELEASE(); + ELOCK_ACQUIRE(); +#ifdef _SLEEP_FOR_TEST_ + Sleep(2000); +#endif /* _SLEEP_FOR_TEST_ */ + reqp = new RequestPara; + reqp->type = PrepareExec; + reqp->lpr = (LPVOID) this; + reqp->res = res; + AddRef(); + HANDLE hThread = (HANDLE) _beginthreadex(NULL, 0, DtcRequestExec, reqp, 0, NULL); + if (NULL == hThread) + { + delete(reqp); + ret = E_FAIL; + } + else + { + AsyncThreads::insert(hThread, this, reqp->type); + } + ELOCK_RELEASE(); + Release(); + return ret; +} +HRESULT STDMETHODCALLTYPE IAsyncPG::CommitRequest(DWORD grfRM, XACTUOW * pNewUOW) +{ + HRESULT res = S_OK, ret = S_OK; + RequestPara *reqp; + mylog("%x CommitRequest called grfRM=%d enl=%x\n", this, grfRM, enlist); + + SLOCK_ACQUIRE(); + if (!prepared) + ret = E_UNEXPECTED; + else if (S_OK != prepare_result) + ret = E_UNEXPECTED; + SLOCK_RELEASE(); + if (S_OK != ret) + return ret; + AddRef(); + ELOCK_ACQUIRE(); +#ifdef _SLEEP_FOR_TEST_ + Sleep(1000); +#endif /* _SLEEP_FOR_TEST_ */ + reqp = new RequestPara; + reqp->type = CommitExec; + reqp->lpr = (LPVOID) this; + reqp->res = res; + enlist->AddRef(); + HANDLE hThread = (HANDLE) _beginthreadex(NULL, 0, DtcRequestExec, reqp, 0, NULL); + if (NULL == hThread) + { + delete(reqp); + enlist->Release(); + ret = E_FAIL; + } + else + { + AsyncThreads::insert(hThread, this, reqp->type); + } + mylog("CommitRequest ret=%d\n", ret); + requestAccepted = true; + ELOCK_RELEASE(); + Release(); + return ret; +} +HRESULT STDMETHODCALLTYPE IAsyncPG::AbortRequest(BOID * pboidReason, BOOL fRetaining, + XACTUOW * pNewUOW) +{ + HRESULT res = S_OK, ret = S_OK; + RequestPara *reqp; + + mylog("%x AbortRequest called\n", this); + AddRef(); + ELOCK_ACQUIRE(); + if (!prepared && conn) + CC_abort(conn); + reqp = new RequestPara; + reqp->type = AbortExec; + reqp->lpr = (LPVOID) this; + reqp->res = res; + enlist->AddRef(); + HANDLE hThread = (HANDLE) _beginthreadex(NULL, 0, DtcRequestExec, reqp, 0, NULL); + if (NULL == hThread) + { + delete(reqp); + enlist->Release(); + ret = E_FAIL; + } + else + { + AsyncThreads::insert(hThread, this, reqp->type); + } + mylog("AbortRequest ret=%d\n", ret); + requestAccepted = true; + ELOCK_RELEASE(); + Release(); + return ret; +} +HRESULT STDMETHODCALLTYPE IAsyncPG::TMDown(void) +{ +forcelog("%x TMDown called\n", this); + return S_OK; +} + +// +// +// + +std::map AsyncThreads::th_list; +void AsyncThreads::insert(HANDLE th, IAsyncPG *obj, DWORD type) +{ + if (!obj) return; + MLOCK_ACQUIRE; + th_list.insert(std::pair(th, AsyncWait(obj, type))); + obj->SLOCK_ACQUIRE(); + obj->eThread[type] = th; + obj->SLOCK_RELEASE(); + MLOCK_RELEASE; +} + +bool AsyncThreads::WaitThread(IAsyncPG *obj, DWORD type, DWORD millisecond) +{ + HANDLE th = NULL; + DWORD gtype; + bool typematch; + int wait_count; + + MLOCK_ACQUIRE; + std::map::iterator p; + for (p = th_list.begin(); p != th_list.end(); p++) + { + gtype = p->second.GetType(); + typematch = (gtype == type); + if (p->second.GetObj() == obj && typematch) + { + th = p->first; + break; + } + } + if (NULL == th) + { + MLOCK_RELEASE; + forcelog("WaitThread thread(%x, %d) not found\n", obj, type); + return false; + } + p->second.StartWaiting(); + MLOCK_RELEASE; + + DWORD ret = WaitForSingleObject(th, millisecond); + MLOCK_ACQUIRE; + wait_count = p->second.StopWaiting(); + if (WAIT_OBJECT_0 == ret) + { + IAsyncPG *async = p->second.GetObj(); + + if (type >= 0 && type <= IAsyncPG::AbortExec) + async->Reset_eThread(type); + if (wait_count <= 0) + { + th_list.erase(th); + MLOCK_RELEASE; + CloseHandle(th); + if (type >= IAsyncPG::CommitExec) + { + async->Release(); + } + } + else + MLOCK_RELEASE; + } + else + MLOCK_RELEASE; + return true; +} + +void AsyncThreads::CleanupThreads(DWORD millisecond) +{ + int msize; + DWORD nCount; + + MLOCK_ACQUIRE; + if (msize = th_list.size(), msize <= 0) + { + MLOCK_RELEASE; + return; + } + + mylog("CleanupThreads size=%d\n", msize); + HANDLE *hds = new HANDLE[msize]; + std::map::iterator p; + for (p = th_list.begin(), nCount = 0; p != th_list.end(); p++) + { + hds[nCount++] = p->first; + p->second.StartWaiting(); + } + MLOCK_RELEASE; + int i; + while (nCount > 0) + { + DWORD ret = WaitForMultipleObjects(nCount, hds, 0, millisecond); + if (ret >= nCount) + break; + HANDLE th = hds[ret]; + MLOCK_ACQUIRE; + p = th_list.find(th); + if (p != th_list.end()) + { + int wait_count = p->second.StopWaiting(); + DWORD type = p->second.GetType(); + IAsyncPG * async = p->second.GetObj(); + + if (type >= IAsyncPG::PrepareExec && type <= IAsyncPG::AbortExec) + async->Reset_eThread(type); + if (wait_count <= 0) + { + th_list.erase(th); + MLOCK_RELEASE; + CloseHandle(th); + if (type >= IAsyncPG::CommitExec) + { + async->Release(); + } + } + else + MLOCK_RELEASE; + } + else + MLOCK_RELEASE; + for (i = ret; i < (int) nCount - 1; i++) + hds[i] = hds[i + 1]; + nCount--; + } + for (i = 0; i < (int) nCount; i++) + { + p = th_list.find(hds[i]); + if (p != th_list.end()) + p->second.StopWaiting(); + } + delete [] hds; +} + + +EXTERN_C static unsigned WINAPI DtcRequestExec(LPVOID para) +{ + RequestPara *reqp = (RequestPara *) para; + DWORD type = reqp->type; + IAsyncPG *async = (IAsyncPG *) reqp->lpr; + HRESULT res = reqp->res, ret; + + mylog("DtcRequestExec type=%d", reqp->type); + delete(reqp); + ret = async->RequestExec(type, res); + mylog(" Done ret=%d\n", ret); + return ret; +} + +CSTR regKey = "SOFTWARE\\Microsoft\\MSDTC\\XADLL"; + +RETCODE static EnlistInDtc_1pipe(ConnectionClass *conn, ITransaction *pTra, ITransactionDispenser *pDtc) +{ + CSTR func = "EnlistInDtc_1pipe"; + static IDtcToXaHelperSinglePipe *pHelper = NULL; + ITransactionResourceAsync *pRes = NULL; + IAsyncPG *asdum; + HRESULT res; + bool retry, errset; + DWORD dwRMCookie; + XID xid; + + if (!pHelper) + { + res = pDtc->QueryInterface(IID_IDtcToXaHelperSinglePipe, (void **) &pHelper); + if (res != S_OK || !pHelper) + { + forcelog("DtcToXaHelperSingelPipe get error %d\n", res); + pHelper = NULL; + return SQL_ERROR; + } + } + res = (NULL != (asdum = new IAsyncPG)) ? S_OK : E_FAIL; + if (S_OK != res) + { + mylog("CoCreateInstance error %d\n", res); + return SQL_ERROR; + } + +mylog("dllname=%s dsn=%s\n", xalibname, conn->connInfo.dsn); res = 0; + retry = false; + errset = false; + ConnInfo *ci = &(conn->connInfo); + char dtcname[1024]; + snprintf(dtcname, sizeof(dtcname), "DRIVER={%s};SERVER=%s;PORT=%s;DATABASE=%s;UID=%s;PWD=%s;" ABBR_SSLMODE "=%s;" ABBR_DEBUG "=%d", + ci->drivername, ci->server, ci->port, ci->database, ci->username, ci->password, ci->sslmode, ci->drivers.debug); + do { + res = pHelper->XARMCreate(dtcname, (char *) GetXaLibName(), &dwRMCookie); + if (S_OK == res) + break; + mylog("XARMCreate error code=%x\n", res); + if (XACT_E_XA_TX_DISABLED == res) + { + CC_set_error(conn, CONN_UNSUPPORTED_OPTION, "XARMcreate error:Please enable XA transaction in MSDTC security configuration", func); + errset = true; + } + else if (!retry) + { + LONG ret; + HKEY sKey; + DWORD rSize; + + ret = ::RegOpenKeyEx(HKEY_LOCAL_MACHINE, regKey, 0, KEY_QUERY_VALUE | KEY_SET_VALUE, &sKey); + if (ERROR_SUCCESS == ret) + { + switch (ret = ::RegQueryValueEx(sKey, "XADLL", NULL, NULL, NULL, &rSize)) + { + case ERROR_SUCCESS: + if (rSize > 0) + break; + default: + ret = ::RegSetValueEx(sKey, xalibname, 0, REG_SZ, + (CONST BYTE *) xalibpath, strlen(xalibpath) + 1); + if (ERROR_SUCCESS == ret) + { + retry = true; + continue; // retry + } + CC_set_error(conn, CONN_UNSUPPORTED_OPTION, "XARMCreate error:Please register HKLM\\SOFTWARE\\Microsoft\\MSDTC\\XADLL", func); + break; + } + } + } + if (!errset) + CC_set_error(conn, CONN_UNSUPPORTED_OPTION, "MSDTC XARMCreate error", func); + return SQL_ERROR; + } while (1); + res = pHelper->ConvertTridToXID((DWORD *) pTra, dwRMCookie, &xid); + if (res != S_OK) + { + mylog("ConvertTridToXid error %d\n", res); + return SQL_ERROR; + } +{ +char pgxid[258]; +XidToText(xid, pgxid); +mylog("ConvertTridToXID -> %s\n", pgxid); +} + asdum->SetXid(&xid); + /* Create an IAsyncPG instance by myself */ + /* DLLGetClassObject(GUID_IAsyncPG, IID_ITransactionResourceAsync, (void **) &asdum); */ + + asdum->SetHelper(pHelper, dwRMCookie); + res = pHelper->EnlistWithRM(dwRMCookie, pTra, asdum, &asdum->enlist); + if (res != S_OK) + { + mylog("EnlistWithRM error %d\n", res); + pHelper->ReleaseRMCookie(dwRMCookie, TRUE); + return SQL_ERROR; + } + + mylog("asdum=%x start transaction\n", asdum); + CC_set_autocommit_off(conn); + asdum->SetConnection(conn); + conn->asdum = asdum; + + return SQL_SUCCESS; +} + + +EXTERN_C RETCODE EnlistInDtc(ConnectionClass *conn, void *pTra, int method) +{ + static ITransactionDispenser *pDtc = NULL; + ConnInfo *ci = &(conn->connInfo); + + if (!pTra) + { + IAsyncPG *asdum = (IAsyncPG *) conn->asdum; + if (asdum) + { + /* asdum->Release(); */ + } + if (ci->autocommit_normal) + CC_set_autocommit_on(conn); + return SQL_SUCCESS; + } + if (CC_is_in_trans(conn)) + { + CC_abort(conn); + ci->autocommit_normal = (CC_is_in_autocommit(conn) ? 1 : 0); + } + if (!pDtc) + { + HRESULT res; + + res = DtcGetTransactionManager(NULL, NULL, IID_ITransactionDispenser, + 0, 0, NULL, (void **) &pDtc); + if (res != S_OK || !pDtc) + { + forcelog("TransactionManager get error %d\n", res); + pDtc = NULL; + } + } + return EnlistInDtc_1pipe(conn, (ITransaction *) pTra, pDtc); +} + +EXTERN_C RETCODE DtcOnDisconnect(ConnectionClass *conn) +{ + mylog("DtcOnDisconnect\n"); + LIFELOCK_ACQUIRE; + IAsyncPG *asdum = (IAsyncPG *) conn->asdum; + if (asdum) + { + asdum->AddRef(); + LIFELOCK_RELEASE; + asdum->ReleaseConnection(); + asdum->Release(); + } + else + LIFELOCK_RELEASE; + return SQL_SUCCESS; +} + +EXTERN_C RETCODE DtcOnRelease(void) +{ + AsyncThreads::CleanupThreads(2000); + return SQL_SUCCESS; +} diff --git a/odbcapi.c b/odbcapi.c index 5d996d1..3745424 100644 --- a/odbcapi.c +++ b/odbcapi.c @@ -294,11 +294,11 @@ SQLDisconnect(HDBC ConnectionHandle) ConnectionClass *conn = (ConnectionClass *) ConnectionHandle; mylog("[%s for %x]", func, ConnectionHandle); - ENTER_CONN_CS(conn); - CC_clear_error(conn); #ifdef _HANDLE_ENLIST_IN_DTC_ - DtcOnDisconnect(conn, TRUE); + DtcOnDisconnect(conn); /* must be called without holding the connection lock */ #endif /* _HANDLE_ENLIST_IN_DTC_ */ + ENTER_CONN_CS(conn); + CC_clear_error(conn); ret = PGAPI_Disconnect(ConnectionHandle); LEAVE_CONN_CS(conn); return ret; diff --git a/pgapi30.c b/pgapi30.c index 2b87342..6e03810 100644 --- a/pgapi30.c +++ b/pgapi30.c @@ -1593,7 +1593,8 @@ PGAPI_SetConnectAttr(HDBC ConnectionHandle, #ifdef WIN32 #ifdef _HANDLE_ENLIST_IN_DTC_ mylog("SQL_ATTR_ENLIST_IN_DTC %x request received\n", Value); - return EnlistInDtc(conn, Value, conn->connInfo.xa_opt); /* telling a lie */ + if (conn->connInfo.xa_opt != 0) + return EnlistInDtc(conn, Value, conn->connInfo.xa_opt); #endif /* _HANDLE_ENLIST_IN_DTC_ */ #endif /* WIN32 */ unsupported = TRUE; diff --git a/pgxalib.cpp b/pgxalib.cpp new file mode 100755 index 0000000..a39fb47 --- /dev/null +++ b/pgxalib.cpp @@ -0,0 +1,579 @@ +/*------ + * Module: pgxalib.cpp + * + * Description: + * This module implements XA like routines + * invoked from MSDTC process. + * + * xa_open(), xa_close(), xa_commit(), + * xa_rollback() and xa_recover() + * are really invoked AFAIC. + *------- + */ + +#include +/*#define _SLEEP_FOR_TEST_*/ +#include +#include +#include +#include +#include +#include + +#include +#include +#include + + +#define _BUILD_DLL_ +#ifdef _BUILD_DLL_ + +EXTERN_C { + +#define DIRSEPARATOR "\\" +#define PG_BINARY_A "ab" +#define MYLOGDIR "c:" +#define MYLOGFILE "mylog_" + +static void +generate_filename(const char *dirname, const char *prefix, char *filename) +{ + int pid = 0; + + pid = getpid(); + if (dirname == 0 || filename == 0) + return; + + strcpy(filename, dirname); + strcat(filename, DIRSEPARATOR); + if (prefix != 0) + strcat(filename, prefix); + sprintf(filename, "%s%u%s", filename, pid, ".log"); + return; +} + +static HENV env = NULL; +static void FreeEnv() +{ + if (env) + { + SQLFreeHandle(SQL_HANDLE_ENV, env); + env = NULL; + } +} +static FILE *LOGFP = NULL; +static CRITICAL_SECTION mylog_cs; + +static void +mylog(const char *fmt,...) +{ + va_list args; + char filebuf[80]; + static BOOL init = TRUE; + + EnterCriticalSection(&mylog_cs); + va_start(args, fmt); + + if (init) + { + if (!LOGFP) + { + generate_filename(MYLOGDIR, MYLOGFILE, filebuf); + LOGFP = fopen(filebuf, PG_BINARY_A); + } + if (!LOGFP) + { + generate_filename("C:\\podbclog", MYLOGFILE, filebuf); + LOGFP = fopen(filebuf, PG_BINARY_A); + } + if (LOGFP) + setbuf(LOGFP, NULL); + } + init = FALSE; + if (LOGFP) + { + time_t ntime; + char ctim[128]; + + time(&ntime); + strcpy(ctim, ctime(&ntime)); + ctim[strlen(ctim) - 1] = '\0'; + fprintf(LOGFP, "[%d.%d(%s)]", GetCurrentProcessId(), GetCurrentThreadId(), ctim); + vfprintf(LOGFP, fmt, args); + } + va_end(args); + LeaveCriticalSection(&mylog_cs); +} +static int initialize_globals(void) +{ + static int init = 1; + + if (!init) + return 0; + init = 0; + InitializeCriticalSection(&mylog_cs); + + return 0; +} + +static void XatabClear(void); +static void finalize_globals(void) +{ + XatabClear(); + FreeEnv(); + /* my(q)log is unavailable from here */ + mylog("DETACHING PROCESS\n"); + DeleteCriticalSection(&mylog_cs); + fclose(LOGFP); + LOGFP = NULL; +} + +HINSTANCE s_hModule; /* Saved module handle. */ +/* This is where the Driver Manager attaches to this Driver */ +BOOL WINAPI +DllMain(HANDLE hInst, ULONG ul_reason_for_call, LPVOID lpReserved) +{ + WORD wVersionRequested; + WSADATA wsaData; + + switch (ul_reason_for_call) + { + case DLL_PROCESS_ATTACH: + s_hModule = (HINSTANCE) hInst; /* Save for dialog boxes */ + + /* Load the WinSock Library */ + wVersionRequested = MAKEWORD(1, 1); + + if (WSAStartup(wVersionRequested, &wsaData)) + return FALSE; + + /* Verify that this is the minimum version of WinSock */ + if (LOBYTE(wsaData.wVersion) != 1 || + HIBYTE(wsaData.wVersion) != 1) + { + WSACleanup(); + return FALSE; + } + initialize_globals(); + break; + + case DLL_THREAD_ATTACH: + break; + + case DLL_PROCESS_DETACH: + finalize_globals(); + WSACleanup(); + return TRUE; + + case DLL_THREAD_DETACH: + break; + + default: + break; + } + return TRUE; +} + +} /* end of EXTERN_C */ + +#endif /* _BUILD_DLL_ */ + +using namespace std; + +static CRITICAL_SECTION map_cs; +#define MLOCK_ACQUIRE EnterCriticalSection(&map_cs) +#define MLOCK_RELEASE LeaveCriticalSection(&map_cs) + +class XAConnection +{ +private: + string connstr; + HDBC xaconn; + vector qvec; + int pos; +public: + XAConnection(LPCTSTR str) : connstr(str), xaconn(NULL), pos(-1) {} + ~XAConnection(); + HDBC ActivateConnection(void); + void SetPos(int spos) {pos = spos;} + HDBC GetConnection(void) const {return xaconn;} + vector &GetResultVec(void) {return qvec;} + int GetPos(void) {return pos;} + string GetConnstr(void) {return connstr;} +}; + +XAConnection::~XAConnection() +{ + qvec.clear(); + if (xaconn) + SQLFreeHandle(SQL_HANDLE_DBC, xaconn); +} + +HDBC XAConnection::ActivateConnection(void) +{ + RETCODE ret; + + MLOCK_ACQUIRE; + if (!env) + { + ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env); + if (SQL_SUCCESS != ret && SQL_SUCCESS_WITH_INFO != ret) + return NULL; + } + MLOCK_RELEASE; + if (!xaconn) + { + ret = SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, (PTR) SQL_OV_ODBC3, 0); + ret = SQLAllocHandle(SQL_HANDLE_DBC, env, &xaconn); + if (SQL_SUCCESS == ret || SQL_SUCCESS_WITH_INFO) + { + ret = SQLDriverConnect(xaconn, NULL, (SQLCHAR *) connstr.c_str(), SQL_NTS, NULL, SQL_NULL_DATA, NULL, SQL_DRIVER_COMPLETE); + if (SQL_SUCCESS != ret && SQL_SUCCESS_WITH_INFO != ret) + { + mylog("SQLDriverConnect return=%d\n", ret); + SQLFreeHandle(SQL_HANDLE_DBC, xaconn); + xaconn = NULL; + } + } + } + return xaconn; +} + +static map xatab; + +static class INIT_CRIT +{ +private: +public: + INIT_CRIT() {InitializeCriticalSection(&map_cs);} + ~INIT_CRIT() + { +mylog("Leaving INIT_CRIT\n"); + FreeEnv(); + xatab.clear(); + DeleteCriticalSection(&map_cs); + } +} init_crit; + +static void XatabClear(void) +{ + xatab.clear(); +} + +static const char *XidToText(const XID &xid, char *rtext) +{ + int glen = xid.gtrid_length, blen = xid.bqual_length; + int i, j; + + for (i = 0, j = 0; i < glen; i++, j += 2) + sprintf(rtext + j, "%02x", (unsigned char) xid.data[i]); + strcat(rtext, "-"); j++; + for (; i < glen + blen; i++, j += 2) + sprintf(rtext + j, "%02x", (unsigned char) xid.data[i]); + return rtext; +} + +static int +pg_hex2bin(const UCHAR *src, UCHAR *dst, int length) +{ + UCHAR chr; + const UCHAR *src_wk; + UCHAR *dst_wk; + int i, val; + BOOL HByte = TRUE; + + for (i = 0, src_wk = src, dst_wk = dst; i < length; i++, src_wk++) + { + chr = *src_wk; + if (!chr) + break; + if (chr >= 'a' && chr <= 'f') + val = chr - 'a' + 10; + else if (chr >= 'A' && chr <= 'F') + val = chr - 'A' + 10; + else + val = chr - '0'; + if (HByte) + *dst_wk = (val << 4); + else + { + *dst_wk += val; + dst_wk++; + } + HByte = !HByte; + } + return length; +} + +static int TextToXid(XID &xid, const char *rtext) +{ + int slen, glen, blen; + char *sptr; + + slen = strlen(rtext); + sptr = strchr(rtext, '-'); + if (sptr) + { + glen = (int) (sptr - rtext); + blen = slen - glen - 1; + } + else + { + glen = slen; + blen = 0; + } + xid.gtrid_length = glen / 2; + xid.bqual_length = blen / 2; + pg_hex2bin((const UCHAR *) rtext, (UCHAR *) &xid.data[0], glen); + pg_hex2bin((const UCHAR *) sptr + 1, (UCHAR *) &xid.data[glen / 2], blen); + return (glen + blen) / 2; +} + + +EXTERN_C static int __cdecl xa_open(char *xa_info, int rmid, long flags) +{ + mylog("xa_open %s rmid=%d flags=%ld\n", xa_info, rmid, flags); + MLOCK_ACQUIRE; + xatab.insert(pair(rmid, XAConnection(xa_info))); + MLOCK_RELEASE; + return S_OK; +} +EXTERN_C static int __cdecl xa_close(char *xa_info, int rmid, long flags) +{ + mylog("xa_close rmid=%d flags=%ld\n", rmid, flags); + MLOCK_ACQUIRE; + xatab.erase(rmid); + if (xatab.size() == 0) + FreeEnv(); + MLOCK_RELEASE; + return XA_OK; +} + +// +// Dummy implmentation +// +EXTERN_C static int __cdecl xa_start(XID *xid, int rmid, long flags) +{ + char pgxid[258]; + + XidToText(*xid, pgxid); + mylog("xa_start %s rmid=%d flags=%ld\n", pgxid, rmid, flags); + xatab.find(rmid)->second.ActivateConnection(); + return XA_OK; +} +// +// Dummy implementation +// +EXTERN_C static int __cdecl xa_end(XID *xid, int rmid, long flags) +{ + char pgxid[258]; + + XidToText(*xid, pgxid); + mylog("xa_end %s rmid=%d flags=%ld\n", pgxid, rmid, flags); + return XA_OK; +} + +EXTERN_C static int __cdecl xa_rollback(XID *xid, int rmid, long flags) +{ + int rmcode = XAER_RMERR; + char pgxid[258]; + + XidToText(*xid, pgxid); + mylog("xa_rollback %s rmid=%d flags=%ld\n", pgxid, rmid, flags); + map::iterator p; + p = xatab.find(rmid); + if (p != xatab.end()) + { + HDBC conn = p->second.ActivateConnection(); + if (conn) + { + SQLCHAR cmdmsg[512], sqlstate[8]; + HSTMT stmt; + RETCODE ret; + + ret = SQLAllocHandle(SQL_HANDLE_STMT, conn, &stmt); + if (SQL_SUCCESS != ret && SQL_SUCCESS_WITH_INFO != ret) + { + mylog("Statement allocation error\n"); + return rmcode; + } + _snprintf((char *) cmdmsg, sizeof(cmdmsg), "ROLLBACK PREPARED '%s'", pgxid); + ret = SQLExecDirect(stmt, (SQLCHAR *) cmdmsg, SQL_NTS); + switch (ret) + { + case SQL_SUCCESS: + case SQL_SUCCESS_WITH_INFO: + rmcode = XA_OK; + break; + case SQL_ERROR: + SQLGetDiagRec(SQL_HANDLE_STMT, stmt, + 1, sqlstate, NULL, cmdmsg, + sizeof(cmdmsg), NULL); + mylog("xa_commit error %s '%s'\n", sqlstate, cmdmsg); + if (_stricmp((char *) sqlstate, "42704") == 0) + rmcode = XA_HEURHAZ; + break; + } + SQLFreeHandle(SQL_HANDLE_STMT, stmt); + } + } + return rmcode; +} +// +// Dummy implementation +// It's almost impossible to implement this routine properly. +// +EXTERN_C static int __cdecl xa_prepare(XID *xid, int rmid, long flags) +{ + char pgxid[258]; + + XidToText(*xid, pgxid); + mylog("xa_prepare %s rmid=%d\n", pgxid, rmid); +#ifdef _SLEEP_FOR_TEST_ + Sleep(2000); +#endif /* _SLEEP_FOR_TEST_ */ + map::iterator p; + p = xatab.find(rmid); + if (p != xatab.end()) + { + HDBC conn = p->second.GetConnection(); + if (conn) + { + } + } + return XAER_RMERR; +} +EXTERN_C static int __cdecl xa_commit(XID *xid, int rmid, long flags) +{ + int rmcode = XAER_RMERR; + char pgxid[258]; + + XidToText(*xid, pgxid); + mylog("xa_commit %s rmid=%d flags=%ld\n", pgxid, rmid, flags); +#ifdef _SLEEP_FOR_TEST_ + Sleep(2000); +#endif /* _SLEEP_FOR_TEST_ */ + map::iterator p; + p = xatab.find(rmid); + if (p != xatab.end()) + { + HDBC conn = p->second.ActivateConnection(); + if (conn) + { + SQLCHAR cmdmsg[512], sqlstate[8]; + HSTMT stmt; + RETCODE ret; + + SQLAllocHandle(SQL_HANDLE_STMT, conn, &stmt); + _snprintf((char *) cmdmsg, sizeof(cmdmsg), "COMMIT PREPARED '%s'", pgxid); + ret = SQLExecDirect(stmt, (SQLCHAR *) cmdmsg, SQL_NTS); + switch (ret) + { + case SQL_SUCCESS: + case SQL_SUCCESS_WITH_INFO: + rmcode = XA_OK; + break; + case SQL_ERROR: + SQLGetDiagRec(SQL_HANDLE_STMT, stmt, + 1, sqlstate, NULL, cmdmsg, + sizeof(cmdmsg), NULL); + if (_stricmp((char *) sqlstate, "42704") == 0) + rmcode = XA_HEURHAZ; + break; + } + SQLFreeHandle(SQL_HANDLE_STMT, stmt); + } + } + return rmcode; +} +EXTERN_C static int __cdecl xa_recover(XID *xids, long count, int rmid, long flags) +{ + int rmcode = XAER_RMERR, rcount; + + mylog("xa_recover rmid=%d count=%d flags=%ld\n", rmid, count, flags); + map::iterator p; + p = xatab.find(rmid); + if (p == xatab.end()) + return rmcode; + HDBC conn = p->second.ActivateConnection(); + if (!conn) + return rmcode; + vector &vec = p->second.GetResultVec(); + int pos = p->second.GetPos(); + if ((flags & TMSTARTRSCAN) != 0) + { + HSTMT stmt; + RETCODE ret; + char buf[512]; + + vec.clear(); + SQLAllocHandle(SQL_HANDLE_STMT, conn, &stmt); + ret = SQLExecDirect(stmt, (SQLCHAR *) "select gid from pg_prepared_xacts", SQL_NTS); + if (SQL_SUCCESS != ret && SQL_SUCCESS_WITH_INFO != ret) + { + SQLFreeHandle(SQL_HANDLE_STMT, stmt); + pos = -1; + goto onExit; + } + SQLBindCol(stmt, 1, SQL_C_CHAR, buf, sizeof(buf), NULL); + ret = SQLFetch(stmt); + while (SQL_NO_DATA_FOUND != ret) + { + vec.push_back(buf); + ret = SQLFetch(stmt); + } + SQLFreeHandle(SQL_HANDLE_STMT, stmt); + pos = 0; + } + rcount = vec.size(); + rmcode = rcount - pos; + if (rmcode > count) + rmcode = count; + for (int i = 0; i < rmcode; i++, pos++) + TextToXid(xids[i], vec[pos].c_str()); + + if ((flags & TMENDRSCAN) != 0) + { + vec.clear(); + pos = -1; + } + mylog("return count=%d\n", rmcode); +onExit: + p->second.SetPos(pos); + return rmcode; +} + +// +// I'm not sure if this is invoked from MSDTC +// Anyway there's nothing to do with it. +// +EXTERN_C static int __cdecl xa_forget(XID *xid, int rmid, long flags) +{ + char pgxid[258]; + + XidToText(*xid, pgxid); + mylog("xa_forget %s rmid=%d\n", pgxid, rmid); + return XA_OK; +} +// +// I'm not sure if this invoked from MSDTC. +// +EXTERN_C static int __cdecl xa_complete(int *handle, int *retval, int rmid, long flags) +{ + mylog("xa_complete rmid=%d\n", rmid); + return XA_OK; +} + +EXTERN_C static xa_switch_t xapsw = { "psotgres_xa", TMNOMIGRATE, + 0, xa_open, xa_close, xa_start, xa_end, xa_rollback, + xa_prepare, xa_commit, xa_recover, xa_forget, + xa_complete}; + +EXTERN_C HRESULT __cdecl GetXaSwitch (XA_SWITCH_FLAGS XaSwitchFlags, + xa_switch_t ** ppXaSwitch) +{ + mylog("GetXaSwitch called\n"); + + *ppXaSwitch = &xapsw; + return S_OK; +} diff --git a/version.h b/version.h index c215393..bcb64fe 100644 --- a/version.h +++ b/version.h @@ -9,8 +9,8 @@ #ifndef __VERSION_H__ #define __VERSION_H__ -#define POSTGRESDRIVERVERSION "08.02.0003" -#define POSTGRES_RESOURCE_VERSION "08.02.0003\0" -#define PG_DRVFILE_VERSION 8,2,0,03 +#define POSTGRESDRIVERVERSION "08.02.0004" +#define POSTGRES_RESOURCE_VERSION "08.02.0004\0" +#define PG_DRVFILE_VERSION 8,2,0,04 #endif -- 2.39.5