bdr: Use (sysid,timelineid,dboid) on the wire, not local RepNodeId
authorCraig Ringer <craig@2ndquadrant.com>
Fri, 16 May 2014 07:57:42 +0000 (15:57 +0800)
committerAndres Freund <andres@anarazel.de>
Thu, 3 Jul 2014 15:55:38 +0000 (17:55 +0200)
The wire protocol was using RepNodeId in catchup mode, but RepNodeId is a
purely local identiifer that should never leave a node.

It should 've been sending the (sysid, timeline id, dboid) tuple that globally
identifies a node, then turning that back to a RepNodeId local to the apply
side when it' s received.

contrib/bdr/bdr_apply.c
contrib/bdr/bdr_output.c

index 86a9b1b6191d9bf0dc88bca6b1a9096c8991682e..b3c65f0b754c1439b47f586e9713a42cc6c2861a 100644 (file)
@@ -202,7 +202,11 @@ process_remote_commit(StringInfo s)
    TimestampTz     committime;
    TimestampTz     end_lsn;
    int             flags;
-   RepNodeId       remote_origin_id = InvalidRepNodeId;
+
+   /* for BDR_OUTPUT_COMMIT_HAS_ORIGIN */
+   uint64          remote_sysid = 0;
+   TimeLineID      remote_timeline_id = 0;
+   Oid             remote_dboid = InvalidOid;
    XLogRecPtr      remote_origin_lsn = InvalidXLogRecPtr;
 
    Assert(bdr_apply_worker != NULL);
@@ -216,7 +220,9 @@ process_remote_commit(StringInfo s)
 
    if (flags & BDR_OUTPUT_COMMIT_HAS_ORIGIN)
    {
-       remote_origin_id = pq_getmsgint(s, 2);
+       remote_sysid = pq_getmsgint64(s);
+       remote_timeline_id = pq_getmsgint(s, 4);
+       remote_dboid = pq_getmsgint(s, 4);
        remote_origin_lsn = pq_getmsgint64(s);
    }
 
@@ -246,19 +252,57 @@ process_remote_commit(StringInfo s)
     */
    AdvanceCachedReplicationIdentifier(end_lsn, XactLastCommitEnd);
 
-   if (remote_origin_id != InvalidRepNodeId)
+   /*
+    * If we're in catchup mode, see if the commit is relayed
+    * from elsewhere and advance the appropriate slot.
+    */
+   if (flags & BDR_OUTPUT_COMMIT_HAS_ORIGIN)
    {
+       char remote_ident[256];
+       NameData replication_name;
+       RepNodeId remote_origin_id = InvalidRepNodeId;
+
+       if (remote_sysid == GetSystemIdentifier()
+           && remote_timeline_id == ThisTimeLineID
+           && remote_dboid == MyDatabaseId)
+       {
+           /*
+            * This might not have to be an error condition, but we don't cope
+            * with it for now and it shouldn't arise for use of catchup mode
+            * for init_replica.
+            */
+           ereport(ERROR,
+                   (errmsg("Replication loop in catchup mode"),
+                    errdetail("Received a transaction from the remote node that originated on this node")));
+       }
+
+       /* replication_name is currently unused in bdr */
+       NameStr(replication_name)[0] = '\0';
+
        /*
-        * We're replaying a record that's been forwarded from another node, so
-        * we need to advance the replication identifier for that node so that
-        * replay directly from that node will start from the correct LSN when
-        * we replicate directly.
-        *
-        * If it was from the immediate origin node, remote_origin_id would be
-        * set to InvalidRepNodeId by the remote end's output plugin.
+        * To determine whether the commit was forwarded by the upstream from
+        * another node, we need to get the local RepNodeId for that node based
+        * on the (sysid, timelineid, dboid) supplied in catchup mode.
         */
-       AdvanceReplicationIdentifier(remote_origin_id, remote_origin_lsn,
-                                    XactLastCommitEnd);
+       snprintf(remote_ident, sizeof(remote_ident),
+               BDR_NODE_ID_FORMAT,
+               remote_sysid, remote_timeline_id, remote_dboid, MyDatabaseId,
+               NameStr(replication_name));
+
+       StartTransactionCommand();
+       remote_origin_id = GetReplicationIdentifier(remote_ident, false);
+       CommitTransactionCommand();
+
+       if (remote_origin_id != replication_origin_id)
+       {
+           /*
+            * The row isn't from the immediate upstream; advance the slot
+            * of the node it originally came from so we start replay of that
+            * node's change data at the right place.
+            */
+           AdvanceReplicationIdentifier(remote_origin_id, remote_origin_lsn,
+                                        XactLastCommitEnd);
+       }
    }
 
    CurrentResourceOwner = bdr_saved_resowner;
index 81a6dae79b6d1072108efc7ea6631a46f2b1c4d0..d6722ef131c31801e422d09f3f00631c0a18afcd 100644 (file)
@@ -532,10 +532,21 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    if (flags & BDR_OUTPUT_COMMIT_HAS_ORIGIN)
    {
        /*
-        * Note that origin_id is InvalidRepNodeIdentifier for locally
-        * originated commits.
+        * The RepNodeId in txn->origin_id is our local identifier for the
+        * origin node, but it's not valid outside our node. It must be
+        * converted into the (sysid, tlid, dboid) that uniquely identifies the
+        * node globally so that can be sent.
         */
-       pq_sendint(ctx->out, txn->origin_id, 2);
+       uint64      origin_sysid;
+       TimeLineID  origin_tlid;
+       Oid         origin_dboid = InvalidOid;
+
+       bdr_fetch_sysid_via_node_id(txn->origin_id, &origin_sysid,
+                                   &origin_tlid, &origin_dboid);
+
+       pq_sendint64(ctx->out, origin_sysid);
+       pq_sendint(ctx->out, origin_tlid, 4);
+       pq_sendint(ctx->out, origin_dboid, 4);
        pq_sendint64(ctx->out, txn->origin_lsn);
    }