*/
 #include "postgres.h"
 
+#include "port/atomics.h"
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
+#include "storage/proc.h"
+
+#define INT_ACCESS_ONCE(var)   ((int)(*((volatile int *)&(var))))
 
 
 /*
    /* Spinlock: protects the values below */
    slock_t     buffer_strategy_lock;
 
-   /* Clock sweep hand: index of next buffer to consider grabbing */
-   int         nextVictimBuffer;
+   /*
+    * Clock sweep hand: index of next buffer to consider grabbing. Note that
+    * this isn't a concrete buffer - we only ever increase the value. So, to
+    * get an actual buffer, it needs to be used modulo NBuffers.
+    */
+   pg_atomic_uint32 nextVictimBuffer;
 
    int         firstFreeBuffer;    /* Head of list of unused buffers */
    int         lastFreeBuffer; /* Tail of list of unused buffers */
     * Statistics.  These counters should be wide enough that they can't
     * overflow during a single bgwriter cycle.
     */
-   uint32      completePasses; /* Complete cycles of the clock sweep */
-   uint32      numBufferAllocs;    /* Buffers allocated since last reset */
+   uint32       completePasses; /* Complete cycles of the clock sweep */
+   pg_atomic_uint32 numBufferAllocs;   /* Buffers allocated since last reset */
 
    /*
-    * Notification latch, or NULL if none.  See StrategyNotifyBgWriter.
+    * Bgwriter process to be notified upon activity or -1 if none. See
+    * StrategyNotifyBgWriter.
     */
-   Latch      *bgwriterLatch;
+   int         bgwprocno;
 } BufferStrategyControl;
 
 /* Pointers to shared state */
 static void AddBufferToRing(BufferAccessStrategy strategy,
                volatile BufferDesc *buf);
 
+/*
+ * ClockSweepTick - Helper routine for StrategyGetBuffer()
+ *
+ * Atomically move the shared clock hand (StrategyControl->nextVictimBuffer)
+ * one buffer ahead and return the id of the buffer this caller should
+ * examine.  The shared counter is allowed to run past NBuffers, so the value
+ * obtained from it is reduced modulo NBuffers before being returned; the
+ * result is therefore always usable directly as an index into
+ * BufferDescriptors.
+ *
+ * The caller whose fetched value is an exact nonzero multiple of NBuffers is
+ * additionally responsible for folding the counter back into range and
+ * bumping completePasses, which is done under buffer_strategy_lock.
+ */
+static inline uint32
+ClockSweepTick(void)
+{
+   uint32 victim;
+
+   /*
+    * Atomically move hand ahead one buffer - if there are several processes
+    * doing this, this can lead to buffers being returned slightly out of
+    * apparent order.
+    */
+   victim =
+       pg_atomic_fetch_add_u32(&StrategyControl->nextVictimBuffer, 1);
+
+   if (victim >= NBuffers)
+   {
+       uint32 originalVictim = victim;
+
+       /* always wrap what we look up in BufferDescriptors */
+       victim = victim % NBuffers;
+
+       /*
+        * If we're the one that just caused a wraparound (i.e. the value we
+        * fetched was an exact multiple of NBuffers), force completePasses
+        * to be incremented while holding the spinlock. We need the spinlock
+        * so StrategySyncStart() can return a consistent value consisting of
+        * nextVictimBuffer and completePasses.
+        */
+       if (victim == 0)
+       {
+           uint32 expected;
+           uint32 wrapped;
+           bool success = false;
+
+           /* counter value right after our own increment above */
+           expected = originalVictim + 1;
+
+           while (!success)
+           {
+               /*
+                * Acquire the spinlock while increasing completePasses. That
+                * allows other readers to read nextVictimBuffer and
+                * completePasses in a consistent manner which is required for
+                * StrategySyncStart().  In theory delaying the increment
+                * could lead to an overflow of nextVictimBuffer, but that's
+                * highly unlikely and wouldn't be particularly harmful.
+                *
+                * Other backends may have advanced the hand in the meantime,
+                * so the swap can fail; wrapped is recomputed from expected
+                * on every iteration.  NOTE(review): this relies on
+                * pg_atomic_compare_exchange_u32() storing the current
+                * counter value into expected on failure - confirm against
+                * port/atomics.h.
+                */
+               SpinLockAcquire(&StrategyControl->buffer_strategy_lock);
+
+               wrapped = expected % NBuffers;
+
+               success = pg_atomic_compare_exchange_u32(&StrategyControl->nextVictimBuffer,
+                                                        &expected, wrapped);
+               if (success)
+                   StrategyControl->completePasses++;
+               SpinLockRelease(&StrategyControl->buffer_strategy_lock);
+           }
+       }
+   }
+   return victim;
+}
 
 /*
  * StrategyGetBuffer
 StrategyGetBuffer(BufferAccessStrategy strategy)
 {
    volatile BufferDesc *buf;
-   Latch      *bgwriterLatch;
+   int         bgwprocno;
    int         trycounter;
 
    /*
            return buf;
    }
 
-   /* Nope, so lock the freelist */
-   SpinLockAcquire(&StrategyControl->buffer_strategy_lock);
+   /*
+    * If asked, we need to waken the bgwriter. Since we don't want to rely on
+    * a spinlock for this we force a read from shared memory once, and then
+    * set the latch based on that value. We need to go to this length
+    * because otherwise bgwprocno might be reset while/after we check, as
+    * the compiler might just reread it from memory.
+    *
+    * This can possibly set the latch of the wrong process if the bgwriter
+    * dies at the wrong moment. But since PGPROC->procLatch is never
+    * deallocated the worst consequence of that is that we set the latch of
+    * some arbitrary process.
+    */
+   bgwprocno = INT_ACCESS_ONCE(StrategyControl->bgwprocno);
+   if (bgwprocno != -1)
+   {
+       /* reset bgwprocno first, before setting the latch */
+       StrategyControl->bgwprocno = -1;
+       pg_write_barrier();
+
+       /*
+        * Not acquiring ProcArrayLock here which is slightly icky. It's
+        * actually fine because procLatch isn't ever freed, so we just can
+        * potentially set the wrong process' (or no process') latch.
+        */
+       SetLatch(&ProcGlobal->allProcs[bgwprocno].procLatch);
+   }
 
    /*
     * We count buffer allocation requests so that the bgwriter can estimate
     * the rate of buffer consumption.  Note that buffers recycled by a
     * strategy object are intentionally not counted here.
     */
-   StrategyControl->numBufferAllocs++;
+   pg_atomic_fetch_add_u32(&StrategyControl->numBufferAllocs, 1);
 
    /*
-    * If bgwriterLatch is set, we need to waken the bgwriter, but we should
-    * not do so while holding buffer_strategy_lock; so release and re-grab.
-    * This is annoyingly tedious, but it happens at most once per bgwriter
-    * cycle, so the performance hit is minimal.
+    * First check, without acquiring the lock, whether there are buffers in
+    * the freelist. Since we otherwise don't require the spinlock in every
+    * StrategyGetBuffer() invocation, it'd be sad to acquire it here -
+    * uselessly in most cases. That obviously leaves a race where a buffer is
+    * put on the freelist but we don't see the store yet - but that's pretty
+    * harmless, it'll just get used during the next buffer acquisition.
+    *
+    * If there are buffers on the freelist, acquire the spinlock to pop one
+    * buffer off the freelist. Then check whether that buffer is usable and
+    * repeat if not.
+    *
+    * Note that the freeNext fields are considered to be protected by the
+    * buffer_strategy_lock, not the individual buffer header spinlocks, so
+    * it's OK to manipulate them without holding a buffer's header spinlock.
     */
-   bgwriterLatch = StrategyControl->bgwriterLatch;
-   if (bgwriterLatch)
+   if (StrategyControl->firstFreeBuffer >= 0)
    {
-       StrategyControl->bgwriterLatch = NULL;
-       SpinLockRelease(&StrategyControl->buffer_strategy_lock);
-       SetLatch(bgwriterLatch);
-       SpinLockAcquire(&StrategyControl->buffer_strategy_lock);
-   }
+       while (true)
+       {
+           /* Acquire the spinlock to remove element from the freelist */
+           SpinLockAcquire(&StrategyControl->buffer_strategy_lock);
 
-   /*
-    * Try to get a buffer from the freelist.  Note that the freeNext fields
-    * are considered to be protected by the buffer_strategy_lock not the
-    * individual buffer spinlocks, so it's OK to manipulate them without
-    * holding the spinlock.
-    */
-   while (StrategyControl->firstFreeBuffer >= 0)
-   {
-       buf = &BufferDescriptors[StrategyControl->firstFreeBuffer];
-       Assert(buf->freeNext != FREENEXT_NOT_IN_LIST);
+           if (StrategyControl->firstFreeBuffer < 0)
+           {
+               SpinLockRelease(&StrategyControl->buffer_strategy_lock);
+               break;
+           }
 
-       /* Unconditionally remove buffer from freelist */
-       StrategyControl->firstFreeBuffer = buf->freeNext;
-       buf->freeNext = FREENEXT_NOT_IN_LIST;
+           buf = &BufferDescriptors[StrategyControl->firstFreeBuffer];
+           Assert(buf->freeNext != FREENEXT_NOT_IN_LIST);
 
-       /*
-        * Release the lock so someone else can access the freelist (or run
-        * the clocksweep) while we check out this buffer.
-        */
-       SpinLockRelease(&StrategyControl->buffer_strategy_lock);
+           /* Unconditionally remove buffer from freelist */
+           StrategyControl->firstFreeBuffer = buf->freeNext;
+           buf->freeNext = FREENEXT_NOT_IN_LIST;
 
-       /*
-        * If the buffer is pinned or has a nonzero usage_count, we cannot use
-        * it; discard it and retry.  (This can only happen if VACUUM put a
-        * valid buffer in the freelist and then someone else used it before
-        * we got to it.  It's probably impossible altogether as of 8.3, but
-        * we'd better check anyway.)
-        */
-       LockBufHdr(buf);
-       if (buf->refcount == 0 && buf->usage_count == 0)
-       {
-           if (strategy != NULL)
-               AddBufferToRing(strategy, buf);
-           return buf;
-       }
-       UnlockBufHdr(buf);
+           /*
+            * Release the lock so someone else can access the freelist while
+            * we check out this buffer.
+            */
+           SpinLockRelease(&StrategyControl->buffer_strategy_lock);
 
-       /* Reacquire the lock and go around for another pass. */
-       SpinLockAcquire(&StrategyControl->buffer_strategy_lock);
+           /*
+            * If the buffer is pinned or has a nonzero usage_count, we cannot
+            * use it; discard it and retry.  (This can only happen if VACUUM
+            * put a valid buffer in the freelist and then someone else used
+            * it before we got to it.  It's probably impossible altogether as
+            * of 8.3, but we'd better check anyway.)
+            */
+           LockBufHdr(buf);
+           if (buf->refcount == 0 && buf->usage_count == 0)
+           {
+               if (strategy != NULL)
+                   AddBufferToRing(strategy, buf);
+               return buf;
+           }
+           UnlockBufHdr(buf);
+
+       }
    }
 
    /* Nothing on the freelist, so run the "clock sweep" algorithm */
    trycounter = NBuffers;
    for (;;)
    {
-       buf = &BufferDescriptors[StrategyControl->nextVictimBuffer];
-
-       if (++StrategyControl->nextVictimBuffer >= NBuffers)
-       {
-           StrategyControl->nextVictimBuffer = 0;
-           StrategyControl->completePasses++;
-       }
 
-       /* Release the lock before manipulating the candidate buffer. */
-       SpinLockRelease(&StrategyControl->buffer_strategy_lock);
+       buf = &BufferDescriptors[ClockSweepTick()];
 
        /*
         * If the buffer is pinned or has a nonzero usage_count, we cannot use
            elog(ERROR, "no unpinned buffers available");
        }
        UnlockBufHdr(buf);
-
-       /* Reacquire the lock and get a new candidate buffer. */
-       SpinLockAcquire(&StrategyControl->buffer_strategy_lock);
    }
 }
 
 int
 StrategySyncStart(uint32 *complete_passes, uint32 *num_buf_alloc)
 {
+   uint32      nextVictimBuffer;
    int         result;
 
    SpinLockAcquire(&StrategyControl->buffer_strategy_lock);
-   result = StrategyControl->nextVictimBuffer;
+   nextVictimBuffer = pg_atomic_read_u32(&StrategyControl->nextVictimBuffer);
+   result = nextVictimBuffer % NBuffers;
+
    if (complete_passes)
+   {
        *complete_passes = StrategyControl->completePasses;
+       /*
+        * Additionally add the number of wraparounds that happened before
+        * completePasses could be incremented. C.f. ClockSweepTick().
+        */
+       *complete_passes += nextVictimBuffer / NBuffers;
+   }
+
    if (num_buf_alloc)
    {
-       *num_buf_alloc = StrategyControl->numBufferAllocs;
-       StrategyControl->numBufferAllocs = 0;
+       *num_buf_alloc = pg_atomic_exchange_u32(&StrategyControl->numBufferAllocs, 0);
    }
    SpinLockRelease(&StrategyControl->buffer_strategy_lock);
    return result;
  * from hibernation, and is not meant for anybody else to use.
  */
 void
-StrategyNotifyBgWriter(Latch *bgwriterLatch)
+StrategyNotifyBgWriter(int bgwprocno)
 {
    /*
     * We acquire buffer_strategy_lock just to ensure that the store appears
     * infrequently, so there's no performance penalty from being safe.
     */
    SpinLockAcquire(&StrategyControl->buffer_strategy_lock);
-   StrategyControl->bgwriterLatch = bgwriterLatch;
+   StrategyControl->bgwprocno = bgwprocno;
    SpinLockRelease(&StrategyControl->buffer_strategy_lock);
 }
 
        StrategyControl->lastFreeBuffer = NBuffers - 1;
 
        /* Initialize the clock sweep pointer */
-       StrategyControl->nextVictimBuffer = 0;
+       pg_atomic_init_u32(&StrategyControl->nextVictimBuffer, 0);
 
        /* Clear statistics */
        StrategyControl->completePasses = 0;
-       StrategyControl->numBufferAllocs = 0;
+       pg_atomic_init_u32(&StrategyControl->numBufferAllocs, 0);
 
        /* No pending notification */
-       StrategyControl->bgwriterLatch = NULL;
+       StrategyControl->bgwprocno = -1;
    }
    else
        Assert(!init);