bdr: Move apply worker global state out of shmem
authorCraig Ringer <craig@2ndquadrant.com>
Tue, 13 May 2014 01:46:26 +0000 (09:46 +0800)
committerAndres Freund <andres@anarazel.de>
Thu, 3 Jul 2014 15:55:34 +0000 (17:55 +0200)
contrib/bdr/bdr.c
contrib/bdr/bdr.h
contrib/bdr/bdr_apply.c
contrib/bdr/bdr_conflict_logging.c
contrib/bdr/bdr_init_replica.c

index 82a20f4c2cad84bca6ca8ffa43ab5e150891a327..396062d654ba615764a9dde7560258f5439fe497 100644 (file)
 
 #define MAXCONNINFO        1024
 
-/* Really should be private to bdr_apply.c */
-extern bool exit_worker;
+/* TODO: move to bdr_apply.c when bdr_apply_main is moved */
+extern bool            exit_worker;
+extern uint64      origin_sysid;
+extern TimeLineID  origin_timeline;
+/* end externs for bdr apply state */
 
 static int   n_configured_bdr_nodes = 0;
 ResourceOwner bdr_saved_resowner;
@@ -524,10 +527,8 @@ bdr_apply_main(Datum main_arg)
         MyBgworkerEntry->bgw_name, NameStr(bdr_apply_config->dbname));
 
    streamConn = bdr_establish_connection_and_slot(
-       bdr_apply_config, &slot_name, &bdr_apply_worker->sysid,
-       &bdr_apply_worker->timeline, &replication_identifier, NULL);
-
-   bdr_apply_worker->origin_id = replication_identifier;
+       bdr_apply_config, &slot_name, &origin_sysid,
+       &origin_timeline, &replication_identifier, NULL);
 
    /* initialize stat subsystem, our id won't change further */
    bdr_count_set_current_node(replication_identifier);
index e4987cfa3b034a642555df79c614717ea90c3b2f..73da0aa2ba5a93210345e4e62c737c10676b32c0 100644 (file)
@@ -116,11 +116,6 @@ typedef struct BdrApplyWorker
     */
    int connection_config_idx;
 
-   /* TODO: Remove these from shm, into bdr worker global state */
-   RepNodeId origin_id;
-   uint64 sysid;
-   TimeLineID timeline;
-
    /*
     * If not InvalidXLogRecPtr, stop replay at this point and exit.
     *
index fa0a7e38782385b4300d0269930bdb0b0f058233..2b3392d296ebeaea8a15154d3164cd18b23ed425 100644 (file)
@@ -66,19 +66,23 @@ typedef struct BDRTupleData
    bool        changed[MaxTupleAttributeNumber];
 } BDRTupleData;
 
-bool       started_transaction = false;
+/* Relation oid cache; initialized then left unchanged */
 Oid            QueuedDDLCommandsRelid = InvalidOid;
 Oid            QueuedDropsRelid = InvalidOid;
 
+/* Global apply worker state */
+uint64     origin_sysid;
+TimeLineID origin_timeline;
+bool       started_transaction = false;
+/* During apply, holds xid of remote transaction */
+TransactionId replication_origin_xid = InvalidTransactionId;
+
 /*
  * this should really be a static in bdr_apply.c, but bdr.c needs it for
  * bdr_apply_main currently.
  */
 bool       exit_worker = false;
 
-/* During apply, holds xid of remote transaction */
-TransactionId replication_origin_xid = InvalidTransactionId;
-
 /*
  * This code only runs within an apply bgworker, so we can stash a pointer to our
  * state in shm in a global for convenient access.
@@ -197,8 +201,8 @@ process_remote_commit(StringInfo s)
    TimestampTz     committime;
    TimestampTz     end_lsn;
    int             flags;
-   RepNodeId       origin_id = InvalidRepNodeId;
-   XLogRecPtr      origin_lsn = InvalidXLogRecPtr;
+   RepNodeId       remote_origin_id = InvalidRepNodeId;
+   XLogRecPtr      remote_origin_lsn = InvalidXLogRecPtr;
 
    Assert(bdr_apply_worker != NULL);
 
@@ -211,8 +215,8 @@ process_remote_commit(StringInfo s)
 
    if (flags & BDR_OUTPUT_COMMIT_HAS_ORIGIN)
    {
-       origin_id = pq_getmsgint(s, 2);
-       origin_lsn = pq_getmsgint64(s);
+       remote_origin_id = pq_getmsgint(s, 2);
+       remote_origin_lsn = pq_getmsgint64(s);
    }
 
    elog(DEBUG1, "COMMIT origin(lsn, end, timestamp): %X/%X, %X/%X, %s",
@@ -236,12 +240,12 @@ process_remote_commit(StringInfo s)
     *
     * We always advance the local replication identifier for the origin node,
     * even if we're really replaying a commit that's been forwarded from
-    * another node (per origin_id below). This is necessary to make sure we
-    * don't replay the same forwarded commit multiple times.
+    * another node (per remote_origin_id below). This is necessary to make
+    * sure we don't replay the same forwarded commit multiple times.
     */
    AdvanceCachedReplicationIdentifier(end_lsn, XactLastCommitEnd);
 
-   if (origin_id != InvalidRepNodeId)
+   if (remote_origin_id != InvalidRepNodeId)
    {
        /*
         * We're replaying a record that's been forwarded from another node, so
@@ -249,10 +253,11 @@ process_remote_commit(StringInfo s)
         * replay directly from that node will start from the correct LSN when
         * we replicate directly.
         *
-        * If it was from the immediate origin node, origin_id would be set to
-        * InvalidRepNodeId by the remote end's output plugin.
+        * If it was from the immediate origin node, remote_origin_id would be
+        * set to InvalidRepNodeId by the remote end's output plugin.
         */
-       AdvanceReplicationIdentifier(origin_id, origin_lsn, XactLastCommitEnd);
+       AdvanceReplicationIdentifier(remote_origin_id, remote_origin_lsn,
+                                    XactLastCommitEnd);
    }
 
    CurrentResourceOwner = bdr_saved_resowner;
@@ -827,7 +832,7 @@ check_apply_update(RepNodeId local_node_id, TimestampTz local_ts,
    if (new_tuple)
        *new_tuple = NULL;
 
-   if (local_node_id == bdr_apply_worker->origin_id)
+   if (local_node_id == replication_origin_id)
    {
        /*
         * If the row got updated twice within a single node, just apply the
@@ -912,7 +917,7 @@ check_apply_update(RepNodeId local_node_id, TimestampTz local_ts,
             */
            fetch_sysid_via_node_id(local_node_id,
                                    &local_sysid, &local_tli);
-           fetch_sysid_via_node_id(bdr_apply_worker->origin_id,
+           fetch_sysid_via_node_id(replication_origin_id,
                                    &remote_sysid, &remote_tli);
 
            /*
@@ -967,11 +972,11 @@ do_log_update(RepNodeId local_node_id, bool apply_update, TimestampTz ts,
 
    fetch_sysid_via_node_id(local_node_id,
                            &local_sysid, &local_tli);
-   fetch_sysid_via_node_id(bdr_apply_worker->origin_id,
+   fetch_sysid_via_node_id(replication_origin_id,
                            &remote_sysid, &remote_tli);
 
-   Assert(remote_sysid == bdr_apply_worker->sysid);
-   Assert(remote_tli == bdr_apply_worker->timeline);
+   Assert(remote_sysid == origin_sysid);
+   Assert(remote_tli == origin_timeline);
 
    memcpy(remote_ts, timestamptz_to_str(replication_origin_timestamp),
           MAXDATELEN);
index 4bf3b59d5c749d1bd02d0c968c05e6ec587609e9..f3c904bf893337b55d93665fbebed025575710b1 100644 (file)
@@ -488,7 +488,7 @@ bdr_conflict_log(BdrConflictType conflict_type,
    }
 
    /* TODO: May make sense to cache the remote sysid in a global too... */
-   fetch_sysid_via_node_id(bdr_apply_worker->origin_id,
+   fetch_sysid_via_node_id(replication_origin_id,
            &conflict.remote_sysid, &tli);
    conflict.remote_commit_time = replication_origin_timestamp;
    conflict.remote_txid = remote_txid;
index 771b8a70b6c14b8658fec3041187a2b76c036c53..582d8b7c2b7b49262fca55431ece8568b0a1b14f 100644 (file)
@@ -1032,10 +1032,6 @@ bdr_catchup_to_lsn(int cfg_index,
        /* Make sure the catchup worker can find its bdr.xxx_ GUCs */
        catchup_worker->connection_config_idx = cfg_index;
 
-       /* Set up the BdrApplyWorker struct in shmem */
-       catchup_worker->origin_id = InvalidRepNodeId;
-       catchup_worker->sysid = 0;
-       catchup_worker->timeline = 0;
        /* Special parameters for a catchup worker only */
        catchup_worker->replay_stop_lsn = target_lsn;
        catchup_worker->forward_changesets = true;