CurrentResourceOwner = ResourceOwnerCreate(NULL, "bdr seq top-level resource owner");
bdr_saved_resowner = CurrentResourceOwner;
- /* Do we need to init the local DB from a remote node? */
+ /*
+ * Do we need to init the local DB from a remote node?
+ *
+ * Checks bdr.bdr_nodes.status, does any remote initialization required if
+ * there's an init_replica connection, and ensures that
+ * bdr.bdr_nodes.status=r for our entry before continuing.
+ */
bdr_init_replica(&bdr_perdb_worker->dbname);
elog(DEBUG1, "Starting bdr apply workers for db %s", NameStr(bdr_perdb_worker->dbname));
/* background workers */
extern void bdr_apply_main(Datum main_arg);
+/* manipulation of bdr catalogs */
+extern char bdr_nodes_get_local_status(uint64 sysid, Name dbname);
+extern void bdr_nodes_set_local_status(uint64 sysid, Name dbname, char status);
+
/* helpers shared by multiple worker types */
extern struct pg_conn *
bdr_connect(char *conninfo_repl,
--- /dev/null
+/* -------------------------------------------------------------------------
+ *
+ * bdr_catalogs.c
+ * Access to bdr catalog information like bdr.bdr_nodes
+ *
+ * Functions usable by both the output plugin and the extension/workers for
+ * accessing and manipulating BDR's catalogs, like bdr.bdr_nodes.
+ *
+ * Copyright (C) 2012-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/bdr/bdr.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "bdr.h"
+
+#include "access/xact.h"
+
+#include "catalog/pg_type.h"
+
+#include "executor/spi.h"
+
+#include "utils/builtins.h"
+
+/*
+ * Get the bdr.bdr_nodes status value for the current local node from the local
+ * database via SPI, if any such row exists.
+ *
+ * Returns the status value, or '\0' if no such row exists.
+ *
+ * SPI must be initialized, and you must be in a running transaction.
+ */
+char
+bdr_nodes_get_local_status(uint64 sysid, Name dbname)
+{
+ int spi_ret;
+ Oid argtypes[] = { NUMERICOID, NAMEOID };
+ Datum values[2];
+ bool isnull;
+ char status;
+ char sysid_str[33];
+
+ Assert(IsTransactionState());
+
+ snprintf(sysid_str, sizeof(sysid_str), UINT64_FORMAT, sysid);
+ sysid_str[sizeof(sysid_str)-1] = '\0';
+
+ values[0] = DirectFunctionCall3Coll(numeric_in, InvalidOid,
+ CStringGetDatum(sysid_str),
+ InvalidOid, Int32GetDatum(-1));
+ values[1] = NameGetDatum(dbname);
+
+ spi_ret = SPI_execute_with_args(
+ "SELECT node_status FROM bdr.bdr_nodes "
+ "WHERE node_sysid = $1 AND node_dbname = $2",
+ 2, argtypes, values, NULL, false, 1);
+
+ if (spi_ret != SPI_OK_SELECT)
+ elog(ERROR, "Unable to query bdr.bdr_nodes, SPI error %d", spi_ret);
+
+ if (SPI_processed == 0)
+ return '\0';
+
+ status = DatumGetChar(SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1,
+ &isnull));
+
+ if (isnull)
+ elog(ERROR, "bdr.bdr_nodes.status NULL; shouldn't happen");
+
+ return status;
+}
+
+/*
+ * Insert a row for the local node's (sysid,dbname) with the passed status into
+ * bdr.bdr_nodes. No existing row for this key may exist.
+ *
+ * Unlike bdr_set_remote_status, '\0' may not be passed to delete the row, and
+ * no upsert is performed. This is a simple insert only.
+ *
+ * SPI must be initialized, and you must be in a running transaction that is
+ * not bound to any remote node replication state.
+ */
+void
+bdr_nodes_set_local_status(uint64 sysid, Name dbname, char status)
+{
+ int spi_ret;
+ Oid argtypes[] = { CHAROID, NUMERICOID, NAMEOID };
+ Datum values[3];
+ char sysid_str[33];
+
+ Assert(status != '\0'); /* Cannot pass \0 to delete */
+ Assert(IsTransactionState());
+ /* Cannot have replication apply state set in this tx */
+ Assert(replication_origin_id == InvalidRepNodeId);
+
+ snprintf(sysid_str, sizeof(sysid_str), UINT64_FORMAT, sysid);
+ sysid_str[sizeof(sysid_str)-1] = '\0';
+
+ values[0] = CharGetDatum(status);
+ values[1] = DirectFunctionCall3Coll(numeric_in, InvalidOid,
+ CStringGetDatum(sysid_str),
+ InvalidOid, Int32GetDatum(-1));
+ values[2] = NameGetDatum(dbname);
+
+ spi_ret = SPI_execute_with_args(
+ "INSERT INTO bdr.bdr_nodes"
+ " (node_status, node_sysid, node_dbname)"
+ " VALUES ($1, $2, $3);",
+ 3, argtypes, values, NULL, false, 0);
+
+ if (spi_ret != SPI_OK_INSERT)
+ elog(ERROR, "Unable to insert row (status=%c, node_sysid="
+ UINT64_FORMAT ", node_dbname=%s) into bdr.bdr_nodes, "
+ "SPI error %d",
+ status, sysid, NameStr(*dbname), spi_ret);
+}
#include "catalog/pg_type.h"
+#include "executor/spi.h"
+
#include "replication/replication_identifier.h"
#include "replication/walreceiver.h"
StringInfoData query;
BdrWorker *init_replica_worker;
BdrConnectionConfig *init_replica_config;
+ int spi_ret;
initStringInfo(&query);
elog(DEBUG2, "bdr %s: bdr_init_replica",
NameStr(*dbname));
+ /*
+ * The local SPI transaction we're about to perform must do any writes as a
+ * local transaction, not as a changeset application from a remote node.
+ * That allows rows to be repliated to other nodes. So no replication_origin_id
+ * may be set.
+ */
+ Assert(replication_origin_id == InvalidRepNodeId);
+
+ /*
+ * Check the local bdr.bdr_nodes over SPI or direct scan to see if
+ * there's an entry for ourselves in ready mode already.
+ *
+ * Note that we don't have to explicitly SPI_finish(...) on error paths;
+ * that's taken care of for us.
+ */
+ StartTransactionCommand();
+ spi_ret = SPI_connect();
+ if (spi_ret != SPI_OK_CONNECT)
+ elog(ERROR, "SPI already connected; this shouldn't be possible");
+
+ status = bdr_nodes_get_local_status(GetSystemIdentifier(), dbname);
+ if (status == 'r')
+ {
+ /* Already in ready state, nothing more to do */
+ SPI_finish();
+ CommitTransactionCommand();
+ return;
+ }
+
/*
* Before starting workers we must determine if we need to copy
* initial state from a remote node. This is only necessary if
LWLockAcquire(BdrWorkerCtl->lock, LW_SHARED);
init_replica_worker = find_init_replica_worker(dbname);
LWLockRelease(BdrWorkerCtl->lock);
- /* No connections have init_replica=t, nothing to do */
if (!init_replica_worker)
{
- elog(DEBUG2, "bdr %s: nothing to do in bdr_init_replica",
- NameStr(*dbname));
- return;
+ if (status != '\0')
+ {
+ /*
+ * Even though there's no init_replica worker, the local bdr.bdr_nodes table
+ * has an entry for our (sysid,dbname) and it isn't status=r (checked above),
+ * we must've had an init_replica configured before, then removed.
+ */
+ ereport(ERROR, (errmsg("bdr.bdr_nodes row with (sysid="
+ UINT64_FORMAT ", dbname=%s) exists and has status=%c, but "
+ "no connection with init_replica=t is configured for this "
+ "database. ",
+ GetSystemIdentifier(), NameStr(*dbname), status),
+ errdetail("You probably configured initial setup with "
+ "init_replica on a connection, then removed or changed that "
+ "connection before setup completed properly. "),
+ errhint("DROP and re-create the database if it has no "
+ "existing content of value, or add the init_replica setting "
+ "to one of the connections.")));
+ }
+ /*
+ * No connections have init_replica=t, so there's no remote copy to do.
+ * We still have to ensure that bdr.bdr_nodes.status is 'r' for this
+ * node so that slot creation is permitted.
+ */
+ bdr_nodes_set_local_status(GetSystemIdentifier(), dbname, 'r');
}
+ /*
+ * We no longer require the transaction for SPI; further work gets done on
+ * the remote machine's bdr.bdr_nodes table and replicated back to us via
+ * pg_dump/pg_restore, or over the walsender protocol once we start
+ * replay. If we aren't just about to exit anyway.
+ */
+ SPI_finish();
+ CommitTransactionCommand();
+
+ if (!init_replica_worker)
+ /* Cleanup done and nothing more to do */
+ return;
+
init_replica_config = bdr_connection_configs
[init_replica_worker->worker_data.apply_worker.connection_config_idx];
elog(DEBUG2, "bdr %s: bdr_init_replica init from connection %s",
NameStr(*dbname), NameStr(init_replica_config->name));
- /*
- * Check the local bdr.bdr_nodes over SPI or direct scan to see if
- * there's an entry for ourselves in ready mode already.
- *
- * This is an optimisation we don't need to do yet...
- */
- /*TODO
- (status, min_remote_lsn) = get_node_status_from_local();
- if (status == 'r')
- return;
- */
-
/*
* Test to see if there's an entry in the remote's bdr.bdr_nodes for our
* system identifier. If there is, that'll tell us what stage of startup
#include "postgres.h"
#include "bdr.h"
+#include "miscadmin.h"
#include "access/sysattr.h"
#include "access/tuptoaster.h"
#include "catalog/namespace.h"
#include "catalog/pg_class.h"
+#include "catalog/pg_database.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_type.h"
+#include "commands/dbcommands.h"
+
+#include "executor/spi.h"
+
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
param)));
}
+/*
+ * Check bdr.bdr_nodes entry in local DB and if status != r,
+ * raise an error.
+ *
+ * If this function returns it's safe to begin replay.
+ */
+static void
+bdr_ensure_node_ready()
+{
+ int spi_ret;
+ const uint64 sysid = GetSystemIdentifier();
+ char status;
+ HeapTuple tuple;
+ NameData dbname;
+
+ StartTransactionCommand();
+
+ tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(MyDatabaseId));
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "Could not get name of local DB");
+ namecpy( &dbname, &((Form_pg_database) GETSTRUCT(tuple))->datname );
+ ReleaseSysCache(tuple);
+
+ /*
+ * Refuse to begin replication if the local node isn't yet ready to
+ * send data. Check the status in bdr.bdr_nodes.
+ */
+ spi_ret = SPI_connect();
+ if (spi_ret != SPI_OK_CONNECT)
+ elog(ERROR, "Local SPI connect failed; shouldn't happen");
+
+ status = bdr_nodes_get_local_status(sysid, &dbname);
+
+ SPI_finish();
+
+ CommitTransactionCommand();
+
+ /* Complain if node isn't ready. */
+ /* TODO: Allow soft error so caller can sleep and recheck? */
+ if (status != 'r')
+ {
+ const char * const base_msg =
+ "bdr.bdr_nodes entry for local node (sysid=" UINT64_FORMAT
+ ", dbname=%s): %s";
+ switch (status)
+ {
+ case 'r':
+ break; /* unreachable */
+ case '\0':
+ /*
+ * Can't allow replay when BDR hasn't started yet, as
+ * replica init might still need to run, causing a dump to
+ * be applied, catchup from a remote node, etc.
+ *
+ * If there's no init_replica set, the bdr extension will
+ * create a bdr.bdr_nodes entry with 'r' state shortly
+ * after it starts, so we won't hit this.
+ */
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg(base_msg, sysid, NameStr(dbname),
+ "row missing, bdr not active on this "
+ "database or is initializing."),
+ errhint("Add bdr to shared_preload_libraries and "
+ "check logs for bdr startup errors.")));
+ break;
+ case 'c':
+ /*
+ * Can't allow replay while still catching up. We get the
+ * real origin node ID and LSN over the protocol in catchup
+ * mode, but changes are written to WAL with the ID and LSN
+ * of the immediate origin node. So if we cascade them to
+ * another node now, they'll incorrectly see the immediate
+ * origin node ID and LSN, not the true original ones.
+ *
+ * It should be possible to lift this restriction later,
+ * if we write the original node id and lsn in WAL.
+ */
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg(base_msg, sysid, NameStr(dbname), "status='c'"
+ ", bdr still starting up: "
+ "catching up from remote node"),
+ errhint("Monitor pg_stat_replication on the "
+ "remote node, watch the logs and wait "
+ "until the node has caught up")));
+ break;
+ case 'i':
+ /*
+ * Can't allow replay while still applying a dump because the
+ * origin_id and origin_lsn are not preserved on the dump, so
+ * we'd replay all the changes. If the connections are from
+ * nodes that already have that data (or the origin node),
+ * that'll create a right mess.
+ */
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg(base_msg, sysid, NameStr(dbname),
+ "status='i', bdr still starting up: applying "
+ "initial dump of remote node"),
+ errhint("Monitor pg_stat_activity and the logs, "
+ "wait until the node has caught up")));
+ break;
+ default:
+ elog(ERROR, "Unhandled case status=%c", status);
+ break;
+ }
+ }
+}
+
/* initialize this plugin */
static void
if (data->client_pg_version / 100 != PG_VERSION_NUM / 100)
data->allow_sendrecv_protocol = false;
-
if (!IsTransactionState())
{
tx_started = false;
if (!tx_started)
CommitTransactionCommand();
+
+ /*
+ * Make sure it's safe to begin playing changes to the remote end.
+ * This'll ERROR out if we're not ready.
+ */
+ bdr_ensure_node_ready();
}
}
# contrib/bdr/output.mk
MODULE_big = bdr_output
-OBJS = bdr_compat.o bdr_output.o
+OBJS = bdr_compat.o bdr_catalogs.o bdr_output.o
PG_CPPFLAGS = -I$(libpq_srcdir)
SHLIB_LINK = $(libpq)
MODULE_big = bdr
OBJS = bdr.o bdr_apply.o bdr_compat.o bdr_commandfilter.o bdr_count.o \
bdr_seq.o bdr_init_replica.o bdr_relcache.o bdr_conflict_handlers.o \
- bdr_conflict_logging.o bdr_executor.o
+ bdr_conflict_logging.o bdr_executor.o bdr_catalogs.o
EXTENSION = bdr
DATA = bdr--0.5.sql
# so is bdr
my $bdr_output = $solution->AddProject('bdr_output', 'dll', 'misc');
- $bdr_output->AddFiles('contrib\bdr', 'bdr_compat.c', 'bdr_output.c');
+ $bdr_output->AddFiles('contrib\bdr', 'bdr_compat.c', 'bdr_output.c',
+ 'bdr_catalogs.c');
$bdr_output->AddReference($postgres);
$bdr_output->AddLibrary('wsock32.lib');
'bdr_commandfilter.c', 'bdr_compat.c',
'bdr_count.c', 'bdr_seq.c', 'bdr_init_replica.c',
'bdr_relcache.c', 'bdr_conflict_handlers.c',
- 'bdr_conflict_logging.c', 'bdr_executor.c');
+ 'bdr_conflict_logging.c', 'bdr_executor.c',
+ 'bdr_catalogs.c');
$bdr_apply->AddReference($postgres);
$bdr_apply->AddLibrary('wsock32.lib');
$bdr_apply->AddIncludeDir('src\interfaces\libpq');