* This ensures that the set of transactions seen as "running" by the
  * current xact will not change after it takes the snapshot.
  *
- * Note that only top-level XIDs are included in the snapshot. We can
- * still apply the xmin and xmax limits to subtransaction XIDs, but we
- * need to work a bit harder to see if XIDs in [xmin..xmax) are running.
+ * All running top-level XIDs are included in the snapshot.  We also try
+ * to include running subtransaction XIDs, but since PGPROC has only a
+ * limited cache area for subxact XIDs, full information may not be
+ * available.  If we find any overflowed subxid arrays, we have to mark
+ * the snapshot's subxid data as overflowed, and extra work will need to
+ * be done to determine what's running (see XidInSnapshot() in tqual.c).
  *
  * We also update the following backend-global variables:
  *             TransactionXmin: the oldest xmin of any snapshot in use in the
        TransactionId globalxmin;
        int                     index;
        int                     count = 0;
+       int                     subcount = 0;
 
        Assert(snapshot != NULL);
 
        /*
         * Allocating space for maxProcs xids is usually overkill; numProcs would
         * be sufficient.  But it seems better to do the malloc while not holding
-        * the lock, so we can't look at numProcs.
+        * the lock, so we can't look at numProcs.  Likewise, we allocate much
+        * more subxip storage than is probably needed.
         *
         * This does open a possibility for avoiding repeated malloc/free: since
         * maxProcs does not change at runtime, we can simply reuse the previous
-        * xip array if any.  (This relies on the fact that all callers pass
+        * xip arrays if any.  (This relies on the fact that all callers pass
         * static SnapshotData structs.)
         */
        if (snapshot->xip == NULL)
                        ereport(ERROR,
                                        (errcode(ERRCODE_OUT_OF_MEMORY),
                                         errmsg("out of memory")));
+               Assert(snapshot->subxip == NULL);
+               snapshot->subxip = (TransactionId *)
+                       malloc(arrayP->maxProcs * PGPROC_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
+               if (snapshot->subxip == NULL)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_OUT_OF_MEMORY),
+                                        errmsg("out of memory")));
        }
 
        globalxmin = xmin = GetTopTransactionId();
 
        /*
-        * If we are going to set MyProc->xmin then we'd better get exclusive
-        * lock; if not, this is a read-only operation so it can be shared.
+        * It is sufficient to get shared lock on ProcArrayLock, even if we
+        * are computing a serializable snapshot and therefore will be setting
+        * MyProc->xmin.  This is because any two backends that have overlapping
+        * shared holds on ProcArrayLock will certainly compute the same xmin
+        * (since no xact, in particular not the oldest, can exit the set of
+        * running transactions while we hold ProcArrayLock --- see further
+        * discussion just below).  So it doesn't matter whether another backend
+        * concurrently doing GetSnapshotData or GetOldestXmin sees our xmin as
+        * set or not; he'd compute the same xmin for himself either way.
         */
-       LWLockAcquire(ProcArrayLock, serializable ? LW_EXCLUSIVE : LW_SHARED);
+       LWLockAcquire(ProcArrayLock, LW_SHARED);
 
        /*--------------------
         * Unfortunately, we have to call ReadNewTransactionId() after acquiring
                if (TransactionIdIsNormal(xid))
                        if (TransactionIdPrecedes(xid, globalxmin))
                                globalxmin = xid;
+
+               /*
+                * Save subtransaction XIDs if possible (if we've already overflowed,
+                * there's no point).  Note that the subxact XIDs must be later than
+                * their parent, so no need to check them against xmin.
+                *
+                * The other backend can add more subxids concurrently, but cannot
+                * remove any.  Hence it's important to fetch nxids just once.
+                * Should be safe to use memcpy, though.  (We needn't worry about
+                * missing any xids added concurrently, because they must postdate
+                * xmax.)
+                */
+               if (subcount >= 0)
+               {
+                       if (proc->subxids.overflowed)
+                               subcount = -1;                                  /* overflowed */
+                       else
+                       {
+                               int             nxids = proc->subxids.nxids;
+
+                               if (nxids > 0)
+                               {
+                                       memcpy(snapshot->subxip + subcount,
+                                                  proc->subxids.xids,
+                                                  nxids * sizeof(TransactionId));
+                                       subcount += nxids;
+                               }
+                       }
+               }
        }
 
        if (serializable)
        snapshot->xmin = xmin;
        snapshot->xmax = xmax;
        snapshot->xcnt = count;
+       snapshot->subxcnt = subcount;
 
        snapshot->curcid = GetCurrentCommandId();
 
        int                     i,
                                j;
 
-       Assert(!TransactionIdEquals(xid, InvalidTransactionId));
+       Assert(TransactionIdIsValid(xid));
 
        /*
         * We must hold ProcArrayLock exclusively in order to remove transactions
 
 TransactionId RecentXmin = InvalidTransactionId;
 TransactionId RecentGlobalXmin = InvalidTransactionId;
 
+/* local functions */
+static bool XidInSnapshot(TransactionId xid, Snapshot snapshot);
+
 
 /*
  * HeapTupleSatisfiesItself
        /*
         * By here, the inserting transaction has committed - have to check
         * when...
-        *
-        * Note that the provided snapshot contains only top-level XIDs, so we
-        * have to convert a subxact XID to its parent for comparison. However, we
-        * can make first-pass range checks with the given XID, because a subxact
-        * with XID < xmin has surely also got a parent with XID < xmin, while one
-        * with XID >= xmax must belong to a parent that was not yet committed at
-        * the time of this snapshot.
         */
-       if (TransactionIdFollowsOrEquals(HeapTupleHeaderGetXmin(tuple),
-                                                                        snapshot->xmin))
-       {
-               TransactionId parentXid;
-
-               if (TransactionIdFollowsOrEquals(HeapTupleHeaderGetXmin(tuple),
-                                                                                snapshot->xmax))
-                       return false;
-
-               parentXid = SubTransGetTopmostTransaction(HeapTupleHeaderGetXmin(tuple));
-
-               if (TransactionIdFollowsOrEquals(parentXid, snapshot->xmin))
-               {
-                       uint32          i;
-
-                       /* no point in checking parentXid against xmax here */
-
-                       for (i = 0; i < snapshot->xcnt; i++)
-                       {
-                               if (TransactionIdEquals(parentXid, snapshot->xip[i]))
-                                       return false;
-                       }
-               }
-       }
+       if (XidInSnapshot(HeapTupleHeaderGetXmin(tuple), snapshot))
+               return false;                   /* treat as still in progress */
 
        if (tuple->t_infomask & HEAP_XMAX_INVALID)      /* xid invalid or aborted */
                return true;
 
        /*
         * OK, the deleting transaction committed too ... but when?
-        *
-        * See notes for the similar tests on tuple xmin, above.
         */
-       if (TransactionIdFollowsOrEquals(HeapTupleHeaderGetXmax(tuple),
-                                                                        snapshot->xmin))
-       {
-               TransactionId parentXid;
+       if (XidInSnapshot(HeapTupleHeaderGetXmax(tuple), snapshot))
+               return true;                    /* treat as still in progress */
 
-               if (TransactionIdFollowsOrEquals(HeapTupleHeaderGetXmax(tuple),
-                                                                                snapshot->xmax))
-                       return true;
-
-               parentXid = SubTransGetTopmostTransaction(HeapTupleHeaderGetXmax(tuple));
-
-               if (TransactionIdFollowsOrEquals(parentXid, snapshot->xmin))
-               {
-                       uint32          i;
-
-                       /* no point in checking parentXid against xmax here */
-
-                       for (i = 0; i < snapshot->xcnt; i++)
-                       {
-                               if (TransactionIdEquals(parentXid, snapshot->xip[i]))
-                                       return true;
-                       }
-               }
-       }
-
-/* This is to be used only for disaster recovery and requires serious analysis. */
-#ifndef MAKE_EXPIRED_TUPLES_VISIBLE
        return false;
-#else
-       return true;
-#endif
 }
 
 
 CopySnapshot(Snapshot snapshot)
 {
        Snapshot        newsnap;
+       Size            subxipoff;
+       Size            size;
 
-       /* We allocate any XID array needed in the same palloc block. */
-       newsnap = (Snapshot) palloc(sizeof(SnapshotData) +
-                                                               snapshot->xcnt * sizeof(TransactionId));
+       /* We allocate any XID arrays needed in the same palloc block. */
+       size = subxipoff = sizeof(SnapshotData) +
+               snapshot->xcnt * sizeof(TransactionId);
+       if (snapshot->subxcnt > 0)
+               size += snapshot->subxcnt * sizeof(TransactionId);
+
+       newsnap = (Snapshot) palloc(size);
        memcpy(newsnap, snapshot, sizeof(SnapshotData));
+
+       /* setup XID array */
        if (snapshot->xcnt > 0)
        {
                newsnap->xip = (TransactionId *) (newsnap + 1);
        else
                newsnap->xip = NULL;
 
+       /* setup subXID array */
+       if (snapshot->subxcnt > 0)
+       {
+               newsnap->subxip = (TransactionId *) ((char *) newsnap + subxipoff);
+               memcpy(newsnap->subxip, snapshot->subxip,
+                          snapshot->subxcnt * sizeof(TransactionId));
+       }
+       else
+               newsnap->subxip = NULL;
+
        return newsnap;
 }
 
        LatestSnapshot = NULL;
        ActiveSnapshot = NULL;          /* just for cleanliness */
 }
+
+/*
+ * XidInSnapshot
+ *             Is the given XID still-in-progress according to the snapshot?
+ *
+ * Note: GetSnapshotData never stores either top xid or subxids of our own
+ * backend into a snapshot, so these xids will not be reported as "running"
+ * by this function.  This is OK for current uses, because we actually only
+ * apply this for known-committed XIDs.
+ */
+static bool
+XidInSnapshot(TransactionId xid, Snapshot snapshot)
+{
+       uint32          i;
+
+       /*
+        * Make a quick range check to eliminate most XIDs without looking at the
+        * xip arrays.  Note that this is OK even if we convert a subxact XID to
+        * its parent below, because a subxact with XID < xmin has surely also got
+        * a parent with XID < xmin, while one with XID >= xmax must belong to a
+        * parent that was not yet committed at the time of this snapshot.
+        */
+
+       /* Any xid < xmin is not in-progress */
+       if (TransactionIdPrecedes(xid, snapshot->xmin))
+               return false;
+       /* Any xid >= xmax is in-progress */
+       if (TransactionIdFollowsOrEquals(xid, snapshot->xmax))
+               return true;
+
+       /*
+        * If the snapshot contains full subxact data, the fastest way to check
+        * things is just to compare the given XID against both subxact XIDs and
+        * top-level XIDs.  If the snapshot overflowed, we have to use pg_subtrans
+        * to convert a subxact XID to its parent XID, but then we need only look
+        * at top-level XIDs not subxacts.
+        */
+       if (snapshot->subxcnt >= 0)
+       {
+               /* full data, so search subxip */
+               int32           j;
+
+               for (j = 0; j < snapshot->subxcnt; j++)
+               {
+                       if (TransactionIdEquals(xid, snapshot->subxip[j]))
+                               return true;
+               }
+
+               /* not there, fall through to search xip[] */
+       }
+       else
+       {
+               /* overflowed, so convert xid to top-level */
+               xid = SubTransGetTopmostTransaction(xid);
+
+               /*
+                * If xid was indeed a subxact, we might now have an xid < xmin,
+                * so recheck to avoid an array scan.  No point in rechecking xmax.
+                */
+               if (TransactionIdPrecedes(xid, snapshot->xmin))
+                       return false;
+       }
+
+       for (i = 0; i < snapshot->xcnt; i++)
+       {
+               if (TransactionIdEquals(xid, snapshot->xip[i]))
+                       return true;
+       }
+
+       return false;
+}