Add a function GetLockConflicts() to lock.c to report xacts holding
authorTom Lane <tgl@sss.pgh.pa.us>
Sun, 27 Aug 2006 19:14:34 +0000 (19:14 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Sun, 27 Aug 2006 19:14:34 +0000 (19:14 +0000)
locks that would conflict with a specified lock request, without
actually trying to get that lock.  Use this instead of the former ad hoc
method of doing the first wait step in CREATE INDEX CONCURRENTLY.
Fixes problem with undetected deadlock and in many cases will allow the
index creation to proceed sooner than it otherwise could've.  Per
discussion with Greg Stark.

src/backend/commands/indexcmds.c
src/backend/storage/lmgr/lock.c
src/include/storage/lock.h

index b4335b5c404937efafb46ef9f10ab84277737834..204d67fbd8bb5ad96f71f3a5b248b391af0831ea 100644 (file)
@@ -119,8 +119,11 @@ DefineIndex(RangeVar *heapRelation,
        Datum           reloptions;
        IndexInfo  *indexInfo;
        int                     numberOfAttributes;
+       List       *old_xact_list;
+       ListCell   *lc;
        uint32          ixcnt;
        LockRelId       heaprelid;
+       LOCKTAG         heaplocktag;
        Snapshot        snapshot;
        Relation pg_index;
        HeapTuple indexTuple;
@@ -466,33 +469,26 @@ DefineIndex(RangeVar *heapRelation,
        CommitTransactionCommand();
        StartTransactionCommand();
 
-       /* Establish transaction snapshot ... else GetLatestSnapshot complains */
-       (void) GetTransactionSnapshot();
-
        /*
         * Now we must wait until no running transaction could have the table open
-        * with the old list of indexes.  If we can take an exclusive lock then
-        * there are none now and anybody who opens it later will get the new
-        * index in their relcache entry.  Alternatively, if our Xmin reaches our
-        * own (new) transaction then we know no transactions that started before
-        * the index was visible are left anyway.
+        * with the old list of indexes.  To do this, inquire which xacts currently
+        * would conflict with ShareLock on the table -- ie, which ones have
+        * a lock that permits writing the table.  Then wait for each of these
+        * xacts to commit or abort.  Note we do not need to worry about xacts
+        * that open the table for writing after this point; they will see the
+        * new index when they open it.
+        *
+        * Note: GetLockConflicts() never reports our own xid,
+        * hence we need not check for that.
         */
-       for (;;)
-       {
-               CHECK_FOR_INTERRUPTS();
+       SET_LOCKTAG_RELATION(heaplocktag, heaprelid.dbId, heaprelid.relId);
+       old_xact_list = GetLockConflicts(&heaplocktag, ShareLock);
 
-               if (ConditionalLockRelationOid(relationId, ExclusiveLock))
-               {
-                       /* Release the lock right away to avoid blocking anyone */
-                       UnlockRelationOid(relationId, ExclusiveLock);
-                       break;
-               }
-
-               if (TransactionIdEquals(GetLatestSnapshot()->xmin,
-                                                               GetTopTransactionId()))
-                       break;
+       foreach(lc, old_xact_list)
+       {
+               TransactionId xid = lfirst_xid(lc);
 
-               pg_usleep(1000000L);            /* 1 sec */
+               XactLockTableWait(xid);
        }
 
        /*
index a6885fc77a6bb217898381442148ff3837dce76d..eaa682276061e88eae147af2f768547c4fc535e7 100644 (file)
@@ -1685,6 +1685,104 @@ LockReassignCurrentOwner(void)
 }
 
 
+/*
+ * GetLockConflicts
+ *             Get a list of TransactionIds of xacts currently holding locks
+ *             that would conflict with the specified lock/lockmode.
+ *             xacts merely awaiting such a lock are NOT reported.
+ *
+ * Of course, the result could be out of date by the time it's returned,
+ * so use of this function has to be thought about carefully.
+ *
+ * Only top-level XIDs are reported.  Note we never include the current xact
+ * in the result list, since an xact never blocks itself.
+ */
+List *
+GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
+{
+       List       *result = NIL;
+       LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
+       LockMethod      lockMethodTable;
+       LOCK       *lock;
+       LOCKMASK        conflictMask;
+       SHM_QUEUE  *procLocks;
+       PROCLOCK   *proclock;
+       uint32          hashcode;
+       LWLockId        partitionLock;
+
+       if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
+               elog(ERROR, "unrecognized lock method: %d", lockmethodid);
+       lockMethodTable = LockMethods[lockmethodid];
+       if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
+               elog(ERROR, "unrecognized lock mode: %d", lockmode);
+
+       /*
+        * Look up the lock object matching the tag.
+        */
+       hashcode = LockTagHashCode(locktag);
+       partitionLock = LockHashPartitionLock(hashcode);
+
+       LWLockAcquire(partitionLock, LW_SHARED);
+
+       lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
+                                                                                               (void *) locktag,
+                                                                                               hashcode,
+                                                                                               HASH_FIND,
+                                                                                               NULL);
+       if (!lock)
+       {
+               /*
+                * If the lock object doesn't exist, there is nothing holding a
+                * lock on this lockable object.
+                */
+               LWLockRelease(partitionLock);
+               return NIL;
+       }
+
+       /*
+        * Examine each existing holder (or awaiter) of the lock.
+        */
+       conflictMask = lockMethodTable->conflictTab[lockmode];
+
+       procLocks = &(lock->procLocks);
+
+       proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
+                                                                                offsetof(PROCLOCK, lockLink));
+
+       while (proclock)
+       {
+               if (conflictMask & proclock->holdMask)
+               {
+                       PGPROC *proc = proclock->tag.myProc;
+
+                       /* A backend never blocks itself */
+                       if (proc != MyProc)
+                       {
+                               /* Fetch xid just once - see GetNewTransactionId */
+                               TransactionId xid = proc->xid;
+
+                               /*
+                                * Race condition: during xact commit/abort we zero out
+                                * PGPROC's xid before we mark its locks released.  If we see
+                                * zero in the xid field, assume the xact is in process of
+                                * shutting down and act as though the lock is already
+                                * released.
+                                */
+                               if (TransactionIdIsValid(xid))
+                                       result = lappend_xid(result, xid);
+                       }
+               }
+
+               proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
+                                                                                        offsetof(PROCLOCK, lockLink));
+       }
+
+       LWLockRelease(partitionLock);
+
+       return result;
+}
+
+
 /*
  * AtPrepare_Locks
  *             Do the preparatory work for a PREPARE: make 2PC state file records
index ebea7289736695973eb22ea2adee1ca202949323..aa916e5ebd226f5be82f4298ad268d764c7d5884 100644 (file)
@@ -14,6 +14,7 @@
 #ifndef LOCK_H_
 #define LOCK_H_
 
+#include "nodes/pg_list.h"
 #include "storage/itemptr.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
@@ -412,6 +413,7 @@ extern bool LockRelease(const LOCKTAG *locktag,
 extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
 extern void LockReleaseCurrentOwner(void);
 extern void LockReassignCurrentOwner(void);
+extern List *GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode);
 extern void AtPrepare_Locks(void);
 extern void PostPrepare_Locks(TransactionId xid);
 extern int LockCheckConflicts(LockMethod lockMethodTable,
@@ -421,13 +423,6 @@ extern void GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode);
 extern void GrantAwaitedLock(void);
 extern void RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode);
 extern Size LockShmemSize(void);
-extern bool DeadLockCheck(PGPROC *proc);
-extern void DeadLockReport(void);
-extern void RememberSimpleDeadLock(PGPROC *proc1,
-                                          LOCKMODE lockmode,
-                                          LOCK *lock,
-                                          PGPROC *proc2);
-extern void InitDeadLockChecking(void);
 extern LockData *GetLockStatusData(void);
 extern const char *GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode);
 
@@ -438,6 +433,14 @@ extern void lock_twophase_postcommit(TransactionId xid, uint16 info,
 extern void lock_twophase_postabort(TransactionId xid, uint16 info,
                                                void *recdata, uint32 len);
 
+extern bool DeadLockCheck(PGPROC *proc);
+extern void DeadLockReport(void);
+extern void RememberSimpleDeadLock(PGPROC *proc1,
+                                          LOCKMODE lockmode,
+                                          LOCK *lock,
+                                          PGPROC *proc2);
+extern void InitDeadLockChecking(void);
+
 #ifdef LOCK_DEBUG
 extern void DumpLocks(PGPROC *proc);
 extern void DumpAllLocks(void);