From: Craig Ringer Date: Thu, 15 May 2014 09:59:39 +0000 (+0800) Subject: bdr: Add timeline ID to bdr.bdr_nodes, change sysid type to text X-Git-Url: http://waps.l3s.uni-hannover.de/gitweb/?a=commitdiff_plain;h=de603a6ddaadf5becf0b46460f324ff55d09abd2;p=users%2Fandresfreund%2Fpostgres.git bdr: Add timeline ID to bdr.bdr_nodes, change sysid type to text The unique identifier for a given bdr node is (sysid, timelineid, dbname), so make sure that's the correct key in bdr.bdr_nodes. --- diff --git a/contrib/bdr/bdr--0.5.sql b/contrib/bdr/bdr--0.5.sql index 6dd30d6edf..03923f5617 100644 --- a/contrib/bdr/bdr--0.5.sql +++ b/contrib/bdr/bdr--0.5.sql @@ -305,10 +305,11 @@ COMMENT ON COLUMN bdr_conflict_history.error_message IS 'On apply error, the err -- who knows why we'd want to) so the PK should be the (dbname, sysid) tuple. -- CREATE TABLE bdr_nodes ( - node_sysid numeric not null, + node_sysid text not null, -- Really a uint64 but we have no type for that + node_timeline oid not null, node_dbname name not null, node_status "char" not null, - primary key(node_sysid, node_dbname), + primary key(node_sysid, node_timeline, node_dbname), check (node_status in ('i', 'c', 'r')) ); REVOKE ALL ON TABLE bdr_nodes FROM PUBLIC; @@ -316,6 +317,7 @@ SELECT pg_catalog.pg_extension_config_dump('bdr_nodes', ''); COMMENT ON TABLE bdr_nodes IS 'All known nodes in this BDR group.'; COMMENT ON COLUMN bdr_nodes.node_sysid IS 'system_identifier from the control file of the node'; +COMMENT ON COLUMN bdr_nodes.node_timeline IS 'timeline ID of this node'; COMMENT ON COLUMN bdr_nodes.node_dbname IS 'local database name on the node'; COMMENT ON COLUMN bdr_nodes.node_status IS 'Readiness of the node: [i]nitializing, [c]atchup, [r]eady. 
Doesn''t indicate connected/disconnected.'; diff --git a/contrib/bdr/bdr.h b/contrib/bdr/bdr.h index 04b86c1cb7..89bdad69bf 100644 --- a/contrib/bdr/bdr.h +++ b/contrib/bdr/bdr.h @@ -320,8 +320,8 @@ extern void init_bdr_commandfilter(void); extern void bdr_apply_main(Datum main_arg); /* manipulation of bdr catalogs */ -extern char bdr_nodes_get_local_status(uint64 sysid, Name dbname); -extern void bdr_nodes_set_local_status(uint64 sysid, Name dbname, char status); +extern char bdr_nodes_get_local_status(uint64 sysid, TimeLineID tli, Name dbname); +extern void bdr_nodes_set_local_status(Name dbname, char status); extern Oid GetSysCacheOidError(int cacheId, Datum key1, Datum key2, Datum key3, Datum key4); diff --git a/contrib/bdr/bdr_catalogs.c b/contrib/bdr/bdr_catalogs.c index fc329bd0e0..ea5a27bea1 100644 --- a/contrib/bdr/bdr_catalogs.c +++ b/contrib/bdr/bdr_catalogs.c @@ -50,19 +50,19 @@ GetSysCacheOidError(int cacheId, GetSysCacheOidError(cacheId, key1, key2, 0, 0) /* - * Get the bdr.bdr_nodes status value for the current local node from the local - * database via SPI, if any such row exists. + * Get the bdr.bdr_nodes status value for the specified node from the local + * bdr.bdr_nodes table via SPI. * * Returns the status value, or '\0' if no such row exists. * * SPI must be initialized, and you must be in a running transaction. 
*/ char -bdr_nodes_get_local_status(uint64 sysid, Name dbname) +bdr_nodes_get_local_status(uint64 sysid, TimeLineID tli, Name dbname) { int spi_ret; - Oid argtypes[] = { NUMERICOID, NAMEOID }; - Datum values[2]; + Oid argtypes[] = { TEXTOID, OIDOID, NAMEOID }; + Datum values[3]; bool isnull; char status; char sysid_str[33]; @@ -87,15 +87,14 @@ bdr_nodes_get_local_status(uint64 sysid, Name dbname) NameStr(*dbname)), errhint("There is no bdr.bdr_connections entry for this database on the target node or bdr is not in shared_preload_libraries"))); - values[0] = DirectFunctionCall3Coll(numeric_in, InvalidOid, - CStringGetDatum(sysid_str), - InvalidOid, Int32GetDatum(-1)); - values[1] = NameGetDatum(dbname); + values[0] = CStringGetTextDatum(sysid_str); + values[1] = ObjectIdGetDatum(tli); + values[2] = NameGetDatum(dbname); spi_ret = SPI_execute_with_args( "SELECT node_status FROM bdr.bdr_nodes " - "WHERE node_sysid = $1 AND node_dbname = $2", - 2, argtypes, values, NULL, false, 1); + "WHERE node_sysid = $1 AND node_timeline = $2 AND node_dbname = $3", + 3, argtypes, values, NULL, false, 1); if (spi_ret != SPI_OK_SELECT) elog(ERROR, "Unable to query bdr.bdr_nodes, SPI error %d", spi_ret); @@ -119,15 +118,18 @@ bdr_nodes_get_local_status(uint64 sysid, Name dbname) * Unlike bdr_set_remote_status, '\0' may not be passed to delete the row, and * no upsert is performed. This is a simple insert only. * + * Unlike bdr_nodes_get_local_status, only the status of the local node may + * be set. + * * SPI must be initialized, and you must be in a running transaction that is * not bound to any remote node replication state. 
*/ void -bdr_nodes_set_local_status(uint64 sysid, Name dbname, char status) +bdr_nodes_set_local_status(Name dbname, char status) { int spi_ret; - Oid argtypes[] = { CHAROID, NUMERICOID, NAMEOID }; - Datum values[3]; + Oid argtypes[] = { CHAROID, TEXTOID, OIDOID, NAMEOID }; + Datum values[4]; char sysid_str[33]; Assert(status != '\0'); /* Cannot pass \0 to delete */ @@ -135,26 +137,27 @@ bdr_nodes_set_local_status(uint64 sysid, Name dbname, char status) /* Cannot have replication apply state set in this tx */ Assert(replication_origin_id == InvalidRepNodeId); - snprintf(sysid_str, sizeof(sysid_str), UINT64_FORMAT, sysid); + snprintf(sysid_str, sizeof(sysid_str), UINT64_FORMAT, + GetSystemIdentifier()); sysid_str[sizeof(sysid_str)-1] = '\0'; values[0] = CharGetDatum(status); - values[1] = DirectFunctionCall3Coll(numeric_in, InvalidOid, - CStringGetDatum(sysid_str), - InvalidOid, Int32GetDatum(-1)); - values[2] = NameGetDatum(dbname); + values[1] = CStringGetTextDatum(sysid_str); + values[2] = ObjectIdGetDatum(ThisTimeLineID); + values[3] = NameGetDatum(dbname); spi_ret = SPI_execute_with_args( "INSERT INTO bdr.bdr_nodes" - " (node_status, node_sysid, node_dbname)" - " VALUES ($1, $2, $3);", - 3, argtypes, values, NULL, false, 0); + " (node_status, node_sysid, node_timeline, node_dbname)" + " VALUES ($1, $2, $3, $4);", + 4, argtypes, values, NULL, false, 0); if (spi_ret != SPI_OK_INSERT) elog(ERROR, "Unable to insert row (status=%c, node_sysid=" - UINT64_FORMAT ", node_dbname=%s) into bdr.bdr_nodes, " - "SPI error %d", - status, sysid, NameStr(*dbname), spi_ret); + UINT64_FORMAT ", node_timeline=%u, node_dbname=%s) " + "into bdr.bdr_nodes: SPI error %d", + status, GetSystemIdentifier(), ThisTimeLineID, + NameStr(*dbname), spi_ret); } diff --git a/contrib/bdr/bdr_init_replica.c b/contrib/bdr/bdr_init_replica.c index c1e22501c8..59343a42bf 100644 --- a/contrib/bdr/bdr_init_replica.c +++ b/contrib/bdr/bdr_init_replica.c @@ -106,26 +106,30 @@ bdr_get_remote_status(PGconn 
*pgconn, Name dbname) { PGresult *res; char status; - Oid param_types[] = {NUMERICOID, TEXTOID}; - const char *param_values[2]; + Oid param_types[] = {TEXTOID, OIDOID, NAMEOID}; + const char *param_values[3]; /* Needs to fit max length of UINT64_FORMAT */ char sysid_str[33]; + char tlid_str[33]; snprintf(sysid_str, sizeof(sysid_str), UINT64_FORMAT, GetSystemIdentifier()); sysid_str[sizeof(sysid_str)-1] = '\0'; - param_values[0] = sysid_str; - param_types[0] = NUMERICOID; + snprintf(tlid_str, sizeof(tlid_str), "%u", + ThisTimeLineID); + tlid_str[sizeof(tlid_str)-1] = '\0'; - param_values[1] = NameStr(*dbname); - param_types[1] = TEXTOID; + param_values[0] = sysid_str; + param_values[1] = tlid_str; + param_values[2] = NameStr(*dbname); res = PQexecParams(pgconn, "SELECT node_status FROM bdr.bdr_nodes " - "WHERE node_sysid = $1 AND node_dbname = $2 " + "WHERE node_sysid = $1 AND node_timeline = $2 " + "AND node_dbname = $3 " "FOR UPDATE", - 2, param_types, param_values, NULL, NULL, 0); + 3, param_types, param_values, NULL, NULL, 0); if (PQresultStatus(res) != PGRES_TUPLES_OK) { elog(FATAL, "bdr %s: Failed to get remote status during bdr init: " @@ -166,6 +170,7 @@ bdr_set_remote_status(PGconn *pgconn, Name dbname, const uint64 sysid = GetSystemIdentifier(); /* Needs to fit max length of UINT64_FORMAT */ char sysid_str[33]; + char tlid_str[33]; if (status == prev_status) /* No action required (we could check the remote, but meh) */ @@ -175,22 +180,25 @@ bdr_set_remote_status(PGconn *pgconn, Name dbname, GetSystemIdentifier()); sysid_str[sizeof(sysid_str)-1] = '\0'; + snprintf(tlid_str, sizeof(tlid_str), "%u", + ThisTimeLineID); + tlid_str[sizeof(tlid_str)-1] = '\0'; + if (status == '\0') { - Oid param_types[] = {NUMERICOID, TEXTOID}; - const char *param_values[2]; + Oid param_types[] = {TEXTOID, OIDOID, NAMEOID}; + const char *param_values[3]; char new_status; param_values[0] = sysid_str; - param_values[1] = NameStr(*dbname); + param_values[1] = tlid_str; + 
param_values[2] = NameStr(*dbname); res = PQexecParams(pgconn, "DELETE FROM bdr.bdr_nodes WHERE node_sysid = $1" - " AND node_dbname = $2 RETURNING node_status", - 2, param_types, param_values, NULL, NULL, 0); - - elog(DEBUG2, "bdr %s: deleting bdr_nodes row with id " UINT64_FORMAT - " and node_dbname %s ", NameStr(*dbname), sysid, NameStr(*dbname)); + " AND node_timeline = $2 AND node_dbname = $3 " + "RETURNING node_status", + 3, param_types, param_values, NULL, NULL, 0); if (PQresultStatus(res) != PGRES_TUPLES_OK) { @@ -204,10 +212,10 @@ bdr_set_remote_status(PGconn *pgconn, Name dbname, * If prev_status was '\0' we wouldn't be here, so we should've * got a returned value. */ - elog(FATAL, "bdr %s: bdr.bdr_nodes row for node_sysid=" - UINT64_FORMAT - ", dbname='%s' missing, expected row with status=%c", - NameStr(*dbname), sysid, NameStr(*dbname), (int)prev_status); + elog(FATAL, "bdr %s: bdr.bdr_nodes row for sysid=" UINT64_FORMAT + ", tlid=%u, dbname='%s' missing, expected row with status=%c", + NameStr(*dbname), sysid, ThisTimeLineID, NameStr(*dbname), + (int)prev_status); } status_str = PQgetvalue(res, 0, 0); Assert(strlen(status_str) == 1); @@ -215,10 +223,9 @@ bdr_set_remote_status(PGconn *pgconn, Name dbname, if (new_status != prev_status) { - elog(FATAL, "bdr %s: bdr.bdr_nodes row for node_sysid=" - UINT64_FORMAT - ", dbname='%s' had status=%c, expected status=%c", - NameStr(*dbname), sysid, NameStr(*dbname), + elog(FATAL, "bdr %s: bdr.bdr_nodes row for node_sysid=" UINT64_FORMAT + ", timeline=%u, dbname='%s' had status=%c, expected status=%c", + NameStr(*dbname), sysid, ThisTimeLineID, NameStr(*dbname), (int) new_status, (int) prev_status); } @@ -226,29 +233,28 @@ bdr_set_remote_status(PGconn *pgconn, Name dbname, } else { - Oid param_types[] = {CHAROID, NUMERICOID, TEXTOID}; - const char *param_values[3]; + Oid param_types[] = {CHAROID, TEXTOID, OIDOID, NAMEOID}; + const char *param_values[4]; char new_status; char status_str[2]; snprintf(status_str, 2, 
"%c", (int)status); param_values[0] = status_str; param_values[1] = sysid_str; - param_values[2] = NameStr(*dbname); + param_values[2] = tlid_str; + param_values[3] = NameStr(*dbname); res = PQexecParams(pgconn, "UPDATE bdr.bdr_nodes " "SET node_status = $1 " - "WHERE node_sysid = $2 AND node_dbname = $3 " + "WHERE node_sysid = $2 AND node_timeline = $3 " + "AND node_dbname = $4 " "RETURNING (" - "SELECT node_status FROM bdr.bdr_nodes " - "WHERE node_sysid = $2 AND node_dbname = $3)", - 3, param_types, param_values, NULL, NULL, 0); - - elog(DEBUG2, "bdr %s: update row with id " - UINT64_FORMAT - " and node_dbname %s from %c to %c", - NameStr(*dbname), sysid, NameStr(*dbname), prev_status, status); + " SELECT node_status FROM bdr.bdr_nodes " + " WHERE node_sysid = $2 AND node_timeline = $3 " + " AND node_dbname = $4" + ")", + 4, param_types, param_values, NULL, NULL, 0); if (PQresultStatus(res) != PGRES_TUPLES_OK) { @@ -268,9 +274,9 @@ bdr_set_remote_status(PGconn *pgconn, Name dbname, { elog(FATAL, "bdr %s: bdr.bdr_nodes row for node_sysid=" UINT64_FORMAT - ", dbname='%s' had status=%c, expected status=%c", - NameStr(*dbname), sysid, NameStr(*dbname), (int)new_status, - (int)prev_status); + ", timeline=%u, dbname='%s' had status=%c, expected status=%c", + NameStr(*dbname), sysid, ThisTimeLineID, NameStr(*dbname), + (int)new_status, (int)prev_status); } PQclear(res); @@ -282,13 +288,9 @@ bdr_set_remote_status(PGconn *pgconn, Name dbname, PQclear(res); res = PQexecParams(pgconn, "INSERT INTO bdr.bdr_nodes" - " (node_status, node_sysid, node_dbname)" - " VALUES ($1, $2, $3);", - 3, param_types, param_values, NULL, NULL, 0); - - elog(DEBUG2, "bdr %s: insert row with id " UINT64_FORMAT - " and node_dbname %s from %c to %c", - NameStr(*dbname), sysid, NameStr(*dbname), prev_status, status); + " (node_status, node_sysid, node_timeline, node_dbname)" + " VALUES ($1, $2, $3, $4);", + 4, param_types, param_values, NULL, NULL, 0); if (PQresultStatus(res) != PGRES_COMMAND_OK) 
{ @@ -710,7 +712,8 @@ bdr_init_replica(Name dbname) if (spi_ret != SPI_OK_CONNECT) elog(ERROR, "SPI already connected; this shouldn't be possible"); - status = bdr_nodes_get_local_status(GetSystemIdentifier(), dbname); + status = bdr_nodes_get_local_status(GetSystemIdentifier(), ThisTimeLineID, + dbname); if (status == 'r') { /* Already in ready state, nothing more to do */ @@ -755,7 +758,7 @@ bdr_init_replica(Name dbname) * We still have to ensure that bdr.bdr_nodes.status is 'r' for this * node so that slot creation is permitted. */ - bdr_nodes_set_local_status(GetSystemIdentifier(), dbname, 'r'); + bdr_nodes_set_local_status(dbname, 'r'); } /* * We no longer require the transaction for SPI; further work gets done on @@ -878,14 +881,15 @@ bdr_init_replica(Name dbname) for (off = 0; off < bdr_max_workers; off++) { BdrWorker *worker = &BdrWorkerCtl->slots[off]; - BdrConnectionConfig *cfg; - cfg = bdr_connection_configs - [worker->worker_data.apply_worker.connection_config_idx]; + if (worker->worker_type == BDR_WORKER_APPLY) + { + BdrConnectionConfig * const cfg = bdr_connection_configs + [worker->worker_data.apply_worker.connection_config_idx]; - if (worker->worker_type == BDR_WORKER_APPLY - && strcmp(cfg->dbname, NameStr(*dbname)) == 0) - my_conn_idxs[n_conns++] = off; + if (strcmp(cfg->dbname, NameStr(*dbname)) == 0) + my_conn_idxs[n_conns++] = off; + } } LWLockRelease(BdrWorkerCtl->lock); diff --git a/contrib/bdr/bdr_output.c b/contrib/bdr/bdr_output.c index f7e24ac868..0639a70329 100644 --- a/contrib/bdr/bdr_output.c +++ b/contrib/bdr/bdr_output.c @@ -183,15 +183,16 @@ bdr_ensure_node_ready() int spi_ret; const uint64 sysid = GetSystemIdentifier(); char status; - HeapTuple tuple; - const char *dbname; + NameData dbname; + char *tmp_dbname; StartTransactionCommand(); - dbname = get_database_name(MyDatabaseId); - if (dbname == NULL) - /* Shouldn't happen as logical rep requires a db */ - elog(ERROR, "Failed to get name of local database"); + /* We need dbname 
valid outside this transaction, so copy it */ + tmp_dbname = get_database_name(MyDatabaseId); + strncpy(NameStr(dbname), tmp_dbname, NAMEDATALEN); + NameStr(dbname)[NAMEDATALEN-1] = '\0'; + pfree(tmp_dbname); /* * Refuse to begin replication if the local node isn't yet ready to @@ -201,7 +202,7 @@ bdr_ensure_node_ready() if (spi_ret != SPI_OK_CONNECT) elog(ERROR, "Local SPI connect failed; shouldn't happen"); - status = bdr_nodes_get_local_status(sysid, &dbname); + status = bdr_nodes_get_local_status(sysid, ThisTimeLineID, &dbname); SPI_finish(); @@ -230,7 +231,7 @@ bdr_ensure_node_ready() */ ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg(base_msg, sysid, dbname, + errmsg(base_msg, sysid, NameStr(dbname), "row missing, bdr not active on this " "database or is initializing."), errhint("Add bdr to shared_preload_libraries and " @@ -250,7 +251,7 @@ bdr_ensure_node_ready() */ ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg(base_msg, sysid, dbname, "status='c'" + errmsg(base_msg, sysid, NameStr(dbname), "status='c'" ", bdr still starting up: " "catching up from remote node"), errhint("Monitor pg_stat_replication on the " @@ -267,7 +268,7 @@ bdr_ensure_node_ready() */ ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg(base_msg, sysid, dbname, + errmsg(base_msg, sysid, NameStr(dbname), "status='i', bdr still starting up: applying " "initial dump of remote node"), errhint("Monitor pg_stat_activity and the logs, " @@ -278,8 +279,6 @@ bdr_ensure_node_ready() break; } } - - pfree(dbname); }