#define MAXCONNINFO 1024
-/* Really should be private to bdr_apply.c */
-extern bool exit_worker;
+/* TODO: move to bdr_apply.c when bdr_apply_main is moved */
+extern bool exit_worker;
+extern uint64 origin_sysid;
+extern TimeLineID origin_timeline;
+/* end externs for bdr apply state */
static int n_configured_bdr_nodes = 0;
ResourceOwner bdr_saved_resowner;
MyBgworkerEntry->bgw_name, NameStr(bdr_apply_config->dbname));
streamConn = bdr_establish_connection_and_slot(
- bdr_apply_config, &slot_name, &bdr_apply_worker->sysid,
- &bdr_apply_worker->timeline, &replication_identifier, NULL);
-
- bdr_apply_worker->origin_id = replication_identifier;
+ bdr_apply_config, &slot_name, &origin_sysid,
+ &origin_timeline, &replication_identifier, NULL);
/* initialize stat subsystem, our id won't change further */
bdr_count_set_current_node(replication_identifier);
bool changed[MaxTupleAttributeNumber];
} BDRTupleData;
-bool started_transaction = false;
+/* Relation oid cache; initialized then left unchanged */
Oid QueuedDDLCommandsRelid = InvalidOid;
Oid QueuedDropsRelid = InvalidOid;
+/* Global apply worker state */
+uint64 origin_sysid;
+TimeLineID origin_timeline;
+bool started_transaction = false;
+/* During apply, holds xid of remote transaction */
+TransactionId replication_origin_xid = InvalidTransactionId;
+
/*
* this should really be a static in bdr_apply.c, but bdr.c needs it for
* bdr_apply_main currently.
*/
bool exit_worker = false;
-/* During apply, holds xid of remote transaction */
-TransactionId replication_origin_xid = InvalidTransactionId;
-
/*
* This code only runs within an apply bgworker, so we can stash a pointer to our
* state in shm in a global for convenient access.
TimestampTz committime;
TimestampTz end_lsn;
int flags;
- RepNodeId origin_id = InvalidRepNodeId;
- XLogRecPtr origin_lsn = InvalidXLogRecPtr;
+ RepNodeId remote_origin_id = InvalidRepNodeId;
+ XLogRecPtr remote_origin_lsn = InvalidXLogRecPtr;
Assert(bdr_apply_worker != NULL);
if (flags & BDR_OUTPUT_COMMIT_HAS_ORIGIN)
{
- origin_id = pq_getmsgint(s, 2);
- origin_lsn = pq_getmsgint64(s);
+ remote_origin_id = pq_getmsgint(s, 2);
+ remote_origin_lsn = pq_getmsgint64(s);
}
elog(DEBUG1, "COMMIT origin(lsn, end, timestamp): %X/%X, %X/%X, %s",
*
* We always advance the local replication identifier for the origin node,
* even if we're really replaying a commit that's been forwarded from
- * another node (per origin_id below). This is necessary to make sure we
- * don't replay the same forwarded commit multiple times.
+ * another node (per remote_origin_id below). This is necessary to make
+ * sure we don't replay the same forwarded commit multiple times.
*/
AdvanceCachedReplicationIdentifier(end_lsn, XactLastCommitEnd);
- if (origin_id != InvalidRepNodeId)
+ if (remote_origin_id != InvalidRepNodeId)
{
/*
* We're replaying a record that's been forwarded from another node, so
* replay directly from that node will start from the correct LSN when
* we replicate directly.
*
- * If it was from the immediate origin node, origin_id would be set to
- * InvalidRepNodeId by the remote end's output plugin.
+ * If it was from the immediate origin node, remote_origin_id would be
+ * set to InvalidRepNodeId by the remote end's output plugin.
*/
- AdvanceReplicationIdentifier(origin_id, origin_lsn, XactLastCommitEnd);
+ AdvanceReplicationIdentifier(remote_origin_id, remote_origin_lsn,
+ XactLastCommitEnd);
}
CurrentResourceOwner = bdr_saved_resowner;
if (new_tuple)
*new_tuple = NULL;
- if (local_node_id == bdr_apply_worker->origin_id)
+ if (local_node_id == replication_origin_id)
{
/*
* If the row got updated twice within a single node, just apply the
*/
fetch_sysid_via_node_id(local_node_id,
&local_sysid, &local_tli);
- fetch_sysid_via_node_id(bdr_apply_worker->origin_id,
+ fetch_sysid_via_node_id(replication_origin_id,
&remote_sysid, &remote_tli);
/*
fetch_sysid_via_node_id(local_node_id,
&local_sysid, &local_tli);
- fetch_sysid_via_node_id(bdr_apply_worker->origin_id,
+ fetch_sysid_via_node_id(replication_origin_id,
&remote_sysid, &remote_tli);
- Assert(remote_sysid == bdr_apply_worker->sysid);
- Assert(remote_tli == bdr_apply_worker->timeline);
+ Assert(remote_sysid == origin_sysid);
+ Assert(remote_tli == origin_timeline);
memcpy(remote_ts, timestamptz_to_str(replication_origin_timestamp),
MAXDATELEN);