#include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/ipc.h"
+#include "storage/lmgr.h"
 #include "storage/proc.h"
 #include "storage/smgr.h"
 #include "storage/standby.h"
                                ForkNumber forkNum, BlockNumber blockNum,
                                ReadBufferMode mode, BufferAccessStrategy strategy,
                                bool *hit);
+static BlockNumber ExtendBufferedRelCommon(ExtendBufferedWhat eb,
+                                          ForkNumber fork,
+                                          BufferAccessStrategy strategy,
+                                          uint32 flags,
+                                          uint32 extend_by,
+                                          BlockNumber extend_upto,
+                                          Buffer *buffers,
+                                          uint32 *extended_by);
+static BlockNumber ExtendBufferedRelShared(ExtendBufferedWhat eb,
+                                          ForkNumber fork,
+                                          BufferAccessStrategy strategy,
+                                          uint32 flags,
+                                          uint32 extend_by,
+                                          BlockNumber extend_upto,
+                                          Buffer *buffers,
+                                          uint32 *extended_by);
 static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy);
 static void PinBuffer_Locked(BufferDesc *buf);
 static void UnpinBuffer(BufferDesc *buf);
                             mode, strategy, &hit);
 }
 
+/*
+ * Convenience wrapper around ExtendBufferedRelBy() extending by one block.
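+ *
+ * For illustration, a hypothetical caller wanting a single new, locked,
+ * zero-filled page (EB_REL() is assumed to build the ExtendBufferedWhat for
+ * a Relation):
+ *
+ *     buf = ExtendBufferedRel(EB_REL(rel), MAIN_FORKNUM, NULL,
+ *                             EB_LOCK_FIRST);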
+ */
+Buffer
+ExtendBufferedRel(ExtendBufferedWhat eb,
+                 ForkNumber forkNum,
+                 BufferAccessStrategy strategy,
+                 uint32 flags)
+{
+   Buffer      buf;
+   uint32      extend_by = 1;
+
+   ExtendBufferedRelBy(eb, forkNum, strategy, flags, extend_by,
+                       &buf, &extend_by);
+
+   return buf;
+}
+
+/*
+ * Extend relation by multiple blocks.
+ *
+ * Tries to extend the relation by extend_by blocks. Depending on the
+ * availability of resources the relation may end up being extended by fewer
+ * pages than requested (though, unless an error is thrown, always by at
+ * least one page). *extended_by is updated to the number of pages by which
+ * the relation actually has been extended.
+ *
+ * buffers needs to be an array that is at least extend_by long. Upon
+ * completion, the first *extended_by array elements will each point to a
+ * pinned buffer.
+ *
+ * If EB_LOCK_FIRST is part of flags, the first returned buffer is
+ * locked. This is useful for callers that want a buffer that is guaranteed to
+ * be empty.
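+ *
+ * Sketch of a hypothetical caller, for illustration only (again assuming
+ * the EB_REL() initializer):
+ *
+ *     Buffer      bufs[8];
+ *     uint32      extended_by;
+ *     BlockNumber first;
+ *
+ *     first = ExtendBufferedRelBy(EB_REL(rel), MAIN_FORKNUM, NULL, 0,
+ *                                 lengthof(bufs), bufs, &extended_by);
+ *
+ * after which blocks first .. first + extended_by - 1 exist and
+ * bufs[0 .. extended_by - 1] hold the corresponding pins, which the caller
+ * eventually has to ReleaseBuffer().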
+ */
+BlockNumber
+ExtendBufferedRelBy(ExtendBufferedWhat eb,
+                   ForkNumber fork,
+                   BufferAccessStrategy strategy,
+                   uint32 flags,
+                   uint32 extend_by,
+                   Buffer *buffers,
+                   uint32 *extended_by)
+{
+   Assert((eb.rel != NULL) != (eb.smgr != NULL));
+   Assert(eb.smgr == NULL || eb.relpersistence != 0);
+   Assert(extend_by > 0);
+
+   if (eb.smgr == NULL)
+   {
+       eb.smgr = RelationGetSmgr(eb.rel);
+       eb.relpersistence = eb.rel->rd_rel->relpersistence;
+   }
+
+   return ExtendBufferedRelCommon(eb, fork, strategy, flags,
+                                  extend_by, InvalidBlockNumber,
+                                  buffers, extended_by);
+}
+
+/*
+ * Extend the relation so it is at least extend_to blocks large, and return
+ * the buffer for block (extend_to - 1).
+ *
+ * This is useful for callers that want to write a specific page, regardless
+ * of the current size of the relation (e.g. useful for visibilitymap and for
+ * crash recovery).
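+ *
+ * A hypothetical caller, for illustration: ensuring that visibilitymap
+ * block vm_block exists, creating the fork first if needed:
+ *
+ *     buf = ExtendBufferedRelTo(EB_REL(rel), VISIBILITYMAP_FORKNUM, NULL,
+ *                               EB_CREATE_FORK_IF_NEEDED | EB_CLEAR_SIZE_CACHE,
+ *                               vm_block + 1, RBM_ZERO_ON_ERROR);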
+ */
+Buffer
+ExtendBufferedRelTo(ExtendBufferedWhat eb,
+                   ForkNumber fork,
+                   BufferAccessStrategy strategy,
+                   uint32 flags,
+                   BlockNumber extend_to,
+                   ReadBufferMode mode)
+{
+   BlockNumber current_size;
+   uint32      extended_by = 0;
+   Buffer      buffer = InvalidBuffer;
+   Buffer      buffers[64];
+
+   Assert((eb.rel != NULL) != (eb.smgr != NULL));
+   Assert(eb.smgr == NULL || eb.relpersistence != 0);
+   Assert(extend_to != InvalidBlockNumber && extend_to > 0);
+   Assert(mode == RBM_NORMAL || mode == RBM_ZERO_ON_ERROR ||
+          mode == RBM_ZERO_AND_LOCK);
+
+   if (eb.smgr == NULL)
+   {
+       eb.smgr = RelationGetSmgr(eb.rel);
+       eb.relpersistence = eb.rel->rd_rel->relpersistence;
+   }
+
+   /*
+    * If desired, create the file if it doesn't exist.  If
+    * smgr_cached_nblocks[fork] is positive then the fork must already exist,
+    * so there is no need for an smgrexists call.
+    */
+   if ((flags & EB_CREATE_FORK_IF_NEEDED) &&
+       (eb.smgr->smgr_cached_nblocks[fork] == 0 ||
+        eb.smgr->smgr_cached_nblocks[fork] == InvalidBlockNumber) &&
+       !smgrexists(eb.smgr, fork))
+   {
+       LockRelationForExtension(eb.rel, ExclusiveLock);
+
+       /* the smgr could have been closed while waiting for the lock */
+       if (eb.rel)
+           eb.smgr = RelationGetSmgr(eb.rel);
+
+       /* recheck, fork might have been created concurrently */
+       if (!smgrexists(eb.smgr, fork))
+           smgrcreate(eb.smgr, fork, flags & EB_PERFORMING_RECOVERY);
+
+       UnlockRelationForExtension(eb.rel, ExclusiveLock);
+   }
+
+   /*
+    * If requested, invalidate size cache, so that smgrnblocks asks the
+    * kernel.
+    */
+   if (flags & EB_CLEAR_SIZE_CACHE)
+       eb.smgr->smgr_cached_nblocks[fork] = InvalidBlockNumber;
+
+   /*
+    * Estimate how many pages we'll need to extend by. This avoids acquiring
+    * unnecessarily many victim buffers.
+    */
+   current_size = smgrnblocks(eb.smgr, fork);
+
+   if (mode == RBM_ZERO_AND_LOCK)
+       flags |= EB_LOCK_TARGET;
+
+   while (current_size < extend_to)
+   {
+       uint32      num_pages = lengthof(buffers);
+       BlockNumber first_block;
+
+       if ((uint64) current_size + num_pages > extend_to)
+           num_pages = extend_to - current_size;
+
+       first_block = ExtendBufferedRelCommon(eb, fork, strategy, flags,
+                                             num_pages, extend_to,
+                                             buffers, &extended_by);
+
+       current_size = first_block + extended_by;
+       Assert(current_size <= extend_to);
+       Assert(num_pages != 0 || current_size >= extend_to);
+
+       for (uint32 i = 0; i < extended_by; i++)
+       {
+           if (first_block + i != extend_to - 1)
+               ReleaseBuffer(buffers[i]);
+           else
+               buffer = buffers[i];
+       }
+   }
+
+   /*
+    * It's possible that another backend concurrently extended the relation.
+    * In that case read the buffer.
+    *
+    * XXX: Should we control this via a flag?
+    */
+   if (buffer == InvalidBuffer)
+   {
+       bool        hit;
+
+       Assert(extended_by == 0);
+       buffer = ReadBuffer_common(eb.smgr, eb.relpersistence,
+                                  fork, extend_to - 1, mode, strategy,
+                                  &hit);
+   }
+
+   return buffer;
+}
 
 /*
  * ReadBuffer_common -- common logic for all ReadBuffer variants
    bool        found;
    IOContext   io_context;
    IOObject    io_object;
-   bool        isExtend;
    bool        isLocalBuf = SmgrIsTemp(smgr);
 
    *hit = false;
 
+   /*
+    * Backward compatibility path; most code should use ExtendBufferedRel()
+    * instead, as acquiring the extension lock inside ExtendBufferedRel()
+    * scales a lot better.
+    */
+   if (unlikely(blockNum == P_NEW))
+   {
+       uint32      flags = EB_SKIP_EXTENSION_LOCK;
+
+       Assert(mode == RBM_NORMAL ||
+              mode == RBM_ZERO_AND_LOCK ||
+              mode == RBM_ZERO_ON_ERROR);
+
+       if (mode == RBM_ZERO_AND_LOCK)
+           flags |= EB_LOCK_FIRST;
+
+       return ExtendBufferedRel(EB_SMGR(smgr, relpersistence),
+                                forkNum, strategy, flags);
+   }
+
    /* Make sure we will have room to remember the buffer pin */
    ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
 
-   isExtend = (blockNum == P_NEW);
-
    TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
                                       smgr->smgr_rlocator.locator.spcOid,
                                       smgr->smgr_rlocator.locator.dbOid,
                                       smgr->smgr_rlocator.locator.relNumber,
-                                      smgr->smgr_rlocator.backend,
-                                      isExtend);
-
-   /* Substitute proper block number if caller asked for P_NEW */
-   if (isExtend)
-   {
-       blockNum = smgrnblocks(smgr, forkNum);
-       /* Fail if relation is already at maximum possible length */
-       if (blockNum == P_NEW)
-           ereport(ERROR,
-                   (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-                    errmsg("cannot extend relation %s beyond %u blocks",
-                           relpath(smgr->smgr_rlocator, forkNum),
-                           P_NEW)));
-   }
+                                      smgr->smgr_rlocator.backend);
 
    if (isLocalBuf)
    {
        bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
        if (found)
            pgBufferUsage.local_blks_hit++;
-       else if (isExtend)
-           pgBufferUsage.local_blks_written++;
        else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
                 mode == RBM_ZERO_ON_ERROR)
            pgBufferUsage.local_blks_read++;
                             strategy, &found, io_context);
        if (found)
            pgBufferUsage.shared_blks_hit++;
-       else if (isExtend)
-           pgBufferUsage.shared_blks_written++;
        else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
                 mode == RBM_ZERO_ON_ERROR)
            pgBufferUsage.shared_blks_read++;
    /* if it was already in the buffer pool, we're done */
    if (found)
    {
-       if (!isExtend)
-       {
-           /* Just need to update stats before we exit */
-           *hit = true;
-           VacuumPageHit++;
-           pgstat_count_io_op(io_object, io_context, IOOP_HIT);
-
-           if (VacuumCostActive)
-               VacuumCostBalance += VacuumCostPageHit;
-
-           TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
-                                             smgr->smgr_rlocator.locator.spcOid,
-                                             smgr->smgr_rlocator.locator.dbOid,
-                                             smgr->smgr_rlocator.locator.relNumber,
-                                             smgr->smgr_rlocator.backend,
-                                             isExtend,
-                                             found);
+       /* Just need to update stats before we exit */
+       *hit = true;
+       VacuumPageHit++;
+       pgstat_count_io_op(io_object, io_context, IOOP_HIT);
 
-           /*
-            * In RBM_ZERO_AND_LOCK mode the caller expects the page to be
-            * locked on return.
-            */
-           if (!isLocalBuf)
-           {
-               if (mode == RBM_ZERO_AND_LOCK)
-                   LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
-                                 LW_EXCLUSIVE);
-               else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
-                   LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
-           }
-
-           return BufferDescriptorGetBuffer(bufHdr);
-       }
+       if (VacuumCostActive)
+           VacuumCostBalance += VacuumCostPageHit;
 
-       /*
-        * We get here only in the corner case where we are trying to extend
-        * the relation but we found a pre-existing buffer marked BM_VALID.
-        * This can happen because mdread doesn't complain about reads beyond
-        * EOF (when zero_damaged_pages is ON) and so a previous attempt to
-        * read a block beyond EOF could have left a "valid" zero-filled
-        * buffer.  Unfortunately, we have also seen this case occurring
-        * because of buggy Linux kernels that sometimes return an
-        * lseek(SEEK_END) result that doesn't account for a recent write. In
-        * that situation, the pre-existing buffer would contain valid data
-        * that we don't want to overwrite.  Since the legitimate case should
-        * always have left a zero-filled buffer, complain if not PageIsNew.
-        */
-       bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
-       if (!PageIsNew((Page) bufBlock))
-           ereport(ERROR,
-                   (errmsg("unexpected data beyond EOF in block %u of relation %s",
-                           blockNum, relpath(smgr->smgr_rlocator, forkNum)),
-                    errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
+       TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
+                                         smgr->smgr_rlocator.locator.spcOid,
+                                         smgr->smgr_rlocator.locator.dbOid,
+                                         smgr->smgr_rlocator.locator.relNumber,
+                                         smgr->smgr_rlocator.backend,
+                                         found);
 
        /*
-        * We *must* do smgrextend before succeeding, else the page will not
-        * be reserved by the kernel, and the next P_NEW call will decide to
-        * return the same page.  Clear the BM_VALID bit, do the StartBufferIO
-        * call that BufferAlloc didn't, and proceed.
+        * In RBM_ZERO_AND_LOCK mode the caller expects the page to be locked
+        * on return.
         */
-       if (isLocalBuf)
+       if (!isLocalBuf)
        {
-           /* Only need to adjust flags */
-           uint32      buf_state = pg_atomic_read_u32(&bufHdr->state);
-
-           Assert(buf_state & BM_VALID);
-           buf_state &= ~BM_VALID;
-           pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+           if (mode == RBM_ZERO_AND_LOCK)
+               LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
+                             LW_EXCLUSIVE);
+           else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
+               LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
        }
-       else
-       {
-           /*
-            * Loop to handle the very small possibility that someone re-sets
-            * BM_VALID between our clearing it and StartBufferIO inspecting
-            * it.
-            */
-           do
-           {
-               uint32      buf_state = LockBufHdr(bufHdr);
 
-               Assert(buf_state & BM_VALID);
-               buf_state &= ~BM_VALID;
-               UnlockBufHdr(bufHdr, buf_state);
-           } while (!StartBufferIO(bufHdr, true));
-       }
+       return BufferDescriptorGetBuffer(bufHdr);
    }
 
    /*
     * if we have gotten to this point, we have allocated a buffer for the
     * page but its contents are not yet valid.  IO_IN_PROGRESS is set for it,
     * if it's a shared buffer.
-    *
-    * Note: if smgrextend fails, we will end up with a buffer that is
-    * allocated but not marked BM_VALID.  P_NEW will still select the same
-    * block number (because the relation didn't get any longer on disk) and
-    * so future attempts to extend the relation will find the same buffer (if
-    * it's not been recycled) but come right back here to try smgrextend
-    * again.
     */
    Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID));   /* spinlock not needed */
 
    bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
 
-   if (isExtend)
-   {
-       /* new buffers are zero-filled */
+   /*
+    * Read in the page, unless the caller intends to overwrite it and just
+    * wants us to allocate a buffer.
+    */
+   if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
        MemSet((char *) bufBlock, 0, BLCKSZ);
-       /* don't set checksum for all-zero page */
-       smgrextend(smgr, forkNum, blockNum, bufBlock, false);
-
-       pgstat_count_io_op(io_object, io_context, IOOP_EXTEND);
-
-       /*
-        * NB: we're *not* doing a ScheduleBufferTagForWriteback here;
-        * although we're essentially performing a write. At least on linux
-        * doing so defeats the 'delayed allocation' mechanism, leading to
-        * increased file fragmentation.
-        */
-   }
    else
    {
-       /*
-        * Read in the page, unless the caller intends to overwrite it and
-        * just wants us to allocate a buffer.
-        */
-       if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
-           MemSet((char *) bufBlock, 0, BLCKSZ);
-       else
-       {
-           instr_time  io_start,
-                       io_time;
+       instr_time  io_start,
+                   io_time;
 
-           if (track_io_timing)
-               INSTR_TIME_SET_CURRENT(io_start);
-           else
-               INSTR_TIME_SET_ZERO(io_start);
+       if (track_io_timing)
+           INSTR_TIME_SET_CURRENT(io_start);
 
-           smgrread(smgr, forkNum, blockNum, bufBlock);
+       smgrread(smgr, forkNum, blockNum, bufBlock);
 
-           pgstat_count_io_op(io_object, io_context, IOOP_READ);
+       if (track_io_timing)
+       {
+           INSTR_TIME_SET_CURRENT(io_time);
+           INSTR_TIME_SUBTRACT(io_time, io_start);
+           pgstat_count_buffer_read_time(INSTR_TIME_GET_MICROSEC(io_time));
+           INSTR_TIME_ADD(pgBufferUsage.blk_read_time, io_time);
+       }
 
-           if (track_io_timing)
-           {
-               INSTR_TIME_SET_CURRENT(io_time);
-               INSTR_TIME_SUBTRACT(io_time, io_start);
-               pgstat_count_buffer_read_time(INSTR_TIME_GET_MICROSEC(io_time));
-               INSTR_TIME_ADD(pgBufferUsage.blk_read_time, io_time);
-           }
+       pgstat_count_io_op(io_object, io_context, IOOP_READ);
 
-           /* check for garbage data */
-           if (!PageIsVerifiedExtended((Page) bufBlock, blockNum,
-                                       PIV_LOG_WARNING | PIV_REPORT_STAT))
+       /* check for garbage data */
+       if (!PageIsVerifiedExtended((Page) bufBlock, blockNum,
+                                   PIV_LOG_WARNING | PIV_REPORT_STAT))
+       {
+           if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
            {
-               if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
-               {
-                   ereport(WARNING,
-                           (errcode(ERRCODE_DATA_CORRUPTED),
-                            errmsg("invalid page in block %u of relation %s; zeroing out page",
-                                   blockNum,
-                                   relpath(smgr->smgr_rlocator, forkNum))));
-                   MemSet((char *) bufBlock, 0, BLCKSZ);
-               }
-               else
-                   ereport(ERROR,
-                           (errcode(ERRCODE_DATA_CORRUPTED),
-                            errmsg("invalid page in block %u of relation %s",
-                                   blockNum,
-                                   relpath(smgr->smgr_rlocator, forkNum))));
+               ereport(WARNING,
+                       (errcode(ERRCODE_DATA_CORRUPTED),
+                        errmsg("invalid page in block %u of relation %s; zeroing out page",
+                               blockNum,
+                               relpath(smgr->smgr_rlocator, forkNum))));
+               MemSet((char *) bufBlock, 0, BLCKSZ);
            }
+           else
+               ereport(ERROR,
+                       (errcode(ERRCODE_DATA_CORRUPTED),
+                        errmsg("invalid page in block %u of relation %s",
+                               blockNum,
+                               relpath(smgr->smgr_rlocator, forkNum))));
        }
    }
 
                                      smgr->smgr_rlocator.locator.dbOid,
                                      smgr->smgr_rlocator.locator.relNumber,
                                      smgr->smgr_rlocator.backend,
-                                     isExtend,
                                      found);
 
    return BufferDescriptorGetBuffer(bufHdr);
        UnpinBuffer(victim_buf_hdr);
 
        /*
-        * The victim buffer we acquired peviously is clean and unused,
-        * let it be found again quickly
+        * The victim buffer we acquired previously is clean and unused; let it
+        * be found again quickly.
         */
        StrategyFreeBuffer(victim_buf_hdr);
 
    return buf;
 }
 
+/*
+ * Limit the number of pins a batch operation may additionally acquire, to
+ * avoid running out of pinnable buffers.
+ *
+ * One additional pin is always allowed, as otherwise the operation likely
+ * cannot be performed at all.
+ *
+ * The number of allowed pins for a backend is computed based on
+ * shared_buffers and the maximum number of connections possible. That's very
+ * pessimistic, but outside of toy-sized shared_buffers it should allow
+ * sufficient pins.
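+ *
+ * Worked example with illustrative numbers: NBuffers = 16384 and roughly
+ * 100 possible backends yield 16384 / 100 = 163 allowed pins per backend;
+ * after subtracting the (assumed maximal) REFCOUNT_ARRAY_ENTRIES pins plus
+ * any overflowed pins already held, the remainder caps *additional_pins.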
+ */
+static void
+LimitAdditionalPins(uint32 *additional_pins)
+{
+   uint32      max_backends;
+   int         max_proportional_pins;
+
+   if (*additional_pins <= 1)
+       return;
+
+   max_backends = MaxBackends + NUM_AUXILIARY_PROCS;
+   max_proportional_pins = NBuffers / max_backends;
+
+   /*
+    * Subtract the approximate number of buffers already pinned by this
+    * backend. We get the number of "overflowed" pins for free, but don't
+    * know the number of pins in PrivateRefCountArray. The cost of
+    * calculating that exactly doesn't seem worth it, so just assume the max.
+    */
+   max_proportional_pins -= PrivateRefCountOverflowed + REFCOUNT_ARRAY_ENTRIES;
+
+   if (max_proportional_pins <= 0)
+       max_proportional_pins = 1;
+
+   if (*additional_pins > max_proportional_pins)
+       *additional_pins = max_proportional_pins;
+}
+
+/*
+ * Logic shared between ExtendBufferedRelBy() and ExtendBufferedRelTo(), kept
+ * in one place to avoid duplicating the tracing and relpersistence-related
+ * logic.
+ */
+static BlockNumber
+ExtendBufferedRelCommon(ExtendBufferedWhat eb,
+                       ForkNumber fork,
+                       BufferAccessStrategy strategy,
+                       uint32 flags,
+                       uint32 extend_by,
+                       BlockNumber extend_upto,
+                       Buffer *buffers,
+                       uint32 *extended_by)
+{
+   BlockNumber first_block;
+
+   TRACE_POSTGRESQL_BUFFER_EXTEND_START(fork,
+                                        eb.smgr->smgr_rlocator.locator.spcOid,
+                                        eb.smgr->smgr_rlocator.locator.dbOid,
+                                        eb.smgr->smgr_rlocator.locator.relNumber,
+                                        eb.smgr->smgr_rlocator.backend,
+                                        extend_by);
+
+   if (eb.relpersistence == RELPERSISTENCE_TEMP)
+       first_block = ExtendBufferedRelLocal(eb, fork, flags,
+                                            extend_by, extend_upto,
+                                            buffers, &extend_by);
+   else
+       first_block = ExtendBufferedRelShared(eb, fork, strategy, flags,
+                                             extend_by, extend_upto,
+                                             buffers, &extend_by);
+   *extended_by = extend_by;
+
+   TRACE_POSTGRESQL_BUFFER_EXTEND_DONE(fork,
+                                       eb.smgr->smgr_rlocator.locator.spcOid,
+                                       eb.smgr->smgr_rlocator.locator.dbOid,
+                                       eb.smgr->smgr_rlocator.locator.relNumber,
+                                       eb.smgr->smgr_rlocator.backend,
+                                       *extended_by,
+                                       first_block);
+
+   return first_block;
+}
+
+/*
+ * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
+ * shared buffers.
+ */
+static BlockNumber
+ExtendBufferedRelShared(ExtendBufferedWhat eb,
+                       ForkNumber fork,
+                       BufferAccessStrategy strategy,
+                       uint32 flags,
+                       uint32 extend_by,
+                       BlockNumber extend_upto,
+                       Buffer *buffers,
+                       uint32 *extended_by)
+{
+   BlockNumber first_block;
+   IOContext   io_context = IOContextForStrategy(strategy);
+
+   LimitAdditionalPins(&extend_by);
+
+   /*
+    * Acquire victim buffers for extension without holding extension lock.
+    * Writing out victim buffers is the most expensive part of extending the
+    * relation, particularly when doing so requires WAL flushes. Zeroing out
+    * the buffers is also quite expensive, so do that before holding the
+    * extension lock as well.
+    *
+    * These pages are pinned by us and not valid. While we hold the pin they
+    * can't be acquired as victim buffers by another backend.
+    */
+   for (uint32 i = 0; i < extend_by; i++)
+   {
+       Block       buf_block;
+
+       buffers[i] = GetVictimBuffer(strategy, io_context);
+       buf_block = BufHdrGetBlock(GetBufferDescriptor(buffers[i] - 1));
+
+       /* new buffers are zero-filled */
+       MemSet((char *) buf_block, 0, BLCKSZ);
+   }
+
+   /* in case we need to pin an existing buffer below */
+   ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+
+   /*
+    * Lock relation against concurrent extensions, unless requested not to.
+    *
+    * We use the same extension lock for all forks. That's unnecessarily
+    * restrictive, but currently extensions for forks don't happen often
+    * enough to make it worth locking more granularly.
+    *
+    * Note that another backend might have extended the relation by the time
+    * we get the lock.
+    */
+   if (!(flags & EB_SKIP_EXTENSION_LOCK))
+   {
+       LockRelationForExtension(eb.rel, ExclusiveLock);
+       if (eb.rel)
+           eb.smgr = RelationGetSmgr(eb.rel);
+   }
+
+   /*
+    * If requested, invalidate size cache, so that smgrnblocks asks the
+    * kernel.
+    */
+   if (flags & EB_CLEAR_SIZE_CACHE)
+       eb.smgr->smgr_cached_nblocks[fork] = InvalidBlockNumber;
+
+   first_block = smgrnblocks(eb.smgr, fork);
+
+   /*
+    * Now that we have the accurate relation size, check if the caller wants
+    * us to extend only up to a specific size. If there were concurrent
+    * extensions, we might have acquired too many buffers and need to release
+    * them.
+    */
+   if (extend_upto != InvalidBlockNumber)
+   {
+       uint32      orig_extend_by = extend_by;
+
+       if (first_block > extend_upto)
+           extend_by = 0;
+       else if ((uint64) first_block + extend_by > extend_upto)
+           extend_by = extend_upto - first_block;
+
+       for (uint32 i = extend_by; i < orig_extend_by; i++)
+       {
+           BufferDesc *buf_hdr = GetBufferDescriptor(buffers[i] - 1);
+
+           /*
+            * The victim buffer we acquired previously is clean and unused;
+            * let it be found again quickly.
+            */
+           StrategyFreeBuffer(buf_hdr);
+           UnpinBuffer(buf_hdr);
+       }
+
+       if (extend_by == 0)
+       {
+           if (!(flags & EB_SKIP_EXTENSION_LOCK))
+               UnlockRelationForExtension(eb.rel, ExclusiveLock);
+           *extended_by = extend_by;
+           return first_block;
+       }
+   }
+
+   /* Fail if relation is already at maximum possible length */
+   if ((uint64) first_block + extend_by >= MaxBlockNumber)
+       ereport(ERROR,
+               (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+                errmsg("cannot extend relation %s beyond %u blocks",
+                       relpath(eb.smgr->smgr_rlocator, fork),
+                       MaxBlockNumber)));
+
+   /*
+    * Insert buffers into buffer table, mark as IO_IN_PROGRESS.
+    *
+    * This needs to happen before we extend the relation, because as soon as
+    * we do, other backends can start to read in those pages.
+    */
+   for (uint32 i = 0; i < extend_by; i++)
+   {
+       Buffer      victim_buf = buffers[i];
+       BufferDesc *victim_buf_hdr = GetBufferDescriptor(victim_buf - 1);
+       BufferTag   tag;
+       uint32      hash;
+       LWLock     *partition_lock;
+       int         existing_id;
+
+       InitBufferTag(&tag, &eb.smgr->smgr_rlocator.locator, fork, first_block + i);
+       hash = BufTableHashCode(&tag);
+       partition_lock = BufMappingPartitionLock(hash);
+
+       LWLockAcquire(partition_lock, LW_EXCLUSIVE);
+
+       existing_id = BufTableInsert(&tag, hash, victim_buf_hdr->buf_id);
+
+       /*
+        * We get here only in the corner case where we are trying to extend
+        * the relation but we found a pre-existing buffer. This can happen
+        * because a prior attempt at extending the relation failed, and
+        * because mdread doesn't complain about reads beyond EOF (when
+        * zero_damaged_pages is ON) and so a previous attempt to read a block
+        * beyond EOF could have left a "valid" zero-filled buffer.
+        * Unfortunately, we have also seen this case occurring because of
+        * buggy Linux kernels that sometimes return an lseek(SEEK_END) result
+        * that doesn't account for a recent write. In that situation, the
+        * pre-existing buffer would contain valid data that we don't want to
+        * overwrite.  Since the legitimate cases should always have left a
+        * zero-filled buffer, complain if not PageIsNew.
+        */
+       if (existing_id >= 0)
+       {
+           BufferDesc *existing_hdr = GetBufferDescriptor(existing_id);
+           Block       buf_block;
+           bool        valid;
+
+           /*
+            * Pin the existing buffer before releasing the partition lock,
+            * preventing it from being evicted.
+            */
+           valid = PinBuffer(existing_hdr, strategy);
+
+           LWLockRelease(partition_lock);
+
+           /*
+            * The victim buffer we acquired previously is clean and unused;
+            * let it be found again quickly.
+            */
+           StrategyFreeBuffer(victim_buf_hdr);
+           UnpinBuffer(victim_buf_hdr);
+
+           buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
+           buf_block = BufHdrGetBlock(existing_hdr);
+
+           if (valid && !PageIsNew((Page) buf_block))
+               ereport(ERROR,
+                       (errmsg("unexpected data beyond EOF in block %u of relation %s",
+                               existing_hdr->tag.blockNum, relpath(eb.smgr->smgr_rlocator, fork)),
+                        errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
+
+           /*
+            * We *must* do smgr[zero]extend before succeeding, else the page
+            * will not be reserved by the kernel, and the next P_NEW call
+            * will decide to return the same page.  Clear the BM_VALID bit,
+            * do StartBufferIO() and proceed.
+            *
+            * Loop to handle the very small possibility that someone re-sets
+            * BM_VALID between our clearing it and StartBufferIO inspecting
+            * it.
+            */
+           do
+           {
+               uint32      buf_state = LockBufHdr(existing_hdr);
+
+               buf_state &= ~BM_VALID;
+               UnlockBufHdr(existing_hdr, buf_state);
+           } while (!StartBufferIO(existing_hdr, true));
+       }
+       else
+       {
+           uint32      buf_state;
+
+           buf_state = LockBufHdr(victim_buf_hdr);
+
+           /* some sanity checks while we hold the buffer header lock */
+           Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));
+           Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 1);
+
+           victim_buf_hdr->tag = tag;
+
+           buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
+           if (eb.relpersistence == RELPERSISTENCE_PERMANENT || fork == INIT_FORKNUM)
+               buf_state |= BM_PERMANENT;
+
+           UnlockBufHdr(victim_buf_hdr, buf_state);
+
+           LWLockRelease(partition_lock);
+
+           /*
+            * XXX: could combine the locked operations in StartBufferIO()
+            * with the above.
+            */
+           StartBufferIO(victim_buf_hdr, true);
+       }
+   }
+
+   /*
+    * Note: if smgrzeroextend fails, we will end up with buffers that are
+    * allocated but not marked BM_VALID.  The next relation extension will
+    * still select the same block number (because the relation didn't get any
+    * longer on disk) and so future attempts to extend the relation will find
+    * the same buffers (if they have not been recycled) but come right back
+    * here to try smgrzeroextend again.
+    *
+    * We don't need to set checksum for all-zero pages.
+    */
+   smgrzeroextend(eb.smgr, fork, first_block, extend_by, false);
+
+   /*
+    * Release the file-extension lock; it's now OK for someone else to extend
+    * the relation some more.
+    *
+    * We remove IO_IN_PROGRESS after this, as waking up waiting backends can
+    * take noticeable time.
+    */
+   if (!(flags & EB_SKIP_EXTENSION_LOCK))
+       UnlockRelationForExtension(eb.rel, ExclusiveLock);
+
+   /* Set BM_VALID, terminate IO, and wake up any waiters */
+   for (uint32 i = 0; i < extend_by; i++)
+   {
+       Buffer      buf = buffers[i];
+       BufferDesc *buf_hdr = GetBufferDescriptor(buf - 1);
+       bool        lock = false;
+
+       if ((flags & EB_LOCK_FIRST) && i == 0)
+           lock = true;
+       else if (flags & EB_LOCK_TARGET)
+       {
+           Assert(extend_upto != InvalidBlockNumber);
+           if (first_block + i + 1 == extend_upto)
+               lock = true;
+       }
+
+       if (lock)
+           LWLockAcquire(BufferDescriptorGetContentLock(buf_hdr), LW_EXCLUSIVE);
+
+       TerminateBufferIO(buf_hdr, false, BM_VALID);
+   }
+
+   pgBufferUsage.shared_blks_written += extend_by;
+   pgstat_count_io_op_n(IOOBJECT_RELATION, io_context, IOOP_EXTEND,
+                        extend_by);
+
+   *extended_by = extend_by;
+
+   return first_block;
+}
+
 /*
  * MarkBufferDirty
  *