#include "catalog/catalog.h"
 #include "catalog/storage.h"
 #include "executor/instrument.h"
+#include "lib/binaryheap.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
 #include "pgstat.h"
 /* 64 bytes, about the size of a cache line on common systems */
 #define REFCOUNT_ARRAY_ENTRIES 8
 
+/*
+ * Status of buffers to checkpoint for a particular tablespace, used
+ * internally in BufferSync.
+ */
+typedef struct CkptTsStatus
+{
+       /* oid of the tablespace */
+       Oid                     tsId;
+
+       /*
+        * Checkpoint progress for this tablespace. To make progress comparable
+        * between tablespaces the progress is, for each tablespace, measured as a
+        * number between 0 and the total number of to-be-checkpointed pages. Each
+        * page checkpointed in this tablespace increments this space's progress
+        * by progress_slice.
+        */
+       float8          progress;
+       float8          progress_slice;
+
+       /* number of to-be checkpointed pages in this tablespace */
+       int                     num_to_scan;
+       /* already processed pages in this tablespace */
+       int                     num_scanned;
+
+       /* current offset in CkptBufferIds for this tablespace */
+       int                     index;
+} CkptTsStatus;
+
 /* GUC variables */
 bool           zero_damaged_pages = false;
 int                    bgwriter_lru_maxpages = 100;
 static void CheckForBufferLeaks(void);
 static int     rnode_comparator(const void *p1, const void *p2);
 static int     buffertag_comparator(const void *p1, const void *p2);
+static int     ckpt_buforder_comparator(const void *pa, const void *pb);
+static int     ts_ckpt_progress_comparator(Datum a, Datum b, void *arg);
 
 
 /*
 {
        int                     buf_id;
        int                     num_to_scan;
-       int                     num_to_write;
+       int                     num_spaces;
+       int                     num_processed;
        int                     num_written;
+       CkptTsStatus *per_ts_stat = NULL;
+       Oid                     last_tsid;
+       binaryheap *ts_heap;
+       int                     i;
        int                     mask = BM_DIRTY;
        WritebackContext wb_context;
 
 
        /*
         * Loop over all buffers, and mark the ones that need to be written with
-        * BM_CHECKPOINT_NEEDED.  Count them as we go (num_to_write), so that we
+        * BM_CHECKPOINT_NEEDED.  Count them as we go (num_to_scan), so that we
         * can estimate how much work needs to be done.
         *
         * This allows us to write only those pages that were dirty when the
         * BM_CHECKPOINT_NEEDED still set.  This is OK since any such buffer would
         * certainly need to be written for the next checkpoint attempt, too.
         */
-       num_to_write = 0;
+       num_to_scan = 0;
        for (buf_id = 0; buf_id < NBuffers; buf_id++)
        {
                BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
 
                if ((bufHdr->flags & mask) == mask)
                {
+                       CkptSortItem *item;
+
                        bufHdr->flags |= BM_CHECKPOINT_NEEDED;
-                       num_to_write++;
+
+                       item = &CkptBufferIds[num_to_scan++];
+                       item->buf_id = buf_id;
+                       item->tsId = bufHdr->tag.rnode.spcNode;
+                       item->relNode = bufHdr->tag.rnode.relNode;
+                       item->forkNum = bufHdr->tag.forkNum;
+                       item->blockNum = bufHdr->tag.blockNum;
                }
 
                UnlockBufHdr(bufHdr);
        }
 
-       if (num_to_write == 0)
+       if (num_to_scan == 0)
                return;                                 /* nothing to do */
 
        WritebackContextInit(&wb_context, &checkpoint_flush_after);
 
-       TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_write);
+       TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan);
 
        /*
-        * Loop over all buffers again, and write the ones (still) marked with
-        * BM_CHECKPOINT_NEEDED.  In this loop, we start at the clock sweep point
-        * since we might as well dump soon-to-be-recycled buffers first.
-        *
-        * Note that we don't read the buffer alloc count here --- that should be
-        * left untouched till the next BgBufferSync() call.
+        * Sort buffers that need to be written to reduce the likelihood of random
+        * IO. The sorting is also important for the implementation of balancing
+        * writes between tablespaces. Without balancing writes we'd potentially
+        * end up writing to the tablespaces one-by-one; possibly overloading the
+        * underlying system.
         */
-       buf_id = StrategySyncStart(NULL, NULL);
-       num_to_scan = NBuffers;
+       qsort(CkptBufferIds, num_to_scan, sizeof(CkptSortItem),
+                 ckpt_buforder_comparator);
+
+       num_spaces = 0;
+
+       /*
+        * Allocate progress status for each tablespace with buffers that need to
+        * be flushed. This requires the to-be-flushed array to be sorted.
+        */
+       last_tsid = InvalidOid;
+       for (i = 0; i < num_to_scan; i++)
+       {
+               CkptTsStatus *s;
+               Oid                     cur_tsid;
+
+               cur_tsid = CkptBufferIds[i].tsId;
+
+               /*
+                * Grow array of per-tablespace status structs, every time a new
+                * tablespace is found.
+                */
+               if (last_tsid == InvalidOid || last_tsid != cur_tsid)
+               {
+                       Size            sz;
+
+                       num_spaces++;
+
+                       /*
+                        * Not worth adding grow-by-power-of-2 logic here - even with a
+                        * few hundred tablespaces this should be fine.
+                        */
+                       sz = sizeof(CkptTsStatus) * num_spaces;
+
+                       if (per_ts_stat == NULL)
+                               per_ts_stat = (CkptTsStatus *) palloc(sz);
+                       else
+                               per_ts_stat = (CkptTsStatus *) repalloc(per_ts_stat, sz);
+
+                       s = &per_ts_stat[num_spaces - 1];
+                       memset(s, 0, sizeof(*s));
+                       s->tsId = cur_tsid;
+
+                       /*
+                        * The first buffer in this tablespace. As CkptBufferIds is sorted
+                        * by tablespace all (s->num_to_scan) buffers in this tablespace
+                        * will follow afterwards.
+                        */
+                       s->index = i;
+
+                       /*
+                        * progress_slice will be determined once we know how many buffers
+                        * are in each tablespace, i.e. after this loop.
+                        */
+
+                       last_tsid = cur_tsid;
+               }
+               else
+               {
+                       s = &per_ts_stat[num_spaces - 1];
+               }
+
+               s->num_to_scan++;
+       }
+
+       Assert(num_spaces > 0);
+
+       /*
+        * Build a min-heap over the write-progress in the individual tablespaces,
+        * and compute how large a portion of the total progress a single
+        * processed buffer is.
+        */
+       ts_heap = binaryheap_allocate(num_spaces,
+                                                                 ts_ckpt_progress_comparator,
+                                                                 NULL);
+
+       for (i = 0; i < num_spaces; i++)
+       {
+               CkptTsStatus *ts_stat = &per_ts_stat[i];
+
+               ts_stat->progress_slice = (float8) num_to_scan / ts_stat->num_to_scan;
+
+               binaryheap_add_unordered(ts_heap, PointerGetDatum(ts_stat));
+       }
+
+       binaryheap_build(ts_heap);
+
+       /*
+        * Iterate through to-be-checkpointed buffers and write the ones (still)
+        * marked with BM_CHECKPOINT_NEEDED. The writes are balanced between
+        * tablespaces; otherwise the sorting would lead to only one tablespace
+        * receiving writes at a time, making inefficient use of the hardware.
+        */
+       num_processed = 0;
        num_written = 0;
-       while (num_to_scan-- > 0)
+       while (!binaryheap_empty(ts_heap))
        {
-               BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
+               BufferDesc *bufHdr = NULL;
+               CkptTsStatus *ts_stat = (CkptTsStatus *)
+               DatumGetPointer(binaryheap_first(ts_heap));
+
+               buf_id = CkptBufferIds[ts_stat->index].buf_id;
+               Assert(buf_id != -1);
+
+               bufHdr = GetBufferDescriptor(buf_id);
+
+               num_processed++;
 
                /*
                 * We don't need to acquire the lock here, because we're only looking
                                TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
                                BgWriterStats.m_buf_written_checkpoints++;
                                num_written++;
+                       }
+               }
 
-                               /*
-                                * We know there are at most num_to_write buffers with
-                                * BM_CHECKPOINT_NEEDED set; so we can stop scanning if
-                                * num_written reaches num_to_write.
-                                *
-                                * Note that num_written doesn't include buffers written by
-                                * other backends, or by the bgwriter cleaning scan. That
-                                * means that the estimate of how much progress we've made is
-                                * conservative, and also that this test will often fail to
-                                * trigger.  But it seems worth making anyway.
-                                */
-                               if (num_written >= num_to_write)
-                                       break;
+               /*
+                * Measure progress independent of actualy having to flush the buffer
+                * - otherwise writing become unbalanced.
+                */
+               ts_stat->progress += ts_stat->progress_slice;
+               ts_stat->num_scanned++;
+               ts_stat->index++;
 
-                               /*
-                                * Sleep to throttle our I/O rate.
-                                */
-                               CheckpointWriteDelay(flags, (double) num_written / num_to_write);
-                       }
+               /* Have all the buffers from the tablespace been processed? */
+               if (ts_stat->num_scanned == ts_stat->num_to_scan)
+               {
+                       binaryheap_remove_first(ts_heap);
+               }
+               else
+               {
+                       /* update heap with the new progress */
+                       binaryheap_replace_first(ts_heap, PointerGetDatum(ts_stat));
                }
 
-               if (++buf_id >= NBuffers)
-                       buf_id = 0;
+               /*
+                * Sleep to throttle our I/O rate.
+                */
+               CheckpointWriteDelay(flags, (double) num_processed / num_to_scan);
        }
 
        /* issue all pending flushes */
        IssuePendingWritebacks(&wb_context);
 
+       pfree(per_ts_stat);
+       per_ts_stat = NULL;
+       binaryheap_free(ts_heap);
+
        /*
         * Update checkpoint statistics. As noted above, this doesn't include
         * buffers written by other backends or bgwriter scan.
        return 0;
 }
 
+/*
+ * Comparator determining the writeout order in a checkpoint.
+ *
+ * It is important that tablespaces are compared first, the logic balancing
+ * writes between tablespaces relies on it.
+ */
+static int
+ckpt_buforder_comparator(const void *pa, const void *pb)
+{
+       const CkptSortItem *a = (CkptSortItem *) pa;
+       const CkptSortItem *b = (CkptSortItem *) pb;
+
+       /* compare tablespace */
+       if (a->tsId < b->tsId)
+               return -1;
+       else if (a->tsId > b->tsId)
+               return 1;
+       /* compare relation */
+       if (a->relNode < b->relNode)
+               return -1;
+       else if (a->relNode > b->relNode)
+               return 1;
+       /* compare fork */
+       else if (a->forkNum < b->forkNum)
+               return -1;
+       else if (a->forkNum > b->forkNum)
+               return 1;
+       /* compare block number */
+       else if (a->blockNum < b->blockNum)
+               return -1;
+       else    /* should not be the same block ... */
+               return 1;
+}
+
+/*
+ * Comparator for a Min-Heap over the per-tablespace checkpoint completion
+ * progress.
+ */
+static int
+ts_ckpt_progress_comparator(Datum a, Datum b, void *arg)
+{
+       CkptTsStatus *sa = (CkptTsStatus *) a;
+       CkptTsStatus *sb = (CkptTsStatus *) b;
+
+       /* we want a min-heap, so return 1 for the a < b */
+       if (sa->progress < sb->progress)
+               return 1;
+       else if (sa->progress == sb->progress)
+               return 0;
+       else
+               return -1;
+}
+
 /*
  * Initialize a writeback context, discarding potential previous state.
  *