#define MAXCONNINFO 1024
-static bool got_sigterm = false;
+static bool exit_worker = false;
static int n_configured_bdr_nodes = 0;
ResourceOwner bdr_saved_resowner;
static bool bdr_is_restart = false;
{
int save_errno = errno;
- got_sigterm = true;
+ exit_worker = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
errno = save_errno;
}
+/*
+ * Read a remote action type and process the action record.
+ *
+ * May set exit_worker to stop processing before next record.
+ */
static void
bdr_process_remote_action(StringInfo s)
{
break;
/* COMMIT */
case 'C':
- process_remote_commit(s);
+ if (!process_remote_commit(s))
+ exit_worker = true;
break;
/* INSERT */
case 'I':
appendStringInfo(&query, ", integer_datetimes '%d'", bdr_get_integer_timestamps());
appendStringInfo(&query, ", bigendian '%d'", bdr_get_bigendian());
appendStringInfo(&query, ", db_encoding '%s'", GetDatabaseEncodingName());
+ if (bdr_apply_worker->forward_changesets)
+ appendStringInfo(&query, ", forward_changesets 't'");
appendStringInfoChar(&query, ')');
res = PQexec(streamConn, query.data);
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
- while (!got_sigterm)
+ while (!exit_worker)
{
/* int ret; */
int rc;
for (;;)
{
- if (got_sigterm)
+ if (exit_worker)
break;
r = PQgetCopyData(streamConn, ©buf, 1);
BdrApplyWorker *con = &worker->worker_data.apply_worker;
if ( strcmp(NameStr(con->dbname), dbname) == 0 )
{
- /* It's an apply worker for our DB; launch it */
+ /* It's an apply worker for our DB; register it */
BackgroundWorkerHandle *bgw_handle;
snprintf(apply_worker.bgw_name, BGW_MAXLEN,
ResourceOwnerCreate(NULL, "bdr seq top-level resource owner");
bdr_saved_resowner = CurrentResourceOwner;
- /* TODO: Hande need to initialize database from remote at this point */
-
elog(LOG, "Starting bdr apply workers for db %s",
NameStr(bdr_perdb_worker->dbname));
/* initialize sequencer */
bdr_sequencer_init(bdr_perdb_worker->seq_slot);
- while (!got_sigterm)
+ while (!exit_worker)
{
/*
* Background workers mustn't call usleep() or any direct equivalent:
BdrApplyWorker *apply_worker;
char *name;
- apply_worker = (BdrApplyWorker *) palloc(sizeof(BdrApplyWorker));
+ apply_worker = (BdrApplyWorker *) palloc0(sizeof(BdrApplyWorker));
+ apply_worker->forward_changesets = false;
+ apply_worker->replay_stop_lsn = InvalidXLogRecPtr;
name = (char *) lfirst(c);
if (!bdr_create_con_gucs(name, used_databases, &num_used_databases,
TimeLineID timeline;
+ /* If not InvalidXLogRecPtr, stop replay at this point and exit */
+ XLogRecPtr replay_stop_lsn;
+
+ /* Request that the remote forward all changes from other nodes */
+ bool forward_changesets;
+
} BdrApplyWorker;
/*
/* apply support */
extern void process_remote_begin(StringInfo s);
-extern void process_remote_commit(StringInfo s);
+extern bool process_remote_commit(StringInfo s);
extern void process_remote_insert(StringInfo s);
extern void process_remote_update(StringInfo s);
extern void process_remote_delete(StringInfo s);
}
}
-void
+/*
+ * Process a commit message from the output plugin, advance replication
+ * identifiers, commit the local transaction, and determine whether replay
+ * should continue.
+ *
+ * Returns true if apply should continue with the next record, false if replay
+ * should stop after this record.
+ */
+bool
process_remote_commit(StringInfo s)
{
XLogRecPtr commit_lsn;
CurrentResourceOwner = bdr_saved_resowner;
bdr_count_commit();
+
+ /*
+ * Stop replay if we're doing limited replay and we've replayed up to the
+ * last record we're supposed to process.
+ */
+ if (bdr_apply_worker->replay_stop_lsn != InvalidXLogRecPtr
+ && bdr_apply_worker->replay_stop_lsn <= end_lsn)
+ {
+ ereport(LOG,
+ (errmsg("bdr apply %s finished processing; replayed to %X/%X of required %X/%X",
+ NameStr(bdr_apply_worker->name),
+ (uint32)(end_lsn>>32), (uint32)end_lsn,
+ (uint32)(bdr_apply_worker->replay_stop_lsn>>32), (uint32)bdr_apply_worker->replay_stop_lsn)));
+ /*
+ * We clear the replay_stop_lsn field to indicate successful catchup,
+ * so we don't need a separate flag field in shmem for all apply
+ * workers.
+ */
+ bdr_apply_worker->replay_stop_lsn = InvalidXLogRecPtr;
+ return false;
+ }
+ else
+ return true;
}
void