bdr: some cleanups for the init replica functionality.
authorAndres Freund <andres@anarazel.de>
Sun, 27 Apr 2014 13:47:38 +0000 (15:47 +0200)
committerAndres Freund <andres@anarazel.de>
Thu, 3 Jul 2014 15:55:28 +0000 (17:55 +0200)
contrib/bdr/bdr_init_replica.c

index fbcb9b118d61b04d897126845afc4fc0fb953496..c4b819bc91455853b4bbd8bf812ae1c1675128ab 100644 (file)
@@ -24,6 +24,7 @@
 
 #include "bdr.h"
 
+#include "fmgr.h"
 #include "libpq-fe.h"
 #include "miscadmin.h"
 
@@ -32,6 +33,8 @@
 #include "access/heapam.h"
 #include "access/xact.h"
 
+#include "catalog/pg_type.h"
+
 #include "replication/replication_identifier.h"
 #include "replication/walreceiver.h"
 
@@ -45,6 +48,7 @@
 #include "storage/shmem.h"
 
 #include "utils/builtins.h"
+#include "utils/pg_lsn.h"
 #include "utils/syscache.h"
 
 char *bdr_temp_dump_directory = NULL;
@@ -70,20 +74,23 @@ find_init_replica_worker(Name dbname)
    /* Check whether one of our connections has init_replica set */
    for (off = 0; off < bdr_max_workers; off++)
    {
-       if (BdrWorkerCtl->slots[off].worker_type == BDR_WORKER_APPLY)
+       BdrApplyWorker *aw;
+
+       if (BdrWorkerCtl->slots[off].worker_type != BDR_WORKER_APPLY)
+           continue;
+
+       aw = &BdrWorkerCtl->slots[off].worker_data.apply_worker;
+
+       if (strcmp(NameStr(aw->dbname), NameStr(*dbname)) == 0)
        {
-           BdrApplyWorker *aw = &BdrWorkerCtl->slots[off].worker_data.apply_worker;
-           if (strcmp(NameStr(aw->dbname), NameStr(*dbname)) == 0)
-           {
-               const char *init_replica_str;
-               bool init_replica = false;
-               init_replica_str = bdr_get_worker_option(NameStr(aw->name),
-                                                        "init_replica", true);
-               if (init_replica_str
-                   && parse_bool(init_replica_str, &init_replica)
-                   && init_replica)
-                   return &BdrWorkerCtl->slots[off];
-           }
+           const char *init_replica_str;
+           bool init_replica = false;
+           init_replica_str = bdr_get_worker_option(NameStr(aw->name),
+                                                    "init_replica", true);
+           if (init_replica_str
+               && parse_bool(init_replica_str, &init_replica)
+               && init_replica)
+               return &BdrWorkerCtl->slots[off];
        }
    }
    return NULL;
@@ -98,27 +105,34 @@ find_init_replica_worker(Name dbname)
 static char
 bdr_get_remote_status(PGconn *pgconn, Name dbname)
 {
-   PGresult *res;
-   char status;
-   StringInfoData query;
-   char escaped_dbname[NAMEDATALEN*2+1];
-   int escape_error;
-
-   initStringInfo(&query);
-
-   PQescapeStringConn(pgconn, &escaped_dbname[0], NameStr(*dbname), NAMEDATALEN, &escape_error);
-   if (escape_error)
-       elog(FATAL, "Failed to escape local dbname %s: %s",
-            NameStr(*dbname), PQerrorMessage(pgconn));
-
-   appendStringInfo(&query,
-                    "SELECT node_status FROM bdr.bdr_nodes WHERE node_sysid = " UINT64_FORMAT " AND node_dbname = '%s' FOR UPDATE",
-                    GetSystemIdentifier(), escaped_dbname);
-   res = PQexec(pgconn, query.data);
+   PGresult           *res;
+   char                status;
+   static const int    n_params = 2;
+   Oid                 param_types[] = {NUMERICOID, TEXTOID};
+   const char         *param_values[n_params];
+   const int           sysid_str_length = 33;
+   char                sysid_str[sysid_str_length];
+
+   snprintf(sysid_str, sysid_str_length, UINT64_FORMAT,
+            GetSystemIdentifier());
+   sysid_str[sysid_str_length-1] = '\0';
+
+   param_values[0] = sysid_str;
+   param_types[0] = NUMERICOID;
+
+   param_values[1] = NameStr(*dbname);
+   param_types[1] = TEXTOID;
+
+   res = PQexecParams(pgconn,
+                      "SELECT node_status FROM bdr.bdr_nodes "
+                      "WHERE node_sysid = $1 AND node_dbname = $2 "
+                      "FOR UPDATE",
+                      2, param_types, param_values, NULL, NULL, 0);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
-       elog(FATAL, "bdr %s: query failed during bdr init - \"%s\": status %s: %s\n",
-            NameStr(*dbname), query.data, PQresStatus(PQresultStatus(res)),
+       elog(FATAL, "bdr %s: Failed to get remote status during bdr init: "
+            "state %s: %s\n", NameStr(*dbname),
+            PQresStatus(PQresultStatus(res)),
             PQresultErrorMessage(res));
    }
    if (PQntuples(res) == 0)
@@ -146,52 +160,56 @@ bdr_get_remote_status(PGconn *pgconn, Name dbname)
  * a group of BDR nodes.
  */
 static char
-bdr_set_remote_status(PGconn *pgconn, Name dbname, const char status, const char prev_status)
+bdr_set_remote_status(PGconn *pgconn, Name dbname,
+                     const char status, const char prev_status)
 {
-   PGresult *res;
-   char    *status_str;
-   StringInfoData query;
-   char escaped_dbname[NAMEDATALEN*2+1];
-   int escape_error;
-   const uint64 sysid = GetSystemIdentifier();
-
-   initStringInfo(&query);
+   PGresult           *res;
+   char               *status_str;
+   const uint64        sysid = GetSystemIdentifier();
+   const int           sysid_str_length = 33;
+   char                sysid_str[sysid_str_length];
 
    if (status == prev_status)
        /* No action required (we could check the remote, but meh) */
        return status;
 
-   PQescapeStringConn(pgconn, &escaped_dbname[0], NameStr(*dbname), NAMEDATALEN, &escape_error);
-   if (escape_error)
-       elog(FATAL, "Failed to escape local dbname %s: %s",
-            NameStr(*dbname), PQerrorMessage(pgconn));
+   snprintf(sysid_str, sysid_str_length, UINT64_FORMAT,
+            GetSystemIdentifier());
+   sysid_str[sysid_str_length-1] = '\0';
 
    if (status == '\0')
    {
-       char    new_status;
-       appendStringInfo(&query,
-                        "DELETE FROM bdr.bdr_nodes WHERE node_sysid = "
-                        UINT64_FORMAT
-                        " AND node_dbname = '%s' RETURNING node_status",
-                        sysid, escaped_dbname);
-       res = PQexec(pgconn, query.data);
+       Oid         param_types[] = {NUMERICOID, TEXTOID};
+       const char *param_values[2];
+       char        new_status;
+
+       param_values[0] = sysid_str;
+       param_values[1] = NameStr(*dbname);
+
+       res = PQexecParams(pgconn,
+                          "DELETE FROM bdr.bdr_nodes WHERE node_sysid = $1"
+                          " AND node_dbname = $2 RETURNING node_status",
+                          2, param_types, param_values, NULL, NULL, 0);
 
        elog(DEBUG2, "bdr %s: deleting bdr_nodes row with id " UINT64_FORMAT
-            " and node_dbname %s ", NameStr(*dbname), sysid, escaped_dbname);
+            " and node_dbname %s ", NameStr(*dbname), sysid, NameStr(*dbname));
 
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
-           elog(FATAL, "bdr %s: query failed during bdr init - \"%s\": status %s: %s\n",
-                NameStr(*dbname), query.data,
-                PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
+           elog(FATAL, "bdr %s: Failed to delete row from bdr_nodes: status %s: %s\n",
+                NameStr(*dbname), PQresStatus(PQresultStatus(res)),
+                PQresultErrorMessage(res));
        }
        if (PQntuples(res) == 0)
        {
-           /* If prev_status was '\0' we wouldn't be here, so we should've got a returned value */
+           /*
+            * If prev_status was '\0' we wouldn't be here, so we should've
+            * got a returned value.
+            */
            elog(FATAL, "bdr %s: bdr.bdr_nodes row for node_sysid="
                 UINT64_FORMAT
                 ", dbname='%s' missing, expected row with status=%c",
-                NameStr(*dbname), sysid, escaped_dbname, (int)prev_status);
+                NameStr(*dbname), sysid, NameStr(*dbname), (int)prev_status);
        }
        status_str = PQgetvalue(res, 0, 0);
        Assert(strlen(status_str) == 1);
@@ -202,51 +220,58 @@ bdr_set_remote_status(PGconn *pgconn, Name dbname, const char status, const char
            elog(FATAL, "bdr %s: bdr.bdr_nodes row for node_sysid="
                 UINT64_FORMAT
                 ", dbname='%s' had status=%c, expected status=%c",
-                NameStr(*dbname), sysid, escaped_dbname, (int)new_status, (int)prev_status);
+                NameStr(*dbname), sysid, NameStr(*dbname),
+                (int) new_status, (int) prev_status);
        }
 
        PQclear(res);
    }
    else
    {
-       char    new_status;
-       appendStringInfo(&query,
-                        "UPDATE bdr.bdr_nodes "
-                        "SET node_status = '%c' "
-                        "WHERE node_sysid = " UINT64_FORMAT
-                        " AND node_dbname = '%s' RETURNING ("
-                        "SELECT node_status FROM bdr.bdr_nodes "
-                        "WHERE node_sysid = " UINT64_FORMAT
-                        " AND node_dbname = '%s')",
-                        (int)status, sysid, escaped_dbname, sysid,
-                        escaped_dbname);
-
-       res = PQexec(pgconn, query.data);
+       Oid         param_types[] = {CHAROID, NUMERICOID, TEXTOID};
+       const char *param_values[3];
+       char        new_status;
+       char        status_str[2];
+
+       snprintf(status_str, 2, "%c", (int)status);
+       param_values[0] = status_str;
+       param_values[1] = sysid_str;
+       param_values[2] = NameStr(*dbname);
+
+       res = PQexecParams(pgconn,
+                          "UPDATE bdr.bdr_nodes "
+                          "SET node_status = $1 "
+                          "WHERE node_sysid = $2 AND node_dbname = $3 "
+                          "RETURNING ("
+                          "SELECT node_status FROM bdr.bdr_nodes "
+                          "WHERE node_sysid = $2 AND node_dbname = $3)",
+                          3, param_types, param_values, NULL, NULL, 0);
 
        elog(DEBUG2, "bdr %s: update row with id "
             UINT64_FORMAT
             " and node_dbname %s from %c to %c",
-            NameStr(*dbname), sysid, escaped_dbname, prev_status, status);
+            NameStr(*dbname), sysid, NameStr(*dbname), prev_status, status);
 
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
            elog(FATAL,
-                "bdr %s: query failed during bdr init - \"%s\": status %s: %s\n",
-                NameStr(*dbname), query.data,
+                "bdr %s: Failed to update bdr.nodes row: status %s: %s\n",
+                NameStr(*dbname),
                 PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
        }
        if (PQntuples(res) != 0)
        {
+           char *new_status_str;
            /* Updated a row */
-           status_str = PQgetvalue(res, 0, 0);
+           new_status_str = PQgetvalue(res, 0, 0);
            Assert(strlen(status_str) == 1);
-           new_status = status_str[0];
+           new_status = new_status_str[0];
            if (new_status != prev_status)
            {
                elog(FATAL,
                     "bdr %s: bdr.bdr_nodes row for node_sysid=" UINT64_FORMAT
                     ", dbname='%s' had status=%c, expected status=%c",
-                    NameStr(*dbname), sysid, escaped_dbname, (int)new_status,
+                    NameStr(*dbname), sysid, NameStr(*dbname), (int)new_status,
                     (int)prev_status);
            }
 
@@ -254,24 +279,24 @@ bdr_set_remote_status(PGconn *pgconn, Name dbname, const char status, const char
        }
        else
        {
-           /* No rows affected, insert a new row instead */
+           /* No rows affected, insert a new row instead. We re-use the previous
+            * query parameters. */
            PQclear(res);
-           resetStringInfo(&query);
-           appendStringInfo(&query,
-                            "INSERT INTO bdr.bdr_nodes (node_sysid, node_dbname, node_status) VALUES (" UINT64_FORMAT ", '%s', '%c');",
-                            sysid, escaped_dbname, (int)status);
-           res = PQexec(pgconn, query.data);
+           res = PQexecParams(pgconn,
+                              "INSERT INTO bdr.bdr_nodes"
+                              "    (node_status, node_sysid, node_dbname)"
+                              "    VALUES ($1, $2, $3);",
+                              3, param_types, param_values, NULL, NULL, 0);
 
            elog(DEBUG2, "bdr %s: insert row with id " UINT64_FORMAT
                 " and node_dbname %s from %c to %c",
-                NameStr(*dbname), sysid, escaped_dbname, prev_status, status);
+                NameStr(*dbname), sysid, NameStr(*dbname), prev_status, status);
 
            if (PQresultStatus(res) != PGRES_COMMAND_OK)
            {
                elog(FATAL,
-                    "bdr %s: query failed during bdr init - \"%s\": status %s: %s\n",
-                    NameStr(*dbname), query.data,
-                    PQresStatus(PQresultStatus(res)),
+                    "bdr %s: Failed to insert row into bdr.bdr_nodes: status %s: %s\n",
+                    NameStr(*dbname), PQresStatus(PQresultStatus(res)),
                     PQresultErrorMessage(res));
            }
            PQclear(res);
@@ -284,34 +309,27 @@ bdr_set_remote_status(PGconn *pgconn, Name dbname, const char status, const char
 static XLogRecPtr
 bdr_get_remote_lsn(PGconn *conn)
 {
-   const char *query =
-       "SELECT pg_xlog_location_diff(pg_current_xlog_insert_location(), '0/0')";
-   char       *lsn_str;
-   char       *lsn_str_end;
    XLogRecPtr  lsn;
    PGresult   *res;
 
-   res = PQexec(conn, query);
+   res = PQexec(conn, "SELECT pg_current_xlog_insert_location()");
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
-       elog(ERROR, "Unable to get remote LSN, query %s failed with status %s: %s\n",
-           query, PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
+       elog(ERROR, "Unable to get remote LSN: status %s: %s\n",
+            PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
    }
    Assert(PQntuples(res) == 1);
-   lsn_str = PQgetvalue(res, 0, 0);
-   /* lsn_str's remote type is numeric, but we know it has to fit in an XLogRecPtr */
-   /* TODO: Less ugly way to do this */
-   lsn = (XLogRecPtr) strtoul(lsn_str, &lsn_str_end, 10);
-   if (*lsn_str_end != '\0')
-       elog(ERROR, "Unable to parse remote LSN value %s as unsigned long int", lsn_str);
+   Assert(!PQgetisnull(res, 0, 0));
+   lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
+                     CStringGetDatum(PQgetvalue(res, 0, 0))));
    PQclear(res);
    return lsn;
 }
 
 /*
- * Make sure the bdr extension is installed on the other end. If it's
- * a known extension but not present in the current DB, try to CREATE EXTENSION
- * it.
+ * Make sure the bdr extension is installed on the other end. If it's a known
+ * extension but not present in the current DB error out and tell the user to
+ * activate BDR then try again.
  */
 static void
 bdr_ensure_ext_installed(PGconn *pgconn, Name bdr_conn_name)
@@ -332,36 +350,20 @@ bdr_ensure_ext_installed(PGconn *pgconn, Name bdr_conn_name)
    {
        char *default_version;
        /*
-        * bdr ext is known to Pg, check install state, install if missing.
+        * bdr ext is known to Pg, check install state.
         *
-        * Right now we don't check the installed version or try to upgrade.
+        * Right now we don't check the installed version or try to install/upgrade.
         */
        default_version = PQgetvalue(res, 0, 0);
        Assert(default_version != NULL);
        if (PQgetisnull(res, 0, 1))
        {
-           /* bdr ext present but not installed; try to create */
-           PQclear(res);
-
-           res = PQexec(pgconn, "CREATE EXTENSION IF NOT EXISTS btree_gist;");
-           if (PQresultStatus(res) != PGRES_COMMAND_OK)
-           {
-               ereport(ERROR,
-                       (errmsg("Unable to 'CREATE EXTENSION btree_gist;' on bdr connection %s: state %s: %s",
-                        NameStr(*bdr_conn_name), PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res)),
-                        errhint("CREATE EXTENSION btree_gist; as a superuser.")));
-           }
-           PQclear(res);
-
-           res = PQexec(pgconn, "CREATE EXTENSION bdr;");
-           if (PQresultStatus(res) != PGRES_COMMAND_OK)
-           {
-               ereport(ERROR,
-                       (errmsg("Unable to 'CREATE EXTENSION bdr;' on bdr connection %s: state %s: %s",
-                        NameStr(*bdr_conn_name), PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res)),
-                        errhint("Make sure BDR is in shared_preload_libraries and CREATE EXTENSION bdr; as superuser.")));
-           }
-           PQclear(res);
+           ereport(ERROR,
+                   (errmsg("Remote database for BDR connection %s does not have the bdr extension active",
+                    NameStr(*bdr_conn_name)),
+                    errdetail("no entry with name 'bdr' in pg_extensions"),
+                    errhint("add 'bdr' to shared_preload_libraries in postgresql.conf "
+                            "on the target server and restart it.")));
        }
    }
    else if (PQntuples(res) == 0)
@@ -381,8 +383,9 @@ bdr_ensure_ext_installed(PGconn *pgconn, Name bdr_conn_name)
 /*
  * Delete a replication identifier.
  *
- * This should really be in the replication identifier support code in changeset extraction,
- * as DeleteReplicationIdentifier or DropReplicationIdentifier.
+ * This should really be in the replication identifier support code in
+ * changeset extraction, as DeleteReplicationIdentifier or
+ * DropReplicationIdentifier.
  *
  * If no matching identifier is found, takes no action.
  */
@@ -429,7 +432,7 @@ bdr_drop_slot_and_replication_identifier(Name connection_name, Name dbname)
    RepNodeId   replication_identifier;
    NameData    slot_name;
    TimeLineID  timeline;
-   uint64      sysid;
+   uint64      sysid;
    PGresult   *res;
    StringInfoData query;
    char       *sqlstate;
@@ -486,7 +489,8 @@ bdr_drop_slot_and_replication_identifier(Name connection_name, Name dbname)
        {
            ereport(ERROR,
                    (errmsg("'DROP_REPLICATION_SLOT %s' on bdr connection %s failed with sqlstate %s: %s",
-                    NameStr(slot_name), NameStr(*connection_name), sqlstate, PQresultErrorMessage(res))));
+                           NameStr(slot_name), NameStr(*connection_name),
+                           sqlstate,PQresultErrorMessage(res))));
        }
        else
        {
@@ -727,7 +731,8 @@ bdr_init_replica(Name dbname)
    {
        ereport(FATAL,
                (errmsg("bdr %s: could not connect to the upstream server in non-replication mode: %s",
-                       NameStr(*dbname), PQerrorMessage(nonrepl_init_conn))));
+                       NameStr(*dbname),
+                       PQerrorMessage(nonrepl_init_conn))));
    }
 
    bdr_ensure_ext_installed(nonrepl_init_conn, dbname);
@@ -739,13 +744,14 @@ bdr_init_replica(Name dbname)
    switch (status)
    {
        case '\0':
-           elog(DEBUG2, "bdr %s: initializing from clean state", NameStr(*dbname));
+           elog(DEBUG2, "bdr %s: initializing from clean state",
+                NameStr(*dbname));
            break;
 
        case 'r':
            /*
-            * Init has been completed, but we didn't check our local bdr.bdr_nodes,
-            * or the final update hasn't propagated yet.
+            * Init has been completed, but we didn't check our local
+            * bdr.bdr_nodes, or the final update hasn't propagated yet.
             *
             * All we need to do is catch up, we already replayed enough to be
             * consistent and start up in normal mode last time around
@@ -756,14 +762,16 @@ bdr_init_replica(Name dbname)
 
        case 'c':
            /*
-            * We were in catchup mode when we died. We need to resume catchup mode
-            * up to the expected LSN before switching over.
+            * We were in catchup mode when we died. We need to resume catchup
+            * mode up to the expected LSN before switching over.
             *
             * To do that all we need to do is fall through without doing any
-            * slot re-creation, dump/apply, etc, and pick up when we do catchup.
+            * slot re-creation, dump/apply, etc, and pick up when we do
+            * catchup.
             *
-            * We won't know what the original catchup target point is, but we can
-            * just catch up to whatever xlog position the server is currently at.
+            * We won't know what the original catchup target point is, but we
+            * can just catch up to whatever xlog position the server is
+            * currently at.
             */
            elog(DEBUG2, "bdr %s: dump applied, need to continue catchup",
                 NameStr(*dbname));
@@ -771,21 +779,23 @@ bdr_init_replica(Name dbname)
 
        case 'i':
            /*
-            * A previous init attempt seems to have failed. Clean up, then fall through
-            * to start setup again.
+            * A previous init attempt seems to have failed. Clean up, then
+            * fall through to start setup again.
             *
-            * We can't just re-use the slot and replication identifier that were created
-            * last time (if they were), because we have no way of getting the slot's exported
-            * snapshot after CREATE_REPLICATION_SLOT.
+            * We can't just re-use the slot and replication identifier that
+            * were created last time (if they were), because we have no way
+            * of getting the slot's exported snapshot after
+            * CREATE_REPLICATION_SLOT.
             */
            elog(DEBUG2, "bdr %s: previous failed initalization detected, cleaning up",
                 NameStr(*dbname));
            bdr_drop_slot_and_replication_identifier(init_conn_name, dbname);
-           status = bdr_set_remote_status(nonrepl_init_conn, dbname, '\0', status);
+           status = bdr_set_remote_status(nonrepl_init_conn, dbname,
+                                          '\0', status);
            break;
 
        default:
-           Assert(false); // Unhandled case
+           elog(ERROR, "unreachable"); /* Unhandled case */
            break;
    }
 
@@ -799,19 +809,27 @@ bdr_init_replica(Name dbname)
 
        elog(LOG, "bdr %s: initializing from remote db", NameStr(*dbname));
 
-       /* We're starting from scratch or have cleaned up a previous failed attempt */
-       status = bdr_set_remote_status(nonrepl_init_conn, dbname, 'i', status);
+       /*
+        * We're starting from scratch or have cleaned up a previous failed
+        * attempt.
+        */
+       status = bdr_set_remote_status(nonrepl_init_conn, dbname,
+                                      'i', status);
 
        my_conn_idxs = (int*)palloc(sizeof(Size) * bdr_max_workers);
 
-       /*
-        * Collect a list of connections to make slots for.
-        */
+       /* Collect a list of connections to make slots for. */
        LWLockAcquire(BdrWorkerCtl->lock, LW_SHARED);
        for (off = 0; off < bdr_max_workers; off++)
-           if (BdrWorkerCtl->slots[off].worker_type == BDR_WORKER_APPLY
-               && (strcmp(NameStr(BdrWorkerCtl->slots[off].worker_data.apply_worker.dbname), NameStr(*dbname)) == 0))
+       {
+           BdrWorker *worker = &BdrWorkerCtl->slots[off];
+           const char *worker_name =
+               NameStr(worker->worker_data.apply_worker.dbname);
+
+           if (worker->worker_type == BDR_WORKER_APPLY
+               && strcmp(worker_name, NameStr(*dbname)) == 0)
                my_conn_idxs[n_conns++] = off;
+       }
        LWLockRelease(BdrWorkerCtl->lock);
 
        elog(DEBUG2, "bdr %s: creating slots for %d nodes",
@@ -820,11 +838,12 @@ bdr_init_replica(Name dbname)
        /*
         * For each connection, ensure its slot exists.
         *
-        * Do it one by one rather than fiddling with async libpq queries. If this
-        * needs to be parallelized later, it should probably be done by launching
-        * each apply worker and letting them create their own slots, then having
-        * them wait until signalled/unlatched before proceeding with actual
-        * replication. That'll save us another round of connections too.
+        * Do it one by one rather than fiddling with async libpq queries. If
+        * this needs to be parallelized later, it should probably be done by
+        * launching each apply worker and letting them create their own
+        * slots, then having them wait until signalled/unlatched before
+        * proceeding with actual replication. That'll save us another round
+        * of connections too.
         *
         * We don't attempt any cleanup if slot creation fails, we just bail out
         * and leave any already-created slots in place.
@@ -847,7 +866,10 @@ bdr_init_replica(Name dbname)
             * are all discarded; they're not needed here, and will be obtained
             * again by the apply workers when they're launched after init.
             */
-           conn = bdr_establish_connection_and_slot(&w->worker_data.apply_worker.name, &slot_name, &sysid, &timeline, &replication_identifier, &snapshot);
+           conn = bdr_establish_connection_and_slot(
+               &w->worker_data.apply_worker.name, &slot_name,
+               &sysid, &timeline, &replication_identifier, &snapshot);
+
            /* Always throws rather than returning failure */
            Assert(conn);
 
@@ -871,8 +893,9 @@ bdr_init_replica(Name dbname)
            else
            {
                /*
-                * Just throw the returned info away; we only needed to create the slot
-                * so its replication identifier can be advanced during catchup.
+                * Just throw the returned info away; we only needed to create
+                * the slot so its replication identifier can be advanced
+                * during catchup.
                 */
                if (snapshot)
                    pfree(snapshot);
@@ -958,8 +981,11 @@ bdr_catchup_to_lsn(PGconn *conn, Name dbname, Name conn_name, XLogRecPtr target_
    /* Create the shm entry for the catchup worker */
    LWLockAcquire(BdrWorkerCtl->lock, LW_SHARED);
    for (worker_shmem_idx = 0; worker_shmem_idx < bdr_max_workers; worker_shmem_idx++)
-       if (BdrWorkerCtl->slots[worker_shmem_idx].worker_type == BDR_WORKER_EMPTY_SLOT)
+   {
+       BdrWorker *worker = &BdrWorkerCtl->slots[worker_shmem_idx];
+       if (worker->worker_type == BDR_WORKER_EMPTY_SLOT)
            break;
+   }
    if (worker_shmem_idx == bdr_max_workers)
    {
        LWLockRelease(BdrWorkerCtl->lock);
@@ -1012,7 +1038,7 @@ bdr_catchup_to_lsn(PGconn *conn, Name dbname, Name conn_name, XLogRecPtr target_
        snprintf(bgw.bgw_name, BGW_MAXLEN,
                 "bdr %s: catchup apply to %X/%X on %s",
                 NameStr(*dbname),
-                (uint32)(target_lsn>>32), (uint32)target_lsn,
+                (uint32)(target_lsn >> 32), (uint32)target_lsn,
                 NameStr(*conn_name));
        bgw.bgw_name[BGW_MAXLEN-1] = '\0';
 
@@ -1023,9 +1049,9 @@ bdr_catchup_to_lsn(PGconn *conn, Name dbname, Name conn_name, XLogRecPtr target_
 
        /*
         * Sleep on our latch until we're woken by SIGUSR1 on bgworker state
-        * change, or by timeout. (We need a timeout because there's a race between
-        * bgworker start and our setting the latch; if it starts and dies again
-        * quickly we'll miss it and sleep forever w/o a timeout).
+        * change, or by timeout. (We need a timeout because there's a race
+        * between bgworker start and our setting the latch; if it starts and
+        * dies again quickly we'll miss it and sleep forever w/o a timeout).
         */
        while (bgw_status == BGWH_STARTED && bgw_pid == prev_bgw_pid)
        {
@@ -1065,10 +1091,11 @@ bdr_catchup_to_lsn(PGconn *conn, Name dbname, Name conn_name, XLogRecPtr target_
        pfree(bgw_handle);
 
        /*
-        * Stopped doesn't mean *successful*. The worker might've errored out. We
-        * have no way of getting its exit status, so we have to rely on it setting
-        * something in shmem on successful exit. In this case it will set
-        * replay_stop_lsn to InvalidXLogRecPtr to indicate that replay is done.
+        * Stopped doesn't mean *successful*. The worker might've errored
+        * out. We have no way of getting its exit status, so we have to rely
+        * on it setting something in shmem on successful exit. In this case
+        * it will set replay_stop_lsn to InvalidXLogRecPtr to indicate that
+        * replay is done.
         */
        if (catchup_worker->replay_stop_lsn != InvalidXLogRecPtr)
        {