bdr: Add timeline ID to bdr.bdr_nodes, change sysid type to text
authorCraig Ringer <craig@2ndquadrant.com>
Thu, 15 May 2014 09:59:39 +0000 (17:59 +0800)
committerAndres Freund <andres@anarazel.de>
Thu, 3 Jul 2014 15:55:37 +0000 (17:55 +0200)
The unique identifier for a given bdr node is (sysid, timelineid, dbname), so
make sure that's the correct key in bdr.bdr_nodes.

contrib/bdr/bdr--0.5.sql
contrib/bdr/bdr.h
contrib/bdr/bdr_catalogs.c
contrib/bdr/bdr_init_replica.c
contrib/bdr/bdr_output.c

index 6dd30d6edfda810d6b1f0c1007fdaf9b1de9ff0f..03923f56176a883fd1ea2069fc76613a5c7c5cc5 100644 (file)
@@ -305,10 +305,11 @@ COMMENT ON COLUMN bdr_conflict_history.error_message IS 'On apply error, the err
 -- who knows why we'd want to) so the PK should be the (dbname, sysid) tuple.
 --
 CREATE TABLE bdr_nodes (
-    node_sysid numeric not null,
+    node_sysid text not null, -- Really a uint64 but we have no type for that
+    node_timeline oid not null,
     node_dbname name not null,
     node_status "char" not null,
-    primary key(node_sysid, node_dbname),
+    primary key(node_sysid, node_timeline, node_dbname),
     check (node_status in ('i', 'c', 'r'))
 );
 REVOKE ALL ON TABLE bdr_nodes FROM PUBLIC;
@@ -316,6 +317,7 @@ SELECT pg_catalog.pg_extension_config_dump('bdr_nodes', '');
 
 COMMENT ON TABLE bdr_nodes IS 'All known nodes in this BDR group.';
 COMMENT ON COLUMN bdr_nodes.node_sysid IS 'system_identifier from the control file of the node';
+COMMENT ON COLUMN bdr_nodes.node_timeline IS 'timeline ID of this node';
 COMMENT ON COLUMN bdr_nodes.node_dbname IS 'local database name on the node';
 COMMENT ON COLUMN bdr_nodes.node_status IS 'Readiness of the node: [i]nitializing, [c]atchup, [r]eady. Doesn''t indicate connected/disconnected.';
 
index 04b86c1cb7fab6df6759435043ffc5ab5d39cd4d..89bdad69bf8750856f2c878b00b3eaa3378f78d8 100644 (file)
@@ -320,8 +320,8 @@ extern void init_bdr_commandfilter(void);
 extern void bdr_apply_main(Datum main_arg);
 
 /* manipulation of bdr catalogs */
-extern char bdr_nodes_get_local_status(uint64 sysid, Name dbname);
-extern void bdr_nodes_set_local_status(uint64 sysid, Name dbname, char status);
+extern char bdr_nodes_get_local_status(uint64 sysid, TimeLineID tli, Name dbname);
+extern void bdr_nodes_set_local_status(Name dbname, char status);
 
 extern Oid GetSysCacheOidError(int cacheId, Datum key1, Datum key2, Datum key3,
                               Datum key4);
index fc329bd0e0762363ddcad5131cf6b41be4396486..ea5a27bea1241d73725dd66d4d0f7532e90b66de 100644 (file)
@@ -50,19 +50,19 @@ GetSysCacheOidError(int cacheId,
    GetSysCacheOidError(cacheId, key1, key2, 0, 0)
 
 /*
- * Get the bdr.bdr_nodes status value for the current local node from the local
- * database via SPI, if any such row exists.
+ * Get the bdr.bdr_nodes status value for the specififed node from the local
+ * bdr.bdr_nodes table via SPI.
  *
  * Returns the status value, or '\0' if no such row exists.
  *
  * SPI must be initialized, and you must be in a running transaction.
  */
 char
-bdr_nodes_get_local_status(uint64 sysid, Name dbname)
+bdr_nodes_get_local_status(uint64 sysid, TimeLineID tli, Name dbname)
 {
    int         spi_ret;
-   Oid         argtypes[] = { NUMERICOID, NAMEOID };
-   Datum       values[2];
+   Oid         argtypes[] = { TEXTOID, OIDOID, NAMEOID };
+   Datum       values[3];
    bool        isnull;
    char        status;
    char        sysid_str[33];
@@ -87,15 +87,14 @@ bdr_nodes_get_local_status(uint64 sysid, Name dbname)
                       NameStr(*dbname)),
                errhint("There is no bdr.bdr_connections entry for this database on the target node or bdr is not in shared_preload_libraries")));
 
-   values[0] = DirectFunctionCall3Coll(numeric_in, InvalidOid,
-                                       CStringGetDatum(sysid_str),
-                                       InvalidOid, Int32GetDatum(-1));
-   values[1] = NameGetDatum(dbname);
+   values[0] = CStringGetTextDatum(sysid_str);
+   values[1] = ObjectIdGetDatum(tli);
+   values[2] = NameGetDatum(dbname);
 
    spi_ret = SPI_execute_with_args(
            "SELECT node_status FROM bdr.bdr_nodes "
-           "WHERE node_sysid = $1 AND node_dbname = $2",
-           2, argtypes, values, NULL, false, 1);
+           "WHERE node_sysid = $1 AND node_timeline = $2 AND node_dbname = $3",
+           3, argtypes, values, NULL, false, 1);
 
    if (spi_ret != SPI_OK_SELECT)
        elog(ERROR, "Unable to query bdr.bdr_nodes, SPI error %d", spi_ret);
@@ -119,15 +118,18 @@ bdr_nodes_get_local_status(uint64 sysid, Name dbname)
  * Unlike bdr_set_remote_status, '\0' may not be passed to delete the row, and
  * no upsert is performed. This is a simple insert only.
  *
+ * Unlike bdr_nodes_get_local_status, only the status of the local node may
+ * be set.
+ *
  * SPI must be initialized, and you must be in a running transaction that is
  * not bound to any remote node replication state.
  */
 void
-bdr_nodes_set_local_status(uint64 sysid, Name dbname, char status)
+bdr_nodes_set_local_status(Name dbname, char status)
 {
    int         spi_ret;
-   Oid         argtypes[] = { CHAROID, NUMERICOID, NAMEOID };
-   Datum       values[3];
+   Oid         argtypes[] = { CHAROID, TEXTOID, OIDOID, NAMEOID };
+   Datum       values[4];
    char        sysid_str[33];
 
    Assert(status != '\0'); /* Cannot pass \0 to delete */
@@ -135,26 +137,27 @@ bdr_nodes_set_local_status(uint64 sysid, Name dbname, char status)
    /* Cannot have replication apply state set in this tx */
    Assert(replication_origin_id == InvalidRepNodeId);
 
-   snprintf(sysid_str, sizeof(sysid_str), UINT64_FORMAT, sysid);
+   snprintf(sysid_str, sizeof(sysid_str), UINT64_FORMAT,
+            GetSystemIdentifier());
    sysid_str[sizeof(sysid_str)-1] = '\0';
 
    values[0] = CharGetDatum(status);
-   values[1] = DirectFunctionCall3Coll(numeric_in, InvalidOid,
-                                       CStringGetDatum(sysid_str),
-                                       InvalidOid, Int32GetDatum(-1));
-   values[2] = NameGetDatum(dbname);
+   values[1] = CStringGetTextDatum(sysid_str);
+   values[2] = ObjectIdGetDatum(ThisTimeLineID);
+   values[3] = NameGetDatum(dbname);
 
    spi_ret = SPI_execute_with_args(
                               "INSERT INTO bdr.bdr_nodes"
-                              "    (node_status, node_sysid, node_dbname)"
-                              "    VALUES ($1, $2, $3);",
-                              3, argtypes, values, NULL, false, 0);
+                              " (node_status, node_sysid, node_timeline, node_dbname)"
+                              " VALUES ($1, $2, $3, $4);",
+                              4, argtypes, values, NULL, false, 0);
 
    if (spi_ret != SPI_OK_INSERT)
        elog(ERROR, "Unable to insert row (status=%c, node_sysid="
-                   UINT64_FORMAT ", node_dbname=%s) into bdr.bdr_nodes, "
-                   "SPI error %d",
-                   status, sysid, NameStr(*dbname), spi_ret);
+                   UINT64_FORMAT ", node_timeline=%u, node_dbname=%s) "
+                   "into bdr.bdr_nodes: SPI error %d",
+                   status, GetSystemIdentifier(), ThisTimeLineID,
+                   NameStr(*dbname), spi_ret);
 }
 
 
index c1e22501c840e1c3098cf42fd85eacf1a6f3346b..59343a42bf0f0c2315711ca3b64a6a37e6bdc17d 100644 (file)
@@ -106,26 +106,30 @@ bdr_get_remote_status(PGconn *pgconn, Name dbname)
 {
    PGresult           *res;
    char                status;
-   Oid                 param_types[] = {NUMERICOID, TEXTOID};
-   const char         *param_values[2];
+   Oid                 param_types[] = {TEXTOID, OIDOID, NAMEOID};
+   const char         *param_values[3];
    /* Needs to fit max length of UINT64_FORMAT */
    char                sysid_str[33];
+   char                tlid_str[33];
 
    snprintf(sysid_str, sizeof(sysid_str), UINT64_FORMAT,
             GetSystemIdentifier());
    sysid_str[sizeof(sysid_str)-1] = '\0';
 
-   param_values[0] = sysid_str;
-   param_types[0] = NUMERICOID;
+   snprintf(tlid_str, sizeof(tlid_str), "%u",
+            ThisTimeLineID);
+   tlid_str[sizeof(tlid_str)-1] = '\0';
 
-   param_values[1] = NameStr(*dbname);
-   param_types[1] = TEXTOID;
+   param_values[0] = sysid_str;
+   param_values[1] = tlid_str;
+   param_values[2] = NameStr(*dbname);
 
    res = PQexecParams(pgconn,
                       "SELECT node_status FROM bdr.bdr_nodes "
-                      "WHERE node_sysid = $1 AND node_dbname = $2 "
+                      "WHERE node_sysid = $1 AND node_timeline = $2 "
+                      "AND node_dbname = $3 "
                       "FOR UPDATE",
-                      2, param_types, param_values, NULL, NULL, 0);
+                      3, param_types, param_values, NULL, NULL, 0);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        elog(FATAL, "bdr %s: Failed to get remote status during bdr init: "
@@ -166,6 +170,7 @@ bdr_set_remote_status(PGconn *pgconn, Name dbname,
    const uint64        sysid = GetSystemIdentifier();
    /* Needs to fit max length of UINT64_FORMAT */
    char                sysid_str[33];
+   char                tlid_str[33];
 
    if (status == prev_status)
        /* No action required (we could check the remote, but meh) */
@@ -175,22 +180,25 @@ bdr_set_remote_status(PGconn *pgconn, Name dbname,
             GetSystemIdentifier());
    sysid_str[sizeof(sysid_str)-1] = '\0';
 
+   snprintf(tlid_str, sizeof(tlid_str), "%u",
+            ThisTimeLineID);
+   tlid_str[sizeof(tlid_str)-1] = '\0';
+
    if (status == '\0')
    {
-       Oid         param_types[] = {NUMERICOID, TEXTOID};
-       const char *param_values[2];
+       Oid         param_types[] = {TEXTOID, OIDOID, NAMEOID};
+       const char *param_values[3];
        char        new_status;
 
        param_values[0] = sysid_str;
-       param_values[1] = NameStr(*dbname);
+       param_values[1] = tlid_str;
+       param_values[2] = NameStr(*dbname);
 
        res = PQexecParams(pgconn,
                           "DELETE FROM bdr.bdr_nodes WHERE node_sysid = $1"
-                          " AND node_dbname = $2 RETURNING node_status",
-                          2, param_types, param_values, NULL, NULL, 0);
-
-       elog(DEBUG2, "bdr %s: deleting bdr_nodes row with id " UINT64_FORMAT
-            " and node_dbname %s ", NameStr(*dbname), sysid, NameStr(*dbname));
+                          " AND node_timeline = $2 AND node_dbname = $3 "
+                          "RETURNING node_status",
+                          3, param_types, param_values, NULL, NULL, 0);
 
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
@@ -204,10 +212,10 @@ bdr_set_remote_status(PGconn *pgconn, Name dbname,
             * If prev_status was '\0' we wouldn't be here, so we should've
             * got a returned value.
             */
-           elog(FATAL, "bdr %s: bdr.bdr_nodes row for node_sysid="
-                UINT64_FORMAT
-                ", dbname='%s' missing, expected row with status=%c",
-                NameStr(*dbname), sysid, NameStr(*dbname), (int)prev_status);
+           elog(FATAL, "bdr %s: bdr.bdr_nodes row for sysid=" UINT64_FORMAT
+                       ", tlid=%u, dbname='%s' missing, expected row with status=%c",
+                NameStr(*dbname), sysid, ThisTimeLineID, NameStr(*dbname),
+                (int)prev_status);
        }
        status_str = PQgetvalue(res, 0, 0);
        Assert(strlen(status_str) == 1);
@@ -215,10 +223,9 @@ bdr_set_remote_status(PGconn *pgconn, Name dbname,
 
        if (new_status != prev_status)
        {
-           elog(FATAL, "bdr %s: bdr.bdr_nodes row for node_sysid="
-                UINT64_FORMAT
-                ", dbname='%s' had status=%c, expected status=%c",
-                NameStr(*dbname), sysid, NameStr(*dbname),
+           elog(FATAL, "bdr %s: bdr.bdr_nodes row for node_sysid=" UINT64_FORMAT
+                       ", timeline=%u, dbname='%s' had status=%c, expected status=%c",
+                NameStr(*dbname), sysid, ThisTimeLineID, NameStr(*dbname),
                 (int) new_status, (int) prev_status);
        }
 
@@ -226,29 +233,28 @@ bdr_set_remote_status(PGconn *pgconn, Name dbname,
    }
    else
    {
-       Oid         param_types[] = {CHAROID, NUMERICOID, TEXTOID};
-       const char *param_values[3];
+       Oid         param_types[] = {CHAROID, TEXTOID, OIDOID, NAMEOID};
+       const char *param_values[4];
        char        new_status;
        char        status_str[2];
 
        snprintf(status_str, 2, "%c", (int)status);
        param_values[0] = status_str;
        param_values[1] = sysid_str;
-       param_values[2] = NameStr(*dbname);
+       param_values[2] = tlid_str;
+       param_values[3] = NameStr(*dbname);
 
        res = PQexecParams(pgconn,
                           "UPDATE bdr.bdr_nodes "
                           "SET node_status = $1 "
-                          "WHERE node_sysid = $2 AND node_dbname = $3 "
+                          "WHERE node_sysid = $2 AND node_timeline = $3 "
+                          "AND node_dbname = $4 "
                           "RETURNING ("
-                          "SELECT node_status FROM bdr.bdr_nodes "
-                          "WHERE node_sysid = $2 AND node_dbname = $3)",
-                          3, param_types, param_values, NULL, NULL, 0);
-
-       elog(DEBUG2, "bdr %s: update row with id "
-            UINT64_FORMAT
-            " and node_dbname %s from %c to %c",
-            NameStr(*dbname), sysid, NameStr(*dbname), prev_status, status);
+                          "  SELECT node_status FROM bdr.bdr_nodes "
+                          "  WHERE node_sysid = $2 AND node_timeline = $3 "
+                          "  AND node_dbname = $4"
+                          ")",
+                          4, param_types, param_values, NULL, NULL, 0);
 
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
@@ -268,9 +274,9 @@ bdr_set_remote_status(PGconn *pgconn, Name dbname,
            {
                elog(FATAL,
                     "bdr %s: bdr.bdr_nodes row for node_sysid=" UINT64_FORMAT
-                    ", dbname='%s' had status=%c, expected status=%c",
-                    NameStr(*dbname), sysid, NameStr(*dbname), (int)new_status,
-                    (int)prev_status);
+                    ", timeline=%u, dbname='%s' had status=%c, expected status=%c",
+                    NameStr(*dbname), sysid, ThisTimeLineID, NameStr(*dbname),
+                    (int)new_status, (int)prev_status);
            }
 
            PQclear(res);
@@ -282,13 +288,9 @@ bdr_set_remote_status(PGconn *pgconn, Name dbname,
            PQclear(res);
            res = PQexecParams(pgconn,
                               "INSERT INTO bdr.bdr_nodes"
-                              "    (node_status, node_sysid, node_dbname)"
-                              "    VALUES ($1, $2, $3);",
-                              3, param_types, param_values, NULL, NULL, 0);
-
-           elog(DEBUG2, "bdr %s: insert row with id " UINT64_FORMAT
-                " and node_dbname %s from %c to %c",
-                NameStr(*dbname), sysid, NameStr(*dbname), prev_status, status);
+                              " (node_status, node_sysid, node_timeline, node_dbname)"
+                              " VALUES ($1, $2, $3, $4);",
+                              4, param_types, param_values, NULL, NULL, 0);
 
            if (PQresultStatus(res) != PGRES_COMMAND_OK)
            {
@@ -710,7 +712,8 @@ bdr_init_replica(Name dbname)
    if (spi_ret != SPI_OK_CONNECT)
        elog(ERROR, "SPI already connected; this shouldn't be possible");
 
-   status = bdr_nodes_get_local_status(GetSystemIdentifier(), dbname);
+   status = bdr_nodes_get_local_status(GetSystemIdentifier(), ThisTimeLineID,
+                                       dbname);
    if (status == 'r')
    {
        /* Already in ready state, nothing more to do */
@@ -755,7 +758,7 @@ bdr_init_replica(Name dbname)
         * We still have to ensure that bdr.bdr_nodes.status is 'r' for this
         * node so that slot creation is permitted.
         */
-       bdr_nodes_set_local_status(GetSystemIdentifier(), dbname, 'r');
+       bdr_nodes_set_local_status(dbname, 'r');
    }
    /*
     * We no longer require the transaction for SPI; further work gets done on
@@ -878,14 +881,15 @@ bdr_init_replica(Name dbname)
        for (off = 0; off < bdr_max_workers; off++)
        {
            BdrWorker              *worker = &BdrWorkerCtl->slots[off];
-           BdrConnectionConfig    *cfg;
 
-           cfg = bdr_connection_configs
-               [worker->worker_data.apply_worker.connection_config_idx];
+           if (worker->worker_type == BDR_WORKER_APPLY)
+           {
+               BdrConnectionConfig * const cfg = bdr_connection_configs
+                   [worker->worker_data.apply_worker.connection_config_idx];
 
-           if (worker->worker_type == BDR_WORKER_APPLY
-               && strcmp(cfg->dbname, NameStr(*dbname)) == 0)
-               my_conn_idxs[n_conns++] = off;
+               if (strcmp(cfg->dbname, NameStr(*dbname)) == 0)
+                   my_conn_idxs[n_conns++] = off;
+           }
        }
        LWLockRelease(BdrWorkerCtl->lock);
 
index f7e24ac868e33b8c7cdb19750e70aea28405955c..0639a70329012fd287cc4db94e038f8e72a9e3dc 100644 (file)
@@ -183,15 +183,16 @@ bdr_ensure_node_ready()
    int spi_ret;
    const uint64 sysid = GetSystemIdentifier();
    char status;
-   HeapTuple tuple;
-   const char *dbname;
+   NameData dbname;
+   char *tmp_dbname;
 
    StartTransactionCommand();
 
-   dbname = get_database_name(MyDatabaseId);
-   if (dbname == NULL)
-       /* Shouldn't happen as logical rep requires a db */
-       elog(ERROR, "Failed to get name of local database");
+   /* We need dbname valid outside this transaction, so copy it */
+   tmp_dbname = get_database_name(MyDatabaseId);
+   strncpy(NameStr(dbname), tmp_dbname, NAMEDATALEN);
+   NameStr(dbname)[NAMEDATALEN-1] = '\0';
+   pfree(tmp_dbname);
 
    /*
     * Refuse to begin replication if the local node isn't yet ready to
@@ -201,7 +202,7 @@ bdr_ensure_node_ready()
    if (spi_ret != SPI_OK_CONNECT)
        elog(ERROR, "Local SPI connect failed; shouldn't happen");
 
-   status = bdr_nodes_get_local_status(sysid, &dbname);
+   status = bdr_nodes_get_local_status(sysid, ThisTimeLineID, &dbname);
 
    SPI_finish();
 
@@ -230,7 +231,7 @@ bdr_ensure_node_ready()
                 */
                ereport(ERROR,
                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                        errmsg(base_msg, sysid, dbname,
+                        errmsg(base_msg, sysid, NameStr(dbname),
                                "row missing, bdr not active on this "
                                "database or is initializing."),
                         errhint("Add bdr to shared_preload_libraries and "
@@ -250,7 +251,7 @@ bdr_ensure_node_ready()
                 */
                ereport(ERROR,
                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                        errmsg(base_msg, sysid, dbname, "status='c'"
+                        errmsg(base_msg, sysid, NameStr(dbname), "status='c'"
                                ", bdr still starting up: "
                                "catching up from remote node"),
                         errhint("Monitor pg_stat_replication on the "
@@ -267,7 +268,7 @@ bdr_ensure_node_ready()
                 */
                ereport(ERROR,
                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                        errmsg(base_msg, sysid, dbname,
+                        errmsg(base_msg, sysid, NameStr(dbname),
                                "status='i', bdr still starting up: applying "
                                "initial dump of remote node"),
                         errhint("Monitor pg_stat_activity and the logs, "
@@ -278,8 +279,6 @@ bdr_ensure_node_ready()
                break;
        }
    }
-
-   pfree(dbname);
 }