#include "access/genam.h"
 #include "access/gist_private.h"
+#include "access/transam.h"
 #include "commands/vacuum.h"
+#include "lib/integerset.h"
 #include "miscadmin.h"
 #include "storage/indexfsm.h"
 #include "storage/lmgr.h"
+#include "utils/memutils.h"
 
-/* Working state needed by gistbulkdelete */
+/*
+ * State kept across vacuum stages.
+ */
 typedef struct
 {
+   IndexBulkDeleteResult stats;    /* must be first, so that this struct
+                                    * can be cast to IndexBulkDeleteResult */
+
    IndexVacuumInfo *info;
-   IndexBulkDeleteResult *stats;
+
+   /*
+    * These are used to remember all internal pages, and all empty leaf
+    * pages, encountered during the 1st vacuum stage.  They are used in the
+    * 2nd stage, to delete all the empty pages.
+    */
+   IntegerSet *internal_page_set;
+   IntegerSet *empty_leaf_set;
+   MemoryContext page_set_context;    /* context holding the two sets above */
+} GistBulkDeleteResult;
+
+/* Working state needed by gistbulkdelete */
+typedef struct
+{
+   GistBulkDeleteResult *stats;
    IndexBulkDeleteCallback callback;
    void       *callback_state;
    GistNSN     startNSN;
-   BlockNumber totFreePages;   /* true total # of free pages */
 } GistVacState;
 
-static void gistvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
+static void gistvacuumscan(IndexVacuumInfo *info, GistBulkDeleteResult *stats,
               IndexBulkDeleteCallback callback, void *callback_state);
 static void gistvacuumpage(GistVacState *vstate, BlockNumber blkno,
               BlockNumber orig_blkno);
+static void gistvacuum_delete_empty_pages(GistBulkDeleteResult *stats);
+static bool gistdeletepage(GistBulkDeleteResult *stats,
+              Buffer buffer, OffsetNumber downlink,
+              Buffer leafBuffer);
+
+/* allocate the 'stats' struct that's kept over vacuum stages */
+static GistBulkDeleteResult *
+create_GistBulkDeleteResult(void)
+{
+   GistBulkDeleteResult *gist_stats;
+
+   gist_stats = (GistBulkDeleteResult *) palloc0(sizeof(GistBulkDeleteResult));
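+
+   /*
+    * A generation memory context suits the page sets well: the IntegerSets
+    * allocate incrementally, never free individual chunks, and the whole
+    * context is deleted in one step at the end of the cleanup stage.
+    */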
+   gist_stats->page_set_context =
+       GenerationContextCreate(CurrentMemoryContext,
+                               "GiST VACUUM page set context",
+                               16 * 1024);
+
+   return gist_stats;
+}
 
 /*
  * VACUUM bulkdelete stage: remove index entries.
 gistbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
               IndexBulkDeleteCallback callback, void *callback_state)
 {
+   GistBulkDeleteResult *gist_stats = (GistBulkDeleteResult *) stats;
+
    /* allocate stats if first time through, else re-use existing struct */
-   if (stats == NULL)
-       stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
+   if (gist_stats == NULL)
+       gist_stats = create_GistBulkDeleteResult();
 
-   gistvacuumscan(info, stats, callback, callback_state);
+   gistvacuumscan(info, gist_stats, callback, callback_state);
 
-   return stats;
+   return (IndexBulkDeleteResult *) gist_stats;
 }
 
 /*
- * VACUUM cleanup stage: update index statistics.
+ * VACUUM cleanup stage: delete empty pages, and update index statistics.
  */
 IndexBulkDeleteResult *
 gistvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 {
+   GistBulkDeleteResult *gist_stats = (GistBulkDeleteResult *) stats;
+
    /* No-op in ANALYZE ONLY mode */
    if (info->analyze_only)
        return stats;
     * stats from the latest gistbulkdelete call.  If it wasn't called, we
     * still need to do a pass over the index, to obtain index statistics.
     */
-   if (stats == NULL)
+   if (gist_stats == NULL)
    {
-       stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
-       gistvacuumscan(info, stats, NULL, NULL);
+       gist_stats = create_GistBulkDeleteResult();
+       gistvacuumscan(info, gist_stats, NULL, NULL);
    }
 
+   /*
+    * If we saw any empty pages, try to unlink them from the tree so that
+    * they can be reused.
+    */
+   gistvacuum_delete_empty_pages(gist_stats);
+
+   /* we don't need the internal and empty page sets anymore */
+   MemoryContextDelete(gist_stats->page_set_context);
+   gist_stats->page_set_context = NULL;
+   gist_stats->internal_page_set = NULL;
+   gist_stats->empty_leaf_set = NULL;
+
    /*
     * It's quite possible for us to be fooled by concurrent page splits into
     * double-counting some index tuples, so disbelieve any total that exceeds
     */
    if (!info->estimated_count)
    {
-       if (stats->num_index_tuples > info->num_heap_tuples)
-           stats->num_index_tuples = info->num_heap_tuples;
+       if (gist_stats->stats.num_index_tuples > info->num_heap_tuples)
+           gist_stats->stats.num_index_tuples = info->num_heap_tuples;
    }
 
-   return stats;
+   return (IndexBulkDeleteResult *) gist_stats;
 }
 
 /*
  * gistvacuumcleanup invoke this (the latter only if no gistbulkdelete call
  * occurred).
  *
- * This also adds unused/delete pages to the free space map, although that
- * is currently not very useful.  There is currently no support for deleting
- * empty pages, so recycleable pages can only be found if an error occurs
- * while the index is being expanded, leaving an all-zeros page behind.
+ * This also makes note of any empty leaf pages, as well as all internal
+ * pages.  The second stage, gistvacuum_delete_empty_pages(), needs that
+ * information.  Any deleted pages are added directly to the free space map.
+ * (They should've been added there already when they were originally deleted,
+ * but it's possible that the FSM was lost in a crash, for example.)
  *
  * The caller is responsible for initially allocating/zeroing a stats struct.
  */
 static void
-gistvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
+gistvacuumscan(IndexVacuumInfo *info, GistBulkDeleteResult *stats,
               IndexBulkDeleteCallback callback, void *callback_state)
 {
    Relation    rel = info->index;
     * Reset counts that will be incremented during the scan; needed in case
     * of multiple scans during a single VACUUM command.
     */
-   stats->estimated_count = false;
-   stats->num_index_tuples = 0;
-   stats->pages_deleted = 0;
+   stats->stats.estimated_count = false;
+   stats->stats.num_index_tuples = 0;
+   stats->stats.pages_deleted = 0;
+   stats->stats.pages_free = 0;
+   MemoryContextReset(stats->page_set_context);
+
+   /*
+    * Create the page sets in the long-lived page set context, so that they
+    * survive until the cleanup stage, where MemoryContextDelete() frees
+    * them all at once.
+    */
+   {
+       MemoryContext oldctx = MemoryContextSwitchTo(stats->page_set_context);
+
+       stats->internal_page_set = intset_create();
+       stats->empty_leaf_set = intset_create();
+       MemoryContextSwitchTo(oldctx);
+   }
 
    /* Set up info to pass down to gistvacuumpage */
-   vstate.info = info;
+   stats->info = info;
    vstate.stats = stats;
    vstate.callback = callback;
    vstate.callback_state = callback_state;
        vstate.startNSN = GetInsertRecPtr();
    else
        vstate.startNSN = gistGetFakeLSN(rel);
-   vstate.totFreePages = 0;
 
    /*
     * The outer loop iterates over all index pages, in physical order (we
     * Note that if no recyclable pages exist, we don't bother vacuuming the
     * FSM at all.
     */
-   if (vstate.totFreePages > 0)
+   if (stats->stats.pages_free > 0)
        IndexFreeSpaceMapVacuum(rel);
 
    /* update statistics */
-   stats->num_pages = num_pages;
-   stats->pages_free = vstate.totFreePages;
+   stats->stats.num_pages = num_pages;
 }
 
 /*
 static void
 gistvacuumpage(GistVacState *vstate, BlockNumber blkno, BlockNumber orig_blkno)
 {
-   IndexVacuumInfo *info = vstate->info;
-   IndexBulkDeleteResult *stats = vstate->stats;
+   GistBulkDeleteResult *stats = vstate->stats;
+   IndexVacuumInfo *info = stats->info;
    IndexBulkDeleteCallback callback = vstate->callback;
    void       *callback_state = vstate->callback_state;
    Relation    rel = info->index;
    LockBuffer(buffer, GIST_EXCLUSIVE);
    page = (Page) BufferGetPage(buffer);
 
-   if (PageIsNew(page) || GistPageIsDeleted(page))
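+   /*
+    * A page is recyclable if it is new, or if it was deleted long enough
+    * ago that all scans that might have seen its downlink have ended.
+    */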
+   if (gistPageRecyclable(page))
    {
        /* Okay to recycle this page */
        RecordFreeIndexPage(rel, blkno);
-       vstate->totFreePages++;
-       stats->pages_deleted++;
+       stats->stats.pages_free++;
+       stats->stats.pages_deleted++;
+   }
+   else if (GistPageIsDeleted(page))
+   {
+       /* Already deleted, but can't recycle yet */
+       stats->stats.pages_deleted++;
    }
    else if (GistPageIsLeaf(page))
    {
        OffsetNumber todelete[MaxOffsetNumber];
        int         ntodelete = 0;
+       int         nremain;
        GISTPageOpaque opaque = GistPageGetOpaque(page);
        OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
 
 
            END_CRIT_SECTION();
 
-           stats->tuples_removed += ntodelete;
+           stats->stats.tuples_removed += ntodelete;
            /* must recompute maxoff */
            maxoff = PageGetMaxOffsetNumber(page);
        }
 
-       stats->num_index_tuples += maxoff - FirstOffsetNumber + 1;
+       nremain = maxoff - FirstOffsetNumber + 1;
+       if (nremain == 0)
+       {
+           /*
+            * The page is now completely empty.  Remember its block number,
+            * so that we will try to delete the page in the second stage.
+            *
+            * Skip this when recursing, because IntegerSet requires that the
+            * values are added in ascending order.  The next VACUUM will pick
+            * it up.
+            */
+           if (blkno == orig_blkno)
+               intset_add_member(stats->empty_leaf_set, blkno);
+       }
+       else
+           stats->stats.num_index_tuples += nremain;
    }
    else
    {
                         errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
                         errhint("Please REINDEX it.")));
        }
+
+       /*
+        * Remember the block number of this page, so that we can revisit it
+        * later in gistvacuum_delete_empty_pages(), when we search for
+        * parents of empty leaf pages.
+        */
+       if (blkno == orig_blkno)
+           intset_add_member(stats->internal_page_set, blkno);
    }
 
    UnlockReleaseBuffer(buffer);
        goto restart;
    }
 }
+
+/*
+ * Scan all internal pages, and try to delete their empty child pages.
+ */
+static void
+gistvacuum_delete_empty_pages(GistBulkDeleteResult *stats)
+{
+   IndexVacuumInfo *info = stats->info;
+   Relation    rel = info->index;
+   BlockNumber empty_pages_remaining;
+   uint64      blkno;
+
+   /*
+    * Rescan all inner pages to find those that have empty child pages.
+    */
+   empty_pages_remaining = intset_num_entries(stats->empty_leaf_set);
+   intset_begin_iterate(stats->internal_page_set);
+   while (empty_pages_remaining > 0 &&
+          intset_iterate_next(stats->internal_page_set, &blkno))
+   {
+       Buffer      buffer;
+       Page        page;
+       OffsetNumber off,
+                   maxoff;
+       OffsetNumber todelete[MaxOffsetNumber];
+       BlockNumber leafs_to_delete[MaxOffsetNumber];
+       int         ntodelete;
+       int         deleted;
+
+       buffer = ReadBufferExtended(rel, MAIN_FORKNUM, (BlockNumber) blkno,
+                                   RBM_NORMAL, info->strategy);
+
+       LockBuffer(buffer, GIST_SHARE);
+       page = (Page) BufferGetPage(buffer);
+
+       if (PageIsNew(page) || GistPageIsDeleted(page) || GistPageIsLeaf(page))
+       {
+           /*
+            * This page was an internal page earlier, but now it's something
+            * else. Shouldn't happen...
+            */
+           Assert(false);
+           UnlockReleaseBuffer(buffer);
+           continue;
+       }
+
+       /*
+        * Scan all the downlinks, and see if any of them point to empty leaf
+        * pages.
+        */
+       maxoff = PageGetMaxOffsetNumber(page);
+       ntodelete = 0;
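+
+       /*
+        * The loop condition stops us at maxoff - 1 collected downlinks, so
+        * at least one downlink is always left on the page.
+        */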
+       for (off = FirstOffsetNumber;
+            off <= maxoff && ntodelete < maxoff - 1;
+            off = OffsetNumberNext(off))
+       {
+           ItemId      iid = PageGetItemId(page, off);
+           IndexTuple  idxtuple = (IndexTuple) PageGetItem(page, iid);
+           BlockNumber leafblk;
+
+           leafblk = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
+           if (intset_is_member(stats->empty_leaf_set, leafblk))
+           {
+               leafs_to_delete[ntodelete] = leafblk;
+               todelete[ntodelete++] = off;
+           }
+       }
+
+       /*
+        * In order to avoid deadlock, the child page must be locked before
+        * the parent, so we must release the lock on the parent, lock the
+        * child, and then re-acquire the lock on the parent.  (And we
+        * wouldn't want to do I/O while holding a lock, anyway.)
+        *
+        * While we are not holding a lock on the parent, the downlink might
+        * get moved by a concurrent insert, so we must re-check that it still
+        * points to the same child page after we have acquired both locks.
+        * Also, another backend might have inserted a tuple into the page,
+        * so that it is no longer empty.  gistdeletepage() re-checks all
+        * these conditions.
+        */
+       LockBuffer(buffer, GIST_UNLOCK);
+
+       deleted = 0;
+       for (int i = 0; i < ntodelete; i++)
+       {
+           Buffer      leafbuf;
+
+           /*
+            * Don't remove the last downlink from the parent.  That would
+            * confuse the insertion code.
+            */
+           if (PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
+               break;
+
+           leafbuf = ReadBufferExtended(rel, MAIN_FORKNUM, leafs_to_delete[i],
+                                        RBM_NORMAL, info->strategy);
+           LockBuffer(leafbuf, GIST_EXCLUSIVE);
+           gistcheckpage(rel, leafbuf);
+
+           LockBuffer(buffer, GIST_EXCLUSIVE);
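+
+           /*
+            * Compensate the remembered offset for downlinks already deleted
+            * from this page: each PageIndexTupleDelete() shifts the
+            * following line pointers down by one.
+            */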
+           if (gistdeletepage(stats, buffer, todelete[i] - deleted, leafbuf))
+               deleted++;
+           LockBuffer(buffer, GIST_UNLOCK);
+
+           UnlockReleaseBuffer(leafbuf);
+       }
+
+       ReleaseBuffer(buffer);
+
+       /* update stats */
+       stats->stats.pages_removed += deleted;
+
+       /*
+        * We can stop the scan as soon as we have seen the downlinks, even if
+        * we were not able to remove them all.
+        */
+       empty_pages_remaining -= ntodelete;
+   }
+}
+
+/*
+ * gistdeletepage takes a leaf page, and its parent, and tries to delete the
+ * leaf.  Both pages must be locked.
+ *
+ * Even if the page was empty when we first saw it, a concurrent inserter might
+ * have added a tuple to it since.  Similarly, the downlink might have moved.
+ * We re-check all the conditions, to make sure the page is still deletable,
+ * before modifying anything.
+ *
+ * Returns true if the page was deleted, and false if a concurrent update
+ * prevented it.
+ */
+static bool
+gistdeletepage(GistBulkDeleteResult *stats,
+              Buffer parentBuffer, OffsetNumber downlink,
+              Buffer leafBuffer)
+{
+   Page        parentPage = BufferGetPage(parentBuffer);
+   Page        leafPage = BufferGetPage(leafBuffer);
+   ItemId      iid;
+   IndexTuple  idxtuple;
+   XLogRecPtr  recptr;
+   TransactionId txid;
+
+   /*
+    * Check that the leaf is still empty and deletable.
+    */
+   if (!GistPageIsLeaf(leafPage))
+   {
+       /* a leaf page should never become a non-leaf page */
+       Assert(false);
+       return false;
+   }
+
+   if (GistFollowRight(leafPage))
+       return false;           /* don't mess with a concurrent page split */
+
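+   /*
+    * PageGetMaxOffsetNumber() returns 0 (InvalidOffsetNumber) when the page
+    * has no line pointers at all, i.e. when it is completely empty.
+    */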
+   if (PageGetMaxOffsetNumber(leafPage) != InvalidOffsetNumber)
+       return false;           /* not empty anymore */
+
+   /*
+    * Ok, the leaf is deletable.  Is the downlink in the parent page still
+    * valid?  It might have been moved by a concurrent insert.  We could try
+    * to re-find it by scanning the page again, possibly moving right if the
+    * page was split.  But for now, let's keep it simple and just give up.  The
+    * next VACUUM will pick it up.
+    */
+   if (PageIsNew(parentPage) || GistPageIsDeleted(parentPage) ||
+       GistPageIsLeaf(parentPage))
+   {
+       /* shouldn't happen, internal pages are never deleted */
+       Assert(false);
+       return false;
+   }
+
+   if (PageGetMaxOffsetNumber(parentPage) < downlink
+       || PageGetMaxOffsetNumber(parentPage) <= FirstOffsetNumber)
+       return false;
+
+   iid = PageGetItemId(parentPage, downlink);
+   idxtuple = (IndexTuple) PageGetItem(parentPage, iid);
+   if (BufferGetBlockNumber(leafBuffer) !=
+       ItemPointerGetBlockNumber(&(idxtuple->t_tid)))
+       return false;
+
+   /*
+    * All good, proceed with the deletion.
+    *
+    * The page cannot be immediately recycled, because in-progress scans that
+    * saw the downlink might still visit it.  Mark the page with the current
+    * next-XID counter, so that we know when it can be recycled.  Once that
+    * XID becomes older than GlobalXmin, we know that all scans that are
+    * currently in progress must have ended.  (That's much more conservative
+    * than needed, but let's keep it safe and simple.)
+    */
+   txid = ReadNewTransactionId();
+
+   START_CRIT_SECTION();
+
+   /* mark the page as deleted */
+   MarkBufferDirty(leafBuffer);
+   GistPageSetDeleteXid(leafPage, txid);
+   GistPageSetDeleted(leafPage);
+   stats->stats.pages_deleted++;
+
+   /* remove the downlink from the parent */
+   MarkBufferDirty(parentBuffer);
+   PageIndexTupleDelete(parentPage, downlink);
+
+   if (RelationNeedsWAL(stats->info->index))
+       recptr = gistXLogPageDelete(leafBuffer, txid, parentBuffer, downlink);
+   else
+       recptr = gistGetFakeLSN(stats->info->index);
+   PageSetLSN(parentPage, recptr);
+   PageSetLSN(leafPage, recptr);
+
+   END_CRIT_SECTION();
+
+   return true;
+}
 
 #include "miscadmin.h"
 #include "storage/procarray.h"
 #include "utils/memutils.h"
+#include "utils/rel.h"
 
 static MemoryContext opCtx;        /* working memory for operations */
 
    UnlockReleaseBuffer(buffer);
 }
 
+/* redo page deletion */
+static void
+gistRedoPageDelete(XLogReaderState *record)
+{
+   XLogRecPtr  lsn = record->EndRecPtr;
+   gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record);
+   Buffer      parentBuffer;
+   Buffer      leafBuffer;
+
+   if (XLogReadBufferForRedo(record, 0, &leafBuffer) == BLK_NEEDS_REDO)
+   {
+       Page        page = (Page) BufferGetPage(leafBuffer);
+
+       GistPageSetDeleteXid(page, xldata->deleteXid);
+       GistPageSetDeleted(page);
+
+       PageSetLSN(page, lsn);
+       MarkBufferDirty(leafBuffer);
+   }
+
+   if (XLogReadBufferForRedo(record, 1, &parentBuffer) == BLK_NEEDS_REDO)
+   {
+       Page        page = (Page) BufferGetPage(parentBuffer);
+
+       PageIndexTupleDelete(page, xldata->downlinkOffset);
+
+       PageSetLSN(page, lsn);
+       MarkBufferDirty(parentBuffer);
+   }
+
+   if (BufferIsValid(parentBuffer))
+       UnlockReleaseBuffer(parentBuffer);
+   if (BufferIsValid(leafBuffer))
+       UnlockReleaseBuffer(leafBuffer);
+}
+
+static void
+gistRedoPageReuse(XLogReaderState *record)
+{
+   gistxlogPageReuse *xlrec = (gistxlogPageReuse *) XLogRecGetData(record);
+
+   /*
+    * PAGE_REUSE records exist to provide a conflict point when we reuse
+    * pages in the index via the FSM.  That's all they do though.
+    *
+    * latestRemovedXid was the page's deleteXid.  The deleteXid <
+    * RecentGlobalXmin test in gistPageRecyclable() conceptually mirrors the
+    * pgxact->xmin > limitXmin test in GetConflictingVirtualXIDs().
+    * Consequently, one XID value achieves the same exclusion effect on
+    * master and standby.
+    */
+   if (InHotStandby)
+   {
+       ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+                                           xlrec->node);
+   }
+}
+
 void
 gist_redo(XLogReaderState *record)
 {
        case XLOG_GIST_DELETE:
            gistRedoDeleteRecord(record);
            break;
+       case XLOG_GIST_PAGE_REUSE:
+           gistRedoPageReuse(record);
+           break;
        case XLOG_GIST_PAGE_SPLIT:
            gistRedoPageSplitRecord(record);
            break;
        case XLOG_GIST_CREATE_INDEX:
            gistRedoCreateIndex(record);
            break;
+       case XLOG_GIST_PAGE_DELETE:
+           gistRedoPageDelete(record);
+           break;
        default:
            elog(PANIC, "gist_redo: unknown op code %u", info);
    }
    return recptr;
 }
 
+/*
+ * Write XLOG record describing a page deletion.  This also includes removal
+ * of the downlink from the parent page.
+ */
+XLogRecPtr
+gistXLogPageDelete(Buffer buffer, TransactionId xid,
+                  Buffer parentBuffer, OffsetNumber downlinkOffset)
+{
+   gistxlogPageDelete xlrec;
+   XLogRecPtr  recptr;
+
+   xlrec.deleteXid = xid;
+   xlrec.downlinkOffset = downlinkOffset;
+
+   XLogBeginInsert();
+   XLogRegisterData((char *) &xlrec, SizeOfGistxlogPageDelete);
+
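+   /* block 0 is the leaf page being deleted, block 1 is its parent */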
+   XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+   XLogRegisterBuffer(1, parentBuffer, REGBUF_STANDARD);
+
+   recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_DELETE);
+
+   return recptr;
+}
+
+/*
+ * Write XLOG record about reuse of a deleted page.
+ */
+void
+gistXLogPageReuse(Relation rel, BlockNumber blkno, TransactionId latestRemovedXid)
+{
+   gistxlogPageReuse xlrec_reuse;
+
+   /*
+    * Note that we don't register the buffer with the record, because this
+    * operation doesn't modify the page. This record only exists to provide a
+    * conflict point for Hot Standby.
+    */
+
+   /* XLOG stuff */
+   xlrec_reuse.node = rel->rd_node;
+   xlrec_reuse.block = blkno;
+   xlrec_reuse.latestRemovedXid = latestRemovedXid;
+
+   XLogBeginInsert();
+   XLogRegisterData((char *) &xlrec_reuse, SizeOfGistxlogPageReuse);
+
+   XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_REUSE);
+}
+
 /*
  * Write XLOG record describing a page update. The update can include any
  * number of deletions and/or insertions of tuples on a single index page.