bdr: Prevent slot creation until BDR is up and ready
authorCraig Ringer <craig@2ndquadrant.com>
Wed, 14 May 2014 01:47:39 +0000 (09:47 +0800)
committerAndres Freund <andres@anarazel.de>
Thu, 3 Jul 2014 15:55:35 +0000 (17:55 +0200)
Checks bdr.bdr_nodes status field for the local node in the local DB during bdr
output plugin startup and ERRORs if status isn't set to ready yet. This ensures
that we cannot begin sending changes to other nodes while we're still applying
a dump or performing catchup.

The BDR per-db worker now creates an entry in the local bdr.bdr_nodes if
init_replica isn't configured on any connection and no such entry exists. That
makes sure BDR without init_replica keeps working properly.

An error like:

    ERROR:  bdr.bdr_nodes entry for local node (sysid=6013159507840554182, dbname=postgres): row missing, bdr not active on this database or is initializing.

is emitted if the DB isn't ready for BDR yet. This can be tested easily with
pg_recvlogical by deleting the bdr.bdr_nodes entry for the local node or
creating an init_replica script that pauses indefinitely.

contrib/bdr/bdr.c
contrib/bdr/bdr.h
contrib/bdr/bdr_catalogs.c [new file with mode: 0644]
contrib/bdr/bdr_init_replica.c
contrib/bdr/bdr_output.c
contrib/bdr/output.mk
contrib/bdr/worker.mk
src/tools/msvc/Mkvcbuild.pm

index f588177760679126f65032673c77cbcd1d7a9fea..6a31c733fb5a42299f40dd23dc62f2f162bb0e9d 100644 (file)
@@ -1048,7 +1048,13 @@ bdr_perdb_worker_main(Datum main_arg)
    CurrentResourceOwner = ResourceOwnerCreate(NULL, "bdr seq top-level resource owner");
    bdr_saved_resowner = CurrentResourceOwner;
 
-   /* Do we need to init the local DB from a remote node? */
+   /*
+    * Do we need to init the local DB from a remote node?
+    *
+    * Checks bdr.bdr_nodes.status, does any remote initialization required if
+    * there's an init_replica connection, and ensures that
+    * bdr.bdr_nodes.status=r for our entry before continuing.
+    */
    bdr_init_replica(&bdr_perdb_worker->dbname);
 
    elog(DEBUG1, "Starting bdr apply workers for db %s", NameStr(bdr_perdb_worker->dbname));
index c145e2e71eb34d22457736045995f97581fd11cf..936574dee56fdf3f4272fee17613979c511581db 100644 (file)
@@ -318,6 +318,10 @@ extern void init_bdr_commandfilter(void);
 /* background workers */
 extern void bdr_apply_main(Datum main_arg);
 
+/* manipulation of bdr catalogs */
+extern char bdr_nodes_get_local_status(uint64 sysid, Name dbname);
+extern void bdr_nodes_set_local_status(uint64 sysid, Name dbname, char status);
+
 /* helpers shared by multiple worker types */
 extern struct pg_conn *
 bdr_connect(char *conninfo_repl,
diff --git a/contrib/bdr/bdr_catalogs.c b/contrib/bdr/bdr_catalogs.c
new file mode 100644 (file)
index 0000000..b96c5c7
--- /dev/null
@@ -0,0 +1,120 @@
+/* -------------------------------------------------------------------------
+ *
+ * bdr_catalogs.c
+ *     Access to bdr catalog information like bdr.bdr_nodes
+ *
+ * Functions usable by both the output plugin and the extension/workers for
+ * accessing and manipulating BDR's catalogs, like bdr.bdr_nodes.
+ *
+ * Copyright (C) 2012-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *     contrib/bdr/bdr.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "bdr.h"
+
+#include "access/xact.h"
+
+#include "catalog/pg_type.h"
+
+#include "executor/spi.h"
+
+#include "utils/builtins.h"
+
+/*
+ * Get the bdr.bdr_nodes status value for the current local node from the local
+ * database via SPI, if any such row exists.
+ *
+ * Returns the status value, or '\0' if no such row exists.
+ *
+ * SPI must be initialized, and you must be in a running transaction.
+ */
+char
+bdr_nodes_get_local_status(uint64 sysid, Name dbname)
+{
+   int         spi_ret;
+   Oid         argtypes[] = { NUMERICOID, NAMEOID };
+   Datum       values[2];
+   bool        isnull;
+   char        status;
+   char        sysid_str[33];
+
+   Assert(IsTransactionState());
+
+   snprintf(sysid_str, sizeof(sysid_str), UINT64_FORMAT, sysid);
+   sysid_str[sizeof(sysid_str)-1] = '\0';
+
+   values[0] = DirectFunctionCall3Coll(numeric_in, InvalidOid,
+                                       CStringGetDatum(sysid_str),
+                                       InvalidOid, Int32GetDatum(-1));
+   values[1] = NameGetDatum(dbname);
+
+   spi_ret = SPI_execute_with_args(
+           "SELECT node_status FROM bdr.bdr_nodes "
+           "WHERE node_sysid = $1 AND node_dbname = $2",
+           2, argtypes, values, NULL, false, 1);
+
+   if (spi_ret != SPI_OK_SELECT)
+       elog(ERROR, "Unable to query bdr.bdr_nodes, SPI error %d", spi_ret);
+
+   if (SPI_processed == 0)
+       return '\0';
+
+   status = DatumGetChar(SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1,
+                         &isnull));
+
+   if (isnull)
+       elog(ERROR, "bdr.bdr_nodes.status NULL; shouldn't happen");
+
+   return status;
+}
+
+/*
+ * Insert a row for the local node's (sysid,dbname) with the passed status into
+ * bdr.bdr_nodes. No existing row for this key may exist.
+ *
+ * Unlike bdr_set_remote_status, '\0' may not be passed to delete the row, and
+ * no upsert is performed. This is a simple insert only.
+ *
+ * SPI must be initialized, and you must be in a running transaction that is
+ * not bound to any remote node replication state.
+ */
+void
+bdr_nodes_set_local_status(uint64 sysid, Name dbname, char status)
+{
+   int         spi_ret;
+   Oid         argtypes[] = { CHAROID, NUMERICOID, NAMEOID };
+   Datum       values[3];
+   char        sysid_str[33];
+
+   Assert(status != '\0'); /* Cannot pass \0 to delete */
+   Assert(IsTransactionState());
+   /* Cannot have replication apply state set in this tx */
+   Assert(replication_origin_id == InvalidRepNodeId);
+
+   snprintf(sysid_str, sizeof(sysid_str), UINT64_FORMAT, sysid);
+   sysid_str[sizeof(sysid_str)-1] = '\0';
+
+   values[0] = CharGetDatum(status);
+   values[1] = DirectFunctionCall3Coll(numeric_in, InvalidOid,
+                                       CStringGetDatum(sysid_str),
+                                       InvalidOid, Int32GetDatum(-1));
+   values[2] = NameGetDatum(dbname);
+
+   spi_ret = SPI_execute_with_args(
+                              "INSERT INTO bdr.bdr_nodes"
+                              "    (node_status, node_sysid, node_dbname)"
+                              "    VALUES ($1, $2, $3);",
+                              3, argtypes, values, NULL, false, 0);
+
+   if (spi_ret != SPI_OK_INSERT)
+       elog(ERROR, "Unable to insert row (status=%c, node_sysid="
+                   UINT64_FORMAT ", node_dbname=%s) into bdr.bdr_nodes, "
+                   "SPI error %d",
+                   status, sysid, NameStr(*dbname), spi_ret);
+}
index 9ac4c6ab9d4b82ebe061a3f3d080d45bc37423a2..d63f5e5857a3a2de0ed772c7a047a31bfeaeb71e 100644 (file)
@@ -35,6 +35,8 @@
 
 #include "catalog/pg_type.h"
 
+#include "executor/spi.h"
+
 #include "replication/replication_identifier.h"
 #include "replication/walreceiver.h"
 
@@ -681,12 +683,42 @@ bdr_init_replica(Name dbname)
    StringInfoData query;
    BdrWorker  *init_replica_worker;
    BdrConnectionConfig *init_replica_config;
+   int spi_ret;
 
    initStringInfo(&query);
 
    elog(DEBUG2, "bdr %s: bdr_init_replica",
         NameStr(*dbname));
 
+   /*
+    * The local SPI transaction we're about to perform must do any writes as a
+    * local transaction, not as a changeset application from a remote node.
+    * That allows rows to be repliated to other nodes. So no replication_origin_id
+    * may be set.
+    */
+   Assert(replication_origin_id == InvalidRepNodeId);
+
+   /*
+    * Check the local bdr.bdr_nodes over SPI or direct scan to see if
+    * there's an entry for ourselves in ready mode already.
+    *
+    * Note that we don't have to explicitly SPI_finish(...) on error paths;
+    * that's taken care of for us.
+    */
+   StartTransactionCommand();
+   spi_ret = SPI_connect();
+   if (spi_ret != SPI_OK_CONNECT)
+       elog(ERROR, "SPI already connected; this shouldn't be possible");
+
+   status = bdr_nodes_get_local_status(GetSystemIdentifier(), dbname);
+   if (status == 'r')
+   {
+       /* Already in ready state, nothing more to do */
+       SPI_finish();
+       CommitTransactionCommand();
+       return;
+   }
+
    /*
     * Before starting workers we must determine if we need to copy
     * initial state from a remote node. This is only necessary if
@@ -697,31 +729,53 @@ bdr_init_replica(Name dbname)
    LWLockAcquire(BdrWorkerCtl->lock, LW_SHARED);
    init_replica_worker = find_init_replica_worker(dbname);
    LWLockRelease(BdrWorkerCtl->lock);
-   /* No connections have init_replica=t, nothing to do */
    if (!init_replica_worker)
    {
-       elog(DEBUG2, "bdr %s: nothing to do in bdr_init_replica",
-            NameStr(*dbname));
-       return;
+       if (status != '\0')
+       {
+           /*
+            * Even though there's no init_replica worker, the local bdr.bdr_nodes table
+            * has an entry for our (sysid,dbname) and it isn't status=r (checked above),
+            * we must've had an init_replica configured before, then removed.
+            */
+           ereport(ERROR, (errmsg("bdr.bdr_nodes row with (sysid="
+                   UINT64_FORMAT ", dbname=%s) exists and has status=%c, but "
+                   "no connection with init_replica=t is configured for this "
+                   "database. ",
+                   GetSystemIdentifier(), NameStr(*dbname), status),
+                   errdetail("You probably configured initial setup with "
+                   "init_replica on a connection, then removed or changed that "
+                   "connection before setup completed properly. "),
+                   errhint("DROP and re-create the database if it has no "
+                   "existing content of value, or add the init_replica setting "
+                   "to one of the connections.")));
+       }
+       /*
+        * No connections have init_replica=t, so there's no remote copy to do.
+        * We still have to ensure that bdr.bdr_nodes.status is 'r' for this
+        * node so that slot creation is permitted.
+        */
+       bdr_nodes_set_local_status(GetSystemIdentifier(), dbname, 'r');
    }
+   /*
+    * We no longer require the transaction for SPI; further work gets done on
+    * the remote machine's bdr.bdr_nodes table and replicated back to us via
+    * pg_dump/pg_restore, or over the walsender protocol once we start
+    * replay. If we aren't just about to exit anyway.
+    */
+   SPI_finish();
+   CommitTransactionCommand();
+
+   if (!init_replica_worker)
+       /* Cleanup done and nothing more to do */
+       return;
+
 
    init_replica_config = bdr_connection_configs
        [init_replica_worker->worker_data.apply_worker.connection_config_idx];
    elog(DEBUG2, "bdr %s: bdr_init_replica init from connection %s",
         NameStr(*dbname), NameStr(init_replica_config->name));
 
-   /*
-    * Check the local bdr.bdr_nodes over SPI or direct scan to see if
-    * there's an entry for ourselves in ready mode already.
-    *
-    * This is an optimisation we don't need to do yet...
-    */
-   /*TODO
-   (status, min_remote_lsn) = get_node_status_from_local();
-   if (status == 'r')
-       return;
-   */
-
    /*
     * Test to see if there's an entry in the remote's bdr.bdr_nodes for our
     * system identifier. If there is, that'll tell us what stage of startup
index 9b8c4a7ab4a225a585c7eadbff05d521a5f6b1bd..38c23d4a9ea88367dde3a482e3b1909c608b87b0 100644 (file)
@@ -13,6 +13,7 @@
 #include "postgres.h"
 
 #include "bdr.h"
+#include "miscadmin.h"
 
 #include "access/sysattr.h"
 #include "access/tuptoaster.h"
 
 #include "catalog/namespace.h"
 #include "catalog/pg_class.h"
+#include "catalog/pg_database.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_type.h"
 
+#include "commands/dbcommands.h"
+
+#include "executor/spi.h"
+
 #include "libpq/pqformat.h"
 
 #include "mb/pg_wchar.h"
@@ -165,6 +171,116 @@ bdr_req_param(const char *param)
                    param)));
 }
 
+/*
+ * Check bdr.bdr_nodes entry in local DB and if status != r,
+ * raise an error.
+ *
+ * If this function returns it's safe to begin replay.
+ */
+static void
+bdr_ensure_node_ready()
+{
+   int spi_ret;
+   const uint64 sysid = GetSystemIdentifier();
+   char status;
+   HeapTuple tuple;
+   NameData dbname;
+
+   StartTransactionCommand();
+
+   tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(MyDatabaseId));
+   if (!HeapTupleIsValid(tuple))
+       elog(ERROR, "Could not get name of local DB");
+   namecpy( &dbname, &((Form_pg_database) GETSTRUCT(tuple))->datname );
+   ReleaseSysCache(tuple);
+
+   /*
+    * Refuse to begin replication if the local node isn't yet ready to
+    * send data. Check the status in bdr.bdr_nodes.
+    */
+   spi_ret = SPI_connect();
+   if (spi_ret != SPI_OK_CONNECT)
+       elog(ERROR, "Local SPI connect failed; shouldn't happen");
+
+   status = bdr_nodes_get_local_status(sysid, &dbname);
+
+   SPI_finish();
+
+   CommitTransactionCommand();
+
+   /* Complain if node isn't ready. */
+   /* TODO: Allow soft error so caller can sleep and recheck? */
+   if (status != 'r')
+   {
+       const char * const base_msg =
+           "bdr.bdr_nodes entry for local node (sysid=" UINT64_FORMAT
+           ", dbname=%s): %s";
+       switch (status)
+       {
+           case 'r':
+               break; /* unreachable */
+           case '\0':
+               /*
+                * Can't allow replay when BDR hasn't started yet, as
+                * replica init might still need to run, causing a dump to
+                * be applied, catchup from a remote node, etc.
+                *
+                * If there's no init_replica set, the bdr extension will
+                * create a bdr.bdr_nodes entry with 'r' state shortly
+                * after it starts, so we won't hit this.
+                */
+               ereport(ERROR,
+                       (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                        errmsg(base_msg, sysid, NameStr(dbname),
+                               "row missing, bdr not active on this "
+                               "database or is initializing."),
+                        errhint("Add bdr to shared_preload_libraries and "
+                                "check logs for bdr startup errors.")));
+               break;
+           case 'c':
+               /*
+                * Can't allow replay while still catching up. We get the
+                * real origin node ID and LSN over the protocol in catchup
+                * mode, but changes are written to WAL with the ID and LSN
+                * of the immediate origin node. So if we cascade them to
+                * another node now, they'll incorrectly see the immediate
+                * origin node ID and LSN, not the true original ones.
+                *
+                * It should be possible to lift this restriction later,
+                * if we write the original node id and lsn in WAL.
+                */
+               ereport(ERROR,
+                       (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                        errmsg(base_msg, sysid, NameStr(dbname), "status='c'"
+                               ", bdr still starting up: "
+                               "catching up from remote node"),
+                        errhint("Monitor pg_stat_replication on the "
+                                "remote node, watch the logs and wait "
+                                "until the node has caught up")));
+               break;
+           case 'i':
+               /*
+                * Can't allow replay while still applying a dump because the
+                * origin_id and origin_lsn are not preserved on the dump, so
+                * we'd replay all the changes. If the connections are from
+                * nodes that already have that data (or the origin node),
+                * that'll create a right mess.
+                */
+               ereport(ERROR,
+                       (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                        errmsg(base_msg, sysid, NameStr(dbname),
+                               "status='i', bdr still starting up: applying "
+                               "initial dump of remote node"),
+                        errhint("Monitor pg_stat_activity and the logs, "
+                                "wait until the node has caught up")));
+               break;
+           default:
+               elog(ERROR, "Unhandled case status=%c", status);
+               break;
+       }
+   }
+}
+
 
 /* initialize this plugin */
 static void
@@ -313,7 +429,6 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt, bool i
        if (data->client_pg_version / 100 != PG_VERSION_NUM / 100)
            data->allow_sendrecv_protocol = false;
 
-
        if (!IsTransactionState())
        {
            tx_started = false;
@@ -337,6 +452,12 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt, bool i
 
        if (!tx_started)
            CommitTransactionCommand();
+
+       /*
+        * Make sure it's safe to begin playing changes to the remote end.
+        * This'll ERROR out if we're not ready.
+        */
+       bdr_ensure_node_ready();
    }
 }
 
index 3c19093db87a13ca8a248cbbfa71688556a14004..586d1be8752f9afaf2e3e3eddeda03f6bbd8512e 100644 (file)
@@ -1,7 +1,7 @@
 # contrib/bdr/output.mk
 
 MODULE_big = bdr_output
-OBJS = bdr_compat.o bdr_output.o
+OBJS = bdr_compat.o bdr_catalogs.o bdr_output.o
 
 PG_CPPFLAGS = -I$(libpq_srcdir)
 SHLIB_LINK = $(libpq)
index f70f0454c5639ac523e8a86587f146abdfc0a712..707e0f96f0b69665882fe9e60e61bb846b45dda9 100644 (file)
@@ -3,7 +3,7 @@
 MODULE_big = bdr
 OBJS = bdr.o bdr_apply.o bdr_compat.o bdr_commandfilter.o bdr_count.o \
    bdr_seq.o bdr_init_replica.o bdr_relcache.o bdr_conflict_handlers.o \
-   bdr_conflict_logging.o bdr_executor.o
+   bdr_conflict_logging.o bdr_executor.o bdr_catalogs.o
 
 EXTENSION = bdr
 DATA = bdr--0.5.sql
index 6543d41cd6cbfc65bfc3816862c55f9ff86a273f..96dc10ff1f533464e83819908a1b0b25a8f2b000 100644 (file)
@@ -512,7 +512,8 @@ sub mkvcbuild
 
    # so is bdr
    my $bdr_output = $solution->AddProject('bdr_output', 'dll', 'misc');
-   $bdr_output->AddFiles('contrib\bdr', 'bdr_compat.c', 'bdr_output.c');
+   $bdr_output->AddFiles('contrib\bdr', 'bdr_compat.c', 'bdr_output.c',
+                         'bdr_catalogs.c');
    $bdr_output->AddReference($postgres);
    $bdr_output->AddLibrary('wsock32.lib');
 
@@ -521,7 +522,8 @@ sub mkvcbuild
                 'bdr_commandfilter.c', 'bdr_compat.c',
                 'bdr_count.c', 'bdr_seq.c', 'bdr_init_replica.c',
                 'bdr_relcache.c', 'bdr_conflict_handlers.c',
-                 'bdr_conflict_logging.c', 'bdr_executor.c');
+                 'bdr_conflict_logging.c', 'bdr_executor.c',
+                'bdr_catalogs.c');
    $bdr_apply->AddReference($postgres);
    $bdr_apply->AddLibrary('wsock32.lib');
    $bdr_apply->AddIncludeDir('src\interfaces\libpq');