More hacking.
authorRobert Haas <rhaas@postgresql.org>
Thu, 20 Feb 2014 03:00:30 +0000 (22:00 -0500)
committerRobert Haas <rhaas@postgresql.org>
Thu, 20 Feb 2014 03:00:30 +0000 (22:00 -0500)
src/backend/utils/mmgr/freepage.c

index eeb2ab23a8632fc922987ef1630e9780a7e01fe8..5f1b73b1ae6dc905e6b62e161a1822b86c77061b 100644 (file)
@@ -85,21 +85,26 @@ typedef struct FreePageBtreeSearchResult
 /* Helper functions */
 static void FreePageBtreeAdjustAncestorKeys(FreePageManager *fpm,
                                        FreePageBtree *btp);
-static bool FreePageManagerGetInternal(FreePageManager *fpm, Size npages,
-                                                  Size *first_page);
+static Size FreePageBtreeFirstKey(FreePageBtree *btp);
+static void FreePageBtreeInsertInternal(char *base, FreePageBtree *btp,
+                                                       Size index, Size first_page, FreePageBtree *child);
+static void FreePageBtreeInsertLeaf(FreePageBtree *btp, Size index,
+                                               Size first_page, Size npages);
 static void FreePageBtreeRecycle(FreePageManager *fpm, Size pageno);
-static void FreePageBtreeRemove(FreePageManager *fpm, FreePageBtree *btp,
-                                       Size index);
-static void FreePagePushSpanLeader(FreePageManager *fpm, Size first_page,
-                                          Size npages);
-static bool FreePageManagerPutInternal(FreePageManager *fpm, Size first_page,
-                                                  Size npages, bool soft);
 static void FreePageBtreeRemove(FreePageManager *fpm, FreePageBtree *btp,
                                        Size index);
 static void FreePageBtreeSearch(FreePageManager *fpm, Size first_page,
                                        FreePageBtreeSearchResult *result);
 static Size FreePageBtreeSearchInternal(FreePageBtree *btp, Size first_page);
 static Size FreePageBtreeSearchLeaf(FreePageBtree *btp, Size first_page);
+static FreePageBtree *FreePageBtreeSplitPage(FreePageManager *fpm,
+                                          FreePageBtree *btp);
+static bool FreePageManagerGetInternal(FreePageManager *fpm, Size npages,
+                                                  Size *first_page);
+static bool FreePageManagerPutInternal(FreePageManager *fpm, Size first_page,
+                                                  Size npages, bool soft);
+static void FreePagePushSpanLeader(FreePageManager *fpm, Size first_page,
+                                          Size npages);
 
 /*
  * Initialize a new, empty free page manager.
@@ -404,6 +409,21 @@ FreePageManagerGetInternal(FreePageManager *fpm, Size npages, Size *first_page)
        return true;
 }
 
+/*
+ * Get the first key on a btree page.
+ */
+static Size
+FreePageBtreeFirstKey(FreePageBtree *btp)
+{
+       if (btp->hdr.magic == FREE_PAGE_LEAF_MAGIC)
+               return btp->u.leaf_key[0].first_page;
+       else
+       {
+               Assert(btp->hdr.magic == FREE_PAGE_INTERNAL_MAGIC);
+               return btp->u.internal_key[0].first_page;
+       }
+}
+
 /*
  * Get a page from the btree recycle list for use as a btree page.
  */
@@ -442,6 +462,34 @@ FreePageBtreeRecycle(FreePageManager *fpm, Size pageno)
        relptr_store(base, fpm->btree_recycle, span);
 }
 
+/*
+ * Insert an item into an internal page.
+ */
+static void
+FreePageBtreeInsertInternal(char *base, FreePageBtree *btp, Size index,
+                                                       Size first_page, FreePageBtree *child)
+{
+       memmove(&btp->u.internal_key[index + 1], &btp->u.internal_key[index],
+                       sizeof(FreePageBtreeInternalKey) * (btp->hdr.nused - index));
+       btp->u.internal_key[index].first_page = first_page;
+       relptr_store(base, btp->u.internal_key[index].child, child);
+       ++btp->hdr.nused;
+}
+
+/*
+ * Insert an item into a leaf page.
+ */
+static void
+FreePageBtreeInsertLeaf(FreePageBtree *btp, Size index, Size first_page,
+                                               Size npages)
+{
+       memmove(&btp->u.leaf_key[index + 1], &btp->u.leaf_key[index],
+                       sizeof(FreePageBtreeLeafKey) * (btp->hdr.nused - index));
+       btp->u.leaf_key[index].first_page = first_page;
+       btp->u.leaf_key[index].npages = npages;
+       ++btp->hdr.nused;
+}
+
 /*
  * Remove an item from the btree at the given position on the given page.
  */
@@ -722,6 +770,38 @@ FreePageBtreeSearchLeaf(FreePageBtree *btp, Size first_page)
        return low;
 }
 
+/*
+ * Allocate a new btree page and move half the keys from the provided page
+ * to the new page.  Caller is responsible for making sure that there's a
+ * page available from fpm->btree_recycle.  Returns a pointer to the new page,
+ * to which caller must add a downlink.
+ */
+static FreePageBtree *
+FreePageBtreeSplitPage(FreePageManager *fpm, FreePageBtree *btp)
+{
+       FreePageBtree *newsibling;
+
+       newsibling = FreePageBtreeGetRecycled(fpm);
+       newsibling->hdr.magic = btp->hdr.magic;
+       newsibling->hdr.nused = btp->hdr.nused / 2;
+       relptr_copy(newsibling->hdr.parent, btp->hdr.parent);
+       btp->hdr.nused -= newsibling->hdr.nused;
+
+       if (btp->hdr.magic == FREE_PAGE_LEAF_MAGIC)
+               memcpy(&newsibling->u.leaf_key,
+                          &btp->u.leaf_key[btp->hdr.nused],
+                          sizeof(FreePageBtreeLeafKey) * newsibling->hdr.nused);
+       else
+       {
+               Assert(btp->hdr.magic == FREE_PAGE_INTERNAL_MAGIC);
+               memcpy(&newsibling->u.internal_key,
+                          &btp->u.internal_key[btp->hdr.nused],
+                          sizeof(FreePageBtreeInternalKey) * newsibling->hdr.nused);
+       }
+
+       return newsibling;
+}
+
 /*
  * Remove a FreePageSpanLeader from the linked-list that contains it, either
  * because we're changing the size of the span, or because we're allocating it.
@@ -782,10 +862,10 @@ static bool
 FreePageManagerPutInternal(FreePageManager *fpm, Size first_page, Size npages,
                                                   bool soft)
 {
+       char *base = fpm_segment_base(fpm);
        FreePageBtreeSearchResult result;
        FreePageBtreeLeafKey *prevkey = NULL;
        FreePageBtreeLeafKey *nextkey = NULL;
-       FreePageBtree *btp;
        Size    index;
 
        /*
@@ -806,7 +886,6 @@ FreePageManagerPutInternal(FreePageManager *fpm, Size first_page, Size npages,
         */
        if (fpm->btree_depth == 0)
        {
-               char *base = fpm_segment_base(fpm);
                Size    root_page;
                FreePageBtree *root;
 
@@ -963,23 +1042,106 @@ FreePageManagerPutInternal(FreePageManager *fpm, Size first_page, Size npages,
                /* If we still need to perform a split, do it. */
                if (result.split_pages > 0)
                {
-                       /*
-                        * XXX. Split the page and as many of its ancestors as needed.
-                        * Make sure the search object remains up to date, or re-do the
-                        * search after splitting.
-                        */
+                       FreePageBtree   *target = result.page_next;
+                       FreePageBtree   *child = NULL;
+                       Size    key = first_page;
+
+                       for (;;)
+                       {
+                               FreePageBtree *newsibling;
+                               FreePageBtree *parent;
+
+                               /* Identify parent page, which must receive downlink. */
+                               parent = relptr_access(base, target->hdr.parent);
+
+                               /* Split the page - downlink not added yet. */
+                               newsibling = FreePageBtreeSplitPage(fpm, target);
+
+                               /*
+                                * At this point in the loop, we're always carrying a pending
+                                * insertion.  On the first pass, it's the actual key we're
+                                * trying to insert; on subsequent passes, it's the downlink
+                                * that needs to be added as a result of the split performed
+                                * during the previous loop iteration.  Since we've just split
+                                * the page, there's definitely room on one of the two
+                                * resulting pages.
+                                */
+                               if (child == NULL)
+                               {
+                                       Size    index;
+                                       FreePageBtree *insert_into;
+
+                                       insert_into = key < newsibling->u.leaf_key[0].first_page ?
+                                               target : newsibling;
+                                       index = FreePageBtreeSearchLeaf(insert_into, key);
+                                       FreePageBtreeInsertLeaf(insert_into, index, key, npages);
+                                       if (index == 0 && insert_into == target)
+                                               FreePageBtreeAdjustAncestorKeys(fpm, target);
+                               }
+                               else
+                               {
+                                       Size    index;
+                                       FreePageBtree *insert_into;
+
+                                       insert_into =
+                                               key < newsibling->u.internal_key[0].first_page ?
+                                               target : newsibling;
+                                       index = FreePageBtreeSearchInternal(insert_into, key);
+                                       FreePageBtreeInsertInternal(base, insert_into, index,
+                                                                                               key, child);
+                                       if (index == 0 && insert_into == target)
+                                               FreePageBtreeAdjustAncestorKeys(fpm, target);
+                               }
+
+                               /* If the page we just split has no parent, split the root. */
+                               if (parent == NULL)
+                               {
+                                       FreePageBtree *newroot;
+
+                                       newroot = FreePageBtreeGetRecycled(fpm);
+                                       newroot->hdr.magic = FREE_PAGE_INTERNAL_MAGIC;
+                                       newroot->hdr.nused = 2;
+                                       relptr_store(base, newroot->hdr.parent,
+                                                                (FreePageBtree *) NULL);
+                                       newroot->u.internal_key[0].first_page =
+                                               FreePageBtreeFirstKey(target);
+                                       relptr_store(base, newroot->u.internal_key[0].child,
+                                               target);
+                                       newroot->u.internal_key[1].first_page =
+                                               FreePageBtreeFirstKey(newsibling);
+                                       relptr_store(base, newroot->u.internal_key[1].child,
+                                               target);
+
+                                       break;
+                               }
+
+                               /* If the parent page isn't full, insert the downlink. */
+                               key = newsibling->u.internal_key[0].first_page;
+                               if (parent->hdr.nused < FPM_ITEMS_PER_INTERNAL_PAGE)
+                               {
+                                       Size    index;
+
+                                       index = FreePageBtreeSearchInternal(parent, key);
+                                       FreePageBtreeInsertInternal(base, parent, index,
+                                                                                               key, newsibling);
+                                       if (index == 0)
+                                               FreePageBtreeAdjustAncestorKeys(fpm, parent);
+                                       break;
+                               }
+
+                               /* The parent also needs to be split, so loop around. */
+                               child = newsibling;
+                               target = parent;
+                       }
+
+                       /* The loop above did the insert, so we're done! */                     
+                       return true;
                }
        }
 
        /* Physically add the key to the page. */
        Assert(result.page_next->hdr.nused < FPM_ITEMS_PER_LEAF_PAGE);
-       btp = result.page_next;
-       index = result.index_next;
-       memmove(&btp->u.leaf_key[index + 1], &btp->u.leaf_key[index],
-                       sizeof(FreePageBtreeLeafKey) * (btp->hdr.nused - index));
-       btp->u.leaf_key[index].first_page = first_page;
-       btp->u.leaf_key[index].npages = npages;
-       ++btp->hdr.nused;
+       FreePageBtreeInsertLeaf(result.page_next, index, first_page, npages);
 
        /* If new first key on page, ancestors might need adjustment. */
        if (index == 0)