#include "storage/procarray.h"
#include "storage/sinvaladt.h"
#include "storage/smgr.h"
+#include "storage/snaparray.h"
#include "utils/combocid.h"
#include "utils/guc.h"
#include "utils/inval.h"
SubTransactionId parentSubid);
static void CleanupTransaction(void);
static void CommitTransaction(void);
-static TransactionId RecordTransactionAbort(bool isSubXact);
+static void RecordTransactionAbort(bool isSubXact, TransactionId xid,
+ int nchildren, TransactionId *children);
static void StartTransaction(void);
static void StartSubTransaction(void);
/*
* RecordTransactionCommit
- *
- * Returns latest XID among xact and its children, or InvalidTransactionId
- * if the xact has no XID. (We compute that here just because it's easier.)
*/
-static TransactionId
-RecordTransactionCommit(void)
+static void
+RecordTransactionCommit(TransactionId xid, int nchildren,
+ TransactionId *children)
{
- TransactionId xid = GetTopTransactionIdIfAny();
bool markXidCommitted = TransactionIdIsValid(xid);
- TransactionId latestXid = InvalidTransactionId;
int nrels;
RelFileNode *rels;
- int nchildren;
- TransactionId *children;
int nmsgs = 0;
SharedInvalidationMessage *invalMessages = NULL;
bool RelcacheInitFileInval = false;
/* Get data needed for commit record */
nrels = smgrGetPendingDeletes(true, &rels);
- nchildren = xactGetCommittedChildren(&children);
if (XLogStandbyInfoActive())
nmsgs = xactGetCommittedInvalidationMessages(&invalMessages,
&RelcacheInitFileInval);
END_CRIT_SECTION();
}
- /* Compute latestXid while we have the child XIDs handy */
- latestXid = TransactionIdLatest(xid, nchildren, children);
-
/*
* Wait for synchronous replication, if required.
*
/* Clean up local data */
if (rels)
pfree(rels);
-
- return latestXid;
}
/*
* RecordTransactionAbort
- *
- * Returns latest XID among xact and its children, or InvalidTransactionId
- * if the xact has no XID. (We compute that here just because it's easier.)
*/
-static TransactionId
-RecordTransactionAbort(bool isSubXact)
+static void
+RecordTransactionAbort(bool isSubXact, TransactionId xid, int nchildren,
+ TransactionId *children)
{
- TransactionId xid = GetCurrentTransactionIdIfAny();
- TransactionId latestXid;
int nrels;
RelFileNode *rels;
- int nchildren;
- TransactionId *children;
XLogRecData rdata[3];
int lastrdata = 0;
xl_xact_abort xlrec;
* If we haven't been assigned an XID, nobody will care whether we aborted
* or not. Hence, we're done in that case. It does not matter if we have
* rels to delete (note that this routine is not responsible for actually
- * deleting 'em). We cannot have any child XIDs, either.
+ * deleting 'em).
*/
if (!TransactionIdIsValid(xid))
{
/* Reset XactLastRecEnd until the next transaction writes something */
if (!isSubXact)
XactLastRecEnd.xrecoff = 0;
- return InvalidTransactionId;
+ return;
}
/*
/* Fetch the data we need for the abort record */
nrels = smgrGetPendingDeletes(false, &rels);
- nchildren = xactGetCommittedChildren(&children);
/* XXX do we really need a critical section here? */
START_CRIT_SECTION();
END_CRIT_SECTION();
- /* Compute latestXid while we have the child XIDs handy */
- latestXid = TransactionIdLatest(xid, nchildren, children);
-
- /*
- * If we're aborting a subtransaction, we can immediately remove failed
- * XIDs from PGPROC's cache of running child XIDs. We do that here for
- * subxacts, because we already have the child XID array at hand. For
- * main xacts, the equivalent happens just after this function returns.
- */
- if (isSubXact)
- XidCacheRemoveRunningXids(xid, nchildren, children, latestXid);
-
/* Reset XactLastRecEnd until the next transaction writes something */
if (!isSubXact)
XactLastRecEnd.xrecoff = 0;
/* And clean up local data */
if (rels)
pfree(rels);
-
- return latestXid;
}
/*
{
TransactionState s = CurrentTransactionState;
TransactionId latestXid;
+ TransactionId xid;
+ int nchildren;
+ TransactionId *children;
ShowTransactionState("CommitTransaction");
*/
PreCommit_Notify();
+ /* Get XID information. */
+ xid = GetTopTransactionIdIfAny();
+ nchildren = xactGetCommittedChildren(&children);
+ latestXid = TransactionIdLatest(xid, nchildren, children);
+
/* Prevent cancel/die interrupt while cleaning up */
HOLD_INTERRUPTS();
/*
* Here is where we really truly commit.
*/
- latestXid = RecordTransactionCommit();
+ RecordTransactionCommit(xid, nchildren, children);
TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid);
* RecordTransactionCommit.
*/
ProcArrayEndTransaction(MyProc, latestXid);
+ if (TransactionIdIsNormal(xid))
+ SnapArrayRemoveRunningXids(xid, nchildren, children, latestXid);
/*
* This is all post-commit cleanup. Note that if an error is raised here,
{
TransactionState s = CurrentTransactionState;
TransactionId latestXid;
+ TransactionId xid;
+ TransactionId *children;
+ int nchildren;
/* Prevent cancel/die interrupt while cleaning up */
HOLD_INTERRUPTS();
AtAbort_Notify();
AtEOXact_RelationMap(false);
+ /* Get XID information. */
+ xid = GetCurrentTransactionIdIfAny();
+ nchildren = xactGetCommittedChildren(&children);
+ latestXid = TransactionIdLatest(xid, nchildren, children);
+
/*
* Advertise the fact that we aborted in pg_clog (assuming that we got as
* far as assigning an XID to advertise).
*/
- latestXid = RecordTransactionAbort(false);
+ RecordTransactionAbort(false, xid, nchildren, children);
TRACE_POSTGRESQL_TRANSACTION_ABORT(MyProc->lxid);
* RecordTransactionAbort.
*/
ProcArrayEndTransaction(MyProc, latestXid);
+ if (TransactionIdIsNormal(xid))
+ SnapArrayRemoveRunningXids(xid, nchildren, children, latestXid);
/*
* Post-abort cleanup. See notes in CommitTransaction() concerning
*/
if (s->curTransactionOwner)
{
+ TransactionId latestXid;
+ TransactionId xid;
+ int nchildren;
+ TransactionId *children;
+
AfterTriggerEndSubXact(false);
AtSubAbort_Portals(s->subTransactionId,
s->parent->subTransactionId,
s->parent->subTransactionId);
AtSubAbort_Notify();
+ /* Get XID information. */
+ xid = GetCurrentTransactionIdIfAny();
+ nchildren = xactGetCommittedChildren(&children);
+ latestXid = TransactionIdLatest(xid, nchildren, children);
+
/* Advertise the fact that we aborted in pg_clog. */
- (void) RecordTransactionAbort(true);
+ (void) RecordTransactionAbort(true, xid, nchildren, children);
+
+ /*
+ * If we're aborting a subtransaction, we can immediately remove failed
+ * XIDs from PGPROC's cache of running child XIDs.
+ */
+ if (TransactionIdIsValid(xid))
+ {
+ XidCacheRemoveRunningXids(xid, nchildren, children, latestXid);
+ if (TransactionIdIsNormal(xid))
+ SnapArrayRemoveRunningXids(xid, nchildren, children, latestXid);
+ }
/* Post-abort cleanup */
if (TransactionIdIsValid(s->transactionId))
--- /dev/null
+Snapshot Derivation
+===================
+
+An MVCC snapshot memorializes a point in the commit sequence. That is, the
+effects of any transaction which committed before the snapshot was taken will
+be visible to the snapshot. Transactions which abort, or which have not yet
+committed at the time the snapshot is taken, will not be visible to the
+snapshot. We need only be concerned about transactions to which an XID (or
+XIDs, if there are subtransactions) have been assigned; any changes made to
+the database which are not related to a particular XID are non-transactional
+and do not obey MVCC semantics anyway. Furthermore, we can always find out
+(from CLOG) whether or not an XID that is no longer running committed or
+aborted. Thus, an MVCC snapshot need only be capable of answering the
+following question: given an XID, had that XID ended (either by committing
+or aborting) at the time the snapshot was taken?
+
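+Concretely, once a backend has an xmax and a set of running XIDs in hand,
+the test reduces to just a few lines. A minimal sketch (XidInRunningSet is
+a hypothetical lookup over the running-XID set described below):
+
+    static bool
+    XidHadEnded(TransactionId xid, TransactionId xmax)
+    {
+        /* Any XID past xmax had certainly not ended. */
+        if (TransactionIdFollows(xid, xmax))
+            return false;
+        /* Otherwise, the XID had ended unless listed as running. */
+        return !XidInRunningSet(xid);
+    }
+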
+The snaparray is responsible for providing backends with sufficient data
+to construct an MVCC snapshot that can answer this question. We
+periodically record in shared memory (1) the highest-numbered XID which had
+already ended at the time the snapshot was taken ("xmax"); and (2) the XIDs
+of all lower-numbered transactions which had not yet ended at the time the
+snapshot was taken ("running XIDs"). As a special case, transactions which
+are performing a lazy VACUUM operation can be excluded from the set of running
+XIDs, since no snapshot ever needs to treat them as in-progress, even while
+they are still running.
+
+It is important for performance to minimize the amount of work that must be
+done at transaction end. Since most transactions change the set
+of running XIDs only slightly (such as by removing themselves from that list),
+we provide a method for the xmax and set of running XIDs to be updated
+incrementally. We periodically record the current xmax and set of running
+XIDs in shared memory, and then track all transaction commits which happen
+subsequently. This provides enough information for backends to reconstruct
+a complete snapshot, as follows:
+
+1. If the largest newly completed XID is greater than the original xmax,
+then increase the xmax of the snapshot to that value, and add all XIDs
+between the old and new values to the set of running XIDs.
+
+2. Remove all of the newly completed XIDs from the set of running XIDs
+(except for the one which became the new xmax value, which is the only
+one that won't be listed).
+
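+A sketch of this reconstruction (illustrative names only; the real logic
+lives in SnapArrayComputeRunningXids in snaparray.c):
+
+    /* Step 1: advance xmax past all newly completed XIDs. */
+    new_xmax = xmax;
+    for (i = 0; i < num_completed; i++)
+        if (TransactionIdFollows(completed[i], new_xmax))
+            new_xmax = completed[i];
+    x = xmax;
+    TransactionIdAdvance(x);
+    while (TransactionIdPrecedes(x, new_xmax))
+    {
+        add_running_xid(x);               /* hypothetical set insert */
+        TransactionIdAdvance(x);
+    }
+
+    /* Step 2: strike the completed XIDs from the running set. */
+    for (i = 0; i < num_completed; i++)
+        remove_running_xid(completed[i]); /* hypothetical */
+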
+Each backend must process the list of completed XIDs every time it wishes
+to derive a snapshot, so we must not allow that list to get too long.
+Periodically, we (A) read the most recently written xmax, running XIDs, and
+newly completed XIDs; (B) perform the calculations described above to
+incrementally update these values; and then (C) write the updated xmax and
+list of running XIDs back to shared memory. This effectively truncates the
+list of newly completed XIDs.
+
+Note that this machinery doesn't require any action whatsoever at the time
+an XID is assigned. Newly assigned XIDs will always be greater than
+any already-completed XID, and will be added to the set of running XIDs
+automatically when it is necessary to advance xmax.
+
+Lazy Vacuum
+===========
+
+XIDs allocated while performing a lazy vacuum don't need to be included in
+the set of running XIDs. Instead of marking them complete when the
+transaction ends, they are marked as complete when the transaction starts.
+Although this advances xmax sooner than technically necessary, it has a
+major advantage that makes it worth doing anyway: it prevents VACUUM from
+holding back the xmin horizon.
+
+Subtransactions and Overflow
+============================
+
+The algorithm described in the previous section could be problematic in the
+presence of large numbers of subtransactions, because the amount of space
+required to store the list of in-progress XIDs could grow very large. It's
+not really unbounded, because the XID space is only 32 bits wide, and at any
+given time half of those XIDs are in the past and half are in the future,
+which somewhat limits the effective range of XIDs that could be simultaneously
+in use. Moreover, since dead rows can't be reclaimed until they're no longer
+visible to any running transaction, performance would probably degenerate
+severely well before the system reached these theoretical limits.
+Nonetheless, we can't assume that it will always be practical for a snapshot
+to include every still-running XID.
+
+To address this problem, we allow XIDs assigned to subtransactions to be
+removed from the list of running XIDs. Whenever we do this, we remember the
+highest XID so removed ("highest removed subxid"). If the snapshot receives
+an inquiry about an XID that might have been removed, it must consult
+pg_subtrans and look up the corresponding parent XID, if any. The visibility
+calculation can then be performed as normal using the parent XID, which is
+guaranteed to be present in the array if indeed it's still running. This
+check can be skipped if the XID is less than the oldest running XID ("xmin"),
+due to the rule that a subtransaction XID must always be greater than the XID
+of its parent transaction.
+
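+In sketch form, assuming pg_subtrans is consulted through the existing
+SubTransGetTopmostTransaction interface, the lookup might be:
+
+    TransactionId probe = xid;
+
+    if (TransactionIdFollowsOrEquals(xid, xmin) &&
+        TransactionIdPrecedesOrEquals(xid, highest_removed_subxid))
+        probe = SubTransGetTopmostTransaction(xid);
+    /* now test "probe" against the running-XID set as usual */
+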
+Removing subtransaction XIDs from the set of running XIDs is a somewhat
+laborious process, so we hope it won't happen too often. We iterate over
+the ProcArray to get a list of all still-running toplevel XIDs, and then
+remove any XIDs not found there.
+
+Shared Memory Organization
+==========================
+
+The main shared memory structure used by the snaparray code is a ring buffer,
+which acts as a circular message buffer. We maintain three pointers into this
+ring buffer: (1) the location at which backends must begin reading to derive
+a valid snapshot ("start pointer"), (2) the location at which they may stop
+reading ("stop pointer"), and (3) the location beyond which no data has been
+written ("write pointer"). To prevent overflow, these pointers are stored as
+64-bit integers and interpreted modulo the buffer size. There are two types
+of messages which can be stored in this buffer: snapshot summaries, and newly
+completed XIDs.
+
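+The arithmetic that follows from this scheme is simple. A sketch, with
+illustrative variable names:
+
+    /* map a 64-bit buffer position to a slot in the ring */
+    slot = position % ring_buffer_size;
+
+    /* a reader that began reading at read_start has been lapped when */
+    lapped = (write_pointer > read_start + ring_buffer_size);
+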
+Because the most common operation on this ring buffer is to record newly
+completed XIDs, and because such messages are typically small, often just
+a single XID, it is important that the representation of such messages be
+compact. Therefore, XID completion messages consist of just the XIDs
+themselves.
+
+To distinguish itself from an XID completion message, a snapshot summary
+begins with InvalidTransactionId, followed by the remaining data items,
+all as 4-byte quantities. The format in full is as follows:
+
+ InvalidTransactionId
+ xmax
+ highest removed subxid
+ number of running xids
+ running xid 1
+ ...
+ running xid n
+
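+A reader that encounters the sentinel can therefore decode the summary
+along these lines (a sketch; msg points at the InvalidTransactionId word):
+
+    Assert(msg[0] == InvalidTransactionId);
+    xmax = msg[1];
+    highest_removed_subxid = msg[2];
+    nxids = (uint32) msg[3];
+    running_xids = &msg[4];     /* nxids entries follow */
+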
+Write access to the ring buffer is serialized by SnapArrayLock; only one
+transaction (that has an XID) can end at a time. When ending, a transaction
+may write either an XID completion message or, if the distance between the
+start pointer and the new stop pointer seems to be getting too large, a new
+snapshot summary instead.
+
+Reads from the ring buffer can proceed in parallel with other reads, and
+generally with writes. However, if the ring buffer wraps around before a
+read is complete, then the data is no good and the operation must be retried.
+This is obviously undesirable from a performance perspective, but should be
+rare with a ring buffer of adequate size. Under typical conditions, a
+wraparound would require dozens if not hundreds of commits to occur in
+the time it takes the backend deriving its snapshot to complete its
+memcpy(). However,
+if it does happen, we recover by taking SnapArrayLock in shared mode and
+repeating the operation. To minimize the chances of further wraparounds,
+whenever we do this, we also set a flag requesting that the next backend
+to commit compress subtransaction IDs out of the snapshot.
+
+There is one further fly in the ointment. Since the start pointer, stop
+pointer, and write pointer are 64-bit integers, we can't assume that they
+can be read and written atomically on all platforms. Thus, in general,
+access to these variables must be protected by spinlocks. (On platforms
+where 8 byte loads and stores are atomic, we could use these facilities,
+together with memory barriers, in lieu of the spinlocks, which would be
+advantageous on machines with many CPU cores. But we don't support that
+yet.)
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * snaparray.c
+ * IPC infrastructure for MVCC snapshots.
+ *
+ * Backends frequently (often many times per transaction) require a list
+ * of all currently-running XIDs in order to construct an MVCC snapshot.
+ * Due to the frequency of this operation, it must be performed with
+ * minimal locking. Even shared lwlocks can be problematic, as the
+ * spinlock protecting the state of the LWLock will become a contention
+ * bottleneck on machines with 32+ CPU cores.
+ *
+ * We store snapshot-related information in a shared array called the
+ * snaparray, which acts as a ring buffer. The management of this ring
+ * buffer is documented in src/backend/storage/ipc/README.snaparray.
+ *
+ * During hot standby, we instead need a list of XIDs that were running
+ * on the master as of the current point in the WAL stream. We manage
+ * this list using the same machinery that's used on the primary, except
+ * that the array is updated through WAL replay rather than directly by
+ * individual backends.
+ *
+ * It is perhaps possible for a backend on the master to terminate without
+ * writing an abort record for its transaction (e.g. if the master crashed
+ * unexpectedly). That would tie up snaparray slots indefinitely, so the
+ * master periodically transmits snapshot information to the standby to
+ * make the necessary pruning possible.
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/storage/ipc/snaparray.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/transam.h"
+#include "miscadmin.h"
+#include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "storage/snaparray.h"
+#include "utils/catcache.h"
+#include "utils/guc.h"
+
+typedef struct SnapArrayStruct
+{
+ /* this first group of fields never changes */
+ uint32 ring_buffer_size; /* size of array */
+ uint32 compaction_threshold; /* when should we remove subxids? */
+
+ slock_t start_stop_mutex; /* protects next group of fields */
+ uint64 start_pointer; /* backends should start reading here */
+ uint64 stop_pointer; /* backends should stop reading here */
+
+ slock_t write_mutex; /* protects next group of fields */
+ uint64 write_pointer; /* last byte written + 1 */
+ uint64 num_wraparounds; /* number of wraparounds */
+ uint64 num_writes; /* number of writes */
+
+ /* next group of fields is protected by SnapArrayLock */
+ uint32 last_summary_size; /* # entries in last snapshot summary */
+ TransactionId xmax_threshold; /* xmax at which to rewrite snapshot */
+ bool compaction_requested; /* snapshot compaction requested? */
+
+ TransactionId buffer[FLEXIBLE_ARRAY_MEMBER];
+} SnapArrayStruct;
+
+int snapshot_buffers = -1;
+
+static SnapArrayStruct *SnapArray;
+static TransactionId xid_cmp_base;
+
+/* Local cache of latest SnapArray data. */
+static TransactionId *SnapArrayCache;
+static uint32 SnapArrayCacheSize;
+static uint32 SnapArrayCacheEntries;
+static uint64 SnapArrayCacheStartPointer;
+static uint64 SnapArrayCacheStopPointer;
+
+static void SnapArrayUpdateCache(void);
+static uint32 SnapArrayComputeRunningXids(TransactionId xmax,
+ TransactionId new_xmax,
+ uint32 num_running_xids,
+ TransactionId *running_xids,
+ uint32 num_removed_xids,
+ TransactionId *removed_xids,
+ uint32 num_new_running_xids,
+ TransactionId *new_running_xids);
+static void SnapArrayWriteSnapshotSummary(TransactionId xmax,
+ TransactionId latest_removed_subxid,
+ uint32 nxids,
+ TransactionId *running_xids);
+static void SnapArrayReadData(uint64 start_location, uint64 stop_location,
+ TransactionId *buffer);
+static void SnapArrayAdvanceWritePointer(uint64 nitems);
+static void SnapArrayWriteData(uint64 write_location, uint32 nitems,
+ TransactionId *item);
+static int xid_cmp(const void *a, const void *b);
+
+#define SNAPSHOT_SUMMARY_ITEMS 4
+#define SizeOfOneCompressedSnapshot() \
+ mul_size(sizeof(TransactionId), add_size(MaxBackends, \
+ SNAPSHOT_SUMMARY_ITEMS))
+#define BytesToKb(x) \
+ (((x) / 1024) + (((x) % 1024) ? 1 : 0))
+
+/*
+ * By default, we allow enough space in the snapshot array for 64 entries
+ * per backend. The result is in kilobytes.
+ *
+ * This should not be called until MaxBackends has received its final value.
+ */
+static Size
+SnapArraySuggestedSize(void)
+{
+ Size bytes = mul_size(SizeOfOneCompressedSnapshot(), 64);
+
+ return BytesToKb(bytes);
+}
+
+/*
+ * GUC check hook for snapshot_buffers.
+ */
+bool
+check_snapshot_buffers(int *newval, void **extra, GucSource source)
+{
+ Size minimum;
+
+ /*
+ * -1 indicates a request for auto-tune.
+ */
+ if (*newval == -1)
+ {
+ /*
+ * If we haven't yet changed the boot_val default of -1, just leave
+ * it be. We'll fix it later when SnapArrayShmemSize is called.
+ */
+ if (snapshot_buffers == -1)
+ return true;
+ /* Otherwise, substitute the auto-tune value. */
+ *newval = SnapArraySuggestedSize();
+ }
+
+ /*
+ * We must have at least enough buffer entries for one compressed snapshot.
+ * Performance might be terrible due to frequent rewrites, compressions,
+ * and wraparounds, but anything less could potentially fail outright.
+ */
+ minimum = SizeOfOneCompressedSnapshot();
+ if (*newval < BytesToKb(minimum))
+ *newval = BytesToKb(minimum);
+ return true;
+}
+
+/*
+ * Initialization of shared memory for SnapArray.
+ */
+Size
+SnapArrayShmemSize(void)
+{
+ Size size;
+
+ /*
+ * If the value of snapshot_buffers is -1, use the preferred auto-tune
+ * value.
+ */
+ if (snapshot_buffers == -1)
+ {
+ char buf[32];
+ snprintf(buf, sizeof(buf), UINT64_FORMAT,
+ (uint64) SnapArraySuggestedSize());
+ SetConfigOption("snapshot_buffers", buf, PGC_POSTMASTER,
+ PGC_S_OVERRIDE);
+ }
+ Assert(snapshot_buffers > 0);
+
+ /* Work out size of array. */
+ size = offsetof(SnapArrayStruct, buffer);
+ size = add_size(size, mul_size(snapshot_buffers, 1024));
+
+ return size;
+}
+
+/*
+ * Initialize the shared SnapArray during postmaster startup.
+ */
+void
+SnapArrayShmemInit(void)
+{
+ Size size;
+ bool found;
+
+ /* Create or attach to the SnapArray shared structure */
+ size = SnapArrayShmemSize();
+ SnapArray = (SnapArrayStruct *)
+ ShmemInitStruct("Snap Array", size, &found);
+
+ if (!found)
+ {
+ SnapArray->ring_buffer_size =
+ snapshot_buffers * (1024 / sizeof(TransactionId));
+ SnapArray->compaction_threshold =
+ Min(64 * MaxBackends, SnapArray->ring_buffer_size / 4);
+ SnapArray->start_pointer = 0;
+ SnapArray->stop_pointer = 0;
+ SnapArray->write_pointer = 0;
+ SnapArray->compaction_requested = false;
+ SnapArray->xmax_threshold = InvalidTransactionId;
+ SpinLockInit(&SnapArray->start_stop_mutex);
+ SpinLockInit(&SnapArray->write_mutex);
+ }
+
+ /* While we're at it, we initialize our backend-local cache. */
+ if (!CacheMemoryContext)
+ CreateCacheMemoryContext();
+ SnapArrayCacheEntries = 128;
+ SnapArrayCache = MemoryContextAlloc(CacheMemoryContext,
+ sizeof(TransactionId) *
+ SnapArrayCacheEntries);
+}
+
+/*
+ * Recovery is over; prepare for normal running!
+ */
+void
+SnapArrayInitNormalRunning(TransactionId xmax)
+{
+ /*
+ * This should be invoked after WAL replay is complete and before any
+ * write transactions are initiated, so in theory no one else can be
+ * holding the lock. We acquire it anyway on principle.
+ */
+ LWLockAcquire(SnapArrayLock, LW_EXCLUSIVE);
+
+ /* XXX. We need to carve out twophase XIDs here. */
+ SnapArrayWriteSnapshotSummary(xmax, InvalidTransactionId, 0, NULL);
+
+ LWLockRelease(SnapArrayLock);
+}
+
+/*
+ * Remove running XIDs.
+ *
+ * Note that there is no external interface to *add* running XIDs; that happens
+ * implicitly. Removing an XID higher than the current xmax implicitly adds
+ * all the intermediate XIDs to the set.
+ */
+void
+SnapArrayRemoveRunningXids(TransactionId xid, int nchildren,
+ TransactionId *children, TransactionId latest_xid)
+{
+ uint64 start_pointer;
+ uint64 stop_pointer;
+ uint64 write_pointer;
+ uint64 slots_used;
+ uint64 slots_remaining;
+ uint64 last_summary_size;
+
+ Assert(TransactionIdIsValid(xid)); /* only call this if XID assigned */
+ Assert(nchildren >= 0); /* number of children can't be < 0 */
+
+ /* Lock out concurrent writers. */
+ LWLockAcquire(SnapArrayLock, LW_EXCLUSIVE);
+
+ /* Compute used and remaining ring buffer slots. */
+ start_pointer = SnapArray->start_pointer;
+ stop_pointer = SnapArray->stop_pointer;
+ write_pointer = SnapArray->write_pointer;
+ last_summary_size = (uint64) SnapArray->last_summary_size;
+ Assert(stop_pointer == write_pointer);
+ slots_used = stop_pointer - start_pointer;
+ Assert(slots_used > 0 && slots_used >= last_summary_size);
+ slots_remaining = SnapArray->ring_buffer_size - slots_used;
+ Assert(slots_remaining < SnapArray->ring_buffer_size);
+
+ /*
+ * Decide whether to write a new snapshot summary in lieu of just
+ * inserting the newly-removed XIDs. We do this when any of the
+ * following conditions hold:
+ *
+ * (1) The number of recently completed XIDs would exceed MaxBackends.
+ * (Reason: We don't want backends to need to process too many recently
+ * completed XIDs to derive a snapshot.)
+ *
+ * (2) The new xmax forced by the removal operation would exceed the
+ * threshold set by SnapArrayWriteSnapshotSummary. (Reason: If the
+ * xmax value advances too far, backends will need to add many intermediate
+ * xids to their snapshots, which could be expensive in terms of both
+ * processing time and memory.)
+ *
+ * (3) The number of remaining slots before wraparound is less than the
+ * number of XIDs we need to write. (Reason: We can't overwrite the
+ * snapshot summary without first writing a new one. If we do, we're
+ * screwed.)
+ *
+ * (4) The compaction_requested flag is set, indicating that at least
+ * one other backend had to retry due to a wraparound condition.
+ * (Reason: The snapshot is evidently too large to be copied quickly
+ * enough.)
+ *
+ * We expect (1) to occur most frequently. Condition (2) is likely to
+ * trigger only if xmax jumps forward abruptly (for example, because
+ * a transaction with many subtransactions suddenly allocates XIDs to
+ * all of them). Condition (3) is a safeguard against disaster, but
+ * should be unlikely given any reasonable buffer size. Condition (4)
+ * is not necessary for correctness, but seems prudent, and like (3)
+ * should only really be a risk with very small buffers.
+ */
+ if (slots_used + 1 + nchildren > last_summary_size + MaxBackends
+ || TransactionIdFollowsOrEquals(latest_xid, SnapArray->xmax_threshold)
+ || slots_remaining < 1 + nchildren
+ || SnapArray->compaction_requested)
+ {
+ uint32 nentries;
+ TransactionId *entries;
+ TransactionId xmax;
+ TransactionId new_xmax;
+ TransactionId highest_removed_subxid;
+ uint32 num_running_xids;
+ uint32 num_removed_xids;
+ TransactionId *running_xids;
+ TransactionId *removed_xids;
+ TransactionId *new_running_xids;
+ bool compact;
+ uint32 xids_added;
+ uint32 num_new_running_xids;
+ uint32 compaction_threshold;
+ uint32 certainly_removed_xids = 0;
+
+ /* Extract current data from array and append our new data. */
+ Assert(stop_pointer >= start_pointer + SNAPSHOT_SUMMARY_ITEMS);
+ Assert(start_pointer + last_summary_size <= stop_pointer);
+ nentries = (stop_pointer - start_pointer) + 1 + nchildren;
+ entries = palloc(sizeof(TransactionId) * nentries);
+ SnapArrayReadData(start_pointer, stop_pointer, entries);
+ entries[stop_pointer - start_pointer] = xid;
+ if (nchildren > 0)
+ memcpy(&entries[(stop_pointer - start_pointer) + 1], children,
+ sizeof(TransactionId) * nchildren);
+
+ /* Data must begin with a snapshot summary. */
+ Assert(entries[0] == InvalidTransactionId);
+ xmax = entries[1];
+ highest_removed_subxid = entries[2];
+ num_running_xids = (uint32) entries[3];
+ Assert(last_summary_size == num_running_xids + SNAPSHOT_SUMMARY_ITEMS);
+ num_removed_xids = nentries - last_summary_size;
+ running_xids = entries + SNAPSHOT_SUMMARY_ITEMS;
+ removed_xids = running_xids + num_running_xids;
+
+ /* Sort the removed XIDs. */
+ xid_cmp_base = xmax;
+ qsort(removed_xids, num_removed_xids, sizeof(TransactionId), xid_cmp);
+
+ /* Work out new xmax value. */
+ new_xmax = removed_xids[num_removed_xids - 1];
+ if (TransactionIdPrecedes(new_xmax, xmax))
+ new_xmax = xmax;
+
+ /* Work out number of new XIDs being added. */
+ if (new_xmax >= xmax)
+ xids_added = new_xmax - xmax;
+ else
+ xids_added = new_xmax - xmax - FirstNormalTransactionId;
+
+ /*
+ * Decide whether we need to compact away subxids. This is a bit
+ * hairy.
+ *
+ * If the compaction_requested flag has been set, then we always
+ * compact. Otherwise, the decision is based on the estimated size
+ * of the new snapshot summary relative to the compaction threshold.
+ * If no subxids have previously been compacted out of the snapshot,
+ * then we know the exact numbers. Otherwise, to be conservative,
+ * we assume that none of the XIDs which precede
+ * highest_removed_subxid will actually be found among the running
+ * XIDs. We could compute a more accurate answer, but it's not worth
+ * it.
+ */
+ compaction_threshold = SnapArray->compaction_threshold;
+ if (SnapArray->compaction_requested)
+ compact = true;
+ else if (last_summary_size + xids_added
+ > compaction_threshold + num_removed_xids)
+ compact = true;
+ else if (!TransactionIdIsValid(highest_removed_subxid))
+ compact = false;
+ else if (last_summary_size + xids_added < compaction_threshold)
+ compact = false;
+ else
+ {
+ uint32 low = 0;
+ uint32 high = num_removed_xids;
+
+ /* Binary search for certain-to-be-removable XIDs. */
+ while (low < high)
+ {
+ uint32 middle = (low + high) / 2;
+ if (TransactionIdFollows(removed_xids[middle],
+ highest_removed_subxid))
+ high = middle;
+ else
+ low = middle + 1;
+ }
+
+ /* Decide whether we think it'll fit. */
+ certainly_removed_xids = num_removed_xids - high;
+ if (last_summary_size + xids_added
+ > compaction_threshold + certainly_removed_xids)
+ compact = true;
+ else
+ compact = false;
+ }
+
+ /* Construct new list of running XIDs. */
+ if (compact)
+ {
+ /*
+ * If we're compacting away subtransaction XIDs, then we obtain
+ * the new list of running transaction IDs from the ProcArray.
+ * There shouldn't be more than MaxBackends.
+ *
+ * XXX. What about prepared transactions???
+ */
+ new_running_xids = palloc(sizeof(TransactionId) * MaxBackends);
+ num_new_running_xids =
+ GetTopXids(new_xmax, MaxBackends, new_running_xids);
+ if (num_new_running_xids > MaxBackends)
+ elog(PANIC, "too many toplevel XIDs");
+
+ /*
+ * We could bound this more tightly, but for now we just punt.
+ */
+ highest_removed_subxid = new_xmax;
+ TransactionIdRetreat(highest_removed_subxid);
+ }
+ else
+ {
+ uint32 result;
+
+ /*
+ * Allocate space for new snapshot.
+ */
+ Assert(xids_added >= certainly_removed_xids);
+ num_new_running_xids = last_summary_size + xids_added
+ - certainly_removed_xids;
+ new_running_xids = palloc(sizeof(TransactionId)
+ * num_new_running_xids);
+
+ /*
+ * Filter out the removed XIDs from the running XIDs, and add any
+ * XIDs between the old and new xmax that aren't listed as removed.
+ */
+ result =
+ SnapArrayComputeRunningXids(xmax, new_xmax,
+ num_running_xids, running_xids,
+ num_removed_xids, removed_xids,
+ num_new_running_xids,
+ new_running_xids);
+ if (result > num_new_running_xids)
+ elog(PANIC, "snapshot size calculation is bogus");
+ num_new_running_xids = result;
+
+ /*
+ * If the highest removed subxid has aged out of the snapshot,
+ * we clear the value. Otherwise we might eventually have a
+ * problem when the XID space wraps around.
+ */
+ if (num_new_running_xids == 0
+ || TransactionIdPrecedes(highest_removed_subxid,
+ new_running_xids[0]))
+ highest_removed_subxid = InvalidTransactionId;
+ }
+
+ /* Write the new snapshot. */
+ SnapArrayWriteSnapshotSummary(new_xmax, highest_removed_subxid,
+ num_new_running_xids, new_running_xids);
+
+ /* Free memory. */
+ pfree(new_running_xids);
+ pfree(entries);
+ }
+ else
+ {
+ /*
+ * We've decided not to insert a new snapshot summary, so just
+ * append our completed XIDs.
+ */
+
+ /* Advance write pointer. */
+ SnapArrayAdvanceWritePointer(1 + nchildren);
+
+ /* Write data. Better not fail here... */
+ SnapArrayWriteData(write_pointer, 1, &xid);
+ if (nchildren > 0)
+ SnapArrayWriteData(write_pointer + 1, nchildren, children);
+
+ /* Update stop pointer. */
+ SpinLockAcquire(&SnapArray->start_stop_mutex);
+ SnapArray->stop_pointer = write_pointer + 1 + nchildren;
+ SpinLockRelease(&SnapArray->start_stop_mutex);
+ }
+
+ /* We're done. */
+ LWLockRelease(SnapArrayLock);
+}
+
+/*
+ * Construct an MVCC snapshot.
+ */
+void
+SnapArrayGetSnapshotData(Snapshot snapshot)
+{
+ TransactionId xmax;
+ TransactionId new_xmax;
+ TransactionId highest_removed_subxid;
+ uint32 num_running_xids;
+ uint32 num_removed_xids;
+ uint32 num_new_running_xids;
+ TransactionId *running_xids;
+ TransactionId *removed_xids;
+ TransactionId *new_running_xids;
+ uint32 n;
+ uint32 xids_added;
+ uint32 certainly_removed_xids = 0;
+ bool needsort = false;
+
+ /* Get latest information from shared memory. */
+ SnapArrayUpdateCache();
+
+ /* Data must begin with a snapshot summary. */
+ Assert(SnapArrayCacheSize >= SNAPSHOT_SUMMARY_ITEMS);
+ Assert(SnapArrayCache[0] == InvalidTransactionId);
+ xmax = SnapArrayCache[1];
+ highest_removed_subxid = SnapArrayCache[2];
+ num_running_xids = (uint32) SnapArrayCache[3];
+ num_removed_xids
+ = SnapArrayCacheSize - (num_running_xids + SNAPSHOT_SUMMARY_ITEMS);
+ running_xids = SnapArrayCache + SNAPSHOT_SUMMARY_ITEMS;
+ removed_xids = running_xids + num_running_xids;
+
+ /*
+ * Scan the removed XIDs. This enables us to work out the new xmax
+ * value, the number of XIDs we're certain to be able to remove from the
+ * running list (because they're newer than highest_removed_subxid), and
+ * whether or not the list of removed XIDs needs to be sorted.
+ */
+ new_xmax = xmax;
+ for (n = 0; n < num_removed_xids; ++n)
+ {
+ TransactionId xid = removed_xids[n];
+
+ if (TransactionIdFollows(xid, new_xmax))
+ new_xmax = removed_xids[n];
+ if (TransactionIdFollows(xid, highest_removed_subxid))
+ ++certainly_removed_xids;
+ if (n > 0 && TransactionIdPrecedes(xid, removed_xids[n-1]))
+ needsort = true;
+ }
+
+ /*
+ * Sort the removed XIDs (unless they are already in order).
+ *
+ * This is actually mutating the underlying cache, which is OK, because
+ * changing the order of the removed XIDs doesn't change the semantics.
+ * We skip this if the data is already in order, which could happen
+ * either because we've sorted the same data on a previous trip through
+ * this function, or because all removed XIDs added since our last visit
+ * were removed in ascending XID order.
+ *
+ * NB: Some quicksort implementations don't perform well on data that's
+ * already mostly or entirely sorted. Skipping the sort in the case where
+ * the data is completely in order should ameliorate any problems in this
+ * area quite a bit, but we might need to pick another sort algorithm if
+ * this proves problematic.
+ */
+ if (needsort)
+ {
+ xid_cmp_base = xmax;
+ qsort(removed_xids, num_removed_xids, sizeof(TransactionId), xid_cmp);
+ }
+
+ /* Work out number of new XIDs being added. */
+ if (new_xmax >= xmax)
+ xids_added = new_xmax - xmax;
+ else
+ xids_added = new_xmax - xmax - FirstNormalTransactionId;
+
+ /*
+ * XXX. The rest of this function is a fantasy pending reorganization
+ * of what goes into a snapshot.
+ */
+ num_new_running_xids =
+ num_running_xids + xids_added - certainly_removed_xids;
+ new_running_xids = palloc(sizeof(TransactionId) * num_new_running_xids);
+ num_new_running_xids =
+ SnapArrayComputeRunningXids(xmax, new_xmax,
+ num_running_xids, running_xids,
+ num_removed_xids, removed_xids,
+ num_new_running_xids, new_running_xids);
+}
+
+/*
+ * Make certain that the latest data from the shared SnapArray has been copied
+ * into our backend-private cache. In general, this means that we must read
+ * the latest snapshot summary and any recently removed XIDs from shared
+ * memory, but we can optimize away duplicate reads of the same data.
+ */
+static void
+SnapArrayUpdateCache(void)
+{
+ uint64 start_pointer;
+ uint64 stop_pointer;
+ uint64 write_pointer;
+ uint64 delta;
+ uint32 skip = 0;
+
+ /*
+ * Read start and stop pointers. Once we do this, the clock is ticking.
+ * We must finish reading any data we care about before the buffer wraps
+ * around. That shouldn't be a big deal, since the buffer will normally
+ * be much larger than the amount of data we're copying, but we mustn't
+ * add any potentially slow operations after this point.
+ */
+ SpinLockAcquire(&SnapArray->start_stop_mutex);
+ stop_pointer = SnapArray->stop_pointer;
+ start_pointer = SnapArray->start_pointer;
+ SpinLockRelease(&SnapArray->start_stop_mutex);
+ Assert(start_pointer < stop_pointer);
+
+ /* If the stop pointer has not moved, we are done! */
+ if (stop_pointer == SnapArrayCacheStopPointer)
+ return;
+
+ /* If our local cache is not large enough to hold the data, grow it. */
+ delta = stop_pointer - start_pointer;
+ if (delta > SnapArrayCacheEntries)
+ {
+ Assert(delta < UINT_MAX);
+ SnapArrayCache = repalloc(SnapArrayCache,
+ sizeof(TransactionId)
+ * delta);
+ SnapArrayCacheEntries = delta;
+ }
+
+ /* Copy the data. */
+ if (start_pointer == SnapArrayCacheStartPointer)
+ {
+ /* We only need to copy the newly added data. */
+ skip = SnapArrayCacheStopPointer - start_pointer;
+ SnapArrayReadData(SnapArrayCacheStopPointer, stop_pointer,
+ SnapArrayCache + skip);
+ }
+ else
+ {
+ /* We need to recopy all of the data. */
+ SnapArrayReadData(start_pointer, stop_pointer, SnapArrayCache);
+ }
+
+ /* Did we suffer a wraparound? */
+ SpinLockAcquire(&SnapArray->write_mutex);
+ write_pointer = SnapArray->write_pointer;
+ SpinLockRelease(&SnapArray->write_mutex);
+ if (write_pointer > start_pointer + skip + SnapArray->ring_buffer_size)
+ {
+ /*
+ * Oh, bummer. We'll have to redo. By acquiring the light-weight
+ * lock instead of the spinlock, we freeze out any concurrent updates,
+ * so there's no possibility of wraparound and no need to take
+ * the spinlock before reading the start and stop pointers.
+ * Unfortunately, this also reduces concurrency. And it's double
+ * work, so we hope it won't happen often.
+ *
+ * To reduce the chances that some other backend will immediately
+ * hit the same problem, we set the compaction_requested flag. This
+ * will cause the next backend that removes XIDs to remove subxids
+ * from the snapshot.
+ */
+ LWLockAcquire(SnapArrayLock, LW_SHARED);
+ stop_pointer = SnapArray->stop_pointer;
+ start_pointer = SnapArray->start_pointer;
+ delta = stop_pointer - start_pointer;
+ if (delta > SnapArrayCacheEntries)
+ {
+ Assert(delta < UINT_MAX);
+ SnapArrayCache =
+ repalloc(SnapArrayCache,
+ sizeof(TransactionId) * delta);
+ SnapArrayCacheEntries = delta;
+ }
+ SnapArrayReadData(start_pointer, stop_pointer, SnapArrayCache);
+ SnapArray->compaction_requested = true;
+ LWLockRelease(SnapArrayLock);
+
+ /* Update statistics. */
+ SpinLockAcquire(&SnapArray->write_mutex);
+ SnapArray->num_wraparounds++;
+ SpinLockRelease(&SnapArray->write_mutex);
+ }
+
+ /* Bookkeeping. */
+ SnapArrayCacheSize = delta;
+ SnapArrayCacheStartPointer = start_pointer;
+ SnapArrayCacheStopPointer = stop_pointer;
+}
+
+/*
+ * Work out a new set of running XIDs given the existing set of running XIDs
+ * and a set of XIDs to be removed.
+ */
+static uint32
+SnapArrayComputeRunningXids(TransactionId xmax,
+ TransactionId new_xmax,
+ uint32 num_running_xids,
+ TransactionId *running_xids,
+ uint32 num_removed_xids,
+ TransactionId *removed_xids,
+ uint32 num_new_running_xids,
+ TransactionId *new_running_xids)
+{
+ uint32 i;
+ uint32 n = 0;
+ uint32 r = 0;
+
+ /*
+ * Check each running XID to see whether it has been removed. This is
+ * basically a merge join: both XID lists are sorted.
+ */
+ for (i = 0; i < num_running_xids; ++i)
+ {
+ bool match = false;
+
+ while (1)
+ {
+ if (n >= num_removed_xids)
+ break;
+ if (TransactionIdEquals(running_xids[i], removed_xids[n]))
+ match = true;
+ if (TransactionIdPrecedesOrEquals(running_xids[i], removed_xids[n]))
+ break;
+ ++n;
+ }
+
+ if (!match)
+ {
+ if (r < num_new_running_xids)
+ new_running_xids[r] = running_xids[i];
+ ++r;
+ }
+ }
+
+ /*
+ * Next, we have to add any XIDs between the old and new xmax that have
+ * not been removed. Since the list of removed XIDs is sorted, we can do
+ * this in O(n+m) time, where n is the amount by which xmax has advanced
+ * and m is the number of removed XIDs greater than the old xmax.
+ */
+ TransactionIdAdvance(xmax);
+ while (TransactionIdPrecedes(xmax, new_xmax))
+ {
+ bool match = false;
+
+ while (1)
+ {
+ if (n >= num_removed_xids)
+ break;
+ if (TransactionIdEquals(xmax, removed_xids[n]))
+ match = true;
+ if (TransactionIdPrecedesOrEquals(xmax, removed_xids[n]))
+ break;
+ ++n;
+ }
+
+ if (!match)
+ {
+ if (r < num_new_running_xids)
+ new_running_xids[r] = xmax;
+ ++r;
+ }
+
+ TransactionIdAdvance(xmax);
+ }
+
+ /* All done. */
+ return r;
+}
+
+/*
+ * Write a new snapshot summary record into the SnapArray buffer.
+ */
+static void
+SnapArrayWriteSnapshotSummary(TransactionId xmax,
+ TransactionId latest_removed_subxid,
+ uint32 nxids,
+ TransactionId *running_xids)
+{
+ TransactionId summary_info[SNAPSHOT_SUMMARY_ITEMS];
+ TransactionId xmax_threshold;
+ uint64 write_pointer;
+
+ /*
+ * When this function is invoked from elsewhere within snaparray.c,
+ * the caller should arrange to compact the snapshot if required. We
+ * include this last-ditch check mostly to protect against external
+ * callers.
+ */
+ if (nxids + SNAPSHOT_SUMMARY_ITEMS > SnapArray->ring_buffer_size)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("snapshot too large for snapshot_buffers")));
+
+ /*
+ * InvalidTransactionId is used as a sentinel value, to mark the beginning
+ * of a new summary record.
+ *
+ * We cheat a bit and store the number of transaction IDs in one of the
+ * transaction ID slots. We could go the other way and cast all of the
+ * other values to uint32, but that would require a lot more adjustment
+ * if the size of a TransactionId were ever to change.
+ */
+ summary_info[0] = InvalidTransactionId;
+ summary_info[1] = xmax;
+ summary_info[2] = latest_removed_subxid;
+ summary_info[3] = (TransactionId) nxids;
+
+ /* Advance write pointer. */
+ write_pointer = SnapArray->write_pointer;
+ SnapArrayAdvanceWritePointer(SNAPSHOT_SUMMARY_ITEMS + nxids);
+
+ /* Write data. Better not fail here... */
+ SnapArrayWriteData(write_pointer, SNAPSHOT_SUMMARY_ITEMS, summary_info);
+ SnapArrayWriteData(write_pointer + SNAPSHOT_SUMMARY_ITEMS,
+ nxids, running_xids);
+
+ /* Update start and stop pointers. */
+ SpinLockAcquire(&SnapArray->start_stop_mutex);
+ SnapArray->stop_pointer = write_pointer + SNAPSHOT_SUMMARY_ITEMS + nxids;
+ SnapArray->start_pointer = write_pointer;
+ SpinLockRelease(&SnapArray->start_stop_mutex);
+
+ /*
+ * SnapArrayRemoveRunningXids uses these to decide when to summarize.
+ * See comments in that function for details.
+ */
+ SnapArray->last_summary_size = SNAPSHOT_SUMMARY_ITEMS + nxids;
+ xmax_threshold = xmax + (4 * MaxBackends);
+ if (!TransactionIdIsNormal(xmax_threshold))
+ xmax_threshold = FirstNormalTransactionId;
+ SnapArray->xmax_threshold = xmax_threshold;
+}
+
+/*
+ * Read data from SnapArray, handling wraparound as needed.
+ *
+ * This does no synchronization at all and very minimal sanity checking.
+ * It's just here so that higher-level functions needn't duplicate the
+ * code to handle wraparound.
+ */
+static void
+SnapArrayReadData(uint64 start_location, uint64 stop_location,
+ TransactionId *buffer)
+{
+ uint64 nxids = stop_location - start_location;
+
+ Assert(nxids < SnapArray->ring_buffer_size);
+ Assert(nxids > 0);
+
+ start_location %= SnapArray->ring_buffer_size;
+ if (start_location + nxids <= (uint64) SnapArray->ring_buffer_size)
+ memcpy(buffer, &SnapArray->buffer[start_location],
+ nxids * sizeof(TransactionId));
+ else
+ {
+ uint64 entries_before_wrap;
+
+ entries_before_wrap = SnapArray->ring_buffer_size - start_location;
+ memcpy(buffer, &SnapArray->buffer[start_location],
+ entries_before_wrap * sizeof(TransactionId));
+ memcpy(buffer + entries_before_wrap, SnapArray->buffer,
+ (nxids - entries_before_wrap) * sizeof(TransactionId));
+ }
+}
+
+/*
+ * Before beginning to write the data, writers must advance the write pointer,
+ * so that concurrent readers can detect whether the data they were busy
+ * reading may have been overwritten.
+ */
+static void
+SnapArrayAdvanceWritePointer(uint64 nitems)
+{
+ SpinLockAcquire(&SnapArray->write_mutex);
+ SnapArray->write_pointer += nitems;
+ SnapArray->num_writes++;
+ SpinLockRelease(&SnapArray->write_mutex);
+}
+
+/*
+ * Write data into SnapArray, handling wraparound as needed.
+ *
+ * This does no synchronization at all and very minimal sanity checking.
+ * It's just here so that higher-level functions needn't duplicate the
+ * code to handle wraparound.
+ */
+static void
+SnapArrayWriteData(uint64 write_location, uint32 nitems, TransactionId *item)
+{
+ write_location %= SnapArray->ring_buffer_size;
+ if (write_location + nitems <= SnapArray->ring_buffer_size)
+ memcpy(&SnapArray->buffer[write_location], item,
+ nitems * sizeof(TransactionId));
+ else
+ {
+ uint64 entries_before_wrap;
+
+ /*
+ * The actual constraint is tighter than what this assertion checks,
+ * but it'll prevent us from scribbling all over shared memory if
+ * our caller screws up.
+ */
+ Assert(nitems <= SnapArray->ring_buffer_size);
+
+ entries_before_wrap = SnapArray->ring_buffer_size - write_location;
+ memcpy(&SnapArray->buffer[write_location], item,
+ entries_before_wrap * sizeof(TransactionId));
+ memcpy(SnapArray->buffer, item + entries_before_wrap,
+ (nitems - entries_before_wrap) * sizeof(TransactionId));
+ }
+}
+
+/*
+ * Helper routine for XID sorting. XIDs aren't totally ordered, so we require
+ * xid_cmp_base to be set to an appropriate value before using this function.
+ * xids will be sorted by their distance ahead of or behind this point.
+ */
+static int
+xid_cmp(const void *a, const void *b)
+{
+ const TransactionId *xa = (const TransactionId *) a;
+ const TransactionId *xb = (const TransactionId *) b;
+ bool af;
+ bool bf;
+
+ Assert(TransactionIdIsNormal(xid_cmp_base));
+
+ if (*xa == *xb)
+ return 0;
+ af = TransactionIdFollows(*xa, xid_cmp_base);
+ bf = TransactionIdFollows(*xb, xid_cmp_base);
+ if (af && !bf)
+ return 1;
+ else if (bf && !af)
+ return -1;
+ else if (TransactionIdFollows(*xa, *xb))
+ return 1;
+ else
+ return -1;
+}