* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/backend/access/transam/xlogutils.c,v 1.43 2006/03/31 23:32:06 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/xlogutils.c,v 1.44 2006/04/14 20:27:24 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "utils/hsearch.h"
 
 
+/*
+ * During XLOG replay, we may see XLOG records for incremental updates of
+ * pages that no longer exist, because their relation was later dropped or
+ * truncated.  (Note: this is only possible when full_page_writes = OFF,
+ * since when it's ON, the first reference we see to a page should always
+ * be a full-page rewrite not an incremental update.)  Rather than simply
+ * ignoring such records, we make a note of the referenced page, and then
+ * complain if we don't actually see a drop or truncate covering the page
+ * later in replay.
+ */
+typedef struct xl_invalid_page_key
+{
+   RelFileNode node;           /* the relation */
+   BlockNumber blkno;          /* the page */
+} xl_invalid_page_key;
+
+typedef struct xl_invalid_page
+{
+   xl_invalid_page_key key;    /* hash key ... must be first */
+   bool        present;        /* page existed but contained zeroes */
+} xl_invalid_page;
+
+static HTAB *invalid_page_tab = NULL;
+
+
+/* Log a reference to an invalid page */
+static void
+log_invalid_page(RelFileNode node, BlockNumber blkno, bool present)
+{
+   xl_invalid_page_key key;
+   xl_invalid_page *hentry;
+   bool        found;
+
+   /*
+    * Log references to invalid pages at DEBUG1 level.  This allows some
+    * tracing of the cause (note the elog context mechanism will tell us
+    * something about the XLOG record that generated the reference).
+    */
+   if (present)
+       elog(DEBUG1, "page %u of relation %u/%u/%u is uninitialized",
+            blkno, node.spcNode, node.dbNode, node.relNode);
+   else
+       elog(DEBUG1, "page %u of relation %u/%u/%u does not exist",
+            blkno, node.spcNode, node.dbNode, node.relNode);
+
+   if (invalid_page_tab == NULL)
+   {
+       /* create hash table when first needed */
+       HASHCTL     ctl;
+
+       memset(&ctl, 0, sizeof(ctl));
+       ctl.keysize = sizeof(xl_invalid_page_key);
+       ctl.entrysize = sizeof(xl_invalid_page);
+       ctl.hash = tag_hash;
+
+       invalid_page_tab = hash_create("XLOG invalid-page table",
+                                      100,
+                                      &ctl,
+                                      HASH_ELEM | HASH_FUNCTION);
+   }
+
+   /* we currently assume xl_invalid_page_key contains no padding */
+   key.node = node;
+   key.blkno = blkno;
+   hentry = (xl_invalid_page *)
+       hash_search(invalid_page_tab, (void *) &key, HASH_ENTER, &found);
+
+   if (!found)
+   {
+       /* hash_search already filled in the key */
+       hentry->present = present;
+   }
+   else
+   {
+       /* repeat reference ... leave "present" as it was */
+   }
+}
+
+/* Forget any invalid pages >= minblkno, because they've been dropped */
+static void
+forget_invalid_pages(RelFileNode node, BlockNumber minblkno)
+{
+   HASH_SEQ_STATUS status;
+   xl_invalid_page *hentry;
+
+   if (invalid_page_tab == NULL)
+       return;                 /* nothing to do */
+
+   hash_seq_init(&status, invalid_page_tab);
+
+   while ((hentry = (xl_invalid_page *) hash_seq_search(&status)) != NULL)
+   {
+       if (RelFileNodeEquals(hentry->key.node, node) &&
+           hentry->key.blkno >= minblkno)
+       {
+           elog(DEBUG2, "page %u of relation %u/%u/%u has been dropped",
+                hentry->key.blkno, hentry->key.node.spcNode,
+                hentry->key.node.dbNode, hentry->key.node.relNode);
+
+           if (hash_search(invalid_page_tab,
+                           (void *) &hentry->key,
+                           HASH_REMOVE, NULL) == NULL)
+               elog(ERROR, "hash table corrupted");
+       }
+   }
+}
+
+/* Forget any invalid pages in a whole database */
+static void
+forget_invalid_pages_db(Oid dbid)
+{
+   HASH_SEQ_STATUS status;
+   xl_invalid_page *hentry;
+
+   if (invalid_page_tab == NULL)
+       return;                 /* nothing to do */
+
+   hash_seq_init(&status, invalid_page_tab);
+
+   while ((hentry = (xl_invalid_page *) hash_seq_search(&status)) != NULL)
+   {
+       if (hentry->key.node.dbNode == dbid)
+       {
+           elog(DEBUG2, "page %u of relation %u/%u/%u has been dropped",
+                hentry->key.blkno, hentry->key.node.spcNode,
+                hentry->key.node.dbNode, hentry->key.node.relNode);
+
+           if (hash_search(invalid_page_tab,
+                           (void *) &hentry->key,
+                           HASH_REMOVE, NULL) == NULL)
+               elog(ERROR, "hash table corrupted");
+       }
+   }
+}
+
+/* Complain about any remaining invalid-page entries */
+void
+XLogCheckInvalidPages(void)
+{
+   HASH_SEQ_STATUS status;
+   xl_invalid_page *hentry;
+   bool        foundone = false;
+
+   if (invalid_page_tab == NULL)
+       return;                 /* nothing to do */
+
+   hash_seq_init(&status, invalid_page_tab);
+
+   /*
+    * Our strategy is to emit WARNING messages for all remaining entries
+    * and only PANIC after we've dumped all the available info.
+    */
+   while ((hentry = (xl_invalid_page *) hash_seq_search(&status)) != NULL)
+   {
+       if (hentry->present)
+           elog(WARNING, "page %u of relation %u/%u/%u was uninitialized",
+                hentry->key.blkno, hentry->key.node.spcNode,
+                hentry->key.node.dbNode, hentry->key.node.relNode);
+       else
+           elog(WARNING, "page %u of relation %u/%u/%u did not exist",
+                hentry->key.blkno, hentry->key.node.spcNode,
+                hentry->key.node.dbNode, hentry->key.node.relNode);
+       foundone = true;
+   }
+
+   if (foundone)
+       elog(PANIC, "WAL contains references to invalid pages");
+}
+
+
 /*
  * XLogReadBuffer
  *     Read a page during XLOG replay
  * the page being "new" (all zeroes).
  *
  * If "init" is false then the caller needs the page to be valid already.
- * If the page doesn't exist or contains zeroes, we report failure.
- *
- * If the return value is InvalidBuffer (only possible when init = false),
- * the caller should silently skip the update on this page.  This currently
- * never happens, but we retain it as part of the API spec for possible future
- * use.
+ * If the page doesn't exist or contains zeroes, we return InvalidBuffer.
+ * In this case the caller should silently skip the update on this page.
+ * (In this situation, we expect that the page was later dropped or truncated.
+ * If we don't see evidence of that later in the WAL sequence, we'll complain
+ * at the end of WAL replay.)
  */
 Buffer
 XLogReadBuffer(Relation reln, BlockNumber blkno, bool init)
    {
        /* hm, page doesn't exist in file */
        if (!init)
-           elog(PANIC, "block %u of relation %u/%u/%u does not exist",
-                blkno, reln->rd_node.spcNode,
-                reln->rd_node.dbNode, reln->rd_node.relNode);
+       {
+           log_invalid_page(reln->rd_node, blkno, false);
+           return InvalidBuffer;
+       }
        /* OK to extend the file */
        /* we do this in recovery only - no rel-extension lock needed */
        Assert(InRecovery);
        Page    page = (Page) BufferGetPage(buffer);
 
        if (PageIsNew((PageHeader) page))
-           elog(PANIC, "block %u of relation %u/%u/%u is uninitialized",
-                blkno, reln->rd_node.spcNode,
-                reln->rd_node.dbNode, reln->rd_node.relNode);
+       {
+           UnlockReleaseBuffer(buffer);
+           log_invalid_page(reln->rd_node, blkno, true);
+           return InvalidBuffer;
+       }
    }
 
    return buffer;
 XLogInitRelationCache(void)
 {
    _xl_init_rel_cache();
+   invalid_page_tab = NULL;
 }
 
 void
  *
  * Currently, we don't bother to physically remove the relation from the
  * cache, we just let it age out normally.
+ *
+ * This also takes care of removing any open "invalid-page" records for
+ * the relation.
  */
 void
 XLogDropRelation(RelFileNode rnode)
 {
-   XLogRelDesc *rdesc;
    XLogRelCacheEntry *hentry;
 
    hentry = (XLogRelCacheEntry *)
        hash_search(_xlrelcache, (void *) &rnode, HASH_FIND, NULL);
 
-   if (!hentry)
-       return;                 /* not in cache so no work */
+   if (hentry)
+   {
+       XLogRelDesc *rdesc = hentry->rdesc;
 
-   rdesc = hentry->rdesc;
+       RelationCloseSmgr(&(rdesc->reldata));
+   }
 
-   RelationCloseSmgr(&(rdesc->reldata));
+   forget_invalid_pages(rnode, 0);
 }
 
 /*
        if (hentry->rnode.dbNode == dbid)
            RelationCloseSmgr(&(rdesc->reldata));
    }
+
+   forget_invalid_pages_db(dbid);
+}
+
+/*
+ * Truncate a relation during XLOG replay
+ *
+ * We don't need to do anything to the fake relcache, but we do need to
+ * clean up any open "invalid-page" records for the dropped pages.
+ */
+void
+XLogTruncateRelation(RelFileNode rnode, BlockNumber nblocks)
+{
+   forget_invalid_pages(rnode, nblocks);
 }