pg_receivexlog: slot support
authorAndres Freund <andres@anarazel.de>
Wed, 29 Jan 2014 12:54:54 +0000 (13:54 +0100)
committerAndres Freund <andres@anarazel.de>
Wed, 29 Jan 2014 12:54:54 +0000 (13:54 +0100)
doc/src/sgml/ref/pg_receivexlog.sgml
src/bin/pg_basebackup/pg_receivexlog.c
src/bin/pg_basebackup/receivelog.c
src/bin/pg_basebackup/streamutil.c
src/bin/pg_basebackup/streamutil.h

index 19bebb62f7ab6aa285b7d38f6ba06960abdb3f30..9a2e584b47696bbe827889fd7c0c7aa95b1b4110 100644 (file)
@@ -225,6 +225,17 @@ PostgreSQL documentation
        </para>
       </listitem>
      </varlistentry>
+
+     <varlistentry>
+      <term><option>--slot</option></term>
+      <listitem>
+        <para>
+         Require <application>pg_receivexlog</application> to use an existing
+         physical replication slot. See <xref linkend="streaming-replication">,
+         <xref linkend="changesetextraction">.
+        </para>
+      </listitem>
+     </varlistentry>
     </variablelist>
    </para>
 
index 3c6ab9a90245f001837da0457833e0b7f113f307..8a702e3388015e0174f6ede850ced57b0e2caed3 100644 (file)
@@ -67,6 +67,7 @@ usage(void)
        printf(_("  -U, --username=NAME    connect as specified database user\n"));
        printf(_("  -w, --no-password      never prompt for password\n"));
        printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
+       printf(_("      --slot             replication slot to use\n"));
        printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
 }
 
@@ -343,6 +344,7 @@ main(int argc, char **argv)
                {"no-password", no_argument, NULL, 'w'},
                {"password", no_argument, NULL, 'W'},
                {"status-interval", required_argument, NULL, 's'},
+               {"slot", required_argument, NULL, 'S'},
                {"verbose", no_argument, NULL, 'v'},
                {NULL, 0, NULL, 0}
        };
@@ -409,6 +411,9 @@ main(int argc, char **argv)
                                        exit(1);
                                }
                                break;
+                       case 'S':
+                               replication_slot = pg_strdup(optarg);
+                               break;
                        case 'n':
                                noloop = 1;
                                break;
index 2555904cd06a3db79b00a85762e7b8dbf39e4f29..d50cf8d90e33d33a2b5dd9488afca0c03eea8f8a 100644 (file)
@@ -31,6 +31,7 @@
 /* fd and filename for currently open WAL file */
 static int     walfile = -1;
 static char current_walfile_name[MAXPGPATH] = "";
+static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
 
 static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
                                 uint32 timeline, char *basedir,
@@ -133,7 +134,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
  * and returns false, otherwise returns true.
  */
 static bool
-close_walfile(char *basedir, char *partial_suffix)
+close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
 {
        off_t           currpos;
 
@@ -187,6 +188,7 @@ close_walfile(char *basedir, char *partial_suffix)
                                _("%s: not renaming \"%s%s\", segment is not complete\n"),
                                progname, current_walfile_name, partial_suffix);
 
+       lastFlushPosition = pos;
        return true;
 }
 
@@ -421,7 +423,7 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
        len += 1;
        sendint64(blockpos, &replybuf[len]);            /* write */
        len += 8;
-       sendint64(InvalidXLogRecPtr, &replybuf[len]);           /* flush */
+       sendint64(lastFlushPosition, &replybuf[len]);           /* flush */
        len += 8;
        sendint64(InvalidXLogRecPtr, &replybuf[len]);           /* apply */
        len += 8;
@@ -511,6 +513,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                                  int standby_message_timeout, char *partial_suffix)
 {
        char            query[128];
+       char            slotcmd[128];
        PGresult   *res;
        XLogRecPtr      stoppos;
 
@@ -521,6 +524,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
        if (!CheckServerVersionForStreaming(conn))
                return false;
 
+       if (replication_slot != NULL)
+               sprintf(slotcmd, "SLOT \"%s\" ", replication_slot);
+       else
+               slotcmd[0] = 0;
+
        if (sysidentifier != NULL)
        {
                /* Validate system identifier hasn't changed */
@@ -560,6 +568,12 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                PQclear(res);
        }
 
+       /*
+        * initialize flush position to starting point, it's the caller's
+        * responsibility that that's sane.
+        */
+       lastFlushPosition = startpos;
+
        while (1)
        {
                /*
@@ -606,7 +620,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                        return true;
 
                /* Initiate the replication stream at specified location */
-               snprintf(query, sizeof(query), "START_REPLICATION %X/%X TIMELINE %u",
+               snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
+                                slotcmd,
                                 (uint32) (startpos >> 32), (uint32) startpos,
                                 timeline);
                res = PQexec(conn, query);
@@ -810,7 +825,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                 */
                if (still_sending && stream_stop(blockpos, timeline, false))
                {
-                       if (!close_walfile(basedir, partial_suffix))
+                       if (!close_walfile(basedir, partial_suffix, blockpos))
                        {
                                /* Potential error message is written by close_walfile */
                                goto error;
@@ -909,7 +924,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                         */
                        if (still_sending)
                        {
-                               if (!close_walfile(basedir, partial_suffix))
+                               if (!close_walfile(basedir, partial_suffix, blockpos))
                                {
                                        /* Error message written in close_walfile() */
                                        goto error;
@@ -1074,7 +1089,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                                /* Did we reach the end of a WAL segment? */
                                if (blockpos % XLOG_SEG_SIZE == 0)
                                {
-                                       if (!close_walfile(basedir, partial_suffix))
+                                       if (!close_walfile(basedir, partial_suffix, blockpos))
                                                /* Error message written in close_walfile() */
                                                goto error;
 
index 96fbed898fc3303ab09edf45b7088b95df74ae80..041076ff1d73976b9baf1f7a40dd3c87298d4a67 100644 (file)
@@ -22,6 +22,7 @@ char     *connection_string = NULL;
 char      *dbhost = NULL;
 char      *dbuser = NULL;
 char      *dbport = NULL;
+char      *replication_slot = NULL;
 int                    dbgetpassword = 0;      /* 0=auto, -1=never, 1=always */
 static char *dbpassword = NULL;
 PGconn    *conn = NULL;
index 77d6b86ced3c4aed719f76d14654dfb12442e7d6..bb3c34db07f831a1e0b9aee6e22e3c5c53e8952b 100644 (file)
@@ -6,6 +6,7 @@ extern char *dbhost;
 extern char *dbuser;
 extern char *dbport;
 extern int     dbgetpassword;
+extern char *replication_slot;
 
 /* Connection kept global so we can disconnect easily */
 extern PGconn *conn;