/* During apply, holds xid of remote transaction */
TransactionId replication_origin_xid = InvalidTransactionId;
+/*
+ * For tracking of the remote origin's information when in catchup mode
+ * (BDR_OUTPUT_TRANSACTION_HAS_ORIGIN).
+ */
+static uint64 remote_origin_sysid = 0;
+static TimeLineID remote_origin_timeline_id = 0;
+static Oid remote_origin_dboid = InvalidOid;
+static XLogRecPtr remote_origin_lsn = InvalidXLogRecPtr;
+/* The local identifier for the remote's origin, if any. */
+static RepNodeId remote_origin_id = InvalidRepNodeId;
+
/*
* this should really be a static in bdr_apply.c, but bdr.c needs it for
* bdr_apply_main currently.
TransactionId remote_xid;
char statbuf[100];
int apply_delay = bdr_apply_config->apply_delay;
+ int flags = 0;
Assert(bdr_apply_worker != NULL);
started_transaction = false;
+ remote_origin_id = InvalidRepNodeId;
+
+ flags = pq_getmsgint(s, 4);
origlsn = pq_getmsgint64(s);
+ Assert(origlsn != InvalidXLogRecPtr);
committime = pq_getmsgint64(s);
remote_xid = pq_getmsgint(s, 4);
+ if (flags & BDR_OUTPUT_TRANSACTION_HAS_ORIGIN)
+ {
+ remote_origin_sysid = pq_getmsgint64(s);
+ remote_origin_timeline_id = pq_getmsgint(s, 4);
+ remote_origin_dboid = pq_getmsgint(s, 4);
+ remote_origin_lsn = pq_getmsgint64(s);
+ }
+ else
+ {
+ /* Transaction originated directly from remote node */
+ remote_origin_sysid = 0;
+ remote_origin_timeline_id = 0;
+ remote_origin_dboid = InvalidOid;
+ remote_origin_lsn = InvalidXLogRecPtr;
+ }
+
+
/* setup state for commit and conflict detection */
replication_origin_lsn = origlsn;
replication_origin_timestamp = committime;
if (apply_delay == -1)
apply_delay = bdr_default_apply_delay;
+ /*
+ * If we're in catchup mode, see if this transaction is relayed from
+ * elsewhere and advance the appropriate slot.
+ */
+ if (flags & BDR_OUTPUT_TRANSACTION_HAS_ORIGIN)
+ {
+ char remote_ident[256];
+ NameData replication_name;
+
+ if (remote_origin_sysid == GetSystemIdentifier()
+ && remote_origin_timeline_id == ThisTimeLineID
+ && remote_origin_dboid == MyDatabaseId)
+ {
+ /*
+ * This might not have to be an error condition, but we don't cope
+ * with it for now and it shouldn't arise for use of catchup mode
+ * for init_replica.
+ */
+ ereport(ERROR,
+ (errmsg("Replication loop in catchup mode"),
+ errdetail("Received a transaction from the remote node that originated on this node")));
+ }
+
+ /* replication_name is currently unused in bdr */
+ NameStr(replication_name)[0] = '\0';
+
+ /*
+ * To determine whether the commit was forwarded by the upstream from
+ * another node, we need to get the local RepNodeId for that node based
+ * on the (sysid, timelineid, dboid) supplied in catchup mode.
+ */
+ snprintf(remote_ident, sizeof(remote_ident),
+ BDR_NODE_ID_FORMAT,
+ remote_origin_sysid, remote_origin_timeline_id, remote_origin_dboid, MyDatabaseId,
+ NameStr(replication_name));
+
+ StartTransactionCommand();
+ remote_origin_id = GetReplicationIdentifier(remote_ident, false);
+ CommitTransactionCommand();
+ }
+
/* don't want the overhead otherwise */
if (apply_delay > 0)
{
TimestampTz end_lsn;
int flags;
- /* for BDR_OUTPUT_COMMIT_HAS_ORIGIN */
- uint64 remote_sysid = 0;
- TimeLineID remote_timeline_id = 0;
- Oid remote_dboid = InvalidOid;
- XLogRecPtr remote_origin_lsn = InvalidXLogRecPtr;
-
Assert(bdr_apply_worker != NULL);
flags = pq_getmsgint(s, 4);
+ if (flags != 0)
+ elog(ERROR, "Commit flags are currently unused, but flags was set to %i", flags);
+
/* order of access to fields after flags is important */
commit_lsn = pq_getmsgint64(s);
end_lsn = pq_getmsgint64(s);
committime = pq_getmsgint64(s);
- if (flags & BDR_OUTPUT_COMMIT_HAS_ORIGIN)
- {
- remote_sysid = pq_getmsgint64(s);
- remote_timeline_id = pq_getmsgint(s, 4);
- remote_dboid = pq_getmsgint(s, 4);
- remote_origin_lsn = pq_getmsgint64(s);
- }
-
elog(DEBUG1, "COMMIT origin(lsn, end, timestamp): %X/%X, %X/%X, %s",
(uint32) (commit_lsn >> 32), (uint32) commit_lsn,
(uint32) (end_lsn >> 32), (uint32) end_lsn,
timestamptz_to_str(committime));
+ elog(LOG, "commit_lsn: "UINT64_FORMAT"; replication_origin_lsn: "UINT64_FORMAT,
+ commit_lsn, replication_origin_lsn);
+
Assert(commit_lsn == replication_origin_lsn);
Assert(committime == replication_origin_timestamp);
AdvanceCachedReplicationIdentifier(end_lsn, XactLastCommitEnd);
/*
- * If we're in catchup mode, see if the commit is relayed
- * from elsewhere and advance the appropriate slot.
+ * If we're in catchup mode, see if the commit is relayed from elsewhere
+ * and advance the appropriate slot.
*/
- if (flags & BDR_OUTPUT_COMMIT_HAS_ORIGIN)
+ if (remote_origin_id != replication_origin_id)
{
- char remote_ident[256];
- NameData replication_name;
- RepNodeId remote_origin_id = InvalidRepNodeId;
-
- if (remote_sysid == GetSystemIdentifier()
- && remote_timeline_id == ThisTimeLineID
- && remote_dboid == MyDatabaseId)
- {
- /*
- * This might not have to be an error condition, but we don't cope
- * with it for now and it shouldn't arise for use of catchup mode
- * for init_replica.
- */
- ereport(ERROR,
- (errmsg("Replication loop in catchup mode"),
- errdetail("Received a transaction from the remote node that originated on this node")));
- }
-
- /* replication_name is currently unused in bdr */
- NameStr(replication_name)[0] = '\0';
-
/*
- * To determine whether the commit was forwarded by the upstream from
- * another node, we need to get the local RepNodeId for that node based
- * on the (sysid, timelineid, dboid) supplied in catchup mode.
+ * The row isn't from the immediate upstream; advance the slot of the
+ * node it originally came from so we start replay of that node's
+ * change data at the right place.
*/
- snprintf(remote_ident, sizeof(remote_ident),
- BDR_NODE_ID_FORMAT,
- remote_sysid, remote_timeline_id, remote_dboid, MyDatabaseId,
- NameStr(replication_name));
-
- StartTransactionCommand();
- remote_origin_id = GetReplicationIdentifier(remote_ident, false);
- CommitTransactionCommand();
-
- if (remote_origin_id != replication_origin_id)
- {
- /*
- * The row isn't from the immediate upstream; advance the slot
- * of the node it originally came from so we start replay of that
- * node's change data at the right place.
- */
- AdvanceReplicationIdentifier(remote_origin_id, remote_origin_lsn,
- XactLastCommitEnd);
- }
+ AdvanceReplicationIdentifier(remote_origin_id, remote_origin_lsn,
+ XactLastCommitEnd);
}
CurrentResourceOwner = bdr_saved_resowner;
else if (cmp == 0)
{
uint64 local_sysid,
- remote_sysid;
+ remote_origin_sysid;
TimeLineID local_tli,
remote_tli;
Oid local_dboid,
- remote_dboid;
+ remote_origin_dboid;
/*
* Timestamps are equal. Use sysid + timeline id to decide which
* tuple to retain.
&local_sysid, &local_tli,
&local_dboid);
bdr_fetch_sysid_via_node_id(replication_origin_id,
- &remote_sysid, &remote_tli,
- &remote_dboid);
+ &remote_origin_sysid, &remote_tli,
+ &remote_origin_dboid);
/*
* As the timestamps were equal, we have to break the tie in a
* Use the ordering of the node's unique identifier, the tuple of
* (sysid, timelineid, dboid).
*/
- if (local_sysid < remote_sysid)
+ if (local_sysid < remote_origin_sysid)
*perform_update = true;
- else if (local_sysid > remote_sysid)
+ else if (local_sysid > remote_origin_sysid)
*perform_update = false;
else if (local_tli < remote_tli)
*perform_update = true;
else if (local_tli > remote_tli)
*perform_update = false;
- else if (local_dboid < remote_dboid)
+ else if (local_dboid < remote_origin_dboid)
*perform_update = true;
- else if (local_dboid > remote_dboid)
+ else if (local_dboid > remote_origin_dboid)
*perform_update = false;
else
/* shouldn't happen */
char local_ts[MAXDATELEN + 1];
uint64 local_sysid,
- remote_sysid;
+ remote_origin_sysid;
TimeLineID local_tli,
remote_tli;
Oid local_dboid,
- remote_dboid;
+ remote_origin_dboid;
bdr_fetch_sysid_via_node_id(local_node_id,
&local_sysid, &local_tli,
&local_dboid);
bdr_fetch_sysid_via_node_id(replication_origin_id,
- &remote_sysid, &remote_tli,
- &remote_dboid);
+ &remote_origin_sysid, &remote_tli,
+ &remote_origin_dboid);
- Assert(remote_sysid == origin_sysid);
+ Assert(remote_origin_sysid == origin_sysid);
Assert(remote_tli == origin_timeline);
- Assert(remote_dboid == origin_dboid);
+ Assert(remote_origin_dboid == origin_dboid);
memcpy(remote_ts, timestamptz_to_str(replication_origin_timestamp),
MAXDATELEN);
(errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
errmsg("CONFLICT: %s remote update originating at node " UINT64_FORMAT ":%u:%u at ts %s; row was previously updated at %s node " UINT64_FORMAT ":%u at ts %s. PKEY:%s, resolved by user tuple:%s",
apply_update ? "applying" : "skipping",
- remote_sysid, remote_tli, remote_dboid, remote_ts,
+ remote_origin_sysid, remote_tli, remote_origin_dboid, remote_ts,
local_node_id == InvalidRepNodeId ? "local" : "remote",
local_sysid, local_tli, local_ts, s_key.data,
s_user_tuple.data)));
(errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
errmsg("CONFLICT: %s remote update originating at node " UINT64_FORMAT ":%u:%u at ts %s; row was previously updated at %s node " UINT64_FORMAT ":%u at ts %s. PKEY:%s",
apply_update ? "applying" : "skipping",
- remote_sysid, remote_tli, remote_dboid, remote_ts,
+ remote_origin_sysid, remote_tli, remote_origin_dboid, remote_ts,
local_node_id == InvalidRepNodeId ? "local" : "remote",
local_sysid, local_tli, local_ts, s_key.data)));
}
|| ((BdrOutputData*)ctx->output_plugin_private)->forward_changesets;
}
-/* BEGIN callback */
+/*
+ * BEGIN callback
+ *
+ * If you change this you must also change the corresponding code in
+ * bdr_apply.c . Make sure that any flags are in sync.
+ */
void
pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
-#ifdef NOT_YET
BdrOutputData *data = ctx->output_plugin_private;
-#endif
+ int flags = 0;
+
AssertVariableIsOfType(&pg_decode_begin_txn, LogicalDecodeBeginCB);
if (!should_forward_changeset(ctx, txn))
OutputPluginPrepareWrite(ctx, true);
pq_sendbyte(ctx->out, 'B'); /* BEGIN */
+
+ /*
+ * Are we forwarding changesets from other nodes? If so, we must include
+ * the origin node ID and LSN in BEGIN records.
+ */
+ if (data->forward_changesets)
+ flags |= BDR_OUTPUT_TRANSACTION_HAS_ORIGIN;
+
+ /* send the flags field its self */
+ pq_sendint(ctx->out, flags, 4);
+
+ /* fixed fields */
pq_sendint64(ctx->out, txn->final_lsn);
pq_sendint64(ctx->out, txn->commit_time);
pq_sendint(ctx->out, txn->xid, 4);
+
+ /* and optional data selected above */
+ if (flags & BDR_OUTPUT_TRANSACTION_HAS_ORIGIN)
+ {
+ /*
+ * The RepNodeId in txn->origin_id is our local identifier for the
+ * origin node, but it's not valid outside our node. It must be
+ * converted into the (sysid, tlid, dboid) that uniquely identifies the
+ * node globally so that can be sent.
+ */
+ uint64 origin_sysid;
+ TimeLineID origin_tlid;
+ Oid origin_dboid = InvalidOid;
+
+ bdr_fetch_sysid_via_node_id(txn->origin_id, &origin_sysid,
+ &origin_tlid, &origin_dboid);
+
+ pq_sendint64(ctx->out, origin_sysid);
+ pq_sendint(ctx->out, origin_tlid, 4);
+ pq_sendint(ctx->out, origin_dboid, 4);
+ pq_sendint64(ctx->out, txn->origin_lsn);
+ }
+
OutputPluginWrite(ctx, true);
return;
}
* field.
*
* If you change this, you'll need to change process_remote_commit(...)
- * too.
+ * too. Make sure to keep any flags in sync.
*/
void
pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
+#ifdef NOT_YET
BdrOutputData *data = ctx->output_plugin_private;
+#endif
int flags = 0;
OutputPluginPrepareWrite(ctx, true);
pq_sendbyte(ctx->out, 'C'); /* sending COMMIT */
- /*
- * Are we forwarding changesets from other nodes? If so, we must include
- * the origin node ID and LSN in commit records.
- */
- if (data->forward_changesets)
- flags |= BDR_OUTPUT_COMMIT_HAS_ORIGIN;
-
/* send the flags field its self */
pq_sendint(ctx->out, flags, 4);
pq_sendint64(ctx->out, txn->end_lsn);
pq_sendint64(ctx->out, txn->commit_time);
- /* and optional data selected above */
- if (flags & BDR_OUTPUT_COMMIT_HAS_ORIGIN)
- {
- /*
- * The RepNodeId in txn->origin_id is our local identifier for the
- * origin node, but it's not valid outside our node. It must be
- * converted into the (sysid, tlid, dboid) that uniquely identifies the
- * node globally so that can be sent.
- */
- uint64 origin_sysid;
- TimeLineID origin_tlid;
- Oid origin_dboid = InvalidOid;
-
- bdr_fetch_sysid_via_node_id(txn->origin_id, &origin_sysid,
- &origin_tlid, &origin_dboid);
-
- pq_sendint64(ctx->out, origin_sysid);
- pq_sendint(ctx->out, origin_tlid, 4);
- pq_sendint(ctx->out, origin_dboid, 4);
- pq_sendint64(ctx->out, txn->origin_lsn);
- }
-
OutputPluginWrite(ctx, true);
}