</sect1>
 
-<sect1 id="gist-recovery">
- <title>Crash Recovery</title>
-
- <para>
-  Usually, replay of the WAL log is sufficient to restore the integrity
-  of a GiST index following a database crash.  However, there are some
-  corner cases in which the index state is not fully rebuilt.  The index
-  will still be functionally correct, but there might be some performance
-  degradation.  When this occurs, the index can be repaired by
-  <command>VACUUM</>ing its table, or by rebuilding the index using
-  <command>REINDEX</>.  In some cases a plain <command>VACUUM</> is
-  not sufficient, and either <command>VACUUM FULL</> or <command>REINDEX</>
-  is needed.  The need for one of these procedures is indicated by occurrence
-  of this log message during crash recovery:
-<programlisting>
-LOG:  index NNN/NNN/NNN needs VACUUM or REINDEX to finish crash recovery
-</programlisting>
-  or this log message during routine index insertions:
-<programlisting>
-LOG:  index "FOO" needs VACUUM or REINDEX to finish crash recovery
-</programlisting>
-  If a plain <command>VACUUM</> finds itself unable to complete recovery
-  fully, it will return a notice:
-<programlisting>
-NOTICE:  index "FOO" needs VACUUM FULL or REINDEX to finish crash recovery
-</programlisting>
- </para>
-</sect1>
-
 </chapter>
 
 the node splitting algorithm; method Union is used for propagating changes
 upward to maintain the tree properties.
 
-NOTICE: We modified original INSERT algorithm for performance reason. In
-particularly, it is now a single-pass algorithm.
-
-Function findLeaf is used to identify subtree for insertion. Page, in which
-insertion is proceeded, is locked as well as its parent page. Functions
-findParent and findPath are used to find parent pages, which could be changed
-because of concurrent access. Function pageSplit is recurrent and could split
-page by more than 2 pages, which could be necessary if keys have different
-lengths or more than one key are inserted (in such situation, user defined
-function pickSplit cannot guarantee free space on page).
-
-findLeaf(new-key)
-   push(stack, [root, 0]) //page, LSN
-   while(true)
-       ptr = top of stack
-       latch( ptr->page, S-mode )
-       ptr->lsn = ptr->page->lsn
-       if ( exists ptr->parent AND ptr->parent->lsn < ptr->page->nsn )
-           unlatch( ptr->page )
-           pop stack
-       else if ( ptr->page is not leaf )
-           push( stack, [get_best_child(ptr->page, new-key), 0] )
-           unlatch( ptr->page )
-       else
-           unlatch( ptr->page )
-           latch( ptr->page, X-mode )
-           if ( ptr->page is not leaf )
-               //the only root page can become a non-leaf
-               unlatch( ptr->page )
-           else if ( ptr->parent->lsn < ptr->page->nsn )
-               unlatch( ptr->page )
-               pop stack
-           else
-               return stack
-           end
-       end
-   end
+To insert a tuple, we first have to find a suitable leaf page to insert to.
+The algorithm walks down the tree, starting from the root, along the path
+of smallest Penalty. At each step:
+
+1. Has this page been split since we looked at the parent? If so, it's
+possible that we should be inserting to the other half instead, so retreat
+back to the parent.
+2. If this is a leaf node, we've found our target node.
+3. Otherwise use Penalty to pick a new target subtree.
+4. Check the key representing the target subtree. If it doesn't already cover
+the key we're inserting, replace it with the Union of the old downlink key
+and the key being inserted. (Actually, we always call Union, and just skip
+the replacement if the Unioned key is the same as the existing key)
+5. Replacing the key in step 4 might cause the page to be split. In that case,
+propagate the change upwards and restart the algorithm from the first parent
+that didn't need to be split.
+6. Walk down to the target subtree, and goto 1.
+
+This differs from the insertion algorithm in the original paper. In the
+original paper, you first walk down the tree until you reach a leaf page, and
+then you adjust the downlink in the parent, and propagating the adjustment up,
+all the way up to the root in the worst case. But we adjust the downlinks to
+cover the new key already when we walk down, so that when we reach the leaf
+page, we don't need to update the parents anymore, except to insert the
+downlinks if we have to split the page. This makes crash recovery simpler:
+after inserting a key to the page, the tree is immediately self-consistent
+without having to update the parents. Even if we split a page and crash before
+inserting the downlink to the parent, the tree is self-consistent because the
+right half of the split is accessible via the rightlink of the left page
+(which replaced the original page).
+
+Note that the algorithm can walk up and down the tree before reaching a leaf
+page, if internal pages need to split while adjusting the downlinks for the
+new key. Eventually, you should reach the bottom, and proceed with the
+insertion of the new tuple.
+
+Once we've found the target page to insert to, we check if there's room
+for the new tuple. If there is, the tuple is inserted, and we're done.
+If it doesn't fit, however, the page needs to be split. Note that it is
+possible that a page needs to be split into more than two pages, if keys have
+different lengths or more than one key is being inserted at a time (which can
+happen when inserting downlinks for a page split that resulted in more than
+two pages at the lower level). After splitting a page, the parent page needs
+to be updated. The downlink for the new page needs to be inserted, and the
+downlink for the old page, which became the left half of the split, needs to
+be updated to only cover those tuples that stayed on the left page. Inserting
+the downlink in the parent can again lead to a page split, recursing up to the
+root page in the worst case.
+
+gistplacetopage is the workhorse function that performs one step of the
+insertion. If the tuple fits, it inserts it to the given page, otherwise
+it splits the page, and constructs the new downlink tuples for the split
+pages. The caller must then call gistplacetopage() on the parent page to
+insert the downlink tuples. The parent page that holds the downlink to
+the child might have migrated as a result of concurrent splits of the
+parent, gistfindCorrectParent() is used to find the parent page.
+
+Splitting the root page works slightly differently. At root split,
+gistplacetopage() allocates the new child pages and replaces the old root
+page with the new root containing downlinks to the new children, all in one
+operation.
+
+
+findPath is a subroutine of findParent, used when the correct parent page
+can't be found by following the rightlinks at the parent level:
 
 findPath( stack item )
    push stack, [root, 0, 0] // page, LSN, parent
        pop stack
    end
 
+
+gistFindCorrectParent is used to re-find the parent of a page during
+insertion. It might have migrated to the right since we traversed down the
+tree because of page splits.
+
 findParent( stack item )
    parent = item->parent
-   latch( parent->page, X-mode )
    if ( parent->page->lsn != parent->lsn )
        while(true)
            search parent tuple on parent->page, if found the return
        end
        newstack = findPath( item->parent )
        replace part of stack to new one
+       latch( parent->page, X-mode )
        return findParent( item )
    end
 
+pageSplit function decides how to distribute keys to the new pages after
+page split:
+
 pageSplit(page, allkeys)
    (lkeys, rkeys) = pickSplit( allkeys )
    if ( page is root )
    return newkeys
 
 
-placetopage(page, keysarray)
-   if ( no space left on page )
-       keysarray = pageSplit(page, [ extract_keys(page), keysarray])
-       last page in chain gets old NSN,
-       original and others - new NSN equals to LSN
-       if ( page is root )
-           make new root with keysarray
-       end
-   else
-       put keysarray on page
-       if ( length of keysarray > 1 )
-           keysarray = [ union(keysarray) ]
-       end
-   end
 
-insert(new-key)
-   stack = findLeaf(new-key)
-   keysarray = [new-key]
-   ptr = top of stack
-   while(true)
-       findParent( ptr ) //findParent latches parent page
-       keysarray = placetopage(ptr->page, keysarray)
-       unlatch( ptr->page )
-       pop stack;
-       ptr = top of stack
-       if (length of keysarray == 1)
-           newboundingkey = union(oldboundingkey, keysarray)
-           if (newboundingkey == oldboundingkey)
-               unlatch ptr->page
-               break loop
-           end
-       end
-   end
+Concurrency control
+-------------------
+As a rule of thumb, if you need to hold a lock on multiple pages at the
+same time, the locks should be acquired in the following order: child page
+before parent, and left-to-right at the same level. Always acquiring the
+locks in the same order avoids deadlocks.
+
+The search algorithm only looks at and locks one page at a time. Consequently
+there's a race condition between a search and a page split. A page split
+happens in two phases: 1. The page is split 2. The downlink is inserted to the
+parent. If a search looks at the parent page between those steps, before the
+downlink is inserted, it will still find the new right half by following the
+rightlink on the left half. But it must not follow the rightlink if it saw the
+downlink in the parent, or the page will be visited twice!
+
+A split initially marks the left page with the F_FOLLOW_RIGHT flag. If a scan
+sees that flag set, it knows that the right page is missing the downlink, and
+should be visited too. When split inserts the downlink to the parent, it
+clears the F_FOLLOW_RIGHT flag in the child, and sets the NSN field in the
+child page header to match the LSN of the insertion on the parent. If the
+F_FOLLOW_RIGHT flag is not set, a scan compares the NSN on the child and the
+LSN it saw in the parent. If NSN < LSN, the scan looked at the parent page
+before the downlink was inserted, so it should follow the rightlink. Otherwise
+the scan saw the downlink in the parent page, and will/did follow that as
+usual.
+
+A scan can't normally see a page with the F_FOLLOW_RIGHT flag set, because
+a page split keeps the child pages locked until the downlink has been inserted
+to the parent and the flag cleared again. But if a crash happens in the middle
+of a page split, before the downlinks are inserted into the parent, that will
+leave a page with F_FOLLOW_RIGHT in the tree. Scans handle that just fine,
+but we'll eventually want to fix that for performance reasons. And more
+importantly, dealing with pages with missing downlink pointers in the parent
+would complicate the insertion algorithm. So when an insertion sees a page
+with F_FOLLOW_RIGHT set, it immediately tries to bring the split that
+crashed in the middle to completion by adding the downlink in the parent.
+
 
 Authors:
    Teodor Sigaev   <teodor@sigaev.ru>
 
    MemoryContext tmpCtx;
 } GISTBuildState;
 
+/* A List of these is used represent a split-in-progress. */
+typedef struct
+{
+   Buffer      buf;        /* the split page "half" */
+   IndexTuple  downlink;   /* downlink for this half. */
+} GISTPageSplitInfo;
 
 /* non-export function prototypes */
 static void gistbuildCallback(Relation index,
             IndexTuple itup,
             Size freespace,
             GISTSTATE *GISTstate);
-static void gistfindleaf(GISTInsertState *state,
-            GISTSTATE *giststate);
+static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
+static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
+                GISTSTATE *giststate,
+                IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
+                Buffer leftchild);
+static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
+               GISTSTATE *giststate, List *splitinfo);
 
 
 #define ROTATEDIST(d) do { \
 
 
 /*
- * Workhouse routine for doing insertion into a GiST index. Note that
- * this routine assumes it is invoked in a short-lived memory context,
- * so it does not bother releasing palloc'd allocations.
+ * Place tuples from 'itup' to 'buffer'. If 'oldoffnum' is valid, the tuple
+ * at that offset is atomically removed along with inserting the new tuples.
+ * This is used to replace a tuple with a new one.
+ *
+ * If 'leftchildbuf' is valid, we're inserting the downlink for the page
+ * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'.
+ * F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and NSN is set.
+ *
+ * If there is not enough room on the page, it is split. All the split
+ * pages are kept pinned and locked and returned in *splitinfo, the caller
+ * is responsible for inserting the downlinks for them. However, if
+ * 'buffer' is the root page and it needs to be split, gistplacetopage()
+ * performs the split as one atomic operation, and *splitinfo is set to NIL.
+ * In that case, we continue to hold the root page locked, and the child
+ * pages are released; note that new tuple(s) are *not* on the root page
+ * but in one of the new child pages.
  */
-static void
-gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
+static bool
+gistplacetopage(GISTInsertState *state, GISTSTATE *giststate,
+               Buffer buffer,
+               IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
+               Buffer leftchildbuf,
+               List **splitinfo)
 {
-   GISTInsertState state;
-
-   memset(&state, 0, sizeof(GISTInsertState));
-
-   state.itup = (IndexTuple *) palloc(sizeof(IndexTuple));
-   state.itup[0] = (IndexTuple) palloc(IndexTupleSize(itup));
-   memcpy(state.itup[0], itup, IndexTupleSize(itup));
-   state.ituplen = 1;
-   state.freespace = freespace;
-   state.r = r;
-   state.key = itup->t_tid;
-   state.needInsertComplete = true;
-
-   state.stack = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
-   state.stack->blkno = GIST_ROOT_BLKNO;
+   Page        page = BufferGetPage(buffer);
+   bool        is_leaf = (GistPageIsLeaf(page)) ? true : false;
+   XLogRecPtr  recptr;
+   int         i;
+   bool        is_split;
 
-   gistfindleaf(&state, giststate);
-   gistmakedeal(&state, giststate);
-}
+   /*
+    * Refuse to modify a page that's incompletely split. This should
+    * not happen because we finish any incomplete splits while we walk
+    * down the tree. However, it's remotely possible that another
+    * concurrent inserter splits a parent page, and errors out before
+    * completing the split. We will just throw an error in that case,
+    * and leave any split we had in progress unfinished too. The next
+    * insert that comes along will clean up the mess.
+    */
+   if (GistFollowRight(page))
+       elog(ERROR, "concurrent GiST page split was incomplete");
 
-static bool
-gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
-{
-   bool        is_splitted = false;
-   bool        is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;
+   *splitinfo = NIL;
 
    /*
-    * if (!is_leaf) remove old key: This node's key has been modified, either
+    * if isupdate, remove old key: This node's key has been modified, either
     * because a child split occurred or because we needed to adjust our key
     * for an insert in a child node. Therefore, remove the old version of
     * this node's key.
     * for WAL replay, in the non-split case we handle this by setting up a
     * one-element todelete array; in the split case, it's handled implicitly
     * because the tuple vector passed to gistSplit won't include this tuple.
-    *
-    * XXX: If we want to change fillfactors between node and leaf, fillfactor
-    * = (is_leaf ? state->leaf_fillfactor : state->node_fillfactor)
     */
-   if (gistnospace(state->stack->page, state->itup, state->ituplen,
-                   is_leaf ? InvalidOffsetNumber : state->stack->childoffnum,
-                   state->freespace))
+   is_split = gistnospace(page, itup, ntup, oldoffnum, state->freespace);
+   if (is_split)
    {
        /* no space for insertion */
        IndexTuple *itvec;
        int         tlen;
        SplitedPageLayout *dist = NULL,
                   *ptr;
-       BlockNumber rrlink = InvalidBlockNumber;
-       GistNSN     oldnsn;
+       BlockNumber oldrlink = InvalidBlockNumber;
+       GistNSN     oldnsn = { 0, 0 };
+       SplitedPageLayout rootpg;
+       BlockNumber blkno = BufferGetBlockNumber(buffer);
+       bool        is_rootsplit;
 
-       is_splitted = true;
+       is_rootsplit = (blkno == GIST_ROOT_BLKNO);
 
        /*
-        * Form index tuples vector to split: remove old tuple if t's needed
-        * and add new tuples to vector
+        * Form index tuples vector to split. If we're replacing an old tuple,
+        * remove the old version from the vector.
         */
-       itvec = gistextractpage(state->stack->page, &tlen);
-       if (!is_leaf)
+       itvec = gistextractpage(page, &tlen);
+       if (OffsetNumberIsValid(oldoffnum))
        {
            /* on inner page we should remove old tuple */
-           int         pos = state->stack->childoffnum - FirstOffsetNumber;
+           int         pos = oldoffnum - FirstOffsetNumber;
 
            tlen--;
            if (pos != tlen)
                memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
        }
-       itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen);
-       dist = gistSplit(state->r, state->stack->page, itvec, tlen, giststate);
-
-       state->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * tlen);
-       state->ituplen = 0;
+       itvec = gistjoinvector(itvec, &tlen, itup, ntup);
+       dist = gistSplit(state->r, page, itvec, tlen, giststate);
 
-       if (state->stack->blkno != GIST_ROOT_BLKNO)
+       /*
+        * Set up pages to work with. Allocate new buffers for all but the
+        * leftmost page. The original page becomes the new leftmost page,
+        * and is just replaced with the new contents.
+        *
+        * For a root-split, allocate new buffers for all child pages, the
+        * original page is overwritten with new root page containing
+        * downlinks to the new child pages.
+        */
+       ptr = dist;
+       if (!is_rootsplit)
        {
-           /*
-            * if non-root split then we should not allocate new buffer, but
-            * we must create temporary page to operate
-            */
-           dist->buffer = state->stack->buffer;
-           dist->page = PageGetTempPageCopySpecial(BufferGetPage(dist->buffer));
+           /* save old rightlink and NSN */
+           oldrlink = GistPageGetOpaque(page)->rightlink;
+           oldnsn = GistPageGetOpaque(page)->nsn;
+
+           dist->buffer = buffer;
+           dist->block.blkno = BufferGetBlockNumber(buffer);
+           dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));
 
            /* clean all flags except F_LEAF */
            GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
+
+           ptr = ptr->next;
+       }
+       for (; ptr; ptr = ptr->next)
+       {
+           /* Allocate new page */
+           ptr->buffer = gistNewBuffer(state->r);
+           GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
+           ptr->page = BufferGetPage(ptr->buffer);
+           ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
        }
 
-       /* make new pages and fills them */
+       /*
+        * Now that we know whick blocks the new pages go to, set up downlink
+        * tuples to point to them.
+        */
        for (ptr = dist; ptr; ptr = ptr->next)
        {
-           int         i;
-           char       *data;
+           ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
+           GistTupleSetValid(ptr->itup);
+       }
 
-           /* get new page */
-           if (ptr->buffer == InvalidBuffer)
+       /*
+        * If this is a root split, we construct the new root page with the
+        * downlinks here directly, instead of requiring the caller to insert
+        * them. Add the new root page to the list along with the child pages.
+        */
+       if (is_rootsplit)
+       {
+           IndexTuple *downlinks;
+           int ndownlinks = 0;
+           int i;
+
+           rootpg.buffer = buffer;
+           rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
+           GistPageGetOpaque(rootpg.page)->flags = 0;
+
+           /* Prepare a vector of all the downlinks */
+           for (ptr = dist; ptr; ptr = ptr->next)
+               ndownlinks++;
+           downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
+           for (i = 0, ptr = dist; ptr; ptr = ptr->next)
+               downlinks[i++] = ptr->itup;
+
+           rootpg.block.blkno = GIST_ROOT_BLKNO;
+           rootpg.block.num = ndownlinks;
+           rootpg.list = gistfillitupvec(downlinks, ndownlinks,
+                                         &(rootpg.lenlist));
+           rootpg.itup = NULL;
+
+           rootpg.next = dist;
+           dist = &rootpg;
+       }
+       else
+       {
+           /* Prepare split-info to be returned to caller */
+           for (ptr = dist; ptr; ptr = ptr->next)
            {
-               ptr->buffer = gistNewBuffer(state->r);
-               GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
-               ptr->page = BufferGetPage(ptr->buffer);
+               GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
+               si->buf = ptr->buffer;
+               si->downlink = ptr->itup;
+               *splitinfo = lappend(*splitinfo, si);
            }
-           ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
+       }
 
-           /*
-            * fill page, we can do it because all these pages are new (ie not
-            * linked in tree or masked by temp page
-            */
-           data = (char *) (ptr->list);
+       /*
+        * Fill all pages. All the pages are new, ie. freshly allocated empty
+        * pages, or a temporary copy of the old page.
+        */
+       for (ptr = dist; ptr; ptr = ptr->next)
+       {
+           char *data = (char *) (ptr->list);
            for (i = 0; i < ptr->block.num; i++)
            {
                if (PageAddItem(ptr->page, (Item) data, IndexTupleSize((IndexTuple) data), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
                data += IndexTupleSize((IndexTuple) data);
            }
 
-           /* set up ItemPointer and remember it for parent */
-           ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
-           state->itup[state->ituplen] = ptr->itup;
-           state->ituplen++;
-       }
+           /* Set up rightlinks */
+           if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
+               GistPageGetOpaque(ptr->page)->rightlink =
+                   ptr->next->block.blkno;
+           else
+               GistPageGetOpaque(ptr->page)->rightlink = oldrlink;
+
+           if (ptr->next && !is_rootsplit)
+               GistMarkFollowRight(ptr->page);
+           else
+               GistClearFollowRight(ptr->page);
 
-       /* saves old rightlink */
-       if (state->stack->blkno != GIST_ROOT_BLKNO)
-           rrlink = GistPageGetOpaque(dist->page)->rightlink;
+           /*
+            * Copy the NSN of the original page to all pages. The
+            * F_FOLLOW_RIGHT flags ensure that scans will follow the
+            * rightlinks until the downlinks are inserted.
+            */
+           GistPageGetOpaque(ptr->page)->nsn = oldnsn;
+       }
 
        START_CRIT_SECTION();
 
        /*
-        * must mark buffers dirty before XLogInsert, even though we'll still
-        * be changing their opaque fields below. set up right links.
+        * Must mark buffers dirty before XLogInsert, even though we'll still
+        * be changing their opaque fields below.
         */
        for (ptr = dist; ptr; ptr = ptr->next)
-       {
            MarkBufferDirty(ptr->buffer);
-           GistPageGetOpaque(ptr->page)->rightlink = (ptr->next) ?
-               ptr->next->block.blkno : rrlink;
-       }
+       if (BufferIsValid(leftchildbuf))
+           MarkBufferDirty(leftchildbuf);
 
-       /* restore splitted non-root page */
-       if (state->stack->blkno != GIST_ROOT_BLKNO)
-       {
-           PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
-           dist->page = BufferGetPage(dist->buffer);
-       }
+       /*
+        * The first page in the chain was a temporary working copy meant
+        * to replace the old page. Copy it over the old page.
+        */
+       PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
+       dist->page = BufferGetPage(dist->buffer);
 
+       /* Write the WAL record */
        if (RelationNeedsWAL(state->r))
-       {
-           XLogRecPtr  recptr;
-           XLogRecData *rdata;
-
-           rdata = formSplitRdata(state->r->rd_node, state->stack->blkno,
-                                  is_leaf, &(state->key), dist);
-
-           recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
-
-           for (ptr = dist; ptr; ptr = ptr->next)
-           {
-               PageSetLSN(ptr->page, recptr);
-               PageSetTLI(ptr->page, ThisTimeLineID);
-           }
-       }
+           recptr = gistXLogSplit(state->r->rd_node, blkno, is_leaf,
+                                  dist, oldrlink, oldnsn, leftchildbuf);
        else
-       {
-           for (ptr = dist; ptr; ptr = ptr->next)
-           {
-               PageSetLSN(ptr->page, GetXLogRecPtrForTemp());
-           }
-       }
-
-       /* set up NSN */
-       oldnsn = GistPageGetOpaque(dist->page)->nsn;
-       if (state->stack->blkno == GIST_ROOT_BLKNO)
-           /* if root split we should put initial value */
-           oldnsn = PageGetLSN(dist->page);
+           recptr = GetXLogRecPtrForTemp();
 
        for (ptr = dist; ptr; ptr = ptr->next)
        {
-           /* only for last set oldnsn */
-           GistPageGetOpaque(ptr->page)->nsn = (ptr->next) ?
-               PageGetLSN(ptr->page) : oldnsn;
+           PageSetLSN(ptr->page, recptr);
+           PageSetTLI(ptr->page, ThisTimeLineID);
        }
 
        /*
-        * release buffers, if it was a root split then release all buffers
-        * because we create all buffers
+        * Return the new child buffers to the caller.
+        *
+        * If this was a root split, we've already inserted the downlink
+        * pointers, in the form of a new root page. Therefore we can
+        * release all the new buffers, and keep just the root page locked.
         */
-       ptr = (state->stack->blkno == GIST_ROOT_BLKNO) ? dist : dist->next;
-       for (; ptr; ptr = ptr->next)
-           UnlockReleaseBuffer(ptr->buffer);
-
-       if (state->stack->blkno == GIST_ROOT_BLKNO)
+       if (is_rootsplit)
        {
-           gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key));
-           state->needInsertComplete = false;
+           for (ptr = dist->next; ptr; ptr = ptr->next)
+               UnlockReleaseBuffer(ptr->buffer);
        }
-
-       END_CRIT_SECTION();
    }
    else
    {
-       /* enough space */
+       /*
+        * Enough space. We also get here if ntuples==0.
+        */
        START_CRIT_SECTION();
 
-       if (!is_leaf)
-           PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
-       gistfillbuffer(state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber);
+       if (OffsetNumberIsValid(oldoffnum))
+           PageIndexTupleDelete(page, oldoffnum);
+       gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
 
-       MarkBufferDirty(state->stack->buffer);
+       MarkBufferDirty(buffer);
+
+       if (BufferIsValid(leftchildbuf))
+           MarkBufferDirty(leftchildbuf);
 
        if (RelationNeedsWAL(state->r))
        {
-           OffsetNumber noffs = 0,
-                       offs[1];
-           XLogRecPtr  recptr;
-           XLogRecData *rdata;
+           OffsetNumber ndeloffs = 0,
+                       deloffs[1];
 
-           if (!is_leaf)
+           if (OffsetNumberIsValid(oldoffnum))
            {
-               /* only on inner page we should delete previous version */
-               offs[0] = state->stack->childoffnum;
-               noffs = 1;
+               deloffs[0] = oldoffnum;
+               ndeloffs = 1;
            }
 
-           rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer,
-                                   offs, noffs,
-                                   state->itup, state->ituplen,
-                                   &(state->key));
+           recptr = gistXLogUpdate(state->r->rd_node, buffer,
+                                   deloffs, ndeloffs, itup, ntup,
+                                   leftchildbuf);
 
-           recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
-           PageSetLSN(state->stack->page, recptr);
-           PageSetTLI(state->stack->page, ThisTimeLineID);
+           PageSetLSN(page, recptr);
+           PageSetTLI(page, ThisTimeLineID);
        }
        else
-           PageSetLSN(state->stack->page, GetXLogRecPtrForTemp());
-
-       if (state->stack->blkno == GIST_ROOT_BLKNO)
-           state->needInsertComplete = false;
+       {
+           recptr = GetXLogRecPtrForTemp();
+           PageSetLSN(page, recptr);
+       }
 
-       END_CRIT_SECTION();
+       *splitinfo = NIL;
+   }
 
-       if (state->ituplen > 1)
-       {                       /* previous is_splitted==true */
+   /*
+    * If we inserted the downlink for a child page, set NSN and clear
+    * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know
+    * to follow the rightlink if and only if they looked at the parent page
+    * before we inserted the downlink.
+    *
+    * Note that we do this *after* writing the WAL record. That means that
+    * the possible full page image in the WAL record does not include
+    * these changes, and they must be replayed even if the page is restored
+    * from the full page image. There's a chicken-and-egg problem: if we
+    * updated the child pages first, we wouldn't know the recptr of the WAL
+    * record we're about to write.
+    */
+   if (BufferIsValid(leftchildbuf))
+   {
+       Page leftpg = BufferGetPage(leftchildbuf);
 
-           /*
-            * child was splited, so we must form union for insertion in
-            * parent
-            */
-           IndexTuple  newtup = gistunion(state->r, state->itup, state->ituplen, giststate);
+       GistPageGetOpaque(leftpg)->nsn = recptr;
+       GistClearFollowRight(leftpg);
 
-           ItemPointerSetBlockNumber(&(newtup->t_tid), state->stack->blkno);
-           state->itup[0] = newtup;
-           state->ituplen = 1;
-       }
-       else if (is_leaf)
-       {
-           /*
-            * itup[0] store key to adjust parent, we set it to valid to
-            * correct check by GistTupleIsInvalid macro in gistgetadjusted()
-            */
-           ItemPointerSetBlockNumber(&(state->itup[0]->t_tid), state->stack->blkno);
-           GistTupleSetValid(state->itup[0]);
-       }
+       PageSetLSN(leftpg, recptr);
+       PageSetTLI(leftpg, ThisTimeLineID);
    }
-   return is_splitted;
+
+   END_CRIT_SECTION();
+
+   return is_split;
 }
 
 /*
- * returns stack of pages, all pages in stack are pinned, and
- * leaf is X-locked
+ * Workhouse routine for doing insertion into a GiST index. Note that
+ * this routine assumes it is invoked in a short-lived memory context,
+ * so it does not bother releasing palloc'd allocations.
  */
-
 static void
-gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
+gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
 {
    ItemId      iid;
    IndexTuple  idxtuple;
-   GISTPageOpaque opaque;
+   GISTInsertStack firststack;
+   GISTInsertStack *stack;
+   GISTInsertState state;
+   bool        xlocked = false;
+
+   memset(&state, 0, sizeof(GISTInsertState));
+   state.freespace = freespace;
+   state.r = r;
+
+   /* Start from the root */
+   firststack.blkno = GIST_ROOT_BLKNO;
+   firststack.lsn.xrecoff = 0;
+   firststack.parent = NULL;
+   state.stack = stack = &firststack;
 
    /*
-    * walk down, We don't lock page for a long time, but so we should be
-    * ready to recheck path in a bad case... We remember, that page->lsn
-    * should never be invalid.
+    * Walk down along the path of smallest penalty, updating the parent
+    * pointers with the key we're inserting as we go. If we crash in the
+    * middle, the tree is consistent, although the possible parent updates
+    * were a waste.
     */
    for (;;)
    {
-       if (XLogRecPtrIsInvalid(state->stack->lsn))
-           state->stack->buffer = ReadBuffer(state->r, state->stack->blkno);
-       LockBuffer(state->stack->buffer, GIST_SHARE);
-       gistcheckpage(state->r, state->stack->buffer);
+       if (XLogRecPtrIsInvalid(stack->lsn))
+           stack->buffer = ReadBuffer(state.r, stack->blkno);
+
+       /*
+        * Be optimistic and grab shared lock first. Swap it for an
+        * exclusive lock later if we need to update the page.
+        */
+       if (!xlocked)
+       {
+           LockBuffer(stack->buffer, GIST_SHARE);
+           gistcheckpage(state.r, stack->buffer);
+       }
+
+       stack->page = (Page) BufferGetPage(stack->buffer);
+       stack->lsn = PageGetLSN(stack->page);
+       Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));
 
-       state->stack->page = (Page) BufferGetPage(state->stack->buffer);
-       opaque = GistPageGetOpaque(state->stack->page);
+       /*
+        * If this page was split but the downlink was never inserted to
+        * the parent because the inserting backend crashed before doing
+        * that, fix that now.
+        */
+       if (GistFollowRight(stack->page))
+       {
+           if (!xlocked)
+           {
+               LockBuffer(stack->buffer, GIST_UNLOCK);
+               LockBuffer(stack->buffer, GIST_EXCLUSIVE);
+               xlocked = true;
+               /* someone might've completed the split when we unlocked */
+               if (!GistFollowRight(stack->page))
+                   continue;
+           }
+           gistfixsplit(&state, giststate);
 
-       state->stack->lsn = PageGetLSN(state->stack->page);
-       Assert(!RelationNeedsWAL(state->r) || !XLogRecPtrIsInvalid(state->stack->lsn));
+           UnlockReleaseBuffer(stack->buffer);
+           xlocked = false;
+           state.stack = stack = stack->parent;
+           continue;
+       }
 
-       if (state->stack->blkno != GIST_ROOT_BLKNO &&
-           XLByteLT(state->stack->parent->lsn, opaque->nsn))
+       if (stack->blkno != GIST_ROOT_BLKNO &&
+           XLByteLT(stack->parent->lsn,
+                    GistPageGetOpaque(stack->page)->nsn))
        {
            /*
-            * caused split non-root page is detected, go up to parent to
-            * choose best child
+            * Concurrent split detected. There's no guarantee that the
+            * downlink for this page is consistent with the tuple we're
+            * inserting anymore, so go back to parent and rechoose the
+            * best child.
             */
-           UnlockReleaseBuffer(state->stack->buffer);
-           state->stack = state->stack->parent;
+           UnlockReleaseBuffer(stack->buffer);
+           xlocked = false;
+           state.stack = stack = stack->parent;
            continue;
        }
 
-       if (!GistPageIsLeaf(state->stack->page))
+       if (!GistPageIsLeaf(stack->page))
        {
            /*
-            * This is an internal page, so continue to walk down the tree. We
-            * find the child node that has the minimum insertion penalty and
-            * recursively invoke ourselves to modify that node. Once the
-            * recursive call returns, we may need to adjust the parent node
-            * for two reasons: the child node split, or the key in this node
-            * needs to be adjusted for the newly inserted key below us.
+            * This is an internal page so continue to walk down the tree.
+            * Find the child node that has the minimum insertion penalty.
             */
-           GISTInsertStack *item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
-
-           state->stack->childoffnum = gistchoose(state->r, state->stack->page, state->itup[0], giststate);
+           BlockNumber childblkno;
+           IndexTuple newtup;
+           GISTInsertStack *item;
 
-           iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
-           idxtuple = (IndexTuple) PageGetItem(state->stack->page, iid);
-           item->blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
-           LockBuffer(state->stack->buffer, GIST_UNLOCK);
+           stack->childoffnum = gistchoose(state.r, stack->page, itup, giststate);
+           iid = PageGetItemId(stack->page, stack->childoffnum);
+           idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
+           childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
 
-           item->parent = state->stack;
-           item->child = NULL;
-           if (state->stack)
-               state->stack->child = item;
-           state->stack = item;
-       }
-       else
-       {
-           /* be carefull, during unlock/lock page may be changed... */
-           LockBuffer(state->stack->buffer, GIST_UNLOCK);
-           LockBuffer(state->stack->buffer, GIST_EXCLUSIVE);
-           state->stack->page = (Page) BufferGetPage(state->stack->buffer);
-           opaque = GistPageGetOpaque(state->stack->page);
+           /*
+            * Check that it's not a leftover invalid tuple from pre-9.1
+            */
+           if (GistTupleIsInvalid(idxtuple))
+               ereport(ERROR,
+                       (errmsg("index \"%s\" contains an inner tuple marked as invalid",
+                               RelationGetRelationName(r)),
+                        errdetail("This is caused by an incomplete page split at crash recovery before upgrading to 9.1."),
+                        errhint("Please REINDEX it.")));
 
-           if (state->stack->blkno == GIST_ROOT_BLKNO)
+           /*
+            * Check that the key representing the target child node is
+            * consistent with the key we're inserting. Update it if it's not.
+            */
+           newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
+           if (newtup)
            {
                /*
-                * the only page can become inner instead of leaf is a root
-                * page, so for root we should recheck it
+                * Swap shared lock for an exclusive one. Beware, the page
+                * may change while we unlock/lock the page...
                 */
-               if (!GistPageIsLeaf(state->stack->page))
+               if (!xlocked)
                {
-                   /*
-                    * very rarely situation: during unlock/lock index with
-                    * number of pages = 1 was increased
-                    */
-                   LockBuffer(state->stack->buffer, GIST_UNLOCK);
-                   continue;
-               }
+                   LockBuffer(stack->buffer, GIST_UNLOCK);
+                   LockBuffer(stack->buffer, GIST_EXCLUSIVE);
+                   xlocked = true;
+                   stack->page = (Page) BufferGetPage(stack->buffer);
 
+                   if (!XLByteEQ(PageGetLSN(stack->page), stack->lsn))
+                   {
+                       /* the page was changed while we unlocked it, retry */
+                       continue;
+                   }
+               }
                /*
-                * we don't need to check root split, because checking
-                * leaf/inner is enough to recognize split for root
+                * Update the tuple.
+                *
+                * gistinserthere() might have to split the page to make the
+                * updated tuple fit. It will adjust the stack so that after
+                * the call, we'll be holding a lock on the page containing
+                * the tuple, which might have moved right.
+                *
+                * Except if this causes a root split, gistinserthere()
+                * returns 'true'. In that case, stack only holds the new
+                * root, and the child page was released. Have to start
+                * all over.
                 */
-
+               if (gistinserttuples(&state, stack, giststate, &newtup, 1,
+                                    stack->childoffnum, InvalidBuffer))
+               {
+                   UnlockReleaseBuffer(stack->buffer);
+                   xlocked = false;
+                   state.stack = stack = stack->parent;
+                   continue;
+               }
            }
-           else if (XLByteLT(state->stack->parent->lsn, opaque->nsn))
+           LockBuffer(stack->buffer, GIST_UNLOCK);
+           xlocked = false;
+
+           /* descend to the chosen child */
+           item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
+           item->blkno = childblkno;
+           item->parent = stack;
+           state.stack = stack = item;
+       }
+       else
+       {
+           /*
+            * Leaf page. Insert the new key. We've already updated all the
+            * parents on the way down, but we might have to split the page
+            * if it doesn't fit. gistinserthere() will take care of that.
+            */
+
+           /*
+            * Swap shared lock for an exclusive one. Be careful, the page
+            * may change while we unlock/lock the page...
+            */
+           if (!xlocked)
            {
-               /*
-                * detecting split during unlock/lock, so we should find
-                * better child on parent
-                */
+               LockBuffer(stack->buffer, GIST_UNLOCK);
+               LockBuffer(stack->buffer, GIST_EXCLUSIVE);
+               xlocked = true;
+               stack->page = (Page) BufferGetPage(stack->buffer);
+               stack->lsn = PageGetLSN(stack->page);
 
-               /* forget buffer */
-               UnlockReleaseBuffer(state->stack->buffer);
+               if (stack->blkno == GIST_ROOT_BLKNO)
+               {
+                   /*
+                    * the only page that can become inner instead of leaf
+                    * is the root page, so for root we should recheck it
+                    */
+                   if (!GistPageIsLeaf(stack->page))
+                   {
+                       /*
+                        * very rare situation: during unlock/lock index with
+                        * number of pages = 1 was increased
+                        */
+                       LockBuffer(stack->buffer, GIST_UNLOCK);
+                       xlocked = false;
+                       continue;
+                   }
 
-               state->stack = state->stack->parent;
-               continue;
+                   /*
+                    * we don't need to check root split, because checking
+                    * leaf/inner is enough to recognize split for root
+                    */
+               }
+               else if (GistFollowRight(stack->page) ||
+                        XLByteLT(stack->parent->lsn,
+                                 GistPageGetOpaque(stack->page)->nsn))
+               {
+                   /*
+                    * The page was split while we momentarily unlocked the
+                    * page. Go back to parent.
+                    */
+                   UnlockReleaseBuffer(stack->buffer);
+                   xlocked = false;
+                   state.stack = stack = stack->parent;
+                   continue;
+               }
            }
 
-           state->stack->lsn = PageGetLSN(state->stack->page);
+           /* now state.stack->(page, buffer and blkno) points to leaf page */
 
-           /* ok we found a leaf page and it X-locked */
+           gistinserttuples(&state, stack, giststate, &itup, 1,
+                            InvalidOffsetNumber, InvalidBuffer);
+           LockBuffer(stack->buffer, GIST_UNLOCK);
+
+           /* Release any pins we might still hold before exiting */
+           for (; stack; stack = stack->parent)
+               ReleaseBuffer(stack->buffer);
            break;
        }
    }
-
-   /* now state->stack->(page, buffer and blkno) points to leaf page */
 }
 
 /*
  *
  * returns from the beginning of closest parent;
  *
- * To prevent deadlocks, this should lock only one page simultaneously.
+ * To prevent deadlocks, this should lock only one page at a time.
  */
 GISTInsertStack *
 gistFindPath(Relation r, BlockNumber child)
 
        top->lsn = PageGetLSN(page);
 
+       /*
+        * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a
+        * downlink. This should not normally happen..
+        */
+       if (GistFollowRight(page))
+           elog(ERROR, "concurrent GiST page split was incomplete");
+
        if (top->parent && XLByteLT(top->parent->lsn, GistPageGetOpaque(page)->nsn) &&
            GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
        {
                ptr = top;
                while (ptr->parent)
                {
-                   /* set child link */
-                   ptr->parent->child = ptr;
                    /* move childoffnum.. */
                    if (ptr == top)
                    {
    return NULL;
 }
 
-
 /*
- * Returns X-locked parent of stack page
+ * Updates the stack so that child->parent is the correct parent of the
+ * child. child->parent must be exclusively locked on entry, and will
+ * remain so at exit, but it might not be the same page anymore.
  */
-
 static void
 gistFindCorrectParent(Relation r, GISTInsertStack *child)
 {
    GISTInsertStack *parent = child->parent;
 
-   LockBuffer(parent->buffer, GIST_EXCLUSIVE);
    gistcheckpage(r, parent->buffer);
    parent->page = (Page) BufferGetPage(parent->buffer);
 
 
        /* install new chain of parents to stack */
        child->parent = parent;
-       parent->child = child;
 
        /* make recursive call to normal processing */
+       LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
        gistFindCorrectParent(r, child);
    }
 
    return;
 }
 
-void
-gistmakedeal(GISTInsertState *state, GISTSTATE *giststate)
+/*
+ * Form a downlink pointer for the page in 'buf'.
+ */
+static IndexTuple
+gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
+                GISTInsertStack *stack)
 {
-   int         is_splitted;
-   ItemId      iid;
-   IndexTuple  oldtup,
-               newtup;
+   Page page = BufferGetPage(buf);
+   OffsetNumber maxoff;
+   OffsetNumber offset;
+   IndexTuple downlink = NULL;
 
-   /* walk up */
-   while (true)
+   maxoff = PageGetMaxOffsetNumber(page);
+   for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
    {
-       /*
-        * After this call: 1. if child page was splited, then itup contains
-        * keys for each page 2. if  child page wasn't splited, then itup
-        * contains additional for adjustment of current key
-        */
-
-       if (state->stack->parent)
+       IndexTuple  ituple = (IndexTuple)
+           PageGetItem(page, PageGetItemId(page, offset));
+       if (downlink == NULL)
+           downlink = CopyIndexTuple(ituple);
+       else
        {
-           /*
-            * X-lock parent page before proceed child, gistFindCorrectParent
-            * should find and lock it
-            */
-           gistFindCorrectParent(state->r, state->stack);
+           IndexTuple newdownlink;
+           newdownlink = gistgetadjusted(rel, downlink, ituple,
+                                         giststate);
+           if (newdownlink)
+               downlink = newdownlink;
        }
-       is_splitted = gistplacetopage(state, giststate);
+   }
+
+   /*
+    * If the page is completely empty, we can't form a meaningful
+    * downlink for it. But we have to insert a downlink for the page.
+    * Any key will do, as long as its consistent with the downlink of
+    * parent page, so that we can legally insert it to the parent.
+    * A minimal one that matches as few scans as possible would be best,
+    * to keep scans from doing useless work, but we don't know how to
+    * construct that. So we just use the downlink of the original page
+    * that was split - that's as far from optimal as it can get but will
+    * do..
+    */
+   if (!downlink)
+   {
+       ItemId iid;
+
+       LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
+       gistFindCorrectParent(rel, stack);
+       iid = PageGetItemId(stack->parent->page, stack->parent->childoffnum);
+       downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
+       downlink = CopyIndexTuple(downlink);
+       LockBuffer(stack->parent->buffer, GIST_UNLOCK);
+   }
 
-       /* parent locked above, so release child buffer */
-       UnlockReleaseBuffer(state->stack->buffer);
+   ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
+   GistTupleSetValid(downlink);
 
-       /* pop parent page from stack */
-       state->stack = state->stack->parent;
+   return downlink;
+}
 
-       /* stack is void */
-       if (!state->stack)
-           break;
 
-       /*
-        * child did not split, so we can check is it needed to update parent
-        * tuple
-        */
-       if (!is_splitted)
-       {
-           /* parent's tuple */
-           iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
-           oldtup = (IndexTuple) PageGetItem(state->stack->page, iid);
-           newtup = gistgetadjusted(state->r, oldtup, state->itup[0], giststate);
-
-           if (!newtup)
-           {                   /* not need to update key */
-               LockBuffer(state->stack->buffer, GIST_UNLOCK);
-               break;
-           }
+/*
+ * Complete the incomplete split of state->stack->page.
+ */
+static void
+gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
+{
+   GISTInsertStack *stack = state->stack;
+   Buffer      buf;
+   Page        page;
+   List       *splitinfo = NIL;
+
+   elog(LOG, "fixing incomplete split in index \"%s\", block %u",
+        RelationGetRelationName(state->r), stack->blkno);
 
-           state->itup[0] = newtup;
+   Assert(GistFollowRight(stack->page));
+   Assert(OffsetNumberIsValid(stack->parent->childoffnum));
+
+   buf = stack->buffer;
+
+   /*
+    * Read the chain of split pages, following the rightlinks. Construct
+    * a downlink tuple for each page.
+    */
+   for (;;)
+   {
+       GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
+       IndexTuple downlink;
+
+       page = BufferGetPage(buf);
+
+       /* Form the new downlink tuples to insert to parent */
+       downlink = gistformdownlink(state->r, buf, giststate, stack);
+
+       si->buf = buf;
+       si->downlink = downlink;
+
+       splitinfo = lappend(splitinfo, si);
+
+       if (GistFollowRight(page))
+       {
+           /* lock next page */
+           buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
+           LockBuffer(buf, GIST_EXCLUSIVE);
        }
-   }                           /* while */
+       else
+           break;
+   }
+
+   /* Insert the downlinks */
+   gistfinishsplit(state, stack, giststate, splitinfo);
+}
+
+/*
+ * Insert tuples to stack->buffer. If 'oldoffnum' is valid, the new tuples
+ * replace an old tuple at oldoffnum. The caller must hold an exclusive lock
+ * on the page.
+ *
+ * If leftchild is valid, we're inserting/updating the downlink for the
+ * page to the right of leftchild. We clear the F_FOLLOW_RIGHT flag and
+ * update NSN on leftchild, atomically with the insertion of the downlink.
+ *
+ * Returns 'true' if the page had to be split. On return, we will continue
+ * to hold an exclusive lock on state->stack->buffer, but if we had to split
+ * the page, it might not contain the tuple we just inserted/updated.
+ */
+static bool
+gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
+                GISTSTATE *giststate,
+                IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
+                Buffer leftchild)
+{
+   List *splitinfo;
+   bool is_split;
+
+   is_split = gistplacetopage(state, giststate, stack->buffer,
+                              tuples, ntup, oldoffnum,
+                              leftchild,
+                              &splitinfo);
+   if (splitinfo)
+       gistfinishsplit(state, stack, giststate, splitinfo);
+
+   return is_split;
+}
+
+/*
+ * Finish an incomplete split by inserting/updating the downlinks in
+ * parent page. 'splitinfo' contains all the child pages, exclusively-locked,
+ * involved in the split, from left-to-right.
+ */
+static void
+gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
+               GISTSTATE *giststate, List *splitinfo)
+{
+   ListCell *lc;
+   List *reversed;
+   GISTPageSplitInfo *right;
+   GISTPageSplitInfo *left;
+   IndexTuple tuples[2];
+
+   /* A split always contains at least two halves */
+   Assert(list_length(splitinfo) >= 2);
+
+   /*
+    * We need to insert downlinks for each new page, and update the
+    * downlink for the original (leftmost) page in the split. Begin at
+    * the rightmost page, inserting one downlink at a time until there's
+    * only two pages left. Finally insert the downlink for the last new
+    * page and update the downlink for the original page as one operation.
+    */
 
-   /* release all parent buffers */
-   while (state->stack)
+   /* for convenience, create a copy of the list in reverse order */
+   reversed = NIL;
+   foreach(lc, splitinfo)
    {
-       ReleaseBuffer(state->stack->buffer);
-       state->stack = state->stack->parent;
+       reversed = lcons(lfirst(lc), reversed);
    }
 
-   /* say to xlog that insert is completed */
-   if (state->needInsertComplete && RelationNeedsWAL(state->r))
-       gistxlogInsertCompletion(state->r->rd_node, &(state->key), 1);
+   LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
+   gistFindCorrectParent(state->r, stack);
+
+   while(list_length(reversed) > 2)
+   {
+       right = (GISTPageSplitInfo *) linitial(reversed);
+       left = (GISTPageSplitInfo *) lsecond(reversed);
+
+       if (gistinserttuples(state, stack->parent, giststate,
+                            &right->downlink, 1,
+                            InvalidOffsetNumber,
+                            left->buf))
+       {
+           /*
+            * If the parent page was split, need to relocate the original
+            * parent pointer.
+            */
+           gistFindCorrectParent(state->r, stack);
+       }
+       UnlockReleaseBuffer(right->buf);
+       reversed = list_delete_first(reversed);
+   }
+
+   right = (GISTPageSplitInfo *) linitial(reversed);
+   left = (GISTPageSplitInfo *) lsecond(reversed);
+
+   /*
+    * Finally insert downlink for the remaining right page and update the
+    * downlink for the original page to not contain the tuples that were
+    * moved to the new pages.
+    */
+   tuples[0] = left->downlink;
+   tuples[1] = right->downlink;
+   gistinserttuples(state, stack->parent, giststate,
+                    tuples, 2,
+                    stack->parent->childoffnum,
+                    left->buf);
+   LockBuffer(stack->parent->buffer, GIST_UNLOCK);
+   UnlockReleaseBuffer(right->buf);
+   Assert(left->buf == stack->buffer);
 }
 
 /*
        ROTATEDIST(res);
        res->block.num = v.splitVector.spl_nright;
        res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
-       res->itup = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false)
-           : gist_form_invalid_tuple(GIST_ROOT_BLKNO);
+       res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
    }
 
    if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
        ROTATEDIST(res);
        res->block.num = v.splitVector.spl_nleft;
        res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
-       res->itup = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false)
-           : gist_form_invalid_tuple(GIST_ROOT_BLKNO);
+       res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
    }
 
    return res;
 }
 
-/*
- * buffer must be pinned and locked by caller
- */
-void
-gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key)
-{
-   Page        page;
-
-   Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
-   page = BufferGetPage(buffer);
-
-   START_CRIT_SECTION();
-
-   GISTInitBuffer(buffer, 0);
-   gistfillbuffer(page, itup, len, FirstOffsetNumber);
-
-   MarkBufferDirty(buffer);
-
-   if (RelationNeedsWAL(r))
-   {
-       XLogRecPtr  recptr;
-       XLogRecData *rdata;
-
-       rdata = formUpdateRdata(r->rd_node, buffer,
-                               NULL, 0,
-                               itup, len, key);
-
-       recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata);
-       PageSetLSN(page, recptr);
-       PageSetTLI(page, ThisTimeLineID);
-   }
-   else
-       PageSetLSN(page, GetXLogRecPtrForTemp());
-
-   END_CRIT_SECTION();
-}
-
 /*
  * Fill a GISTSTATE with information about the index
  */
 
    page = BufferGetPage(buffer);
    opaque = GistPageGetOpaque(page);
 
-   /* check if page split occurred since visit to parent */
+   /*
+    * Check if we need to follow the rightlink. We need to follow it if the
+    * page was concurrently split since we visited the parent (in which case
+    * parentlsn < nsn), or if the the system crashed after a page split but
+    * before the downlink was inserted into the parent.
+    */
    if (!XLogRecPtrIsInvalid(pageItem->data.parentlsn) &&
-       XLByteLT(pageItem->data.parentlsn, opaque->nsn) &&
+       (GistFollowRight(page) ||
+        XLByteLT(pageItem->data.parentlsn, opaque->nsn)) &&
        opaque->rightlink != InvalidBlockNumber /* sanity check */ )
    {
        /* There was a page split, follow right link to add pages */
 
            v->spl_left[v->spl_nleft++] = i;
 }
 
-/*
- * if it was invalid tuple then we need special processing.
- * We move all invalid tuples on right page.
- *
- * if there is no place on left page, gistSplit will be called one more
- * time for left page.
- *
- * Normally, we never exec this code, but after crash replay it's possible
- * to get 'invalid' tuples (probability is low enough)
- */
-static void
-gistSplitByInvalid(GISTSTATE *giststate, GistSplitVector *v, IndexTuple *itup, int len)
-{
-   int         i;
-   static OffsetNumber offInvTuples[MaxOffsetNumber];
-   int         nOffInvTuples = 0;
-
-   for (i = 1; i <= len; i++)
-       if (GistTupleIsInvalid(itup[i - 1]))
-           offInvTuples[nOffInvTuples++] = i;
-
-   if (nOffInvTuples == len)
-   {
-       /* corner case, all tuples are invalid */
-       v->spl_rightvalid = v->spl_leftvalid = false;
-       gistSplitHalf(&v->splitVector, len);
-   }
-   else
-   {
-       GistSplitUnion gsvp;
-
-       v->splitVector.spl_right = offInvTuples;
-       v->splitVector.spl_nright = nOffInvTuples;
-       v->spl_rightvalid = false;
-
-       v->splitVector.spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
-       v->splitVector.spl_nleft = 0;
-       for (i = 1; i <= len; i++)
-           if (!GistTupleIsInvalid(itup[i - 1]))
-               v->splitVector.spl_left[v->splitVector.spl_nleft++] = i;
-       v->spl_leftvalid = true;
-
-       gsvp.equiv = NULL;
-       gsvp.attr = v->spl_lattr;
-       gsvp.len = v->splitVector.spl_nleft;
-       gsvp.entries = v->splitVector.spl_left;
-       gsvp.isnull = v->spl_lisnull;
-
-       gistunionsubkeyvec(giststate, itup, &gsvp, 0);
-   }
-}
-
 /*
  * trys to split page by attno key, in a case of null
  * values move its to separate page.
        Datum       datum;
        bool        IsNull;
 
-       if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1]))
-       {
-           gistSplitByInvalid(giststate, v, itup, len);
-           return;
-       }
-
        datum = index_getattr(itup[i - 1], attno + 1, giststate->tupdesc, &IsNull);
        gistdentryinit(giststate, attno, &(entryvec->vector[i]),
                       datum, r, page, i,
            offNullTuples[nOffNullTuples++] = i;
    }
 
-   v->spl_leftvalid = v->spl_rightvalid = true;
-
    if (nOffNullTuples == len)
    {
        /*
 
  * invalid tuple. Resulting Datums aren't compressed.
  */
 
-bool
+void
 gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey,
                   Datum *attr, bool *isnull)
 {
            Datum       datum;
            bool        IsNull;
 
-           if (GistTupleIsInvalid(itvec[j]))
-               return FALSE;   /* signals that union with invalid tuple =>
-                                * result is invalid */
-
            datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
            if (IsNull)
                continue;
            isnull[i] = FALSE;
        }
    }
-
-   return TRUE;
 }
 
 /*
 {
    memset(isnullS, TRUE, sizeof(bool) * giststate->tupdesc->natts);
 
-   if (!gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS))
-       return gist_form_invalid_tuple(InvalidBlockNumber);
+   gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS);
 
    return gistFormTuple(giststate, r, attrS, isnullS, false);
 }
    IndexTuple  newtup = NULL;
    int         i;
 
-   if (GistTupleIsInvalid(oldtup) || GistTupleIsInvalid(addtup))
-       return gist_form_invalid_tuple(ItemPointerGetBlockNumber(&(oldtup->t_tid)));
-
    gistDeCompressAtt(giststate, r, oldtup, NULL,
                      (OffsetNumber) 0, oldentries, oldisnull);
 
        int         j;
        IndexTuple  itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
 
-       if (!GistPageIsLeaf(p) && GistTupleIsInvalid(itup))
-       {
-           ereport(LOG,
-                   (errmsg("index \"%s\" needs VACUUM or REINDEX to finish crash recovery",
-                           RelationGetRelationName(r))));
-           continue;
-       }
-
        sum_grow = 0;
        for (j = 0; j < r->rd_att->natts; j++)
        {
    }
 
    res = index_form_tuple(giststate->tupdesc, compatt, isnull);
-   GistTupleSetValid(res);
+   /*
+    * The offset number on tuples on internal pages is unused. For historical
+    * reasons, it is set 0xffff.
+    */
+   ItemPointerSetOffsetNumber( &(res->t_tid), 0xffff);
    return res;
 }
 
 
 #include "utils/memutils.h"
 
 
-typedef struct GistBulkDeleteResult
-{
-   IndexBulkDeleteResult std;  /* common state */
-   bool        needReindex;
-} GistBulkDeleteResult;
-
-
 /*
  * VACUUM cleanup: update FSM
  */
 gistvacuumcleanup(PG_FUNCTION_ARGS)
 {
    IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
-   GistBulkDeleteResult *stats = (GistBulkDeleteResult *) PG_GETARG_POINTER(1);
+   IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
    Relation    rel = info->index;
    BlockNumber npages,
                blkno;
    /* Set up all-zero stats if gistbulkdelete wasn't called */
    if (stats == NULL)
    {
-       stats = (GistBulkDeleteResult *) palloc0(sizeof(GistBulkDeleteResult));
+       stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
        /* use heap's tuple count */
-       stats->std.num_index_tuples = info->num_heap_tuples;
-       stats->std.estimated_count = info->estimated_count;
+       stats->num_index_tuples = info->num_heap_tuples;
+       stats->estimated_count = info->estimated_count;
 
        /*
         * XXX the above is wrong if index is partial.  Would it be OK to just
         */
    }
 
-   if (stats->needReindex)
-       ereport(NOTICE,
-               (errmsg("index \"%s\" needs VACUUM FULL or REINDEX to finish crash recovery",
-                       RelationGetRelationName(rel))));
-
    /*
     * Need lock unless it's local to this backend.
     */
    IndexFreeSpaceMapVacuum(info->index);
 
    /* return statistics */
-   stats->std.pages_free = totFreePages;
+   stats->pages_free = totFreePages;
    if (needLock)
        LockRelationForExtension(rel, ExclusiveLock);
-   stats->std.num_pages = RelationGetNumberOfBlocks(rel);
+   stats->num_pages = RelationGetNumberOfBlocks(rel);
    if (needLock)
        UnlockRelationForExtension(rel, ExclusiveLock);
 
    GISTPageOpaque opaque = GistPageGetOpaque(page);
 
    if (stack->blkno != GIST_ROOT_BLKNO && !XLogRecPtrIsInvalid(stack->parentlsn) &&
-       XLByteLT(stack->parentlsn, opaque->nsn) &&
+       (GistFollowRight(page) || XLByteLT(stack->parentlsn, opaque->nsn)) &&
        opaque->rightlink != InvalidBlockNumber /* sanity check */ )
    {
        /* split page detected, install right link to the stack */
 gistbulkdelete(PG_FUNCTION_ARGS)
 {
    IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
-   GistBulkDeleteResult *stats = (GistBulkDeleteResult *) PG_GETARG_POINTER(1);
+   IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
    IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
    void       *callback_state = (void *) PG_GETARG_POINTER(3);
    Relation    rel = info->index;
 
    /* first time through? */
    if (stats == NULL)
-       stats = (GistBulkDeleteResult *) palloc0(sizeof(GistBulkDeleteResult));
+       stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
    /* we'll re-count the tuples each time */
-   stats->std.estimated_count = false;
-   stats->std.num_index_tuples = 0;
+   stats->estimated_count = false;
+   stats->num_index_tuples = 0;
 
    stack = (GistBDItem *) palloc0(sizeof(GistBDItem));
    stack->blkno = GIST_ROOT_BLKNO;
                {
                    todelete[ntodelete] = i - ntodelete;
                    ntodelete++;
-                   stats->std.tuples_removed += 1;
+                   stats->tuples_removed += 1;
                }
                else
-                   stats->std.num_index_tuples += 1;
+                   stats->num_index_tuples += 1;
            }
 
            if (ntodelete)
 
                if (RelationNeedsWAL(rel))
                {
-                   XLogRecData *rdata;
                    XLogRecPtr  recptr;
-                   gistxlogPageUpdate *xlinfo;
 
-                   rdata = formUpdateRdata(rel->rd_node, buffer,
+                   recptr = gistXLogUpdate(rel->rd_node, buffer,
                                            todelete, ntodelete,
-                                           NULL, 0,
-                                           NULL);
-                   xlinfo = (gistxlogPageUpdate *) rdata->next->data;
-
-                   recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
+                                           NULL, 0, InvalidBuffer);
                    PageSetLSN(page, recptr);
                    PageSetTLI(page, ThisTimeLineID);
-
-                   pfree(xlinfo);
-                   pfree(rdata);
                }
                else
                    PageSetLSN(page, GetXLogRecPtrForTemp());
                stack->next = ptr;
 
                if (GistTupleIsInvalid(idxtuple))
-                   stats->needReindex = true;
+                   ereport(LOG,
+                           (errmsg("index \"%s\" contains an inner tuple marked as invalid",
+                                   RelationGetRelationName(rel)),
+                            errdetail("This is caused by an incomplete page split at crash recovery before upgrading to 9.1."),
+                            errhint("Please REINDEX it.")));
            }
        }
 
 
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
-
-typedef struct
-{
-   gistxlogPageUpdate *data;
-   int         len;
-   IndexTuple *itup;
-   OffsetNumber *todelete;
-} PageUpdateRecord;
-
 typedef struct
 {
    gistxlogPage *header;
    NewPage    *page;
 } PageSplitRecord;
 
-/* track for incomplete inserts, idea was taken from nbtxlog.c */
-
-typedef struct gistIncompleteInsert
-{
-   RelFileNode node;
-   BlockNumber origblkno;      /* for splits */
-   ItemPointerData key;
-   int         lenblk;
-   BlockNumber *blkno;
-   XLogRecPtr  lsn;
-   BlockNumber *path;
-   int         pathlen;
-} gistIncompleteInsert;
-
-
 static MemoryContext opCtx;        /* working memory for operations */
-static MemoryContext insertCtx; /* holds incomplete_inserts list */
-static List *incomplete_inserts;
-
-
-#define ItemPointerEQ(a, b) \
-   ( ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \
-     ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) )
-
 
+/*
+ * Replay the clearing of F_FOLLOW_RIGHT flag.
+ */
 static void
-pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
-                    BlockNumber *blkno, int lenblk,
-                    PageSplitRecord *xlinfo /* to extract blkno info */ )
+gistRedoClearFollowRight(RelFileNode node, XLogRecPtr lsn,
+                        BlockNumber leftblkno)
 {
-   MemoryContext oldCxt;
-   gistIncompleteInsert *ninsert;
+   Buffer buffer;
 
-   if (!ItemPointerIsValid(&key))
+   buffer = XLogReadBuffer(node, leftblkno, false);
+   if (BufferIsValid(buffer))
+   {
+       Page page = (Page) BufferGetPage(buffer);
 
        /*
-        * if key is null then we should not store insertion as incomplete,
-        * because it's a vacuum operation..
+        * Note that we still update the page even if page LSN is equal to the
+        * LSN of this record, because the updated NSN is not included in the
+        * full page image.
         */
-       return;
-
-   oldCxt = MemoryContextSwitchTo(insertCtx);
-   ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert));
-
-   ninsert->node = node;
-   ninsert->key = key;
-   ninsert->lsn = lsn;
-
-   if (lenblk && blkno)
-   {
-       ninsert->lenblk = lenblk;
-       ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
-       memcpy(ninsert->blkno, blkno, sizeof(BlockNumber) * ninsert->lenblk);
-       ninsert->origblkno = *blkno;
-   }
-   else
-   {
-       int         i;
-
-       Assert(xlinfo);
-       ninsert->lenblk = xlinfo->data->npage;
-       ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
-       for (i = 0; i < ninsert->lenblk; i++)
-           ninsert->blkno[i] = xlinfo->page[i].header->blkno;
-       ninsert->origblkno = xlinfo->data->origblkno;
-   }
-   Assert(ninsert->lenblk > 0);
-
-   /*
-    * Stick the new incomplete insert onto the front of the list, not the
-    * back.  This is so that gist_xlog_cleanup will process incompletions in
-    * last-in-first-out order.
-    */
-   incomplete_inserts = lcons(ninsert, incomplete_inserts);
-
-   MemoryContextSwitchTo(oldCxt);
-}
-
-static void
-forgetIncompleteInsert(RelFileNode node, ItemPointerData key)
-{
-   ListCell   *l;
-
-   if (!ItemPointerIsValid(&key))
-       return;
-
-   if (incomplete_inserts == NIL)
-       return;
-
-   foreach(l, incomplete_inserts)
-   {
-       gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
-
-       if (RelFileNodeEquals(node, insert->node) && ItemPointerEQ(&(insert->key), &(key)))
+       if (!XLByteLT(lsn, PageGetLSN(page)))
        {
-           /* found */
-           incomplete_inserts = list_delete_ptr(incomplete_inserts, insert);
-           pfree(insert->blkno);
-           pfree(insert);
-           break;
-       }
-   }
-}
+           GistPageGetOpaque(page)->nsn = lsn;
+           GistClearFollowRight(page);
 
-static void
-decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record)
-{
-   char       *begin = XLogRecGetData(record),
-              *ptr;
-   int         i = 0,
-               addpath = 0;
-
-   decoded->data = (gistxlogPageUpdate *) begin;
-
-   if (decoded->data->ntodelete)
-   {
-       decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogPageUpdate) + addpath);
-       addpath = MAXALIGN(sizeof(OffsetNumber) * decoded->data->ntodelete);
-   }
-   else
-       decoded->todelete = NULL;
-
-   decoded->len = 0;
-   ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
-   while (ptr - begin < record->xl_len)
-   {
-       decoded->len++;
-       ptr += IndexTupleSize((IndexTuple) ptr);
-   }
-
-   decoded->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->len);
-
-   ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
-   while (ptr - begin < record->xl_len)
-   {
-       decoded->itup[i] = (IndexTuple) ptr;
-       ptr += IndexTupleSize(decoded->itup[i]);
-       i++;
+           PageSetLSN(page, lsn);
+           PageSetTLI(page, ThisTimeLineID);
+           MarkBufferDirty(buffer);
+       }
+       UnlockReleaseBuffer(buffer);
    }
 }
 
  * redo any page update (except page split)
  */
 static void
-gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
+gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
 {
-   gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record);
-   PageUpdateRecord xlrec;
+   char       *begin = XLogRecGetData(record);
+   gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin;
    Buffer      buffer;
    Page        page;
+   char       *data;
 
-   /* we must fix incomplete_inserts list even if XLR_BKP_BLOCK_1 is set */
-   forgetIncompleteInsert(xldata->node, xldata->key);
+   if (BlockNumberIsValid(xldata->leftchild))
+       gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild);
 
-   if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO)
-       /* operation with root always finalizes insertion */
-       pushIncompleteInsert(xldata->node, lsn, xldata->key,
-                            &(xldata->blkno), 1,
-                            NULL);
-
-   /* nothing else to do if page was backed up (and no info to do it with) */
+   /* nothing more to do if page was backed up (and no info to do it with) */
    if (record->xl_info & XLR_BKP_BLOCK_1)
        return;
 
-   decodePageUpdateRecord(&xlrec, record);
-
-   buffer = XLogReadBuffer(xlrec.data->node, xlrec.data->blkno, false);
+   buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
    if (!BufferIsValid(buffer))
        return;
    page = (Page) BufferGetPage(buffer);
        return;
    }
 
-   if (isnewroot)
-       GISTInitBuffer(buffer, 0);
-   else if (xlrec.data->ntodelete)
+   data = begin + sizeof(gistxlogPageUpdate);
+
+   /* Delete old tuples */
+   if (xldata->ntodelete > 0)
    {
        int         i;
+       OffsetNumber *todelete = (OffsetNumber *) data;
+       data += sizeof(OffsetNumber) * xldata->ntodelete;
 
-       for (i = 0; i < xlrec.data->ntodelete; i++)
-           PageIndexTupleDelete(page, xlrec.todelete[i]);
+       for (i = 0; i < xldata->ntodelete; i++)
+           PageIndexTupleDelete(page, todelete[i]);
        if (GistPageIsLeaf(page))
            GistMarkTuplesDeleted(page);
    }
 
    /* add tuples */
-   if (xlrec.len > 0)
-       gistfillbuffer(page, xlrec.itup, xlrec.len, InvalidOffsetNumber);
-
-   /*
-    * special case: leafpage, nothing to insert, nothing to delete, then
-    * vacuum marks page
-    */
-   if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0)
-       GistClearTuplesDeleted(page);
+   if (data - begin < record->xl_len)
+   {
+       OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber :
+           OffsetNumberNext(PageGetMaxOffsetNumber(page));
+       while (data - begin < record->xl_len)
+       {
+           IndexTuple itup = (IndexTuple) data;
+           Size        sz = IndexTupleSize(itup);
+           OffsetNumber l;
+           data += sz;
+
+           l = PageAddItem(page, (Item) itup, sz, off, false, false);
+           if (l == InvalidOffsetNumber)
+               elog(ERROR, "failed to add item to GiST index page, size %d bytes",
+                    (int) sz);
+           off++;
+       }
+   }
+   else
+   {
+       /*
+        * special case: leafpage, nothing to insert, nothing to delete, then
+        * vacuum marks page
+        */
+       if (GistPageIsLeaf(page) && xldata->ntodelete == 0)
+           GistClearTuplesDeleted(page);
+   }
 
    if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO)
 
 static void
 gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
 {
+   gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
    PageSplitRecord xlrec;
    Buffer      buffer;
    Page        page;
    int         i;
-   int         flags;
+   bool        isrootsplit = false;
 
+   if (BlockNumberIsValid(xldata->leftchild))
+       gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild);
    decodePageSplitRecord(&xlrec, record);
-   flags = xlrec.data->origleaf ? F_LEAF : 0;
 
    /* loop around all pages */
    for (i = 0; i < xlrec.data->npage; i++)
    {
        NewPage    *newpage = xlrec.page + i;
+       int         flags;
+
+       if (newpage->header->blkno == GIST_ROOT_BLKNO)
+       {
+           Assert(i == 0);
+           isrootsplit = true;
+       }
 
        buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
        Assert(BufferIsValid(buffer));
        page = (Page) BufferGetPage(buffer);
 
        /* ok, clear buffer */
+       if (xlrec.data->origleaf && newpage->header->blkno != GIST_ROOT_BLKNO)
+           flags = F_LEAF;
+       else
+           flags = 0;
        GISTInitBuffer(buffer, flags);
 
        /* and fill it */
        gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
 
+       if (newpage->header->blkno == GIST_ROOT_BLKNO)
+       {
+           GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
+           GistPageGetOpaque(page)->nsn = xldata->orignsn;
+           GistClearFollowRight(page);
+       }
+       else
+       {
+           if (i < xlrec.data->npage - 1)
+               GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno;
+           else
+               GistPageGetOpaque(page)->rightlink = xldata->origrlink;
+           GistPageGetOpaque(page)->nsn = xldata->orignsn;
+           if (i < xlrec.data->npage - 1 && !isrootsplit)
+               GistMarkFollowRight(page);
+           else
+               GistClearFollowRight(page);
+       }
+
        PageSetLSN(page, lsn);
        PageSetTLI(page, ThisTimeLineID);
        MarkBufferDirty(buffer);
        UnlockReleaseBuffer(buffer);
    }
-
-   forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
-
-   pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
-                        NULL, 0,
-                        &xlrec);
 }
 
 static void
    UnlockReleaseBuffer(buffer);
 }
 
-static void
-gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record)
-{
-   char       *begin = XLogRecGetData(record),
-              *ptr;
-   gistxlogInsertComplete *xlrec;
-
-   xlrec = (gistxlogInsertComplete *) begin;
-
-   ptr = begin + sizeof(gistxlogInsertComplete);
-   while (ptr - begin < record->xl_len)
-   {
-       Assert(record->xl_len - (ptr - begin) >= sizeof(ItemPointerData));
-       forgetIncompleteInsert(xlrec->node, *((ItemPointerData *) ptr));
-       ptr += sizeof(ItemPointerData);
-   }
-}
-
 void
 gist_redo(XLogRecPtr lsn, XLogRecord *record)
 {
     * implement a similar optimization we have in b-tree, and remove killed
     * tuples outside VACUUM, we'll need to handle that here.
     */
-
    RestoreBkpBlocks(lsn, record, false);
 
    oldCxt = MemoryContextSwitchTo(opCtx);
    switch (info)
    {
        case XLOG_GIST_PAGE_UPDATE:
-           gistRedoPageUpdateRecord(lsn, record, false);
+           gistRedoPageUpdateRecord(lsn, record);
            break;
        case XLOG_GIST_PAGE_DELETE:
            gistRedoPageDeleteRecord(lsn, record);
            break;
-       case XLOG_GIST_NEW_ROOT:
-           gistRedoPageUpdateRecord(lsn, record, true);
-           break;
        case XLOG_GIST_PAGE_SPLIT:
            gistRedoPageSplitRecord(lsn, record);
            break;
        case XLOG_GIST_CREATE_INDEX:
            gistRedoCreateIndex(lsn, record);
            break;
-       case XLOG_GIST_INSERT_COMPLETE:
-           gistRedoCompleteInsert(lsn, record);
-           break;
        default:
            elog(PANIC, "gist_redo: unknown op code %u", info);
    }
 }
 
 static void
-out_target(StringInfo buf, RelFileNode node, ItemPointerData key)
+out_target(StringInfo buf, RelFileNode node)
 {
    appendStringInfo(buf, "rel %u/%u/%u",
                     node.spcNode, node.dbNode, node.relNode);
-   if (ItemPointerIsValid(&key))
-       appendStringInfo(buf, "; tid %u/%u",
-                        ItemPointerGetBlockNumber(&key),
-                        ItemPointerGetOffsetNumber(&key));
 }
 
 static void
 out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
 {
-   out_target(buf, xlrec->node, xlrec->key);
+   out_target(buf, xlrec->node);
    appendStringInfo(buf, "; block number %u", xlrec->blkno);
 }
 
 out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
 {
    appendStringInfo(buf, "page_split: ");
-   out_target(buf, xlrec->node, xlrec->key);
+   out_target(buf, xlrec->node);
    appendStringInfo(buf, "; block number %u splits to %d pages",
                     xlrec->origblkno, xlrec->npage);
 }
        case XLOG_GIST_PAGE_DELETE:
            out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec);
            break;
-       case XLOG_GIST_NEW_ROOT:
-           appendStringInfo(buf, "new_root: ");
-           out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key);
-           break;
        case XLOG_GIST_PAGE_SPLIT:
            out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec);
            break;
                             ((RelFileNode *) rec)->dbNode,
                             ((RelFileNode *) rec)->relNode);
            break;
-       case XLOG_GIST_INSERT_COMPLETE:
-           appendStringInfo(buf, "complete_insert: rel %u/%u/%u",
-                            ((gistxlogInsertComplete *) rec)->node.spcNode,
-                            ((gistxlogInsertComplete *) rec)->node.dbNode,
-                            ((gistxlogInsertComplete *) rec)->node.relNode);
-           break;
        default:
            appendStringInfo(buf, "unknown gist op code %u", info);
            break;
    }
 }
 
-IndexTuple
-gist_form_invalid_tuple(BlockNumber blkno)
-{
-   /*
-    * we don't alloc space for null's bitmap, this is invalid tuple, be
-    * carefull in read and write code
-    */
-   Size        size = IndexInfoFindDataOffset(0);
-   IndexTuple  tuple = (IndexTuple) palloc0(size);
-
-   tuple->t_info |= size;
-
-   ItemPointerSetBlockNumber(&(tuple->t_tid), blkno);
-   GistTupleSetInvalid(tuple);
-
-   return tuple;
-}
-
-
-static void
-gistxlogFindPath(Relation index, gistIncompleteInsert *insert)
-{
-   GISTInsertStack *top;
-
-   insert->pathlen = 0;
-   insert->path = NULL;
-
-   if ((top = gistFindPath(index, insert->origblkno)) != NULL)
-   {
-       int         i;
-       GISTInsertStack *ptr;
-
-       for (ptr = top; ptr; ptr = ptr->parent)
-           insert->pathlen++;
-
-       insert->path = (BlockNumber *) palloc(sizeof(BlockNumber) * insert->pathlen);
-
-       i = 0;
-       for (ptr = top; ptr; ptr = ptr->parent)
-           insert->path[i++] = ptr->blkno;
-   }
-   else
-       elog(ERROR, "lost parent for block %u", insert->origblkno);
-}
-
-static SplitedPageLayout *
-gistMakePageLayout(Buffer *buffers, int nbuffers)
-{
-   SplitedPageLayout *res = NULL,
-              *resptr;
-
-   while (nbuffers-- > 0)
-   {
-       Page        page = BufferGetPage(buffers[nbuffers]);
-       IndexTuple *vec;
-       int         veclen;
-
-       resptr = (SplitedPageLayout *) palloc0(sizeof(SplitedPageLayout));
-
-       resptr->block.blkno = BufferGetBlockNumber(buffers[nbuffers]);
-       resptr->block.num = PageGetMaxOffsetNumber(page);
-
-       vec = gistextractpage(page, &veclen);
-       resptr->list = gistfillitupvec(vec, veclen, &(resptr->lenlist));
-
-       resptr->next = res;
-       res = resptr;
-   }
-
-   return res;
-}
-
-/*
- * Continue insert after crash.  In normal situations, there aren't any
- * incomplete inserts, but if a crash occurs partway through an insertion
- * sequence, we'll need to finish making the index valid at the end of WAL
- * replay.
- *
- * Note that we assume the index is now in a valid state, except for the
- * unfinished insertion.  In particular it's safe to invoke gistFindPath();
- * there shouldn't be any garbage pages for it to run into.
- *
- * To complete insert we can't use basic insertion algorithm because
- * during insertion we can't call user-defined support functions of opclass.
- * So, we insert 'invalid' tuples without real key and do it by separate algorithm.
- * 'invalid' tuple should be updated by vacuum full.
- */
-static void
-gistContinueInsert(gistIncompleteInsert *insert)
-{
-   IndexTuple *itup;
-   int         i,
-               lenitup;
-   Relation    index;
-
-   index = CreateFakeRelcacheEntry(insert->node);
-
-   /*
-    * needed vector itup never will be more than initial lenblkno+2, because
-    * during this processing Indextuple can be only smaller
-    */
-   lenitup = insert->lenblk;
-   itup = (IndexTuple *) palloc(sizeof(IndexTuple) * (lenitup + 2 /* guarantee root split */ ));
-
-   for (i = 0; i < insert->lenblk; i++)
-       itup[i] = gist_form_invalid_tuple(insert->blkno[i]);
-
-   /*
-    * any insertion of itup[] should make LOG message about
-    */
-
-   if (insert->origblkno == GIST_ROOT_BLKNO)
-   {
-       /*
-        * it was split root, so we should only make new root. it can't be
-        * simple insert into root, we should replace all content of root.
-        */
-       Buffer      buffer = XLogReadBuffer(insert->node, GIST_ROOT_BLKNO, true);
-
-       gistnewroot(index, buffer, itup, lenitup, NULL);
-       UnlockReleaseBuffer(buffer);
-   }
-   else
-   {
-       Buffer     *buffers;
-       Page       *pages;
-       int         numbuffer;
-       OffsetNumber *todelete;
-
-       /* construct path */
-       gistxlogFindPath(index, insert);
-
-       Assert(insert->pathlen > 0);
-
-       buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ ));
-       pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ ));
-       todelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (insert->lenblk + 2 /* guarantee root split */ ));
-
-       for (i = 0; i < insert->pathlen; i++)
-       {
-           int         j,
-                       k,
-                       pituplen = 0;
-           uint8       xlinfo;
-           XLogRecData *rdata;
-           XLogRecPtr  recptr;
-           Buffer      tempbuffer = InvalidBuffer;
-           int         ntodelete = 0;
-
-           numbuffer = 1;
-           buffers[0] = ReadBuffer(index, insert->path[i]);
-           LockBuffer(buffers[0], GIST_EXCLUSIVE);
-
-           /*
-            * we check buffer, because we restored page earlier
-            */
-           gistcheckpage(index, buffers[0]);
-
-           pages[0] = BufferGetPage(buffers[0]);
-           Assert(!GistPageIsLeaf(pages[0]));
-
-           pituplen = PageGetMaxOffsetNumber(pages[0]);
-
-           /* find remove old IndexTuples to remove */
-           for (j = 0; j < pituplen && ntodelete < lenitup; j++)
-           {
-               BlockNumber blkno;
-               ItemId      iid = PageGetItemId(pages[0], j + FirstOffsetNumber);
-               IndexTuple  idxtup = (IndexTuple) PageGetItem(pages[0], iid);
-
-               blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid));
-
-               for (k = 0; k < lenitup; k++)
-                   if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno)
-                   {
-                       todelete[ntodelete] = j + FirstOffsetNumber - ntodelete;
-                       ntodelete++;
-                       break;
-                   }
-           }
-
-           if (ntodelete == 0)
-               elog(PANIC, "gistContinueInsert: cannot find pointer to page(s)");
-
-           /*
-            * we check space with subtraction only first tuple to delete,
-            * hope, that wiil be enough space....
-            */
-
-           if (gistnospace(pages[0], itup, lenitup, *todelete, 0))
-           {
-
-               /* no space left on page, so we must split */
-               buffers[numbuffer] = ReadBuffer(index, P_NEW);
-               LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
-               GISTInitBuffer(buffers[numbuffer], 0);
-               pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
-               gistfillbuffer(pages[numbuffer], itup, lenitup, FirstOffsetNumber);
-               numbuffer++;
-
-               if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO)
-               {
-                   Buffer      tmp;
-
-                   /*
-                    * we split root, just copy content from root to new page
-                    */
-
-                   /* sanity check */
-                   if (i + 1 != insert->pathlen)
-                       elog(PANIC, "unexpected pathlen in index \"%s\"",
-                            RelationGetRelationName(index));
-
-                   /* fill new page, root will be changed later */
-                   tempbuffer = ReadBuffer(index, P_NEW);
-                   LockBuffer(tempbuffer, GIST_EXCLUSIVE);
-                   memcpy(BufferGetPage(tempbuffer), pages[0], BufferGetPageSize(tempbuffer));
-
-                   /* swap buffers[0] (was root) and temp buffer */
-                   tmp = buffers[0];
-                   buffers[0] = tempbuffer;
-                   tempbuffer = tmp;   /* now in tempbuffer GIST_ROOT_BLKNO,
-                                        * it is still unchanged */
-
-                   pages[0] = BufferGetPage(buffers[0]);
-               }
-
-               START_CRIT_SECTION();
-
-               for (j = 0; j < ntodelete; j++)
-                   PageIndexTupleDelete(pages[0], todelete[j]);
-
-               xlinfo = XLOG_GIST_PAGE_SPLIT;
-               rdata = formSplitRdata(index->rd_node, insert->path[i],
-                                      false, &(insert->key),
-                                    gistMakePageLayout(buffers, numbuffer));
-
-           }
-           else
-           {
-               START_CRIT_SECTION();
-
-               for (j = 0; j < ntodelete; j++)
-                   PageIndexTupleDelete(pages[0], todelete[j]);
-               gistfillbuffer(pages[0], itup, lenitup, InvalidOffsetNumber);
-
-               xlinfo = XLOG_GIST_PAGE_UPDATE;
-               rdata = formUpdateRdata(index->rd_node, buffers[0],
-                                       todelete, ntodelete,
-                                       itup, lenitup, &(insert->key));
-           }
-
-           /*
-            * use insert->key as mark for completion of insert (form*Rdata()
-            * above) for following possible replays
-            */
-
-           /* write pages, we should mark it dirty befor XLogInsert() */
-           for (j = 0; j < numbuffer; j++)
-           {
-               GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber;
-               MarkBufferDirty(buffers[j]);
-           }
-           recptr = XLogInsert(RM_GIST_ID, xlinfo, rdata);
-           for (j = 0; j < numbuffer; j++)
-           {
-               PageSetLSN(pages[j], recptr);
-               PageSetTLI(pages[j], ThisTimeLineID);
-           }
-
-           END_CRIT_SECTION();
-
-           lenitup = numbuffer;
-           for (j = 0; j < numbuffer; j++)
-           {
-               itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
-               UnlockReleaseBuffer(buffers[j]);
-           }
-
-           if (tempbuffer != InvalidBuffer)
-           {
-               /*
-                * it was a root split, so fill it by new values
-                */
-               gistnewroot(index, tempbuffer, itup, lenitup, &(insert->key));
-               UnlockReleaseBuffer(tempbuffer);
-           }
-       }
-   }
-
-   FreeFakeRelcacheEntry(index);
-
-   ereport(LOG,
-           (errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery",
-           insert->node.spcNode, insert->node.dbNode, insert->node.relNode),
-          errdetail("Incomplete insertion detected during crash replay.")));
-}
-
 void
 gist_xlog_startup(void)
 {
-   incomplete_inserts = NIL;
-   insertCtx = AllocSetContextCreate(CurrentMemoryContext,
-                                     "GiST recovery temporary context",
-                                     ALLOCSET_DEFAULT_MINSIZE,
-                                     ALLOCSET_DEFAULT_INITSIZE,
-                                     ALLOCSET_DEFAULT_MAXSIZE);
    opCtx = createTempGistContext();
 }
 
 void
 gist_xlog_cleanup(void)
 {
-   ListCell   *l;
-   MemoryContext oldCxt;
-
-   oldCxt = MemoryContextSwitchTo(opCtx);
-
-   foreach(l, incomplete_inserts)
-   {
-       gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
-
-       gistContinueInsert(insert);
-       MemoryContextReset(opCtx);
-   }
-   MemoryContextSwitchTo(oldCxt);
-
    MemoryContextDelete(opCtx);
-   MemoryContextDelete(insertCtx);
-}
-
-bool
-gist_safe_restartpoint(void)
-{
-   if (incomplete_inserts)
-       return false;
-   return true;
 }
 
-
-XLogRecData *
-formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
-              ItemPointer key, SplitedPageLayout *dist)
+/*
+ * Write WAL record of a page split.
+ */
+XLogRecPtr
+gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
+             SplitedPageLayout *dist,
+             BlockNumber origrlink, GistNSN orignsn,
+             Buffer leftchildbuf)
 {
    XLogRecData *rdata;
-   gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit));
+   gistxlogPageSplit xlrec;
    SplitedPageLayout *ptr;
    int         npage = 0,
-               cur = 1;
+               cur;
+   XLogRecPtr recptr;
 
-   ptr = dist;
-   while (ptr)
-   {
+   for (ptr = dist; ptr; ptr = ptr->next)
        npage++;
-       ptr = ptr->next;
-   }
 
    rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2));
 
-   xlrec->node = node;
-   xlrec->origblkno = blkno;
-   xlrec->origleaf = page_is_leaf;
-   xlrec->npage = (uint16) npage;
-   if (key)
-       xlrec->key = *key;
-   else
-       ItemPointerSetInvalid(&(xlrec->key));
+   xlrec.node = node;
+   xlrec.origblkno = blkno;
+   xlrec.origrlink = origrlink;
+   xlrec.orignsn = orignsn;
+   xlrec.origleaf = page_is_leaf;
+   xlrec.npage = (uint16) npage;
+   xlrec.leftchild =
+       BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
 
-   rdata[0].buffer = InvalidBuffer;
-   rdata[0].data = (char *) xlrec;
+   rdata[0].data = (char *) &xlrec;
    rdata[0].len = sizeof(gistxlogPageSplit);
-   rdata[0].next = NULL;
+   rdata[0].buffer = InvalidBuffer;
 
-   ptr = dist;
-   while (ptr)
+   cur = 1;
+
+   /*
+    * Include a full page image of the child buf. (only necessary if a
+    * checkpoint happened since the child page was split)
+    */
+   if (BufferIsValid(leftchildbuf))
    {
+       rdata[cur - 1].next = &(rdata[cur]);
+       rdata[cur].data = NULL;
+       rdata[cur].len = 0;
+       rdata[cur].buffer = leftchildbuf;
+       rdata[cur].buffer_std = true;
+       cur++;
+   }
+
+   for (ptr = dist; ptr; ptr = ptr->next)
+   {
+       rdata[cur - 1].next = &(rdata[cur]);
        rdata[cur].buffer = InvalidBuffer;
        rdata[cur].data = (char *) &(ptr->block);
        rdata[cur].len = sizeof(gistxlogPage);
-       rdata[cur - 1].next = &(rdata[cur]);
        cur++;
 
+       rdata[cur - 1].next = &(rdata[cur]);
        rdata[cur].buffer = InvalidBuffer;
        rdata[cur].data = (char *) (ptr->list);
        rdata[cur].len = ptr->lenlist;
-       rdata[cur - 1].next = &(rdata[cur]);
-       rdata[cur].next = NULL;
        cur++;
-       ptr = ptr->next;
    }
+   rdata[cur - 1].next = NULL;
+
+   recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
 
-   return rdata;
+   pfree(rdata);
+   return recptr;
 }
 
 /*
- * Construct the rdata array for an XLOG record describing a page update
- * (deletion and/or insertion of tuples on a single index page).
+ * Write XLOG record describing a page update. The update can include any
+ * number of deletions and/or insertions of tuples on a single index page.
+ *
+ * If this update inserts a downlink for a split page, also record that
+ * the F_FOLLOW_RIGHT flag on the child page is cleared and NSN set.
  *
  * Note that both the todelete array and the tuples are marked as belonging
  * to the target buffer; they need not be stored in XLOG if XLogInsert decides
  * at least one rdata item referencing the buffer, even when ntodelete and
  * ituplen are both zero; this ensures that XLogInsert knows about the buffer.
  */
-XLogRecData *
-formUpdateRdata(RelFileNode node, Buffer buffer,
-               OffsetNumber *todelete, int ntodelete,
-               IndexTuple *itup, int ituplen, ItemPointer key)
+XLogRecPtr
+gistXLogUpdate(RelFileNode node, Buffer buffer,
+              OffsetNumber *todelete, int ntodelete,
+              IndexTuple *itup, int ituplen,
+              Buffer leftchildbuf)
 {
    XLogRecData *rdata;
    gistxlogPageUpdate *xlrec;
    int         cur,
                i;
+   XLogRecPtr  recptr;
 
-   rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen));
+   rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (4 + ituplen));
    xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));
 
    xlrec->node = node;
    xlrec->blkno = BufferGetBlockNumber(buffer);
    xlrec->ntodelete = ntodelete;
-
-   if (key)
-       xlrec->key = *key;
-   else
-       ItemPointerSetInvalid(&(xlrec->key));
+   xlrec->leftchild =
+       BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
 
    rdata[0].buffer = buffer;
    rdata[0].buffer_std = true;
    rdata[1].next = &(rdata[2]);
 
    rdata[2].data = (char *) todelete;
-   rdata[2].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
+   rdata[2].len = sizeof(OffsetNumber) * ntodelete;
    rdata[2].buffer = buffer;
    rdata[2].buffer_std = true;
-   rdata[2].next = NULL;
 
-   /* new tuples */
    cur = 3;
+
+   /* new tuples */
    for (i = 0; i < ituplen; i++)
    {
        rdata[cur - 1].next = &(rdata[cur]);
        rdata[cur].len = IndexTupleSize(itup[i]);
        rdata[cur].buffer = buffer;
        rdata[cur].buffer_std = true;
-       rdata[cur].next = NULL;
        cur++;
    }
 
-   return rdata;
-}
-
-XLogRecPtr
-gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len)
-{
-   gistxlogInsertComplete xlrec;
-   XLogRecData rdata[2];
-   XLogRecPtr  recptr;
-
-   Assert(len > 0);
-   xlrec.node = node;
-
-   rdata[0].buffer = InvalidBuffer;
-   rdata[0].data = (char *) &xlrec;
-   rdata[0].len = sizeof(gistxlogInsertComplete);
-   rdata[0].next = &(rdata[1]);
-
-   rdata[1].buffer = InvalidBuffer;
-   rdata[1].data = (char *) keys;
-   rdata[1].len = sizeof(ItemPointerData) * len;
-   rdata[1].next = NULL;
-
-   START_CRIT_SECTION();
-
-   recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata);
+   /*
+    * Include a full page image of the child buf. (only necessary if
+    * a checkpoint happened since the child page was split)
+    */
+   if (BufferIsValid(leftchildbuf))
+   {
+       rdata[cur - 1].next = &(rdata[cur]);
+       rdata[cur].data = NULL;
+       rdata[cur].len = 0;
+       rdata[cur].buffer = leftchildbuf;
+       rdata[cur].buffer_std = true;
+       cur++;
+   }
+   rdata[cur - 1].next = NULL;
 
-   END_CRIT_SECTION();
+   recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
 
+   pfree(rdata);
    return recptr;
 }
 
    {"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint},
    {"Hash", hash_redo, hash_desc, NULL, NULL, NULL},
    {"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint},
-   {"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, gist_safe_restartpoint},
+   {"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, NULL},
    {"Sequence", seq_redo, seq_desc, NULL, NULL, NULL}
 };
 
 /*
  * Page opaque data in a GiST index page.
  */
-#define F_LEAF             (1 << 0)
-#define F_DELETED          (1 << 1)
-#define F_TUPLES_DELETED   (1 << 2)
+#define F_LEAF             (1 << 0)    /* leaf page */
+#define F_DELETED          (1 << 1)    /* the page has been deleted */
+#define F_TUPLES_DELETED   (1 << 2)    /* some tuples on the page are dead */
+#define F_FOLLOW_RIGHT     (1 << 3)    /* page to the right has no downlink */
 
 typedef XLogRecPtr GistNSN;
 
 #define GistMarkTuplesDeleted(page) ( GistPageGetOpaque(page)->flags |= F_TUPLES_DELETED)
 #define GistClearTuplesDeleted(page)   ( GistPageGetOpaque(page)->flags &= ~F_TUPLES_DELETED)
 
+#define GistFollowRight(page) ( GistPageGetOpaque(page)->flags & F_FOLLOW_RIGHT)
+#define GistMarkFollowRight(page) ( GistPageGetOpaque(page)->flags |= F_FOLLOW_RIGHT)
+#define GistClearFollowRight(page) ( GistPageGetOpaque(page)->flags &= ~F_FOLLOW_RIGHT)
+
 /*
  * Vector of GISTENTRY structs; user-defined methods union and picksplit
  * take it as one of their arguments
 
 /* XLog stuff */
 
 #define XLOG_GIST_PAGE_UPDATE      0x00
-#define XLOG_GIST_NEW_ROOT         0x20
+/* #define XLOG_GIST_NEW_ROOT          0x20 */ /* not used anymore */
 #define XLOG_GIST_PAGE_SPLIT       0x30
-#define XLOG_GIST_INSERT_COMPLETE  0x40
+/* #define XLOG_GIST_INSERT_COMPLETE   0x40 */ /* not used anymore */
 #define XLOG_GIST_CREATE_INDEX     0x50
 #define XLOG_GIST_PAGE_DELETE      0x60
 
    BlockNumber blkno;
 
    /*
-    * It used to identify completeness of insert. Sets to leaf itup
+    * If this operation completes a page split, by inserting a downlink for
+    * the split page, leftchild points to the left half of the split.
     */
-   ItemPointerData key;
+   BlockNumber leftchild;
 
    /* number of deleted offsets */
    uint16      ntodelete;
 {
    RelFileNode node;
    BlockNumber origblkno;      /* splitted page */
+   BlockNumber origrlink;      /* rightlink of the page before split */
+   GistNSN     orignsn;        /* NSN of the page before split */
    bool        origleaf;       /* was splitted page a leaf page? */
-   uint16      npage;
 
-   /* see comments on gistxlogPageUpdate */
-   ItemPointerData key;
+   BlockNumber leftchild;      /* like in gistxlogPageUpdate */
+   uint16      npage;          /* # of pages in the split */
 
    /*
     * follow: 1. gistxlogPage and array of IndexTupleData per page
    int         num;            /* number of index tuples following */
 } gistxlogPage;
 
-typedef struct gistxlogInsertComplete
-{
-   RelFileNode node;
-   /* follows ItemPointerData key to clean */
-} gistxlogInsertComplete;
-
 typedef struct gistxlogPageDelete
 {
    RelFileNode node;
  * GISTInsertStack used for locking buffers and transfer arguments during
  * insertion
  */
-
 typedef struct GISTInsertStack
 {
    /* current page */
    Page        page;
 
    /*
-    * log sequence number from page->lsn to recognize page update  and
+    * log sequence number from page->lsn to recognize page update and
     * compare it with page's nsn to recognize page split
     */
    GistNSN     lsn;
    /* child's offset */
    OffsetNumber childoffnum;
 
-   /* pointer to parent and child */
+   /* pointer to parent */
    struct GISTInsertStack *parent;
-   struct GISTInsertStack *child;
 
    /* for gistFindPath */
    struct GISTInsertStack *next;
    Datum       spl_lattr[INDEX_MAX_KEYS];      /* Union of subkeys in
                                                 * spl_left */
    bool        spl_lisnull[INDEX_MAX_KEYS];
-   bool        spl_leftvalid;
 
    Datum       spl_rattr[INDEX_MAX_KEYS];      /* Union of subkeys in
                                                 * spl_right */
    bool        spl_risnull[INDEX_MAX_KEYS];
-   bool        spl_rightvalid;
 
    bool       *spl_equiv;      /* equivalent tuples which can be freely
                                 * distributed between left and right pages */
 typedef struct
 {
    Relation    r;
-   IndexTuple *itup;           /* in/out, points to compressed entry */
-   int         ituplen;        /* length of itup */
    Size        freespace;      /* free space to be left */
-   GISTInsertStack *stack;
-   bool        needInsertComplete;
 
-   /* pointer to heap tuple */
-   ItemPointerData key;
+   GISTInsertStack *stack;
 } GISTInsertState;
 
 /* root page of a gist index */
 #define GIST_ROOT_BLKNO                0
 
 /*
- * mark tuples on inner pages during recovery
+ * Before PostgreSQL 9.1, we used rely on so-called "invalid tuples" on inner
+ * pages to finish crash recovery of incomplete page splits. If a crash
+ * happened in the middle of a page split, so that the downlink pointers were
+ * not yet inserted, crash recovery inserted a special downlink pointer. The
+ * semantics of an invalid tuple was that it if you encounter one in a scan,
+ * it must always be followed, because we don't know if the tuples on the
+ * child page match or not.
+ *
+ * We no longer create such invalid tuples, we now mark the left-half of such
+ * an incomplete split with the F_FOLLOW_RIGHT flag instead, and finish the
+ * split properly the next time we need to insert on that page. To retain
+ * on-disk compatibility for the sake of pg_upgrade, we still store 0xffff as
+ * the offset number of all inner tuples. If we encounter any invalid tuples
+ * with 0xfffe during insertion, we throw an error, though scans still handle
+ * them. You should only encounter invalid tuples if you pg_upgrade a pre-9.1
+ * gist index which already has invalid tuples in it because of a crash. That
+ * should be rare, and you are recommended to REINDEX anyway if you have any
+ * invalid tuples in an index, so throwing an error is as far as we go with
+ * supporting that.
  */
 #define TUPLE_IS_VALID     0xffff
 #define TUPLE_IS_INVALID   0xfffe
 
 #define  GistTupleIsInvalid(itup)  ( ItemPointerGetOffsetNumber( &((itup)->t_tid) ) == TUPLE_IS_INVALID )
 #define  GistTupleSetValid(itup)   ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_VALID )
-#define  GistTupleSetInvalid(itup) ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_INVALID )
 
 /* gist.c */
 extern Datum gistbuild(PG_FUNCTION_ARGS);
 extern MemoryContext createTempGistContext(void);
 extern void initGISTstate(GISTSTATE *giststate, Relation index);
 extern void freeGISTstate(GISTSTATE *giststate);
-extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate);
-extern void gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key);
 
 extern SplitedPageLayout *gistSplit(Relation r, Page page, IndexTuple *itup,
          int len, GISTSTATE *giststate);
 extern void gist_desc(StringInfo buf, uint8 xl_info, char *rec);
 extern void gist_xlog_startup(void);
 extern void gist_xlog_cleanup(void);
-extern bool gist_safe_restartpoint(void);
-extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno);
-
-extern XLogRecData *formUpdateRdata(RelFileNode node, Buffer buffer,
-               OffsetNumber *todelete, int ntodelete,
-               IndexTuple *itup, int ituplen, ItemPointer key);
 
-extern XLogRecData *formSplitRdata(RelFileNode node,
-              BlockNumber blkno, bool page_is_leaf,
-              ItemPointer key, SplitedPageLayout *dist);
+extern XLogRecPtr gistXLogUpdate(RelFileNode node, Buffer buffer,
+              OffsetNumber *todelete, int ntodelete,
+                                IndexTuple *itup, int ntup,
+              Buffer leftchild);
 
-extern XLogRecPtr gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len);
+extern XLogRecPtr gistXLogSplit(RelFileNode node,
+             BlockNumber blkno, bool page_is_leaf,
+             SplitedPageLayout *dist,
+             BlockNumber origrlink, GistNSN oldnsn,
+             Buffer leftchild);
 
 /* gistget.c */
 extern Datum gistgettuple(PG_FUNCTION_ARGS);
 extern float gistpenalty(GISTSTATE *giststate, int attno,
            GISTENTRY *key1, bool isNull1,
            GISTENTRY *key2, bool isNull2);
-extern bool gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey,
+extern void gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey,
                   Datum *attr, bool *isnull);
 extern bool gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b);
 extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,