from a tuple's xmin or xmax, for example, we always call
 SubTransGetTopmostTransaction() before doing much else with it.
 
+    * PostgreSQL does not use "update in place" with a rollback log
+for its MVCC implementation.  Where possible it uses "HOT" updates on
+the same page (if there is room and no indexed value is changed).
+For non-HOT updates the old tuple is expired in place and a new tuple
+is inserted at a new location.  Because of this difference, a tuple
+lock in PostgreSQL doesn't automatically lock any other versions of a
+row.  We don't try to copy or expand a tuple lock to any other
+versions of the row, based on the following proof that any additional
+serialization failures we would get from that would be false
+positives:
+
+          o If transaction T1 reads a row (thus acquiring a predicate
+lock on it) and a second transaction T2 updates that row, must a
+third transaction T3 which updates the new version of the row have a
+rw-conflict in from T1 to prevent anomalies?  In other words, does it
+matter whether this edge T1 -> T3 is there?
+
+          o If T1 has a conflict in, it certainly doesn't. Adding the
+edge T1 -> T3 would create a dangerous structure, but we already had
+one from the edge T1 -> T2, so we would have aborted something
+anyway.
+
+          o Now let's consider the case where T1 doesn't have a
+conflict in. If that's the case, for this edge T1 -> T3 to make a
+difference, T3 must have a rw-conflict out that induces a cycle in
+the dependency graph, i.e. a conflict out to some transaction
+preceding T1 in the serial order. (A conflict out to T1 would work
+too, but that would mean T1 has a conflict in and we would have
+rolled back.)
+
+          o So now we're trying to figure out if there can be an
+rw-conflict edge T3 -> T0, where T0 is some transaction that precedes
+T1. For T0 to precede T1, there has to be some edge, or
+sequence of edges, from T0 to T1. At least the last edge has to be a
+wr-dependency or ww-dependency rather than a rw-conflict, because T1
+doesn't have a rw-conflict in. And that gives us enough information
+about the order of transactions to see that T3 can't have a
+rw-conflict to T0:
+ - T0 committed before T1 started (the wr/ww-dependency implies this)
+ - T1 started before T2 committed (the T1->T2 rw-conflict implies this)
+ - T2 committed before T3 started (otherwise, T3 would be aborted
+                                   because of an update conflict)
+
+          o That means T0 committed before T3 started, and therefore
+there can't be a rw-conflict from T3 to T0.
+
+          o In both cases, we didn't need the T1 -> T3 edge.
+
     * Predicate locking in PostgreSQL will start at the tuple level
 when possible, with automatic conversion of multiple fine-grained
 locks to coarser granularity as needed to avoid resource exhaustion.
 
  *                                                        BlockNumber newblkno);
  *             PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
  *                                                              BlockNumber newblkno);
- *             PredicateLockTupleRowVersionLink(const Relation relation,
- *                                                                              const HeapTuple oldTuple,
- *                                                                              const HeapTuple newTuple)
  *             ReleasePredicateLocks(bool isCommit)
  *
  * conflict detection (may also trigger rollback)
        PredicateLockAcquire(&tag);
 }
 
-/*
- * If the old tuple has any predicate locks, copy them to the new target.
- *
- * This is called at an UPDATE, where any predicate locks held on the old
- * tuple need to be copied to the new tuple, because logically they both
- * represent the same row. A lock taken before the update must conflict
- * with anyone locking the same row after the update.
- */
-void
-PredicateLockTupleRowVersionLink(const Relation relation,
-                                                                const HeapTuple oldTuple,
-                                                                const HeapTuple newTuple)
-{
-       PREDICATELOCKTARGETTAG oldtupletag;
-       PREDICATELOCKTARGETTAG oldpagetag;
-       PREDICATELOCKTARGETTAG newtupletag;
-       BlockNumber oldblk,
-                               newblk;
-       OffsetNumber oldoff,
-                               newoff;
-       TransactionId oldxmin,
-                               newxmin;
-
-       /*
-        * Bail out quickly if there are no serializable transactions
-        * running.
-        *
-        * It's safe to do this check without taking any additional
-        * locks. Even if a serializable transaction starts concurrently,
-        * we know it can't take any SIREAD locks on the modified tuple
-        * because the caller is holding the associated buffer page lock.
-        * Memory reordering isn't an issue; the memory barrier in the
-        * LWLock acquisition guarantees that this read occurs while the
-        * buffer page lock is held.
-        */
-       if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
-               return;
-
-       oldblk = ItemPointerGetBlockNumber(&(oldTuple->t_self));
-       oldoff = ItemPointerGetOffsetNumber(&(oldTuple->t_self));
-       oldxmin = HeapTupleHeaderGetXmin(oldTuple->t_data);
-
-       newblk = ItemPointerGetBlockNumber(&(newTuple->t_self));
-       newoff = ItemPointerGetOffsetNumber(&(newTuple->t_self));
-       newxmin = HeapTupleHeaderGetXmin(newTuple->t_data);
-
-       SET_PREDICATELOCKTARGETTAG_TUPLE(oldtupletag,
-                                                                        relation->rd_node.dbNode,
-                                                                        relation->rd_id,
-                                                                        oldblk,
-                                                                        oldoff,
-                                                                        oldxmin);
-
-       SET_PREDICATELOCKTARGETTAG_PAGE(oldpagetag,
-                                                                       relation->rd_node.dbNode,
-                                                                       relation->rd_id,
-                                                                       oldblk);
-
-       SET_PREDICATELOCKTARGETTAG_TUPLE(newtupletag,
-                                                                        relation->rd_node.dbNode,
-                                                                        relation->rd_id,
-                                                                        newblk,
-                                                                        newoff,
-                                                                        newxmin);
-
-       /*
-        * A page-level lock on the page containing the old tuple counts too.
-        * Anyone holding a lock on the page is logically holding a lock on the
-        * old tuple, so we need to acquire a lock on his behalf on the new tuple
-        * too. However, if the new tuple is on the same page as the old one, the
-        * old page-level lock already covers the new tuple.
-        *
-        * A relation-level lock always covers both tuple versions, so we don't
-        * need to worry about those here.
-        */
-       LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
-
-       TransferPredicateLocksToNewTarget(oldtupletag, newtupletag, false);
-       if (newblk != oldblk)
-               TransferPredicateLocksToNewTarget(oldpagetag, newtupletag, false);
-
-       LWLockRelease(SerializablePredicateLockListLock);
-}
-
 
 /*
  *             DeleteLockTarget
 
        /*
         * Bail out quickly if there are no serializable transactions
-        * running. As with PredicateLockTupleRowVersionLink, it's safe to
-        * check this without taking locks because the caller is holding
-        * the buffer page lock.
+        * running.
+        *
+        * It's safe to do this check without taking any additional
+        * locks. Even if a serializable transaction starts concurrently,
+        * we know it can't take any SIREAD locks on the page being split
+        * because the caller is holding the associated buffer page lock.
+        * Memory reordering isn't an issue; the memory barrier in the
+        * LWLock acquisition guarantees that this read occurs while the
+        * buffer page lock is held.
         */
        if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
                return;
 }
 
 /*
- * Check whether we should roll back one of these transactions
- * instead of flagging a new rw-conflict.
+ * We are about to add a RW-edge to the dependency graph - check that we don't
+ * introduce a dangerous structure by doing so, and abort one of the
+ * transactions if so.
+ *
+ * A serialization failure can only occur if there is a dangerous structure
+ * in the dependency graph:
+ *
+ *             Tin ------> Tpivot ------> Tout
+ *                   rw             rw
+ *
+ * Furthermore, Tout must commit first.
+ *
+ * One more optimization is that if Tin is declared READ ONLY (or commits
+ * without writing), we can only have a problem if Tout committed before Tin
+ * acquired its snapshot.
  */
 static void
 OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
        failure = false;
 
        /*
-        * Check for already-committed writer with rw-conflict out flagged. This
-        * means that the reader must immediately fail.
+        * Check for already-committed writer with rw-conflict out flagged
+        * (conflict-flag on W means that T2 committed before W):
+        *
+        *              R ------> W ------> T2
+        *                  rw        rw
+        *
+        * That is a dangerous structure, so we must abort. (Since the writer
+        * has already committed, we must be the reader.)
         */
        if (SxactIsCommitted(writer)
          && (SxactHasConflictOut(writer) || SxactHasSummaryConflictOut(writer)))
                failure = true;
 
        /*
-        * Check whether the reader has become a pivot with a committed writer. If
-        * so, we must roll back unless every in-conflict either committed before
-        * the writer committed or is READ ONLY and overlaps the writer.
+        * Check whether the writer has become a pivot with an out-conflict
+        * committed transaction (T2), and T2 committed first:
+        *
+        *              R ------> W ------> T2
+        *                  rw        rw
+        *
+        * Because T2 must've committed first, there is no anomaly if:
+        * - the reader committed before T2
+        * - the writer committed before T2
+        * - the reader is a READ ONLY transaction and the reader was
+        *   concurrent with T2 (= reader acquired its snapshot before T2 committed)
         */
-       if (!failure && SxactIsCommitted(writer) && !SxactIsReadOnly(reader))
+       if (!failure)
        {
-               if (SxactHasSummaryConflictIn(reader))
+               if (SxactHasSummaryConflictOut(writer))
                {
                        failure = true;
                        conflict = NULL;
                }
                else
                        conflict = (RWConflict)
-                               SHMQueueNext(&reader->inConflicts,
-                                                        &reader->inConflicts,
-                                                        offsetof(RWConflictData, inLink));
+                               SHMQueueNext(&writer->outConflicts,
+                                                        &writer->outConflicts,
+                                                        offsetof(RWConflictData, outLink));
                while (conflict)
                {
-                       if (!SxactIsRolledBack(conflict->sxactOut)
-                               && (!SxactIsCommitted(conflict->sxactOut)
-                                       || conflict->sxactOut->commitSeqNo >= writer->commitSeqNo)
-                               && (!SxactIsReadOnly(conflict->sxactOut)
-                                       || conflict->sxactOut->SeqNo.lastCommitBeforeSnapshot >= writer->commitSeqNo))
+                       SERIALIZABLEXACT *t2 = conflict->sxactIn;
+
+                       if (SxactIsCommitted(t2)
+                               && (!SxactIsCommitted(reader)
+                                       || t2->commitSeqNo <= reader->commitSeqNo)
+                               && (!SxactIsCommitted(writer)
+                                       || t2->commitSeqNo <= writer->commitSeqNo)
+                               && (!SxactIsReadOnly(reader)
+                                       || t2->commitSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot))
                        {
                                failure = true;
                                break;
                        }
                        conflict = (RWConflict)
-                               SHMQueueNext(&reader->inConflicts,
-                                                        &conflict->inLink,
-                                                        offsetof(RWConflictData, inLink));
+                               SHMQueueNext(&writer->outConflicts,
+                                                        &conflict->outLink,
+                                                        offsetof(RWConflictData, outLink));
                }
        }
 
        /*
-        * Check whether the writer has become a pivot with an out-conflict
-        * committed transaction, while neither reader nor writer is committed. If
-        * the reader is a READ ONLY transaction, there is only a serialization
-        * failure if an out-conflict transaction causing the pivot committed
-        * before the reader acquired its snapshot.  (That is, the reader must not
-        * have been concurrent with the out-conflict transaction.)
+        * Check whether the reader has become a pivot with a committed writer:
+        *
+        *              T0 ------> R ------> W
+        *                   rw        rw
+        *
+        * Because W must've committed first for an anomaly to occur, there is no
+        * anomaly if:
+        * - T0 committed before the writer
+        * - T0 is READ ONLY, and overlaps the writer
         */
-       if (!failure && !SxactIsCommitted(writer))
+       if (!failure && SxactIsCommitted(writer) && !SxactIsReadOnly(reader))
        {
-               if (SxactHasSummaryConflictOut(reader))
+               if (SxactHasSummaryConflictIn(reader))
                {
                        failure = true;
                        conflict = NULL;
                }
                else
                        conflict = (RWConflict)
-                               SHMQueueNext(&writer->outConflicts,
-                                                        &writer->outConflicts,
-                                                        offsetof(RWConflictData, outLink));
+                               SHMQueueNext(&reader->inConflicts,
+                                                        &reader->inConflicts,
+                                                        offsetof(RWConflictData, inLink));
                while (conflict)
                {
-                       if ((reader == conflict->sxactIn && SxactIsCommitted(reader))
-                               || (SxactIsCommitted(conflict->sxactIn)
-                                       && !SxactIsCommitted(reader)
-                                       && (!SxactIsReadOnly(reader)
-                                               || conflict->sxactIn->commitSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot)))
+                       SERIALIZABLEXACT *t0 = conflict->sxactOut;
+
+                       if (!SxactIsRolledBack(t0)
+                               && (!SxactIsCommitted(t0)
+                                       || t0->commitSeqNo >= writer->commitSeqNo)
+                               && (!SxactIsReadOnly(t0)
+                                       || t0->SeqNo.lastCommitBeforeSnapshot >= writer->commitSeqNo))
                        {
                                failure = true;
                                break;
                        }
                        conflict = (RWConflict)
-                               SHMQueueNext(&writer->outConflicts,
-                                                        &conflict->outLink,
-                                                        offsetof(RWConflictData, outLink));
+                               SHMQueueNext(&reader->inConflicts,
+                                                        &conflict->inLink,
+                                                        offsetof(RWConflictData, inLink));
                }
        }
 
        if (failure)
        {
+               /*
+                * We have to kill a transaction to avoid a possible anomaly from
+                * occurring. If the writer is us, we can just ereport() to cause
+                * a transaction abort. Otherwise we flag the writer for termination,
+                * causing it to abort when it tries to commit. However, if the writer
+                * is a prepared transaction, already prepared, we can't abort it
+                * anymore, so we have to kill the reader instead.
+                */
                if (MySerializableXact == writer)
                {
                        LWLockRelease(SerializableXactHashLock);
                        ereport(ERROR,
                                        (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                                         errmsg("could not serialize access due to read/write dependencies among transactions"),
-                       errdetail("Cancelled on identification as pivot, during write."),
+                       errdetail("Cancelled on identification as a pivot, during write."),
                                         errhint("The transaction might succeed if retried.")));
                }
                else if (SxactIsPrepared(writer))
                {
                        LWLockRelease(SerializableXactHashLock);
+
+                       /* if we're not the writer, we have to be the reader */
+                       Assert(MySerializableXact == reader);
                        ereport(ERROR,
                                        (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                                         errmsg("could not serialize access due to read/write dependencies among transactions"),