From: Craig Ringer Date: Fri, 18 Apr 2014 04:29:01 +0000 (+0800) Subject: bdr: Support stopping replay at a specified LSN X-Git-Url: http://waps.l3s.uni-hannover.de/gitweb/?a=commitdiff_plain;h=e9602eec6b2e8b0de8c412faff2cbb101b0dac99;p=users%2Fandresfreund%2Fpostgres.git bdr: Support stopping replay at a specified LSN --- diff --git a/contrib/bdr/bdr.c b/contrib/bdr/bdr.c index 192d35ee00..6efc8e4bfc 100644 --- a/contrib/bdr/bdr.c +++ b/contrib/bdr/bdr.c @@ -56,7 +56,7 @@ #define MAXCONNINFO 1024 -static bool got_sigterm = false; +static bool exit_worker = false; static int n_configured_bdr_nodes = 0; ResourceOwner bdr_saved_resowner; static bool bdr_is_restart = false; @@ -207,7 +207,7 @@ bdr_sigterm(SIGNAL_ARGS) { int save_errno = errno; - got_sigterm = true; + exit_worker = true; if (MyProc) SetLatch(&MyProc->procLatch); @@ -225,6 +225,11 @@ bdr_sighup(SIGNAL_ARGS) errno = save_errno; } +/* + * Read a remote action type and process the action record. + * + * May set exit_worker to stop processing before next record. + */ static void bdr_process_remote_action(StringInfo s) { @@ -240,7 +245,8 @@ bdr_process_remote_action(StringInfo s) break; /* COMMIT */ case 'C': - process_remote_commit(s); + if (!process_remote_commit(s)) + exit_worker = true; break; /* INSERT */ case 'I': @@ -601,6 +607,8 @@ bdr_apply_main(Datum main_arg) appendStringInfo(&query, ", integer_datetimes '%d'", bdr_get_integer_timestamps()); appendStringInfo(&query, ", bigendian '%d'", bdr_get_bigendian()); appendStringInfo(&query, ", db_encoding '%s'", GetDatabaseEncodingName()); + if (bdr_apply_worker->forward_changesets) + appendStringInfo(&query, ", forward_changesets 't'"); appendStringInfoChar(&query, ')'); res = PQexec(streamConn, query.data); @@ -624,7 +632,7 @@ bdr_apply_main(Datum main_arg) ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); - while (!got_sigterm) + while (!exit_worker) { /* int ret; */ int rc; @@ -659,7 +667,7 @@ bdr_apply_main(Datum main_arg) for (;;) { - if (got_sigterm) + if (exit_worker) break; r = PQgetCopyData(streamConn, ©buf, 1); @@ -933,7 +941,7 @@ bdr_launch_apply_workers(char *dbname) BdrApplyWorker *con = &worker->worker_data.apply_worker; if ( strcmp(NameStr(con->dbname), dbname) == 0 ) { - /* It's an apply worker for our DB; launch it */ + /* It's an apply worker for our DB; register it */ BackgroundWorkerHandle *bgw_handle; snprintf(apply_worker.bgw_name, BGW_MAXLEN, @@ -1017,8 +1025,6 @@ bdr_perdb_worker_main(Datum main_arg) ResourceOwnerCreate(NULL, "bdr seq top-level resource owner"); bdr_saved_resowner = CurrentResourceOwner; - /* TODO: Hande need to initialize database from remote at this point */ - elog(LOG, "Starting bdr apply workers for db %s", NameStr(bdr_perdb_worker->dbname)); @@ -1041,7 +1047,7 @@ bdr_perdb_worker_main(Datum main_arg) /* initialize sequencer */ bdr_sequencer_init(bdr_perdb_worker->seq_slot); - while (!got_sigterm) + while (!exit_worker) { /* * Background workers mustn't call usleep() or any direct equivalent: @@ -1410,7 +1416,9 @@ _PG_init(void) BdrApplyWorker *apply_worker; char *name; - apply_worker = (BdrApplyWorker *) palloc(sizeof(BdrApplyWorker)); + apply_worker = (BdrApplyWorker *) palloc0(sizeof(BdrApplyWorker)); + apply_worker->forward_changesets = false; + apply_worker->replay_stop_lsn = InvalidXLogRecPtr; name = (char *) lfirst(c); if (!bdr_create_con_gucs(name, used_databases, &num_used_databases, diff --git a/contrib/bdr/bdr.h b/contrib/bdr/bdr.h index 93d984576c..415af96a7d 100644 --- a/contrib/bdr/bdr.h +++ b/contrib/bdr/bdr.h @@ -47,6 +47,12 @@ typedef struct BdrApplyWorker TimeLineID timeline; + /* If not InvalidXLogRecPtr, stop replay at this point and exit */ + XLogRecPtr replay_stop_lsn; + + /* Request that the remote forward all changes from other nodes */ + bool forward_changesets; + } BdrApplyWorker; /* @@ -116,7 +122,7 @@ const char *bdr_get_worker_option(const char * worker_name, const char * option_ /* apply support */ extern void process_remote_begin(StringInfo s); -extern void process_remote_commit(StringInfo s); +extern bool process_remote_commit(StringInfo s); extern void process_remote_insert(StringInfo s); extern void process_remote_update(StringInfo s); extern void process_remote_delete(StringInfo s); diff --git a/contrib/bdr/bdr_apply.c b/contrib/bdr/bdr_apply.c index 5d77de4981..fe687e1515 100644 --- a/contrib/bdr/bdr_apply.c +++ b/contrib/bdr/bdr_apply.c @@ -175,7 +175,15 @@ process_remote_begin(StringInfo s) } } -void +/* + * Process a commit message from the output plugin, advance replication + * identifiers, commit the local transaction, and determine whether replay + * should continue. + * + * Returns true if apply should continue with the next record, false if replay + * should stop after this record. + */ +bool process_remote_commit(StringInfo s) { XLogRecPtr commit_lsn; @@ -243,6 +251,29 @@ process_remote_commit(StringInfo s) CurrentResourceOwner = bdr_saved_resowner; bdr_count_commit(); + + /* + * Stop replay if we're doing limited replay and we've replayed up to the + * last record we're supposed to process. + */ + if (bdr_apply_worker->replay_stop_lsn != InvalidXLogRecPtr + && bdr_apply_worker->replay_stop_lsn <= end_lsn) + { + ereport(LOG, + (errmsg("bdr apply %s finished processing; replayed to %X/%X of required %X/%X", + NameStr(bdr_apply_worker->name), + (uint32)(end_lsn>>32), (uint32)end_lsn, + (uint32)(bdr_apply_worker->replay_stop_lsn>>32), (uint32)bdr_apply_worker->replay_stop_lsn))); + /* + * We clear the replay_stop_lsn field to indicate successful catchup, + * so we don't need a separate flag field in shmem for all apply + * workers. + */ + bdr_apply_worker->replay_stop_lsn = InvalidXLogRecPtr; + return false; + } + else + return true; } void