printf(_(" -U, --username=NAME connect as specified database user\n"));
printf(_(" -w, --no-password never prompt for password\n"));
printf(_(" -W, --password force password prompt (should happen automatically)\n"));
+ printf(_(" --slot replication slot to use\n"));
printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
}
{"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'},
{"status-interval", required_argument, NULL, 's'},
+ {"slot", required_argument, NULL, 'S'},
{"verbose", no_argument, NULL, 'v'},
{NULL, 0, NULL, 0}
};
exit(1);
}
break;
+ case 'S':
+ replication_slot = pg_strdup(optarg);
+ break;
case 'n':
noloop = 1;
break;
/* fd and filename for currently open WAL file */
static int walfile = -1;
static char current_walfile_name[MAXPGPATH] = "";
+static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
* and returns false, otherwise returns true.
*/
static bool
-close_walfile(char *basedir, char *partial_suffix)
+close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
{
off_t currpos;
_("%s: not renaming \"%s%s\", segment is not complete\n"),
progname, current_walfile_name, partial_suffix);
+ lastFlushPosition = pos;
return true;
}
len += 1;
sendint64(blockpos, &replybuf[len]); /* write */
len += 8;
- sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
+ sendint64(lastFlushPosition, &replybuf[len]); /* flush */
len += 8;
sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
len += 8;
int standby_message_timeout, char *partial_suffix)
{
char query[128];
+ char slotcmd[128];
PGresult *res;
XLogRecPtr stoppos;
if (!CheckServerVersionForStreaming(conn))
return false;
+ if (replication_slot != NULL)
+ sprintf(slotcmd, "SLOT \"%s\" ", replication_slot);
+ else
+ slotcmd[0] = 0;
+
if (sysidentifier != NULL)
{
/* Validate system identifier hasn't changed */
PQclear(res);
}
+ /*
+ * initialize flush position to starting point, it's the caller's
+ * responsibility that that's sane.
+ */
+ lastFlushPosition = startpos;
+
while (1)
{
/*
return true;
/* Initiate the replication stream at specified location */
- snprintf(query, sizeof(query), "START_REPLICATION %X/%X TIMELINE %u",
+ snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
+ slotcmd,
(uint32) (startpos >> 32), (uint32) startpos,
timeline);
res = PQexec(conn, query);
*/
if (still_sending && stream_stop(blockpos, timeline, false))
{
- if (!close_walfile(basedir, partial_suffix))
+ if (!close_walfile(basedir, partial_suffix, blockpos))
{
/* Potential error message is written by close_walfile */
goto error;
*/
if (still_sending)
{
- if (!close_walfile(basedir, partial_suffix))
+ if (!close_walfile(basedir, partial_suffix, blockpos))
{
/* Error message written in close_walfile() */
goto error;
/* Did we reach the end of a WAL segment? */
if (blockpos % XLOG_SEG_SIZE == 0)
{
- if (!close_walfile(basedir, partial_suffix))
+ if (!close_walfile(basedir, partial_suffix, blockpos))
/* Error message written in close_walfile() */
goto error;