Initial work on SnapArray implementation.
authorRobert Haas <rhaas@postgresql.org>
Tue, 16 Aug 2011 02:43:49 +0000 (22:43 -0400)
committerRobert Haas <rhaas@postgresql.org>
Fri, 14 Oct 2011 18:38:13 +0000 (14:38 -0400)
13 files changed:
src/backend/access/transam/xact.c
src/backend/access/transam/xlog.c
src/backend/storage/ipc/Makefile
src/backend/storage/ipc/README.snaparray [new file with mode: 0644]
src/backend/storage/ipc/ipci.c
src/backend/storage/ipc/procarray.c
src/backend/storage/ipc/snaparray.c [new file with mode: 0644]
src/backend/utils/misc/guc.c
src/backend/utils/misc/postgresql.conf.sample
src/include/storage/lwlock.h
src/include/storage/procarray.h
src/include/storage/snaparray.h [new file with mode: 0644]
src/include/utils/guc.h

index 3dab45c2da60a1010d566fccf658a4a55ee3ece3..9c1e072d150c0b74a6ef02178280b51394c778e5 100644 (file)
@@ -43,6 +43,7 @@
 #include "storage/procarray.h"
 #include "storage/sinvaladt.h"
 #include "storage/smgr.h"
+#include "storage/snaparray.h"
 #include "utils/combocid.h"
 #include "utils/guc.h"
 #include "utils/inval.h"
@@ -266,7 +267,8 @@ static void CallSubXactCallbacks(SubXactEvent event,
                                         SubTransactionId parentSubid);
 static void CleanupTransaction(void);
 static void CommitTransaction(void);
-static TransactionId RecordTransactionAbort(bool isSubXact);
+static void RecordTransactionAbort(bool isSubXact, TransactionId xid,
+                                          int nchildren, TransactionId *children);
 static void StartTransaction(void);
 
 static void StartSubTransaction(void);
@@ -900,20 +902,14 @@ AtSubStart_ResourceOwner(void)
 
 /*
  *     RecordTransactionCommit
- *
- * Returns latest XID among xact and its children, or InvalidTransactionId
- * if the xact has no XID.     (We compute that here just because it's easier.)
  */
-static TransactionId
-RecordTransactionCommit(void)
+static void
+RecordTransactionCommit(TransactionId xid, int nchildren,
+                                               TransactionId *children)
 {
-       TransactionId xid = GetTopTransactionIdIfAny();
        bool            markXidCommitted = TransactionIdIsValid(xid);
-       TransactionId latestXid = InvalidTransactionId;
        int                     nrels;
        RelFileNode *rels;
-       int                     nchildren;
-       TransactionId *children;
        int                     nmsgs = 0;
        SharedInvalidationMessage *invalMessages = NULL;
        bool            RelcacheInitFileInval = false;
@@ -921,7 +917,6 @@ RecordTransactionCommit(void)
 
        /* Get data needed for commit record */
        nrels = smgrGetPendingDeletes(true, &rels);
-       nchildren = xactGetCommittedChildren(&children);
        if (XLogStandbyInfoActive())
                nmsgs = xactGetCommittedInvalidationMessages(&invalMessages,
                                                                                                         &RelcacheInitFileInval);
@@ -1159,9 +1154,6 @@ RecordTransactionCommit(void)
                END_CRIT_SECTION();
        }
 
-       /* Compute latestXid while we have the child XIDs handy */
-       latestXid = TransactionIdLatest(xid, nchildren, children);
-
        /*
         * Wait for synchronous replication, if required.
         *
@@ -1177,8 +1169,6 @@ cleanup:
        /* Clean up local data */
        if (rels)
                pfree(rels);
-
-       return latestXid;
 }
 
 
@@ -1346,19 +1336,13 @@ AtSubCommit_childXids(void)
 
 /*
  *     RecordTransactionAbort
- *
- * Returns latest XID among xact and its children, or InvalidTransactionId
- * if the xact has no XID.     (We compute that here just because it's easier.)
  */
-static TransactionId
-RecordTransactionAbort(bool isSubXact)
+static void
+RecordTransactionAbort(bool isSubXact, TransactionId xid, int nchildren,
+                                          TransactionId *children)
 {
-       TransactionId xid = GetCurrentTransactionIdIfAny();
-       TransactionId latestXid;
        int                     nrels;
        RelFileNode *rels;
-       int                     nchildren;
-       TransactionId *children;
        XLogRecData rdata[3];
        int                     lastrdata = 0;
        xl_xact_abort xlrec;
@@ -1367,14 +1351,14 @@ RecordTransactionAbort(bool isSubXact)
         * If we haven't been assigned an XID, nobody will care whether we aborted
         * or not.      Hence, we're done in that case.  It does not matter if we have
         * rels to delete (note that this routine is not responsible for actually
-        * deleting 'em).  We cannot have any child XIDs, either.
+        * deleting 'em).
         */
        if (!TransactionIdIsValid(xid))
        {
                /* Reset XactLastRecEnd until the next transaction writes something */
                if (!isSubXact)
                        XactLastRecEnd.xrecoff = 0;
-               return InvalidTransactionId;
+               return;
        }
 
        /*
@@ -1394,7 +1378,6 @@ RecordTransactionAbort(bool isSubXact)
 
        /* Fetch the data we need for the abort record */
        nrels = smgrGetPendingDeletes(false, &rels);
-       nchildren = xactGetCommittedChildren(&children);
 
        /* XXX do we really need a critical section here? */
        START_CRIT_SECTION();
@@ -1458,18 +1441,6 @@ RecordTransactionAbort(bool isSubXact)
 
        END_CRIT_SECTION();
 
-       /* Compute latestXid while we have the child XIDs handy */
-       latestXid = TransactionIdLatest(xid, nchildren, children);
-
-       /*
-        * If we're aborting a subtransaction, we can immediately remove failed
-        * XIDs from PGPROC's cache of running child XIDs.  We do that here for
-        * subxacts, because we already have the child XID array at hand.  For
-        * main xacts, the equivalent happens just after this function returns.
-        */
-       if (isSubXact)
-               XidCacheRemoveRunningXids(xid, nchildren, children, latestXid);
-
        /* Reset XactLastRecEnd until the next transaction writes something */
        if (!isSubXact)
                XactLastRecEnd.xrecoff = 0;
@@ -1477,8 +1448,6 @@ RecordTransactionAbort(bool isSubXact)
        /* And clean up local data */
        if (rels)
                pfree(rels);
-
-       return latestXid;
 }
 
 /*
@@ -1786,6 +1755,9 @@ CommitTransaction(void)
 {
        TransactionState s = CurrentTransactionState;
        TransactionId latestXid;
+       TransactionId xid;
+       int                     nchildren;
+       TransactionId *children;
 
        ShowTransactionState("CommitTransaction");
 
@@ -1852,6 +1824,11 @@ CommitTransaction(void)
         */
        PreCommit_Notify();
 
+       /* Get XID information. */
+       xid = GetTopTransactionIdIfAny();
+       nchildren = xactGetCommittedChildren(&children);
+       latestXid = TransactionIdLatest(xid, nchildren, children);
+
        /* Prevent cancel/die interrupt while cleaning up */
        HOLD_INTERRUPTS();
 
@@ -1867,7 +1844,7 @@ CommitTransaction(void)
        /*
         * Here is where we really truly commit.
         */
-       latestXid = RecordTransactionCommit();
+       RecordTransactionCommit(xid, nchildren, children);
 
        TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid);
 
@@ -1877,6 +1854,8 @@ CommitTransaction(void)
         * RecordTransactionCommit.
         */
        ProcArrayEndTransaction(MyProc, latestXid);
+       if (TransactionIdIsNormal(xid))
+               SnapArrayRemoveRunningXids(xid, nchildren, children, latestXid);
 
        /*
         * This is all post-commit cleanup.  Note that if an error is raised here,
@@ -2224,6 +2203,9 @@ AbortTransaction(void)
 {
        TransactionState s = CurrentTransactionState;
        TransactionId latestXid;
+       TransactionId xid;
+       TransactionId *children;
+       int                     nchildren;
 
        /* Prevent cancel/die interrupt while cleaning up */
        HOLD_INTERRUPTS();
@@ -2285,11 +2267,16 @@ AbortTransaction(void)
        AtAbort_Notify();
        AtEOXact_RelationMap(false);
 
+       /* Get XID information. */
+       xid = GetCurrentTransactionIdIfAny();
+       nchildren = xactGetCommittedChildren(&children);
+       latestXid = TransactionIdLatest(xid, nchildren, children);
+
        /*
         * Advertise the fact that we aborted in pg_clog (assuming that we got as
         * far as assigning an XID to advertise).
         */
-       latestXid = RecordTransactionAbort(false);
+       RecordTransactionAbort(false, xid, nchildren, children);
 
        TRACE_POSTGRESQL_TRANSACTION_ABORT(MyProc->lxid);
 
@@ -2299,6 +2286,8 @@ AbortTransaction(void)
         * RecordTransactionAbort.
         */
        ProcArrayEndTransaction(MyProc, latestXid);
+       if (TransactionIdIsNormal(xid))
+               SnapArrayRemoveRunningXids(xid, nchildren, children, latestXid);
 
        /*
         * Post-abort cleanup.  See notes in CommitTransaction() concerning
@@ -4158,6 +4147,11 @@ AbortSubTransaction(void)
         */
        if (s->curTransactionOwner)
        {
+               TransactionId latestXid;
+               TransactionId xid;
+               int                     nchildren;
+               TransactionId *children;
+
                AfterTriggerEndSubXact(false);
                AtSubAbort_Portals(s->subTransactionId,
                                                   s->parent->subTransactionId,
@@ -4166,8 +4160,24 @@ AbortSubTransaction(void)
                                                                s->parent->subTransactionId);
                AtSubAbort_Notify();
 
+               /* Get XID information. */
+               xid = GetCurrentTransactionIdIfAny();
+               nchildren = xactGetCommittedChildren(&children);
+               latestXid = TransactionIdLatest(xid, nchildren, children);
+
                /* Advertise the fact that we aborted in pg_clog. */
-               (void) RecordTransactionAbort(true);
+               (void) RecordTransactionAbort(true, xid, nchildren, children);
+
+               /*
+                * If we're aborting a subtransaction, we can immediately remove failed
+                * XIDs from PGPROC's cache of running child XIDs.
+                */
+               if (TransactionIdIsValid(xid))
+               {
+                       XidCacheRemoveRunningXids(xid, nchildren, children, latestXid);
+                       if (TransactionIdIsNormal(xid))
+                               SnapArrayRemoveRunningXids(xid, nchildren, children, latestXid);
+               }
 
                /* Post-abort cleanup */
                if (TransactionIdIsValid(s->transactionId))
index befb507047721bcde868a5682b97a3829cc85b81..08618d6440e02ed5ce77cc8c39da9581e2ef54c8 100644 (file)
@@ -51,6 +51,7 @@
 #include "storage/predicate.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
+#include "storage/snaparray.h"
 #include "storage/reinit.h"
 #include "storage/smgr.h"
 #include "storage/spin.h"
@@ -6927,6 +6928,9 @@ StartupXLOG(void)
        if (standbyState != STANDBY_DISABLED)
                ShutdownRecoveryTransactionEnvironment();
 
+       /* Set initial snapshot for normal running. */
+       SnapArrayInitNormalRunning(ShmemVariableCache->nextXid);
+
        /* Shut down readFile facility, free space */
        if (readFile >= 0)
        {
index 743f30e1c7389a6290200ad5b85c8460e72afbb2..9150eb74c9b3bcbda9ff3c1d5666a4e7e0650319 100644 (file)
@@ -16,6 +16,6 @@ endif
 endif
 
 OBJS = ipc.o ipci.o pmsignal.o procarray.o procsignal.o shmem.o shmqueue.o \
-       sinval.o sinvaladt.o standby.o
+       sinval.o sinvaladt.o snaparray.o standby.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/ipc/README.snaparray b/src/backend/storage/ipc/README.snaparray
new file mode 100644 (file)
index 0000000..e1ee63c
--- /dev/null
@@ -0,0 +1,153 @@
+Snapshot Derivation
+===================
+
+An MVCC snapshot memorializes a point in the commit sequence.  That is, the
+effects of any transaction which committed before the snapshot was taken will
+be visible to the snapshot.  Transactions which abort, or which have not yet
+committed at the time the snapshot is taken, will not be visible to the
+snapshot.  We need only be concerned about transactions to which an XID (or
+XIDs, if there are subtransactions) have been assigned; any changes made to
+the database which are not related to a particular XID are non-transactional
+and do not obey MVCC semantics anyway.  Furthermore, we can always find out
+(from CLOG) whether or not an XID that is no longer running committed or
+aborted.  Thus, an MVCC snapshot need only be capable of answering the
+following question: given an XID, had that XID ended (either by committing
+or aborting) at the time the snapshot was taken?
+
+The snaparray is responsible for providing backends with sufficient data
+to construct an MVCC snapshot that can answer this question.  We
+periodically record in shared memory (1) the highest-numbered XID which had
+already ended at the time the snapshot was taken ("xmax"); and (2) the XIDs
+of all lower-numbered transactions which had not yet ended at the time the
+snapshot was taken ("running XIDs").  As a special case, transactions which
+are performing a lazy VACUUM operation can be excluded from the set of running
+XIDs, since they need not be seen as in-progress even when they are.
+
+It is important for performance to minimize the amount of work that must be
+done at transaction end.  Since most transactions change the set
+of running XIDs only slightly (such as by removing themselves from that list),
+we provide a method for the xmax and set of running XIDs to be updated
+incrementally.  We periodically record the current xmax and set of running
+XIDs in shared memory, and then track all transaction commits which happen
+subsequently.  This provides enough information for backends to reconstruct
+a complete snapshot, as follows:
+
+1. If the largest newly completed XID is greater than the original xmax,
+then increase the xmax of the snapshot to that value, and add all XIDs
+between the old and new values to the set of running XIDs.
+
+2. Remove all of the newly completed XIDs from the set of running XIDs
+(except for the one which became the new xmax value, which is the only
+one that won't be listed).
+
+Each backend must process the list of completed XIDs every time it wishes
+to derive a snapshot, so we must not allow that list to get too long.
+Periodically, we (A) read the most recently written xmax, running XIDs, and
+newly completed XIDs; (B) perform the calculations described above to
+incrementally update these values; and then (C) write the updated xmax and
+list of running XIDs back to shared memory.  This effectively truncates the
+list of newly completed XIDs.
+
+Note that this machinery doesn't require any action whatsoever at the time
+an XID is assigned.  Newly assigned XIDs will always be greater than than
+any already-completed XID, and will be added to the set of running XIDs
+automatically when it is necessary to advance xmax.
+
+Lazy Vacuum
+===========
+
+XIDs allocated while performing a lazy vacuum don't need to be included in
+the set of running XIDs.  Instead of marking them complete when the
+transaction ends, they are marked as complete when the transaction starts.
+Although this advances xmax sooner than technically necessary, it has a
+major advantage that makes it worth doing anyway: it prevents VACUUM from
+holding back the xmin horizon.
+
+Subtransactions and Overflow
+============================
+
+The algorithm described in the previous section could be problematic in the
+presence of large numbers of subtransactions, because the amount of space
+required to store the list of in-progress XIDs could grow very large.  It's
+not really unbounded, because the XID space is only 32 bits wide, and at any
+given time half of those XIDs are in the past and half are in the future,
+which somewhat limits the effective range of XIDs that could be simultaneously
+in use.  Moreover, since dead rows can't be reclaimed until they're no longer
+visible to any running transaction, performance would probably degenerate
+severely well before the system reaching these theoretical limits.
+Nonetheless, we can't assume that it will always be practical for a snapshot
+to include every still-running XID.
+
+To address this problem, we allow XIDs assigned to subtransactions to be
+removed from the list of running XIDs.  Whenever we do this, we remember the
+highest XID so removed ("highest removed subxid").  If the snapshot receives
+an inquiry about an XID that might have been removed, it must consult
+pg_subtrans and look up the corresponding parent XID, if any.  The visibility
+calculation can then be performed as normal using the parent XID, which is
+guaranteed to be present in the array if indeed it's still running.  This
+check can be skipped if the XID is less than the oldest running XID ("xmin"),
+due to the rule that a subtransaction XID must always be greater than the XID
+of its parent transaction.
+
+Removing subtransaction XIDs from the set of running XIDs is a somewhat
+laborious process, so we hope it won't happen too often.  We iterate over
+the ProcArray to get a list of all still-running toplevel XIDs, and then
+remove any XIDs not found there.
+
+Shared Memory Organization
+==========================
+
+The main shared memory structure used by the snaparray code is a ring buffer,
+which acts as a circular message buffer.  We maintain three pointers into this
+ring buffer: (1) the location at which backends must begin reading to derive
+a valid snapshot ("start pointer"), (2) the location at which they may stop
+reading ("stop pointer"), and (3) the location beyond which no data has been
+writter ("write pointer").  To prevent overflow, these points are stored as
+64-bit integers and interpreted modulo the buffer size.  There are two types
+of messages which can be stored in this buffer: snapshot summaries, and newly
+completed XIDs.
+
+Because the most common operation on this ring buffer is to record newly
+completed XIDs, and because such messages are typically small, often just
+a single XID, it is important that the representation of such messages be
+compact.  Therefore, XID completion messages consist of just the XIDs
+themselves.
+
+To distinguish itself from an XID completion message, a snapshot summary
+begins with InvalidTransactionId, followed by the remaining data items,
+all as 4-byte quantities.  The format in full is as follows:
+
+   InvalidTransactionId
+   xmax
+   highest removed subxid
+   number of running xids
+   running xid 1
+   ...
+   running xid n
+
+Write access to the ring buffer is serialized by SnapArrayLock; only one
+transaction (that has an XID) can end at a time.  When ending, a transaction
+first may either write an XID completion message or may choose (if the distance
+between the start pointer and the new stop pointer seems like it's getting too
+large) to instead write a new snapshot summary.
+
+Reads from the ring buffer can proceed in parallel with other reads, and
+generally with writes.  However, if the ring buffer wraps around before a
+read is complete, then the data is no good and the operation must be retried.
+This is obviously undesirable from a performance perspective, but should be
+rare with a ring buffer of adequate size.  Under typical conditions, a
+wraparound would require dozens if not hundreds of commits to complete
+before the backend taking its snapshot can complete its memcpy().  However,
+if it does happen, we recover by taking SnapArrayLock in shared mode and
+repeating the operation.  To minimize the chances of further wraparounds,
+whenever we do this, we also set a flag requesting that the next backend
+to commit compress subtransaction IDs out of the snapshot.
+
+There is one further fly in the ointment.  Since the start pointer, stop
+pointer, and write pointer are 64-bit integers, we can't assume that they
+can be read and written atomically on all platforms.  Thus, in general,
+access to these variables must be protected by spinlocks.  (On platforms
+where 8 byte loads and stores are atomic, we could use these facilities,
+together with memory barriers, in lieu of the spinlocks, which would be
+advantageous on machines with many CPU cores.  But we don't support that
+yet.)
index 56c0bd8d498071f3d8a49b238ee4dd8e4a2720ca..8be87b033bb98fe03ae822f897c4930bf7390ef1 100644 (file)
@@ -34,6 +34,7 @@
 #include "storage/pmsignal.h"
 #include "storage/predicate.h"
 #include "storage/procarray.h"
+#include "storage/snaparray.h"
 #include "storage/procsignal.h"
 #include "storage/sinvaladt.h"
 #include "storage/spin.h"
@@ -115,6 +116,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
                size = add_size(size, MultiXactShmemSize());
                size = add_size(size, LWLockShmemSize());
                size = add_size(size, ProcArrayShmemSize());
+               size = add_size(size, SnapArrayShmemSize());
                size = add_size(size, BackendStatusShmemSize());
                size = add_size(size, SInvalShmemSize());
                size = add_size(size, PMSignalShmemSize());
@@ -213,6 +215,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
                InitProcGlobal();
        CreateSharedProcArray();
        CreateSharedBackendStatus();
+       SnapArrayShmemInit();
 
        /*
         * Set up shared-inval messaging
index 9489012a187440da1c6279e72ba7b4b96ade080a..3bc0e2e9233018c3101e3ca067ab88704ff38cbe 100644 (file)
@@ -1364,6 +1364,43 @@ GetSnapshotData(Snapshot snapshot)
        return snapshot;
 }
 
+/*
+ * GetTopXids -- write top-level in-progress XIDs to the provided array
+ *
+ * Only XIDs preceding xmax are captured.  On a standby, master transactions
+ * are ignored.  At most nxids results are written to xids, and the total
+ * number of matches that would have been written had enough space existed
+ * is returned.
+ */
+uint32
+GetTopXids(TransactionId xmax, uint32 nxids, TransactionId *xids)
+{
+       uint32          count = 0;
+       int                     index;
+
+       ProcArrayStruct *arrayP = procArray;
+
+       LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+       for (index = 0; index < arrayP->numProcs; index++)
+       {
+               volatile PGPROC *proc = arrayP->procs[index];
+
+               /* Fetch xid just once - see GetNewTransactionId */
+               TransactionId pxid = proc->xid;
+
+               if (!TransactionIdIsValid(pxid) || !TransactionIdPrecedes(pxid, xmax))
+                       continue;
+
+               if (count < nxids)
+                       xids[count] = pxid;
+               ++count;
+       }
+
+       LWLockRelease(ProcArrayLock);
+       return count;
+}
+
 /*
  * GetRunningTransactionData -- returns information about running transactions.
  *
diff --git a/src/backend/storage/ipc/snaparray.c b/src/backend/storage/ipc/snaparray.c
new file mode 100644 (file)
index 0000000..a0f3538
--- /dev/null
@@ -0,0 +1,963 @@
+/*-------------------------------------------------------------------------
+ *
+ * snaparray.c
+ *       IPC infrastructure for MVCC snapshots.
+ *
+ * Backends frequently (often many times per transaction) require a list
+ * of all currently-running XIDs in order to construct an MVCC snapshot.
+ * Due to the frequency of this operation, it must be performed with
+ * minimal locking.  Even shared lwlocks can be problematic, as the
+ * spinlock protecting the state of the LWLock will become a contention
+ * bottleneck on machines with 32+ CPU cores.
+ *
+ * We store snapshot-related information in a shared array called the
+ * snaparray, which acts as a ring buffer.  The management of this ring
+ * buffer is documented in src/backend/storage/ipc/README.snaparray.
+ *
+ * During hot standby, we instead need a list of XIDs that were running
+ * on the master as of the current point in the WAL stream.  We manage
+ * this list using the same machinery that's used on the primary, except
+ * that the array is updated through WAL replay rather than directly by
+ * individual backends.  
+ *
+ * It is perhaps possible for a backend on the master to terminate without
+ * writing an abort record for its transaction (e.g. if the master crashed
+ * unexpectedly).  That would tie up snaparray slots indefinitely, so the
+ * master periodically transmits snapshot information to the standby to
+ * make the necessary pruning possible.
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *       src/backend/storage/ipc/snaparray.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/transam.h"
+#include "miscadmin.h"
+#include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "storage/snaparray.h"
+#include "utils/catcache.h"
+#include "utils/guc.h"
+
+typedef struct SnapArrayStruct
+{
+       /* this first group of fields never changes */
+       uint32          ring_buffer_size;       /* size of array */
+       uint32          compaction_threshold;   /* when should we remove subxids? */
+
+       slock_t         start_stop_mutex;       /* protects next group of fields */
+       uint64          start_pointer;          /* backends should start reading here */
+       uint64          stop_pointer;           /* backends should stop reading here */
+
+       slock_t         write_mutex;            /* protects next group of fields */
+       uint64          write_pointer;          /* last byte written + 1 */
+       uint64          num_wraparounds;        /* number of wraparounds */
+       uint64          num_writes;                     /* number of writes */
+
+       /* next group of fields is protected by SnapArrayLock */
+       uint32          last_summary_size;      /* # entries in last snapshot summary */
+       TransactionId   xmax_threshold; /* xmax at which to rewrite snapshot */
+       bool            compaction_requested;   /* snapshot compaction requested? */
+
+       TransactionId   buffer[FLEXIBLE_ARRAY_MEMBER];
+} SnapArrayStruct;
+
+int                    snapshot_buffers = -1;
+
+static SnapArrayStruct *SnapArray;
+static TransactionId   xid_cmp_base;
+
+/* Local cache of latest SnapArray data. */
+static TransactionId   *SnapArrayCache;
+static uint32          SnapArrayCacheSize;
+static uint32          SnapArrayCacheEntries;
+static uint64          SnapArrayCacheStartPointer;
+static uint64          SnapArrayCacheStopPointer;
+
+static void SnapArrayUpdateCache(void);
+static uint32 SnapArrayComputeRunningXids(TransactionId xmax,
+                                                       TransactionId new_xmax,
+                                                       uint32 num_running_xids,
+                                                       TransactionId *running_xids,
+                                                       uint32 num_removed_xids,
+                                                       TransactionId *removed_xids,
+                                                       uint32 num_new_running_xids,
+                                                       TransactionId *new_running_xids);
+static void SnapArrayWriteSnapshotSummary(TransactionId xmax,
+                                                                                 TransactionId latest_removed_subxid,
+                                                                                 uint32 nxids,
+                                                                                 TransactionId *running_xids);
+static void SnapArrayReadData(uint64 start_location, uint64 stop_location,
+                                 TransactionId *buffer);
+static void SnapArrayAdvanceWritePointer(uint64 nitems);
+static void SnapArrayWriteData(uint64 write_location, uint32 nitems,
+                                  TransactionId *item);
+static int xid_cmp(const void *a, const void *b);
+
+#define SNAPSHOT_SUMMARY_ITEMS         4
+#define SizeOfOneCompressedSnapshot() \
+       mul_size(sizeof(TransactionId), add_size(MaxBackends, \
+               SNAPSHOT_SUMMARY_ITEMS))
+#define BytesToKb(x) \
+       (((x) / 1024) + (((x) % 1024) ? 1 : 0))
+
+/*
+ * By default, we allow enough space in the snapshot array for 64 entries
+ * per backend.  The result is in kilobytes.
+ *
+ * This should not be called until MaxBackends has received its final value.
+ */
+static Size
+SnapArraySuggestedSize()
+{
+       Size    bytes = mul_size(SizeOfOneCompressedSnapshot(), 64);
+
+       return BytesToKb(bytes);
+}
+
+/*
+ * GUC check hook for snapshot_buffers.
+ */
+bool
+check_snapshot_buffers(int *newval, void **extra, GucSource source)
+{
+       Size    minimum;
+
+       /*
+        * -1 indicates a request for auto-tune.
+        */
+       if (*newval == -1)
+       {
+               /*
+                * If we haven't yet changed the boot_val default of -1, just leave
+                * it be.  We'll fix it later when SnapArrayShmemSize is called.
+                */
+               if (snapshot_buffers == -1)
+                       return true;
+               /* Otherwise, substitute the auto-tune value. */
+               *newval = SnapArraySuggestedSize();
+       }
+
+       /*
+        * We must have at least enough buffer entries for one compressed snapshot.
+        * Performance might be terrible due to frequent rewrites, compressions,
+        * and wraparounds, but anything less could potentially fail outright.
+        */
+       minimum = SizeOfOneCompressedSnapshot();
+       if (*newval < BytesToKb(minimum))
+               *newval = BytesToKb(minimum);
+       return true;
+}
+
+/*
+ * Initialization of shared memory for SnapArray.
+ */
+Size
+SnapArrayShmemSize(void)
+{
+       Size    size;
+
+       /*
+        * If the value of snapshot_buffers is -1, use the preferred auto-tune
+        * value.
+        */
+       if (snapshot_buffers == -1)
+       {
+               char buf[32];
+               snprintf(buf, sizeof(buf), UINT64_FORMAT,
+                                (uint64) SnapArraySuggestedSize());
+               SetConfigOption("snapshot_buffers", buf, PGC_POSTMASTER,
+                                               PGC_S_OVERRIDE);
+       }
+       Assert(snapshot_buffers > 0);
+
+       /* Work out size of array. */
+       size = offsetof(SnapArrayStruct, buffer);
+       size = add_size(size, mul_size(snapshot_buffers, 1024));
+
+       return size;
+}
+
+/*
+ * Initialize the shared PGPROC array during postmaster startup.
+ */
+void
+SnapArrayShmemInit(void)
+{
+       Size            size;
+       bool            found;
+
+       /* Create or attach to the ProcArray shared structure */
+       size = SnapArrayShmemSize();
+       SnapArray = (SnapArrayStruct *)
+               ShmemInitStruct("Snap Array", size, &found);
+
+       if (!found)
+       {
+               SnapArray->ring_buffer_size =
+                       snapshot_buffers * (1024 / sizeof(TransactionId));
+               SnapArray->compaction_threshold =
+                       Min(64 * MaxBackends, SnapArray->ring_buffer_size / 4);
+               SnapArray->start_pointer = 0;
+               SnapArray->stop_pointer = 0;
+               SnapArray->write_pointer = 0;
+               SnapArray->compaction_requested = false;
+               SnapArray->xmax_threshold = InvalidTransactionId;
+               SpinLockInit(&SnapArray->start_stop_mutex);
+               SpinLockInit(&SnapArray->write_mutex);
+       }
+
+       /* While we're at it, we initialize our backend-local cache. */
+       if (!CacheMemoryContext)
+               CreateCacheMemoryContext();
+       SnapArrayCacheEntries = 128;
+       SnapArrayCache = MemoryContextAlloc(CacheMemoryContext,
+                                                                               sizeof(TransactionId) *
+                                                                               SnapArrayCacheEntries);
+}
+
+/*
+ * Recovery is over; prepare for normal running!
+ */
+void
+SnapArrayInitNormalRunning(TransactionId xmax)
+{
+       /*
+        * This should be invoked after WAL replay is complete and before any
+        * write transactions are initiated, so in theory no one else can be
+        * holding the lock.  We acquire it anyway on principal.
+        */
+       LWLockAcquire(SnapArrayLock, LW_EXCLUSIVE);
+
+       /* XXX.  We need to carve out twophase XIDs here. */
+       SnapArrayWriteSnapshotSummary(xmax, InvalidTransactionId, 0, NULL);
+
+       LWLockRelease(SnapArrayLock);
+}
+
+/*
+ * Remove running XIDs.
+ *
+ * Note that there is no external interface to *add* running XIDs; that happens
+ * implicitly.  Removing an XID higher than the current xmax implicitly adds
+ * all the intermediate XIDs to the set.
+ */
+void
+SnapArrayRemoveRunningXids(TransactionId xid, int nchildren,
+                                                  TransactionId *children, TransactionId latest_xid)
+{
+       uint64          start_pointer;
+       uint64          stop_pointer;
+       uint64          write_pointer;
+       uint64          slots_used;
+       uint64          slots_remaining;
+       uint64          last_summary_size;
+
+       Assert(TransactionIdIsValid(xid));      /* only call this if XID assigned */
+       Assert(nchildren >= 0);                         /* number of children can't be < 0 */
+
+       /* Lock out concurrent writers. */
+       LWLockAcquire(SnapArrayLock, LW_EXCLUSIVE);
+
+       /* Compute used and remaining ring buffer slots. */
+       start_pointer = SnapArray->start_pointer;
+       stop_pointer = SnapArray->stop_pointer;
+       write_pointer = SnapArray->write_pointer;
+       last_summary_size = (uint64) SnapArray->last_summary_size;
+       Assert(stop_pointer == write_pointer);
+       slots_used = stop_pointer - start_pointer;
+       Assert(slots_used > 0 && slots_used >= last_summary_size);
+       slots_remaining = write_pointer - start_pointer;
+       Assert(slots_remaining < SnapArray->ring_buffer_size);
+
+       /*
+        * Decide whether to write a new snapshot summary in lieu of just
+        * inserting the newly-removed XIDs.  We do this when any of the
+        * following conditions hold:
+        *
+        * (1) The number of recently completed XIDs would exceed MaxBackends.
+        * (Reason: We don't want backends to need to process too many recently
+        * completed XIDs to derive a snapshot.)
+        *
+        * (2) The new xmax forced by the removal operation would exceed the
+        * threshold set by SnapArrayWriteSnapshotSummary.  (Reason: If the
+        * xmax value advances too far, backends will need to add many intermediate
+        * xids to their snapshots, which could be expensive in terms of both
+        * processing time and memory.
+        *
+        * (3) The number of remaining slots before wraparound is less than
+        * nxids.  (Reason: We can't overwrite the snapshot summary without
+        * writing a new one.  If we do, we're screwed.)
+        *
+        * (4) The compaction_requested flag is set, indicating that at least
+        * one other backend had to retry due to a wraparound condition.
+        * (Reason: The snapshot is evidently too large to be copied quickly
+        * enough.)
+        *
+        * We expect (1) to occur most frequently.  Condition (2) is likely to
+        * trigger only if xmax jumps forward abruptly (for example, because
+        * a transaction with many subtransactions suddenly allocates XIDs to
+        * all of them).  Condition (3) is a safeguard against disaster, but
+        * should be unlikely given any reasonable buffer size.  Condition (4)
+        * is not necessary for correctness, but seems prudent, and like (3)
+        * should only really be a risk with very small buffers.
+        */
+       if (slots_used + 1 + nchildren > last_summary_size + MaxBackends
+               || TransactionIdFollowsOrEquals(latest_xid, SnapArray->xmax_threshold)
+               || slots_remaining < 1 + nchildren
+               || SnapArray->compaction_requested)
+       {
+               uint32                  nentries;
+               TransactionId  *entries;
+               TransactionId   xmax;
+               TransactionId   new_xmax;
+               TransactionId   highest_removed_subxid;
+               uint32                  num_running_xids;
+               uint32                  num_removed_xids;
+               TransactionId  *running_xids;
+               TransactionId  *removed_xids;
+               TransactionId  *new_running_xids;
+               bool                    compact;
+               uint32                  xids_added;
+               uint32                  num_new_running_xids;
+               uint32                  compaction_threshold;
+               uint32                  certainly_removed_xids = 0;
+
+               /* Extract current data from array and append our new data. */
+               Assert(stop_pointer >= start_pointer + SNAPSHOT_SUMMARY_ITEMS);
+               Assert(start_pointer + last_summary_size <= stop_pointer);
+               nentries = (stop_pointer - start_pointer) + 1 + nchildren;
+               entries = palloc(sizeof(TransactionId) * nentries);
+               SnapArrayReadData(start_pointer, stop_pointer, entries);
+               entries[stop_pointer - start_pointer] = xid;
+               if (nchildren > 0)
+                       memcpy(&entries[(stop_pointer - start_pointer) + 1], children,
+                                  sizeof(TransactionId) * nchildren);
+
+               /* Data must begin with a snapshot summary. */
+               Assert(entries[0] == InvalidTransactionId);
+               xmax = entries[1];
+               highest_removed_subxid = entries[2];
+               num_running_xids = (uint32) entries[3];
+               Assert(last_summary_size == num_running_xids + SNAPSHOT_SUMMARY_ITEMS);
+               num_removed_xids = nentries - last_summary_size;
+               running_xids = entries + SNAPSHOT_SUMMARY_ITEMS;
+               removed_xids = running_xids + num_running_xids;
+
+               /* Sort the removed XIDs. */
+               xid_cmp_base = xmax;
+               qsort(removed_xids, num_removed_xids, sizeof(TransactionId), xid_cmp);
+
+               /* Work out new xmax value. */
+               new_xmax = removed_xids[num_removed_xids - 1];
+               if (TransactionIdPrecedes(new_xmax, xmax))
+                       new_xmax = xmax;
+
+               /* Work out number of new XIDs being added. */
+               if (new_xmax >= xmax)
+                       xids_added = new_xmax - xmax;
+               else
+                       xids_added = new_xmax - xmax - FirstNormalTransactionId;
+
+               /*
+                * Decide whether we need to compact away subxids.  This is a bit
+                * hairy.
+                *
+                * If the compaction_requested flag has been set, then we always
+                * compact.  Otherwise, the decision is based on the estimated size
+                * of the new snapshot summary relative to the compaction threshold.
+                * If no subxids have previously been compacted out the snapshot,
+                * then we know the exact numbers.  Otherwise, to be conservative,
+                * we assume that the none of the XIDs which precede
+                * highest_removed_subxids will actually be found among the running
+                * XIDs.  We could compute a more accurate answer, but it's not worth
+                * it.
+                */
+               compaction_threshold = SnapArray->compaction_threshold;
+               if (SnapArray->compaction_requested)
+                       compact = true;
+               else if (last_summary_size + xids_added
+                       > compaction_threshold + num_removed_xids)
+                       compact = true;
+               else if (!TransactionIdIsValid(highest_removed_subxid))
+                       compact = false;
+               else if (last_summary_size + xids_added < compaction_threshold)
+                       compact = false;
+               else
+               {
+                       uint32  low = 0;
+                       uint32  high = num_removed_xids;
+
+                       /* Binary search for certain-to-be-removable XIDs. */
+                       while (low < high)
+                       {
+                               uint32  middle = (low + high) / 2;
+                               if (TransactionIdFollows(removed_xids[middle],
+                                                                                highest_removed_subxid))
+                                       high = middle;
+                               else
+                                       low = middle + 1;
+                       }
+
+                       /* Decide whether we think it'll fit. */
+                       certainly_removed_xids = num_removed_xids - high;
+                       if (last_summary_size + xids_added
+                               > compaction_threshold + certainly_removed_xids)
+                               compact = true;
+                       else
+                               compact = false;
+               }
+
+               /* Construct new list of running XIDs. */
+               if (compact)
+               {
+                       /*
+                        * If we're compacting away subtransaction XIDs, then we obtain
+                        * the new list of running transaction IDs from the ProcArray.
+                        * There shouldn't be more than MaxBackends.
+                        *
+                        * XXX. What about prepared transactions???
+                        */
+                       new_running_xids = palloc(sizeof(TransactionId) * MaxBackends);
+                       num_new_running_xids =
+                               GetTopXids(new_xmax, MaxBackends, new_running_xids);
+                       if (num_new_running_xids > MaxBackends)
+                               elog(PANIC, "too many toplevel XIDs");
+
+                       /*
+                        * We could bound this more tightly, but for now we just punt.
+                        */
+                       highest_removed_subxid = new_xmax;
+                       TransactionIdRetreat(highest_removed_subxid);
+               }
+               else
+               {
+                       uint32          result;
+
+                       /*
+                        * Allocate space for new snapshot.
+                        */
+                       Assert(xids_added > certainly_removed_xids);
+                       num_new_running_xids = last_summary_size + xids_added
+                               - certainly_removed_xids;
+                       new_running_xids = palloc(sizeof(TransactionId)
+                               * num_new_running_xids);
+
+                       /*
+                        * Filter out the removed XIDs from the running XIDs, and add any
+                        * XIDs between the old and new xmax that aren't listed as removed.
+                        */
+                       result =
+                               SnapArrayComputeRunningXids(xmax, new_xmax,
+                                                                                       num_running_xids, running_xids,
+                                                                                       num_removed_xids, removed_xids,
+                                                                                       num_new_running_xids,
+                                                                                       new_running_xids);
+                       if (result > num_new_running_xids)
+                               elog(PANIC, "snapshot size calculation is bogus");
+                       num_new_running_xids = result;
+
+                       /*
+                        * If the highest removed subxid has aged out of the snapshot,
+                        * we clear the value.  Otherwise we might eventually have a
+                        * problem when the XID space wraps around.
+                        */
+                       if (num_new_running_xids == 0
+                               || TransactionIdPrecedes(highest_removed_subxid,
+                                                                         new_running_xids[0]))
+                               highest_removed_subxid = InvalidTransactionId;
+               }
+
+               /* Write the new snapshot. */
+               SnapArrayWriteSnapshotSummary(new_xmax, highest_removed_subxid,
+                                                                         num_new_running_xids, new_running_xids);
+
+               /* Free memory. */
+               pfree(new_running_xids);
+               pfree(entries);
+       }
+       else
+       {
+               /*
+                * We've decided not to insert a new snapshot summary, so just
+                * append our completed XIDs.
+                */
+
+               /* Advance write pointer. */
+               SnapArrayAdvanceWritePointer(1 + nchildren);
+
+               /* Write data.  Better not fail here... */
+               SnapArrayWriteData(write_pointer, 1, &xid);
+               if (nchildren > 0)
+                       SnapArrayWriteData(write_pointer + 1, nchildren, children);
+
+               /* Update stop pointer. */
+               SpinLockAcquire(&SnapArray->start_stop_mutex);
+               SnapArray->stop_pointer = write_pointer + 1 + nchildren;
+               SpinLockRelease(&SnapArray->start_stop_mutex);
+       }
+
+       /* We're done. */
+       LWLockRelease(SnapArrayLock);
+}
+
+/*
+ * Construct an MVCC snapshot.
+ */
+void
+SnapArrayGetSnapshotData(Snapshot snapshot)
+{
+       TransactionId   xmax;
+       TransactionId   new_xmax;
+       TransactionId   highest_removed_subxid;
+       uint32                  num_running_xids;
+       uint32                  num_removed_xids;
+       uint32                  num_new_running_xids;
+       TransactionId  *running_xids;
+       TransactionId  *removed_xids;
+       TransactionId  *new_running_xids;
+       uint32                  n;
+       uint32                  xids_added;
+       uint32                  certainly_removed_xids = 0;
+       bool                    needsort = false;
+
+       /* Get latest information from shared memory. */
+       SnapArrayUpdateCache();
+
+       /* Data must begin with a snapshot summary. */
+       Assert(SnapArrayCacheSize > SNAPSHOT_SUMMARY_ITEMS);
+       Assert(SnapArrayCache[0] == InvalidTransactionId);
+       xmax = SnapArrayCache[1];
+       highest_removed_subxid = SnapArrayCache[2];
+       num_running_xids = (uint32) SnapArrayCache[3];
+       num_removed_xids
+               = SnapArrayCacheSize - (num_running_xids + SNAPSHOT_SUMMARY_ITEMS);
+       running_xids = SnapArrayCache + SNAPSHOT_SUMMARY_ITEMS;
+       removed_xids = running_xids + num_running_xids;
+
+       /*
+        * Scan the removed XIDs.  This is enables us to work out the new xmax
+        * value, the number of XIDs we're certain to be able to remove from the
+        * running list (because they're newer than highest_removed_subxid), and
+        * whether or not the list of removed XIDs needs to be sorted.
+        */
+       new_xmax = xmax;
+       for (n = 0; n < num_removed_xids; ++n)
+       {
+               TransactionId   xid = removed_xids[n];
+
+               if (TransactionIdFollows(xid, new_xmax))
+                       new_xmax = removed_xids[n];
+               if (TransactionIdFollows(xid, highest_removed_subxid))
+                       ++certainly_removed_xids;
+               if (n > 0 && TransactionIdPrecedes(xid, removed_xids[n-1]))
+                       needsort = true;
+       }
+
+       /*
+        * Sort the removed XIDs (unless they are already in order).
+        *
+        * This is actually mutating the underlying cache, which is OK, because
+        * changing the order of the removed XIDs doesn't change the semantics.
+        * We skip this if the data is already in order, which could happen
+        * either because we've sorted the same data on a previous trip through
+        * this function, or because all removed XIDs added since our last visit
+        * were removed in ascending XID order.
+        *
+        * NB: Some quicksort implementations don't perform well on data that's
+        * already mostly or entirely sorted.  Skipping the sort in the case where
+        * the data is completely in order should ameliorate any problems in this
+        * area quite a bit, but we might need to pick another sort algorithm if
+        * this probes problematic.
+        */
+       if (needsort)
+       {
+               xid_cmp_base = xmax;
+               qsort(removed_xids, num_removed_xids, sizeof(TransactionId), xid_cmp);
+       }
+
+       /* Work out number of new XIDs being added. */
+       if (new_xmax >= xmax)
+               xids_added = new_xmax - xmax;
+       else
+               xids_added = new_xmax - xmax - FirstNormalTransactionId;
+
+       /*
+        * XXX.  The rest of this function is a fantasy pending reorganization
+        * of what goes into a snapshot.
+        */
+       num_new_running_xids =
+               num_running_xids + xids_added - certainly_removed_xids;
+       new_running_xids = palloc(sizeof(TransactionId) * num_new_running_xids);
+       num_new_running_xids =
+               SnapArrayComputeRunningXids(xmax, new_xmax,
+                                                                       num_running_xids, running_xids,
+                                                                       num_removed_xids, removed_xids,
+                                                                       num_new_running_xids, new_running_xids);
+}
+
+/*
+ * Make certain that the latest data from the shared SnapArray has been copied
+ * into our backend-private cache.  In general, this means that we must read
+ * the latest snapshot summary and any recently removed XIDs from shared
+ * memory, but we can optimize away duplicate reads of the same data.
+ */
+static void
+SnapArrayUpdateCache(void)
+{
+       uint64          start_pointer;
+       uint64          stop_pointer;
+       uint64          write_pointer;
+       uint64          delta;
+       uint32          skip = 0;
+
+       /*
+        * Read start and stop pointers.  Once we do this, the clock is ticking.
+        * We must finish reading any data we care about before the buffer wraps
+        * around.  That shouldn't be a big deal, since the buffer will normally
+        * be much larger than the amount of data we're copying, but we mustn't
+        * add any potentially slow operations after this point.
+        */
+       SpinLockAcquire(&SnapArray->start_stop_mutex);
+       stop_pointer = SnapArray->stop_pointer;
+       start_pointer = SnapArray->start_pointer;
+       SpinLockRelease(&SnapArray->start_stop_mutex);
+       Assert(start_pointer < stop_pointer);
+
+       /* If the stop pointer has not moved, we are done! */
+       if (stop_pointer == SnapArrayCacheStopPointer)
+               return;
+
+       /* If our local cache is not large enough to hold the data, grow it. */
+       delta = stop_pointer - start_pointer;
+       if (delta < SnapArrayCacheSize)
+       {
+               Assert(delta < UINT_MAX);
+               SnapArrayCache = repalloc(SnapArrayCache,
+                                                                 sizeof(TransactionId)
+                                                                 * delta);
+               SnapArrayCacheEntries = delta;
+       }
+
+       /* Copy the data. */
+       if (start_pointer == SnapArrayCacheStartPointer)
+       {
+               /* We only need to copy the newly added data. */
+               skip = stop_pointer - SnapArrayCacheStopPointer;
+               SnapArrayReadData(SnapArrayCacheStopPointer, stop_pointer,
+                                                 SnapArrayCache + skip);
+       }
+       else
+       {
+               /* We need to recopy all of the data. */
+               SnapArrayReadData(start_pointer, stop_pointer, SnapArrayCache);
+       }
+
+       /* Did we suffer a wraparound? */
+       SpinLockAcquire(&SnapArray->write_mutex);
+       write_pointer = SnapArray->write_pointer;
+       SpinLockRelease(&SnapArray->write_mutex);
+       if (write_pointer > start_pointer + skip + SnapArray->ring_buffer_size)
+       {
+               /*
+                * Oh, bummer.  We'll have to redo.  By acquiring the light-weight
+                * lock instead of the spinlock, we freeze out any concurrent updates,
+                * so there's no possibility of wraparound and no need to take
+                * the spinlock before reading the start and stop pointers.
+                * Unfortunately, this also reduces concurrency.  And it's double
+                * work, so we hope it won't happen often.
+                *
+                * To reduce the chances that some other backend will immediately
+                * hit the same problem, we set the compaction_requested flag.  This
+                * will cause the next backend that removes XIDs to remove subxids
+                * from the snapshot.
+                */
+               LWLockAcquire(SnapArrayLock, LW_SHARED);
+               stop_pointer = SnapArray->stop_pointer;
+               start_pointer = SnapArray->start_pointer;
+               delta = stop_pointer - start_pointer;
+               if (delta < SnapArrayCacheSize)
+               {
+                       Assert(delta < UINT_MAX);
+                       SnapArrayCache =
+                               repalloc(SnapArrayCache,
+                                                sizeof(TransactionId) * delta);
+                       SnapArrayCacheEntries = delta;
+               }
+               SnapArrayReadData(start_pointer, stop_pointer, SnapArrayCache);
+               SnapArray->compaction_requested = true;
+               LWLockRelease(SnapArrayLock);
+
+               /* Update statistics. */
+               SpinLockAcquire(&SnapArray->write_mutex);
+               SnapArray->num_wraparounds++;
+               SpinLockRelease(&SnapArray->write_mutex);
+       }
+
+       /* Bookkeeping. */
+       SnapArrayCacheSize = delta;
+       SnapArrayCacheStartPointer = start_pointer;
+       SnapArrayCacheStopPointer = stop_pointer;
+}
+
+/*
+ * Work out a new set of running XIDs given the existing set of running XIDs
+ * and a set of XIDs to be removed.
+ */
+static uint32
+SnapArrayComputeRunningXids(TransactionId xmax,
+                                                       TransactionId new_xmax,
+                                                       uint32 num_running_xids,
+                                                       TransactionId *running_xids,
+                                                       uint32 num_removed_xids,
+                                                       TransactionId *removed_xids,
+                                                       uint32 num_new_running_xids,
+                                                       TransactionId *new_running_xids)
+{
+       uint32          i;
+       uint32          n = 0;
+       uint32          r = 0;
+
+       /*
+        * Check each running XID to see whether its been removed.  This is
+        * basically a merge join: both XID lists are sorted.
+        */
+       for (i = 0; i < num_running_xids; ++i)
+       {
+               bool    match = false;
+
+               while (1)
+               {
+                       if (n >= num_removed_xids)
+                               break;
+                       if (TransactionIdEquals(running_xids[i], removed_xids[n]))
+                               match = true;
+                       if (TransactionIdPrecedesOrEquals(running_xids[i], removed_xids[n]))
+                               break;
+                       ++n;
+               }
+
+               if (!match)
+               {
+                       if (r < num_new_running_xids)
+                               new_running_xids[r] = running_xids[i];
+                       ++r;
+               }
+       }
+
+       /*
+        * Next, we have to add any XIDs between the old and new xmax that have
+        * not been removed.  Since the list of removed XIDs is sorted, we can do
+        * this in O(n+m) time, where n is the amount by which xmax has advanced
+        * and m is the number of removed XIDs greater than the old xmax.
+        */
+       TransactionIdAdvance(xmax);
+       while (TransactionIdPrecedes(xmax, new_xmax))
+       {
+               bool    match = false;
+
+               while (1)
+               {
+                       if (n >= num_removed_xids)
+                               break;
+                       if (TransactionIdEquals(xmax, removed_xids[n]))
+                               match = true;
+                       if (TransactionIdPrecedesOrEquals(xmax, removed_xids[n]))
+                               break;
+                       ++n;
+               }
+
+               if (!match)
+               {
+                       if (r < num_new_running_xids)
+                               new_running_xids[r] = xmax;
+                       ++r;
+               }
+
+               TransactionIdAdvance(xmax);
+       }
+
+       /* All done. */
+       return r;
+}
+
+/*
+ * Write a new snapshot summary record into the SnapArray buffer.
+ */
+static void
+SnapArrayWriteSnapshotSummary(TransactionId xmax,
+                                                         TransactionId latest_removed_subxid,
+                                                         uint32 nxids,
+                                                         TransactionId *running_xids)
+{
+       TransactionId   summary_info[SNAPSHOT_SUMMARY_ITEMS];
+       TransactionId   xmax_threshold;
+       uint64          write_pointer;
+
+       /*
+        * When this function is invoked from elsewhere within snaparray.c,
+        * the caller should arrange to compact the snapshot if required.  We
+        * include this last-ditch check mostly to protect against external
+        * callers.
+        */
+       if (nxids + SNAPSHOT_SUMMARY_ITEMS > SnapArray->ring_buffer_size)
+               ereport(ERROR,
+                               (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+                                errmsg("snapshot too large for snapshot_buffers")));
+
+       /*
+        * InvalidTransactionId is used as a sentinel value, to mark the beginning
+        * of a new summary record.
+        *
+        * We cheat a bit and store the number of transaction IDs in one of the
+        * transaction ID slots.  We could go the other way and cast all of the
+        * other values to uint32, but that would require a lot more adjustment
+        * if the size of a TransactionId were ever to change.
+        */
+       summary_info[0] = InvalidTransactionId;
+       summary_info[1] = xmax;
+       summary_info[2] = latest_removed_subxid;
+       summary_info[3] = (TransactionId) nxids;
+
+       /* Advance write pointer. */
+       write_pointer = SnapArray->write_pointer;
+       SnapArrayAdvanceWritePointer(SNAPSHOT_SUMMARY_ITEMS + nxids);
+
+       /* Write data.  Better not fail here... */
+       SnapArrayWriteData(write_pointer, SNAPSHOT_SUMMARY_ITEMS, summary_info);
+       SnapArrayWriteData(write_pointer + SNAPSHOT_SUMMARY_ITEMS,
+                                          nxids, running_xids);
+
+       /* Update start and stop pointers. */
+       SpinLockAcquire(&SnapArray->start_stop_mutex);
+       SnapArray->stop_pointer = write_pointer + SNAPSHOT_SUMMARY_ITEMS + nxids;
+       SnapArray->start_pointer = write_pointer;
+       SpinLockRelease(&SnapArray->start_stop_mutex);
+
+       /*
+        * SnapArrayRemoveRunningXids uses these to decide when to summarize.
+        * See comments in that function for details.
+        */
+       SnapArray->last_summary_size = SNAPSHOT_SUMMARY_ITEMS + nxids;
+       xmax_threshold = xmax + (4 * MaxBackends);
+       if (!TransactionIdIsNormal(xmax_threshold))
+               xmax_threshold = FirstNormalTransactionId;
+       SnapArray->xmax_threshold = xmax_threshold;
+}
+
+/*
+ * Read data from SnapArray, handling wraparound as needed.
+ *
+ * This does no synchronization at all and very minimal sanity checking.
+ * It's just here so that higher-level functions needn't duplicate the
+ * code to handle wraparound.
+ */
+static void
+SnapArrayReadData(uint64 start_location, uint64 stop_location,
+                                 TransactionId *buffer)
+{
+       uint64          nxids = stop_location - start_location;
+
+       Assert(nxids < SnapArray->ring_buffer_size);
+       Assert(nxids > 0);
+
+       start_location %= SnapArray->ring_buffer_size;
+       if (start_location + nxids <= (uint64) SnapArray->ring_buffer_size)
+               memcpy(buffer, &SnapArray->buffer[start_location],
+                          nxids * sizeof(TransactionId));
+       else
+       {
+               uint64          entries_before_wrap;
+
+               entries_before_wrap = SnapArray->ring_buffer_size - start_location;
+               memcpy(buffer, &SnapArray->buffer[start_location],
+                          entries_before_wrap * sizeof(TransactionId));
+               memcpy(buffer + entries_before_wrap, SnapArray->buffer,
+                          (nxids - entries_before_wrap) * sizeof(TransactionId));
+       }
+}
+
+/*
+ * Before beginning to write the data, writers must advance the write pointer,
+ * so that concurrent readers can detect whether the data they were busy
+ * reading may have been overwritten.
+ */
+static void
+SnapArrayAdvanceWritePointer(uint64 nitems)
+{
+       SpinLockAcquire(&SnapArray->write_mutex);
+       SnapArray->write_pointer += nitems;
+       SnapArray->num_writes++;
+       SpinLockRelease(&SnapArray->write_mutex);
+}
+
+/*
+ * Write data into SnapArray, handling wraparound as needed.
+ *
+ * This does no synchronization at all and very minimal sanity checking.
+ * It's just here so that higher-level functions needn't duplicate the
+ * code to handle wraparound.
+ */
+static void
+SnapArrayWriteData(uint64 write_location, uint32 nitems, TransactionId *item)
+{
+       write_location %= SnapArray->ring_buffer_size;
+       if (write_location + nitems <= SnapArray->ring_buffer_size)
+               memcpy(&SnapArray->buffer[write_location], item,
+                          nitems * sizeof(TransactionId));
+       else
+       {
+               uint64          entries_before_wrap;
+
+               /*
+                * The actual constraint is tighter than what this assertion checks,
+                * but it'll prevent us from scribbling all over shared memory if
+                * our caller screws up.
+                */
+               Assert(nitems <= SnapArray->ring_buffer_size);
+
+               entries_before_wrap = SnapArray->ring_buffer_size - write_location;
+               memcpy(&SnapArray->buffer[write_location], item,
+                          entries_before_wrap * sizeof(TransactionId));
+               memcpy(SnapArray->buffer, item + entries_before_wrap,
+                          (nitems - entries_before_wrap) * sizeof(TransactionId));
+       }
+}
+
+/*
+ * Helper routine for XID sorting.  XIDs aren't totally ordered, so we require
+ * xid_cmp_base to be set to an appropriate value before using this function.
+ * xids will be sorted by their distance ahead of or behind this point.
+ */
+static int
+xid_cmp(const void *a, const void *b)
+{
+       TransactionId  *xa = (TransactionId *) a;
+       TransactionId  *xb = (TransactionId *) b;
+       bool            af;
+       bool            bf;
+
+       Assert(TransactionIdIsNormal(xid_cmp_base));
+
+       if (*xa == *xb)
+               return 0;
+       af = TransactionIdFollows(*xa, xid_cmp_base);
+       bf = TransactionIdFollows(*xb, xid_cmp_base);
+       if (af && !bf)
+               return 1;
+       else if (bf && !af)
+               return -1;
+       else if (TransactionIdFollows(*xa, *xb))
+               return 1;
+       else
+               return -1;
+}
index f1d35a9a1129a9ca2da72d03a5aa4a15d08c4735..732263cccaa4c3cec04b1b4c6e637a837b65a755 100644 (file)
@@ -59,6 +59,7 @@
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/bufmgr.h"
+#include "storage/snaparray.h"
 #include "storage/standby.h"
 #include "storage/fd.h"
 #include "storage/predicate.h"
@@ -1618,6 +1619,23 @@ static struct config_int ConfigureNamesInt[] =
                check_temp_buffers, NULL, NULL
        },
 
+       /*
+        * Each entry in a snapshot buffer is a TransactionId, and the total
+        * number of such entries must fit within a 32-bit unsigned integer.
+        * Meanwhile, this entry is in units of kilobytes.  Hence, the
+        * odd-looking maximum value.
+        */
+       {
+               {"snapshot_buffers", PGC_POSTMASTER, RESOURCES_MEM,
+                       gettext_noop("Sets the amount of shared memory used for the snapshot buffer."),
+                       NULL,
+                       GUC_UNIT_KB
+               },
+               &snapshot_buffers,
+               -1, -1, UINT_MAX / (1024 / sizeof(TransactionId)),
+               check_snapshot_buffers, NULL, NULL
+       },
+
        {
                {"port", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
                        gettext_noop("Sets the TCP port the server listens on."),
index 67b098bd6d741848a5ed70028abc6466357d91ca..39d7c24691861cd7000ab5596ccdd5bf1a7215f2 100644 (file)
 # per transaction slot, plus lock space (see max_locks_per_transaction).
 # It is not advisable to set max_prepared_transactions nonzero unless you
 # actively intend to use prepared transactions.
+#snapshot_buffers = -1                 # -1 = auto-tune based on max_connections
 #work_mem = 1MB                                # min 64kB
 #maintenance_work_mem = 16MB           # min 1MB
 #max_stack_depth = 2MB                 # min 100kB
index 438a48d8dc99578af487e68a738e17898f91d1a2..4b60cd685c34bd2391810a97aa566983aa42fedf 100644 (file)
@@ -79,6 +79,7 @@ typedef enum LWLockId
        SerializablePredicateLockListLock,
        OldSerXidLock,
        SyncRepLock,
+       SnapArrayLock,
        /* Individual lock IDs end here */
        FirstBufMappingLock,
        FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
index a11d4385b7df448031c9a68517c2c94a2ef9fb06..6eb5eb674081be6d60f038255332df655d1cdc95 100644 (file)
@@ -68,5 +68,7 @@ extern bool CountOtherDBBackends(Oid databaseId,
 extern void XidCacheRemoveRunningXids(TransactionId xid,
                                                  int nxids, const TransactionId *xids,
                                                  TransactionId latestXid);
+extern uint32 GetTopXids(TransactionId xmax, uint32 nxids,
+                  TransactionId *xids);
 
 #endif   /* PROCARRAY_H */
diff --git a/src/include/storage/snaparray.h b/src/include/storage/snaparray.h
new file mode 100644 (file)
index 0000000..e677ba7
--- /dev/null
@@ -0,0 +1,25 @@
+/*-------------------------------------------------------------------------
+ *
+ * snaparray.h
+ *       IPC infrastructure for MVCC snapshots.
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/snaparray.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SNAPARRAY_H
+#define SNAPARRAY_H
+
+extern Size SnapArrayShmemSize(void);
+extern void SnapArrayShmemInit(void);
+extern void SnapArrayInitNormalRunning(TransactionId xmax);
+extern void SnapArrayRemoveRunningXids(uint32 xid, int nchildren,
+                                                  TransactionId *children, TransactionId latest_xid);
+extern void SnapArrayGetSnapshotData(Snapshot snapshot);
+
+extern int     snapshot_buffers;
+
+#endif   /* SNAPARRAY_H */
index 8e3057a0140093f37c3fc28854e14d34b6c6bfca..66cb7c0d77608824d451f183004f181cb03d8d36 100644 (file)
@@ -378,4 +378,7 @@ extern void assign_search_path(const char *newval, void *extra);
 extern bool check_wal_buffers(int *newval, void **extra, GucSource source);
 extern void assign_xlog_sync_method(int new_sync_method, void *extra);
 
+/* in storage/ipc/snaparray.c */
+extern bool check_snapshot_buffers(int *newval, void **extra, GucSource source);
+
 #endif   /* GUC_H */