BlockNumber blockNum,
                               BufferAccessStrategy strategy,
                               bool *foundPtr, IOContext io_context);
+static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
                        IOObject io_object, IOContext io_context);
 static void FindAndDropRelationBuffers(RelFileLocator rlocator,
            BufferAccessStrategy strategy,
            bool *foundPtr, IOContext io_context)
 {
-   bool        from_ring;
    BufferTag   newTag;         /* identity of requested block */
    uint32      newHash;        /* hash value for newTag */
    LWLock     *newPartitionLock;   /* buffer partition lock for it */
-   BufferTag   oldTag;         /* previous identity of selected buffer */
-   uint32      oldHash;        /* hash value for oldTag */
-   LWLock     *oldPartitionLock;   /* buffer partition lock for it */
-   uint32      oldFlags;
-   int         buf_id;
-   BufferDesc *buf;
-   bool        valid;
-   uint32      buf_state;
+   int         existing_buf_id;
+   Buffer      victim_buffer;
+   BufferDesc *victim_buf_hdr;
+   uint32      victim_buf_state;
 
    /* create a tag so we can lookup the buffer */
    InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
 
    /* see if the block is in the buffer pool already */
    LWLockAcquire(newPartitionLock, LW_SHARED);
-   buf_id = BufTableLookup(&newTag, newHash);
-   if (buf_id >= 0)
+   existing_buf_id = BufTableLookup(&newTag, newHash);
+   if (existing_buf_id >= 0)
    {
+       BufferDesc *buf;
+       bool        valid;
+
        /*
         * Found it.  Now, pin the buffer so no one can steal it from the
         * buffer pool, and check to see if the correct data has been loaded
         * into the buffer.
         */
-       buf = GetBufferDescriptor(buf_id);
+       buf = GetBufferDescriptor(existing_buf_id);
 
        valid = PinBuffer(buf, strategy);
 
     */
    LWLockRelease(newPartitionLock);
 
-   /* Loop here in case we have to try another victim buffer */
-   for (;;)
+   /*
+    * Acquire a victim buffer. Somebody else might try to do the same, we
+    * don't hold any conflicting locks. If so we'll have to undo our work
+    * later.
+    */
+   victim_buffer = GetVictimBuffer(strategy, io_context);
+   victim_buf_hdr = GetBufferDescriptor(victim_buffer - 1);
+
+   /*
+    * Try to make a hashtable entry for the buffer under its new tag. If
+    * somebody else inserted another buffer for the tag, we'll release the
+    * victim buffer we acquired and use the already inserted one.
+    */
+   LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
+   existing_buf_id = BufTableInsert(&newTag, newHash, victim_buf_hdr->buf_id);
+   if (existing_buf_id >= 0)
    {
-       /*
-        * Ensure, while the spinlock's not yet held, that there's a free
-        * refcount entry.
-        */
-       ReservePrivateRefCountEntry();
+       BufferDesc *existing_buf_hdr;
+       bool        valid;
 
        /*
-        * Select a victim buffer.  The buffer is returned with its header
-        * spinlock still held!
+        * Got a collision. Someone has already done what we were about to do.
+        * We'll just handle this as if it were found in the buffer pool in
+        * the first place.  First, give up the buffer we were planning to
+        * use.
+        *
+        * We could do this after releasing the partition lock, but then we'd
+        * have to call ResourceOwnerEnlargeBuffers() &
+        * ReservePrivateRefCountEntry() before acquiring the lock, for the
+        * rare case of such a collision.
         */
-       buf = StrategyGetBuffer(strategy, &buf_state, &from_ring);
-
-       Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 0);
-
-       /* Must copy buffer flags while we still hold the spinlock */
-       oldFlags = buf_state & BUF_FLAG_MASK;
-
-       /* Pin the buffer and then release the buffer spinlock */
-       PinBuffer_Locked(buf);
+       UnpinBuffer(victim_buf_hdr);
 
        /*
-        * If the buffer was dirty, try to write it out.  There is a race
-        * condition here, in that someone might dirty it after we released it
-        * above, or even while we are writing it out (since our share-lock
-        * won't prevent hint-bit updates).  We will recheck the dirty bit
-        * after re-locking the buffer header.
+        * The victim buffer we acquired peviously is clean and unused,
+        * let it be found again quickly
         */
-       if (oldFlags & BM_DIRTY)
-       {
-           /*
-            * We need a share-lock on the buffer contents to write it out
-            * (else we might write invalid data, eg because someone else is
-            * compacting the page contents while we write).  We must use a
-            * conditional lock acquisition here to avoid deadlock.  Even
-            * though the buffer was not pinned (and therefore surely not
-            * locked) when StrategyGetBuffer returned it, someone else could
-            * have pinned and exclusive-locked it by the time we get here. If
-            * we try to get the lock unconditionally, we'd block waiting for
-            * them; if they later block waiting for us, deadlock ensues.
-            * (This has been observed to happen when two backends are both
-            * trying to split btree index pages, and the second one just
-            * happens to be trying to split the page the first one got from
-            * StrategyGetBuffer.)
-            */
-           if (LWLockConditionalAcquire(BufferDescriptorGetContentLock(buf),
-                                        LW_SHARED))
-           {
-               /*
-                * If using a nondefault strategy, and writing the buffer
-                * would require a WAL flush, let the strategy decide whether
-                * to go ahead and write/reuse the buffer or to choose another
-                * victim.  We need lock to inspect the page LSN, so this
-                * can't be done inside StrategyGetBuffer.
-                */
-               if (strategy != NULL)
-               {
-                   XLogRecPtr  lsn;
-
-                   /* Read the LSN while holding buffer header lock */
-                   buf_state = LockBufHdr(buf);
-                   lsn = BufferGetLSN(buf);
-                   UnlockBufHdr(buf, buf_state);
-
-                   if (XLogNeedsFlush(lsn) &&
-                       StrategyRejectBuffer(strategy, buf, from_ring))
-                   {
-                       /* Drop lock/pin and loop around for another buffer */
-                       LWLockRelease(BufferDescriptorGetContentLock(buf));
-                       UnpinBuffer(buf);
-                       continue;
-                   }
-               }
+       StrategyFreeBuffer(victim_buf_hdr);
 
-               /* OK, do the I/O */
-               FlushBuffer(buf, NULL, IOOBJECT_RELATION, io_context);
-               LWLockRelease(BufferDescriptorGetContentLock(buf));
+       /* remaining code should match code at top of routine */
 
-               ScheduleBufferTagForWriteback(&BackendWritebackContext,
-                                             &buf->tag);
-           }
-           else
-           {
-               /*
-                * Someone else has locked the buffer, so give it up and loop
-                * back to get another one.
-                */
-               UnpinBuffer(buf);
-               continue;
-           }
-       }
+       existing_buf_hdr = GetBufferDescriptor(existing_buf_id);
 
-       /*
-        * To change the association of a valid buffer, we'll need to have
-        * exclusive lock on both the old and new mapping partitions.
-        */
-       if (oldFlags & BM_TAG_VALID)
-       {
-           /*
-            * Need to compute the old tag's hashcode and partition lock ID.
-            * XXX is it worth storing the hashcode in BufferDesc so we need
-            * not recompute it here?  Probably not.
-            */
-           oldTag = buf->tag;
-           oldHash = BufTableHashCode(&oldTag);
-           oldPartitionLock = BufMappingPartitionLock(oldHash);
+       valid = PinBuffer(existing_buf_hdr, strategy);
 
-           /*
-            * Must lock the lower-numbered partition first to avoid
-            * deadlocks.
-            */
-           if (oldPartitionLock < newPartitionLock)
-           {
-               LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
-               LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
-           }
-           else if (oldPartitionLock > newPartitionLock)
-           {
-               LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
-               LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
-           }
-           else
-           {
-               /* only one partition, only one lock */
-               LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
-           }
-       }
-       else
-       {
-           /* if it wasn't valid, we need only the new partition */
-           LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
-           /* remember we have no old-partition lock or tag */
-           oldPartitionLock = NULL;
-           /* keep the compiler quiet about uninitialized variables */
-           oldHash = 0;
-       }
+       /* Can release the mapping lock as soon as we've pinned it */
+       LWLockRelease(newPartitionLock);
 
-       /*
-        * Try to make a hashtable entry for the buffer under its new tag.
-        * This could fail because while we were writing someone else
-        * allocated another buffer for the same block we want to read in.
-        * Note that we have not yet removed the hashtable entry for the old
-        * tag.
-        */
-       buf_id = BufTableInsert(&newTag, newHash, buf->buf_id);
+       *foundPtr = true;
 
-       if (buf_id >= 0)
+       if (!valid)
        {
            /*
-            * Got a collision. Someone has already done what we were about to
-            * do. We'll just handle this as if it were found in the buffer
-            * pool in the first place.  First, give up the buffer we were
-            * planning to use.
+            * We can only get here if (a) someone else is still reading in
+            * the page, or (b) a previous read attempt failed.  We have to
+            * wait for any active read attempt to finish, and then set up our
+            * own read attempt if the page is still not BM_VALID.
+            * StartBufferIO does it all.
             */
-           UnpinBuffer(buf);
-
-           /* Can give up that buffer's mapping partition lock now */
-           if (oldPartitionLock != NULL &&
-               oldPartitionLock != newPartitionLock)
-               LWLockRelease(oldPartitionLock);
-
-           /* remaining code should match code at top of routine */
-
-           buf = GetBufferDescriptor(buf_id);
-
-           valid = PinBuffer(buf, strategy);
-
-           /* Can release the mapping lock as soon as we've pinned it */
-           LWLockRelease(newPartitionLock);
-
-           *foundPtr = true;
-
-           if (!valid)
+           if (StartBufferIO(existing_buf_hdr, true))
            {
                /*
-                * We can only get here if (a) someone else is still reading
-                * in the page, or (b) a previous read attempt failed.  We
-                * have to wait for any active read attempt to finish, and
-                * then set up our own read attempt if the page is still not
-                * BM_VALID.  StartBufferIO does it all.
+                * If we get here, previous attempts to read the buffer must
+                * have failed ... but we shall bravely try again.
                 */
-               if (StartBufferIO(buf, true))
-               {
-                   /*
-                    * If we get here, previous attempts to read the buffer
-                    * must have failed ... but we shall bravely try again.
-                    */
-                   *foundPtr = false;
-               }
+               *foundPtr = false;
            }
-
-           return buf;
        }
 
-       /*
-        * Need to lock the buffer header too in order to change its tag.
-        */
-       buf_state = LockBufHdr(buf);
+       return existing_buf_hdr;
+   }
 
-       /*
-        * Somebody could have pinned or re-dirtied the buffer while we were
-        * doing the I/O and making the new hashtable entry.  If so, we can't
-        * recycle this buffer; we must undo everything we've done and start
-        * over with a new victim buffer.
-        */
-       oldFlags = buf_state & BUF_FLAG_MASK;
-       if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 && !(oldFlags & BM_DIRTY))
-           break;
+   /*
+    * Need to lock the buffer header too in order to change its tag.
+    */
+   victim_buf_state = LockBufHdr(victim_buf_hdr);
 
-       UnlockBufHdr(buf, buf_state);
-       BufTableDelete(&newTag, newHash);
-       if (oldPartitionLock != NULL &&
-           oldPartitionLock != newPartitionLock)
-           LWLockRelease(oldPartitionLock);
-       LWLockRelease(newPartitionLock);
-       UnpinBuffer(buf);
-   }
+   /* some sanity checks while we hold the buffer header lock */
+   Assert(BUF_STATE_GET_REFCOUNT(victim_buf_state) == 1);
+   Assert(!(victim_buf_state & (BM_TAG_VALID | BM_VALID | BM_DIRTY | BM_IO_IN_PROGRESS)));
+
+   victim_buf_hdr->tag = newTag;
 
    /*
-    * Okay, it's finally safe to rename the buffer.
-    *
-    * Clearing BM_VALID here is necessary, clearing the dirtybits is just
-    * paranoia.  We also reset the usage_count since any recency of use of
-    * the old content is no longer relevant.  (The usage_count starts out at
-    * 1 so that the buffer can survive one clock-sweep pass.)
-    *
     * Make sure BM_PERMANENT is set for buffers that must be written at every
     * checkpoint.  Unlogged buffers only need to be written at shutdown
     * checkpoints, except for their "init" forks, which need to be treated
     * just like permanent relations.
     */
-   buf->tag = newTag;
-   buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED |
-                  BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT |
-                  BUF_USAGECOUNT_MASK);
+   victim_buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
    if (relpersistence == RELPERSISTENCE_PERMANENT || forkNum == INIT_FORKNUM)
-       buf_state |= BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE;
-   else
-       buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
+       victim_buf_state |= BM_PERMANENT;
 
-   UnlockBufHdr(buf, buf_state);
-
-   if (oldPartitionLock != NULL)
-   {
-       BufTableDelete(&oldTag, oldHash);
-       if (oldPartitionLock != newPartitionLock)
-           LWLockRelease(oldPartitionLock);
-   }
+   UnlockBufHdr(victim_buf_hdr, victim_buf_state);
 
    LWLockRelease(newPartitionLock);
 
-   if (oldFlags & BM_VALID)
-   {
-       /*
-        * When a BufferAccessStrategy is in use, blocks evicted from shared
-        * buffers are counted as IOOP_EVICT in the corresponding context
-        * (e.g. IOCONTEXT_BULKWRITE). Shared buffers are evicted by a
-        * strategy in two cases: 1) while initially claiming buffers for the
-        * strategy ring 2) to replace an existing strategy ring buffer
-        * because it is pinned or in use and cannot be reused.
-        *
-        * Blocks evicted from buffers already in the strategy ring are
-        * counted as IOOP_REUSE in the corresponding strategy context.
-        *
-        * At this point, we can accurately count evictions and reuses,
-        * because we have successfully claimed the valid buffer. Previously,
-        * we may have been forced to release the buffer due to concurrent
-        * pinners or erroring out.
-        */
-       pgstat_count_io_op(IOOBJECT_RELATION, io_context,
-                          from_ring ? IOOP_REUSE : IOOP_EVICT);
-   }
-
    /*
     * Buffer contents are currently invalid.  Try to obtain the right to
     * start I/O.  If StartBufferIO returns false, then someone else managed
     * to read it before we did, so there's nothing left for BufferAlloc() to
     * do.
     */
-   if (StartBufferIO(buf, true))
+   if (StartBufferIO(victim_buf_hdr, true))
        *foundPtr = false;
    else
        *foundPtr = true;
 
-   return buf;
+   return victim_buf_hdr;
 }
 
 /*
    StrategyFreeBuffer(buf);
 }
 
+/*
+ * Helper routine for GetVictimBuffer()
+ *
+ * Needs to be called on a buffer with a valid tag, pinned, but without the
+ * buffer header spinlock held.
+ *
+ * Returns true if the buffer can be reused, in which case the buffer is only
+ * pinned by this backend and marked as invalid, false otherwise.
+ */
+static bool
+InvalidateVictimBuffer(BufferDesc *buf_hdr)
+{
+   uint32      buf_state;
+   uint32      hash;
+   LWLock     *partition_lock;
+   BufferTag   tag;
+
+   Assert(GetPrivateRefCount(BufferDescriptorGetBuffer(buf_hdr)) == 1);
+
+   /* have buffer pinned, so it's safe to read tag without lock */
+   tag = buf_hdr->tag;
+
+   hash = BufTableHashCode(&tag);
+   partition_lock = BufMappingPartitionLock(hash);
+
+   LWLockAcquire(partition_lock, LW_EXCLUSIVE);
+
+   /* lock the buffer header */
+   buf_state = LockBufHdr(buf_hdr);
+
+   /*
+    * We have the buffer pinned nobody else should have been able to unset
+    * this concurrently.
+    */
+   Assert(buf_state & BM_TAG_VALID);
+   Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+   Assert(BufferTagsEqual(&buf_hdr->tag, &tag));
+
+   /*
+    * If somebody else pinned the buffer since, or even worse, dirtied it,
+    * give up on this buffer: It's clearly in use.
+    */
+   if (BUF_STATE_GET_REFCOUNT(buf_state) != 1 || (buf_state & BM_DIRTY))
+   {
+       Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+
+       UnlockBufHdr(buf_hdr, buf_state);
+       LWLockRelease(partition_lock);
+
+       return false;
+   }
+
+   /*
+    * Clear out the buffer's tag and flags and usagecount.  This is not
+    * strictly required, as BM_TAG_VALID/BM_VALID needs to be checked before
+    * doing anything with the buffer. But currently it's beneficial, as the
+    * cheaper pre-check for several linear scans of shared buffers use the
+    * tag (see e.g. FlushDatabaseBuffers()).
+    */
+   ClearBufferTag(&buf_hdr->tag);
+   buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
+   UnlockBufHdr(buf_hdr, buf_state);
+
+   Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+
+   /* finally delete buffer from the buffer mapping table */
+   BufTableDelete(&tag, hash);
+
+   LWLockRelease(partition_lock);
+
+   Assert(!(buf_state & (BM_DIRTY | BM_VALID | BM_TAG_VALID)));
+   Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+   Assert(BUF_STATE_GET_REFCOUNT(pg_atomic_read_u32(&buf_hdr->state)) > 0);
+
+   return true;
+}
+
+static Buffer
+GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context)
+{
+   BufferDesc *buf_hdr;
+   Buffer      buf;
+   uint32      buf_state;
+   bool        from_ring;
+
+   /*
+    * Ensure, while the spinlock's not yet held, that there's a free refcount
+    * entry.
+    */
+   ReservePrivateRefCountEntry();
+   ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+
+   /* we return here if a prospective victim buffer gets used concurrently */
+again:
+
+   /*
+    * Select a victim buffer.  The buffer is returned with its header
+    * spinlock still held!
+    */
+   buf_hdr = StrategyGetBuffer(strategy, &buf_state, &from_ring);
+   buf = BufferDescriptorGetBuffer(buf_hdr);
+
+   Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 0);
+
+   /* Pin the buffer and then release the buffer spinlock */
+   PinBuffer_Locked(buf_hdr);
+
+   /*
+    * We shouldn't have any other pins for this buffer.
+    */
+   CheckBufferIsPinnedOnce(buf);
+
+   /*
+    * If the buffer was dirty, try to write it out.  There is a race
+    * condition here, in that someone might dirty it after we released the
+    * buffer header lock above, or even while we are writing it out (since
+    * our share-lock won't prevent hint-bit updates).  We will recheck the
+    * dirty bit after re-locking the buffer header.
+    */
+   if (buf_state & BM_DIRTY)
+   {
+       LWLock     *content_lock;
+
+       Assert(buf_state & BM_TAG_VALID);
+       Assert(buf_state & BM_VALID);
+
+       /*
+        * We need a share-lock on the buffer contents to write it out (else
+        * we might write invalid data, eg because someone else is compacting
+        * the page contents while we write).  We must use a conditional lock
+        * acquisition here to avoid deadlock.  Even though the buffer was not
+        * pinned (and therefore surely not locked) when StrategyGetBuffer
+        * returned it, someone else could have pinned and exclusive-locked it
+        * by the time we get here. If we try to get the lock unconditionally,
+        * we'd block waiting for them; if they later block waiting for us,
+        * deadlock ensues. (This has been observed to happen when two
+        * backends are both trying to split btree index pages, and the second
+        * one just happens to be trying to split the page the first one got
+        * from StrategyGetBuffer.)
+        */
+       content_lock = BufferDescriptorGetContentLock(buf_hdr);
+       if (!LWLockConditionalAcquire(content_lock, LW_SHARED))
+       {
+           /*
+            * Someone else has locked the buffer, so give it up and loop back
+            * to get another one.
+            */
+           UnpinBuffer(buf_hdr);
+           goto again;
+       }
+
+       /*
+        * If using a nondefault strategy, and writing the buffer would
+        * require a WAL flush, let the strategy decide whether to go ahead
+        * and write/reuse the buffer or to choose another victim.  We need a
+        * lock to inspect the page LSN, so this can't be done inside
+        * StrategyGetBuffer.
+        */
+       if (strategy != NULL)
+       {
+           XLogRecPtr  lsn;
+
+           /* Read the LSN while holding buffer header lock */
+           buf_state = LockBufHdr(buf_hdr);
+           lsn = BufferGetLSN(buf_hdr);
+           UnlockBufHdr(buf_hdr, buf_state);
+
+           if (XLogNeedsFlush(lsn)
+               && StrategyRejectBuffer(strategy, buf_hdr, from_ring))
+           {
+               LWLockRelease(content_lock);
+               UnpinBuffer(buf_hdr);
+               goto again;
+           }
+       }
+
+       /* OK, do the I/O */
+       FlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context);
+       LWLockRelease(content_lock);
+
+       ScheduleBufferTagForWriteback(&BackendWritebackContext,
+                                     &buf_hdr->tag);
+   }
+
+
+   if (buf_state & BM_VALID)
+   {
+       /*
+        * When a BufferAccessStrategy is in use, blocks evicted from shared
+        * buffers are counted as IOOP_EVICT in the corresponding context
+        * (e.g. IOCONTEXT_BULKWRITE). Shared buffers are evicted by a
+        * strategy in two cases: 1) while initially claiming buffers for the
+        * strategy ring 2) to replace an existing strategy ring buffer
+        * because it is pinned or in use and cannot be reused.
+        *
+        * Blocks evicted from buffers already in the strategy ring are
+        * counted as IOOP_REUSE in the corresponding strategy context.
+        *
+        * At this point, we can accurately count evictions and reuses,
+        * because we have successfully claimed the valid buffer. Previously,
+        * we may have been forced to release the buffer due to concurrent
+        * pinners or erroring out.
+        */
+       pgstat_count_io_op(IOOBJECT_RELATION, io_context,
+                          from_ring ? IOOP_REUSE : IOOP_EVICT);
+   }
+
+   /*
+    * If the buffer has an entry in the buffer mapping table, delete it. This
+    * can fail because another backend could have pinned or dirtied the
+    * buffer.
+    */
+   if ((buf_state & BM_TAG_VALID) && !InvalidateVictimBuffer(buf_hdr))
+   {
+       UnpinBuffer(buf_hdr);
+       goto again;
+   }
+
+   /* a final set of sanity checks */
+#ifdef USE_ASSERT_CHECKING
+   buf_state = pg_atomic_read_u32(&buf_hdr->state);
+
+   Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 1);
+   Assert(!(buf_state & (BM_TAG_VALID | BM_VALID | BM_DIRTY)));
+
+   CheckBufferIsPinnedOnce(buf);
+#endif
+
+   return buf;
+}
+
 /*
  * MarkBufferDirty
  *
 
 Block     *LocalBufferBlockPointers = NULL;
 int32     *LocalRefCount = NULL;
 
-static int nextFreeLocalBuf = 0;
+static int nextFreeLocalBufId = 0;
 
 static HTAB *LocalBufHash = NULL;
 
 
 static void InitLocalBuffers(void);
 static Block GetLocalBufferStorage(void);
+static Buffer GetLocalVictimBuffer(void);
 
 
 /*
    BufferTag   newTag;         /* identity of requested block */
    LocalBufferLookupEnt *hresult;
    BufferDesc *bufHdr;
-   int         b;
-   int         trycounter;
+   Buffer      victim_buffer;
+   int         bufid;
    bool        found;
-   uint32      buf_state;
 
    InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
 
 
    if (hresult)
    {
-       b = hresult->id;
-       bufHdr = GetLocalBufferDescriptor(b);
+       bufid = hresult->id;
+       bufHdr = GetLocalBufferDescriptor(bufid);
        Assert(BufferTagsEqual(&bufHdr->tag, &newTag));
-#ifdef LBDEBUG
-       fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n",
-               smgr->smgr_rlocator.locator.relNumber, forkNum, blockNum, -b - 1);
-#endif
 
        *foundPtr = PinLocalBuffer(bufHdr, true);
-       return bufHdr;
    }
+   else
+   {
+       uint32      buf_state;
 
-#ifdef LBDEBUG
-   fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n",
-           smgr->smgr_rlocator.locator.relNumber, forkNum, blockNum,
-           -nextFreeLocalBuf - 1);
-#endif
+       victim_buffer = GetLocalVictimBuffer();
+       bufid = -victim_buffer - 1;
+       bufHdr = GetLocalBufferDescriptor(bufid);
+
+       hresult = (LocalBufferLookupEnt *)
+           hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
+       if (found)              /* shouldn't happen */
+           elog(ERROR, "local buffer hash table corrupted");
+       hresult->id = bufid;
+
+       /*
+        * it's all ours now.
+        */
+       bufHdr->tag = newTag;
+
+       buf_state = pg_atomic_read_u32(&bufHdr->state);
+       buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
+       buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
+       pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+
+       *foundPtr = false;
+   }
+
+   return bufHdr;
+}
+
+static Buffer
+GetLocalVictimBuffer(void)
+{
+   int         victim_bufid;
+   int         trycounter;
+   uint32      buf_state;
+   BufferDesc *bufHdr;
+
+   ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
 
    /*
     * Need to get a new buffer.  We use a clock sweep algorithm (essentially
    trycounter = NLocBuffer;
    for (;;)
    {
-       b = nextFreeLocalBuf;
+       victim_bufid = nextFreeLocalBufId;
 
-       if (++nextFreeLocalBuf >= NLocBuffer)
-           nextFreeLocalBuf = 0;
+       if (++nextFreeLocalBufId >= NLocBuffer)
+           nextFreeLocalBufId = 0;
 
-       bufHdr = GetLocalBufferDescriptor(b);
+       bufHdr = GetLocalBufferDescriptor(victim_bufid);
 
-       if (LocalRefCount[b] == 0)
+       if (LocalRefCount[victim_bufid] == 0)
        {
            buf_state = pg_atomic_read_u32(&bufHdr->state);
 
                     errmsg("no empty local buffer available")));
    }
 
+   /*
+    * lazy memory allocation: allocate space on first use of a buffer.
+    */
+   if (LocalBufHdrGetBlock(bufHdr) == NULL)
+   {
+       /* Set pointer for use by BufferGetBlock() macro */
+       LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
+   }
+
    /*
     * this buffer is not referenced but it might still be dirty. if that's
     * the case, write it out before reusing it!
    }
 
    /*
-    * lazy memory allocation: allocate space on first use of a buffer.
-    */
-   if (LocalBufHdrGetBlock(bufHdr) == NULL)
-   {
-       /* Set pointer for use by BufferGetBlock() macro */
-       LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
-   }
-
-   /*
-    * Update the hash table: remove old entry, if any, and make new one.
+    * Remove the victim buffer from the hashtable and mark as invalid.
     */
    if (buf_state & BM_TAG_VALID)
    {
+       LocalBufferLookupEnt *hresult;
+
        hresult = (LocalBufferLookupEnt *)
            hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
        if (!hresult)           /* shouldn't happen */
            elog(ERROR, "local buffer hash table corrupted");
        /* mark buffer invalid just in case hash insert fails */
        ClearBufferTag(&bufHdr->tag);
-       buf_state &= ~(BM_VALID | BM_TAG_VALID);
+       buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
        pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
        pgstat_count_io_op(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EVICT);
    }
 
-   hresult = (LocalBufferLookupEnt *)
-       hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
-   if (found)                  /* shouldn't happen */
-       elog(ERROR, "local buffer hash table corrupted");
-   hresult->id = b;
-
-   /*
-    * it's all ours now.
-    */
-   bufHdr->tag = newTag;
-   buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR);
-   buf_state |= BM_TAG_VALID;
-   buf_state &= ~BUF_USAGECOUNT_MASK;
-   buf_state += BUF_USAGECOUNT_ONE;
-   pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
-
-   *foundPtr = false;
-   return bufHdr;
+   return BufferDescriptorGetBuffer(bufHdr);
 }
 
 /*
                (errcode(ERRCODE_OUT_OF_MEMORY),
                 errmsg("out of memory")));
 
-   nextFreeLocalBuf = 0;
+   nextFreeLocalBufId = 0;
 
    /* initialize fields that need to start off nonzero */
    for (i = 0; i < nbufs; i++)