bdr: Use the old local tuple to build the new tuple version for updates.
authorAndres Freund <andres@anarazel.de>
Fri, 14 Mar 2014 22:31:38 +0000 (23:31 +0100)
committerAndres Freund <andres@anarazel.de>
Thu, 3 Jul 2014 15:55:20 +0000 (17:55 +0200)
This allows us to handle unchanged toast columns.

contrib/bdr/bdr_apply.c

index b53bc624113ced0b6c530e4e60646bd1a0666716..3f866de5052b1963d246abf6e4403d021632a4f3 100644 (file)
@@ -60,7 +60,7 @@ typedef struct BDRTupleData
 } BDRTupleData;
 
 static void build_scan_key(ScanKey skey, Relation rel, Relation idx_rel, HeapTuple key);
-static bool find_pkey_tuple(ScanKey skey, Relation rel, Relation idx_rel, ItemPointer tid, bool lock);
+static HeapTuple find_pkey_tuple(ScanKey skey, Relation rel, Relation idx_rel, ItemPointer tid, bool lock);
 static void UserTableUpdateIndexes(Relation rel, HeapTuple tuple);
 static Relation read_rel(StringInfo s, LOCKMODE mode);
 extern void read_tuple_parts(StringInfo s, Relation rel, BDRTupleData *tup);
@@ -327,13 +327,13 @@ process_remote_update(StringInfo s)
    StringInfoData s_key;
    char        action;
    HeapTuple   old_key;
-   HeapTuple   new_tuple;
+   HeapTuple   old_tuple;
+   BDRTupleData new_tuple;
    Oid         idxoid;
    HeapTuple   generated_key = NULL;
    ItemPointerData oldtid;
    Relation    rel;
    Relation    idxrel;
-   bool        found_old;
    ScanKeyData skey[INDEX_MAX_KEYS];
    bool        primary_key_changed = false;
 
@@ -358,20 +358,22 @@ process_remote_update(StringInfo s)
        elog(ERROR, "expected action 'N', got %c",
             action);
 
-   /* read new tuple */
-   new_tuple = read_tuple(s, rel);
-
    if (rel->rd_rel->relkind != RELKIND_RELATION)
        elog(ERROR, "unexpected relkind '%c' rel \"%s\"",
             rel->rd_rel->relkind, RelationGetRelationName(rel));
 
+   /* read new tuple */
+   read_tuple_parts(s, rel, &new_tuple);
+
    /*
     * Check which tuple we want to use for the pkey lookup.
     */
    if (!primary_key_changed)
    {
        /* key hasn't changed, just use columns from the new tuple */
-       old_key = new_tuple;
+       old_key = heap_form_tuple(RelationGetDescr(rel),
+                                 new_tuple.values, new_tuple.isnull);
+
    }
 
    /* lookup index to build scankey */
@@ -393,9 +395,9 @@ process_remote_update(StringInfo s)
    build_scan_key(skey, rel, idxrel, old_key);
 
    /* look for tuple identified by the (old) primary key */
-   found_old = find_pkey_tuple(skey, rel, idxrel, &oldtid, true);
+   old_tuple = find_pkey_tuple(skey, rel, idxrel, &oldtid, true);
 
-   if (found_old)
+   if (old_key != NULL)
    {
        HeapTupleData oldtuple;
        Buffer      buf;
@@ -521,9 +523,13 @@ process_remote_update(StringInfo s)
 
        if (apply_update)
        {
-           simple_heap_update(rel, &oldtid, new_tuple);
+           HeapTuple nt;
+           Assert(old_tuple != NULL);
+           nt = heap_modify_tuple(old_tuple, RelationGetDescr(rel),
+                                  new_tuple.values, new_tuple.isnull, new_tuple.changed);
+           simple_heap_update(rel, &oldtid, nt);
            /* FIXME: HOT support */
-           UserTableUpdateIndexes(rel, new_tuple);
+           UserTableUpdateIndexes(rel, nt);
            bdr_count_update();
        }
        else
@@ -605,7 +611,7 @@ process_remote_delete(StringInfo s)
    build_scan_key(skey, rel, idxrel, old_key);
 
    /* try to find tuple via a (candidate|primary) key */
-   found_old = find_pkey_tuple(skey, rel, idxrel, &oldtid, true);
+   found_old = find_pkey_tuple(skey, rel, idxrel, &oldtid, true) != NULL;
 
    if (found_old)
    {
@@ -988,12 +994,12 @@ build_scan_key(ScanKey skey, Relation rel, Relation idxrel, HeapTuple key)
  * If a matching tuple is found setup 'tid' to point to it and return true,
  * false is returned otherwise.
  */
-static bool
+static HeapTuple
 find_pkey_tuple(ScanKey skey, Relation rel, Relation idxrel,
                ItemPointer tid, bool lock)
 {
-   HeapTuple   tuple;
-   bool        found = false;
+   HeapTuple   scantuple;
+   HeapTuple   tuple = NULL;
    IndexScanDesc scan;
    Snapshot snap = GetActiveSnapshot();
 
@@ -1009,17 +1015,17 @@ find_pkey_tuple(ScanKey skey, Relation rel, Relation idxrel,
                           0);
    index_rescan(scan, skey, RelationGetNumberOfAttributes(idxrel), NULL, 0);
 
-   while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
+   while ((scantuple = index_getnext(scan, ForwardScanDirection)) != NULL)
    {
-       if (found)
+       if (tuple != NULL)
            elog(ERROR, "WTF, more than one tuple found via pk???");
-       found = true;
-       ItemPointerCopy(&tuple->t_self, tid);
+       tuple = heap_copytuple(scantuple);
+       ItemPointerCopy(&scantuple->t_self, tid);
    }
 
    index_endscan(scan);
 
-   if (lock && found)
+   if (lock && tuple != NULL)
    {
        Buffer buf;
        HeapUpdateFailureData hufd;
@@ -1046,7 +1052,8 @@ find_pkey_tuple(ScanKey skey, Relation rel, Relation idxrel,
        }
        ReleaseBuffer(buf);
    }
-   return found;
+
+   return tuple;
 }
 
 void