bdr: Support stopping replay at a specified LSN
author Craig Ringer <craig@2ndquadrant.com>
Fri, 18 Apr 2014 04:29:01 +0000 (12:29 +0800)
committer Andres Freund <andres@anarazel.de>
Thu, 3 Jul 2014 15:55:27 +0000 (17:55 +0200)
contrib/bdr/bdr.c
contrib/bdr/bdr.h
contrib/bdr/bdr_apply.c

index 192d35ee00b83f56c08d7070df6603edac39baec..6efc8e4bfc4a77ed61c73997dabd73a25efac470 100644 (file)
@@ -56,7 +56,7 @@
 
 #define MAXCONNINFO        1024
 
-static bool got_sigterm = false;
+static bool exit_worker = false;
 static int   n_configured_bdr_nodes = 0;
 ResourceOwner bdr_saved_resowner;
 static bool bdr_is_restart = false;
@@ -207,7 +207,7 @@ bdr_sigterm(SIGNAL_ARGS)
 {
    int         save_errno = errno;
 
-   got_sigterm = true;
+   exit_worker = true;
    if (MyProc)
        SetLatch(&MyProc->procLatch);
 
@@ -225,6 +225,11 @@ bdr_sighup(SIGNAL_ARGS)
    errno = save_errno;
 }
 
+/*
+ * Read a remote action type and process the action record.
+ *
+ * May set exit_worker to stop processing before next record.
+ */
 static void
 bdr_process_remote_action(StringInfo s)
 {
@@ -240,7 +245,8 @@ bdr_process_remote_action(StringInfo s)
            break;
            /* COMMIT */
        case 'C':
-           process_remote_commit(s);
+           if (!process_remote_commit(s))
+               exit_worker = true;
            break;
            /* INSERT */
        case 'I':
@@ -601,6 +607,8 @@ bdr_apply_main(Datum main_arg)
    appendStringInfo(&query, ", integer_datetimes '%d'", bdr_get_integer_timestamps());
    appendStringInfo(&query, ", bigendian '%d'", bdr_get_bigendian());
    appendStringInfo(&query, ", db_encoding '%s'", GetDatabaseEncodingName());
+   if (bdr_apply_worker->forward_changesets)
+       appendStringInfo(&query, ", forward_changesets 't'");
 
    appendStringInfoChar(&query, ')');
    res = PQexec(streamConn, query.data);
@@ -624,7 +632,7 @@ bdr_apply_main(Datum main_arg)
                                           ALLOCSET_DEFAULT_INITSIZE,
                                           ALLOCSET_DEFAULT_MAXSIZE);
 
-   while (!got_sigterm)
+   while (!exit_worker)
    {
        /* int       ret; */
        int         rc;
@@ -659,7 +667,7 @@ bdr_apply_main(Datum main_arg)
 
        for (;;)
        {
-           if (got_sigterm)
+           if (exit_worker)
                break;
 
            r = PQgetCopyData(streamConn, &copybuf, 1);
@@ -933,7 +941,7 @@ bdr_launch_apply_workers(char *dbname)
                    BdrApplyWorker *con = &worker->worker_data.apply_worker;
                    if ( strcmp(NameStr(con->dbname), dbname) == 0 )
                    {
-                       /* It's an apply worker for our DB; launch it */
+                       /* It's an apply worker for our DB; register it */
                        BackgroundWorkerHandle *bgw_handle;
 
                        snprintf(apply_worker.bgw_name, BGW_MAXLEN,
@@ -1017,8 +1025,6 @@ bdr_perdb_worker_main(Datum main_arg)
        ResourceOwnerCreate(NULL, "bdr seq top-level resource owner");
    bdr_saved_resowner = CurrentResourceOwner;
 
-   /* TODO: Hande need to initialize database from remote at this point */
-
    elog(LOG, "Starting bdr apply workers for db %s",
         NameStr(bdr_perdb_worker->dbname));
 
@@ -1041,7 +1047,7 @@ bdr_perdb_worker_main(Datum main_arg)
    /* initialize sequencer */
    bdr_sequencer_init(bdr_perdb_worker->seq_slot);
 
-   while (!got_sigterm)
+   while (!exit_worker)
    {
        /*
         * Background workers mustn't call usleep() or any direct equivalent:
@@ -1410,7 +1416,9 @@ _PG_init(void)
        BdrApplyWorker *apply_worker;
        char *name;
 
-       apply_worker = (BdrApplyWorker *) palloc(sizeof(BdrApplyWorker));
+       apply_worker = (BdrApplyWorker *) palloc0(sizeof(BdrApplyWorker));
+       apply_worker->forward_changesets = false;
+       apply_worker->replay_stop_lsn = InvalidXLogRecPtr;
        name = (char *) lfirst(c);
 
        if (!bdr_create_con_gucs(name, used_databases, &num_used_databases,
index 93d984576c18583b2f8a1ee9e9ec86dbd30a6ae6..415af96a7d8cb4fa2267c09750d19899bdab2d9d 100644 (file)
@@ -47,6 +47,12 @@ typedef struct BdrApplyWorker
 
    TimeLineID timeline;
 
+   /* If not InvalidXLogRecPtr, stop replay at this point and exit */
+   XLogRecPtr replay_stop_lsn;
+
+   /* Request that the remote forward all changes from other nodes */
+   bool forward_changesets;
+
 } BdrApplyWorker;
 
 /*
@@ -116,7 +122,7 @@ const char *bdr_get_worker_option(const char * worker_name, const char * option_
 
 /* apply support */
 extern void process_remote_begin(StringInfo s);
-extern void process_remote_commit(StringInfo s);
+extern bool process_remote_commit(StringInfo s);
 extern void process_remote_insert(StringInfo s);
 extern void process_remote_update(StringInfo s);
 extern void process_remote_delete(StringInfo s);
index 5d77de49814b5d6a4589c5d72728ec0507cf3dcc..fe687e151516cffbb794eb5d3af8a833075cc819 100644 (file)
@@ -175,7 +175,15 @@ process_remote_begin(StringInfo s)
    }
 }
 
-void
+/*
+ * Process a commit message from the output plugin, advance replication
+ * identifiers, commit the local transaction, and determine whether replay
+ * should continue.
+ *
+ * Returns true if apply should continue with the next record, false if replay
+ * should stop after this record.
+ */
+bool
 process_remote_commit(StringInfo s)
 {
    XLogRecPtr      commit_lsn;
@@ -243,6 +251,29 @@ process_remote_commit(StringInfo s)
    CurrentResourceOwner = bdr_saved_resowner;
 
    bdr_count_commit();
+
+   /*
+    * Stop replay if we're doing limited replay and we've replayed up to the
+    * last record we're supposed to process.
+    */
+   if (bdr_apply_worker->replay_stop_lsn != InvalidXLogRecPtr
+           && bdr_apply_worker->replay_stop_lsn <= end_lsn)
+   {
+       ereport(LOG,
+               (errmsg("bdr apply %s finished processing; replayed to %X/%X of required %X/%X",
+                NameStr(bdr_apply_worker->name),
+                (uint32)(end_lsn>>32), (uint32)end_lsn,
+                (uint32)(bdr_apply_worker->replay_stop_lsn>>32), (uint32)bdr_apply_worker->replay_stop_lsn)));
+       /*
+        * We clear the replay_stop_lsn field to indicate successful catchup,
+        * so we don't need a separate flag field in shmem for all apply
+        * workers.
+        */
+       bdr_apply_worker->replay_stop_lsn = InvalidXLogRecPtr;
+       return false;
+   }
+   else
+       return true;
 }
 
 void