bdr: moved apply/log update check code to a dedicated function
authorChristian Kruse <cjk@defunct.ch>
Fri, 28 Mar 2014 14:58:02 +0000 (15:58 +0100)
committerAndres Freund <andres@anarazel.de>
Thu, 3 Jul 2014 15:55:22 +0000 (17:55 +0200)
contrib/bdr/bdr_apply.c

index cd4d2a6ce0e071789cd37ac5ab179d886291f5ce..c18f7957eaf7a0098dcb02d7db822e268b2f7b84 100644 (file)
@@ -74,6 +74,10 @@ static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple
 
 static void check_sequencer_wakeup(Relation rel);
 
+static bool check_apply_update(RepNodeId local_node_id, TimestampTz ts, bool *log_update);
+static void do_log_update(RepNodeId local_node_id, bool apply_update, TimestampTz ts, Relation idxrel, HeapTuple old_key);
+static void do_apply_update(Relation rel, ItemPointerData oldtid, HeapTuple old_tuple, BDRTupleData new_tuple);
+
 bool       request_sequencer_wakeup = false;
 bool       started_transaction = false;
 Oid            QueuedDDLCommandsRelid = InvalidOid;
@@ -432,7 +436,7 @@ process_remote_update(StringInfo s)
    {
        old_key = read_tuple(s, rel);
        action = pq_getmsgbyte(s);
-       primary_key_changed = true;;
+       primary_key_changed = true;
    }
 
    /* check for new  tuple */
@@ -490,10 +494,6 @@ process_remote_update(StringInfo s)
        bool        apply_update;
        bool        log_update;
 
-       uint64      local_sysid,
-                   remote_sysid;
-       TimeLineID  local_tli,
-                   remote_tli;
        CommitExtraData local_node_id_raw;
 
        ItemPointerCopy(&oldtid, &oldtuple.t_self);
@@ -517,102 +517,13 @@ process_remote_update(StringInfo s)
        TransactionIdGetCommitTsData(xmin, &ts, &local_node_id_raw);
        local_node_id = local_node_id_raw;
 
-       if (local_node_id == bdr_apply_con->origin_id)
-       {
-           /*
-            * If the row got updated twice within a single node, just apply
-            * the update with no conflict.  Don't warn/log either, regardless
-            * of the timing; that's just too common and valid since normal row
-            * level locking guarantees are met.
-            */
-           apply_update = true;
-           log_update = false;
-       }
-       else
-       {
-           int     cmp;
-
-           /*
-            * Decide what update wins based on transaction timestamp difference.
-            * The later transaction wins.  If the timestamps compare equal,
-            * use sysid + TLI to discern.
-            */
-
-           cmp = timestamptz_cmp_internal(replication_origin_timestamp, ts);
-
-           if (cmp > 0)
-           {
-               apply_update = true;
-               log_update = false;
-           }
-           else if (cmp == 0)
-           {
-               log_update = true;
-
-               fetch_sysid_via_node_id(local_node_id,
-                                       &local_sysid, &local_tli);
-               fetch_sysid_via_node_id(bdr_apply_con->origin_id,
-                                       &remote_sysid, &remote_tli);
-
-               if (local_sysid < remote_sysid)
-                   apply_update = true;
-               else if (local_sysid > remote_sysid)
-                   apply_update = false;
-               else if (local_tli < remote_tli)
-                   apply_update = true;
-               else if (local_tli > remote_tli)
-                   apply_update = false;
-               else
-                   /* shouldn't happen */
-                   elog(ERROR, "unsuccessful node comparison");
-           }
-           else
-           {
-               apply_update = false;
-               log_update = true;
-           }
-       }
+       apply_update = check_apply_update(local_node_id, ts, &log_update);
 
        if (log_update)
-       {
-           char        remote_ts[MAXDATELEN + 1];
-           char        local_ts[MAXDATELEN + 1];
-
-           fetch_sysid_via_node_id(local_node_id,
-                                   &local_sysid, &local_tli);
-           fetch_sysid_via_node_id(bdr_apply_con->origin_id,
-                                   &remote_sysid, &remote_tli);
-           Assert(remote_sysid == bdr_apply_con->sysid);
-           Assert(remote_tli == bdr_apply_con->timeline);
-
-           memcpy(remote_ts, timestamptz_to_str(replication_origin_timestamp),
-                  MAXDATELEN);
-           memcpy(local_ts, timestamptz_to_str(ts),
-                  MAXDATELEN);
-
-           initStringInfo(&s_key);
-           tuple_to_stringinfo(&s_key, RelationGetDescr(idxrel), old_key);
-
-           ereport(LOG,
-                   (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
-                    errmsg("CONFLICT: %s remote update originating at node " UINT64_FORMAT ":%u at ts %s; row was previously updated at %s node " UINT64_FORMAT ":%u at ts %s. PKEY:%s",
-                           apply_update ? "applying" : "skipping",
-                           remote_sysid, remote_tli, remote_ts,
-                     local_node_id == InvalidRepNodeId ? "local" : "remote",
-                           local_sysid, local_tli, local_ts, s_key.data)));
-           resetStringInfo(&s_key);
-       }
+           do_log_update(local_node_id, apply_update, ts, idxrel, old_key);
 
        if (apply_update)
-       {
-           HeapTuple nt;
-           Assert(old_tuple != NULL);
-           nt = heap_modify_tuple(old_tuple, RelationGetDescr(rel),
-                                  new_tuple.values, new_tuple.isnull, new_tuple.changed);
-           simple_heap_update(rel, &oldtid, nt);
-           UserTableUpdateIndexes(rel, nt);
-           bdr_count_update();
-       }
+           do_apply_update(rel, oldtid, old_tuple, new_tuple);
        else
            bdr_count_update_conflict();
    }
@@ -625,12 +536,13 @@ process_remote_update(StringInfo s)
        ereport(ERROR,
                (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
                 errmsg("CONFLICT: could not find existing tuple for pkey %s", s_key.data)));
-       /* XXX dead code */
-       resetStringInfo(&s_key);
-       goto err;
-   }
 
-err:
+       /*
+        * idxrel and rel will be closed by context cleanup, s_key will be
+        * cleaned up as well
+        * TODO but what about check_sequencer_wakeup()?
+        */
+   }
 
    check_sequencer_wakeup(rel);
 
@@ -639,6 +551,126 @@ err:
    heap_close(rel, NoLock);
 }
 
+static bool
+check_apply_update(RepNodeId local_node_id, TimestampTz ts, bool *log_update)
+{
+   uint64      local_sysid,
+               remote_sysid;
+   TimeLineID  local_tli,
+               remote_tli;
+   int         cmp;
+
+   if (local_node_id == bdr_apply_con->origin_id)
+   {
+       /*
+        * If the row got updated twice within a single node, just apply the
+        * update with no conflict.  Don't warn/log either, regardless of the
+        * timing; that's just too common and valid since normal row level
+        * locking guarantees are met.
+        */
+       *log_update = false;
+       return true;
+   }
+   else
+   {
+       /*
+        * Decide what update wins based on transaction timestamp difference.
+        * The later transaction wins.  If the timestamps compare equal, use
+        * sysid + TLI to discern.
+        */
+
+       cmp = timestamptz_cmp_internal(replication_origin_timestamp, ts);
+
+       if (cmp > 0)
+       {
+           *log_update = false;
+           return true;
+       }
+       else if (cmp == 0)
+       {
+           *log_update = true;
+
+           fetch_sysid_via_node_id(local_node_id,
+                                   &local_sysid, &local_tli);
+           fetch_sysid_via_node_id(bdr_apply_con->origin_id,
+                                   &remote_sysid, &remote_tli);
+
+           if (local_sysid < remote_sysid)
+               return true;
+           else if (local_sysid > remote_sysid)
+               return false;
+           else if (local_tli < remote_tli)
+               return true;
+           else if (local_tli > remote_tli)
+               return false;
+           else
+               /* shouldn't happen */
+               elog(ERROR, "unsuccessful node comparison");
+       }
+       else
+       {
+           *log_update = true;
+           return false;
+       }
+   }
+
+   /* XXX dead code */
+   return false;
+}
+
+static void
+do_log_update(RepNodeId local_node_id, bool apply_update, TimestampTz ts,
+             Relation idxrel, HeapTuple old_key)
+{
+   StringInfoData s_key;
+   char        remote_ts[MAXDATELEN + 1];
+   char        local_ts[MAXDATELEN + 1];
+
+   uint64      local_sysid,
+               remote_sysid;
+   TimeLineID  local_tli,
+               remote_tli;
+
+
+   fetch_sysid_via_node_id(local_node_id,
+                           &local_sysid, &local_tli);
+   fetch_sysid_via_node_id(bdr_apply_con->origin_id,
+                           &remote_sysid, &remote_tli);
+
+   Assert(remote_sysid == bdr_apply_con->sysid);
+   Assert(remote_tli == bdr_apply_con->timeline);
+
+   memcpy(remote_ts, timestamptz_to_str(replication_origin_timestamp),
+          MAXDATELEN);
+   memcpy(local_ts, timestamptz_to_str(ts),
+          MAXDATELEN);
+
+   initStringInfo(&s_key);
+   tuple_to_stringinfo(&s_key, RelationGetDescr(idxrel), old_key);
+
+   ereport(LOG,
+           (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
+            errmsg("CONFLICT: %s remote update originating at node " UINT64_FORMAT ":%u at ts %s; row was previously updated at %s node " UINT64_FORMAT ":%u at ts %s. PKEY:%s",
+                   apply_update ? "applying" : "skipping",
+                   remote_sysid, remote_tli, remote_ts,
+                   local_node_id == InvalidRepNodeId ? "local" : "remote",
+                   local_sysid, local_tli, local_ts, s_key.data)));
+   resetStringInfo(&s_key);
+}
+
+static void
+do_apply_update(Relation rel, ItemPointerData oldtid, HeapTuple old_tuple, BDRTupleData new_tuple)
+{
+   HeapTuple   nt;
+
+   Assert(old_tuple != NULL);
+   nt = heap_modify_tuple(old_tuple, RelationGetDescr(rel),
+                     new_tuple.values, new_tuple.isnull, new_tuple.changed);
+   simple_heap_update(rel, &oldtid, nt);
+   UserTableUpdateIndexes(rel, nt);
+   bdr_count_update();
+}
+
 void
 process_remote_delete(StringInfo s)
 {