bdr/deparse: have DDL replication track perpetrators
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Thu, 22 May 2014 19:23:27 +0000 (15:23 -0400)
committerAndres Freund <andres@anarazel.de>
Thu, 3 Jul 2014 15:55:38 +0000 (17:55 +0200)
This means tables (and other objects) are created with the right owners,
GRANTs are executed with the right grantors, etc.

Queueing function had to be rewritten in C in order for this to work.

contrib/bdr/bdr--0.5.sql
contrib/bdr/bdr_apply.c
contrib/bdr/bdr_conflict_handlers.c
contrib/bdr/bdr_executor.c

index 52e42b41b01cf978bbef94eda925dd1197d61f18..3db89c2a2bc35cc3d649593460193cf0059a490d 100644 (file)
@@ -319,10 +319,10 @@ COMMENT ON COLUMN bdr_nodes.node_status IS 'Readiness of the node: [i]nitializin
 
 CREATE TABLE bdr_queued_commands (
     lsn pg_lsn NOT NULL,
-    queued_at timestamptz NOT NULL,
-    command_tag text,
-    command text,
-    executed bool
+    queued_at TIMESTAMP WITH TIME ZONE NOT NULL,
+    perpetrator TEXT NOT NULL,
+    command_tag TEXT NOT NULL,
+    command TEXT NOT NULL
 );
 REVOKE ALL ON TABLE bdr_queued_commands FROM PUBLIC;
 SELECT pg_catalog.pg_extension_config_dump('bdr_queued_commands', '');
@@ -342,70 +342,24 @@ BEGIN
     ident := quote_ident(TG_TABLE_SCHEMA)||'.'||quote_ident(TG_TABLE_NAME);
 
     INSERT INTO bdr.bdr_queued_commands (
-        lsn, queued_at,
-        command_tag, command, executed
+        lsn, queued_at, perpetrator,
+        command_tag, command
     )
     VALUES (
         pg_current_xlog_location(),
-        NOW(),
+        NOW(), CURRENT_USER,
         'TRUNCATE (automatic)',
-        'TRUNCATE TABLE ONLY ' || ident,
-        'false');
+        'TRUNCATE TABLE ONLY ' || ident
+        );
     RETURN NULL;
 END;
 $function$;
 
-CREATE OR REPLACE FUNCTION bdr.queue_commands()
+CREATE OR REPLACE FUNCTION bdr.bdr_queue_ddl_commands()
 RETURNS event_trigger
-LANGUAGE plpgsql
-AS $function$
-DECLARE
-    r RECORD;
-BEGIN
-    -- don't recursively log ddl commands
-    IF pg_replication_identifier_is_replaying() THEN
-       RETURN;
-    END IF;
-
-    IF current_setting('bdr.skip_ddl_replication')::boolean THEN
-        -- If we're doing a pg_restore from a remote BDR node's
-        -- state, we must not create truncate triggers etc because
-        -- they'll get copied over in the dump.
-        RETURN;
-    END IF;
-
-    FOR r IN SELECT * FROM pg_event_trigger_get_creation_commands()
-    LOOP
-        /* ignore temporary objects */
-        IF r.schema = 'pg_temp' THEN
-            CONTINUE;
-        END IF;
-
-        /* ignore objects that are part of an extension */
-        IF r.in_extension THEN
-            CONTINUE;
-        END IF;
-
-        INSERT INTO bdr.bdr_queued_commands(
-            lsn, queued_at,
-            command_tag, command, executed
-        )
-        VALUES (
-            pg_current_xlog_location(),
-            NOW(),
-            r.command_tag,
-            pg_catalog.pg_event_trigger_expand_command(r.command),
-            'false'
-        );
-
-        IF r.command_tag = 'CREATE TABLE' and r.object_type = 'table' THEN
-            EXECUTE 'CREATE TRIGGER truncate_trigger AFTER TRUNCATE ON ' ||
-                r.identity ||
-                ' FOR EACH STATEMENT EXECUTE PROCEDURE bdr.queue_truncate()';
-        END IF;
-    END LOOP;
-END;
-$function$;
+LANGUAGE C
+AS 'MODULE_PATHNAME'
+;
 
 -- This type is tailored to use as input to get_object_address
 CREATE TYPE bdr.dropped_object AS (
@@ -479,9 +433,9 @@ AS 'MODULE_PATHNAME'
 --- this should always be last to avoid replicating our internal schema
 ---
 
-CREATE EVENT TRIGGER queue_commands
+CREATE EVENT TRIGGER bdr_queue_ddl_commands
 ON ddl_command_end
-EXECUTE PROCEDURE bdr.queue_commands();
+EXECUTE PROCEDURE bdr.bdr_queue_ddl_commands();
 
 SET bdr.permit_unsafe_ddl_commands = false;
 RESET search_path;
index b3c65f0b754c1439b47f586e9713a42cc6c2861a..ba20a57e0dc16d50dd880361b037375849c38602 100644 (file)
@@ -1109,14 +1109,11 @@ static void
 process_queued_ddl_command(HeapTuple cmdtup, bool tx_just_started)
 {
    Relation    cmdsrel;
-#ifdef NOT_YET
-   HeapTuple   newtup;
-#endif
    Datum       datum;
    char       *command_tag;
    char       *cmdstr;
    bool        isnull;
-
+   char       *perpetrator;
    List       *commands;
    ListCell   *command_i;
    bool        isTopLevel;
@@ -1134,17 +1131,26 @@ process_queued_ddl_command(HeapTuple cmdtup, bool tx_just_started)
 
    cmdsrel = heap_open(QueuedDDLCommandsRelid, NoLock);
 
-   /* fetch the command tag */
+   /* fetch the perpetrator user identifier */
    datum = heap_getattr(cmdtup, 3,
                         RelationGetDescr(cmdsrel),
                         &isnull);
+   if (isnull)
+       elog(ERROR, "null command perpetrator in command tuple in \"%s\"",
+            RelationGetRelationName(cmdsrel));
+   perpetrator = TextDatumGetCString(datum);
+
+   /* fetch the command tag */
+   datum = heap_getattr(cmdtup, 4,
+                        RelationGetDescr(cmdsrel),
+                        &isnull);
    if (isnull)
        elog(ERROR, "null command tag in command tuple in \"%s\"",
             RelationGetRelationName(cmdsrel));
    command_tag = TextDatumGetCString(datum);
 
    /* finally fetch and execute the command */
-   datum = heap_getattr(cmdtup, 4,
+   datum = heap_getattr(cmdtup, 5,
                         RelationGetDescr(cmdsrel),
                         &isnull);
    if (isnull)
@@ -1180,6 +1186,13 @@ process_queued_ddl_command(HeapTuple cmdtup, bool tx_just_started)
 
        oldcontext = MemoryContextSwitchTo(MessageContext);
 
+       /*
+        * Set the current role to the user that executed the command on the
+        * origin server.  NB: there is no need to reset this afterwards, as
+        * the value will be gone with our transaction.
+        */
+       SetConfigOption("role", perpetrator, PGC_INTERNAL, PGC_S_OVERRIDE);
+
        commandTag = CreateCommandTag(command);
 
        querytree_list = pg_analyze_and_rewrite(
@@ -1210,12 +1223,6 @@ process_queued_ddl_command(HeapTuple cmdtup, bool tx_just_started)
 
        MemoryContextSwitchTo(oldcontext);
    }
-
-#ifdef NOT_YET
-   /* FIXME: update tuple to set set "executed" to true */
-   // newtup = heap_modify_tuple( .. );
-   newtup = cmdtup;
-#endif
 }
 
 static HeapTuple
index b59a1adecd0d1a7f1052e33f9697a4a8a00b6896..ba4c71ca374d98026a73a74a6009c95b862b7a81 100644 (file)
@@ -55,8 +55,8 @@ const char *drop_handler_get_tbl_oid_sql =
 "SELECT oid FROM bdr.bdr_conflict_handlers WHERE ch_name = $1 AND ch_reloid = $2";
 
 const char *handler_queued_table_sql =
-"INSERT INTO bdr.bdr_queued_commands (lsn, queued_at, command_tag, command, executed)\n" \
-"   VALUES (pg_current_xlog_location(), NOW(), 'SELECT', $1, false)";
+"INSERT INTO bdr.bdr_queued_commands (lsn, queued_at, perpetrator, command_tag, command)\n" \
+"   VALUES (pg_current_xlog_location(), NOW(), CURRENT_USER, 'SELECT', $1)";
 
 const char *get_conflict_handlers_for_table_sql =
 "SELECT ch_fun, ch_type::text ch_type, ch_timeframe FROM bdr.bdr_conflict_handlers" \
index af15f5ba76b5d34f8e156f3171f2f92bbd735886..4d3613e064863851bd7cd2ca977863decba6c5e6 100644 (file)
 #include "access/heapam.h"
 #include "access/skey.h"
 #include "access/xact.h"
+#include "access/xlog_fn.h"
+
+#include "catalog/pg_trigger.h"
+
+#include "commands/trigger.h"
 
 #include "executor/executor.h"
+#include "executor/spi.h"
 #include "executor/tuptable.h"
 
+#include "miscadmin.h"
+
 #include "nodes/execnodes.h"
+#include "nodes/makefuncs.h"
+#include "nodes/parsenodes.h"
 
 #include "parser/parse_relation.h"
 
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 
+#include "utils/builtins.h"
+#include "utils/guc.h"
 #include "utils/lsyscache.h"
+#include "utils/memutils.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/tqual.h"
 
+
+PG_FUNCTION_INFO_V1(bdr_queue_ddl_commands);
+
 EState *
 bdr_create_rel_estate(Relation rel)
 {
@@ -263,3 +279,142 @@ retry:
 
    return found;
 }
+
+
+/*
+ * bdr_queue_ddl_commands
+ *         ddl_command_end event triggger handler for BDR
+ *
+ * This function queues all commands reported in a replicated table, so that
+ * they can be replayed by remote BDR nodes.
+ */
+Datum
+bdr_queue_ddl_commands(PG_FUNCTION_ARGS)
+{
+   EState  *estate;
+   TupleTableSlot  *slot;
+   RangeVar *rv;
+   Relation queuedcmds;
+   char   *skip_ddl;
+   int     res;
+   int     i;
+   MemoryContext   tupcxt;
+
+   /*
+    * If we're currently replaying something from a remote node, don't queue
+    * the commands; that would cause recursion.
+    */
+   if (replication_origin_id != InvalidRepNodeId)
+       PG_RETURN_VOID();   /* XXX return type? */
+
+   /*
+    * Similarly, if configured to skip queueing DDL, don't queue.  This is
+    * mostly used when pg_restore brings a remote node state, so all objects
+    * will be copied over in the dump anyway.
+    */
+   skip_ddl = GetConfigOptionByName("bdr.skip_ddl_replication", NULL);
+   if (strcmp(skip_ddl, "on") == 0)
+       PG_RETURN_VOID();
+
+   /*
+    * Connect to SPI early, so that all memory allocated in this routine is
+    * released when we disconnect.  Also create a memory context that's reset
+    * for each iteration, to avoid per-tuple leakage.  Normally there would be
+    * very few tuples, but it's possible to create larger commands and it's
+    * pretty easy to fix the issue anyway.
+    */
+   SPI_connect();
+   tupcxt = AllocSetContextCreate(CurrentMemoryContext,
+                                  "per-tuple DDL queue cxt",
+                                  ALLOCSET_DEFAULT_MINSIZE,
+                                  ALLOCSET_DEFAULT_INITSIZE,
+                                  ALLOCSET_DEFAULT_MAXSIZE);
+
+   res = SPI_execute("SELECT "
+                     "   command_tag, object_type, schema, identity, "
+                     "   in_extension, "
+                     "   pg_event_trigger_expand_command(command) AS command "
+                     "FROM "
+                     "   pg_catalog.pg_event_trigger_get_creation_commands()",
+                     false, 0);
+   if (res != SPI_OK_SELECT)
+       elog(ERROR, "SPI query failed: %d", res);
+
+   /* prepare bdr.bdr_queued_commands for insert */
+   rv = makeRangeVar("bdr", "bdr_queued_commands", -1);
+   queuedcmds = heap_openrv(rv, RowExclusiveLock);
+   slot = MakeSingleTupleTableSlot(RelationGetDescr(queuedcmds));
+   estate = bdr_create_rel_estate(queuedcmds);
+   ExecOpenIndices(estate->es_result_relation_info);
+
+   /*
+    * For each command row reported by the event trigger facility, insert zero
+    * or one row in the BDR queued commands table specifying how to replicate
+    * it.
+    */
+   MemoryContextSwitchTo(tupcxt);
+   for (i = 0; i < SPI_processed; i++)
+   {
+       HeapTuple   newtup = NULL;
+       Datum       cmdvalues[6];   /* # cols returned by above query */
+       bool        cmdnulls[6];
+       Datum       values[5];      /* # cols in bdr_queued_commands */
+       bool        nulls[5];
+
+       MemoryContextReset(tupcxt);
+
+       /* this is the tuple reported by event triggers */
+       heap_deform_tuple(SPI_tuptable->vals[i], SPI_tuptable->tupdesc,
+                         cmdvalues, cmdnulls);
+
+       /* if a temp object, ignore it */
+       if (!cmdnulls[2] &&
+           (strcmp(TextDatumGetCString(cmdvalues[1]), "pg_temp") == 0))
+           continue;
+
+       /* if in_extension, ignore the command */
+       if (DatumGetBool(cmdvalues[4]))
+           continue;
+
+       /* lsn, queued_at, perpetrator, command_tag, command */
+       values[0] = pg_current_xlog_location(NULL);
+       values[1] = now(NULL);
+       values[2] = PointerGetDatum(cstring_to_text(GetUserNameFromId(GetUserId())));
+       values[3] = cmdvalues[0];
+       values[4] = cmdvalues[5];
+       MemSet(nulls, 0, sizeof(nulls));
+
+       newtup = heap_form_tuple(RelationGetDescr(queuedcmds), values, nulls);
+       simple_heap_insert(queuedcmds, newtup);
+       ExecStoreTuple(newtup, slot, InvalidBuffer, true);
+       UserTableUpdateOpenIndexes(estate, slot);
+
+       /*
+        * If we're creating a table, attach a per-stmt trigger to it too, so
+        * that whenever a TRUNCATE is executed in a node, it's replicated to
+        * all other nodes.
+        */
+       if ((strcmp(TextDatumGetCString(cmdvalues[0]), "CREATE TABLE") == 0) &&
+           (strcmp(TextDatumGetCString(cmdvalues[1]), "table") == 0))
+       {
+           char    *stmt;
+
+           /* The table identity is already quoted */
+           stmt = psprintf("CREATE TRIGGER truncate_trigger AFTER TRUNCATE "
+                           "ON %s FOR EACH STATEMENT EXECUTE PROCEDURE "
+                           "bdr.queue_truncate()",
+                           TextDatumGetCString(cmdvalues[3]));
+           res = SPI_execute(stmt, false, 0);
+           if (res != SPI_OK_UTILITY)
+               elog(ERROR, "SPI failure: %d", res);
+       }
+   }
+
+   ExecCloseIndices(estate->es_result_relation_info);
+   ExecDropSingleTupleTableSlot(slot);
+   heap_close(queuedcmds, RowExclusiveLock);
+
+   SPI_finish();
+
+   PG_RETURN_VOID();
+}