bdr: reorder functions in bdr_apply.c
author Andres Freund <andres@anarazel.de>
Fri, 25 Apr 2014 12:06:27 +0000 (14:06 +0200)
committer Andres Freund <andres@anarazel.de>
Thu, 3 Jul 2014 15:55:27 +0000 (17:55 +0200)
contrib/bdr/bdr_apply.c

index eeecf114acc3fed375669de172ecfb7b7ef77130..f5ca7bc40f113cea67d3aeab3fe4de16812cf9da 100644 (file)
@@ -74,50 +74,45 @@ typedef struct BDRTupleData
    bool        changed[MaxTupleAttributeNumber];
 } BDRTupleData;
 
+bool       started_transaction = false;
+Oid            QueuedDDLCommandsRelid = InvalidOid;
+Oid            QueuedDropsRelid = InvalidOid;
+
+/*
+ * This code only runs within an apply bgworker, so we can stash a pointer to our
+ * state in shm in a global for convenient access.
+ *
+ * TODO: make static once bdr_apply_main moved into bdr.c
+ */
+BdrApplyWorker *bdr_apply_worker = NULL;
+
 static void build_index_scan_keys(EState *estate, ScanKey *scan_keys, TupleTableSlot *slot);
 static void build_index_scan_key(ScanKey skey, Relation rel, Relation idx_rel, TupleTableSlot *slot);
 static bool find_pkey_tuple(ScanKey skey, Relation rel, Relation idx_rel, TupleTableSlot *slot,
                            bool lock, LockTupleMode mode);
+
 static void UserTableUpdateIndexes(EState *estate, TupleTableSlot *slot);
 static void UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot);
+static EState *bdr_create_rel_estate(Relation rel);
+
+/* read data from the wire */
 static Relation read_rel(StringInfo s, LOCKMODE mode);
 extern void read_tuple_parts(StringInfo s, Relation rel, BDRTupleData *tup);
 static void read_tuple(StringInfo s, Relation rel, TupleTableSlot *slot);
-static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple);
 
-static void check_sequencer_wakeup(Relation rel);
+static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple);
 
 static void check_apply_update(RepNodeId local_node_id, TimestampTz ts, bool *perform_update, bool *log_update);
 static void do_log_update(RepNodeId local_node_id, bool apply_update, TimestampTz ts, Relation idxrel, HeapTuple old_key);
 static void do_apply_update(Relation rel, EState *estate,
                            TupleTableSlot *oldslot, TupleTableSlot *newslot,
                            BDRTupleData new_tuple);
+static void fetch_sysid_via_node_id(RepNodeId node_id, uint64 *sysid, TimeLineID *tli);
 
-static EState *bdr_create_rel_estate(Relation rel);
-
-bool       started_transaction = false;
-Oid            QueuedDDLCommandsRelid = InvalidOid;
-Oid            QueuedDropsRelid = InvalidOid;
-
-/*
- * This code only runs within an apply bgworker, so we can stash a pointer to our
- * state in shm in a global for convenient access.
- *
- * TODO: make static once bdr_apply_main moved into bdr.c
- */
-BdrApplyWorker *bdr_apply_worker = NULL;
-
-static bool
-bdr_performing_work(void)
-{
-   if (started_transaction)
-       return false;
-
-   started_transaction = true;
-   StartTransactionCommand();
-
-   return true;
-}
+static void check_sequencer_wakeup(Relation rel);
+static HeapTuple process_queued_drop(HeapTuple cmdtup);
+static void process_queued_ddl_command(HeapTuple cmdtup, bool tx_just_started);
+static bool bdr_performing_work(void);
 
 void
 process_remote_begin(StringInfo s)
@@ -215,486 +210,156 @@ process_remote_commit(StringInfo s)
    bdr_count_commit();
 }
 
-static void
-process_queued_ddl_command(HeapTuple cmdtup, bool tx_just_started)
+void
+process_remote_insert(StringInfo s)
 {
-   Relation    cmdsrel;
-#ifdef NOT_YET
-   HeapTuple   newtup;
+   char        action;
+   EState     *estate;
+   TupleTableSlot *slot;
+   TupleTableSlot *oldslot;
+   Relation    rel;
+   bool        started_tx;
+#ifdef VERBOSE_INSERT
+   StringInfoData o;
 #endif
-   Datum       datum;
-   char       *type;
-   char       *identstr;
-   char       *cmdstr;
-   bool        isnull;
-
-   List       *commands;
-   ListCell   *command_i;
-   bool        isTopLevel;
-   MemoryContext oldcontext;
-
-   /* ----
-    * We can't use spi here, because it implicitly assumes a transaction
-    * context. As we want to be able to replicate CONCURRENTLY commands,
-    * that's not going to work...
-    * So instead do all the work manually, being careful about managing the
-    * lifecycle of objects.
-    * ----
-    */
-   oldcontext = MemoryContextSwitchTo(MessageContext);
+   ResultRelInfo *relinfo;
+   ItemPointer conflicts;
+   bool        conflict = false;
+   ScanKey    *index_keys;
+   int         i;
+   ItemPointerData conflicting_tid;
 
-   cmdsrel = heap_open(QueuedDDLCommandsRelid, NoLock);
+   ItemPointerSetInvalid(&conflicting_tid);
 
-   /* fetch the object type */
-   datum = heap_getattr(cmdtup, 1,
-                        RelationGetDescr(cmdsrel),
-                        &isnull);
-   if (isnull)
-       elog(ERROR, "null object type in command tuple in \"%s\"",
-            RelationGetRelationName(cmdsrel));
-   type = TextDatumGetCString(datum);
+   started_tx = bdr_performing_work();
 
-   /* fetch the object identity */
-   datum = heap_getattr(cmdtup, 2,
-                        RelationGetDescr(cmdsrel),
-                        &isnull);
-   if (isnull)
-       elog(ERROR, "null identity in command tuple for object of type %s",
-            RelationGetRelationName(cmdsrel));
+   Assert(bdr_apply_worker != NULL);
 
-   identstr = TextDatumGetCString(datum);
+   /*
+    * Read tuple into a context that's long lived enough for CONCURRENTLY
+    * processing.
+    */
+   MemoryContextSwitchTo(MessageContext);
+   rel = read_rel(s, RowExclusiveLock);
 
-   /* finally fetch and execute the command */
-   datum = heap_getattr(cmdtup, 3,
-                        RelationGetDescr(cmdsrel),
-                        &isnull);
-   if (isnull)
-       elog(ERROR, "null command in tuple for %s \"%s\"", type, identstr);
+   action = pq_getmsgbyte(s);
+   if (action != 'N')
+       elog(ERROR, "expected new tuple but got %d",
+            action);
 
-   cmdstr = TextDatumGetCString(datum);
+   estate = bdr_create_rel_estate(rel);
+   slot = ExecInitExtraTupleSlot(estate);
+   oldslot = ExecInitExtraTupleSlot(estate);
+   ExecSetSlotDescriptor(slot, RelationGetDescr(rel));
+   ExecSetSlotDescriptor(oldslot, RelationGetDescr(rel));
 
-   /* close relation, command execution might end/start xact */
-   heap_close(cmdsrel, NoLock);
+   read_tuple(s, rel, slot);
 
-   commands = pg_parse_query(cmdstr);
+   if (rel->rd_rel->relkind != RELKIND_RELATION)
+       elog(ERROR, "unexpected relkind '%c' rel \"%s\"",
+            rel->rd_rel->relkind, RelationGetRelationName(rel));
 
-   MemoryContextSwitchTo(oldcontext);
+   /* debug output */
+#ifdef VERBOSE_INSERT
+   initStringInfo(&o);
+   tuple_to_stringinfo(&o, RelationGetDescr(rel), slot->tts_tuple);
+   elog(LOG, "INSERT:%s", o.data);
+   resetStringInfo(&o);
+#endif
 
    /*
-    * Do a limited amount of safety checking against CONCURRENTLY commands
-    * executed in situations where they aren't allowed. The sender side shoul
-    * provide protection, but better be safe than sorry.
+    * Search for conflicting tuples.
     */
-   isTopLevel = (list_length(commands) == 1) && tx_just_started;
+   ExecOpenIndices(estate->es_result_relation_info);
+   relinfo = estate->es_result_relation_info;
+   index_keys = palloc0(relinfo->ri_NumIndices * sizeof(ScanKeyData*));
+   conflicts = palloc0(relinfo->ri_NumIndices * sizeof(ItemPointerData));
 
-   foreach(command_i, commands)
-   {
-       List       *plantree_list;
-       List       *querytree_list;
-       Node       *command = (Node *) lfirst(command_i);
-       const char *commandTag;
-       Portal      portal;
-       DestReceiver *receiver;
+   build_index_scan_keys(estate, index_keys, slot);
 
-       /* temporarily push snapshot for parse analysis/planning */
-       PushActiveSnapshot(GetTransactionSnapshot());
+   /* do a SnapshotDirty search for conflicting tuples */
+   for (i = 0; i < relinfo->ri_NumIndices; i++)
+   {
+       IndexInfo  *ii = relinfo->ri_IndexRelationInfo[i];
+       bool found = false;
 
-       oldcontext = MemoryContextSwitchTo(MessageContext);
+       Assert(ii->ii_Expressions == NIL);
 
-       commandTag = CreateCommandTag(command);
+       if (!ii->ii_Unique)
+           continue;
 
-       querytree_list = pg_analyze_and_rewrite(
-           command, cmdstr, NULL, 0);
+       /* if conflict: wait */
+       found = find_pkey_tuple(index_keys[i],
+                               rel, relinfo->ri_IndexRelationDescs[i],
+                               oldslot, true, LockTupleExclusive);
 
-       plantree_list = pg_plan_queries(
-           querytree_list, 0, NULL);
+       /* alert if there's more than one conflicting unique key */
+       if (found &&
+           ItemPointerIsValid(&conflicting_tid) &&
+           !ItemPointerEquals(&oldslot->tts_tuple->t_self,
+                              &conflicting_tid))
+       {
+           /* FIXME: improve logging here */
+           elog(ERROR, "diverging uniqueness conflict");
+       }
+       else if (found)
+       {
+           ItemPointerCopy(&oldslot->tts_tuple->t_self, &conflicting_tid);
+           conflict = true;
+           break;
+       }
+       else
+           ItemPointerSetInvalid(&conflicts[i]);
 
-       PopActiveSnapshot();
+       CHECK_FOR_INTERRUPTS();
+   }
 
-       portal = CreatePortal("", true, true);
-       PortalDefineQuery(portal, NULL,
-                         cmdstr, commandTag,
-                         plantree_list, NULL);
-       PortalStart(portal, NULL, 0, InvalidSnapshot);
+   /*
+    * If there's a conflict use the version created later, otherwise do a
+    * plain insert.
+    */
+   if (conflict)
+   {
+       TransactionId xmin;
+       TimestampTz local_ts;
+       RepNodeId   local_node_id;
+       bool        apply_update;
+       bool        log_update;
+       CommitExtraData local_node_id_raw;
 
-       receiver = CreateDestReceiver(DestNone);
+       /* refetch tuple, check for old commit ts & origin */
+       xmin = HeapTupleHeaderGetXmin(oldslot->tts_tuple->t_data);
 
-       (void) PortalRun(portal, FETCH_ALL,
-                        isTopLevel,
-                        receiver, receiver,
-                        NULL);
-       (*receiver->rDestroy) (receiver);
+       /*
+        * We now need to determine whether to keep the original version of
+        * the row, or apply the insert (as an update) we received.  We use
+        * the last-update-wins strategy for this, except when the new update
+        * comes from the same node that originated the previous version of
+        * the tuple.
+        */
+       TransactionIdGetCommitTsData(xmin, &local_ts, &local_node_id_raw);
+       local_node_id = local_node_id_raw;
 
-       PortalDrop(portal, false);
+       check_apply_update(local_node_id, local_ts,
+                          &apply_update, &log_update);
 
-       CommandCounterIncrement();
+       elog(LOG, "insert vs insert conflict: %s",
+            apply_update ? "update" : "ignore");
 
-       MemoryContextSwitchTo(oldcontext);
-   }
-
-#ifdef NOT_YET
-   /* FIXME: update tuple to set set "executed" to true */
-   // newtup = heap_modify_tuple( .. );
-   newtup = cmdtup;
-#endif
-}
-
-static HeapTuple
-process_queued_drop(HeapTuple cmdtup)
-{
-   Relation    cmdsrel;
-   HeapTuple   newtup;
-   Datum       arrayDatum;
-   ArrayType  *array;
-   bool        null;
-   Oid         elmtype;
-   int16       elmlen;
-   bool        elmbyval;
-   char        elmalign;
-   Oid         elmoutoid;
-   bool        elmisvarlena;
-   TupleDesc   elemdesc;
-   Datum      *values;
-   int         nelems;
-   int         i;
-   ObjectAddresses *addresses;
-
-   cmdsrel = heap_open(QueuedDropsRelid, AccessShareLock);
-   arrayDatum = heap_getattr(cmdtup, 1,
-                             RelationGetDescr(cmdsrel),
-                             &null);
-   if (null)
-   {
-       elog(WARNING, "null dropped object array in command tuple in \"%s\"",
-            RelationGetRelationName(cmdsrel));
-       return cmdtup;
-   }
-   array = DatumGetArrayTypeP(arrayDatum);
-   elmtype = ARR_ELEMTYPE(array);
-
-   get_typlenbyvalalign(elmtype, &elmlen, &elmbyval, &elmalign);
-   deconstruct_array(array, elmtype,
-                     elmlen, elmbyval, elmalign,
-                     &values, NULL, &nelems);
-
-   getTypeOutputInfo(elmtype, &elmoutoid, &elmisvarlena);
-   elemdesc = TypeGetTupleDesc(elmtype, NIL);
-
-   addresses = new_object_addresses();
-
-   for (i = 0; i < nelems; i++)
-   {
-       HeapTupleHeader elemhdr;
-       HeapTupleData tmptup;
-       ObjectType objtype;
-       Datum   datum;
-       bool    isnull;
-       char   *type;
-       List   *objnames;
-       List   *objargs = NIL;
-       Relation objrel;
-       ObjectAddress addr;
-
-       elemhdr = (HeapTupleHeader) DatumGetPointer(values[i]);
-       tmptup.t_len = HeapTupleHeaderGetDatumLength(elemhdr);
-       ItemPointerSetInvalid(&(tmptup.t_self));
-       tmptup.t_tableOid = InvalidOid;
-       tmptup.t_data = elemhdr;
-
-       /* obtain the object type as a C-string ... */
-       datum = heap_getattr(&tmptup, 1, elemdesc, &isnull);
-       if (isnull)
-       {
-           elog(WARNING, "null type !?");
-           continue;
-       }
-       type = TextDatumGetCString(datum);
-       objtype = unstringify_objtype(type);
-
-       if (objtype == OBJECT_TYPE ||
-           objtype == OBJECT_DOMAIN)
-       {
-           Datum  *values;
-           bool   *nulls;
-           int     nelems;
-           char   *typestring;
-           TypeName *typeName;
-
-           datum = heap_getattr(&tmptup, 2, elemdesc, &isnull);
-           if (isnull)
-           {
-               elog(WARNING, "null typename !?");
-               continue;
-           }
-
-           deconstruct_array(DatumGetArrayTypeP(datum),
-                             TEXTOID, -1, false, 'i',
-                             &values, &nulls, &nelems);
-
-           typestring = TextDatumGetCString(values[0]);
-           typeName = typeStringToTypeName(typestring);
-           objnames = typeName->names;
-       }
-       else if (objtype == OBJECT_FUNCTION ||
-                objtype == OBJECT_AGGREGATE ||
-                objtype == OBJECT_OPERATOR)
-       {
-           Datum  *values;
-           bool   *nulls;
-           int     nelems;
-           int     i;
-           char   *typestring;
-
-           /* objname */
-           objnames = NIL;
-           datum = heap_getattr(&tmptup, 2, elemdesc, &isnull);
-           if (isnull)
-           {
-               elog(WARNING, "null objname !?");
-               continue;
-           }
-
-           deconstruct_array(DatumGetArrayTypeP(datum),
-                             TEXTOID, -1, false, 'i',
-                             &values, &nulls, &nelems);
-           for (i = 0; i < nelems; i++)
-               objnames = lappend(objnames,
-                                  makeString(TextDatumGetCString(values[i])));
-
-           /* objargs are type names */
-           datum = heap_getattr(&tmptup, 3, elemdesc, &isnull);
-           if (isnull)
-           {
-               elog(WARNING, "null typename !?");
-               continue;
-           }
-
-           deconstruct_array(DatumGetArrayTypeP(datum),
-                             TEXTOID, -1, false, 'i',
-                             &values, &nulls, &nelems);
-
-           for (i = 0; i < nelems; i++)
-           {
-               typestring = TextDatumGetCString(values[i]);
-               objargs = lappend(objargs, typeStringToTypeName(typestring));
-           }
-       }
-       else
-       {
-           Datum  *values;
-           bool   *nulls;
-           int     nelems;
-           int     i;
-
-           /* objname */
-           objnames = NIL;
-           datum = heap_getattr(&tmptup, 2, elemdesc, &isnull);
-           if (isnull)
-           {
-               elog(WARNING, "null objname !?");
-               continue;
-           }
-
-           deconstruct_array(DatumGetArrayTypeP(datum),
-                             TEXTOID, -1, false, 'i',
-                             &values, &nulls, &nelems);
-           for (i = 0; i < nelems; i++)
-               objnames = lappend(objnames,
-                                  makeString(TextDatumGetCString(values[i])));
-
-           datum = heap_getattr(&tmptup, 3, elemdesc, &isnull);
-           if (!isnull)
-           {
-               Datum  *values;
-               bool   *nulls;
-               int     nelems;
-               int     i;
-
-               deconstruct_array(DatumGetArrayTypeP(datum),
-                                 TEXTOID, -1, false, 'i',
-                                 &values, &nulls, &nelems);
-               for (i = 0; i < nelems; i++)
-                   objargs = lappend(objargs,
-                                     makeString(TextDatumGetCString(values[i])));
-           }
-       }
-
-       addr = get_object_address(objtype, objnames, objargs, &objrel,
-                                 AccessExclusiveLock, false);
-       /* unsupported object? */
-       if (addr.classId == InvalidOid)
-           continue;
-
-       /*
-        * For certain objects, get_object_address returned us an open and
-        * locked relation.  Close it because we have no use for it; but
-        * keeping the lock seems easier than figure out lock level to release.
-        */
-       if (objrel != NULL)
-           relation_close(objrel, NoLock);
-
-       add_exact_object_address(&addr, addresses);
-   }
-
-   performMultipleDeletions(addresses, DROP_RESTRICT, 0);
-
-   newtup = cmdtup;
-
-   heap_close(cmdsrel, AccessShareLock);
-
-   return newtup;
-}
-
-void
-process_remote_insert(StringInfo s)
-{
-   char        action;
-   EState     *estate;
-   TupleTableSlot *slot;
-   TupleTableSlot *oldslot;
-   Relation    rel;
-   bool        started_tx;
-#ifdef VERBOSE_INSERT
-   StringInfoData o;
-#endif
-   ResultRelInfo *relinfo;
-   ItemPointer conflicts;
-   bool        conflict = false;
-   ScanKey    *index_keys;
-   int         i;
-   ItemPointerData conflicting_tid;
-
-   ItemPointerSetInvalid(&conflicting_tid);
-
-   started_tx = bdr_performing_work();
-
-   Assert(bdr_apply_worker != NULL);
-
-   /*
-    * Read tuple into a context that's long lived enough for CONCURRENTLY
-    * processing.
-    */
-   MemoryContextSwitchTo(MessageContext);
-   rel = read_rel(s, RowExclusiveLock);
-
-   action = pq_getmsgbyte(s);
-   if (action != 'N')
-       elog(ERROR, "expected new tuple but got %d",
-            action);
-
-   estate = bdr_create_rel_estate(rel);
-   slot = ExecInitExtraTupleSlot(estate);
-   oldslot = ExecInitExtraTupleSlot(estate);
-   ExecSetSlotDescriptor(slot, RelationGetDescr(rel));
-   ExecSetSlotDescriptor(oldslot, RelationGetDescr(rel));
-
-   read_tuple(s, rel, slot);
-
-   if (rel->rd_rel->relkind != RELKIND_RELATION)
-       elog(ERROR, "unexpected relkind '%c' rel \"%s\"",
-            rel->rd_rel->relkind, RelationGetRelationName(rel));
-
-   /* debug output */
-#ifdef VERBOSE_INSERT
-   initStringInfo(&o);
-   tuple_to_stringinfo(&o, RelationGetDescr(rel), slot->tts_tuple);
-   elog(LOG, "INSERT:%s", o.data);
-   resetStringInfo(&o);
-#endif
-
-   /*
-    * Search for conflicting tuples.
-    */
-   ExecOpenIndices(estate->es_result_relation_info);
-   relinfo = estate->es_result_relation_info;
-   index_keys = palloc0(relinfo->ri_NumIndices * sizeof(ScanKeyData*));
-   conflicts = palloc0(relinfo->ri_NumIndices * sizeof(ItemPointerData));
-
-   build_index_scan_keys(estate, index_keys, slot);
-
-   /* do a SnapshotDirty search for conflicting tuples */
-   for (i = 0; i < relinfo->ri_NumIndices; i++)
-   {
-       IndexInfo  *ii = relinfo->ri_IndexRelationInfo[i];
-       bool found = false;
-
-       Assert(ii->ii_Expressions == NIL);
-
-       if (!ii->ii_Unique)
-           continue;
-
-       /* if conflict: wait */
-       found = find_pkey_tuple(index_keys[i],
-                               rel, relinfo->ri_IndexRelationDescs[i],
-                               oldslot, true, LockTupleExclusive);
-
-       /* alert if there's more than one conflicting unique key */
-       if (found &&
-           ItemPointerIsValid(&conflicting_tid) &&
-           !ItemPointerEquals(&oldslot->tts_tuple->t_self,
-                              &conflicting_tid))
-       {
-           /* FIXME: improve logging here */
-           elog(ERROR, "diverging uniqueness conflict");
-       }
-       else if (found)
-       {
-           ItemPointerCopy(&oldslot->tts_tuple->t_self, &conflicting_tid);
-           conflict = true;
-           break;
-       }
-       else
-           ItemPointerSetInvalid(&conflicts[i]);
-
-       CHECK_FOR_INTERRUPTS();
-   }
-
-   /*
-    * If there's a conflict use the version created later, otherwise do a
-    * plain insert.
-    */
-   if (conflict)
-   {
-       TransactionId xmin;
-       TimestampTz local_ts;
-       RepNodeId   local_node_id;
-       bool        apply_update;
-       bool        log_update;
-       CommitExtraData local_node_id_raw;
-
-       /* refetch tuple, check for old commit ts & origin */
-       xmin = HeapTupleHeaderGetXmin(oldslot->tts_tuple->t_data);
-
-       /*
-        * We now need to determine whether to keep the original version of
-        * the row, or apply the insert (as an update) we received.  We use
-        * the last-update-wins strategy for this, except when the new update
-        * comes from the same node that originated the previous version of
-        * the tuple.
-        */
-       TransactionIdGetCommitTsData(xmin, &local_ts, &local_node_id_raw);
-       local_node_id = local_node_id_raw;
-
-       check_apply_update(local_node_id, local_ts,
-                          &apply_update, &log_update);
-
-       elog(LOG, "insert vs insert conflict: %s",
-            apply_update ? "update" : "ignore");
-
-       if (apply_update)
-       {
-           simple_heap_update(rel,
-                              &oldslot->tts_tuple->t_self,
-                              slot->tts_tuple);
-           /* races will be resolved by abort/retry */
-           UserTableUpdateOpenIndexes(estate, slot);
-       }
-   }
-   else
-   {
-       simple_heap_insert(rel, slot->tts_tuple);
-       /* races will be resolved by abort/retry */
-       UserTableUpdateOpenIndexes(estate, slot);
+       if (apply_update)
+       {
+           simple_heap_update(rel,
+                              &oldslot->tts_tuple->t_self,
+                              slot->tts_tuple);
+           /* races will be resolved by abort/retry */
+           UserTableUpdateOpenIndexes(estate, slot);
+       }
+   }
+   else
+   {
+       simple_heap_insert(rel, slot->tts_tuple);
+       /* races will be resolved by abort/retry */
+       UserTableUpdateOpenIndexes(estate, slot);
    }
 
    ExecCloseIndices(estate->es_result_relation_info);
@@ -751,44 +416,6 @@ process_remote_insert(StringInfo s)
    CommandCounterIncrement();
 }
 
-static void
-fetch_sysid_via_node_id(RepNodeId node_id, uint64 *sysid, TimeLineID *tli)
-{
-   if (node_id == InvalidRepNodeId)
-   {
-       *sysid = GetSystemIdentifier();
-       *tli = ThisTimeLineID;
-   }
-   else
-   {
-       HeapTuple   node;
-       Form_pg_replication_identifier node_class;
-       char *ident;
-
-       uint64 remote_sysid;
-       Oid remote_dboid;
-       TimeLineID remote_tli;
-       Oid local_dboid;
-       NameData replication_name;
-
-       node = GetReplicationInfoByIdentifier(node_id, false);
-
-       node_class = (Form_pg_replication_identifier) GETSTRUCT(node);
-
-       ident = text_to_cstring(&node_class->riname);
-
-       if (sscanf(ident, BDR_NODE_ID_FORMAT,
-                  &remote_sysid, &remote_tli, &remote_dboid, &local_dboid,
-                  NameStr(replication_name)) != 4)
-           elog(ERROR, "could not parse sysid: %s", ident);
-       ReleaseSysCache(node);
-       pfree(ident);
-
-       *sysid = remote_sysid;
-       *tli = remote_tli;
-   }
-}
-
 void
 process_remote_update(StringInfo s)
 {
@@ -924,22 +551,125 @@ process_remote_update(StringInfo s)
    }
    else
    {
-       initStringInfo(&o);
-       tuple_to_stringinfo(&o, RelationGetDescr(rel),
-                           oldslot->tts_tuple);
-       bdr_count_update_conflict();
+       initStringInfo(&o);
+       tuple_to_stringinfo(&o, RelationGetDescr(rel),
+                           oldslot->tts_tuple);
+       bdr_count_update_conflict();
+
+       ereport(ERROR,
+               (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
+                errmsg("CONFLICT: could not find existing tuple for pkey %s",
+                       o.data)));
+   }
+
+   PopActiveSnapshot();
+
+   check_sequencer_wakeup(rel);
+
+   /* release locks upon commit */
+   index_close(idxrel, NoLock);
+   heap_close(rel, NoLock);
+
+   ExecResetTupleTable(estate->es_tupleTable, true);
+   FreeExecutorState(estate);
+
+   CommandCounterIncrement();
+}
+
+void
+process_remote_delete(StringInfo s)
+{
+#ifdef VERBOSE_DELETE
+   StringInfoData o;
+#endif
+   char        action;
+   EState     *estate;
+   TupleTableSlot *slot;
+   Oid         idxoid;
+   Relation    rel;
+   Relation    idxrel;
+   ScanKeyData skey[INDEX_MAX_KEYS];
+   bool        found_old;
+
+   Assert(bdr_apply_worker != NULL);
+
+   bdr_performing_work();
+
+   rel = read_rel(s, RowExclusiveLock);
+
+   action = pq_getmsgbyte(s);
+
+   if (action != 'K' && action != 'E')
+       elog(ERROR, "expected action K or E got %c", action);
+
+   if (action == 'E')
+   {
+       elog(WARNING, "got delete without pkey");
+       return;
+   }
+
+   estate = bdr_create_rel_estate(rel);
+   slot = ExecInitExtraTupleSlot(estate);
+   ExecSetSlotDescriptor(slot, RelationGetDescr(rel));
+
+   read_tuple(s, rel, slot);
+
+   /* lookup index to build scankey */
+   if (rel->rd_indexvalid == 0)
+       RelationGetIndexList(rel);
+   idxoid = rel->rd_replidindex;
+   if (!OidIsValid(idxoid))
+   {
+       elog(ERROR, "could not find primary key for table with oid %u",
+            RelationGetRelid(rel));
+       return;
+   }
+
+   /* Now open the primary key index */
+   idxrel = index_open(idxoid, RowExclusiveLock);
+
+   if (rel->rd_rel->relkind != RELKIND_RELATION)
+       elog(ERROR, "unexpected relkind '%c' rel \"%s\"",
+            rel->rd_rel->relkind, RelationGetRelationName(rel));
+
+#ifdef VERBOSE_DELETE
+   initStringInfo(&o);
+   tuple_to_stringinfo(&o, RelationGetDescr(idxrel), slot->tts_tuple);
+   elog(LOG, "DELETE old-key:%s", o.data);
+   resetStringInfo(&o);
+#endif
+
+   PushActiveSnapshot(GetTransactionSnapshot());
+
+   build_index_scan_key(skey, rel, idxrel, slot);
+
+   /* try to find tuple via a (candidate|primary) key */
+   found_old = find_pkey_tuple(skey, rel, idxrel, slot, true, LockTupleExclusive);
+
+   if (found_old)
+   {
+       simple_heap_delete(rel, &slot->tts_tuple->t_self);
+       bdr_count_delete();
+   }
+   else
+   {
+       StringInfoData s_key;
+
+       bdr_count_delete_conflict();
+
+       initStringInfo(&s_key);
+       tuple_to_stringinfo(&s_key, RelationGetDescr(idxrel), slot->tts_tuple);
 
        ereport(ERROR,
                (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
-                errmsg("CONFLICT: could not find existing tuple for pkey %s",
-                       o.data)));
+                errmsg("CONFLICT: DELETE could not find existing tuple for pkey %s", s_key.data)));
+       resetStringInfo(&s_key);
    }
 
    PopActiveSnapshot();
 
    check_sequencer_wakeup(rel);
 
-   /* release locks upon commit */
    index_close(idxrel, NoLock);
    heap_close(rel, NoLock);
 
@@ -1046,145 +776,423 @@ do_log_update(RepNodeId local_node_id, bool apply_update, TimestampTz ts,
    fetch_sysid_via_node_id(bdr_apply_worker->origin_id,
                            &remote_sysid, &remote_tli);
 
-   Assert(remote_sysid == bdr_apply_worker->sysid);
-   Assert(remote_tli == bdr_apply_worker->timeline);
+   Assert(remote_sysid == bdr_apply_worker->sysid);
+   Assert(remote_tli == bdr_apply_worker->timeline);
+
+   memcpy(remote_ts, timestamptz_to_str(replication_origin_timestamp),
+          MAXDATELEN);
+   memcpy(local_ts, timestamptz_to_str(ts),
+          MAXDATELEN);
+
+   initStringInfo(&s_key);
+   tuple_to_stringinfo(&s_key, RelationGetDescr(idxrel), old_key);
+
+   ereport(LOG,
+           (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
+            errmsg("CONFLICT: %s remote update originating at node " UINT64_FORMAT ":%u at ts %s; row was previously updated at %s node " UINT64_FORMAT ":%u at ts %s. PKEY:%s",
+                   apply_update ? "applying" : "skipping",
+                   remote_sysid, remote_tli, remote_ts,
+                   local_node_id == InvalidRepNodeId ? "local" : "remote",
+                   local_sysid, local_tli, local_ts, s_key.data)));
+   resetStringInfo(&s_key);
+}
+
+/*
+ * do_apply_update
+ *
+ * Apply a remote UPDATE on top of the local row currently held in oldslot.
+ *
+ * A replacement tuple is built from the old tuple and the values/isnull/changed
+ * arrays carried in new_tuple, stored into newslot, written over the old row
+ * version, and handed to the index maintenance helper. The update counter is
+ * bumped last.
+ */
+static void
+do_apply_update(Relation rel, EState *estate,
+               TupleTableSlot *oldslot,
+               TupleTableSlot *newslot,
+               BDRTupleData new_tuple)
+{
+   TupleDesc   desc = RelationGetDescr(rel);
+   HeapTuple   merged;
+
+   /* combine the existing row with the remotely supplied columns */
+   merged = heap_modify_tuple(oldslot->tts_tuple, desc,
+                              new_tuple.values,
+                              new_tuple.isnull,
+                              new_tuple.changed);
+
+   /* the new slot becomes the holder of the freshly built tuple */
+   ExecStoreTuple(merged, newslot, InvalidBuffer, true);
+
+   /* replace the row identified by the old tuple's t_self */
+   simple_heap_update(rel, &oldslot->tts_tuple->t_self, newslot->tts_tuple);
+
+   /* keep the indexes in sync, then account for the applied update */
+   UserTableUpdateIndexes(estate, newslot);
+   bdr_count_update();
+}
+
+
+/*
+ * process_queued_ddl_command
+ *
+ * Execute a DDL command that was queued in the relation identified by
+ * QueuedDDLCommandsRelid.
+ *
+ * cmdtup is a tuple of that relation; its first three attributes are read as
+ * text: the object type, the object identity and the command string. The
+ * command string is parsed and every resulting statement is analyzed,
+ * planned and run through its own portal, discarding any output.
+ *
+ * tx_just_started is combined with "the command string contains exactly one
+ * statement" to decide whether the statement is run as a top level command.
+ *
+ * Raises an ERROR if any of the three attributes is NULL.
+ */
+static void
+process_queued_ddl_command(HeapTuple cmdtup, bool tx_just_started)
+{
+   Relation    cmdsrel;
+#ifdef NOT_YET
+   HeapTuple   newtup;
+#endif
+   Datum       datum;
+   char       *type;
+   char       *identstr;
+   char       *cmdstr;
+   bool        isnull;
+
+   List       *commands;
+   ListCell   *command_i;
+   bool        isTopLevel;
+   MemoryContext oldcontext;
+
+   /* ----
+    * We can't use spi here, because it implicitly assumes a transaction
+    * context. As we want to be able to replicate CONCURRENTLY commands,
+    * that's not going to work...
+    * So instead do all the work manually, being careful about managing the
+    * lifecycle of objects.
+    * ----
+    */
+   oldcontext = MemoryContextSwitchTo(MessageContext);
+
+   cmdsrel = heap_open(QueuedDDLCommandsRelid, NoLock);
+
+   /* fetch the object type */
+   datum = heap_getattr(cmdtup, 1,
+                        RelationGetDescr(cmdsrel),
+                        &isnull);
+   if (isnull)
+       elog(ERROR, "null object type in command tuple in \"%s\"",
+            RelationGetRelationName(cmdsrel));
+   type = TextDatumGetCString(datum);
+
+   /* fetch the object identity */
+   datum = heap_getattr(cmdtup, 2,
+                        RelationGetDescr(cmdsrel),
+                        &isnull);
+   if (isnull)
+       /* report the object type fetched above, as the message promises */
+       elog(ERROR, "null identity in command tuple for object of type %s",
+            type);
+
+   identstr = TextDatumGetCString(datum);
+
+   /* finally fetch and execute the command */
+   datum = heap_getattr(cmdtup, 3,
+                        RelationGetDescr(cmdsrel),
+                        &isnull);
+   if (isnull)
+       elog(ERROR, "null command in tuple for %s \"%s\"", type, identstr);
+
+   cmdstr = TextDatumGetCString(datum);
+
+   /* close relation, command execution might end/start xact */
+   heap_close(cmdsrel, NoLock);
+
+   commands = pg_parse_query(cmdstr);
+
+   MemoryContextSwitchTo(oldcontext);
+
+   /*
+    * Do a limited amount of safety checking against CONCURRENTLY commands
+    * executed in situations where they aren't allowed. The sender side should
+    * provide protection, but better be safe than sorry.
+    */
+   isTopLevel = (list_length(commands) == 1) && tx_just_started;
+
+   /* run each parsed statement through its own unnamed portal */
+   foreach(command_i, commands)
+   {
+       List       *plantree_list;
+       List       *querytree_list;
+       Node       *command = (Node *) lfirst(command_i);
+       const char *commandTag;
+       Portal      portal;
+       DestReceiver *receiver;
+
+       /* temporarily push snapshot for parse analysis/planning */
+       PushActiveSnapshot(GetTransactionSnapshot());
+
+       oldcontext = MemoryContextSwitchTo(MessageContext);
+
+       commandTag = CreateCommandTag(command);
+
+       querytree_list = pg_analyze_and_rewrite(
+           command, cmdstr, NULL, 0);
+
+       plantree_list = pg_plan_queries(
+           querytree_list, 0, NULL);
+
+       PopActiveSnapshot();
+
+       portal = CreatePortal("", true, true);
+       PortalDefineQuery(portal, NULL,
+                         cmdstr, commandTag,
+                         plantree_list, NULL);
+       PortalStart(portal, NULL, 0, InvalidSnapshot);
+
+       /* results are of no interest here, send them nowhere */
+       receiver = CreateDestReceiver(DestNone);
+
+       (void) PortalRun(portal, FETCH_ALL,
+                        isTopLevel,
+                        receiver, receiver,
+                        NULL);
+       (*receiver->rDestroy) (receiver);
+
+       PortalDrop(portal, false);
+
+       CommandCounterIncrement();
+
+       MemoryContextSwitchTo(oldcontext);
+   }
+
+#ifdef NOT_YET
+   /* FIXME: update tuple to set "executed" to true */
+   // newtup = heap_modify_tuple( .. );
+   newtup = cmdtup;
+#endif
+}
+
+/*
+ * process_queued_drop
+ *
+ * Replay a queued DROP taken from the relation identified by
+ * QueuedDropsRelid.
+ *
+ * The first attribute of cmdtup is an array of composite values, one per
+ * dropped object. For each element the object type (attribute 1) is mapped
+ * to an ObjectType, the name and argument lists are rebuilt from the text
+ * arrays in attributes 2 and 3, and the object is resolved with
+ * get_object_address(). All resolved addresses are then dropped together
+ * with DROP_RESTRICT.
+ *
+ * Elements with NULL or empty mandatory fields are skipped with a WARNING.
+ * A NULL array also only produces a WARNING.
+ *
+ * Returns cmdtup unchanged.
+ */
+static HeapTuple
+process_queued_drop(HeapTuple cmdtup)
+{
+   Relation    cmdsrel;
+   HeapTuple   newtup;
+   Datum       arrayDatum;
+   ArrayType  *array;
+   bool        null;
+   Oid         elmtype;
+   int16       elmlen;
+   bool        elmbyval;
+   char        elmalign;
+   Oid         elmoutoid;
+   bool        elmisvarlena;
+   TupleDesc   elemdesc;
+   Datum      *values;
+   int         nelems;
+   int         i;
+   ObjectAddresses *addresses;
+
+   cmdsrel = heap_open(QueuedDropsRelid, AccessShareLock);
+   arrayDatum = heap_getattr(cmdtup, 1,
+                             RelationGetDescr(cmdsrel),
+                             &null);
+   if (null)
+   {
+       elog(WARNING, "null dropped object array in command tuple in \"%s\"",
+            RelationGetRelationName(cmdsrel));
+       /* release the reference taken by heap_open() above before bailing out */
+       heap_close(cmdsrel, AccessShareLock);
+       return cmdtup;
+   }
+   array = DatumGetArrayTypeP(arrayDatum);
+   elmtype = ARR_ELEMTYPE(array);
+
+   /* split the array into one datum per dropped object */
+   get_typlenbyvalalign(elmtype, &elmlen, &elmbyval, &elmalign);
+   deconstruct_array(array, elmtype,
+                     elmlen, elmbyval, elmalign,
+                     &values, NULL, &nelems);
+
+   getTypeOutputInfo(elmtype, &elmoutoid, &elmisvarlena);
+   elemdesc = TypeGetTupleDesc(elmtype, NIL);
+
+   addresses = new_object_addresses();
+
+   for (i = 0; i < nelems; i++)
+   {
+       HeapTupleHeader elemhdr;
+       HeapTupleData tmptup;
+       ObjectType objtype;
+       Datum   datum;
+       bool    isnull;
+       char   *type;
+       List   *objnames;
+       List   *objargs = NIL;
+       Relation objrel;
+       ObjectAddress addr;
+
+       /* wrap the composite datum in a temporary tuple so heap_getattr works */
+       elemhdr = (HeapTupleHeader) DatumGetPointer(values[i]);
+       tmptup.t_len = HeapTupleHeaderGetDatumLength(elemhdr);
+       ItemPointerSetInvalid(&(tmptup.t_self));
+       tmptup.t_tableOid = InvalidOid;
+       tmptup.t_data = elemhdr;
+
+       /* obtain the object type as a C-string ... */
+       datum = heap_getattr(&tmptup, 1, elemdesc, &isnull);
+       if (isnull)
+       {
+           elog(WARNING, "null type !?");
+           continue;
+       }
+       type = TextDatumGetCString(datum);
+       objtype = unstringify_objtype(type);
+
+       if (objtype == OBJECT_TYPE ||
+           objtype == OBJECT_DOMAIN)
+       {
+           /* types and domains: the name list comes from parsing a type string */
+           Datum  *values;
+           bool   *nulls;
+           int     nelems;
+           char   *typestring;
+           TypeName *typeName;
 
-   memcpy(remote_ts, timestamptz_to_str(replication_origin_timestamp),
-          MAXDATELEN);
-   memcpy(local_ts, timestamptz_to_str(ts),
-          MAXDATELEN);
+           datum = heap_getattr(&tmptup, 2, elemdesc, &isnull);
+           if (isnull)
+           {
+               elog(WARNING, "null typename !?");
+               continue;
+           }
 
-   initStringInfo(&s_key);
-   tuple_to_stringinfo(&s_key, RelationGetDescr(idxrel), old_key);
+           deconstruct_array(DatumGetArrayTypeP(datum),
+                             TEXTOID, -1, false, 'i',
+                             &values, &nulls, &nelems);
 
-   ereport(LOG,
-           (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
-            errmsg("CONFLICT: %s remote update originating at node " UINT64_FORMAT ":%u at ts %s; row was previously updated at %s node " UINT64_FORMAT ":%u at ts %s. PKEY:%s",
-                   apply_update ? "applying" : "skipping",
-                   remote_sysid, remote_tli, remote_ts,
-                   local_node_id == InvalidRepNodeId ? "local" : "remote",
-                   local_sysid, local_tli, local_ts, s_key.data)));
-   resetStringInfo(&s_key);
-}
+           /* only the first element is used; make sure it actually exists */
+           if (nelems < 1 || nulls[0])
+           {
+               elog(WARNING, "empty typename !?");
+               continue;
+           }
+
+           typestring = TextDatumGetCString(values[0]);
+           typeName = typeStringToTypeName(typestring);
+           objnames = typeName->names;
+       }
+       else if (objtype == OBJECT_FUNCTION ||
+                objtype == OBJECT_AGGREGATE ||
+                objtype == OBJECT_OPERATOR)
+       {
+           /* routines and operators: names are strings, args are type names */
+           Datum  *values;
+           bool   *nulls;
+           int     nelems;
+           int     i;
+           char   *typestring;
 
-static void
-do_apply_update(Relation rel, EState *estate,
-               TupleTableSlot *oldslot,
-               TupleTableSlot *newslot,
-               BDRTupleData new_tuple)
-{
-   HeapTuple   nt;
+           /* objname */
+           objnames = NIL;
+           datum = heap_getattr(&tmptup, 2, elemdesc, &isnull);
+           if (isnull)
+           {
+               elog(WARNING, "null objname !?");
+               continue;
+           }
 
-   nt = heap_modify_tuple(oldslot->tts_tuple, RelationGetDescr(rel),
-                          new_tuple.values, new_tuple.isnull,
-                          new_tuple.changed);
-   ExecStoreTuple(nt, newslot, InvalidBuffer, true);
-   simple_heap_update(rel, &oldslot->tts_tuple->t_self, newslot->tts_tuple);
-   UserTableUpdateIndexes(estate, newslot);
-   bdr_count_update();
-}
+           deconstruct_array(DatumGetArrayTypeP(datum),
+                             TEXTOID, -1, false, 'i',
+                             &values, &nulls, &nelems);
+           for (i = 0; i < nelems; i++)
+               objnames = lappend(objnames,
+                                  makeString(TextDatumGetCString(values[i])));
 
-void
-process_remote_delete(StringInfo s)
-{
-#ifdef VERBOSE_DELETE
-   StringInfoData o;
-#endif
-   char        action;
-   EState     *estate;
-   TupleTableSlot *slot;
-   Oid         idxoid;
-   Relation    rel;
-   Relation    idxrel;
-   ScanKeyData skey[INDEX_MAX_KEYS];
-   bool        found_old;
+           /* objargs are type names */
+           datum = heap_getattr(&tmptup, 3, elemdesc, &isnull);
+           if (isnull)
+           {
+               elog(WARNING, "null typename !?");
+               continue;
+           }
 
-   Assert(bdr_apply_worker != NULL);
+           deconstruct_array(DatumGetArrayTypeP(datum),
+                             TEXTOID, -1, false, 'i',
+                             &values, &nulls, &nelems);
 
-   bdr_performing_work();
+           for (i = 0; i < nelems; i++)
+           {
+               typestring = TextDatumGetCString(values[i]);
+               objargs = lappend(objargs, typeStringToTypeName(typestring));
+           }
+       }
+       else
+       {
+           /* everything else: names and (optional) args are plain strings */
+           Datum  *values;
+           bool   *nulls;
+           int     nelems;
+           int     i;
 
-   rel = read_rel(s, RowExclusiveLock);
+           /* objname */
+           objnames = NIL;
+           datum = heap_getattr(&tmptup, 2, elemdesc, &isnull);
+           if (isnull)
+           {
+               elog(WARNING, "null objname !?");
+               continue;
+           }
 
-   action = pq_getmsgbyte(s);
+           deconstruct_array(DatumGetArrayTypeP(datum),
+                             TEXTOID, -1, false, 'i',
+                             &values, &nulls, &nelems);
+           for (i = 0; i < nelems; i++)
+               objnames = lappend(objnames,
+                                  makeString(TextDatumGetCString(values[i])));
 
-   if (action != 'K' && action != 'E')
-       elog(ERROR, "expected action K or E got %c", action);
+           datum = heap_getattr(&tmptup, 3, elemdesc, &isnull);
+           if (!isnull)
+           {
+               Datum  *values;
+               bool   *nulls;
+               int     nelems;
+               int     i;
 
-   if (action == 'E')
-   {
-       elog(WARNING, "got delete without pkey");
-       return;
-   }
+               deconstruct_array(DatumGetArrayTypeP(datum),
+                                 TEXTOID, -1, false, 'i',
+                                 &values, &nulls, &nelems);
+               for (i = 0; i < nelems; i++)
+                   objargs = lappend(objargs,
+                                     makeString(TextDatumGetCString(values[i])));
+           }
+       }
 
-   estate = bdr_create_rel_estate(rel);
-   slot = ExecInitExtraTupleSlot(estate);
-   ExecSetSlotDescriptor(slot, RelationGetDescr(rel));
+       addr = get_object_address(objtype, objnames, objargs, &objrel,
+                                 AccessExclusiveLock, false);
+       /* unsupported object? */
+       if (addr.classId == InvalidOid)
+           continue;
 
-   read_tuple(s, rel, slot);
+       /*
+        * For certain objects, get_object_address returned us an open and
+        * locked relation.  Close it because we have no use for it; but
+        * keeping the lock seems easier than figure out lock level to release.
+        */
+       if (objrel != NULL)
+           relation_close(objrel, NoLock);
 
-   /* lookup index to build scankey */
-   if (rel->rd_indexvalid == 0)
-       RelationGetIndexList(rel);
-   idxoid = rel->rd_replidindex;
-   if (!OidIsValid(idxoid))
-   {
-       elog(ERROR, "could not find primary key for table with oid %u",
-            RelationGetRelid(rel));
-       return;
+       add_exact_object_address(&addr, addresses);
    }
 
-   /* Now open the primary key index */
-   idxrel = index_open(idxoid, RowExclusiveLock);
-
-   if (rel->rd_rel->relkind != RELKIND_RELATION)
-       elog(ERROR, "unexpected relkind '%c' rel \"%s\"",
-            rel->rd_rel->relkind, RelationGetRelationName(rel));
-
-#ifdef VERBOSE_DELETE
-   initStringInfo(&o);
-   tuple_to_stringinfo(&o, RelationGetDescr(idxrel), slot->tts_tuple);
-   elog(LOG, "DELETE old-key:%s", o.data);
-   resetStringInfo(&o);
-#endif
+   /* drop everything collected above in a single operation */
+   performMultipleDeletions(addresses, DROP_RESTRICT, 0);
 
-   PushActiveSnapshot(GetTransactionSnapshot());
+   newtup = cmdtup;
 
-   build_index_scan_key(skey, rel, idxrel, slot);
+   heap_close(cmdsrel, AccessShareLock);
 
-   /* try to find tuple via a (candidate|primary) key */
-   found_old = find_pkey_tuple(skey, rel, idxrel, slot, true, LockTupleExclusive);
+   return newtup;
+}
 
-   if (found_old)
+/*
+ * fetch_sysid_via_node_id
+ *
+ * Translate a replication node id into the system identifier and timeline
+ * of the node it stands for, returned through *sysid and *tli.
+ *
+ * InvalidRepNodeId is answered with this server's own system identifier and
+ * ThisTimeLineID. Any other id is looked up via
+ * GetReplicationInfoByIdentifier() and the values are parsed out of the
+ * identifier's name using BDR_NODE_ID_FORMAT.
+ *
+ * Raises an ERROR if the name cannot be parsed.
+ */
+static void
+fetch_sysid_via_node_id(RepNodeId node_id, uint64 *sysid, TimeLineID *tli)
+{
+   if (node_id == InvalidRepNodeId)
    {
-       simple_heap_delete(rel, &slot->tts_tuple->t_self);
-       bdr_count_delete();
+       *sysid = GetSystemIdentifier();
+       *tli = ThisTimeLineID;
    }
    else
    {
-       StringInfoData s_key;
+       HeapTuple   node;
+       Form_pg_replication_identifier node_class;
+       char *ident;
 
-       bdr_count_delete_conflict();
+       uint64 remote_sysid;
+       Oid remote_dboid;
+       TimeLineID remote_tli;
+       Oid local_dboid;
+       NameData replication_name;
 
-       initStringInfo(&s_key);
-       tuple_to_stringinfo(&s_key, RelationGetDescr(idxrel), slot->tts_tuple);
+       node = GetReplicationInfoByIdentifier(node_id, false);
 
-       ereport(ERROR,
-               (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
-                errmsg("CONFLICT: DELETE could not find existing tuple for pkey %s", s_key.data)));
-       resetStringInfo(&s_key);
-   }
+       node_class = (Form_pg_replication_identifier) GETSTRUCT(node);
 
-   PopActiveSnapshot();
+       ident = text_to_cstring(&node_class->riname);
 
-   check_sequencer_wakeup(rel);
+       /*
+        * NOTE(review): five output arguments are passed but exactly four
+        * conversions are required. Presumably the trailing name is expected
+        * to be empty here -- confirm against BDR_NODE_ID_FORMAT. If the
+        * format ends in an unbounded %s, a long name could also overrun
+        * replication_name -- TODO confirm.
+        */
+       if (sscanf(ident, BDR_NODE_ID_FORMAT,
+                  &remote_sysid, &remote_tli, &remote_dboid, &local_dboid,
+                  NameStr(replication_name)) != 4)
+           elog(ERROR, "could not parse sysid: %s", ident);
+       /* done with the lookup result and the C-string copy of its name */
+       ReleaseSysCache(node);
+       pfree(ident);
 
-   index_close(idxrel, NoLock);
-   heap_close(rel, NoLock);
+       *sysid = remote_sysid;
+       *tli = remote_tli;
+   }
+}
 
-   ExecResetTupleTable(estate->es_tupleTable, true);
-   FreeExecutorState(estate);
+/*
+ * bdr_performing_work
+ *
+ * Make sure a transaction is open before work is applied.
+ *
+ * Returns true if this call started the transaction (and set
+ * started_transaction), false if one had already been started.
+ */
+static bool
+bdr_performing_work(void)
+{
+   if (!started_transaction)
+   {
+       /* first piece of work: open the transaction and remember that we did */
+       started_transaction = true;
+       StartTransactionCommand();
+
+       return true;
+   }
 
-   CommandCounterIncrement();
+   /* a transaction is already in progress, nothing to do */
+   return false;
 }
 
 static void