TimestampTz committime;
TimestampTz end_lsn;
int flags;
- RepNodeId remote_origin_id = InvalidRepNodeId;
+
+ /* for BDR_OUTPUT_COMMIT_HAS_ORIGIN */
+ uint64 remote_sysid = 0;
+ TimeLineID remote_timeline_id = 0;
+ Oid remote_dboid = InvalidOid;
XLogRecPtr remote_origin_lsn = InvalidXLogRecPtr;
Assert(bdr_apply_worker != NULL);
if (flags & BDR_OUTPUT_COMMIT_HAS_ORIGIN)
{
- remote_origin_id = pq_getmsgint(s, 2);
+ remote_sysid = pq_getmsgint64(s);
+ remote_timeline_id = pq_getmsgint(s, 4);
+ remote_dboid = pq_getmsgint(s, 4);
remote_origin_lsn = pq_getmsgint64(s);
}
*/
AdvanceCachedReplicationIdentifier(end_lsn, XactLastCommitEnd);
- if (remote_origin_id != InvalidRepNodeId)
+ /*
+ * If we're in catchup mode, see if the commit is relayed
+ * from elsewhere and advance the appropriate slot.
+ */
+ if (flags & BDR_OUTPUT_COMMIT_HAS_ORIGIN)
{
+ char remote_ident[256];
+ NameData replication_name;
+ RepNodeId remote_origin_id = InvalidRepNodeId;
+
+ if (remote_sysid == GetSystemIdentifier()
+ && remote_timeline_id == ThisTimeLineID
+ && remote_dboid == MyDatabaseId)
+ {
+ /*
+ * This might not have to be an error condition, but we don't cope
+ * with it for now and it shouldn't arise for use of catchup mode
+ * for init_replica.
+ */
+ ereport(ERROR,
+ (errmsg("Replication loop in catchup mode"),
+ errdetail("Received a transaction from the remote node that originated on this node")));
+ }
+
+ /* replication_name is currently unused in bdr */
+ NameStr(replication_name)[0] = '\0';
+
/*
- * We're replaying a record that's been forwarded from another node, so
- * we need to advance the replication identifier for that node so that
- * replay directly from that node will start from the correct LSN when
- * we replicate directly.
- *
- * If it was from the immediate origin node, remote_origin_id would be
- * set to InvalidRepNodeId by the remote end's output plugin.
+ * To determine whether the commit was forwarded by the upstream from
+ * another node, we need to get the local RepNodeId for that node based
+ * on the (sysid, timelineid, dboid) supplied in catchup mode.
*/
- AdvanceReplicationIdentifier(remote_origin_id, remote_origin_lsn,
- XactLastCommitEnd);
+ snprintf(remote_ident, sizeof(remote_ident),
+ BDR_NODE_ID_FORMAT,
+ remote_sysid, remote_timeline_id, remote_dboid, MyDatabaseId,
+ NameStr(replication_name));
+
+ StartTransactionCommand();
+ remote_origin_id = GetReplicationIdentifier(remote_ident, false);
+ CommitTransactionCommand();
+
+ if (remote_origin_id != replication_origin_id)
+ {
+ /*
+ * The row isn't from the immediate upstream; advance the slot
+ * of the node it originally came from so we start replay of that
+ * node's change data at the right place.
+ */
+ AdvanceReplicationIdentifier(remote_origin_id, remote_origin_lsn,
+ XactLastCommitEnd);
+ }
}
CurrentResourceOwner = bdr_saved_resowner;
if (flags & BDR_OUTPUT_COMMIT_HAS_ORIGIN)
{
/*
- * Note that origin_id is InvalidRepNodeIdentifier for locally
- * originated commits.
+ * The RepNodeId in txn->origin_id is our local identifier for the
+ * origin node, but it's not valid outside our node. It must be
+ * converted into the (sysid, tlid, dboid) that uniquely identifies the
+ * node globally so that can be sent.
*/
- pq_sendint(ctx->out, txn->origin_id, 2);
+ uint64 origin_sysid;
+ TimeLineID origin_tlid;
+ Oid origin_dboid = InvalidOid;
+
+ bdr_fetch_sysid_via_node_id(txn->origin_id, &origin_sysid,
+ &origin_tlid, &origin_dboid);
+
+ pq_sendint64(ctx->out, origin_sysid);
+ pq_sendint(ctx->out, origin_tlid, 4);
+ pq_sendint(ctx->out, origin_dboid, 4);
pq_sendint64(ctx->out, txn->origin_lsn);
}