GetSysCacheOidError(cacheId, key1, key2, 0, 0)
/*
- * Get the bdr.bdr_nodes status value for the current local node from the local
- * database via SPI, if any such row exists.
+ * Get the bdr.bdr_nodes status value for the specififed node from the local
+ * bdr.bdr_nodes table via SPI.
*
* Returns the status value, or '\0' if no such row exists.
*
* SPI must be initialized, and you must be in a running transaction.
*/
char
-bdr_nodes_get_local_status(uint64 sysid, Name dbname)
+bdr_nodes_get_local_status(uint64 sysid, TimeLineID tli, Name dbname)
{
int spi_ret;
- Oid argtypes[] = { NUMERICOID, NAMEOID };
- Datum values[2];
+ Oid argtypes[] = { TEXTOID, OIDOID, NAMEOID };
+ Datum values[3];
bool isnull;
char status;
char sysid_str[33];
NameStr(*dbname)),
errhint("There is no bdr.bdr_connections entry for this database on the target node or bdr is not in shared_preload_libraries")));
- values[0] = DirectFunctionCall3Coll(numeric_in, InvalidOid,
- CStringGetDatum(sysid_str),
- InvalidOid, Int32GetDatum(-1));
- values[1] = NameGetDatum(dbname);
+ values[0] = CStringGetTextDatum(sysid_str);
+ values[1] = ObjectIdGetDatum(tli);
+ values[2] = NameGetDatum(dbname);
spi_ret = SPI_execute_with_args(
"SELECT node_status FROM bdr.bdr_nodes "
- "WHERE node_sysid = $1 AND node_dbname = $2",
- 2, argtypes, values, NULL, false, 1);
+ "WHERE node_sysid = $1 AND node_timeline = $2 AND node_dbname = $3",
+ 3, argtypes, values, NULL, false, 1);
if (spi_ret != SPI_OK_SELECT)
elog(ERROR, "Unable to query bdr.bdr_nodes, SPI error %d", spi_ret);
* Unlike bdr_set_remote_status, '\0' may not be passed to delete the row, and
* no upsert is performed. This is a simple insert only.
*
+ * Unlike bdr_nodes_get_local_status, only the status of the local node may
+ * be set.
+ *
* SPI must be initialized, and you must be in a running transaction that is
* not bound to any remote node replication state.
*/
void
-bdr_nodes_set_local_status(uint64 sysid, Name dbname, char status)
+bdr_nodes_set_local_status(Name dbname, char status)
{
int spi_ret;
- Oid argtypes[] = { CHAROID, NUMERICOID, NAMEOID };
- Datum values[3];
+ Oid argtypes[] = { CHAROID, TEXTOID, OIDOID, NAMEOID };
+ Datum values[4];
char sysid_str[33];
Assert(status != '\0'); /* Cannot pass \0 to delete */
/* Cannot have replication apply state set in this tx */
Assert(replication_origin_id == InvalidRepNodeId);
- snprintf(sysid_str, sizeof(sysid_str), UINT64_FORMAT, sysid);
+ snprintf(sysid_str, sizeof(sysid_str), UINT64_FORMAT,
+ GetSystemIdentifier());
sysid_str[sizeof(sysid_str)-1] = '\0';
values[0] = CharGetDatum(status);
- values[1] = DirectFunctionCall3Coll(numeric_in, InvalidOid,
- CStringGetDatum(sysid_str),
- InvalidOid, Int32GetDatum(-1));
- values[2] = NameGetDatum(dbname);
+ values[1] = CStringGetTextDatum(sysid_str);
+ values[2] = ObjectIdGetDatum(ThisTimeLineID);
+ values[3] = NameGetDatum(dbname);
spi_ret = SPI_execute_with_args(
"INSERT INTO bdr.bdr_nodes"
- " (node_status, node_sysid, node_dbname)"
- " VALUES ($1, $2, $3);",
- 3, argtypes, values, NULL, false, 0);
+ " (node_status, node_sysid, node_timeline, node_dbname)"
+ " VALUES ($1, $2, $3, $4);",
+ 4, argtypes, values, NULL, false, 0);
if (spi_ret != SPI_OK_INSERT)
elog(ERROR, "Unable to insert row (status=%c, node_sysid="
- UINT64_FORMAT ", node_dbname=%s) into bdr.bdr_nodes, "
- "SPI error %d",
- status, sysid, NameStr(*dbname), spi_ret);
+ UINT64_FORMAT ", node_timeline=%u, node_dbname=%s) "
+ "into bdr.bdr_nodes: SPI error %d",
+ status, GetSystemIdentifier(), ThisTimeLineID,
+ NameStr(*dbname), spi_ret);
}
{
PGresult *res;
char status;
- Oid param_types[] = {NUMERICOID, TEXTOID};
- const char *param_values[2];
+ Oid param_types[] = {TEXTOID, OIDOID, NAMEOID};
+ const char *param_values[3];
/* Needs to fit max length of UINT64_FORMAT */
char sysid_str[33];
+ char tlid_str[33];
snprintf(sysid_str, sizeof(sysid_str), UINT64_FORMAT,
GetSystemIdentifier());
sysid_str[sizeof(sysid_str)-1] = '\0';
- param_values[0] = sysid_str;
- param_types[0] = NUMERICOID;
+ snprintf(tlid_str, sizeof(tlid_str), "%u",
+ ThisTimeLineID);
+ tlid_str[sizeof(tlid_str)-1] = '\0';
- param_values[1] = NameStr(*dbname);
- param_types[1] = TEXTOID;
+ param_values[0] = sysid_str;
+ param_values[1] = tlid_str;
+ param_values[2] = NameStr(*dbname);
res = PQexecParams(pgconn,
"SELECT node_status FROM bdr.bdr_nodes "
- "WHERE node_sysid = $1 AND node_dbname = $2 "
+ "WHERE node_sysid = $1 AND node_timeline = $2 "
+ "AND node_dbname = $3 "
"FOR UPDATE",
- 2, param_types, param_values, NULL, NULL, 0);
+ 3, param_types, param_values, NULL, NULL, 0);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
elog(FATAL, "bdr %s: Failed to get remote status during bdr init: "
const uint64 sysid = GetSystemIdentifier();
/* Needs to fit max length of UINT64_FORMAT */
char sysid_str[33];
+ char tlid_str[33];
if (status == prev_status)
/* No action required (we could check the remote, but meh) */
GetSystemIdentifier());
sysid_str[sizeof(sysid_str)-1] = '\0';
+ snprintf(tlid_str, sizeof(tlid_str), "%u",
+ ThisTimeLineID);
+ tlid_str[sizeof(tlid_str)-1] = '\0';
+
if (status == '\0')
{
- Oid param_types[] = {NUMERICOID, TEXTOID};
- const char *param_values[2];
+ Oid param_types[] = {TEXTOID, OIDOID, NAMEOID};
+ const char *param_values[3];
char new_status;
param_values[0] = sysid_str;
- param_values[1] = NameStr(*dbname);
+ param_values[1] = tlid_str;
+ param_values[2] = NameStr(*dbname);
res = PQexecParams(pgconn,
"DELETE FROM bdr.bdr_nodes WHERE node_sysid = $1"
- " AND node_dbname = $2 RETURNING node_status",
- 2, param_types, param_values, NULL, NULL, 0);
-
- elog(DEBUG2, "bdr %s: deleting bdr_nodes row with id " UINT64_FORMAT
- " and node_dbname %s ", NameStr(*dbname), sysid, NameStr(*dbname));
+ " AND node_timeline = $2 AND node_dbname = $3 "
+ "RETURNING node_status",
+ 3, param_types, param_values, NULL, NULL, 0);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
* If prev_status was '\0' we wouldn't be here, so we should've
* got a returned value.
*/
- elog(FATAL, "bdr %s: bdr.bdr_nodes row for node_sysid="
- UINT64_FORMAT
- ", dbname='%s' missing, expected row with status=%c",
- NameStr(*dbname), sysid, NameStr(*dbname), (int)prev_status);
+ elog(FATAL, "bdr %s: bdr.bdr_nodes row for sysid=" UINT64_FORMAT
+ ", tlid=%u, dbname='%s' missing, expected row with status=%c",
+ NameStr(*dbname), sysid, ThisTimeLineID, NameStr(*dbname),
+ (int)prev_status);
}
status_str = PQgetvalue(res, 0, 0);
Assert(strlen(status_str) == 1);
if (new_status != prev_status)
{
- elog(FATAL, "bdr %s: bdr.bdr_nodes row for node_sysid="
- UINT64_FORMAT
- ", dbname='%s' had status=%c, expected status=%c",
- NameStr(*dbname), sysid, NameStr(*dbname),
+ elog(FATAL, "bdr %s: bdr.bdr_nodes row for node_sysid=" UINT64_FORMAT
+ ", timeline=%u, dbname='%s' had status=%c, expected status=%c",
+ NameStr(*dbname), sysid, ThisTimeLineID, NameStr(*dbname),
(int) new_status, (int) prev_status);
}
}
else
{
- Oid param_types[] = {CHAROID, NUMERICOID, TEXTOID};
- const char *param_values[3];
+ Oid param_types[] = {CHAROID, TEXTOID, OIDOID, NAMEOID};
+ const char *param_values[4];
char new_status;
char status_str[2];
snprintf(status_str, 2, "%c", (int)status);
param_values[0] = status_str;
param_values[1] = sysid_str;
- param_values[2] = NameStr(*dbname);
+ param_values[2] = tlid_str;
+ param_values[3] = NameStr(*dbname);
res = PQexecParams(pgconn,
"UPDATE bdr.bdr_nodes "
"SET node_status = $1 "
- "WHERE node_sysid = $2 AND node_dbname = $3 "
+ "WHERE node_sysid = $2 AND node_timeline = $3 "
+ "AND node_dbname = $4 "
"RETURNING ("
- "SELECT node_status FROM bdr.bdr_nodes "
- "WHERE node_sysid = $2 AND node_dbname = $3)",
- 3, param_types, param_values, NULL, NULL, 0);
-
- elog(DEBUG2, "bdr %s: update row with id "
- UINT64_FORMAT
- " and node_dbname %s from %c to %c",
- NameStr(*dbname), sysid, NameStr(*dbname), prev_status, status);
+ " SELECT node_status FROM bdr.bdr_nodes "
+ " WHERE node_sysid = $2 AND node_timeline = $3 "
+ " AND node_dbname = $4"
+ ")",
+ 4, param_types, param_values, NULL, NULL, 0);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
{
elog(FATAL,
"bdr %s: bdr.bdr_nodes row for node_sysid=" UINT64_FORMAT
- ", dbname='%s' had status=%c, expected status=%c",
- NameStr(*dbname), sysid, NameStr(*dbname), (int)new_status,
- (int)prev_status);
+ ", timeline=%u, dbname='%s' had status=%c, expected status=%c",
+ NameStr(*dbname), sysid, ThisTimeLineID, NameStr(*dbname),
+ (int)new_status, (int)prev_status);
}
PQclear(res);
PQclear(res);
res = PQexecParams(pgconn,
"INSERT INTO bdr.bdr_nodes"
- " (node_status, node_sysid, node_dbname)"
- " VALUES ($1, $2, $3);",
- 3, param_types, param_values, NULL, NULL, 0);
-
- elog(DEBUG2, "bdr %s: insert row with id " UINT64_FORMAT
- " and node_dbname %s from %c to %c",
- NameStr(*dbname), sysid, NameStr(*dbname), prev_status, status);
+ " (node_status, node_sysid, node_timeline, node_dbname)"
+ " VALUES ($1, $2, $3, $4);",
+ 4, param_types, param_values, NULL, NULL, 0);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
if (spi_ret != SPI_OK_CONNECT)
elog(ERROR, "SPI already connected; this shouldn't be possible");
- status = bdr_nodes_get_local_status(GetSystemIdentifier(), dbname);
+ status = bdr_nodes_get_local_status(GetSystemIdentifier(), ThisTimeLineID,
+ dbname);
if (status == 'r')
{
/* Already in ready state, nothing more to do */
* We still have to ensure that bdr.bdr_nodes.status is 'r' for this
* node so that slot creation is permitted.
*/
- bdr_nodes_set_local_status(GetSystemIdentifier(), dbname, 'r');
+ bdr_nodes_set_local_status(dbname, 'r');
}
/*
* We no longer require the transaction for SPI; further work gets done on
for (off = 0; off < bdr_max_workers; off++)
{
BdrWorker *worker = &BdrWorkerCtl->slots[off];
- BdrConnectionConfig *cfg;
- cfg = bdr_connection_configs
- [worker->worker_data.apply_worker.connection_config_idx];
+ if (worker->worker_type == BDR_WORKER_APPLY)
+ {
+ BdrConnectionConfig * const cfg = bdr_connection_configs
+ [worker->worker_data.apply_worker.connection_config_idx];
- if (worker->worker_type == BDR_WORKER_APPLY
- && strcmp(cfg->dbname, NameStr(*dbname)) == 0)
- my_conn_idxs[n_conns++] = off;
+ if (strcmp(cfg->dbname, NameStr(*dbname)) == 0)
+ my_conn_idxs[n_conns++] = off;
+ }
}
LWLockRelease(BdrWorkerCtl->lock);
int spi_ret;
const uint64 sysid = GetSystemIdentifier();
char status;
- HeapTuple tuple;
- const char *dbname;
+ NameData dbname;
+ char *tmp_dbname;
StartTransactionCommand();
- dbname = get_database_name(MyDatabaseId);
- if (dbname == NULL)
- /* Shouldn't happen as logical rep requires a db */
- elog(ERROR, "Failed to get name of local database");
+ /* We need dbname valid outside this transaction, so copy it */
+ tmp_dbname = get_database_name(MyDatabaseId);
+ strncpy(NameStr(dbname), tmp_dbname, NAMEDATALEN);
+ NameStr(dbname)[NAMEDATALEN-1] = '\0';
+ pfree(tmp_dbname);
/*
* Refuse to begin replication if the local node isn't yet ready to
if (spi_ret != SPI_OK_CONNECT)
elog(ERROR, "Local SPI connect failed; shouldn't happen");
- status = bdr_nodes_get_local_status(sysid, &dbname);
+ status = bdr_nodes_get_local_status(sysid, ThisTimeLineID, &dbname);
SPI_finish();
*/
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg(base_msg, sysid, dbname,
+ errmsg(base_msg, sysid, NameStr(dbname),
"row missing, bdr not active on this "
"database or is initializing."),
errhint("Add bdr to shared_preload_libraries and "
*/
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg(base_msg, sysid, dbname, "status='c'"
+ errmsg(base_msg, sysid, NameStr(dbname), "status='c'"
", bdr still starting up: "
"catching up from remote node"),
errhint("Monitor pg_stat_replication on the "
*/
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg(base_msg, sysid, dbname,
+ errmsg(base_msg, sysid, NameStr(dbname),
"status='i', bdr still starting up: applying "
"initial dump of remote node"),
errhint("Monitor pg_stat_activity and the logs, "
break;
}
}
-
- pfree(dbname);
}