From: Andres Freund Date: Sun, 27 Apr 2014 13:47:38 +0000 (+0200) Subject: bdr: some cleanups for the init replica functionality. X-Git-Url: http://waps.l3s.uni-hannover.de/gitweb/?a=commitdiff_plain;h=5ff406973c4d332186babab77f0d5432f4fc6cc5;p=users%2Fandresfreund%2Fpostgres.git bdr: some cleanups for the init replica functionality. --- diff --git a/contrib/bdr/bdr_init_replica.c b/contrib/bdr/bdr_init_replica.c index fbcb9b118d..c4b819bc91 100644 --- a/contrib/bdr/bdr_init_replica.c +++ b/contrib/bdr/bdr_init_replica.c @@ -24,6 +24,7 @@ #include "bdr.h" +#include "fmgr.h" #include "libpq-fe.h" #include "miscadmin.h" @@ -32,6 +33,8 @@ #include "access/heapam.h" #include "access/xact.h" +#include "catalog/pg_type.h" + #include "replication/replication_identifier.h" #include "replication/walreceiver.h" @@ -45,6 +48,7 @@ #include "storage/shmem.h" #include "utils/builtins.h" +#include "utils/pg_lsn.h" #include "utils/syscache.h" char *bdr_temp_dump_directory = NULL; @@ -70,20 +74,23 @@ find_init_replica_worker(Name dbname) /* Check whether one of our connections has init_replica set */ for (off = 0; off < bdr_max_workers; off++) { - if (BdrWorkerCtl->slots[off].worker_type == BDR_WORKER_APPLY) + BdrApplyWorker *aw; + + if (BdrWorkerCtl->slots[off].worker_type != BDR_WORKER_APPLY) + continue; + + aw = &BdrWorkerCtl->slots[off].worker_data.apply_worker; + + if (strcmp(NameStr(aw->dbname), NameStr(*dbname)) == 0) { - BdrApplyWorker *aw = &BdrWorkerCtl->slots[off].worker_data.apply_worker; - if (strcmp(NameStr(aw->dbname), NameStr(*dbname)) == 0) - { - const char *init_replica_str; - bool init_replica = false; - init_replica_str = bdr_get_worker_option(NameStr(aw->name), - "init_replica", true); - if (init_replica_str - && parse_bool(init_replica_str, &init_replica) - && init_replica) - return &BdrWorkerCtl->slots[off]; - } + const char *init_replica_str; + bool init_replica = false; + init_replica_str = bdr_get_worker_option(NameStr(aw->name), + "init_replica", true); + if (init_replica_str + && parse_bool(init_replica_str, &init_replica) + && init_replica) + return &BdrWorkerCtl->slots[off]; } } return NULL; @@ -98,27 +105,34 @@ find_init_replica_worker(Name dbname) static char bdr_get_remote_status(PGconn *pgconn, Name dbname) { - PGresult *res; - char status; - StringInfoData query; - char escaped_dbname[NAMEDATALEN*2+1]; - int escape_error; - - initStringInfo(&query); - - PQescapeStringConn(pgconn, &escaped_dbname[0], NameStr(*dbname), NAMEDATALEN, &escape_error); - if (escape_error) - elog(FATAL, "Failed to escape local dbname %s: %s", - NameStr(*dbname), PQerrorMessage(pgconn)); - - appendStringInfo(&query, - "SELECT node_status FROM bdr.bdr_nodes WHERE node_sysid = " UINT64_FORMAT " AND node_dbname = '%s' FOR UPDATE", - GetSystemIdentifier(), escaped_dbname); - res = PQexec(pgconn, query.data); + PGresult *res; + char status; + static const int n_params = 2; + Oid param_types[] = {NUMERICOID, TEXTOID}; + const char *param_values[n_params]; + const int sysid_str_length = 33; + char sysid_str[sysid_str_length]; + + snprintf(sysid_str, sysid_str_length, UINT64_FORMAT, + GetSystemIdentifier()); + sysid_str[sysid_str_length-1] = '\0'; + + param_values[0] = sysid_str; + param_types[0] = NUMERICOID; + + param_values[1] = NameStr(*dbname); + param_types[1] = TEXTOID; + + res = PQexecParams(pgconn, + "SELECT node_status FROM bdr.bdr_nodes " + "WHERE node_sysid = $1 AND node_dbname = $2 " + "FOR UPDATE", + 2, param_types, param_values, NULL, NULL, 0); if (PQresultStatus(res) != PGRES_TUPLES_OK) { - elog(FATAL, "bdr %s: query failed during bdr init - \"%s\": status %s: %s\n", - NameStr(*dbname), query.data, PQresStatus(PQresultStatus(res)), + elog(FATAL, "bdr %s: Failed to get remote status during bdr init: " + "state %s: %s\n", NameStr(*dbname), + PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res)); } if (PQntuples(res) == 0) @@ -146,52 +160,56 @@ bdr_get_remote_status(PGconn *pgconn, Name dbname) * a group of BDR nodes. */ static char -bdr_set_remote_status(PGconn *pgconn, Name dbname, const char status, const char prev_status) +bdr_set_remote_status(PGconn *pgconn, Name dbname, + const char status, const char prev_status) { - PGresult *res; - char *status_str; - StringInfoData query; - char escaped_dbname[NAMEDATALEN*2+1]; - int escape_error; - const uint64 sysid = GetSystemIdentifier(); - - initStringInfo(&query); + PGresult *res; + char *status_str; + const uint64 sysid = GetSystemIdentifier(); + const int sysid_str_length = 33; + char sysid_str[sysid_str_length]; if (status == prev_status) /* No action required (we could check the remote, but meh) */ return status; - PQescapeStringConn(pgconn, &escaped_dbname[0], NameStr(*dbname), NAMEDATALEN, &escape_error); - if (escape_error) - elog(FATAL, "Failed to escape local dbname %s: %s", - NameStr(*dbname), PQerrorMessage(pgconn)); + snprintf(sysid_str, sysid_str_length, UINT64_FORMAT, + GetSystemIdentifier()); + sysid_str[sysid_str_length-1] = '\0'; if (status == '\0') { - char new_status; - appendStringInfo(&query, - "DELETE FROM bdr.bdr_nodes WHERE node_sysid = " - UINT64_FORMAT - " AND node_dbname = '%s' RETURNING node_status", - sysid, escaped_dbname); - res = PQexec(pgconn, query.data); + Oid param_types[] = {NUMERICOID, TEXTOID}; + const char *param_values[2]; + char new_status; + + param_values[0] = sysid_str; + param_values[1] = NameStr(*dbname); + + res = PQexecParams(pgconn, + "DELETE FROM bdr.bdr_nodes WHERE node_sysid = $1" + " AND node_dbname = $2 RETURNING node_status", + 2, param_types, param_values, NULL, NULL, 0); elog(DEBUG2, "bdr %s: deleting bdr_nodes row with id " UINT64_FORMAT - " and node_dbname %s ", NameStr(*dbname), sysid, escaped_dbname); + " and node_dbname %s ", NameStr(*dbname), sysid, NameStr(*dbname)); if (PQresultStatus(res) != PGRES_TUPLES_OK) { - elog(FATAL, "bdr %s: query failed during bdr init - \"%s\": status %s: %s\n", - NameStr(*dbname), query.data, - PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res)); + elog(FATAL, "bdr %s: Failed to delete row from bdr_nodes: status %s: %s\n", + NameStr(*dbname), PQresStatus(PQresultStatus(res)), + PQresultErrorMessage(res)); } if (PQntuples(res) == 0) { - /* If prev_status was '\0' we wouldn't be here, so we should've got a returned value */ + /* + * If prev_status was '\0' we wouldn't be here, so we should've + * got a returned value. + */ elog(FATAL, "bdr %s: bdr.bdr_nodes row for node_sysid=" UINT64_FORMAT ", dbname='%s' missing, expected row with status=%c", - NameStr(*dbname), sysid, escaped_dbname, (int)prev_status); + NameStr(*dbname), sysid, NameStr(*dbname), (int)prev_status); } status_str = PQgetvalue(res, 0, 0); Assert(strlen(status_str) == 1); @@ -202,51 +220,58 @@ bdr_set_remote_status(PGconn *pgconn, Name dbname, const char status, const char elog(FATAL, "bdr %s: bdr.bdr_nodes row for node_sysid=" UINT64_FORMAT ", dbname='%s' had status=%c, expected status=%c", - NameStr(*dbname), sysid, escaped_dbname, (int)new_status, (int)prev_status); + NameStr(*dbname), sysid, NameStr(*dbname), + (int) new_status, (int) prev_status); } PQclear(res); } else { - char new_status; - appendStringInfo(&query, - "UPDATE bdr.bdr_nodes " - "SET node_status = '%c' " - "WHERE node_sysid = " UINT64_FORMAT - " AND node_dbname = '%s' RETURNING (" - "SELECT node_status FROM bdr.bdr_nodes " - "WHERE node_sysid = " UINT64_FORMAT - " AND node_dbname = '%s')", - (int)status, sysid, escaped_dbname, sysid, - escaped_dbname); - - res = PQexec(pgconn, query.data); + Oid param_types[] = {CHAROID, NUMERICOID, TEXTOID}; + const char *param_values[3]; + char new_status; + char status_str[2]; + + snprintf(status_str, 2, "%c", (int)status); + param_values[0] = status_str; + param_values[1] = sysid_str; + param_values[2] = NameStr(*dbname); + + res = PQexecParams(pgconn, + "UPDATE bdr.bdr_nodes " + "SET node_status = $1 " + "WHERE node_sysid = $2 AND node_dbname = $3 " + "RETURNING (" + "SELECT node_status FROM bdr.bdr_nodes " + "WHERE node_sysid = $2 AND node_dbname = $3)", + 3, param_types, param_values, NULL, NULL, 0); elog(DEBUG2, "bdr %s: update row with id " UINT64_FORMAT " and node_dbname %s from %c to %c", - NameStr(*dbname), sysid, escaped_dbname, prev_status, status); + NameStr(*dbname), sysid, NameStr(*dbname), prev_status, status); if (PQresultStatus(res) != PGRES_TUPLES_OK) { elog(FATAL, - "bdr %s: query failed during bdr init - \"%s\": status %s: %s\n", - NameStr(*dbname), query.data, + "bdr %s: Failed to update bdr.nodes row: status %s: %s\n", + NameStr(*dbname), PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res)); } if (PQntuples(res) != 0) { + char *new_status_str; /* Updated a row */ - status_str = PQgetvalue(res, 0, 0); + new_status_str = PQgetvalue(res, 0, 0); Assert(strlen(status_str) == 1); - new_status = status_str[0]; + new_status = new_status_str[0]; if (new_status != prev_status) { elog(FATAL, "bdr %s: bdr.bdr_nodes row for node_sysid=" UINT64_FORMAT ", dbname='%s' had status=%c, expected status=%c", - NameStr(*dbname), sysid, escaped_dbname, (int)new_status, + NameStr(*dbname), sysid, NameStr(*dbname), (int)new_status, (int)prev_status); } @@ -254,24 +279,24 @@ bdr_set_remote_status(PGconn *pgconn, Name dbname, const char status, const char } else { - /* No rows affected, insert a new row instead */ + /* No rows affected, insert a new row instead. We re-use the previous + * query parameters. */ PQclear(res); - resetStringInfo(&query); - appendStringInfo(&query, - "INSERT INTO bdr.bdr_nodes (node_sysid, node_dbname, node_status) VALUES (" UINT64_FORMAT ", '%s', '%c');", - sysid, escaped_dbname, (int)status); - res = PQexec(pgconn, query.data); + res = PQexecParams(pgconn, + "INSERT INTO bdr.bdr_nodes" + " (node_status, node_sysid, node_dbname)" + " VALUES ($1, $2, $3);", + 3, param_types, param_values, NULL, NULL, 0); elog(DEBUG2, "bdr %s: insert row with id " UINT64_FORMAT " and node_dbname %s from %c to %c", - NameStr(*dbname), sysid, escaped_dbname, prev_status, status); + NameStr(*dbname), sysid, NameStr(*dbname), prev_status, status); if (PQresultStatus(res) != PGRES_COMMAND_OK) { elog(FATAL, - "bdr %s: query failed during bdr init - \"%s\": status %s: %s\n", - NameStr(*dbname), query.data, - PQresStatus(PQresultStatus(res)), + "bdr %s: Failed to insert row into bdr.bdr_nodes: status %s: %s\n", + NameStr(*dbname), PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res)); } PQclear(res); @@ -284,34 +309,27 @@ bdr_set_remote_status(PGconn *pgconn, Name dbname, const char status, const char static XLogRecPtr bdr_get_remote_lsn(PGconn *conn) { - const char *query = - "SELECT pg_xlog_location_diff(pg_current_xlog_insert_location(), '0/0')"; - char *lsn_str; - char *lsn_str_end; XLogRecPtr lsn; PGresult *res; - res = PQexec(conn, query); + res = PQexec(conn, "SELECT pg_current_xlog_insert_location()"); if (PQresultStatus(res) != PGRES_TUPLES_OK) { - elog(ERROR, "Unable to get remote LSN, query %s failed with status %s: %s\n", - query, PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res)); + elog(ERROR, "Unable to get remote LSN: status %s: %s\n", + PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res)); } Assert(PQntuples(res) == 1); - lsn_str = PQgetvalue(res, 0, 0); - /* lsn_str's remote type is numeric, but we know it has to fit in an XLogRecPtr */ - /* TODO: Less ugly way to do this */ - lsn = (XLogRecPtr) strtoul(lsn_str, &lsn_str_end, 10); - if (*lsn_str_end != '\0') - elog(ERROR, "Unable to parse remote LSN value %s as unsigned long int", lsn_str); + Assert(!PQgetisnull(res, 0, 0)); + lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid, + CStringGetDatum(PQgetvalue(res, 0, 0)))); PQclear(res); return lsn; } /* - * Make sure the bdr extension is installed on the other end. If it's - * a known extension but not present in the current DB, try to CREATE EXTENSION - * it. + * Make sure the bdr extension is installed on the other end. If it's a known + * extension but not present in the current DB error out and tell the user to + * activate BDR then try again. */ static void bdr_ensure_ext_installed(PGconn *pgconn, Name bdr_conn_name) @@ -332,36 +350,20 @@ bdr_ensure_ext_installed(PGconn *pgconn, Name bdr_conn_name) { char *default_version; /* - * bdr ext is known to Pg, check install state, install if missing. + * bdr ext is known to Pg, check install state. * - * Right now we don't check the installed version or try to upgrade. + * Right now we don't check the installed version or try to install/upgrade. */ default_version = PQgetvalue(res, 0, 0); Assert(default_version != NULL); if (PQgetisnull(res, 0, 1)) { - /* bdr ext present but not installed; try to create */ - PQclear(res); - - res = PQexec(pgconn, "CREATE EXTENSION IF NOT EXISTS btree_gist;"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - ereport(ERROR, - (errmsg("Unable to 'CREATE EXTENSION btree_gist;' on bdr connection %s: state %s: %s", - NameStr(*bdr_conn_name), PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res)), - errhint("CREATE EXTENSION btree_gist; as a superuser."))); - } - PQclear(res); - - res = PQexec(pgconn, "CREATE EXTENSION bdr;"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - ereport(ERROR, - (errmsg("Unable to 'CREATE EXTENSION bdr;' on bdr connection %s: state %s: %s", - NameStr(*bdr_conn_name), PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res)), - errhint("Make sure BDR is in shared_preload_libraries and CREATE EXTENSION bdr; as superuser."))); - } - PQclear(res); + ereport(ERROR, + (errmsg("Remote database for BDR connection %s does not have the bdr extension active", + NameStr(*bdr_conn_name)), + errdetail("no entry with name 'bdr' in pg_extensions"), + errhint("add 'bdr' to shared_preload_libraries in postgresql.conf " + "on the target server and restart it."))); } } else if (PQntuples(res) == 0) @@ -381,8 +383,9 @@ bdr_ensure_ext_installed(PGconn *pgconn, Name bdr_conn_name) /* * Delete a replication identifier. * - * This should really be in the replication identifier support code in changeset extraction, - * as DeleteReplicationIdentifier or DropReplicationIdentifier. + * This should really be in the replication identifier support code in + * changeset extraction, as DeleteReplicationIdentifier or + * DropReplicationIdentifier. * * If no matching identifier is found, takes no action. */ @@ -429,7 +432,7 @@ bdr_drop_slot_and_replication_identifier(Name connection_name, Name dbname) RepNodeId replication_identifier; NameData slot_name; TimeLineID timeline; - uint64 sysid; + uint64 sysid; PGresult *res; StringInfoData query; char *sqlstate; @@ -486,7 +489,8 @@ bdr_drop_slot_and_replication_identifier(Name connection_name, Name dbname) { ereport(ERROR, (errmsg("'DROP_REPLICATION_SLOT %s' on bdr connection %s failed with sqlstate %s: %s", - NameStr(slot_name), NameStr(*connection_name), sqlstate, PQresultErrorMessage(res)))); + NameStr(slot_name), NameStr(*connection_name), + sqlstate,PQresultErrorMessage(res)))); } else { @@ -727,7 +731,8 @@ bdr_init_replica(Name dbname) { ereport(FATAL, (errmsg("bdr %s: could not connect to the upstream server in non-replication mode: %s", - NameStr(*dbname), PQerrorMessage(nonrepl_init_conn)))); + NameStr(*dbname), + PQerrorMessage(nonrepl_init_conn)))); } bdr_ensure_ext_installed(nonrepl_init_conn, dbname); @@ -739,13 +744,14 @@ bdr_init_replica(Name dbname) switch (status) { case '\0': - elog(DEBUG2, "bdr %s: initializing from clean state", NameStr(*dbname)); + elog(DEBUG2, "bdr %s: initializing from clean state", + NameStr(*dbname)); break; case 'r': /* - * Init has been completed, but we didn't check our local bdr.bdr_nodes, - * or the final update hasn't propagated yet. + * Init has been completed, but we didn't check our local + * bdr.bdr_nodes, or the final update hasn't propagated yet. * * All we need to do is catch up, we already replayed enough to be * consistent and start up in normal mode last time around @@ -756,14 +762,16 @@ bdr_init_replica(Name dbname) case 'c': /* - * We were in catchup mode when we died. We need to resume catchup mode - * up to the expected LSN before switching over. + * We were in catchup mode when we died. We need to resume catchup + * mode up to the expected LSN before switching over. * * To do that all we need to do is fall through without doing any - * slot re-creation, dump/apply, etc, and pick up when we do catchup. + * slot re-creation, dump/apply, etc, and pick up when we do + * catchup. * - * We won't know what the original catchup target point is, but we can - * just catch up to whatever xlog position the server is currently at. + * We won't know what the original catchup target point is, but we + * can just catch up to whatever xlog position the server is + * currently at. */ elog(DEBUG2, "bdr %s: dump applied, need to continue catchup", NameStr(*dbname)); @@ -771,21 +779,23 @@ bdr_init_replica(Name dbname) case 'i': /* - * A previous init attempt seems to have failed. Clean up, then fall through - * to start setup again. + * A previous init attempt seems to have failed. Clean up, then + * fall through to start setup again. * - * We can't just re-use the slot and replication identifier that were created - * last time (if they were), because we have no way of getting the slot's exported - * snapshot after CREATE_REPLICATION_SLOT. + * We can't just re-use the slot and replication identifier that + * were created last time (if they were), because we have no way + * of getting the slot's exported snapshot after + * CREATE_REPLICATION_SLOT. */ elog(DEBUG2, "bdr %s: previous failed initalization detected, cleaning up", NameStr(*dbname)); bdr_drop_slot_and_replication_identifier(init_conn_name, dbname); - status = bdr_set_remote_status(nonrepl_init_conn, dbname, '\0', status); + status = bdr_set_remote_status(nonrepl_init_conn, dbname, + '\0', status); break; default: - Assert(false); // Unhandled case + elog(ERROR, "unreachable"); /* Unhandled case */ break; } @@ -799,19 +809,27 @@ bdr_init_replica(Name dbname) elog(LOG, "bdr %s: initializing from remote db", NameStr(*dbname)); - /* We're starting from scratch or have cleaned up a previous failed attempt */ - status = bdr_set_remote_status(nonrepl_init_conn, dbname, 'i', status); + /* + * We're starting from scratch or have cleaned up a previous failed + * attempt. + */ + status = bdr_set_remote_status(nonrepl_init_conn, dbname, + 'i', status); my_conn_idxs = (int*)palloc(sizeof(Size) * bdr_max_workers); - /* - * Collect a list of connections to make slots for. - */ + /* Collect a list of connections to make slots for. */ LWLockAcquire(BdrWorkerCtl->lock, LW_SHARED); for (off = 0; off < bdr_max_workers; off++) - if (BdrWorkerCtl->slots[off].worker_type == BDR_WORKER_APPLY - && (strcmp(NameStr(BdrWorkerCtl->slots[off].worker_data.apply_worker.dbname), NameStr(*dbname)) == 0)) + { + BdrWorker *worker = &BdrWorkerCtl->slots[off]; + const char *worker_name = + NameStr(worker->worker_data.apply_worker.dbname); + + if (worker->worker_type == BDR_WORKER_APPLY + && strcmp(worker_name, NameStr(*dbname)) == 0) my_conn_idxs[n_conns++] = off; + } LWLockRelease(BdrWorkerCtl->lock); elog(DEBUG2, "bdr %s: creating slots for %d nodes", @@ -820,11 +838,12 @@ bdr_init_replica(Name dbname) /* * For each connection, ensure its slot exists. * - * Do it one by one rather than fiddling with async libpq queries. If this - * needs to be parallelized later, it should probably be done by launching - * each apply worker and letting them create their own slots, then having - * them wait until signalled/unlatched before proceeding with actual - * replication. That'll save us another round of connections too. + * Do it one by one rather than fiddling with async libpq queries. If + * this needs to be parallelized later, it should probably be done by + * launching each apply worker and letting them create their own + * slots, then having them wait until signalled/unlatched before + * proceeding with actual replication. That'll save us another round + * of connections too. * * We don't attempt any cleanup if slot creation fails, we just bail out * and leave any already-created slots in place. @@ -847,7 +866,10 @@ bdr_init_replica(Name dbname) * are all discarded; they're not needed here, and will be obtained * again by the apply workers when they're launched after init. */ - conn = bdr_establish_connection_and_slot(&w->worker_data.apply_worker.name, &slot_name, &sysid, &timeline, &replication_identifier, &snapshot); + conn = bdr_establish_connection_and_slot( + &w->worker_data.apply_worker.name, &slot_name, + &sysid, &timeline, &replication_identifier, &snapshot); + /* Always throws rather than returning failure */ Assert(conn); @@ -871,8 +893,9 @@ bdr_init_replica(Name dbname) else { /* - * Just throw the returned info away; we only needed to create the slot - * so its replication identifier can be advanced during catchup. + * Just throw the returned info away; we only needed to create + * the slot so its replication identifier can be advanced + * during catchup. */ if (snapshot) pfree(snapshot); @@ -958,8 +981,11 @@ bdr_catchup_to_lsn(PGconn *conn, Name dbname, Name conn_name, XLogRecPtr target_ /* Create the shm entry for the catchup worker */ LWLockAcquire(BdrWorkerCtl->lock, LW_SHARED); for (worker_shmem_idx = 0; worker_shmem_idx < bdr_max_workers; worker_shmem_idx++) - if (BdrWorkerCtl->slots[worker_shmem_idx].worker_type == BDR_WORKER_EMPTY_SLOT) + { + BdrWorker *worker = &BdrWorkerCtl->slots[worker_shmem_idx]; + if (worker->worker_type == BDR_WORKER_EMPTY_SLOT) break; + } if (worker_shmem_idx == bdr_max_workers) { LWLockRelease(BdrWorkerCtl->lock); @@ -1012,7 +1038,7 @@ bdr_catchup_to_lsn(PGconn *conn, Name dbname, Name conn_name, XLogRecPtr target_ snprintf(bgw.bgw_name, BGW_MAXLEN, "bdr %s: catchup apply to %X/%X on %s", NameStr(*dbname), - (uint32)(target_lsn>>32), (uint32)target_lsn, + (uint32)(target_lsn >> 32), (uint32)target_lsn, NameStr(*conn_name)); bgw.bgw_name[BGW_MAXLEN-1] = '\0'; @@ -1023,9 +1049,9 @@ bdr_catchup_to_lsn(PGconn *conn, Name dbname, Name conn_name, XLogRecPtr target_ /* * Sleep on our latch until we're woken by SIGUSR1 on bgworker state - * change, or by timeout. (We need a timeout because there's a race between - * bgworker start and our setting the latch; if it starts and dies again - * quickly we'll miss it and sleep forever w/o a timeout). + * change, or by timeout. (We need a timeout because there's a race + * between bgworker start and our setting the latch; if it starts and + * dies again quickly we'll miss it and sleep forever w/o a timeout). */ while (bgw_status == BGWH_STARTED && bgw_pid == prev_bgw_pid) { @@ -1065,10 +1091,11 @@ bdr_catchup_to_lsn(PGconn *conn, Name dbname, Name conn_name, XLogRecPtr target_ pfree(bgw_handle); /* - * Stopped doesn't mean *successful*. The worker might've errored out. We - * have no way of getting its exit status, so we have to rely on it setting - * something in shmem on successful exit. In this case it will set - * replay_stop_lsn to InvalidXLogRecPtr to indicate that replay is done. + * Stopped doesn't mean *successful*. The worker might've errored + * out. We have no way of getting its exit status, so we have to rely + * on it setting something in shmem on successful exit. In this case + * it will set replay_stop_lsn to InvalidXLogRecPtr to indicate that + * replay is done. */ if (catchup_worker->replay_stop_lsn != InvalidXLogRecPtr) {