CREATE TABLE bdr_queued_commands (
lsn pg_lsn NOT NULL,
- queued_at timestamptz NOT NULL,
- command_tag text,
- command text,
- executed bool
+ queued_at TIMESTAMP WITH TIME ZONE NOT NULL,
+ perpetrator TEXT NOT NULL,
+ command_tag TEXT NOT NULL,
+ command TEXT NOT NULL
);
REVOKE ALL ON TABLE bdr_queued_commands FROM PUBLIC;
SELECT pg_catalog.pg_extension_config_dump('bdr_queued_commands', '');
ident := quote_ident(TG_TABLE_SCHEMA)||'.'||quote_ident(TG_TABLE_NAME);
INSERT INTO bdr.bdr_queued_commands (
- lsn, queued_at,
- command_tag, command, executed
+ lsn, queued_at, perpetrator,
+ command_tag, command
)
VALUES (
pg_current_xlog_location(),
- NOW(),
+ NOW(), CURRENT_USER,
'TRUNCATE (automatic)',
- 'TRUNCATE TABLE ONLY ' || ident,
- 'false');
+ 'TRUNCATE TABLE ONLY ' || ident
+ );
RETURN NULL;
END;
$function$;
-CREATE OR REPLACE FUNCTION bdr.queue_commands()
+CREATE OR REPLACE FUNCTION bdr.bdr_queue_ddl_commands()
RETURNS event_trigger
-LANGUAGE plpgsql
-AS $function$
-DECLARE
- r RECORD;
-BEGIN
- -- don't recursively log ddl commands
- IF pg_replication_identifier_is_replaying() THEN
- RETURN;
- END IF;
-
- IF current_setting('bdr.skip_ddl_replication')::boolean THEN
- -- If we're doing a pg_restore from a remote BDR node's
- -- state, we must not create truncate triggers etc because
- -- they'll get copied over in the dump.
- RETURN;
- END IF;
-
- FOR r IN SELECT * FROM pg_event_trigger_get_creation_commands()
- LOOP
- /* ignore temporary objects */
- IF r.schema = 'pg_temp' THEN
- CONTINUE;
- END IF;
-
- /* ignore objects that are part of an extension */
- IF r.in_extension THEN
- CONTINUE;
- END IF;
-
- INSERT INTO bdr.bdr_queued_commands(
- lsn, queued_at,
- command_tag, command, executed
- )
- VALUES (
- pg_current_xlog_location(),
- NOW(),
- r.command_tag,
- pg_catalog.pg_event_trigger_expand_command(r.command),
- 'false'
- );
-
- IF r.command_tag = 'CREATE TABLE' and r.object_type = 'table' THEN
- EXECUTE 'CREATE TRIGGER truncate_trigger AFTER TRUNCATE ON ' ||
- r.identity ||
- ' FOR EACH STATEMENT EXECUTE PROCEDURE bdr.queue_truncate()';
- END IF;
- END LOOP;
-END;
-$function$;
+LANGUAGE C
+AS 'MODULE_PATHNAME'
+;
-- This type is tailored to use as input to get_object_address
CREATE TYPE bdr.dropped_object AS (
--- this should always be last to avoid replicating our internal schema
---
-CREATE EVENT TRIGGER queue_commands
+CREATE EVENT TRIGGER bdr_queue_ddl_commands
ON ddl_command_end
-EXECUTE PROCEDURE bdr.queue_commands();
+EXECUTE PROCEDURE bdr.bdr_queue_ddl_commands();
SET bdr.permit_unsafe_ddl_commands = false;
RESET search_path;
process_queued_ddl_command(HeapTuple cmdtup, bool tx_just_started)
{
Relation cmdsrel;
-#ifdef NOT_YET
- HeapTuple newtup;
-#endif
Datum datum;
char *command_tag;
char *cmdstr;
bool isnull;
-
+ char *perpetrator;
List *commands;
ListCell *command_i;
bool isTopLevel;
cmdsrel = heap_open(QueuedDDLCommandsRelid, NoLock);
- /* fetch the command tag */
+ /* fetch the perpetrator user identifier */
datum = heap_getattr(cmdtup, 3,
RelationGetDescr(cmdsrel),
&isnull);
+ if (isnull)
+ elog(ERROR, "null command perpetrator in command tuple in \"%s\"",
+ RelationGetRelationName(cmdsrel));
+ perpetrator = TextDatumGetCString(datum);
+
+ /* fetch the command tag */
+ datum = heap_getattr(cmdtup, 4,
+ RelationGetDescr(cmdsrel),
+ &isnull);
if (isnull)
elog(ERROR, "null command tag in command tuple in \"%s\"",
RelationGetRelationName(cmdsrel));
command_tag = TextDatumGetCString(datum);
/* finally fetch and execute the command */
- datum = heap_getattr(cmdtup, 4,
+ datum = heap_getattr(cmdtup, 5,
RelationGetDescr(cmdsrel),
&isnull);
if (isnull)
oldcontext = MemoryContextSwitchTo(MessageContext);
+ /*
+ * Set the current role to the user that executed the command on the
+ * origin server. NB: there is no need to reset this afterwards, as
+ * the value will be gone with our transaction.
+ */
+ SetConfigOption("role", perpetrator, PGC_INTERNAL, PGC_S_OVERRIDE);
+
commandTag = CreateCommandTag(command);
querytree_list = pg_analyze_and_rewrite(
MemoryContextSwitchTo(oldcontext);
}
-
-#ifdef NOT_YET
- /* FIXME: update tuple to set set "executed" to true */
- // newtup = heap_modify_tuple( .. );
- newtup = cmdtup;
-#endif
}
static HeapTuple
#include "access/heapam.h"
#include "access/skey.h"
#include "access/xact.h"
+#include "access/xlog_fn.h"
+
+#include "catalog/pg_trigger.h"
+
+#include "commands/trigger.h"
#include "executor/executor.h"
+#include "executor/spi.h"
#include "executor/tuptable.h"
+#include "miscadmin.h"
+
#include "nodes/execnodes.h"
+#include "nodes/makefuncs.h"
+#include "nodes/parsenodes.h"
#include "parser/parse_relation.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
#include "utils/lsyscache.h"
+#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/tqual.h"
+
+PG_FUNCTION_INFO_V1(bdr_queue_ddl_commands);
+
EState *
bdr_create_rel_estate(Relation rel)
{
return found;
}
+
+
+/*
+ * bdr_queue_ddl_commands
+ * ddl_command_end event triggger handler for BDR
+ *
+ * This function queues all commands reported in a replicated table, so that
+ * they can be replayed by remote BDR nodes.
+ */
+Datum
+bdr_queue_ddl_commands(PG_FUNCTION_ARGS)
+{
+ EState *estate;
+ TupleTableSlot *slot;
+ RangeVar *rv;
+ Relation queuedcmds;
+ char *skip_ddl;
+ int res;
+ int i;
+ MemoryContext tupcxt;
+
+ /*
+ * If we're currently replaying something from a remote node, don't queue
+ * the commands; that would cause recursion.
+ */
+ if (replication_origin_id != InvalidRepNodeId)
+ PG_RETURN_VOID(); /* XXX return type? */
+
+ /*
+ * Similarly, if configured to skip queueing DDL, don't queue. This is
+ * mostly used when pg_restore brings a remote node state, so all objects
+ * will be copied over in the dump anyway.
+ */
+ skip_ddl = GetConfigOptionByName("bdr.skip_ddl_replication", NULL);
+ if (strcmp(skip_ddl, "on") == 0)
+ PG_RETURN_VOID();
+
+ /*
+ * Connect to SPI early, so that all memory allocated in this routine is
+ * released when we disconnect. Also create a memory context that's reset
+ * for each iteration, to avoid per-tuple leakage. Normally there would be
+ * very few tuples, but it's possible to create larger commands and it's
+ * pretty easy to fix the issue anyway.
+ */
+ SPI_connect();
+ tupcxt = AllocSetContextCreate(CurrentMemoryContext,
+ "per-tuple DDL queue cxt",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ res = SPI_execute("SELECT "
+ " command_tag, object_type, schema, identity, "
+ " in_extension, "
+ " pg_event_trigger_expand_command(command) AS command "
+ "FROM "
+ " pg_catalog.pg_event_trigger_get_creation_commands()",
+ false, 0);
+ if (res != SPI_OK_SELECT)
+ elog(ERROR, "SPI query failed: %d", res);
+
+ /* prepare bdr.bdr_queued_commands for insert */
+ rv = makeRangeVar("bdr", "bdr_queued_commands", -1);
+ queuedcmds = heap_openrv(rv, RowExclusiveLock);
+ slot = MakeSingleTupleTableSlot(RelationGetDescr(queuedcmds));
+ estate = bdr_create_rel_estate(queuedcmds);
+ ExecOpenIndices(estate->es_result_relation_info);
+
+ /*
+ * For each command row reported by the event trigger facility, insert zero
+ * or one row in the BDR queued commands table specifying how to replicate
+ * it.
+ */
+ MemoryContextSwitchTo(tupcxt);
+ for (i = 0; i < SPI_processed; i++)
+ {
+ HeapTuple newtup = NULL;
+ Datum cmdvalues[6]; /* # cols returned by above query */
+ bool cmdnulls[6];
+ Datum values[5]; /* # cols in bdr_queued_commands */
+ bool nulls[5];
+
+ MemoryContextReset(tupcxt);
+
+ /* this is the tuple reported by event triggers */
+ heap_deform_tuple(SPI_tuptable->vals[i], SPI_tuptable->tupdesc,
+ cmdvalues, cmdnulls);
+
+ /* if a temp object, ignore it */
+ if (!cmdnulls[2] &&
+ (strcmp(TextDatumGetCString(cmdvalues[1]), "pg_temp") == 0))
+ continue;
+
+ /* if in_extension, ignore the command */
+ if (DatumGetBool(cmdvalues[4]))
+ continue;
+
+ /* lsn, queued_at, perpetrator, command_tag, command */
+ values[0] = pg_current_xlog_location(NULL);
+ values[1] = now(NULL);
+ values[2] = PointerGetDatum(cstring_to_text(GetUserNameFromId(GetUserId())));
+ values[3] = cmdvalues[0];
+ values[4] = cmdvalues[5];
+ MemSet(nulls, 0, sizeof(nulls));
+
+ newtup = heap_form_tuple(RelationGetDescr(queuedcmds), values, nulls);
+ simple_heap_insert(queuedcmds, newtup);
+ ExecStoreTuple(newtup, slot, InvalidBuffer, true);
+ UserTableUpdateOpenIndexes(estate, slot);
+
+ /*
+ * If we're creating a table, attach a per-stmt trigger to it too, so
+ * that whenever a TRUNCATE is executed in a node, it's replicated to
+ * all other nodes.
+ */
+ if ((strcmp(TextDatumGetCString(cmdvalues[0]), "CREATE TABLE") == 0) &&
+ (strcmp(TextDatumGetCString(cmdvalues[1]), "table") == 0))
+ {
+ char *stmt;
+
+ /* The table identity is already quoted */
+ stmt = psprintf("CREATE TRIGGER truncate_trigger AFTER TRUNCATE "
+ "ON %s FOR EACH STATEMENT EXECUTE PROCEDURE "
+ "bdr.queue_truncate()",
+ TextDatumGetCString(cmdvalues[3]));
+ res = SPI_execute(stmt, false, 0);
+ if (res != SPI_OK_UTILITY)
+ elog(ERROR, "SPI failure: %d", res);
+ }
+ }
+
+ ExecCloseIndices(estate->es_result_relation_info);
+ ExecDropSingleTupleTableSlot(slot);
+ heap_close(queuedcmds, RowExclusiveLock);
+
+ SPI_finish();
+
+ PG_RETURN_VOID();
+}