nbtree: Allocate new pages in separate function.
authorPeter Geoghegan <pg@bowt.ie>
Sat, 10 Jun 2023 21:08:25 +0000 (14:08 -0700)
committerPeter Geoghegan <pg@bowt.ie>
Sat, 10 Jun 2023 21:08:25 +0000 (14:08 -0700)
Split nbtree's _bt_getbuf function is two: code that read locks or write
locks existing pages remains in _bt_getbuf, while code that deals with
allocating new pages is moved to a new, dedicated function called
_bt_allocbuf.  This simplifies most _bt_getbuf callers, since it is no
longer necessary for them to pass a heaprel argument.  Many of the
changes to nbtree from commit 61b313e4 can be reverted.  This minimizes
the divergence between HEAD/PostgreSQL 16 and earlier release branches.

_bt_allocbuf replaces the previous nbtree idiom of passing P_NEW to
_bt_getbuf.  There are only 3 affected call sites, all of which continue
to pass a heaprel for recovery conflict purposes.  Note that nbtree's
use of P_NEW was superficial; nbtree never actually relied on the P_NEW
code paths in bufmgr.c, so this change is strictly mechanical.

GiST already took the same approach; it has a dedicated function for
allocating new pages called gistNewBuffer().  That factor allowed commit
61b313e4 to make much more targeted changes to GiST.

Author: Peter Geoghegan <pg@bowt.ie>
Reviewed-By: Heikki Linnakangas <hlinnaka@iki.fi>
Discussion: https://postgr.es/m/CAH2-Wz=8Z9qY58bjm_7TAHgtW6RzZ5Ke62q5emdCEy9BAzwhmg@mail.gmail.com

12 files changed:
contrib/amcheck/verify_nbtree.c
src/backend/access/heap/heapam_handler.c
src/backend/access/nbtree/nbtinsert.c
src/backend/access/nbtree/nbtpage.c
src/backend/access/nbtree/nbtree.c
src/backend/access/nbtree/nbtsearch.c
src/backend/access/nbtree/nbtsort.c
src/backend/access/nbtree/nbtutils.c
src/backend/optimizer/util/plancat.c
src/backend/utils/sort/tuplesortvariants.c
src/include/access/nbtree.h
src/include/utils/tuplesort.h

index 6979aff727be0dfd604183ec43f62545a6b4db90..94a9759322e43aa175b7f761ae1e8d352bbc2598 100644 (file)
@@ -183,7 +183,6 @@ static inline bool invariant_l_nontarget_offset(BtreeCheckState *state,
                                                                                                OffsetNumber upperbound);
 static Page palloc_btree_page(BtreeCheckState *state, BlockNumber blocknum);
 static inline BTScanInsert bt_mkscankey_pivotsearch(Relation rel,
-                                                                                                       Relation heaprel,
                                                                                                        IndexTuple itup);
 static ItemId PageGetItemIdCareful(BtreeCheckState *state, BlockNumber block,
                                                                   Page page, OffsetNumber offset);
@@ -332,7 +331,7 @@ bt_index_check_internal(Oid indrelid, bool parentcheck, bool heapallindexed,
                                                        RelationGetRelationName(indrel))));
 
                /* Extract metadata from metapage, and sanitize it in passing */
-               _bt_metaversion(indrel, heaprel, &heapkeyspace, &allequalimage);
+               _bt_metaversion(indrel, &heapkeyspace, &allequalimage);
                if (allequalimage && !heapkeyspace)
                        ereport(ERROR,
                                        (errcode(ERRCODE_INDEX_CORRUPTED),
@@ -1259,7 +1258,7 @@ bt_target_page_check(BtreeCheckState *state)
                }
 
                /* Build insertion scankey for current page offset */
-               skey = bt_mkscankey_pivotsearch(state->rel, state->heaprel, itup);
+               skey = bt_mkscankey_pivotsearch(state->rel, itup);
 
                /*
                 * Make sure tuple size does not exceed the relevant BTREE_VERSION
@@ -1769,7 +1768,7 @@ bt_right_page_check_scankey(BtreeCheckState *state)
         * memory remaining allocated.
         */
        firstitup = (IndexTuple) PageGetItem(rightpage, rightitem);
-       return bt_mkscankey_pivotsearch(state->rel, state->heaprel, firstitup);
+       return bt_mkscankey_pivotsearch(state->rel, firstitup);
 }
 
 /*
@@ -2682,7 +2681,7 @@ bt_rootdescend(BtreeCheckState *state, IndexTuple itup)
        Buffer          lbuf;
        bool            exists;
 
-       key = _bt_mkscankey(state->rel, state->heaprel, itup);
+       key = _bt_mkscankey(state->rel, itup);
        Assert(key->heapkeyspace && key->scantid != NULL);
 
        /*
@@ -2695,7 +2694,7 @@ bt_rootdescend(BtreeCheckState *state, IndexTuple itup)
         */
        Assert(state->readonly && state->rootdescend);
        exists = false;
-       stack = _bt_search(state->rel, state->heaprel, key, &lbuf, BT_READ, NULL);
+       stack = _bt_search(state->rel, NULL, key, &lbuf, BT_READ, NULL);
 
        if (BufferIsValid(lbuf))
        {
@@ -3134,11 +3133,11 @@ palloc_btree_page(BtreeCheckState *state, BlockNumber blocknum)
  * the scankey is greater.
  */
 static inline BTScanInsert
-bt_mkscankey_pivotsearch(Relation rel, Relation heaprel, IndexTuple itup)
+bt_mkscankey_pivotsearch(Relation rel, IndexTuple itup)
 {
        BTScanInsert skey;
 
-       skey = _bt_mkscankey(rel, heaprel, itup);
+       skey = _bt_mkscankey(rel, itup);
        skey->pivotsearch = true;
 
        return skey;
index 646135cc21c5a22603b654f087b89e3b052c4203..0755be8390112287c77e49ce7075bf9d94672c76 100644 (file)
@@ -731,14 +731,9 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
                                                                 *multi_cutoff);
 
 
-       /*
-        * Set up sorting if wanted. NewHeap is being passed to
-        * tuplesort_begin_cluster(), it could have been OldHeap too. It does not
-        * really matter, as the goal is to have a heap relation being passed to
-        * _bt_log_reuse_page() (which should not be called from this code path).
-        */
+       /* Set up sorting if wanted */
        if (use_sort)
-               tuplesort = tuplesort_begin_cluster(oldTupDesc, OldIndex, NewHeap,
+               tuplesort = tuplesort_begin_cluster(oldTupDesc, OldIndex,
                                                                                        maintenance_work_mem,
                                                                                        NULL, TUPLESORT_NONE);
        else
index 84bbd78d59ca13435d0c691642aa583a40e1eac6..d33f814a938acb282f9cd7c8841ad511a75469da 100644 (file)
@@ -59,7 +59,7 @@ static Buffer _bt_split(Relation rel, Relation heaprel, BTScanInsert itup_key,
                                                IndexTuple nposting, uint16 postingoff);
 static void _bt_insert_parent(Relation rel, Relation heaprel, Buffer buf,
                                                          Buffer rbuf, BTStack stack, bool isroot, bool isonly);
-static Buffer _bt_newroot(Relation rel, Relation heaprel, Buffer lbuf, Buffer rbuf);
+static Buffer _bt_newlevel(Relation rel, Relation heaprel, Buffer lbuf, Buffer rbuf);
 static inline bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
                                                                OffsetNumber itup_off, bool newfirstdataitem);
 static void _bt_delete_or_dedup_one_page(Relation rel, Relation heapRel,
@@ -110,7 +110,7 @@ _bt_doinsert(Relation rel, IndexTuple itup,
        bool            checkingunique = (checkUnique != UNIQUE_CHECK_NO);
 
        /* we need an insertion scan key to do our search, so build one */
-       itup_key = _bt_mkscankey(rel, heapRel, itup);
+       itup_key = _bt_mkscankey(rel, itup);
 
        if (checkingunique)
        {
@@ -1024,13 +1024,15 @@ _bt_findinsertloc(Relation rel,
  * indexes.
  */
 static void
-_bt_stepright(Relation rel, Relation heaprel, BTInsertState insertstate, BTStack stack)
+_bt_stepright(Relation rel, Relation heaprel, BTInsertState insertstate,
+                         BTStack stack)
 {
        Page            page;
        BTPageOpaque opaque;
        Buffer          rbuf;
        BlockNumber rblkno;
 
+       Assert(heaprel != NULL);
        page = BufferGetPage(insertstate->buf);
        opaque = BTPageGetOpaque(page);
 
@@ -1145,7 +1147,7 @@ _bt_insertonpg(Relation rel,
 
        /*
         * Every internal page should have exactly one negative infinity item at
-        * all times.  Only _bt_split() and _bt_newroot() should add items that
+        * all times.  Only _bt_split() and _bt_newlevel() should add items that
         * become negative infinity items through truncation, since they're the
         * only routines that allocate new internal pages.
         */
@@ -1250,14 +1252,14 @@ _bt_insertonpg(Relation rel,
                 * only one on its tree level, but was not the root, it may have been
                 * the "fast root".  We need to ensure that the fast root link points
                 * at or above the current page.  We can safely acquire a lock on the
-                * metapage here --- see comments for _bt_newroot().
+                * metapage here --- see comments for _bt_newlevel().
                 */
                if (unlikely(split_only_page))
                {
                        Assert(!isleaf);
                        Assert(BufferIsValid(cbuf));
 
-                       metabuf = _bt_getbuf(rel, heaprel, BTREE_METAPAGE, BT_WRITE);
+                       metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
                        metapg = BufferGetPage(metabuf);
                        metad = BTPageGetMeta(metapg);
 
@@ -1421,7 +1423,7 @@ _bt_insertonpg(Relation rel,
                 * call _bt_getrootheight while holding a buffer lock.
                 */
                if (BlockNumberIsValid(blockcache) &&
-                       _bt_getrootheight(rel, heaprel) >= BTREE_FASTPATH_MIN_LEVEL)
+                       _bt_getrootheight(rel) >= BTREE_FASTPATH_MIN_LEVEL)
                        RelationSetTargetBlock(rel, blockcache);
        }
 
@@ -1715,7 +1717,7 @@ _bt_split(Relation rel, Relation heaprel, BTScanInsert itup_key, Buffer buf,
         * way because it avoids an unnecessary PANIC when either origpage or its
         * existing sibling page are corrupt.
         */
-       rbuf = _bt_getbuf(rel, heaprel, P_NEW, BT_WRITE);
+       rbuf = _bt_allocbuf(rel, heaprel);
        rightpage = BufferGetPage(rbuf);
        rightpagenumber = BufferGetBlockNumber(rbuf);
        /* rightpage was initialized by _bt_getbuf */
@@ -1888,7 +1890,7 @@ _bt_split(Relation rel, Relation heaprel, BTScanInsert itup_key, Buffer buf,
         */
        if (!isrightmost)
        {
-               sbuf = _bt_getbuf(rel, heaprel, oopaque->btpo_next, BT_WRITE);
+               sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE);
                spage = BufferGetPage(sbuf);
                sopaque = BTPageGetOpaque(spage);
                if (sopaque->btpo_prev != origpagenumber)
@@ -2102,6 +2104,8 @@ _bt_insert_parent(Relation rel,
                                  bool isroot,
                                  bool isonly)
 {
+       Assert(heaprel != NULL);
+
        /*
         * Here we have to do something Lehman and Yao don't talk about: deal with
         * a root split and construction of a new root.  If our stack is empty
@@ -2121,8 +2125,8 @@ _bt_insert_parent(Relation rel,
 
                Assert(stack == NULL);
                Assert(isonly);
-               /* create a new root node and update the metapage */
-               rootbuf = _bt_newroot(rel, heaprel, buf, rbuf);
+               /* create a new root node one level up and update the metapage */
+               rootbuf = _bt_newlevel(rel, heaprel, buf, rbuf);
                /* release the split buffers */
                _bt_relbuf(rel, rootbuf);
                _bt_relbuf(rel, rbuf);
@@ -2161,8 +2165,7 @@ _bt_insert_parent(Relation rel,
                                         BlockNumberIsValid(RelationGetTargetBlock(rel))));
 
                        /* Find the leftmost page at the next level up */
-                       pbuf = _bt_get_endpoint(rel, heaprel, opaque->btpo_level + 1, false,
-                                                                       NULL);
+                       pbuf = _bt_get_endpoint(rel, opaque->btpo_level + 1, false, NULL);
                        /* Set up a phony stack entry pointing there */
                        stack = &fakestack;
                        stack->bts_blkno = BufferGetBlockNumber(pbuf);
@@ -2230,6 +2233,9 @@ _bt_insert_parent(Relation rel,
  *
  * On entry, 'lbuf' must be locked in write-mode.  On exit, it is unlocked
  * and unpinned.
+ *
+ * Caller must provide a valid heaprel, since finishing a page split requires
+ * allocating a new page if and when the parent page splits in turn.
  */
 void
 _bt_finish_split(Relation rel, Relation heaprel, Buffer lbuf, BTStack stack)
@@ -2243,9 +2249,10 @@ _bt_finish_split(Relation rel, Relation heaprel, Buffer lbuf, BTStack stack)
        bool            wasonly;
 
        Assert(P_INCOMPLETE_SPLIT(lpageop));
+       Assert(heaprel != NULL);
 
        /* Lock right sibling, the one missing the downlink */
-       rbuf = _bt_getbuf(rel, heaprel, lpageop->btpo_next, BT_WRITE);
+       rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
        rpage = BufferGetPage(rbuf);
        rpageop = BTPageGetOpaque(rpage);
 
@@ -2257,7 +2264,7 @@ _bt_finish_split(Relation rel, Relation heaprel, Buffer lbuf, BTStack stack)
                BTMetaPageData *metad;
 
                /* acquire lock on the metapage */
-               metabuf = _bt_getbuf(rel, heaprel, BTREE_METAPAGE, BT_WRITE);
+               metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
                metapg = BufferGetPage(metabuf);
                metad = BTPageGetMeta(metapg);
 
@@ -2323,10 +2330,11 @@ _bt_getstackbuf(Relation rel, Relation heaprel, BTStack stack, BlockNumber child
                Page            page;
                BTPageOpaque opaque;
 
-               buf = _bt_getbuf(rel, heaprel, blkno, BT_WRITE);
+               buf = _bt_getbuf(rel, blkno, BT_WRITE);
                page = BufferGetPage(buf);
                opaque = BTPageGetOpaque(page);
 
+               Assert(heaprel != NULL);
                if (P_INCOMPLETE_SPLIT(opaque))
                {
                        _bt_finish_split(rel, heaprel, buf, stack->bts_parent);
@@ -2415,7 +2423,7 @@ _bt_getstackbuf(Relation rel, Relation heaprel, BTStack stack, BlockNumber child
 }
 
 /*
- *     _bt_newroot() -- Create a new root page for the index.
+ *     _bt_newlevel() -- Create a new level above root page.
  *
  *             We've just split the old root page and need to create a new one.
  *             In order to do this, we add a new root page to the file, then lock
@@ -2433,7 +2441,7 @@ _bt_getstackbuf(Relation rel, Relation heaprel, BTStack stack, BlockNumber child
  *             lbuf, rbuf & rootbuf.
  */
 static Buffer
-_bt_newroot(Relation rel, Relation heaprel, Buffer lbuf, Buffer rbuf)
+_bt_newlevel(Relation rel, Relation heaprel, Buffer lbuf, Buffer rbuf)
 {
        Buffer          rootbuf;
        Page            lpage,
@@ -2459,12 +2467,12 @@ _bt_newroot(Relation rel, Relation heaprel, Buffer lbuf, Buffer rbuf)
        lopaque = BTPageGetOpaque(lpage);
 
        /* get a new root page */
-       rootbuf = _bt_getbuf(rel, heaprel, P_NEW, BT_WRITE);
+       rootbuf = _bt_allocbuf(rel, heaprel);
        rootpage = BufferGetPage(rootbuf);
        rootblknum = BufferGetBlockNumber(rootbuf);
 
        /* acquire lock on the metapage */
-       metabuf = _bt_getbuf(rel, heaprel, BTREE_METAPAGE, BT_WRITE);
+       metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
        metapg = BufferGetPage(metabuf);
        metad = BTPageGetMeta(metapg);
 
index 486d1563d8a479cc106c99f3ec898c58f33ce81c..c2050656e43a87be583594d9a3fda4a1a20b1933 100644 (file)
 #include "utils/snapmgr.h"
 
 static BTMetaPageData *_bt_getmeta(Relation rel, Buffer metabuf);
-static void _bt_log_reuse_page(Relation rel, Relation heaprel, BlockNumber blkno,
-                                                          FullTransactionId safexid);
-static void _bt_delitems_delete(Relation rel, Relation heaprel, Buffer buf,
+static void _bt_delitems_delete(Relation rel, Buffer buf,
                                                                TransactionId snapshotConflictHorizon,
+                                                               bool isCatalogRel,
                                                                OffsetNumber *deletable, int ndeletable,
                                                                BTVacuumPosting *updatable, int nupdatable);
 static char *_bt_delitems_update(BTVacuumPosting *updatable, int nupdatable,
@@ -177,7 +176,7 @@ _bt_getmeta(Relation rel, Buffer metabuf)
  * index tuples needed to be deleted.
  */
 bool
-_bt_vacuum_needs_cleanup(Relation rel, Relation heaprel)
+_bt_vacuum_needs_cleanup(Relation rel)
 {
        Buffer          metabuf;
        Page            metapg;
@@ -190,7 +189,7 @@ _bt_vacuum_needs_cleanup(Relation rel, Relation heaprel)
         *
         * Note that we deliberately avoid using cached version of metapage here.
         */
-       metabuf = _bt_getbuf(rel, heaprel, BTREE_METAPAGE, BT_READ);
+       metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ);
        metapg = BufferGetPage(metabuf);
        metad = BTPageGetMeta(metapg);
        btm_version = metad->btm_version;
@@ -230,7 +229,7 @@ _bt_vacuum_needs_cleanup(Relation rel, Relation heaprel)
  * finalized.
  */
 void
-_bt_set_cleanup_info(Relation rel, Relation heaprel, BlockNumber num_delpages)
+_bt_set_cleanup_info(Relation rel, BlockNumber num_delpages)
 {
        Buffer          metabuf;
        Page            metapg;
@@ -254,7 +253,7 @@ _bt_set_cleanup_info(Relation rel, Relation heaprel, BlockNumber num_delpages)
         * no longer used as of PostgreSQL 14.  We set it to -1.0 on rewrite, just
         * to be consistent.
         */
-       metabuf = _bt_getbuf(rel, heaprel, BTREE_METAPAGE, BT_READ);
+       metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ);
        metapg = BufferGetPage(metabuf);
        metad = BTPageGetMeta(metapg);
 
@@ -326,6 +325,9 @@ _bt_set_cleanup_info(Relation rel, Relation heaprel, BlockNumber num_delpages)
  *             NOTE that the returned root page will have only a read lock set
  *             on it even if access = BT_WRITE!
  *
+ *             If access = BT_WRITE, heaprel must be set; otherwise caller can just
+ *             pass NULL.  See _bt_allocbuf for an explanation.
+ *
  *             The returned page is not necessarily the true root --- it could be
  *             a "fast root" (a page that is alone in its level due to deletions).
  *             Also, if the root page is split while we are "in flight" to it,
@@ -349,6 +351,8 @@ _bt_getroot(Relation rel, Relation heaprel, int access)
        uint32          rootlevel;
        BTMetaPageData *metad;
 
+       Assert(access == BT_READ || heaprel != NULL);
+
        /*
         * Try to use previously-cached metapage data to find the root.  This
         * normally saves one buffer access per index search, which is a very
@@ -369,7 +373,7 @@ _bt_getroot(Relation rel, Relation heaprel, int access)
                Assert(rootblkno != P_NONE);
                rootlevel = metad->btm_fastlevel;
 
-               rootbuf = _bt_getbuf(rel, heaprel, rootblkno, BT_READ);
+               rootbuf = _bt_getbuf(rel, rootblkno, BT_READ);
                rootpage = BufferGetPage(rootbuf);
                rootopaque = BTPageGetOpaque(rootpage);
 
@@ -395,7 +399,7 @@ _bt_getroot(Relation rel, Relation heaprel, int access)
                rel->rd_amcache = NULL;
        }
 
-       metabuf = _bt_getbuf(rel, heaprel, BTREE_METAPAGE, BT_READ);
+       metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ);
        metad = _bt_getmeta(rel, metabuf);
 
        /* if no root page initialized yet, do it */
@@ -436,7 +440,7 @@ _bt_getroot(Relation rel, Relation heaprel, int access)
                 * the new root page.  Since this is the first page in the tree, it's
                 * a leaf as well as the root.
                 */
-               rootbuf = _bt_getbuf(rel, heaprel, P_NEW, BT_WRITE);
+               rootbuf = _bt_allocbuf(rel, heaprel);
                rootblkno = BufferGetBlockNumber(rootbuf);
                rootpage = BufferGetPage(rootbuf);
                rootopaque = BTPageGetOpaque(rootpage);
@@ -573,7 +577,7 @@ _bt_getroot(Relation rel, Relation heaprel, int access)
  * moving to the root --- that'd deadlock against any concurrent root split.)
  */
 Buffer
-_bt_gettrueroot(Relation rel, Relation heaprel)
+_bt_gettrueroot(Relation rel)
 {
        Buffer          metabuf;
        Page            metapg;
@@ -595,7 +599,7 @@ _bt_gettrueroot(Relation rel, Relation heaprel)
                pfree(rel->rd_amcache);
        rel->rd_amcache = NULL;
 
-       metabuf = _bt_getbuf(rel, heaprel, BTREE_METAPAGE, BT_READ);
+       metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ);
        metapg = BufferGetPage(metabuf);
        metaopaque = BTPageGetOpaque(metapg);
        metad = BTPageGetMeta(metapg);
@@ -668,7 +672,7 @@ _bt_gettrueroot(Relation rel, Relation heaprel)
  *             about updating previously cached data.
  */
 int
-_bt_getrootheight(Relation rel, Relation heaprel)
+_bt_getrootheight(Relation rel)
 {
        BTMetaPageData *metad;
 
@@ -676,7 +680,7 @@ _bt_getrootheight(Relation rel, Relation heaprel)
        {
                Buffer          metabuf;
 
-               metabuf = _bt_getbuf(rel, heaprel, BTREE_METAPAGE, BT_READ);
+               metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ);
                metad = _bt_getmeta(rel, metabuf);
 
                /*
@@ -732,7 +736,7 @@ _bt_getrootheight(Relation rel, Relation heaprel)
  *             pg_upgrade'd from Postgres 12.
  */
 void
-_bt_metaversion(Relation rel, Relation heaprel, bool *heapkeyspace, bool *allequalimage)
+_bt_metaversion(Relation rel, bool *heapkeyspace, bool *allequalimage)
 {
        BTMetaPageData *metad;
 
@@ -740,7 +744,7 @@ _bt_metaversion(Relation rel, Relation heaprel, bool *heapkeyspace, bool *allequ
        {
                Buffer          metabuf;
 
-               metabuf = _bt_getbuf(rel, heaprel, BTREE_METAPAGE, BT_READ);
+               metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ);
                metad = _bt_getmeta(rel, metabuf);
 
                /*
@@ -821,37 +825,7 @@ _bt_checkpage(Relation rel, Buffer buf)
 }
 
 /*
- * Log the reuse of a page from the FSM.
- */
-static void
-_bt_log_reuse_page(Relation rel, Relation heaprel, BlockNumber blkno,
-                                  FullTransactionId safexid)
-{
-       xl_btree_reuse_page xlrec_reuse;
-
-       /*
-        * Note that we don't register the buffer with the record, because this
-        * operation doesn't modify the page. This record only exists to provide a
-        * conflict point for Hot Standby.
-        */
-
-       /* XLOG stuff */
-       xlrec_reuse.isCatalogRel = RelationIsAccessibleInLogicalDecoding(heaprel);
-       xlrec_reuse.locator = rel->rd_locator;
-       xlrec_reuse.block = blkno;
-       xlrec_reuse.snapshotConflictHorizon = safexid;
-
-       XLogBeginInsert();
-       XLogRegisterData((char *) &xlrec_reuse, SizeOfBtreeReusePage);
-
-       XLogInsert(RM_BTREE_ID, XLOG_BTREE_REUSE_PAGE);
-}
-
-/*
- *     _bt_getbuf() -- Get a buffer by block number for read or write.
- *
- *             blkno == P_NEW means to get an unallocated index page.  The page
- *             will be initialized before returning it.
+ *     _bt_getbuf() -- Get an existing block in a buffer, for read or write.
  *
  *             The general rule in nbtree is that it's never okay to access a
  *             page without holding both a buffer pin and a buffer lock on
@@ -860,136 +834,165 @@ _bt_log_reuse_page(Relation rel, Relation heaprel, BlockNumber blkno,
  *             When this routine returns, the appropriate lock is set on the
  *             requested buffer and its reference count has been incremented
  *             (ie, the buffer is "locked and pinned").  Also, we apply
- *             _bt_checkpage to sanity-check the page (except in P_NEW case),
- *             and perform Valgrind client requests that help Valgrind detect
- *             unsafe page accesses.
+ *             _bt_checkpage to sanity-check the page, and perform Valgrind
+ *             client requests that help Valgrind detect unsafe page accesses.
  *
  *             Note: raw LockBuffer() calls are disallowed in nbtree; all
  *             buffer lock requests need to go through wrapper functions such
  *             as _bt_lockbuf().
  */
 Buffer
-_bt_getbuf(Relation rel, Relation heaprel, BlockNumber blkno, int access)
+_bt_getbuf(Relation rel, BlockNumber blkno, int access)
 {
        Buffer          buf;
 
-       if (blkno != P_NEW)
-       {
-               /* Read an existing block of the relation */
-               buf = ReadBuffer(rel, blkno);
-               _bt_lockbuf(rel, buf, access);
-               _bt_checkpage(rel, buf);
-       }
-       else
-       {
-               Page            page;
+       Assert(BlockNumberIsValid(blkno));
 
-               Assert(access == BT_WRITE);
+       /* Read an existing block of the relation */
+       buf = ReadBuffer(rel, blkno);
+       _bt_lockbuf(rel, buf, access);
+       _bt_checkpage(rel, buf);
 
-               /*
-                * First see if the FSM knows of any free pages.
-                *
-                * We can't trust the FSM's report unreservedly; we have to check that
-                * the page is still free.  (For example, an already-free page could
-                * have been re-used between the time the last VACUUM scanned it and
-                * the time the VACUUM made its FSM updates.)
-                *
-                * In fact, it's worse than that: we can't even assume that it's safe
-                * to take a lock on the reported page.  If somebody else has a lock
-                * on it, or even worse our own caller does, we could deadlock.  (The
-                * own-caller scenario is actually not improbable. Consider an index
-                * on a serial or timestamp column.  Nearly all splits will be at the
-                * rightmost page, so it's entirely likely that _bt_split will call us
-                * while holding a lock on the page most recently acquired from FSM. A
-                * VACUUM running concurrently with the previous split could well have
-                * placed that page back in FSM.)
-                *
-                * To get around that, we ask for only a conditional lock on the
-                * reported page.  If we fail, then someone else is using the page,
-                * and we may reasonably assume it's not free.  (If we happen to be
-                * wrong, the worst consequence is the page will be lost to use till
-                * the next VACUUM, which is no big problem.)
-                */
-               for (;;)
+       return buf;
+}
+
+/*
+ *     _bt_allocbuf() -- Allocate a new block/page.
+ *
+ * Returns a write-locked buffer containing an unallocated nbtree page.
+ *
+ * Callers are required to pass a valid heaprel.  We need heaprel so that we
+ * can handle generating a snapshotConflictHorizon that makes reusing a page
+ * from the FSM safe for queries that may be running on standbys.
+ */
+Buffer
+_bt_allocbuf(Relation rel, Relation heaprel)
+{
+       Buffer          buf;
+       BlockNumber blkno;
+       Page            page;
+
+       Assert(heaprel != NULL);
+
+       /*
+        * First see if the FSM knows of any free pages.
+        *
+        * We can't trust the FSM's report unreservedly; we have to check that the
+        * page is still free.  (For example, an already-free page could have been
+        * re-used between the time the last VACUUM scanned it and the time the
+        * VACUUM made its FSM updates.)
+        *
+        * In fact, it's worse than that: we can't even assume that it's safe to
+        * take a lock on the reported page.  If somebody else has a lock on it,
+        * or even worse our own caller does, we could deadlock.  (The own-caller
+        * scenario is actually not improbable. Consider an index on a serial or
+        * timestamp column.  Nearly all splits will be at the rightmost page, so
+        * it's entirely likely that _bt_split will call us while holding a lock
+        * on the page most recently acquired from FSM. A VACUUM running
+        * concurrently with the previous split could well have placed that page
+        * back in FSM.)
+        *
+        * To get around that, we ask for only a conditional lock on the reported
+        * page.  If we fail, then someone else is using the page, and we may
+        * reasonably assume it's not free.  (If we happen to be wrong, the worst
+        * consequence is the page will be lost to use till the next VACUUM, which
+        * is no big problem.)
+        */
+       for (;;)
+       {
+               blkno = GetFreeIndexPage(rel);
+               if (blkno == InvalidBlockNumber)
+                       break;
+               buf = ReadBuffer(rel, blkno);
+               if (_bt_conditionallockbuf(rel, buf))
                {
-                       blkno = GetFreeIndexPage(rel);
-                       if (blkno == InvalidBlockNumber)
-                               break;
-                       buf = ReadBuffer(rel, blkno);
-                       if (_bt_conditionallockbuf(rel, buf))
+                       page = BufferGetPage(buf);
+
+                       /*
+                        * It's possible to find an all-zeroes page in an index.  For
+                        * example, a backend might successfully extend the relation one
+                        * page and then crash before it is able to make a WAL entry for
+                        * adding the page.  If we find a zeroed page then reclaim it
+                        * immediately.
+                        */
+                       if (PageIsNew(page))
                        {
-                               page = BufferGetPage(buf);
+                               /* Okay to use page.  Initialize and return it. */
+                               _bt_pageinit(page, BufferGetPageSize(buf));
+                               return buf;
+                       }
 
+                       if (BTPageIsRecyclable(page, heaprel))
+                       {
                                /*
-                                * It's possible to find an all-zeroes page in an index.  For
-                                * example, a backend might successfully extend the relation
-                                * one page and then crash before it is able to make a WAL
-                                * entry for adding the page.  If we find a zeroed page then
-                                * reclaim it immediately.
+                                * If we are generating WAL for Hot Standby then create a WAL
+                                * record that will allow us to conflict with queries running
+                                * on standby, in case they have snapshots older than safexid
+                                * value
                                 */
-                               if (PageIsNew(page))
+                               if (RelationNeedsWAL(rel) && XLogStandbyInfoActive())
                                {
-                                       /* Okay to use page.  Initialize and return it. */
-                                       _bt_pageinit(page, BufferGetPageSize(buf));
-                                       return buf;
-                               }
+                                       xl_btree_reuse_page xlrec_reuse;
 
-                               if (BTPageIsRecyclable(page, heaprel))
-                               {
                                        /*
-                                        * If we are generating WAL for Hot Standby then create a
-                                        * WAL record that will allow us to conflict with queries
-                                        * running on standby, in case they have snapshots older
-                                        * than safexid value
+                                        * Note that we don't register the buffer with the record,
+                                        * because this operation doesn't modify the page (that
+                                        * already happened, back when VACUUM deleted the page).
+                                        * This record only exists to provide a conflict point for
+                                        * Hot Standby.  See record REDO routine comments.
                                         */
-                                       if (XLogStandbyInfoActive() && RelationNeedsWAL(rel))
-                                               _bt_log_reuse_page(rel, heaprel, blkno,
-                                                                                  BTPageGetDeleteXid(page));
+                                       xlrec_reuse.locator = rel->rd_locator;
+                                       xlrec_reuse.block = blkno;
+                                       xlrec_reuse.snapshotConflictHorizon = BTPageGetDeleteXid(page);
+                                       xlrec_reuse.isCatalogRel =
+                                               RelationIsAccessibleInLogicalDecoding(heaprel);
+
+                                       XLogBeginInsert();
+                                       XLogRegisterData((char *) &xlrec_reuse, SizeOfBtreeReusePage);
 
-                                       /* Okay to use page.  Re-initialize and return it. */
-                                       _bt_pageinit(page, BufferGetPageSize(buf));
-                                       return buf;
+                                       XLogInsert(RM_BTREE_ID, XLOG_BTREE_REUSE_PAGE);
                                }
-                               elog(DEBUG2, "FSM returned nonrecyclable page");
-                               _bt_relbuf(rel, buf);
-                       }
-                       else
-                       {
-                               elog(DEBUG2, "FSM returned nonlockable page");
-                               /* couldn't get lock, so just drop pin */
-                               ReleaseBuffer(buf);
+
+                               /* Okay to use page.  Re-initialize and return it. */
+                               _bt_pageinit(page, BufferGetPageSize(buf));
+                               return buf;
                        }
+                       elog(DEBUG2, "FSM returned nonrecyclable page");
+                       _bt_relbuf(rel, buf);
                }
+               else
+               {
+                       elog(DEBUG2, "FSM returned nonlockable page");
+                       /* couldn't get lock, so just drop pin */
+                       ReleaseBuffer(buf);
+               }
+       }
 
-               /*
-                * Extend the relation by one page. Need to use RBM_ZERO_AND_LOCK or
-                * we risk a race condition against btvacuumscan --- see comments
-                * therein. This forces us to repeat the valgrind request that
-                * _bt_lockbuf() otherwise would make, as we can't use _bt_lockbuf()
-                * without introducing a race.
-                */
-               buf = ExtendBufferedRel(EB_REL(rel), MAIN_FORKNUM, NULL,
-                                                               EB_LOCK_FIRST);
-               if (!RelationUsesLocalBuffers(rel))
-                       VALGRIND_MAKE_MEM_DEFINED(BufferGetPage(buf), BLCKSZ);
+       /*
+        * Extend the relation by one page. Need to use RBM_ZERO_AND_LOCK or we
+        * risk a race condition against btvacuumscan --- see comments therein.
+        * This forces us to repeat the valgrind request that _bt_lockbuf()
+        * otherwise would make, as we can't use _bt_lockbuf() without introducing
+        * a race.
+        */
+       buf = ExtendBufferedRel(EB_REL(rel), MAIN_FORKNUM, NULL, EB_LOCK_FIRST);
+       if (!RelationUsesLocalBuffers(rel))
+               VALGRIND_MAKE_MEM_DEFINED(BufferGetPage(buf), BLCKSZ);
 
-               /* Initialize the new page before returning it */
-               page = BufferGetPage(buf);
-               Assert(PageIsNew(page));
-               _bt_pageinit(page, BufferGetPageSize(buf));
-       }
+       /* Initialize the new page before returning it */
+       page = BufferGetPage(buf);
+       Assert(PageIsNew(page));
+       _bt_pageinit(page, BufferGetPageSize(buf));
 
-       /* ref count and lock type are correct */
        return buf;
 }
 
 /*
  *     _bt_relandgetbuf() -- release a locked buffer and get another one.
  *
- * This is equivalent to _bt_relbuf followed by _bt_getbuf, with the
- * exception that blkno may not be P_NEW.  Also, if obuf is InvalidBuffer
- * then it reduces to just _bt_getbuf; allowing this case simplifies some
- * calle