The first cut for MSDTC supprort and the version is now 8.2.0004.
authorHiroshi Inoue <inoue@tpf.co.jp>
Fri, 19 May 2006 23:08:33 +0000 (23:08 +0000)
committerHiroshi Inoue <inoue@tpf.co.jp>
Fri, 19 May 2006 23:08:33 +0000 (23:08 +0000)
1. Improve unknown-size estimation in case of the *longest* option.
2. Improve SQL_DEFAULT_PARAM handling.

columninfo.h
connection.h
convert.c
environ.c
msdtc_enlist.cpp [new file with mode: 0755]
odbcapi.c
pgapi30.c
pgxalib.cpp [new file with mode: 0755]
version.h

index 85ced66f3030be7a9e819ef750a2cc6b9dab9d9a..f1da6e3bf3eab3f4c9b01a69e4f5baf72a7fc55c 100644 (file)
@@ -17,7 +17,7 @@ struct ColumnInfoClass_
    char      **name;           /* list of type names */
    Oid        *adtid;          /* list of type ids */
    Int2       *adtsize;        /* list type sizes */
-   Int2       *display_size;   /* the display size (longest row) */
+   Int4       *display_size;   /* the display size (longest row) */
    Int4       *atttypmod;      /* the length of bpchar/varchar */
    Oid    *relid;      /* list of relation ids */
    Oid    *attid;      /* list of attribute ids */
index 8ae2108284126ea6794c6fcf651ad88b5dcb9400..c035b0ed670626c1548d28e8018920a9fd9b48d7 100644 (file)
@@ -519,7 +519,7 @@ enum {
 #ifdef WIN32
 #ifdef _HANDLE_ENLIST_IN_DTC_
 RETCODE    EnlistInDtc(ConnectionClass *conn, void *pTra, int method);
-RETCODE    DtcOnDisconnect(ConnectionClass *, BOOL);
+RETCODE    DtcOnDisconnect(ConnectionClass *);
 #endif /* _HANDLE_ENLIST_IN_DTC_ */
 #endif /* WIN32 */
 
index ee27273f758422ebe4624125176c1107babb8da7..f78d3d570c51222e6a65c2ec83ba571fd6c28d4a 100644 (file)
--- a/convert.c
+++ b/convert.c
@@ -3110,19 +3110,20 @@ inolog("ipara=%x paramType=%d %d proc_return=%d\n", ipara, ipara ? ipara->paramT
    }   
 
    req_bind = (0 != (FLGB_BUILDING_BIND_REQUEST & qb->flags));
-   /* Handle DEFAULT_PARAM parameter data */
+   /* Handle DEFAULT_PARAM parameter data. Should be NULL ?
    if (used == SQL_DEFAULT_PARAM)
    {
        return SQL_SUCCESS;
-   }
+   } */
    if (req_bind)
    {
        npos = qb->npos;
        qb->npos += 4;
    }
    /* Handle NULL parameter data */
-   if ((ipara && SQL_PARAM_OUTPUT == ipara->paramType) ||
-       used == SQL_NULL_DATA)
+   if ((ipara && SQL_PARAM_OUTPUT == ipara->paramType)
+       || used == SQL_NULL_DATA
+       || used == SQL_DEFAULT_PARAM)
    {
        if (req_bind)
        {
index 403223d78dd92688994f45fed4f54bb10e78f359..17627eb7da0c2dbe53fc6366995af51fc3faa776 100644 (file)
--- a/environ.c
+++ b/environ.c
@@ -332,8 +332,9 @@ PGAPI_ConnectError( HDBC hdbc,
                /* general error */
                break;
            case CONN_UNSUPPORTED_OPTION:
-               pg_sqlstate_set(env, szSqlState, "IM001", "IM001");
+               pg_sqlstate_set(env, szSqlState, "HYC00", "IM001");
                /* driver does not support this function */
+               break;
            case CONN_INVALID_ARGUMENT_NO:
                pg_sqlstate_set(env, szSqlState, "HY009", "S1009");
                /* invalid argument value */
diff --git a/msdtc_enlist.cpp b/msdtc_enlist.cpp
new file mode 100755 (executable)
index 0000000..19656ed
--- /dev/null
@@ -0,0 +1,1074 @@
+/*------
+ * Module:         msdtc_enlist.cpp
+ *
+ * Description:
+ *     This module contains routines related to
+ *         the enlistment in MSDTC.
+ *
+ *-------
+ */
+
+#ifndef    _WIN32_WINNT
+#define    _WIN32_WINNT    0x0400
+#endif /* _WIN32_WINNT */
+
+#include <oleTx2xa.h>
+#include <XOLEHLP.h>
+/*#include <Txdtc.h>*/
+#include "connection.h"
+
+/*#define  _SLEEP_FOR_TEST_*/
+#include <stdio.h>
+#include <string.h>
+#include <ctype.h>
+#include <process.h>
+#include <map>
+#ifndef    WIN32
+#include <errno.h>
+#endif /* WIN32 */
+
+#include "qresult.h"
+#include "dlg_specific.h"
+
+#include "pgapifunc.h"
+
+static CRITICAL_SECTION    life_cs;
+static CRITICAL_SECTION    map_cs;
+static class INIT_CRIT
+{
+public:
+   INIT_CRIT() {
+       InitializeCriticalSection(&life_cs);
+       InitializeCriticalSection(&map_cs);
+       }
+   ~INIT_CRIT() {
+           DeleteCriticalSection(&life_cs);
+           DeleteCriticalSection(&map_cs);
+           }
+} init_crit;
+#define    LIFELOCK_ACQUIRE EnterCriticalSection(&life_cs)
+#define    LIFELOCK_RELEASE LeaveCriticalSection(&life_cs)
+#define    MLOCK_ACQUIRE   EnterCriticalSection(&map_cs)
+#define    MLOCK_RELEASE   LeaveCriticalSection(&map_cs)
+
+
+static const char *XidToText(const XID &xid, char *rtext)
+{
+   int glen = xid.gtrid_length, blen = xid.bqual_length;
+   int i, j;
+
+   for (i = 0, j = 0; i < glen; i++, j += 2)
+       sprintf(rtext + j, "%02x", (unsigned char) xid.data[i]);
+   strcat(rtext, "-"); j++;
+   for (; i < glen + blen; i++, j += 2)
+       sprintf(rtext + j, "%02x", (unsigned char) xid.data[i]); 
+   return rtext;
+}
+
+
+EXTERN_C extern HMODULE    s_hModule;
+
+static char    xalibpath[_MAX_PATH] = "";
+static char    xalibname[_MAX_FNAME] = "";
+static LONG    g_cComponents = 0;
+static LONG    g_cServerLocks = 0;
+
+static char *GetXaLibName()
+{
+   char    dllpath[_MAX_PATH], drive[_MAX_DRIVE], dir[_MAX_DIR],
+       fname[_MAX_FNAME], ext[_MAX_EXT];
+   if (!xalibpath[0])
+   {
+       GetModuleFileName(s_hModule, dllpath, sizeof(dllpath));
+       /* In Windows XP SP2, the dllname should be specified */
+       /* instead of the dll path name, because it looks up */
+       /* the HKLM\Microfost\MSDTC\XADLL\(dllname) registry */
+       /* entry for security reason. */
+       _splitpath(dllpath, drive, dir, fname, ext);
+       // snprintf(xalibname, sizeof(xalibname), "%s%s", fname, ext);
+       strcpy(xalibname, "pgxalib.dll");
+       snprintf(xalibpath, sizeof(xalibpath), "%s%s%s", drive, dir, xalibname);
+   }
+   return xalibname;
+}
+
+//
+// \88È\89º\82ÌITransactionResourceAsync\83I\83u\83W\83F\83N\83g\82Í\94C\88Ó\82Ì\83X\83\8c\83b\83h\82©\82ç
+// \8e©\97R\82É\83A\83N\83Z\83X\89Â\94\\82È\82æ\82¤\82É\8eÀ\91\95\82·\82é\81B\8aeRequest\82Ì\8c\8b\89Ê\82ð\95Ô\82·\82½\82ß\82É
+// \8eg\97p\82·\82éITransactionEnlistmentAsync\83C\83\93\83^\81[\83t\83F\83C\83X\82à\82»\82Ì\82æ\82¤\82É
+// \8eÀ\91\95\82³\82ê\82Ä\82¢\82é\81i\82Æ\8ev\82í\82ê\82é\81A\89º\8bL\8eQ\8fÆ\81j\82Ì\82Å\8cÄ\82Ñ\8fo\82µ\82ÉCOM\82Ì\83A\83p\81[
+// \83g\83\81\83\93\83g\82ð\88Ó\8e¯\82·\82é(CoMarshalInterThreadInterfaceInStream/CoGetIn
+// terfaceAndReleaseStream\82ð\8eg\97p\82·\82é\81j\95K\97v\82Í\82È\82¢\81B
+// \82±\82ÌDLL\93à\82Å\8eg\97p\82·\82éITransactionResourceAsync\82ÆITransactionEnlist
+// mentAsync\82Ì\83C\83\93\83^\81[\83t\83F\83C\83X\83|\83C\83\93\83^\81[\82Í\94C\88Ó\82Ì\83X\83\8c\83b\83h\82©\82ç\92¼\90Ú\8eg\97p
+// \82·\82é\82±\82Æ\82ª\82Å\82«\82é\81B
+//
+
+// OLE Transactions Standard
+//
+// OLE Transactions is the Microsoft interface standard for transaction
+// management. Applications use OLE Transactions-compliant interfaces to
+// initiate, commit, abort, and inquire about transactions. Resource
+// managers use OLE Transactions-compliant interfaces to enlist in
+// transactions, to propagate transactions to other resource managers,
+// to propagate transactions from process to process or from system to
+// system, and to participate in the two-phase commit protocol.
+//
+// The Microsoft DTC system implements most OLE Transactions-compliant
+// objects, interfaces, and methods. Resource managers that wish to use
+// OLE Transactions must implement some OLE Transactions-compliant objects,
+// interfaces, and methods.
+//
+// The OLE Transactions specification is based on COM but it differs in the
+// following respects: 
+//
+// OLE Transactions objects cannot be created using the COM CoCreate APIs. 
+// References to OLE Transactions objects are always direct. Therefore,
+// no proxies or stubs are created for inter-apartment, inter-process,
+// or inter-node calls and OLE Transactions references cannot be marshaled
+// using standard COM marshaling. 
+// All references to OLE Transactions objects and their sinks are completely
+// free threaded and cannot rely upon COM concurrency control models.
+// For example, you cannot pass a reference to an IResourceManagerSink
+// interface on a single-threaded apartment and expect the callback to occur
+// only on the same single-threaded apartment. 
+
+/*#define  _LOCK_DEBUG_ */
+class  IAsyncPG : public ITransactionResourceAsync
+{
+   friend class    AsyncThreads;
+private:
+   IDtcToXaHelperSinglePipe    *helper;
+   DWORD               RMCookie;
+   ConnectionClass         *conn;
+   ConnectionClass         *xaconn;
+   LONG                refcnt;
+   CRITICAL_SECTION        as_spin; // to make this object Both
+   CRITICAL_SECTION        as_exec; // to make this object Both
+   XID             xid;
+   bool                prepared;
+   HANDLE              eThread[3];
+   HRESULT             prepare_result;
+   bool                requestAccepted;
+   HRESULT             commit_result;
+#ifdef _LOCK_DEBUG_
+   int             spin_cnt;
+   int             cs_cnt;
+#endif /* _LOCK_DEBUG_ */
+
+public:
+   enum {
+       PrepareExec = 0
+       ,CommitExec
+       ,AbortExec
+       };
+
+   ITransactionEnlistmentAsync *enlist;
+
+   HRESULT STDMETHODCALLTYPE QueryInterface(REFIID iid, void ** ppvObject);
+   ULONG   STDMETHODCALLTYPE AddRef(void); 
+   ULONG   STDMETHODCALLTYPE Release(void); 
+
+   HRESULT STDMETHODCALLTYPE PrepareRequest(BOOL fRetaining,
+               DWORD grfRM,
+               BOOL fWantMoniker,
+               BOOL fSinglePhase);
+   HRESULT STDMETHODCALLTYPE CommitRequest(DWORD grfRM, XACTUOW * pNewUOW);
+   HRESULT STDMETHODCALLTYPE AbortRequest(BOID * pboidReason,
+               BOOL fRetaining,
+               XACTUOW * pNewUOW);
+   HRESULT STDMETHODCALLTYPE TMDown(void);
+
+   IAsyncPG();
+   ~IAsyncPG();
+   void SetHelper(IDtcToXaHelperSinglePipe *pHelper, DWORD dwRMCookie) {helper = pHelper; RMCookie = dwRMCookie;} 
+    
+   HRESULT RequestExec(DWORD type, HRESULT res);
+   HRESULT ReleaseConnection(void);
+   void SetConnection(ConnectionClass *sconn) {SLOCK_ACQUIRE(); conn = sconn; SLOCK_RELEASE();}
+   void SetXid(const XID *ixid) {SLOCK_ACQUIRE(); xid = *ixid; SLOCK_RELEASE();}
+private:
+#ifdef _LOCK_DEBUG_
+   void SLOCK_ACQUIRE() {forcelog("SLOCK_ACQUIRE %d\n", spin_cnt); EnterCriticalSection(&as_spin); spin_cnt++;}
+   void SLOCK_RELEASE() {forcelog("SLOCK_RELEASE=%d\n", spin_cnt); LeaveCriticalSection(&as_spin); spin_cnt--;}
+#else
+   void SLOCK_ACQUIRE() {EnterCriticalSection(&as_spin);}
+   void SLOCK_RELEASE() {LeaveCriticalSection(&as_spin);}
+#endif /* _LOCK_DEBUG_ */
+   void ELOCK_ACQUIRE() {EnterCriticalSection(&as_exec);}
+   void ELOCK_RELEASE() {LeaveCriticalSection(&as_exec);}
+   ConnectionClass *getLockedXAConn(void);
+   ConnectionClass *generateXAConn(bool spinAcquired);
+   void SetPrepareResult(HRESULT res) {SLOCK_ACQUIRE(); prepared = true; prepare_result = res; SLOCK_RELEASE();} 
+   void SetDone(HRESULT);
+   void Reset_eThread(int idx) {SLOCK_ACQUIRE(); eThread[idx] = NULL; SLOCK_RELEASE();}
+   void Wait_pThread(bool slock_hold);
+   void Wait_cThread(bool slock_hold, bool once);
+};
+
+//
+// For thread control.
+//
+class  AsyncWait {
+private:
+   IAsyncPG    *obj;
+   DWORD       type;
+   int     waiting_count;
+public:
+   AsyncWait(IAsyncPG *async, DWORD itype) : obj(async), type(itype), waiting_count(0) {}
+   AsyncWait(const AsyncWait &a_th) : obj(a_th.obj), type(a_th.type), waiting_count(a_th.waiting_count) {}
+   ~AsyncWait()    {}
+   IAsyncPG *GetObj()  const {return obj;}
+   DWORD   GetType()  const {return type;}
+   int WaitCount()  const {return waiting_count;}
+   int StartWaiting() {return ++waiting_count;}
+   int StopWaiting() {return --waiting_count;}
+};
+//
+// List of threads invoked from IAsyncPG objects.
+//
+class  AsyncThreads {
+private:
+   static std::map <HANDLE, AsyncWait> th_list;
+public:
+   static void insert(HANDLE, IAsyncPG *, DWORD);
+   static void CleanupThreads(DWORD millisecond);
+   static bool WaitThread(IAsyncPG *, DWORD type, DWORD millisecond);
+};
+
+
+IAsyncPG::IAsyncPG(void) : helper(NULL), RMCookie(0), enlist(NULL), conn(NULL), xaconn(NULL), refcnt(1), prepared(false), requestAccepted(false)
+{
+   InterlockedIncrement(&g_cComponents);
+   InitializeCriticalSection(&as_spin);
+   InitializeCriticalSection(&as_exec);
+   eThread[0] = eThread[1] = eThread[2] = NULL;
+   memset(&xid, 0, sizeof(xid));
+#ifdef _LOCK_DEBUG_
+   spin_cnt = 0;
+   cs_cnt = 0;
+#endif /* _LOCK_DEBUG_ */
+}
+
+IAsyncPG::~IAsyncPG(void)
+{
+   ConnectionClass *fconn = NULL;
+
+   if (conn)
+   {
+       conn->asdum = NULL;
+       conn = NULL;
+   }
+   if (xaconn)
+   {
+       fconn = xaconn;
+       xaconn->asdum = NULL;
+       xaconn = NULL;
+   }
+   SLOCK_RELEASE();
+   LIFELOCK_RELEASE;
+   if (fconn)
+       PGAPI_FreeConnect((HDBC) fconn);
+   DeleteCriticalSection(&as_spin);
+   ELOCK_RELEASE();
+   DeleteCriticalSection(&as_exec);
+   InterlockedDecrement(&g_cComponents);
+}
+HRESULT STDMETHODCALLTYPE IAsyncPG::QueryInterface(REFIID riid, void ** ppvObject)
+{
+forcelog("%x QueryInterface called\n", this);
+   if (riid == IID_IUnknown || riid == IID_ITransactionResourceAsync)
+   {
+       *ppvObject = this;
+       AddRef();
+       return S_OK;
+   }
+   *ppvObject = NULL;
+   return E_NOINTERFACE;
+}
+ULONG  STDMETHODCALLTYPE IAsyncPG::AddRef(void)
+{
+   mylog("%x->AddRef called\n", this);
+   SLOCK_ACQUIRE();
+   refcnt++;
+   SLOCK_RELEASE();
+   return refcnt;
+}
+ULONG  STDMETHODCALLTYPE IAsyncPG::Release(void)
+{
+   mylog("%x->Release called refcnt=%d\n", this, refcnt);
+   SLOCK_ACQUIRE();
+   refcnt--;
+   if (refcnt <= 0)
+   {
+       SLOCK_RELEASE();
+       ELOCK_ACQUIRE();
+       LIFELOCK_ACQUIRE;
+       SLOCK_ACQUIRE();
+       if (refcnt <=0)
+       {
+   mylog("delete %x\n", this);
+           delete this;
+       }
+       else
+       {
+           SLOCK_RELEASE();
+           LIFELOCK_RELEASE;
+           ELOCK_RELEASE();
+       }
+   }
+   else
+       SLOCK_RELEASE();
+   return refcnt;
+}
+
+void IAsyncPG::Wait_pThread(bool slock_hold)
+{
+   mylog("Wait_pThread %d in\n", slock_hold);
+   HANDLE  wThread;
+   int wait_idx = PrepareExec;
+   bool    th_found;
+   if (!slock_hold)
+       SLOCK_ACQUIRE();
+   while (NULL != eThread[wait_idx])
+   {
+       wThread = eThread[wait_idx];
+       SLOCK_RELEASE();
+       /*
+       if (WAIT_OBJECT_0 == WaitForSingleObject(wThread, 2000))
+       {
+           SLOCK_ACQUIRE();
+           if (eThread[wait_idx])
+           {
+               CloseHandle(wThread);
+               eThread[wait_idx] = NULL;
+           }
+       }
+       else
+           SLOCK_ACQUIRE();
+       */
+       th_found = AsyncThreads::WaitThread(this, wait_idx, 2000);
+       SLOCK_ACQUIRE();
+       if (th_found)
+           break;
+   }
+   if (!slock_hold)
+       SLOCK_RELEASE();
+   mylog("Wait_pThread out\n");
+}
+
+void IAsyncPG::Wait_cThread(bool slock_hold, bool once)
+{
+   HANDLE  wThread;
+   int wait_idx;
+   bool    th_found;
+
+   mylog("Wait_cThread %d,%d in\n", slock_hold, once);
+   if (!slock_hold)
+       SLOCK_ACQUIRE();
+   if (NULL != eThread[CommitExec])
+       wait_idx = CommitExec;
+   else
+       wait_idx = AbortExec;
+   while (NULL != eThread[wait_idx])
+   {
+       wThread = eThread[wait_idx];
+       SLOCK_RELEASE();
+       /*
+       if (WAIT_OBJECT_0 == WaitForSingleObject(wThread, 2000))
+       {
+           SLOCK_ACQUIRE();
+           if (cThread)
+           {
+               CloseHandle(wThread);
+               cThread = NULL;
+           }
+       }
+       else
+           SLOCK_ACQUIRE();
+       */
+       th_found = AsyncThreads::WaitThread(this, wait_idx, 2000);
+       SLOCK_ACQUIRE();
+       if (once || th_found)
+           break;
+   }
+   if (!slock_hold)
+       SLOCK_RELEASE();
+   mylog("Wait_cThread out\n");
+}
+
+/* Processing Prepare/Commit Request */
+typedef
+struct RequestPara {
+   DWORD   type;
+   LPVOID  lpr;
+   HRESULT res;
+} RequestPara;
+
+void   IAsyncPG::SetDone(HRESULT res)
+{
+   LIFELOCK_ACQUIRE;
+   SLOCK_ACQUIRE();
+   prepared = false;
+   requestAccepted = true;
+   commit_result = res;
+   if (conn || xaconn)
+   {
+       if (conn)
+       {
+           conn->asdum = NULL;
+           conn = NULL;
+       }
+       SLOCK_RELEASE();
+       LIFELOCK_RELEASE;
+       ELOCK_ACQUIRE();
+       if (xaconn)
+       {
+           xaconn->asdum = NULL;
+           PGAPI_FreeConnect(xaconn);
+           xaconn = NULL;
+       }
+       ELOCK_RELEASE();
+   }
+   else
+   {
+       SLOCK_RELEASE();
+       LIFELOCK_RELEASE;
+   }
+   // Release(); /* unneccesary */
+}
+   
+ConnectionClass    *IAsyncPG::generateXAConn(bool spinAcquired)
+{
+   if (!spinAcquired)
+       SLOCK_ACQUIRE();
+   if (prepared && !xaconn)
+   {
+       SLOCK_RELEASE();
+       ELOCK_ACQUIRE();
+       LIFELOCK_ACQUIRE;
+       SLOCK_ACQUIRE();
+       if (prepared && !xaconn)
+       {
+           PGAPI_AllocConnect(conn->henv, (HDBC *) &xaconn);
+           memcpy(&xaconn->connInfo, &conn->connInfo, sizeof(ConnInfo));
+           conn->asdum = NULL;
+           conn = NULL;
+           SLOCK_RELEASE();
+           LIFELOCK_RELEASE;
+           CC_connect(xaconn, AUTH_REQ_OK, NULL);
+       }
+       else
+       {
+           SLOCK_RELEASE();
+           LIFELOCK_RELEASE;
+       }
+       ELOCK_RELEASE();
+   }
+   else
+       SLOCK_RELEASE();
+   return xaconn;
+}
+
+//
+// [in]
+// ELOCK is acquired.
+//
+// [out]
+// If the return connection != NULL
+//     the lock for the connection is acquired.
+// 
+ConnectionClass    *IAsyncPG::getLockedXAConn()
+{
+   SLOCK_ACQUIRE();
+   if (!xaconn && conn && !CC_is_in_trans(conn))
+   {
+       if (TRY_ENTER_CONN_CS(conn))
+       {
+           if (CC_is_in_trans(conn))
+           {
+               LEAVE_CONN_CS(conn);
+           }
+           else
+           {
+               SLOCK_RELEASE();
+               return conn;
+           }
+       }
+   }
+   generateXAConn(true);
+   if (xaconn)
+       ENTER_CONN_CS(xaconn);
+   return xaconn;
+}
+
+HRESULT IAsyncPG::RequestExec(DWORD type, HRESULT res)
+{
+   HRESULT     ret;
+   bool        bReleaseEnlist = false;
+   ConnectionClass *econn;
+   QResultClass    *qres;
+   char        pgxid[258], cmd[512];
+
+   mylog("%x->RequestExec type=%d\n", this, type);
+   XidToText(xid, pgxid);
+#ifdef _SLEEP_FOR_TEST_
+   /*Sleep(2000);*/
+#endif /* _SLEEP_FOR_TEST_ */
+   /* AddRef(); unnecessary */
+   ELOCK_ACQUIRE();
+   switch (type)
+   {
+       case PrepareExec:
+           if (XACT_S_SINGLEPHASE == res)
+           {
+               if (!CC_commit(conn))
+                   res = E_FAIL;   
+               bReleaseEnlist = true;
+           }
+           else if (E_FAIL != res)
+           {
+               snprintf(cmd, sizeof(cmd), "PREPARE TRANSACTION '%s'", pgxid);
+               qres = CC_send_query(conn, cmd, NULL, 0, NULL);
+               if (!QR_command_maybe_successful(qres))
+                   res = E_FAIL;
+               QR_Destructor(qres);
+           }
+           ret = enlist->PrepareRequestDone(res, NULL, NULL);
+           SetPrepareResult(res);
+           break;
+       case CommitExec:
+           Wait_pThread(false);
+           if (E_FAIL != res)
+           {
+               econn = getLockedXAConn();
+               if (econn)
+               {
+                   snprintf(cmd, sizeof(cmd), "COMMIT PREPARED '%s'", pgxid);
+                   qres = CC_send_query(econn, cmd, NULL, 0, NULL);
+                   if (!QR_command_maybe_successful(qres))
+                       res = E_FAIL;
+                   QR_Destructor(qres);
+                   LEAVE_CONN_CS(econn);
+               }
+           }
+           SetDone(res);
+           ret = enlist->CommitRequestDone(res);
+           bReleaseEnlist = true;
+           break;
+       case AbortExec:
+           Wait_pThread(false);
+           if (prepared)
+           {
+               econn = getLockedXAConn();
+               if (econn)
+               {
+                   snprintf(cmd, sizeof(cmd), "ROLLBACK PREPARED '%s'", pgxid);
+                   qres = CC_send_query(econn, cmd, NULL, 0, NULL);
+                   if (!QR_command_maybe_successful(qres))
+                       res = E_FAIL;
+                   QR_Destructor(qres);
+                   LEAVE_CONN_CS(econn);
+               }
+           }
+           SetDone(res);
+           ret = enlist->AbortRequestDone(res);
+           bReleaseEnlist = true;
+           break;
+       default:
+           ret = -1;
+   }
+   if (bReleaseEnlist)
+   {
+       helper->ReleaseRMCookie(RMCookie, TRUE);
+       enlist->Release();
+   }
+   ELOCK_RELEASE();
+   /* Release(); unnecessary */
+   mylog("%x->Done ret=%d\n", this, ret);
+   return ret;
+}
+
+HRESULT IAsyncPG::ReleaseConnection(void)
+{
+   mylog("%x->ReleaseConnection\n", this);
+   ConnectionClass *iconn;
+   bool    done = false;
+
+   SLOCK_ACQUIRE();
+   if (iconn = conn)
+   {
+       Wait_pThread(true);
+       if (NULL != eThread[CommitExec] || NULL != eThread[AbortExec] || requestAccepted)
+       {
+           if (prepared)
+           {
+               Wait_cThread(true, true);
+               if (!prepared)
+                   done = true;
+           }
+           else
+               done = true;
+           if (done)
+               Wait_cThread(true, false);
+       }
+       if (conn && CONN_CONNECTED == conn->status && !done)
+       {
+           generateXAConn(true);
+       }
+       else
+           SLOCK_RELEASE();
+   }
+   else
+       SLOCK_RELEASE();
+   mylog("%x->ReleaseConnection exit\n", this);
+   return SQL_SUCCESS;
+}
+
+EXTERN_C static unsigned WINAPI DtcRequestExec(LPVOID para);
+HRESULT STDMETHODCALLTYPE IAsyncPG::PrepareRequest(BOOL fRetaining, DWORD grfRM,
+               BOOL fWantMoniker, BOOL fSinglePhase)
+{
+   HRESULT ret, res;
+   RequestPara *reqp;
+
+   mylog("%x PrepareRequest called grhRM=%d enl=%x\n", this, grfRM, enlist);
+   SLOCK_ACQUIRE();
+   if (0 != CC_get_errornumber(conn))
+       res = ret = E_FAIL;
+   else
+   {
+       ret = S_OK;
+       if (fSinglePhase)
+       {
+           res = XACT_S_SINGLEPHASE;
+           mylog("XACT is singlePhase\n");
+       }
+       else
+           res = S_OK; 
+   }
+   SLOCK_RELEASE();
+   ELOCK_ACQUIRE();
+#ifdef _SLEEP_FOR_TEST_
+   Sleep(2000);
+#endif /* _SLEEP_FOR_TEST_ */
+   reqp = new RequestPara;
+   reqp->type = PrepareExec;
+   reqp->lpr = (LPVOID) this;
+   reqp->res = res;
+   AddRef();
+   HANDLE hThread = (HANDLE) _beginthreadex(NULL, 0, DtcRequestExec, reqp, 0, NULL);
+   if (NULL == hThread)
+   {
+       delete(reqp);
+       ret = E_FAIL;
+   }
+   else
+   {
+       AsyncThreads::insert(hThread, this, reqp->type);
+   }
+   ELOCK_RELEASE();
+   Release();
+   return ret;
+}
+HRESULT STDMETHODCALLTYPE IAsyncPG::CommitRequest(DWORD grfRM, XACTUOW * pNewUOW)
+{
+   HRESULT     res = S_OK, ret = S_OK;
+   RequestPara *reqp;
+   mylog("%x CommitRequest called grfRM=%d enl=%x\n", this, grfRM, enlist);
+
+   SLOCK_ACQUIRE();
+   if (!prepared)
+       ret = E_UNEXPECTED;
+   else if (S_OK != prepare_result)
+       ret = E_UNEXPECTED;
+   SLOCK_RELEASE();
+   if (S_OK != ret)
+       return ret;
+   AddRef();
+   ELOCK_ACQUIRE();
+#ifdef _SLEEP_FOR_TEST_
+   Sleep(1000);
+#endif /* _SLEEP_FOR_TEST_ */
+   reqp = new RequestPara;
+   reqp->type = CommitExec;
+   reqp->lpr = (LPVOID) this;
+   reqp->res = res;
+   enlist->AddRef();
+   HANDLE hThread = (HANDLE) _beginthreadex(NULL, 0, DtcRequestExec, reqp, 0, NULL);
+   if (NULL == hThread)
+   {
+       delete(reqp);
+       enlist->Release();
+       ret = E_FAIL;
+   }
+   else
+   {
+       AsyncThreads::insert(hThread, this, reqp->type);
+   }
+   mylog("CommitRequest ret=%d\n", ret);
+   requestAccepted = true;
+   ELOCK_RELEASE();
+   Release();
+   return ret;
+}
+HRESULT STDMETHODCALLTYPE IAsyncPG::AbortRequest(BOID * pboidReason, BOOL fRetaining,
+               XACTUOW * pNewUOW)
+{
+   HRESULT     res = S_OK, ret = S_OK;
+   RequestPara *reqp;
+
+   mylog("%x AbortRequest called\n", this);
+   AddRef();
+   ELOCK_ACQUIRE();
+   if (!prepared && conn)
+       CC_abort(conn);
+   reqp = new RequestPara;
+   reqp->type = AbortExec;
+   reqp->lpr = (LPVOID) this;
+   reqp->res = res;
+   enlist->AddRef();
+   HANDLE hThread = (HANDLE) _beginthreadex(NULL, 0, DtcRequestExec, reqp, 0, NULL);
+   if (NULL == hThread)
+   {
+       delete(reqp);
+       enlist->Release();
+       ret = E_FAIL;
+   }
+   else
+   {
+       AsyncThreads::insert(hThread, this, reqp->type);
+   }
+   mylog("AbortRequest ret=%d\n", ret);
+   requestAccepted = true;
+   ELOCK_RELEASE();
+   Release();
+   return  ret;
+}
+HRESULT STDMETHODCALLTYPE IAsyncPG::TMDown(void)
+{
+forcelog("%x TMDown called\n", this);
+   return  S_OK;
+}
+
+//
+//
+//
+
+std::map<HANDLE, AsyncWait>    AsyncThreads::th_list;
+void   AsyncThreads::insert(HANDLE th, IAsyncPG *obj, DWORD type)
+{
+   if (!obj)   return;
+   MLOCK_ACQUIRE;
+   th_list.insert(std::pair<HANDLE, AsyncWait>(th, AsyncWait(obj, type)));
+   obj->SLOCK_ACQUIRE();
+   obj->eThread[type] = th;
+   obj->SLOCK_RELEASE();
+   MLOCK_RELEASE;
+}
+
+bool   AsyncThreads::WaitThread(IAsyncPG *obj, DWORD type, DWORD millisecond)
+{
+   HANDLE  th = NULL;
+   DWORD   gtype;
+   bool    typematch;
+   int wait_count;
+
+   MLOCK_ACQUIRE;
+   std::map<HANDLE, AsyncWait>::iterator p;
+   for (p = th_list.begin(); p != th_list.end(); p++)
+   {
+       gtype = p->second.GetType();
+       typematch = (gtype == type);
+       if (p->second.GetObj() == obj && typematch)
+       {
+           th = p->first;
+           break;
+       }
+   }
+   if (NULL == th)
+   {
+       MLOCK_RELEASE;
+       forcelog("WaitThread thread(%x, %d) not found\n", obj, type);
+       return false;
+   }
+   p->second.StartWaiting();
+   MLOCK_RELEASE;
+   
+   DWORD ret = WaitForSingleObject(th, millisecond);
+   MLOCK_ACQUIRE;
+   wait_count = p->second.StopWaiting();
+   if (WAIT_OBJECT_0 == ret)
+   {
+       IAsyncPG *async = p->second.GetObj();
+
+       if (type >= 0 && type <= IAsyncPG::AbortExec)
+           async->Reset_eThread(type);
+       if (wait_count <= 0)
+       {
+           th_list.erase(th);
+           MLOCK_RELEASE;
+           CloseHandle(th);
+           if (type >= IAsyncPG::CommitExec)
+           {
+               async->Release();
+           }
+       }
+       else
+           MLOCK_RELEASE;
+   }
+   else
+       MLOCK_RELEASE;
+   return true;
+}
+
+void   AsyncThreads::CleanupThreads(DWORD millisecond)
+{
+   int msize;
+   DWORD   nCount;
+
+   MLOCK_ACQUIRE;
+   if (msize = th_list.size(), msize <= 0)
+   {
+       MLOCK_RELEASE;
+       return;
+   }
+
+   mylog("CleanupThreads size=%d\n", msize);
+   HANDLE  *hds = new HANDLE[msize];
+   std::map<HANDLE, AsyncWait>::iterator p;
+   for (p = th_list.begin(), nCount = 0; p != th_list.end(); p++)
+   {
+       hds[nCount++] = p->first;
+       p->second.StartWaiting();
+   }
+   MLOCK_RELEASE;
+   int i;
+   while (nCount > 0)
+   {
+       DWORD ret = WaitForMultipleObjects(nCount, hds, 0, millisecond);
+       if (ret >= nCount)
+           break;
+       HANDLE  th = hds[ret];
+       MLOCK_ACQUIRE;
+       p = th_list.find(th);
+       if (p != th_list.end())
+       {
+           int wait_count = p->second.StopWaiting();
+           DWORD   type = p->second.GetType();
+           IAsyncPG * async = p->second.GetObj();
+
+           if (type >= IAsyncPG::PrepareExec && type <= IAsyncPG::AbortExec)
+               async->Reset_eThread(type);
+           if (wait_count <= 0)
+           {
+               th_list.erase(th);
+               MLOCK_RELEASE;
+               CloseHandle(th);
+               if (type >= IAsyncPG::CommitExec)
+               {
+                   async->Release();
+               }
+           }
+           else
+               MLOCK_RELEASE;
+       }
+       else
+           MLOCK_RELEASE;
+       for (i = ret; i < (int) nCount - 1; i++)
+           hds[i] = hds[i + 1];
+       nCount--;
+   }
+   for (i = 0; i < (int) nCount; i++)
+   {
+       p = th_list.find(hds[i]);
+       if (p != th_list.end())
+           p->second.StopWaiting();
+   }
+   delete [] hds;
+}
+
+
+EXTERN_C static unsigned WINAPI DtcRequestExec(LPVOID para)
+{
+   RequestPara *reqp = (RequestPara *) para;
+   DWORD       type = reqp->type;
+   IAsyncPG *async = (IAsyncPG *) reqp->lpr;
+   HRESULT res = reqp->res, ret;
+
+   mylog("DtcRequestExec type=%d", reqp->type);
+   delete(reqp);
+   ret = async->RequestExec(type, res);
+   mylog(" Done ret=%d\n", ret);
+   return ret;
+}
+
+CSTR   regKey = "SOFTWARE\\Microsoft\\MSDTC\\XADLL";
+
+RETCODE static EnlistInDtc_1pipe(ConnectionClass *conn, ITransaction *pTra, ITransactionDispenser *pDtc)
+{
+   CSTR    func = "EnlistInDtc_1pipe";
+   static  IDtcToXaHelperSinglePipe    *pHelper = NULL;
+   ITransactionResourceAsync       *pRes = NULL;
+   IAsyncPG                *asdum;
+   HRESULT res;
+   bool    retry, errset;
+   DWORD   dwRMCookie;
+   XID xid;
+
+   if (!pHelper)
+   {
+       res = pDtc->QueryInterface(IID_IDtcToXaHelperSinglePipe, (void **) &pHelper);
+       if (res != S_OK || !pHelper)
+       {
+           forcelog("DtcToXaHelperSingelPipe get error %d\n", res);
+           pHelper = NULL;
+           return SQL_ERROR;
+       }
+   }
+   res = (NULL != (asdum = new IAsyncPG)) ? S_OK : E_FAIL;
+   if (S_OK != res)
+   {
+       mylog("CoCreateInstance error %d\n", res);
+       return SQL_ERROR;
+   }
+
+mylog("dllname=%s dsn=%s\n", xalibname, conn->connInfo.dsn); res = 0;
+   retry = false;
+   errset = false;
+   ConnInfo *ci = &(conn->connInfo);
+   char    dtcname[1024];
+   snprintf(dtcname, sizeof(dtcname), "DRIVER={%s};SERVER=%s;PORT=%s;DATABASE=%s;UID=%s;PWD=%s;" ABBR_SSLMODE "=%s;" ABBR_DEBUG "=%d", 
+       ci->drivername, ci->server, ci->port, ci->database, ci->username, ci->password, ci->sslmode, ci->drivers.debug);
+   do { 
+       res = pHelper->XARMCreate(dtcname, (char *) GetXaLibName(), &dwRMCookie);
+       if (S_OK == res)
+           break;
+       mylog("XARMCreate error code=%x\n", res);
+       if (XACT_E_XA_TX_DISABLED == res)
+       {
+           CC_set_error(conn, CONN_UNSUPPORTED_OPTION, "XARMcreate error:Please enable XA transaction in MSDTC security configuration", func);
+           errset = true;
+       }
+       else if (!retry)
+       {
+           LONG    ret;
+           HKEY    sKey;
+           DWORD   rSize;
+
+           ret = ::RegOpenKeyEx(HKEY_LOCAL_MACHINE, regKey, 0, KEY_QUERY_VALUE | KEY_SET_VALUE, &sKey);
+           if (ERROR_SUCCESS == ret)
+           {
+               switch (ret = ::RegQueryValueEx(sKey, "XADLL", NULL, NULL, NULL, &rSize))
+               {
+                   case ERROR_SUCCESS:
+                       if (rSize > 0)
+                           break;
+                   default:
+                       ret = ::RegSetValueEx(sKey, xalibname, 0, REG_SZ,
+                           (CONST BYTE *) xalibpath, strlen(xalibpath) + 1);
+                       if (ERROR_SUCCESS == ret)
+                       {
+                           retry = true;
+                           continue; // retry
+                       }
+                       CC_set_error(conn, CONN_UNSUPPORTED_OPTION, "XARMCreate error:Please register HKLM\\SOFTWARE\\Microsoft\\MSDTC\\XADLL", func);
+                       break;
+               }
+           }
+       }
+       if (!errset)
+           CC_set_error(conn, CONN_UNSUPPORTED_OPTION, "MSDTC XARMCreate error", func);
+       return SQL_ERROR;
+   } while (1);
+   res = pHelper->ConvertTridToXID((DWORD *) pTra, dwRMCookie, &xid);
+   if (res != S_OK)
+   {
+       mylog("ConvertTridToXid error %d\n", res);
+       return SQL_ERROR;
+   }
+{
+char   pgxid[258];
+XidToText(xid, pgxid);
+mylog("ConvertTridToXID -> %s\n", pgxid);
+}
+   asdum->SetXid(&xid);
+   /* Create an IAsyncPG instance by myself */
+   /* DLLGetClassObject(GUID_IAsyncPG, IID_ITransactionResourceAsync, (void **) &asdum); */
+
+   asdum->SetHelper(pHelper, dwRMCookie);
+   res = pHelper->EnlistWithRM(dwRMCookie, pTra, asdum, &asdum->enlist);
+   if (res != S_OK)
+   {
+       mylog("EnlistWithRM error %d\n", res);
+       pHelper->ReleaseRMCookie(dwRMCookie, TRUE);
+       return SQL_ERROR;
+   }
+
+   mylog("asdum=%x start transaction\n", asdum);
+   CC_set_autocommit_off(conn);
+   asdum->SetConnection(conn);
+   conn->asdum = asdum;
+
+   return  SQL_SUCCESS;
+}
+
+
+EXTERN_C RETCODE EnlistInDtc(ConnectionClass *conn, void *pTra, int method)
+{
+   static  ITransactionDispenser   *pDtc = NULL;
+   ConnInfo *ci = &(conn->connInfo);
+
+   if (!pTra)
+   {
+       IAsyncPG *asdum = (IAsyncPG *) conn->asdum;
+       if (asdum)
+       {
+           /* asdum->Release(); */
+       }
+       if (ci->autocommit_normal)
+           CC_set_autocommit_on(conn);
+       return SQL_SUCCESS;
+   }
+   if (CC_is_in_trans(conn))
+   { 
+       CC_abort(conn);
+       ci->autocommit_normal = (CC_is_in_autocommit(conn) ? 1 : 0);
+   }
+   if (!pDtc)
+   {
+       HRESULT res;
+
+       res = DtcGetTransactionManager(NULL, NULL, IID_ITransactionDispenser,
+           0, 0, NULL,  (void **) &pDtc);
+       if (res != S_OK || !pDtc)
+       {
+           forcelog("TransactionManager get error %d\n", res);
+           pDtc = NULL;
+       }
+   }
+   return EnlistInDtc_1pipe(conn, (ITransaction *) pTra, pDtc);
+}
+
+EXTERN_C RETCODE DtcOnDisconnect(ConnectionClass *conn)
+{
+   mylog("DtcOnDisconnect\n");
+   LIFELOCK_ACQUIRE;
+   IAsyncPG *asdum = (IAsyncPG *) conn->asdum;
+   if (asdum)
+   {
+       asdum->AddRef();
+       LIFELOCK_RELEASE;
+       asdum->ReleaseConnection();
+       asdum->Release();
+   }
+   else    
+       LIFELOCK_RELEASE;
+   return SQL_SUCCESS;
+}
+
+EXTERN_C RETCODE DtcOnRelease(void)
+{
+   AsyncThreads::CleanupThreads(2000);
+   return SQL_SUCCESS;
+}
index 5d996d1e21562440b63e314f5cf72349483ce53c..37454240b47a34092bfc0e1fd63ef2e0954f86dc 100644 (file)
--- a/odbcapi.c
+++ b/odbcapi.c
@@ -294,11 +294,11 @@ SQLDisconnect(HDBC ConnectionHandle)
    ConnectionClass *conn = (ConnectionClass *) ConnectionHandle;
 
    mylog("[%s for %x]", func, ConnectionHandle);
-   ENTER_CONN_CS(conn);
-   CC_clear_error(conn);
 #ifdef _HANDLE_ENLIST_IN_DTC_
-   DtcOnDisconnect(conn, TRUE);
+   DtcOnDisconnect(conn); /* must be called without holding the connection lock */
 #endif /* _HANDLE_ENLIST_IN_DTC_ */
+   ENTER_CONN_CS(conn);
+   CC_clear_error(conn);
    ret = PGAPI_Disconnect(ConnectionHandle);
    LEAVE_CONN_CS(conn);
    return ret;
index 2b873420d7bdd7512e8e74561bfe6eab756cbe67..6e038102922f9e38ee2dc36be7c909f3a528d16c 100644 (file)
--- a/pgapi30.c
+++ b/pgapi30.c
@@ -1593,7 +1593,8 @@ PGAPI_SetConnectAttr(HDBC ConnectionHandle,
 #ifdef WIN32
 #ifdef _HANDLE_ENLIST_IN_DTC_
            mylog("SQL_ATTR_ENLIST_IN_DTC %x request received\n", Value);
-           return EnlistInDtc(conn, Value, conn->connInfo.xa_opt); /* telling a lie */
+           if (conn->connInfo.xa_opt != 0) 
+               return EnlistInDtc(conn, Value, conn->connInfo.xa_opt);
 #endif /* _HANDLE_ENLIST_IN_DTC_ */
 #endif /* WIN32 */
            unsupported = TRUE;
diff --git a/pgxalib.cpp b/pgxalib.cpp
new file mode 100755 (executable)
index 0000000..a39fb47
--- /dev/null
@@ -0,0 +1,579 @@
+/*------
+ * Module:         pgxalib.cpp
+ *
+ * Description:
+ *     This module implements XA like routines
+ *         invoked from MSDTC process.
+ *
+ *     xa_open(), xa_close(), xa_commit(),
+ *     xa_rollback() and xa_recover()
+ *     are really invoked AFAIC.
+ *-------
+ */
+
+#include <oleTx2xa.h>
+/*#define  _SLEEP_FOR_TEST_*/
+#include <sqlext.h>
+#include <stdio.h>
+#include <string.h>
+#include <ctype.h>
+#include <process.h>
+#include <time.h>
+
+#include <string>
+#include <map>
+#include <vector>
+
+
+#define _BUILD_DLL_
+#ifdef _BUILD_DLL_
+
+EXTERN_C {
+
+#define    DIRSEPARATOR    "\\"
+#define    PG_BINARY_A "ab"
+#define    MYLOGDIR    "c:"
+#define    MYLOGFILE   "mylog_"
+
+static void
+generate_filename(const char *dirname, const char *prefix, char *filename)
+{
+   int         pid = 0;
+
+   pid = getpid();
+   if (dirname == 0 || filename == 0)
+       return;
+
+   strcpy(filename, dirname);
+   strcat(filename, DIRSEPARATOR);
+   if (prefix != 0)
+       strcat(filename, prefix);
+   sprintf(filename, "%s%u%s", filename, pid, ".log");
+   return;
+}
+
+static HENV    env = NULL;
+static void FreeEnv()
+{
+   if (env)
+   {
+       SQLFreeHandle(SQL_HANDLE_ENV, env);
+       env = NULL;
+   }
+}
+static FILE    *LOGFP = NULL;
+static CRITICAL_SECTION    mylog_cs;
+
+static void
+mylog(const char *fmt,...)
+{
+   va_list     args;
+   char        filebuf[80];
+   static  BOOL    init = TRUE;
+
+   EnterCriticalSection(&mylog_cs);
+   va_start(args, fmt);
+
+   if (init)
+   {
+       if (!LOGFP)
+       {
+           generate_filename(MYLOGDIR, MYLOGFILE, filebuf);
+           LOGFP = fopen(filebuf, PG_BINARY_A);
+       }
+       if (!LOGFP)
+       {
+           generate_filename("C:\\podbclog", MYLOGFILE, filebuf);
+           LOGFP = fopen(filebuf, PG_BINARY_A);
+       }
+       if (LOGFP)
+           setbuf(LOGFP, NULL);
+   }
+   init = FALSE;
+   if (LOGFP)
+   {
+       time_t  ntime;
+       char    ctim[128];
+
+       time(&ntime);
+       strcpy(ctim, ctime(&ntime));
+       ctim[strlen(ctim) - 1] = '\0';
+       fprintf(LOGFP, "[%d.%d(%s)]", GetCurrentProcessId(), GetCurrentThreadId(), ctim);
+       vfprintf(LOGFP, fmt, args);
+   }
+   va_end(args);
+   LeaveCriticalSection(&mylog_cs);
+}
+static int initialize_globals(void)
+{
+   static  int init = 1;
+
+   if (!init)
+       return 0;
+   init = 0;
+   InitializeCriticalSection(&mylog_cs);
+
+   return 0;
+}
+
+static void XatabClear(void);
+static void finalize_globals(void)
+{
+   XatabClear();
+   FreeEnv();
+   /* my(q)log is unavailable from here */
+   mylog("DETACHING PROCESS\n");
+   DeleteCriticalSection(&mylog_cs);
+   fclose(LOGFP);
+   LOGFP = NULL;
+}
+
+HINSTANCE s_hModule;               /* Saved module handle. */
+/*      This is where the Driver Manager attaches to this Driver */
+BOOL   WINAPI
+DllMain(HANDLE hInst, ULONG ul_reason_for_call, LPVOID lpReserved)
+{
+   WORD    wVersionRequested;
+   WSADATA wsaData;
+
+   switch (ul_reason_for_call)
+   {
+       case DLL_PROCESS_ATTACH:
+           s_hModule = (HINSTANCE) hInst;  /* Save for dialog boxes */
+
+           /* Load the WinSock Library */
+           wVersionRequested = MAKEWORD(1, 1);
+
+           if (WSAStartup(wVersionRequested, &wsaData))
+               return FALSE;
+
+           /* Verify that this is the minimum version of WinSock */
+           if (LOBYTE(wsaData.wVersion) != 1 ||
+               HIBYTE(wsaData.wVersion) != 1)
+           {
+               WSACleanup();
+               return FALSE;
+           }
+           initialize_globals();
+           break;
+
+       case DLL_THREAD_ATTACH:
+           break;
+
+       case DLL_PROCESS_DETACH:
+           finalize_globals();
+           WSACleanup();
+           return TRUE;
+
+       case DLL_THREAD_DETACH:
+           break;
+
+       default:
+           break;
+   }
+   return TRUE;
+}
+
+} /* end of EXTERN_C */
+
+#endif /* _BUILD_DLL_ */
+
+using namespace std;
+
+static CRITICAL_SECTION    map_cs;
+#define    MLOCK_ACQUIRE   EnterCriticalSection(&map_cs)
+#define    MLOCK_RELEASE   LeaveCriticalSection(&map_cs)
+
+class  XAConnection
+{
+private:
+   string  connstr;
+   HDBC    xaconn;
+   vector<string>  qvec;
+   int     pos;
+public:
+   XAConnection(LPCTSTR str) : connstr(str), xaconn(NULL), pos(-1) {}
+   ~XAConnection();
+   HDBC    ActivateConnection(void);
+   void    SetPos(int spos) {pos = spos;}
+   HDBC    GetConnection(void) const {return xaconn;}
+   vector<string>  &GetResultVec(void) {return qvec;}
+   int GetPos(void) {return pos;}
+   string GetConnstr(void) {return connstr;}
+};
+
+XAConnection::~XAConnection()
+{
+   qvec.clear();
+   if (xaconn)
+       SQLFreeHandle(SQL_HANDLE_DBC, xaconn);
+}
+
+HDBC   XAConnection::ActivateConnection(void)
+{
+   RETCODE ret;
+
+   MLOCK_ACQUIRE;
+   if (!env)
+   {
+       ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);
+       if (SQL_SUCCESS != ret && SQL_SUCCESS_WITH_INFO != ret)
+           return NULL;
+   }
+   MLOCK_RELEASE;
+   if (!xaconn)
+   {
+       ret = SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, (PTR) SQL_OV_ODBC3, 0);
+       ret = SQLAllocHandle(SQL_HANDLE_DBC, env, &xaconn);
+       if (SQL_SUCCESS == ret || SQL_SUCCESS_WITH_INFO)
+       {
+           ret = SQLDriverConnect(xaconn, NULL, (SQLCHAR *) connstr.c_str(), SQL_NTS, NULL, SQL_NULL_DATA, NULL, SQL_DRIVER_COMPLETE);
+           if (SQL_SUCCESS != ret && SQL_SUCCESS_WITH_INFO != ret)
+           {
+               mylog("SQLDriverConnect return=%d\n", ret);
+               SQLFreeHandle(SQL_HANDLE_DBC, xaconn);
+               xaconn = NULL;
+           }
+       }
+   }
+   return xaconn;
+}
+
+static map<int, XAConnection>  xatab;
+
+static class INIT_CRIT
+{
+private:
+public:
+   INIT_CRIT() {InitializeCriticalSection(&map_cs);}
+   ~INIT_CRIT()
+   {
+mylog("Leaving INIT_CRIT\n");
+       FreeEnv();
+       xatab.clear();
+       DeleteCriticalSection(&map_cs);
+   }
+} init_crit;
+
+static void XatabClear(void)
+{
+   xatab.clear();
+}
+
+static const char *XidToText(const XID &xid, char *rtext)
+{
+   int glen = xid.gtrid_length, blen = xid.bqual_length;
+   int i, j;
+
+   for (i = 0, j = 0; i < glen; i++, j += 2)
+       sprintf(rtext + j, "%02x", (unsigned char) xid.data[i]);
+   strcat(rtext, "-"); j++;
+   for (; i < glen + blen; i++, j += 2)
+       sprintf(rtext + j, "%02x", (unsigned char) xid.data[i]); 
+   return rtext;
+}
+
+static int
+pg_hex2bin(const UCHAR *src, UCHAR *dst, int length)
+{
+   UCHAR       chr;
+   const UCHAR *src_wk;
+   UCHAR       *dst_wk;
+   int     i, val;
+   BOOL        HByte = TRUE;
+
+   for (i = 0, src_wk = src, dst_wk = dst; i < length; i++, src_wk++)
+   {
+       chr = *src_wk;
+       if (!chr)
+           break;
+       if (chr >= 'a' && chr <= 'f')
+           val = chr - 'a' + 10;
+       else if (chr >= 'A' && chr <= 'F')
+           val = chr - 'A' + 10;
+       else
+           val = chr - '0';
+       if (HByte)
+           *dst_wk = (val << 4);
+       else
+       {
+           *dst_wk += val; 
+           dst_wk++;
+       }
+       HByte = !HByte;
+   }
+   return length;
+}
+
+static int TextToXid(XID &xid, const char *rtext)
+{
+   int slen, glen, blen;
+   char    *sptr;
+
+   slen = strlen(rtext);
+   sptr = strchr(rtext, '-');
+   if (sptr)
+   {
+       glen = (int) (sptr - rtext);
+       blen = slen - glen - 1;
+   }
+   else
+   {
+       glen = slen;
+       blen = 0;
+   }
+   xid.gtrid_length = glen / 2;
+   xid.bqual_length = blen / 2;
+   pg_hex2bin((const UCHAR *) rtext, (UCHAR *) &xid.data[0], glen);
+   pg_hex2bin((const UCHAR *) sptr + 1, (UCHAR *) &xid.data[glen / 2], blen);
+   return (glen + blen) / 2;
+}
+
+
+EXTERN_C static int __cdecl xa_open(char *xa_info, int rmid, long flags)
+{
+   mylog("xa_open %s rmid=%d flags=%ld\n", xa_info, rmid, flags);
+   MLOCK_ACQUIRE;
+   xatab.insert(pair<int, XAConnection>(rmid, XAConnection(xa_info)));
+   MLOCK_RELEASE;
+   return  S_OK;
+}
+EXTERN_C static int __cdecl xa_close(char *xa_info, int rmid, long flags)
+{
+   mylog("xa_close rmid=%d flags=%ld\n", rmid, flags);
+   MLOCK_ACQUIRE;
+   xatab.erase(rmid);
+   if (xatab.size() == 0)
+       FreeEnv();
+   MLOCK_RELEASE;
+   return XA_OK;
+}
+
+//
+// Dummy implmentation
+//
+EXTERN_C static int __cdecl xa_start(XID *xid, int rmid, long flags)
+{
+   char    pgxid[258];
+
+   XidToText(*xid, pgxid);
+   mylog("xa_start %s rmid=%d flags=%ld\n", pgxid, rmid, flags);
+   xatab.find(rmid)->second.ActivateConnection();
+   return XA_OK;
+}
+//
+// Dummy implementation
+//
+EXTERN_C static int __cdecl xa_end(XID *xid, int rmid, long flags)
+{
+   char    pgxid[258];
+
+   XidToText(*xid, pgxid);
+   mylog("xa_end %s rmid=%d flags=%ld\n", pgxid, rmid, flags);
+   return XA_OK;
+}
+
+EXTERN_C static int __cdecl xa_rollback(XID *xid, int rmid, long flags)
+{
+   int rmcode = XAER_RMERR;
+   char    pgxid[258];
+
+   XidToText(*xid, pgxid);
+   mylog("xa_rollback %s rmid=%d flags=%ld\n", pgxid, rmid, flags);
+   map<int, XAConnection>::iterator p;
+   p = xatab.find(rmid);
+   if (p != xatab.end())
+   {
+       HDBC    conn = p->second.ActivateConnection();
+       if (conn)
+       {
+           SQLCHAR cmdmsg[512], sqlstate[8];
+           HSTMT   stmt;
+           RETCODE ret;
+
+           ret = SQLAllocHandle(SQL_HANDLE_STMT, conn, &stmt);
+           if (SQL_SUCCESS != ret && SQL_SUCCESS_WITH_INFO != ret)
+           {
+               mylog("Statement allocation error\n");
+               return rmcode;
+           }
+           _snprintf((char *) cmdmsg, sizeof(cmdmsg), "ROLLBACK PREPARED '%s'", pgxid);
+           ret = SQLExecDirect(stmt, (SQLCHAR *) cmdmsg, SQL_NTS);
+           switch (ret)
+           {
+               case SQL_SUCCESS:
+               case SQL_SUCCESS_WITH_INFO:
+                   rmcode = XA_OK;
+                   break;
+               case SQL_ERROR:
+                   SQLGetDiagRec(SQL_HANDLE_STMT, stmt,
+                       1, sqlstate, NULL, cmdmsg,
+                                   sizeof(cmdmsg), NULL);
+                   mylog("xa_commit error %s '%s'\n", sqlstate, cmdmsg);
+                   if (_stricmp((char *) sqlstate, "42704") == 0)
+                       rmcode = XA_HEURHAZ;
+                   break;
+           }
+           SQLFreeHandle(SQL_HANDLE_STMT, stmt);
+       }
+   }
+   return rmcode;
+}
+//
+// Dummy implementation
+// It's almost impossible to implement this routine properly.
+// 
+EXTERN_C static int __cdecl xa_prepare(XID *xid, int rmid, long flags)
+{
+   char    pgxid[258];
+
+   XidToText(*xid, pgxid);
+   mylog("xa_prepare %s rmid=%d\n", pgxid, rmid);
+#ifdef _SLEEP_FOR_TEST_
+   Sleep(2000);
+#endif /* _SLEEP_FOR_TEST_ */
+   map<int, XAConnection>::iterator p;
+   p = xatab.find(rmid);
+   if (p != xatab.end())
+   {
+       HDBC    conn = p->second.GetConnection();
+       if (conn)
+       {
+       }
+   }
+   return XAER_RMERR;
+}
+EXTERN_C static int __cdecl xa_commit(XID *xid, int rmid, long flags)
+{
+   int rmcode = XAER_RMERR;
+   char    pgxid[258];
+
+   XidToText(*xid, pgxid);
+   mylog("xa_commit %s rmid=%d flags=%ld\n", pgxid, rmid, flags);
+#ifdef _SLEEP_FOR_TEST_
+   Sleep(2000);
+#endif /* _SLEEP_FOR_TEST_ */
+   map<int, XAConnection>::iterator p;
+   p = xatab.find(rmid);
+   if (p != xatab.end())
+   {
+       HDBC    conn = p->second.ActivateConnection();
+       if (conn)
+       {
+           SQLCHAR cmdmsg[512], sqlstate[8];
+           HSTMT   stmt;
+           RETCODE ret;
+
+           SQLAllocHandle(SQL_HANDLE_STMT, conn, &stmt);
+           _snprintf((char *) cmdmsg, sizeof(cmdmsg), "COMMIT PREPARED '%s'", pgxid);
+           ret = SQLExecDirect(stmt, (SQLCHAR *) cmdmsg, SQL_NTS);
+           switch (ret)
+           {
+               case SQL_SUCCESS:
+               case SQL_SUCCESS_WITH_INFO:
+                   rmcode = XA_OK;
+                   break;
+               case SQL_ERROR:
+                   SQLGetDiagRec(SQL_HANDLE_STMT, stmt,
+                       1, sqlstate, NULL, cmdmsg,
+                                   sizeof(cmdmsg), NULL);
+                   if (_stricmp((char *) sqlstate, "42704") == 0)
+                       rmcode = XA_HEURHAZ;
+                   break;
+           }
+           SQLFreeHandle(SQL_HANDLE_STMT, stmt);
+       }
+   }
+   return rmcode;
+}
+EXTERN_C static int __cdecl xa_recover(XID *xids, long count, int rmid, long flags)
+{
+   int rmcode = XAER_RMERR, rcount;
+
+   mylog("xa_recover rmid=%d count=%d flags=%ld\n", rmid, count, flags);
+   map<int, XAConnection>::iterator p;
+   p = xatab.find(rmid);
+   if (p == xatab.end())
+       return rmcode;
+   HDBC    conn = p->second.ActivateConnection();
+   if (!conn)
+       return rmcode;
+   vector<string>  &vec = p->second.GetResultVec();
+   int pos = p->second.GetPos();
+   if ((flags & TMSTARTRSCAN) != 0)
+   {
+       HSTMT   stmt;
+       RETCODE ret;
+       char    buf[512];
+
+       vec.clear();
+       SQLAllocHandle(SQL_HANDLE_STMT, conn, &stmt);
+       ret = SQLExecDirect(stmt, (SQLCHAR *) "select gid from pg_prepared_xacts", SQL_NTS);
+       if (SQL_SUCCESS != ret && SQL_SUCCESS_WITH_INFO != ret)
+       {
+           SQLFreeHandle(SQL_HANDLE_STMT, stmt);
+           pos = -1;
+           goto onExit;
+       }
+       SQLBindCol(stmt, 1, SQL_C_CHAR, buf, sizeof(buf), NULL);
+       ret = SQLFetch(stmt);
+       while (SQL_NO_DATA_FOUND != ret)
+       {
+           vec.push_back(buf);
+           ret = SQLFetch(stmt);
+       }
+       SQLFreeHandle(SQL_HANDLE_STMT, stmt);
+       pos = 0;
+   }
+   rcount = vec.size();
+   rmcode = rcount - pos;
+   if (rmcode > count)
+       rmcode = count;
+   for (int i = 0; i < rmcode; i++, pos++)
+       TextToXid(xids[i], vec[pos].c_str());
+   
+   if ((flags & TMENDRSCAN) != 0)
+   {
+       vec.clear();
+       pos = -1;
+   }
+   mylog("return count=%d\n", rmcode);
+onExit:
+   p->second.SetPos(pos);
+   return rmcode;
+}
+
+//
+// I'm not sure if this is invoked from MSDTC
+// Anyway there's nothing to do with it.
+//
+EXTERN_C static int __cdecl xa_forget(XID *xid, int rmid, long flags)
+{
+   char    pgxid[258];
+
+   XidToText(*xid, pgxid);
+   mylog("xa_forget %s rmid=%d\n", pgxid, rmid);
+   return XA_OK;
+}
+//
+// I'm not sure if this invoked from MSDTC.
+//
+EXTERN_C static int __cdecl xa_complete(int *handle, int *retval, int rmid, long flags)
+{
+   mylog("xa_complete rmid=%d\n", rmid);
+   return XA_OK;
+}
+
+EXTERN_C static xa_switch_t    xapsw = { "psotgres_xa", TMNOMIGRATE,
+       0, xa_open, xa_close, xa_start, xa_end, xa_rollback,
+       xa_prepare, xa_commit, xa_recover, xa_forget,
+       xa_complete}; 
+
+EXTERN_C HRESULT __cdecl   GetXaSwitch (XA_SWITCH_FLAGS  XaSwitchFlags,
+       xa_switch_t **  ppXaSwitch)
+{
+   mylog("GetXaSwitch called\n");
+
+   *ppXaSwitch = &xapsw;
+   return  S_OK;
+}
index c215393377fe6954b9326ac7be830cdaa41b99b0..bcb64fe0566b996ba9a40b2b84da2887471c6660 100644 (file)
--- a/version.h
+++ b/version.h
@@ -9,8 +9,8 @@
 #ifndef __VERSION_H__
 #define __VERSION_H__
 
-#define POSTGRESDRIVERVERSION      "08.02.0003"
-#define POSTGRES_RESOURCE_VERSION  "08.02.0003\0"
-#define PG_DRVFILE_VERSION     8,2,0,03 
+#define POSTGRESDRIVERVERSION      "08.02.0004"
+#define POSTGRES_RESOURCE_VERSION  "08.02.0004\0"
+#define PG_DRVFILE_VERSION     8,2,0,04 
 
 #endif