bdr: Apply event from the command queue using full blown portals.
authorAndres Freund <andres@anarazel.de>
Wed, 9 Apr 2014 12:06:38 +0000 (14:06 +0200)
committerAndres Freund <andres@anarazel.de>
Thu, 3 Jul 2014 15:55:24 +0000 (17:55 +0200)
This has the advantage that we can support normal SQL statements in
the command queue which is needed for conflict triggers.

contrib/bdr/bdr_apply.c

index b6d3758916ff9e7c43deb1de6b8b464dbfd1718d..34ed64968c8dd3cf61bf1d3a61d7bb5b58f13c9d 100644 (file)
@@ -45,6 +45,7 @@
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 
+#include "tcop/pquery.h"
 #include "tcop/tcopprot.h"
 #include "tcop/utility.h"
 
@@ -267,12 +268,17 @@ process_queued_ddl_command(HeapTuple cmdtup, bool tx_just_started)
        List       *plantree_list;
        List       *querytree_list;
        Node       *command = (Node *) lfirst(command_i);
-       ListCell   *stmt_i;
+       const char *commandTag;
+       Portal      portal;
+       DestReceiver *receiver;
 
+       /* temporarily push snapshot for parse analysis/planning */
        PushActiveSnapshot(GetTransactionSnapshot());
 
        oldcontext = MemoryContextSwitchTo(MessageContext);
 
+       commandTag = CreateCommandTag(command);
+
        querytree_list = pg_analyze_and_rewrite(
            command, cmdstr, NULL, 0);
 
@@ -281,17 +287,24 @@ process_queued_ddl_command(HeapTuple cmdtup, bool tx_just_started)
 
        PopActiveSnapshot();
 
-       foreach(stmt_i, plantree_list)
-       {
-           Node *stmt = lfirst(stmt_i);
-           if (IsA(stmt, PlannedStmt))
-               elog(ERROR, "frak");
-
-           ProcessUtility(stmt,
-                          cmdstr,
-                          isTopLevel ? PROCESS_UTILITY_TOPLEVEL : PROCESS_UTILITY_QUERY,
-                          NULL, CreateDestReceiver(DestNone), NULL);
-       }
+       portal = CreatePortal("", true, true);
+       PortalDefineQuery(portal, NULL,
+                         cmdstr, commandTag,
+                         plantree_list, NULL);
+       PortalStart(portal, NULL, 0, InvalidSnapshot);
+
+       receiver = CreateDestReceiver(DestNone);
+
+       (void) PortalRun(portal, FETCH_ALL,
+                        isTopLevel,
+                        receiver, receiver,
+                        NULL);
+       (*receiver->rDestroy) (receiver);
+
+       PortalDrop(portal, false);
+
+       CommandCounterIncrement();
+
        MemoryContextSwitchTo(oldcontext);
    }