From: Craig Ringer Date: Fri, 16 May 2014 07:57:42 +0000 (+0800) Subject: bdr: Use (sysid,timelineid,dboid) on the wire, not local RepNodeId X-Git-Url: http://waps.l3s.uni-hannover.de/gitweb/?a=commitdiff_plain;h=a9c43a406265bea75e00bf7be913be8e5660fde6;p=users%2Fandresfreund%2Fpostgres.git bdr: Use (sysid,timelineid,dboid) on the wire, not local RepNodeId The wire protocol was using RepNodeId in catchup mode, but RepNodeId is a purely local identiifer that should never leave a node. It should 've been sending the (sysid, timeline id, dboid) tuple that globally identifies a node, then turning that back to a RepNodeId local to the apply side when it' s received. --- diff --git a/contrib/bdr/bdr_apply.c b/contrib/bdr/bdr_apply.c index 86a9b1b619..b3c65f0b75 100644 --- a/contrib/bdr/bdr_apply.c +++ b/contrib/bdr/bdr_apply.c @@ -202,7 +202,11 @@ process_remote_commit(StringInfo s) TimestampTz committime; TimestampTz end_lsn; int flags; - RepNodeId remote_origin_id = InvalidRepNodeId; + + /* for BDR_OUTPUT_COMMIT_HAS_ORIGIN */ + uint64 remote_sysid = 0; + TimeLineID remote_timeline_id = 0; + Oid remote_dboid = InvalidOid; XLogRecPtr remote_origin_lsn = InvalidXLogRecPtr; Assert(bdr_apply_worker != NULL); @@ -216,7 +220,9 @@ process_remote_commit(StringInfo s) if (flags & BDR_OUTPUT_COMMIT_HAS_ORIGIN) { - remote_origin_id = pq_getmsgint(s, 2); + remote_sysid = pq_getmsgint64(s); + remote_timeline_id = pq_getmsgint(s, 4); + remote_dboid = pq_getmsgint(s, 4); remote_origin_lsn = pq_getmsgint64(s); } @@ -246,19 +252,57 @@ process_remote_commit(StringInfo s) */ AdvanceCachedReplicationIdentifier(end_lsn, XactLastCommitEnd); - if (remote_origin_id != InvalidRepNodeId) + /* + * If we're in catchup mode, see if the commit is relayed + * from elsewhere and advance the appropriate slot. + */ + if (flags & BDR_OUTPUT_COMMIT_HAS_ORIGIN) { + char remote_ident[256]; + NameData replication_name; + RepNodeId remote_origin_id = InvalidRepNodeId; + + if (remote_sysid == GetSystemIdentifier() + && remote_timeline_id == ThisTimeLineID + && remote_dboid == MyDatabaseId) + { + /* + * This might not have to be an error condition, but we don't cope + * with it for now and it shouldn't arise for use of catchup mode + * for init_replica. + */ + ereport(ERROR, + (errmsg("Replication loop in catchup mode"), + errdetail("Received a transaction from the remote node that originated on this node"))); + } + + /* replication_name is currently unused in bdr */ + NameStr(replication_name)[0] = '\0'; + /* - * We're replaying a record that's been forwarded from another node, so - * we need to advance the replication identifier for that node so that - * replay directly from that node will start from the correct LSN when - * we replicate directly. - * - * If it was from the immediate origin node, remote_origin_id would be - * set to InvalidRepNodeId by the remote end's output plugin. + * To determine whether the commit was forwarded by the upstream from + * another node, we need to get the local RepNodeId for that node based + * on the (sysid, timelineid, dboid) supplied in catchup mode. */ - AdvanceReplicationIdentifier(remote_origin_id, remote_origin_lsn, - XactLastCommitEnd); + snprintf(remote_ident, sizeof(remote_ident), + BDR_NODE_ID_FORMAT, + remote_sysid, remote_timeline_id, remote_dboid, MyDatabaseId, + NameStr(replication_name)); + + StartTransactionCommand(); + remote_origin_id = GetReplicationIdentifier(remote_ident, false); + CommitTransactionCommand(); + + if (remote_origin_id != replication_origin_id) + { + /* + * The row isn't from the immediate upstream; advance the slot + * of the node it originally came from so we start replay of that + * node's change data at the right place. + */ + AdvanceReplicationIdentifier(remote_origin_id, remote_origin_lsn, + XactLastCommitEnd); + } } CurrentResourceOwner = bdr_saved_resowner; diff --git a/contrib/bdr/bdr_output.c b/contrib/bdr/bdr_output.c index 81a6dae79b..d6722ef131 100644 --- a/contrib/bdr/bdr_output.c +++ b/contrib/bdr/bdr_output.c @@ -532,10 +532,21 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (flags & BDR_OUTPUT_COMMIT_HAS_ORIGIN) { /* - * Note that origin_id is InvalidRepNodeIdentifier for locally - * originated commits. + * The RepNodeId in txn->origin_id is our local identifier for the + * origin node, but it's not valid outside our node. It must be + * converted into the (sysid, tlid, dboid) that uniquely identifies the + * node globally so that can be sent. */ - pq_sendint(ctx->out, txn->origin_id, 2); + uint64 origin_sysid; + TimeLineID origin_tlid; + Oid origin_dboid = InvalidOid; + + bdr_fetch_sysid_via_node_id(txn->origin_id, &origin_sysid, + &origin_tlid, &origin_dboid); + + pq_sendint64(ctx->out, origin_sysid); + pq_sendint(ctx->out, origin_tlid, 4); + pq_sendint(ctx->out, origin_dboid, 4); pq_sendint64(ctx->out, txn->origin_lsn); }