#include "bdr.h"
+#include "fmgr.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "access/heapam.h"
#include "access/xact.h"
+#include "catalog/pg_type.h"
+
#include "replication/replication_identifier.h"
#include "replication/walreceiver.h"
#include "storage/shmem.h"
#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
#include "utils/syscache.h"
char *bdr_temp_dump_directory = NULL;
/* Check whether one of our connections has init_replica set */
for (off = 0; off < bdr_max_workers; off++)
{
- if (BdrWorkerCtl->slots[off].worker_type == BDR_WORKER_APPLY)
+ BdrApplyWorker *aw;
+
+ if (BdrWorkerCtl->slots[off].worker_type != BDR_WORKER_APPLY)
+ continue;
+
+ aw = &BdrWorkerCtl->slots[off].worker_data.apply_worker;
+
+ if (strcmp(NameStr(aw->dbname), NameStr(*dbname)) == 0)
{
- BdrApplyWorker *aw = &BdrWorkerCtl->slots[off].worker_data.apply_worker;
- if (strcmp(NameStr(aw->dbname), NameStr(*dbname)) == 0)
- {
- const char *init_replica_str;
- bool init_replica = false;
- init_replica_str = bdr_get_worker_option(NameStr(aw->name),
- "init_replica", true);
- if (init_replica_str
- && parse_bool(init_replica_str, &init_replica)
- && init_replica)
- return &BdrWorkerCtl->slots[off];
- }
+ const char *init_replica_str;
+ bool init_replica = false;
+ init_replica_str = bdr_get_worker_option(NameStr(aw->name),
+ "init_replica", true);
+ if (init_replica_str
+ && parse_bool(init_replica_str, &init_replica)
+ && init_replica)
+ return &BdrWorkerCtl->slots[off];
}
}
return NULL;
static char
bdr_get_remote_status(PGconn *pgconn, Name dbname)
{
- PGresult *res;
- char status;
- StringInfoData query;
- char escaped_dbname[NAMEDATALEN*2+1];
- int escape_error;
-
- initStringInfo(&query);
-
- PQescapeStringConn(pgconn, &escaped_dbname[0], NameStr(*dbname), NAMEDATALEN, &escape_error);
- if (escape_error)
- elog(FATAL, "Failed to escape local dbname %s: %s",
- NameStr(*dbname), PQerrorMessage(pgconn));
-
- appendStringInfo(&query,
- "SELECT node_status FROM bdr.bdr_nodes WHERE node_sysid = " UINT64_FORMAT " AND node_dbname = '%s' FOR UPDATE",
- GetSystemIdentifier(), escaped_dbname);
- res = PQexec(pgconn, query.data);
+ PGresult *res;
+ char status;
+ static const int n_params = 2;
+ Oid param_types[] = {NUMERICOID, TEXTOID};
+ const char *param_values[n_params];
+ const int sysid_str_length = 33;
+ char sysid_str[sysid_str_length];
+
+ snprintf(sysid_str, sysid_str_length, UINT64_FORMAT,
+ GetSystemIdentifier());
+ sysid_str[sysid_str_length-1] = '\0';
+
+ param_values[0] = sysid_str;
+ param_types[0] = NUMERICOID;
+
+ param_values[1] = NameStr(*dbname);
+ param_types[1] = TEXTOID;
+
+ res = PQexecParams(pgconn,
+ "SELECT node_status FROM bdr.bdr_nodes "
+ "WHERE node_sysid = $1 AND node_dbname = $2 "
+ "FOR UPDATE",
+ 2, param_types, param_values, NULL, NULL, 0);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
- elog(FATAL, "bdr %s: query failed during bdr init - \"%s\": status %s: %s\n",
- NameStr(*dbname), query.data, PQresStatus(PQresultStatus(res)),
+ elog(FATAL, "bdr %s: Failed to get remote status during bdr init: "
+ "state %s: %s\n", NameStr(*dbname),
+ PQresStatus(PQresultStatus(res)),
PQresultErrorMessage(res));
}
if (PQntuples(res) == 0)
* a group of BDR nodes.
*/
static char
-bdr_set_remote_status(PGconn *pgconn, Name dbname, const char status, const char prev_status)
+bdr_set_remote_status(PGconn *pgconn, Name dbname,
+ const char status, const char prev_status)
{
- PGresult *res;
- char *status_str;
- StringInfoData query;
- char escaped_dbname[NAMEDATALEN*2+1];
- int escape_error;
- const uint64 sysid = GetSystemIdentifier();
-
- initStringInfo(&query);
+ PGresult *res;
+ char *status_str;
+ const uint64 sysid = GetSystemIdentifier();
+ const int sysid_str_length = 33;
+ char sysid_str[sysid_str_length];
if (status == prev_status)
/* No action required (we could check the remote, but meh) */
return status;
- PQescapeStringConn(pgconn, &escaped_dbname[0], NameStr(*dbname), NAMEDATALEN, &escape_error);
- if (escape_error)
- elog(FATAL, "Failed to escape local dbname %s: %s",
- NameStr(*dbname), PQerrorMessage(pgconn));
+ snprintf(sysid_str, sysid_str_length, UINT64_FORMAT,
+ GetSystemIdentifier());
+ sysid_str[sysid_str_length-1] = '\0';
if (status == '\0')
{
- char new_status;
- appendStringInfo(&query,
- "DELETE FROM bdr.bdr_nodes WHERE node_sysid = "
- UINT64_FORMAT
- " AND node_dbname = '%s' RETURNING node_status",
- sysid, escaped_dbname);
- res = PQexec(pgconn, query.data);
+ Oid param_types[] = {NUMERICOID, TEXTOID};
+ const char *param_values[2];
+ char new_status;
+
+ param_values[0] = sysid_str;
+ param_values[1] = NameStr(*dbname);
+
+ res = PQexecParams(pgconn,
+ "DELETE FROM bdr.bdr_nodes WHERE node_sysid = $1"
+ " AND node_dbname = $2 RETURNING node_status",
+ 2, param_types, param_values, NULL, NULL, 0);
elog(DEBUG2, "bdr %s: deleting bdr_nodes row with id " UINT64_FORMAT
- " and node_dbname %s ", NameStr(*dbname), sysid, escaped_dbname);
+ " and node_dbname %s ", NameStr(*dbname), sysid, NameStr(*dbname));
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
- elog(FATAL, "bdr %s: query failed during bdr init - \"%s\": status %s: %s\n",
- NameStr(*dbname), query.data,
- PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
+ elog(FATAL, "bdr %s: Failed to delete row from bdr_nodes: status %s: %s\n",
+ NameStr(*dbname), PQresStatus(PQresultStatus(res)),
+ PQresultErrorMessage(res));
}
if (PQntuples(res) == 0)
{
- /* If prev_status was '\0' we wouldn't be here, so we should've got a returned value */
+ /*
+ * If prev_status was '\0' we wouldn't be here, so we should've
+ * got a returned value.
+ */
elog(FATAL, "bdr %s: bdr.bdr_nodes row for node_sysid="
UINT64_FORMAT
", dbname='%s' missing, expected row with status=%c",
- NameStr(*dbname), sysid, escaped_dbname, (int)prev_status);
+ NameStr(*dbname), sysid, NameStr(*dbname), (int)prev_status);
}
status_str = PQgetvalue(res, 0, 0);
Assert(strlen(status_str) == 1);
elog(FATAL, "bdr %s: bdr.bdr_nodes row for node_sysid="
UINT64_FORMAT
", dbname='%s' had status=%c, expected status=%c",
- NameStr(*dbname), sysid, escaped_dbname, (int)new_status, (int)prev_status);
+ NameStr(*dbname), sysid, NameStr(*dbname),
+ (int) new_status, (int) prev_status);
}
PQclear(res);
}
else
{
- char new_status;
- appendStringInfo(&query,
- "UPDATE bdr.bdr_nodes "
- "SET node_status = '%c' "
- "WHERE node_sysid = " UINT64_FORMAT
- " AND node_dbname = '%s' RETURNING ("
- "SELECT node_status FROM bdr.bdr_nodes "
- "WHERE node_sysid = " UINT64_FORMAT
- " AND node_dbname = '%s')",
- (int)status, sysid, escaped_dbname, sysid,
- escaped_dbname);
-
- res = PQexec(pgconn, query.data);
+ Oid param_types[] = {CHAROID, NUMERICOID, TEXTOID};
+ const char *param_values[3];
+ char new_status;
+ char status_str[2];
+
+ snprintf(status_str, 2, "%c", (int)status);
+ param_values[0] = status_str;
+ param_values[1] = sysid_str;
+ param_values[2] = NameStr(*dbname);
+
+ res = PQexecParams(pgconn,
+ "UPDATE bdr.bdr_nodes "
+ "SET node_status = $1 "
+ "WHERE node_sysid = $2 AND node_dbname = $3 "
+ "RETURNING ("
+ "SELECT node_status FROM bdr.bdr_nodes "
+ "WHERE node_sysid = $2 AND node_dbname = $3)",
+ 3, param_types, param_values, NULL, NULL, 0);
elog(DEBUG2, "bdr %s: update row with id "
UINT64_FORMAT
" and node_dbname %s from %c to %c",
- NameStr(*dbname), sysid, escaped_dbname, prev_status, status);
+ NameStr(*dbname), sysid, NameStr(*dbname), prev_status, status);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
elog(FATAL,
- "bdr %s: query failed during bdr init - \"%s\": status %s: %s\n",
- NameStr(*dbname), query.data,
+ "bdr %s: Failed to update bdr.nodes row: status %s: %s\n",
+ NameStr(*dbname),
PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
}
if (PQntuples(res) != 0)
{
+ char *new_status_str;
/* Updated a row */
- status_str = PQgetvalue(res, 0, 0);
+ new_status_str = PQgetvalue(res, 0, 0);
Assert(strlen(status_str) == 1);
- new_status = status_str[0];
+ new_status = new_status_str[0];
if (new_status != prev_status)
{
elog(FATAL,
"bdr %s: bdr.bdr_nodes row for node_sysid=" UINT64_FORMAT
", dbname='%s' had status=%c, expected status=%c",
- NameStr(*dbname), sysid, escaped_dbname, (int)new_status,
+ NameStr(*dbname), sysid, NameStr(*dbname), (int)new_status,
(int)prev_status);
}
}
else
{
- /* No rows affected, insert a new row instead */
+ /* No rows affected, insert a new row instead. We re-use the previous
+ * query parameters. */
PQclear(res);
- resetStringInfo(&query);
- appendStringInfo(&query,
- "INSERT INTO bdr.bdr_nodes (node_sysid, node_dbname, node_status) VALUES (" UINT64_FORMAT ", '%s', '%c');",
- sysid, escaped_dbname, (int)status);
- res = PQexec(pgconn, query.data);
+ res = PQexecParams(pgconn,
+ "INSERT INTO bdr.bdr_nodes"
+ " (node_status, node_sysid, node_dbname)"
+ " VALUES ($1, $2, $3);",
+ 3, param_types, param_values, NULL, NULL, 0);
elog(DEBUG2, "bdr %s: insert row with id " UINT64_FORMAT
" and node_dbname %s from %c to %c",
- NameStr(*dbname), sysid, escaped_dbname, prev_status, status);
+ NameStr(*dbname), sysid, NameStr(*dbname), prev_status, status);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
elog(FATAL,
- "bdr %s: query failed during bdr init - \"%s\": status %s: %s\n",
- NameStr(*dbname), query.data,
- PQresStatus(PQresultStatus(res)),
+ "bdr %s: Failed to insert row into bdr.bdr_nodes: status %s: %s\n",
+ NameStr(*dbname), PQresStatus(PQresultStatus(res)),
PQresultErrorMessage(res));
}
PQclear(res);
static XLogRecPtr
bdr_get_remote_lsn(PGconn *conn)
{
- const char *query =
- "SELECT pg_xlog_location_diff(pg_current_xlog_insert_location(), '0/0')";
- char *lsn_str;
- char *lsn_str_end;
XLogRecPtr lsn;
PGresult *res;
- res = PQexec(conn, query);
+ res = PQexec(conn, "SELECT pg_current_xlog_insert_location()");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
- elog(ERROR, "Unable to get remote LSN, query %s failed with status %s: %s\n",
- query, PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
+ elog(ERROR, "Unable to get remote LSN: status %s: %s\n",
+ PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
}
Assert(PQntuples(res) == 1);
- lsn_str = PQgetvalue(res, 0, 0);
- /* lsn_str's remote type is numeric, but we know it has to fit in an XLogRecPtr */
- /* TODO: Less ugly way to do this */
- lsn = (XLogRecPtr) strtoul(lsn_str, &lsn_str_end, 10);
- if (*lsn_str_end != '\0')
- elog(ERROR, "Unable to parse remote LSN value %s as unsigned long int", lsn_str);
+ Assert(!PQgetisnull(res, 0, 0));
+ lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
+ CStringGetDatum(PQgetvalue(res, 0, 0))));
PQclear(res);
return lsn;
}
/*
- * Make sure the bdr extension is installed on the other end. If it's
- * a known extension but not present in the current DB, try to CREATE EXTENSION
- * it.
+ * Make sure the bdr extension is installed on the other end. If it's a known
+ * extension but not present in the current DB error out and tell the user to
+ * activate BDR then try again.
*/
static void
bdr_ensure_ext_installed(PGconn *pgconn, Name bdr_conn_name)
{
char *default_version;
/*
- * bdr ext is known to Pg, check install state, install if missing.
+ * bdr ext is known to Pg, check install state.
*
- * Right now we don't check the installed version or try to upgrade.
+ * Right now we don't check the installed version or try to install/upgrade.
*/
default_version = PQgetvalue(res, 0, 0);
Assert(default_version != NULL);
if (PQgetisnull(res, 0, 1))
{
- /* bdr ext present but not installed; try to create */
- PQclear(res);
-
- res = PQexec(pgconn, "CREATE EXTENSION IF NOT EXISTS btree_gist;");
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- {
- ereport(ERROR,
- (errmsg("Unable to 'CREATE EXTENSION btree_gist;' on bdr connection %s: state %s: %s",
- NameStr(*bdr_conn_name), PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res)),
- errhint("CREATE EXTENSION btree_gist; as a superuser.")));
- }
- PQclear(res);
-
- res = PQexec(pgconn, "CREATE EXTENSION bdr;");
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- {
- ereport(ERROR,
- (errmsg("Unable to 'CREATE EXTENSION bdr;' on bdr connection %s: state %s: %s",
- NameStr(*bdr_conn_name), PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res)),
- errhint("Make sure BDR is in shared_preload_libraries and CREATE EXTENSION bdr; as superuser.")));
- }
- PQclear(res);
+ ereport(ERROR,
+ (errmsg("Remote database for BDR connection %s does not have the bdr extension active",
+ NameStr(*bdr_conn_name)),
+ errdetail("no entry with name 'bdr' in pg_extensions"),
+ errhint("add 'bdr' to shared_preload_libraries in postgresql.conf "
+ "on the target server and restart it.")));
}
}
else if (PQntuples(res) == 0)
/*
* Delete a replication identifier.
*
- * This should really be in the replication identifier support code in changeset extraction,
- * as DeleteReplicationIdentifier or DropReplicationIdentifier.
+ * This should really be in the replication identifier support code in
+ * changeset extraction, as DeleteReplicationIdentifier or
+ * DropReplicationIdentifier.
*
* If no matching identifier is found, takes no action.
*/
RepNodeId replication_identifier;
NameData slot_name;
TimeLineID timeline;
- uint64 sysid;
+ uint64 sysid;
PGresult *res;
StringInfoData query;
char *sqlstate;
{
ereport(ERROR,
(errmsg("'DROP_REPLICATION_SLOT %s' on bdr connection %s failed with sqlstate %s: %s",
- NameStr(slot_name), NameStr(*connection_name), sqlstate, PQresultErrorMessage(res))));
+ NameStr(slot_name), NameStr(*connection_name),
+ sqlstate,PQresultErrorMessage(res))));
}
else
{
{
ereport(FATAL,
(errmsg("bdr %s: could not connect to the upstream server in non-replication mode: %s",
- NameStr(*dbname), PQerrorMessage(nonrepl_init_conn))));
+ NameStr(*dbname),
+ PQerrorMessage(nonrepl_init_conn))));
}
bdr_ensure_ext_installed(nonrepl_init_conn, dbname);
switch (status)
{
case '\0':
- elog(DEBUG2, "bdr %s: initializing from clean state", NameStr(*dbname));
+ elog(DEBUG2, "bdr %s: initializing from clean state",
+ NameStr(*dbname));
break;
case 'r':
/*
- * Init has been completed, but we didn't check our local bdr.bdr_nodes,
- * or the final update hasn't propagated yet.
+ * Init has been completed, but we didn't check our local
+ * bdr.bdr_nodes, or the final update hasn't propagated yet.
*
* All we need to do is catch up, we already replayed enough to be
* consistent and start up in normal mode last time around
case 'c':
/*
- * We were in catchup mode when we died. We need to resume catchup mode
- * up to the expected LSN before switching over.
+ * We were in catchup mode when we died. We need to resume catchup
+ * mode up to the expected LSN before switching over.
*
* To do that all we need to do is fall through without doing any
- * slot re-creation, dump/apply, etc, and pick up when we do catchup.
+ * slot re-creation, dump/apply, etc, and pick up when we do
+ * catchup.
*
- * We won't know what the original catchup target point is, but we can
- * just catch up to whatever xlog position the server is currently at.
+ * We won't know what the original catchup target point is, but we
+ * can just catch up to whatever xlog position the server is
+ * currently at.
*/
elog(DEBUG2, "bdr %s: dump applied, need to continue catchup",
NameStr(*dbname));
case 'i':
/*
- * A previous init attempt seems to have failed. Clean up, then fall through
- * to start setup again.
+ * A previous init attempt seems to have failed. Clean up, then
+ * fall through to start setup again.
*
- * We can't just re-use the slot and replication identifier that were created
- * last time (if they were), because we have no way of getting the slot's exported
- * snapshot after CREATE_REPLICATION_SLOT.
+ * We can't just re-use the slot and replication identifier that
+ * were created last time (if they were), because we have no way
+ * of getting the slot's exported snapshot after
+ * CREATE_REPLICATION_SLOT.
*/
elog(DEBUG2, "bdr %s: previous failed initalization detected, cleaning up",
NameStr(*dbname));
bdr_drop_slot_and_replication_identifier(init_conn_name, dbname);
- status = bdr_set_remote_status(nonrepl_init_conn, dbname, '\0', status);
+ status = bdr_set_remote_status(nonrepl_init_conn, dbname,
+ '\0', status);
break;
default:
- Assert(false); // Unhandled case
+ elog(ERROR, "unreachable"); /* Unhandled case */
break;
}
elog(LOG, "bdr %s: initializing from remote db", NameStr(*dbname));
- /* We're starting from scratch or have cleaned up a previous failed attempt */
- status = bdr_set_remote_status(nonrepl_init_conn, dbname, 'i', status);
+ /*
+ * We're starting from scratch or have cleaned up a previous failed
+ * attempt.
+ */
+ status = bdr_set_remote_status(nonrepl_init_conn, dbname,
+ 'i', status);
my_conn_idxs = (int*)palloc(sizeof(Size) * bdr_max_workers);
- /*
- * Collect a list of connections to make slots for.
- */
+ /* Collect a list of connections to make slots for. */
LWLockAcquire(BdrWorkerCtl->lock, LW_SHARED);
for (off = 0; off < bdr_max_workers; off++)
- if (BdrWorkerCtl->slots[off].worker_type == BDR_WORKER_APPLY
- && (strcmp(NameStr(BdrWorkerCtl->slots[off].worker_data.apply_worker.dbname), NameStr(*dbname)) == 0))
+ {
+ BdrWorker *worker = &BdrWorkerCtl->slots[off];
+ const char *worker_name =
+ NameStr(worker->worker_data.apply_worker.dbname);
+
+ if (worker->worker_type == BDR_WORKER_APPLY
+ && strcmp(worker_name, NameStr(*dbname)) == 0)
my_conn_idxs[n_conns++] = off;
+ }
LWLockRelease(BdrWorkerCtl->lock);
elog(DEBUG2, "bdr %s: creating slots for %d nodes",
/*
* For each connection, ensure its slot exists.
*
- * Do it one by one rather than fiddling with async libpq queries. If this
- * needs to be parallelized later, it should probably be done by launching
- * each apply worker and letting them create their own slots, then having
- * them wait until signalled/unlatched before proceeding with actual
- * replication. That'll save us another round of connections too.
+ * Do it one by one rather than fiddling with async libpq queries. If
+ * this needs to be parallelized later, it should probably be done by
+ * launching each apply worker and letting them create their own
+ * slots, then having them wait until signalled/unlatched before
+ * proceeding with actual replication. That'll save us another round
+ * of connections too.
*
* We don't attempt any cleanup if slot creation fails, we just bail out
* and leave any already-created slots in place.
* are all discarded; they're not needed here, and will be obtained
* again by the apply workers when they're launched after init.
*/
- conn = bdr_establish_connection_and_slot(&w->worker_data.apply_worker.name, &slot_name, &sysid, &timeline, &replication_identifier, &snapshot);
+ conn = bdr_establish_connection_and_slot(
+ &w->worker_data.apply_worker.name, &slot_name,
+ &sysid, &timeline, &replication_identifier, &snapshot);
+
/* Always throws rather than returning failure */
Assert(conn);
else
{
/*
- * Just throw the returned info away; we only needed to create the slot
- * so its replication identifier can be advanced during catchup.
+ * Just throw the returned info away; we only needed to create
+ * the slot so its replication identifier can be advanced
+ * during catchup.
*/
if (snapshot)
pfree(snapshot);
/* Create the shm entry for the catchup worker */
LWLockAcquire(BdrWorkerCtl->lock, LW_SHARED);
for (worker_shmem_idx = 0; worker_shmem_idx < bdr_max_workers; worker_shmem_idx++)
- if (BdrWorkerCtl->slots[worker_shmem_idx].worker_type == BDR_WORKER_EMPTY_SLOT)
+ {
+ BdrWorker *worker = &BdrWorkerCtl->slots[worker_shmem_idx];
+ if (worker->worker_type == BDR_WORKER_EMPTY_SLOT)
break;
+ }
if (worker_shmem_idx == bdr_max_workers)
{
LWLockRelease(BdrWorkerCtl->lock);
snprintf(bgw.bgw_name, BGW_MAXLEN,
"bdr %s: catchup apply to %X/%X on %s",
NameStr(*dbname),
- (uint32)(target_lsn>>32), (uint32)target_lsn,
+ (uint32)(target_lsn >> 32), (uint32)target_lsn,
NameStr(*conn_name));
bgw.bgw_name[BGW_MAXLEN-1] = '\0';
/*
* Sleep on our latch until we're woken by SIGUSR1 on bgworker state
- * change, or by timeout. (We need a timeout because there's a race between
- * bgworker start and our setting the latch; if it starts and dies again
- * quickly we'll miss it and sleep forever w/o a timeout).
+ * change, or by timeout. (We need a timeout because there's a race
+ * between bgworker start and our setting the latch; if it starts and
+ * dies again quickly we'll miss it and sleep forever w/o a timeout).
*/
while (bgw_status == BGWH_STARTED && bgw_pid == prev_bgw_pid)
{
pfree(bgw_handle);
/*
- * Stopped doesn't mean *successful*. The worker might've errored out. We
- * have no way of getting its exit status, so we have to rely on it setting
- * something in shmem on successful exit. In this case it will set
- * replay_stop_lsn to InvalidXLogRecPtr to indicate that replay is done.
+ * Stopped doesn't mean *successful*. The worker might've errored
+ * out. We have no way of getting its exit status, so we have to rely
+ * on it setting something in shmem on successful exit. In this case
+ * it will set replay_stop_lsn to InvalidXLogRecPtr to indicate that
+ * replay is done.
*/
if (catchup_worker->replay_stop_lsn != InvalidXLogRecPtr)
{