bdr: Send remote transaction origin (sysid,tlid,dboid) at BEGIN
authorCraig Ringer <craig@2ndquadrant.com>
Wed, 4 Jun 2014 11:25:39 +0000 (19:25 +0800)
committerAndres Freund <andres@anarazel.de>
Thu, 3 Jul 2014 15:55:39 +0000 (17:55 +0200)
Previously we sent this information at COMMIT time, which meant code
running before COMMIT was replayed couldn't tell if it was working on
a forwarded transaction from a third party node (in catchup mode).

Now the remote identifier is decoded into a local RepNodeId at BEGIN
and stored for access throughout replay.

contrib/bdr/bdr.c
contrib/bdr/bdr.h
contrib/bdr/bdr_apply.c
contrib/bdr/bdr_output.c

index 4e4c7c3087fdf1de3865378814825e83e271296c..dd989a2f03faf3d94c1b476d13d85367e5ee11bb 100644 (file)
@@ -246,7 +246,8 @@ bdr_connect(char *conninfo_repl,
        ereport(FATAL,
                (errcode(ERRCODE_CONNECTION_FAILURE),
                 errmsg("could not connect to the primary server: %s",
-                       PQerrorMessage(streamConn))));
+                       PQerrorMessage(streamConn)),
+                errdetail("Connection string is '%s'", conninfo_repl)));
    }
 
    elog(DEBUG3, "Sending replication command: IDENTIFY_SYSTEM");
index 94ad4aa560a97d7656ef6e49a3f48e1ea6f4f47e..eb3fa0ea5feeab957934dee9966532d249461ae0 100644 (file)
@@ -51,13 +51,13 @@ struct ScanKeyData; /* from access/skey.h for ScanKey */
 enum LockTupleMode; /* from access/heapam.h */
 
 /*
- * Flags to indicate which fields are present in a commit record sent by the
+ * Flags to indicate which fields are present in a begin record sent by the
  * output plugin.
  */
-typedef enum BdrOutputCommitFlags
+typedef enum BdrOutputBeginFlags
 {
-   BDR_OUTPUT_COMMIT_HAS_ORIGIN = 1
-} BdrOutputCommitFlags;
+   BDR_OUTPUT_TRANSACTION_HAS_ORIGIN = 1
+} BdrOutputBeginFlags;
 
 /*
  * BDR conflict detection: type of conflict that was identified.
index ba20a57e0dc16d50dd880361b037375849c38602..5fdc8e5d1788766339ece97851b6d887826a90e8 100644 (file)
@@ -78,6 +78,17 @@ bool     started_transaction = false;
 /* During apply, holds xid of remote transaction */
 TransactionId replication_origin_xid = InvalidTransactionId;
 
+/*
+ * For tracking of the remote origin's information when in catchup mode
+ * (BDR_OUTPUT_TRANSACTION_HAS_ORIGIN).
+ */
+static uint64          remote_origin_sysid = 0;
+static TimeLineID      remote_origin_timeline_id = 0;
+static Oid             remote_origin_dboid = InvalidOid;
+static XLogRecPtr      remote_origin_lsn = InvalidXLogRecPtr;
+/* The local identifier for the remote's origin, if any. */
+static RepNodeId       remote_origin_id = InvalidRepNodeId;
+
 /*
  * this should really be a static in bdr_apply.c, but bdr.c needs it for
  * bdr_apply_main currently.
@@ -136,15 +147,37 @@ process_remote_begin(StringInfo s)
    TransactionId   remote_xid;
    char            statbuf[100];
    int             apply_delay = bdr_apply_config->apply_delay;
+   int             flags = 0;
 
    Assert(bdr_apply_worker != NULL);
 
    started_transaction = false;
+   remote_origin_id = InvalidRepNodeId;
+
+   flags = pq_getmsgint(s, 4);
 
    origlsn = pq_getmsgint64(s);
+   Assert(origlsn != InvalidXLogRecPtr);
    committime = pq_getmsgint64(s);
    remote_xid = pq_getmsgint(s, 4);
 
+   if (flags & BDR_OUTPUT_TRANSACTION_HAS_ORIGIN)
+   {
+       remote_origin_sysid = pq_getmsgint64(s);
+       remote_origin_timeline_id = pq_getmsgint(s, 4);
+       remote_origin_dboid = pq_getmsgint(s, 4);
+       remote_origin_lsn = pq_getmsgint64(s);
+   }
+   else
+   {
+       /* Transaction originated directly from remote node */
+       remote_origin_sysid = 0;
+       remote_origin_timeline_id = 0;
+       remote_origin_dboid = InvalidOid;
+       remote_origin_lsn = InvalidXLogRecPtr;
+   }
+
+
    /* setup state for commit and conflict detection */
    replication_origin_lsn = origlsn;
    replication_origin_timestamp = committime;
@@ -165,6 +198,47 @@ process_remote_begin(StringInfo s)
    if (apply_delay == -1)
        apply_delay = bdr_default_apply_delay;
 
+   /*
+    * If we're in catchup mode, see if this transaction is relayed from
+    * elsewhere and advance the appropriate slot.
+    */
+   if (flags & BDR_OUTPUT_TRANSACTION_HAS_ORIGIN)
+   {
+       char remote_ident[256];
+       NameData replication_name;
+
+       if (remote_origin_sysid == GetSystemIdentifier()
+           && remote_origin_timeline_id == ThisTimeLineID
+           && remote_origin_dboid == MyDatabaseId)
+       {
+           /*
+            * This might not have to be an error condition, but we don't cope
+            * with it for now and it shouldn't arise for use of catchup mode
+            * for init_replica.
+            */
+           ereport(ERROR,
+                   (errmsg("Replication loop in catchup mode"),
+                    errdetail("Received a transaction from the remote node that originated on this node")));
+       }
+
+       /* replication_name is currently unused in bdr */
+       NameStr(replication_name)[0] = '\0';
+
+       /*
+        * To determine whether the commit was forwarded by the upstream from
+        * another node, we need to get the local RepNodeId for that node based
+        * on the (sysid, timelineid, dboid) supplied in catchup mode.
+        */
+       snprintf(remote_ident, sizeof(remote_ident),
+               BDR_NODE_ID_FORMAT,
+               remote_origin_sysid, remote_origin_timeline_id, remote_origin_dboid, MyDatabaseId,
+               NameStr(replication_name));
+
+       StartTransactionCommand();
+       remote_origin_id = GetReplicationIdentifier(remote_ident, false);
+       CommitTransactionCommand();
+   }
+
    /* don't want the overhead otherwise */
    if (apply_delay > 0)
    {
@@ -203,34 +277,26 @@ process_remote_commit(StringInfo s)
    TimestampTz     end_lsn;
    int             flags;
 
-   /* for BDR_OUTPUT_COMMIT_HAS_ORIGIN */
-   uint64          remote_sysid = 0;
-   TimeLineID      remote_timeline_id = 0;
-   Oid             remote_dboid = InvalidOid;
-   XLogRecPtr      remote_origin_lsn = InvalidXLogRecPtr;
-
    Assert(bdr_apply_worker != NULL);
 
    flags = pq_getmsgint(s, 4);
 
+   if (flags != 0)
+       elog(ERROR, "Commit flags are currently unused, but flags was set to %i", flags);
+
    /* order of access to fields after flags is important */
    commit_lsn = pq_getmsgint64(s);
    end_lsn = pq_getmsgint64(s);
    committime = pq_getmsgint64(s);
 
-   if (flags & BDR_OUTPUT_COMMIT_HAS_ORIGIN)
-   {
-       remote_sysid = pq_getmsgint64(s);
-       remote_timeline_id = pq_getmsgint(s, 4);
-       remote_dboid = pq_getmsgint(s, 4);
-       remote_origin_lsn = pq_getmsgint64(s);
-   }
-
    elog(DEBUG1, "COMMIT origin(lsn, end, timestamp): %X/%X, %X/%X, %s",
         (uint32) (commit_lsn >> 32), (uint32) commit_lsn,
         (uint32) (end_lsn >> 32), (uint32) end_lsn,
         timestamptz_to_str(committime));
 
+   elog(LOG, "commit_lsn: "UINT64_FORMAT"; replication_origin_lsn: "UINT64_FORMAT,
+        commit_lsn, replication_origin_lsn);
+
    Assert(commit_lsn == replication_origin_lsn);
    Assert(committime == replication_origin_timestamp);
 
@@ -253,56 +319,18 @@ process_remote_commit(StringInfo s)
    AdvanceCachedReplicationIdentifier(end_lsn, XactLastCommitEnd);
 
    /*
-    * If we're in catchup mode, see if the commit is relayed
-    * from elsewhere and advance the appropriate slot.
+    * If we're in catchup mode, see if the commit is relayed from elsewhere
+    * and advance the appropriate slot.
     */
-   if (flags & BDR_OUTPUT_COMMIT_HAS_ORIGIN)
+   if (remote_origin_id != replication_origin_id)
    {
-       char remote_ident[256];
-       NameData replication_name;
-       RepNodeId remote_origin_id = InvalidRepNodeId;
-
-       if (remote_sysid == GetSystemIdentifier()
-           && remote_timeline_id == ThisTimeLineID
-           && remote_dboid == MyDatabaseId)
-       {
-           /*
-            * This might not have to be an error condition, but we don't cope
-            * with it for now and it shouldn't arise for use of catchup mode
-            * for init_replica.
-            */
-           ereport(ERROR,
-                   (errmsg("Replication loop in catchup mode"),
-                    errdetail("Received a transaction from the remote node that originated on this node")));
-       }
-
-       /* replication_name is currently unused in bdr */
-       NameStr(replication_name)[0] = '\0';
-
        /*
-        * To determine whether the commit was forwarded by the upstream from
-        * another node, we need to get the local RepNodeId for that node based
-        * on the (sysid, timelineid, dboid) supplied in catchup mode.
+        * The row isn't from the immediate upstream; advance the slot of the
+        * node it originally came from so we start replay of that node's
+        * change data at the right place.
         */
-       snprintf(remote_ident, sizeof(remote_ident),
-               BDR_NODE_ID_FORMAT,
-               remote_sysid, remote_timeline_id, remote_dboid, MyDatabaseId,
-               NameStr(replication_name));
-
-       StartTransactionCommand();
-       remote_origin_id = GetReplicationIdentifier(remote_ident, false);
-       CommitTransactionCommand();
-
-       if (remote_origin_id != replication_origin_id)
-       {
-           /*
-            * The row isn't from the immediate upstream; advance the slot
-            * of the node it originally came from so we start replay of that
-            * node's change data at the right place.
-            */
-           AdvanceReplicationIdentifier(remote_origin_id, remote_origin_lsn,
-                                        XactLastCommitEnd);
-       }
+       AdvanceReplicationIdentifier(remote_origin_id, remote_origin_lsn,
+                                    XactLastCommitEnd);
    }
 
    CurrentResourceOwner = bdr_saved_resowner;
@@ -972,11 +1000,11 @@ check_apply_update(RepNodeId local_node_id, TimestampTz local_ts,
        else if (cmp == 0)
        {
            uint64      local_sysid,
-                       remote_sysid;
+                       remote_origin_sysid;
            TimeLineID  local_tli,
                        remote_tli;
            Oid         local_dboid,
-                       remote_dboid;
+                       remote_origin_dboid;
            /*
             * Timestamps are equal. Use sysid + timeline id to decide which
             * tuple to retain.
@@ -985,8 +1013,8 @@ check_apply_update(RepNodeId local_node_id, TimestampTz local_ts,
                                        &local_sysid, &local_tli,
                                        &local_dboid);
            bdr_fetch_sysid_via_node_id(replication_origin_id,
-                                       &remote_sysid, &remote_tli,
-                                       &remote_dboid);
+                                       &remote_origin_sysid, &remote_tli,
+                                       &remote_origin_dboid);
 
            /*
             * As the timestamps were equal, we have to break the tie in a
@@ -995,17 +1023,17 @@ check_apply_update(RepNodeId local_node_id, TimestampTz local_ts,
             * Use the ordering of the node's unique identifier, the tuple of
             * (sysid, timelineid, dboid).
             */
-           if (local_sysid < remote_sysid)
+           if (local_sysid < remote_origin_sysid)
                *perform_update = true;
-           else if (local_sysid > remote_sysid)
+           else if (local_sysid > remote_origin_sysid)
                *perform_update = false;
            else if (local_tli < remote_tli)
                *perform_update = true;
            else if (local_tli > remote_tli)
                *perform_update = false;
-           else if (local_dboid < remote_dboid)
+           else if (local_dboid < remote_origin_dboid)
                *perform_update = true;
-           else if (local_dboid > remote_dboid)
+           else if (local_dboid > remote_origin_dboid)
                *perform_update = false;
            else
                /* shouldn't happen */
@@ -1040,23 +1068,23 @@ do_log_update(RepNodeId local_node_id, bool apply_update, TimestampTz ts,
    char        local_ts[MAXDATELEN + 1];
 
    uint64      local_sysid,
-               remote_sysid;
+               remote_origin_sysid;
    TimeLineID  local_tli,
                remote_tli;
    Oid         local_dboid,
-               remote_dboid;
+               remote_origin_dboid;
 
 
    bdr_fetch_sysid_via_node_id(local_node_id,
                                &local_sysid, &local_tli,
                                &local_dboid);
    bdr_fetch_sysid_via_node_id(replication_origin_id,
-                               &remote_sysid, &remote_tli,
-                               &remote_dboid);
+                               &remote_origin_sysid, &remote_tli,
+                               &remote_origin_dboid);
 
-   Assert(remote_sysid == origin_sysid);
+   Assert(remote_origin_sysid == origin_sysid);
    Assert(remote_tli == origin_timeline);
-   Assert(remote_dboid == origin_dboid);
+   Assert(remote_origin_dboid == origin_dboid);
 
    memcpy(remote_ts, timestamptz_to_str(replication_origin_timestamp),
           MAXDATELEN);
@@ -1076,7 +1104,7 @@ do_log_update(RepNodeId local_node_id, bool apply_update, TimestampTz ts,
                (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
                 errmsg("CONFLICT: %s remote update originating at node " UINT64_FORMAT ":%u:%u at ts %s; row was previously updated at %s node " UINT64_FORMAT ":%u at ts %s. PKEY:%s, resolved by user tuple:%s",
                        apply_update ? "applying" : "skipping",
-                       remote_sysid, remote_tli, remote_dboid, remote_ts,
+                       remote_origin_sysid, remote_tli, remote_origin_dboid, remote_ts,
                        local_node_id == InvalidRepNodeId ? "local" : "remote",
                        local_sysid, local_tli, local_ts, s_key.data,
                        s_user_tuple.data)));
@@ -1087,7 +1115,7 @@ do_log_update(RepNodeId local_node_id, bool apply_update, TimestampTz ts,
                (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
                 errmsg("CONFLICT: %s remote update originating at node " UINT64_FORMAT ":%u:%u at ts %s; row was previously updated at %s node " UINT64_FORMAT ":%u at ts %s. PKEY:%s",
                        apply_update ? "applying" : "skipping",
-                       remote_sysid, remote_tli, remote_dboid, remote_ts,
+                       remote_origin_sysid, remote_tli, remote_origin_dboid, remote_ts,
                        local_node_id == InvalidRepNodeId ? "local" : "remote",
                        local_sysid, local_tli, local_ts, s_key.data)));
    }
index d6722ef131c31801e422d09f3f00631c0a18afcd..72c90b338070d68b40da7d9a610e60aeb91abe7c 100644 (file)
@@ -465,13 +465,18 @@ should_forward_changeset(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
           || ((BdrOutputData*)ctx->output_plugin_private)->forward_changesets;
 }
 
-/* BEGIN callback */
+/*
+ * BEGIN callback
+ *
+ * If you change this you must also change the corresponding code in
+ * bdr_apply.c . Make sure that any flags are in sync.
+ */
 void
 pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
-#ifdef NOT_YET
    BdrOutputData *data = ctx->output_plugin_private;
-#endif
+   int flags = 0;
+
    AssertVariableIsOfType(&pg_decode_begin_txn, LogicalDecodeBeginCB);
 
    if (!should_forward_changeset(ctx, txn))
@@ -479,9 +484,44 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
    OutputPluginPrepareWrite(ctx, true);
    pq_sendbyte(ctx->out, 'B');     /* BEGIN */
+
+   /*
+    * Are we forwarding changesets from other nodes? If so, we must include
+    * the origin node ID and LSN in BEGIN records.
+    */
+   if (data->forward_changesets)
+       flags |= BDR_OUTPUT_TRANSACTION_HAS_ORIGIN;
+
+   /* send the flags field its self */
+   pq_sendint(ctx->out, flags, 4);
+
+   /* fixed fields */
    pq_sendint64(ctx->out, txn->final_lsn);
    pq_sendint64(ctx->out, txn->commit_time);
    pq_sendint(ctx->out, txn->xid, 4);
+
+   /* and optional data selected above */
+   if (flags & BDR_OUTPUT_TRANSACTION_HAS_ORIGIN)
+   {
+       /*
+        * The RepNodeId in txn->origin_id is our local identifier for the
+        * origin node, but it's not valid outside our node. It must be
+        * converted into the (sysid, tlid, dboid) that uniquely identifies the
+        * node globally so that can be sent.
+        */
+       uint64      origin_sysid;
+       TimeLineID  origin_tlid;
+       Oid         origin_dboid = InvalidOid;
+
+       bdr_fetch_sysid_via_node_id(txn->origin_id, &origin_sysid,
+                                   &origin_tlid, &origin_dboid);
+
+       pq_sendint64(ctx->out, origin_sysid);
+       pq_sendint(ctx->out, origin_tlid, 4);
+       pq_sendint(ctx->out, origin_dboid, 4);
+       pq_sendint64(ctx->out, txn->origin_lsn);
+   }
+
    OutputPluginWrite(ctx, true);
    return;
 }
@@ -497,13 +537,15 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
  * field.
  *
  * If you change this, you'll need to change process_remote_commit(...)
- * too.
+ * too. Make sure to keep any flags in sync.
  */
 void
 pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                     XLogRecPtr commit_lsn)
 {
+#ifdef NOT_YET
    BdrOutputData *data = ctx->output_plugin_private;
+#endif
 
    int flags = 0;
 
@@ -513,13 +555,6 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    OutputPluginPrepareWrite(ctx, true);
    pq_sendbyte(ctx->out, 'C');     /* sending COMMIT */
 
-   /*
-    * Are we forwarding changesets from other nodes? If so, we must include
-    * the origin node ID and LSN in commit records.
-    */
-   if (data->forward_changesets)
-       flags |= BDR_OUTPUT_COMMIT_HAS_ORIGIN;
-
    /* send the flags field its self */
    pq_sendint(ctx->out, flags, 4);
 
@@ -528,28 +563,6 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    pq_sendint64(ctx->out, txn->end_lsn);
    pq_sendint64(ctx->out, txn->commit_time);
 
-   /* and optional data selected above */
-   if (flags & BDR_OUTPUT_COMMIT_HAS_ORIGIN)
-   {
-       /*
-        * The RepNodeId in txn->origin_id is our local identifier for the
-        * origin node, but it's not valid outside our node. It must be
-        * converted into the (sysid, tlid, dboid) that uniquely identifies the
-        * node globally so that can be sent.
-        */
-       uint64      origin_sysid;
-       TimeLineID  origin_tlid;
-       Oid         origin_dboid = InvalidOid;
-
-       bdr_fetch_sysid_via_node_id(txn->origin_id, &origin_sysid,
-                                   &origin_tlid, &origin_dboid);
-
-       pq_sendint64(ctx->out, origin_sysid);
-       pq_sendint(ctx->out, origin_tlid, 4);
-       pq_sendint(ctx->out, origin_dboid, 4);
-       pq_sendint64(ctx->out, txn->origin_lsn);
-   }
-
    OutputPluginWrite(ctx, true);
 }