static void check_sequencer_wakeup(Relation rel);
+static bool check_apply_update(RepNodeId local_node_id, TimestampTz ts, bool *log_update);
+static void do_log_update(RepNodeId local_node_id, bool apply_update, TimestampTz ts, Relation idxrel, HeapTuple old_key);
+static void do_apply_update(Relation rel, ItemPointerData oldtid, HeapTuple old_tuple, BDRTupleData new_tuple);
+
bool request_sequencer_wakeup = false;
bool started_transaction = false;
Oid QueuedDDLCommandsRelid = InvalidOid;
{
old_key = read_tuple(s, rel);
action = pq_getmsgbyte(s);
- primary_key_changed = true;;
+ primary_key_changed = true;
}
/* check for new tuple */
bool apply_update;
bool log_update;
- uint64 local_sysid,
- remote_sysid;
- TimeLineID local_tli,
- remote_tli;
CommitExtraData local_node_id_raw;
ItemPointerCopy(&oldtid, &oldtuple.t_self);
TransactionIdGetCommitTsData(xmin, &ts, &local_node_id_raw);
local_node_id = local_node_id_raw;
- if (local_node_id == bdr_apply_con->origin_id)
- {
- /*
- * If the row got updated twice within a single node, just apply
- * the update with no conflict. Don't warn/log either, regardless
- * of the timing; that's just too common and valid since normal row
- * level locking guarantees are met.
- */
- apply_update = true;
- log_update = false;
- }
- else
- {
- int cmp;
-
- /*
- * Decide what update wins based on transaction timestamp difference.
- * The later transaction wins. If the timestamps compare equal,
- * use sysid + TLI to discern.
- */
-
- cmp = timestamptz_cmp_internal(replication_origin_timestamp, ts);
-
- if (cmp > 0)
- {
- apply_update = true;
- log_update = false;
- }
- else if (cmp == 0)
- {
- log_update = true;
-
- fetch_sysid_via_node_id(local_node_id,
- &local_sysid, &local_tli);
- fetch_sysid_via_node_id(bdr_apply_con->origin_id,
- &remote_sysid, &remote_tli);
-
- if (local_sysid < remote_sysid)
- apply_update = true;
- else if (local_sysid > remote_sysid)
- apply_update = false;
- else if (local_tli < remote_tli)
- apply_update = true;
- else if (local_tli > remote_tli)
- apply_update = false;
- else
- /* shouldn't happen */
- elog(ERROR, "unsuccessful node comparison");
- }
- else
- {
- apply_update = false;
- log_update = true;
- }
- }
+ apply_update = check_apply_update(local_node_id, ts, &log_update);
if (log_update)
- {
- char remote_ts[MAXDATELEN + 1];
- char local_ts[MAXDATELEN + 1];
-
- fetch_sysid_via_node_id(local_node_id,
- &local_sysid, &local_tli);
- fetch_sysid_via_node_id(bdr_apply_con->origin_id,
- &remote_sysid, &remote_tli);
- Assert(remote_sysid == bdr_apply_con->sysid);
- Assert(remote_tli == bdr_apply_con->timeline);
-
- memcpy(remote_ts, timestamptz_to_str(replication_origin_timestamp),
- MAXDATELEN);
- memcpy(local_ts, timestamptz_to_str(ts),
- MAXDATELEN);
-
- initStringInfo(&s_key);
- tuple_to_stringinfo(&s_key, RelationGetDescr(idxrel), old_key);
-
- ereport(LOG,
- (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
- errmsg("CONFLICT: %s remote update originating at node " UINT64_FORMAT ":%u at ts %s; row was previously updated at %s node " UINT64_FORMAT ":%u at ts %s. PKEY:%s",
- apply_update ? "applying" : "skipping",
- remote_sysid, remote_tli, remote_ts,
- local_node_id == InvalidRepNodeId ? "local" : "remote",
- local_sysid, local_tli, local_ts, s_key.data)));
- resetStringInfo(&s_key);
- }
+ do_log_update(local_node_id, apply_update, ts, idxrel, old_key);
if (apply_update)
- {
- HeapTuple nt;
- Assert(old_tuple != NULL);
- nt = heap_modify_tuple(old_tuple, RelationGetDescr(rel),
- new_tuple.values, new_tuple.isnull, new_tuple.changed);
- simple_heap_update(rel, &oldtid, nt);
- UserTableUpdateIndexes(rel, nt);
- bdr_count_update();
- }
+ do_apply_update(rel, oldtid, old_tuple, new_tuple);
else
bdr_count_update_conflict();
}
ereport(ERROR,
(errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
errmsg("CONFLICT: could not find existing tuple for pkey %s", s_key.data)));
- /* XXX dead code */
- resetStringInfo(&s_key);
- goto err;
- }
-err:
+ /*
+ * idxrel and rel will be closed by context cleanup, s_key will be
+ * cleaned up as well
+ * TODO but what about check_sequencer_wakeup()?
+ */
+ }
check_sequencer_wakeup(rel);
heap_close(rel, NoLock);
}
+static bool
+check_apply_update(RepNodeId local_node_id, TimestampTz ts, bool *log_update)
+{
+ uint64 local_sysid,
+ remote_sysid;
+ TimeLineID local_tli,
+ remote_tli;
+ int cmp;
+
+ if (local_node_id == bdr_apply_con->origin_id)
+ {
+ /*
+ * If the row got updated twice within a single node, just apply the
+ * update with no conflict. Don't warn/log either, regardless of the
+ * timing; that's just too common and valid since normal row level
+ * locking guarantees are met.
+ */
+ *log_update = false;
+ return true;
+ }
+ else
+ {
+ /*
+ * Decide what update wins based on transaction timestamp difference.
+ * The later transaction wins. If the timestamps compare equal, use
+ * sysid + TLI to discern.
+ */
+
+ cmp = timestamptz_cmp_internal(replication_origin_timestamp, ts);
+
+ if (cmp > 0)
+ {
+ *log_update = false;
+ return true;
+ }
+ else if (cmp == 0)
+ {
+ *log_update = true;
+
+ fetch_sysid_via_node_id(local_node_id,
+ &local_sysid, &local_tli);
+ fetch_sysid_via_node_id(bdr_apply_con->origin_id,
+ &remote_sysid, &remote_tli);
+
+ if (local_sysid < remote_sysid)
+ return true;
+ else if (local_sysid > remote_sysid)
+ return false;
+ else if (local_tli < remote_tli)
+ return true;
+ else if (local_tli > remote_tli)
+ return false;
+ else
+ /* shouldn't happen */
+ elog(ERROR, "unsuccessful node comparison");
+ }
+ else
+ {
+ *log_update = true;
+ return false;
+ }
+ }
+
+ /* XXX dead code */
+ return false;
+}
+
+static void
+do_log_update(RepNodeId local_node_id, bool apply_update, TimestampTz ts,
+ Relation idxrel, HeapTuple old_key)
+{
+ StringInfoData s_key;
+ char remote_ts[MAXDATELEN + 1];
+ char local_ts[MAXDATELEN + 1];
+
+ uint64 local_sysid,
+ remote_sysid;
+ TimeLineID local_tli,
+ remote_tli;
+
+
+ fetch_sysid_via_node_id(local_node_id,
+ &local_sysid, &local_tli);
+ fetch_sysid_via_node_id(bdr_apply_con->origin_id,
+ &remote_sysid, &remote_tli);
+
+ Assert(remote_sysid == bdr_apply_con->sysid);
+ Assert(remote_tli == bdr_apply_con->timeline);
+
+ memcpy(remote_ts, timestamptz_to_str(replication_origin_timestamp),
+ MAXDATELEN);
+ memcpy(local_ts, timestamptz_to_str(ts),
+ MAXDATELEN);
+
+ initStringInfo(&s_key);
+ tuple_to_stringinfo(&s_key, RelationGetDescr(idxrel), old_key);
+
+ ereport(LOG,
+ (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
+ errmsg("CONFLICT: %s remote update originating at node " UINT64_FORMAT ":%u at ts %s; row was previously updated at %s node " UINT64_FORMAT ":%u at ts %s. PKEY:%s",
+ apply_update ? "applying" : "skipping",
+ remote_sysid, remote_tli, remote_ts,
+ local_node_id == InvalidRepNodeId ? "local" : "remote",
+ local_sysid, local_tli, local_ts, s_key.data)));
+ resetStringInfo(&s_key);
+}
+
+static void
+do_apply_update(Relation rel, ItemPointerData oldtid, HeapTuple old_tuple, BDRTupleData new_tuple)
+{
+ HeapTuple nt;
+
+ Assert(old_tuple != NULL);
+ nt = heap_modify_tuple(old_tuple, RelationGetDescr(rel),
+ new_tuple.values, new_tuple.isnull, new_tuple.changed);
+ simple_heap_update(rel, &oldtid, nt);
+ UserTableUpdateIndexes(rel, nt);
+ bdr_count_update();
+}
+
void
process_remote_delete(StringInfo s)
{