- Add check of already changed page while replay WAL. This touches only
authorTeodor Sigaev <teodor@sigaev.ru>
Mon, 29 Oct 2007 19:27:21 +0000 (19:27 +0000)
committerTeodor Sigaev <teodor@sigaev.ru>
Mon, 29 Oct 2007 19:27:21 +0000 (19:27 +0000)
ginRedoInsert(), because other ginRedo* functions rewrite whole page or
make changes which could be applied several times without consistent's loss

- Remove check of identifying of corresponding split record:
it's possible that replaying of WAL starts after actual page split, but before
removing of that split from incomplete splits list. In this case, that check
cause FATAL error.

Per stress test which reproduces bug reported by Craig McElroy
<craig.mcelroy@contegix.com>

src/backend/access/gin/ginxlog.c

index 03e34fd9840e4eb8117b34e4d5fb5d699e823c43..23ee18bcd78985de2e2380dddcae84c18389d2ac 100644 (file)
@@ -53,7 +53,6 @@ static void
 forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updateBlkno)
 {
        ListCell   *l;
-       bool            found = false;
 
        foreach(l, incomplete_splits)
        {
@@ -62,16 +61,9 @@ forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updat
                if (RelFileNodeEquals(node, split->node) && leftBlkno == split->leftBlkno && updateBlkno == split->rightBlkno)
                {
                        incomplete_splits = list_delete_ptr(incomplete_splits, split);
-                       found = true;
                        break;
                }
        }
-
-       if (!found)
-       {
-               elog(ERROR, "failed to identify corresponding split record for %u/%u/%u",
-                                       node.relNode, leftBlkno, updateBlkno);
-       }
 }
 
 static void
@@ -129,7 +121,7 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
        Buffer          buffer;
        Page            page;
 
-       /* nothing else to do if page was backed up (and no info to do it with) */
+       /* nothing else to do if page was backed up */
        if (record->xl_info & XLR_BKP_BLOCK_1)
                return;
 
@@ -143,37 +135,44 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
                Assert(data->isDelete == FALSE);
                Assert(GinPageIsData(page));
 
-               if (data->isLeaf)
+               if ( ! XLByteLE(lsn, PageGetLSN(page)) )
                {
-                       OffsetNumber i;
-                       ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+                       if (data->isLeaf)
+                       {
+                               OffsetNumber i;
+                               ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
 
-                       Assert(GinPageIsLeaf(page));
-                       Assert(data->updateBlkno == InvalidBlockNumber);
+                               Assert(GinPageIsLeaf(page));
+                               Assert(data->updateBlkno == InvalidBlockNumber);
 
-                       for (i = 0; i < data->nitem; i++)
-                               GinDataPageAddItem(page, items + i, data->offset + i);
-               }
-               else
-               {
-                       PostingItem *pitem;
+                               for (i = 0; i < data->nitem; i++)
+                                       GinDataPageAddItem(page, items + i, data->offset + i);
+                       }
+                       else
+                       {
+                               PostingItem *pitem;
 
-                       Assert(!GinPageIsLeaf(page));
+                               Assert(!GinPageIsLeaf(page));
 
-                       if (data->updateBlkno != InvalidBlockNumber)
-                       {
-                               /* update link to right page after split */
-                               pitem = (PostingItem *) GinDataPageGetItem(page, data->offset);
-                               PostingItemSetBlockNumber(pitem, data->updateBlkno);
-                       }
+                               if (data->updateBlkno != InvalidBlockNumber)
+                               {
+                                       /* update link to right page after split */
+                                       pitem = (PostingItem *) GinDataPageGetItem(page, data->offset);
+                                       PostingItemSetBlockNumber(pitem, data->updateBlkno);
+                               }
 
-                       pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+                               pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
 
-                       GinDataPageAddItem(page, pitem, data->offset);
+                               GinDataPageAddItem(page, pitem, data->offset);
+                       }
+               }
 
-                       if (data->updateBlkno != InvalidBlockNumber)
-                               forgetIncompleteSplit(data->node, PostingItemGetBlockNumber(pitem), data->updateBlkno);
+               if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+               {
+                       PostingItem *pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+                       forgetIncompleteSplit(data->node, PostingItemGetBlockNumber(pitem), data->updateBlkno);
                }
+
        }
        else
        {
@@ -181,36 +180,45 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
 
                Assert(!GinPageIsData(page));
 
-               if (data->updateBlkno != InvalidBlockNumber)
+               if ( ! XLByteLE(lsn, PageGetLSN(page)) )
                {
-                       /* update link to right page after split */
-                       Assert(!GinPageIsLeaf(page));
-                       Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
-                       itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, data->offset));
-                       ItemPointerSet(&itup->t_tid, data->updateBlkno, InvalidOffsetNumber);
-               }
+                       if (data->updateBlkno != InvalidBlockNumber)
+                       {
+                               /* update link to right page after split */
+                               Assert(!GinPageIsLeaf(page));
+                               Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
+                               itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, data->offset));
+                               ItemPointerSet(&itup->t_tid, data->updateBlkno, InvalidOffsetNumber);
+                       }
 
-               if (data->isDelete)
-               {
-                       Assert(GinPageIsLeaf(page));
-                       Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
-                       PageIndexTupleDelete(page, data->offset);
-               }
+                       if (data->isDelete)
+                       {
+                               Assert(GinPageIsLeaf(page));
+                               Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
+                               PageIndexTupleDelete(page, data->offset);
+                       }
 
-               itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+                       itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
 
-               if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), data->offset, false, false) == InvalidOffsetNumber)
-                       elog(ERROR, "failed to add item to index page in %u/%u/%u",
-                                data->node.spcNode, data->node.dbNode, data->node.relNode);
+                       if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), data->offset, false, false) == InvalidOffsetNumber)
+                               elog(ERROR, "failed to add item to index page in %u/%u/%u",
+                                        data->node.spcNode, data->node.dbNode, data->node.relNode);
+               }
 
                if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+               {
+                       itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
                        forgetIncompleteSplit(data->node, GinItemPointerGetBlockNumber(&itup->t_tid), data->updateBlkno);
+               }
        }
 
-       PageSetLSN(page, lsn);
-       PageSetTLI(page, ThisTimeLineID);
+       if ( ! XLByteLE(lsn, PageGetLSN(page)) ) 
+       {
+               PageSetLSN(page, lsn);
+               PageSetTLI(page, ThisTimeLineID);
 
-       MarkBufferDirty(buffer);
+               MarkBufferDirty(buffer);
+       }
        UnlockReleaseBuffer(buffer);
 }