bool changed[MaxTupleAttributeNumber];
} BDRTupleData;
+bool started_transaction = false;
+Oid QueuedDDLCommandsRelid = InvalidOid;
+Oid QueuedDropsRelid = InvalidOid;
+
+/*
+ * This code only runs within an apply bgworker, so we can stash a pointer to our
+ * state in shm in a global for convenient access.
+ *
+ * TODO: make static once bdr_apply_main moved into bdr.c
+ */
+BdrApplyWorker *bdr_apply_worker = NULL;
+
static void build_index_scan_keys(EState *estate, ScanKey *scan_keys, TupleTableSlot *slot);
static void build_index_scan_key(ScanKey skey, Relation rel, Relation idx_rel, TupleTableSlot *slot);
static bool find_pkey_tuple(ScanKey skey, Relation rel, Relation idx_rel, TupleTableSlot *slot,
bool lock, LockTupleMode mode);
+
static void UserTableUpdateIndexes(EState *estate, TupleTableSlot *slot);
static void UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot);
+static EState *bdr_create_rel_estate(Relation rel);
+
+/* read data from the wire */
static Relation read_rel(StringInfo s, LOCKMODE mode);
extern void read_tuple_parts(StringInfo s, Relation rel, BDRTupleData *tup);
static void read_tuple(StringInfo s, Relation rel, TupleTableSlot *slot);
-static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple);
-static void check_sequencer_wakeup(Relation rel);
+static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple);
static void check_apply_update(RepNodeId local_node_id, TimestampTz ts, bool *perform_update, bool *log_update);
static void do_log_update(RepNodeId local_node_id, bool apply_update, TimestampTz ts, Relation idxrel, HeapTuple old_key);
static void do_apply_update(Relation rel, EState *estate,
TupleTableSlot *oldslot, TupleTableSlot *newslot,
BDRTupleData new_tuple);
+static void fetch_sysid_via_node_id(RepNodeId node_id, uint64 *sysid, TimeLineID *tli);
-static EState *bdr_create_rel_estate(Relation rel);
-
-bool started_transaction = false;
-Oid QueuedDDLCommandsRelid = InvalidOid;
-Oid QueuedDropsRelid = InvalidOid;
-
-/*
- * This code only runs within an apply bgworker, so we can stash a pointer to our
- * state in shm in a global for convenient access.
- *
- * TODO: make static once bdr_apply_main moved into bdr.c
- */
-BdrApplyWorker *bdr_apply_worker = NULL;
-
-static bool
-bdr_performing_work(void)
-{
- if (started_transaction)
- return false;
-
- started_transaction = true;
- StartTransactionCommand();
-
- return true;
-}
+static void check_sequencer_wakeup(Relation rel);
+static HeapTuple process_queued_drop(HeapTuple cmdtup);
+static void process_queued_ddl_command(HeapTuple cmdtup, bool tx_just_started);
+static bool bdr_performing_work(void);
void
process_remote_begin(StringInfo s)
bdr_count_commit();
}
-static void
-process_queued_ddl_command(HeapTuple cmdtup, bool tx_just_started)
+void
+process_remote_insert(StringInfo s)
{
- Relation cmdsrel;
-#ifdef NOT_YET
- HeapTuple newtup;
+ char action;
+ EState *estate;
+ TupleTableSlot *slot;
+ TupleTableSlot *oldslot;
+ Relation rel;
+ bool started_tx;
+#ifdef VERBOSE_INSERT
+ StringInfoData o;
#endif
- Datum datum;
- char *type;
- char *identstr;
- char *cmdstr;
- bool isnull;
-
- List *commands;
- ListCell *command_i;
- bool isTopLevel;
- MemoryContext oldcontext;
-
- /* ----
- * We can't use spi here, because it implicitly assumes a transaction
- * context. As we want to be able to replicate CONCURRENTLY commands,
- * that's not going to work...
- * So instead do all the work manually, being careful about managing the
- * lifecycle of objects.
- * ----
- */
- oldcontext = MemoryContextSwitchTo(MessageContext);
+ ResultRelInfo *relinfo;
+ ItemPointer conflicts;
+ bool conflict = false;
+ ScanKey *index_keys;
+ int i;
+ ItemPointerData conflicting_tid;
- cmdsrel = heap_open(QueuedDDLCommandsRelid, NoLock);
+ ItemPointerSetInvalid(&conflicting_tid);
- /* fetch the object type */
- datum = heap_getattr(cmdtup, 1,
- RelationGetDescr(cmdsrel),
- &isnull);
- if (isnull)
- elog(ERROR, "null object type in command tuple in \"%s\"",
- RelationGetRelationName(cmdsrel));
- type = TextDatumGetCString(datum);
+ started_tx = bdr_performing_work();
- /* fetch the object identity */
- datum = heap_getattr(cmdtup, 2,
- RelationGetDescr(cmdsrel),
- &isnull);
- if (isnull)
- elog(ERROR, "null identity in command tuple for object of type %s",
- RelationGetRelationName(cmdsrel));
+ Assert(bdr_apply_worker != NULL);
- identstr = TextDatumGetCString(datum);
+ /*
+ * Read tuple into a context that's long lived enough for CONCURRENTLY
+ * processing.
+ */
+ MemoryContextSwitchTo(MessageContext);
+ rel = read_rel(s, RowExclusiveLock);
- /* finally fetch and execute the command */
- datum = heap_getattr(cmdtup, 3,
- RelationGetDescr(cmdsrel),
- &isnull);
- if (isnull)
- elog(ERROR, "null command in tuple for %s \"%s\"", type, identstr);
+ action = pq_getmsgbyte(s);
+ if (action != 'N')
+ elog(ERROR, "expected new tuple but got %d",
+ action);
- cmdstr = TextDatumGetCString(datum);
+ estate = bdr_create_rel_estate(rel);
+ slot = ExecInitExtraTupleSlot(estate);
+ oldslot = ExecInitExtraTupleSlot(estate);
+ ExecSetSlotDescriptor(slot, RelationGetDescr(rel));
+ ExecSetSlotDescriptor(oldslot, RelationGetDescr(rel));
- /* close relation, command execution might end/start xact */
- heap_close(cmdsrel, NoLock);
+ read_tuple(s, rel, slot);
- commands = pg_parse_query(cmdstr);
+ if (rel->rd_rel->relkind != RELKIND_RELATION)
+ elog(ERROR, "unexpected relkind '%c' rel \"%s\"",
+ rel->rd_rel->relkind, RelationGetRelationName(rel));
- MemoryContextSwitchTo(oldcontext);
+ /* debug output */
+#ifdef VERBOSE_INSERT
+ initStringInfo(&o);
+ tuple_to_stringinfo(&o, RelationGetDescr(rel), slot->tts_tuple);
+ elog(LOG, "INSERT:%s", o.data);
+ resetStringInfo(&o);
+#endif
/*
- * Do a limited amount of safety checking against CONCURRENTLY commands
- * executed in situations where they aren't allowed. The sender side shoul
- * provide protection, but better be safe than sorry.
+ * Search for conflicting tuples.
*/
- isTopLevel = (list_length(commands) == 1) && tx_just_started;
+ ExecOpenIndices(estate->es_result_relation_info);
+ relinfo = estate->es_result_relation_info;
+ index_keys = palloc0(relinfo->ri_NumIndices * sizeof(ScanKeyData*));
+ conflicts = palloc0(relinfo->ri_NumIndices * sizeof(ItemPointerData));
- foreach(command_i, commands)
- {
- List *plantree_list;
- List *querytree_list;
- Node *command = (Node *) lfirst(command_i);
- const char *commandTag;
- Portal portal;
- DestReceiver *receiver;
+ build_index_scan_keys(estate, index_keys, slot);
- /* temporarily push snapshot for parse analysis/planning */
- PushActiveSnapshot(GetTransactionSnapshot());
+ /* do a SnapshotDirty search for conflicting tuples */
+ for (i = 0; i < relinfo->ri_NumIndices; i++)
+ {
+ IndexInfo *ii = relinfo->ri_IndexRelationInfo[i];
+ bool found = false;
- oldcontext = MemoryContextSwitchTo(MessageContext);
+ Assert(ii->ii_Expressions == NIL);
- commandTag = CreateCommandTag(command);
+ if (!ii->ii_Unique)
+ continue;
- querytree_list = pg_analyze_and_rewrite(
- command, cmdstr, NULL, 0);
+ /* if conflict: wait */
+ found = find_pkey_tuple(index_keys[i],
+ rel, relinfo->ri_IndexRelationDescs[i],
+ oldslot, true, LockTupleExclusive);
- plantree_list = pg_plan_queries(
- querytree_list, 0, NULL);
+ /* alert if there's more than one conflicting unique key */
+ if (found &&
+ ItemPointerIsValid(&conflicting_tid) &&
+ !ItemPointerEquals(&oldslot->tts_tuple->t_self,
+ &conflicting_tid))
+ {
+ /* FIXME: improve logging here */
+ elog(ERROR, "diverging uniqueness conflict");
+ }
+ else if (found)
+ {
+ ItemPointerCopy(&oldslot->tts_tuple->t_self, &conflicting_tid);
+ conflict = true;
+ break;
+ }
+ else
+ ItemPointerSetInvalid(&conflicts[i]);
- PopActiveSnapshot();
+ CHECK_FOR_INTERRUPTS();
+ }
- portal = CreatePortal("", true, true);
- PortalDefineQuery(portal, NULL,
- cmdstr, commandTag,
- plantree_list, NULL);
- PortalStart(portal, NULL, 0, InvalidSnapshot);
+ /*
+ * If there's a conflict use the version created later, otherwise do a
+ * plain insert.
+ */
+ if (conflict)
+ {
+ TransactionId xmin;
+ TimestampTz local_ts;
+ RepNodeId local_node_id;
+ bool apply_update;
+ bool log_update;
+ CommitExtraData local_node_id_raw;
- receiver = CreateDestReceiver(DestNone);
+ /* refetch tuple, check for old commit ts & origin */
+ xmin = HeapTupleHeaderGetXmin(oldslot->tts_tuple->t_data);
- (void) PortalRun(portal, FETCH_ALL,
- isTopLevel,
- receiver, receiver,
- NULL);
- (*receiver->rDestroy) (receiver);
+ /*
+ * We now need to determine whether to keep the original version of
+ * the row, or apply the insert (as an update) we received. We use
+ * the last-update-wins strategy for this, except when the new update
+ * comes from the same node that originated the previous version of
+ * the tuple.
+ */
+ TransactionIdGetCommitTsData(xmin, &local_ts, &local_node_id_raw);
+ local_node_id = local_node_id_raw;
- PortalDrop(portal, false);
+ check_apply_update(local_node_id, local_ts,
+ &apply_update, &log_update);
- CommandCounterIncrement();
+ elog(LOG, "insert vs insert conflict: %s",
+ apply_update ? "update" : "ignore");
- MemoryContextSwitchTo(oldcontext);
- }
-
-#ifdef NOT_YET
- /* FIXME: update tuple to set set "executed" to true */
- // newtup = heap_modify_tuple( .. );
- newtup = cmdtup;
-#endif
-}
-
-static HeapTuple
-process_queued_drop(HeapTuple cmdtup)
-{
- Relation cmdsrel;
- HeapTuple newtup;
- Datum arrayDatum;
- ArrayType *array;
- bool null;
- Oid elmtype;
- int16 elmlen;
- bool elmbyval;
- char elmalign;
- Oid elmoutoid;
- bool elmisvarlena;
- TupleDesc elemdesc;
- Datum *values;
- int nelems;
- int i;
- ObjectAddresses *addresses;
-
- cmdsrel = heap_open(QueuedDropsRelid, AccessShareLock);
- arrayDatum = heap_getattr(cmdtup, 1,
- RelationGetDescr(cmdsrel),
- &null);
- if (null)
- {
- elog(WARNING, "null dropped object array in command tuple in \"%s\"",
- RelationGetRelationName(cmdsrel));
- return cmdtup;
- }
- array = DatumGetArrayTypeP(arrayDatum);
- elmtype = ARR_ELEMTYPE(array);
-
- get_typlenbyvalalign(elmtype, &elmlen, &elmbyval, &elmalign);
- deconstruct_array(array, elmtype,
- elmlen, elmbyval, elmalign,
- &values, NULL, &nelems);
-
- getTypeOutputInfo(elmtype, &elmoutoid, &elmisvarlena);
- elemdesc = TypeGetTupleDesc(elmtype, NIL);
-
- addresses = new_object_addresses();
-
- for (i = 0; i < nelems; i++)
- {
- HeapTupleHeader elemhdr;
- HeapTupleData tmptup;
- ObjectType objtype;
- Datum datum;
- bool isnull;
- char *type;
- List *objnames;
- List *objargs = NIL;
- Relation objrel;
- ObjectAddress addr;
-
- elemhdr = (HeapTupleHeader) DatumGetPointer(values[i]);
- tmptup.t_len = HeapTupleHeaderGetDatumLength(elemhdr);
- ItemPointerSetInvalid(&(tmptup.t_self));
- tmptup.t_tableOid = InvalidOid;
- tmptup.t_data = elemhdr;
-
- /* obtain the object type as a C-string ... */
- datum = heap_getattr(&tmptup, 1, elemdesc, &isnull);
- if (isnull)
- {
- elog(WARNING, "null type !?");
- continue;
- }
- type = TextDatumGetCString(datum);
- objtype = unstringify_objtype(type);
-
- if (objtype == OBJECT_TYPE ||
- objtype == OBJECT_DOMAIN)
- {
- Datum *values;
- bool *nulls;
- int nelems;
- char *typestring;
- TypeName *typeName;
-
- datum = heap_getattr(&tmptup, 2, elemdesc, &isnull);
- if (isnull)
- {
- elog(WARNING, "null typename !?");
- continue;
- }
-
- deconstruct_array(DatumGetArrayTypeP(datum),
- TEXTOID, -1, false, 'i',
- &values, &nulls, &nelems);
-
- typestring = TextDatumGetCString(values[0]);
- typeName = typeStringToTypeName(typestring);
- objnames = typeName->names;
- }
- else if (objtype == OBJECT_FUNCTION ||
- objtype == OBJECT_AGGREGATE ||
- objtype == OBJECT_OPERATOR)
- {
- Datum *values;
- bool *nulls;
- int nelems;
- int i;
- char *typestring;
-
- /* objname */
- objnames = NIL;
- datum = heap_getattr(&tmptup, 2, elemdesc, &isnull);
- if (isnull)
- {
- elog(WARNING, "null objname !?");
- continue;
- }
-
- deconstruct_array(DatumGetArrayTypeP(datum),
- TEXTOID, -1, false, 'i',
- &values, &nulls, &nelems);
- for (i = 0; i < nelems; i++)
- objnames = lappend(objnames,
- makeString(TextDatumGetCString(values[i])));
-
- /* objargs are type names */
- datum = heap_getattr(&tmptup, 3, elemdesc, &isnull);
- if (isnull)
- {
- elog(WARNING, "null typename !?");
- continue;
- }
-
- deconstruct_array(DatumGetArrayTypeP(datum),
- TEXTOID, -1, false, 'i',
- &values, &nulls, &nelems);
-
- for (i = 0; i < nelems; i++)
- {
- typestring = TextDatumGetCString(values[i]);
- objargs = lappend(objargs, typeStringToTypeName(typestring));
- }
- }
- else
- {
- Datum *values;
- bool *nulls;
- int nelems;
- int i;
-
- /* objname */
- objnames = NIL;
- datum = heap_getattr(&tmptup, 2, elemdesc, &isnull);
- if (isnull)
- {
- elog(WARNING, "null objname !?");
- continue;
- }
-
- deconstruct_array(DatumGetArrayTypeP(datum),
- TEXTOID, -1, false, 'i',
- &values, &nulls, &nelems);
- for (i = 0; i < nelems; i++)
- objnames = lappend(objnames,
- makeString(TextDatumGetCString(values[i])));
-
- datum = heap_getattr(&tmptup, 3, elemdesc, &isnull);
- if (!isnull)
- {
- Datum *values;
- bool *nulls;
- int nelems;
- int i;
-
- deconstruct_array(DatumGetArrayTypeP(datum),
- TEXTOID, -1, false, 'i',
- &values, &nulls, &nelems);
- for (i = 0; i < nelems; i++)
- objargs = lappend(objargs,
- makeString(TextDatumGetCString(values[i])));
- }
- }
-
- addr = get_object_address(objtype, objnames, objargs, &objrel,
- AccessExclusiveLock, false);
- /* unsupported object? */
- if (addr.classId == InvalidOid)
- continue;
-
- /*
- * For certain objects, get_object_address returned us an open and
- * locked relation. Close it because we have no use for it; but
- * keeping the lock seems easier than figure out lock level to release.
- */
- if (objrel != NULL)
- relation_close(objrel, NoLock);
-
- add_exact_object_address(&addr, addresses);
- }
-
- performMultipleDeletions(addresses, DROP_RESTRICT, 0);
-
- newtup = cmdtup;
-
- heap_close(cmdsrel, AccessShareLock);
-
- return newtup;
-}
-
-void
-process_remote_insert(StringInfo s)
-{
- char action;
- EState *estate;
- TupleTableSlot *slot;
- TupleTableSlot *oldslot;
- Relation rel;
- bool started_tx;
-#ifdef VERBOSE_INSERT
- StringInfoData o;
-#endif
- ResultRelInfo *relinfo;
- ItemPointer conflicts;
- bool conflict = false;
- ScanKey *index_keys;
- int i;
- ItemPointerData conflicting_tid;
-
- ItemPointerSetInvalid(&conflicting_tid);
-
- started_tx = bdr_performing_work();
-
- Assert(bdr_apply_worker != NULL);
-
- /*
- * Read tuple into a context that's long lived enough for CONCURRENTLY
- * processing.
- */
- MemoryContextSwitchTo(MessageContext);
- rel = read_rel(s, RowExclusiveLock);
-
- action = pq_getmsgbyte(s);
- if (action != 'N')
- elog(ERROR, "expected new tuple but got %d",
- action);
-
- estate = bdr_create_rel_estate(rel);
- slot = ExecInitExtraTupleSlot(estate);
- oldslot = ExecInitExtraTupleSlot(estate);
- ExecSetSlotDescriptor(slot, RelationGetDescr(rel));
- ExecSetSlotDescriptor(oldslot, RelationGetDescr(rel));
-
- read_tuple(s, rel, slot);
-
- if (rel->rd_rel->relkind != RELKIND_RELATION)
- elog(ERROR, "unexpected relkind '%c' rel \"%s\"",
- rel->rd_rel->relkind, RelationGetRelationName(rel));
-
- /* debug output */
-#ifdef VERBOSE_INSERT
- initStringInfo(&o);
- tuple_to_stringinfo(&o, RelationGetDescr(rel), slot->tts_tuple);
- elog(LOG, "INSERT:%s", o.data);
- resetStringInfo(&o);
-#endif
-
- /*
- * Search for conflicting tuples.
- */
- ExecOpenIndices(estate->es_result_relation_info);
- relinfo = estate->es_result_relation_info;
- index_keys = palloc0(relinfo->ri_NumIndices * sizeof(ScanKeyData*));
- conflicts = palloc0(relinfo->ri_NumIndices * sizeof(ItemPointerData));
-
- build_index_scan_keys(estate, index_keys, slot);
-
- /* do a SnapshotDirty search for conflicting tuples */
- for (i = 0; i < relinfo->ri_NumIndices; i++)
- {
- IndexInfo *ii = relinfo->ri_IndexRelationInfo[i];
- bool found = false;
-
- Assert(ii->ii_Expressions == NIL);
-
- if (!ii->ii_Unique)
- continue;
-
- /* if conflict: wait */
- found = find_pkey_tuple(index_keys[i],
- rel, relinfo->ri_IndexRelationDescs[i],
- oldslot, true, LockTupleExclusive);
-
- /* alert if there's more than one conflicting unique key */
- if (found &&
- ItemPointerIsValid(&conflicting_tid) &&
- !ItemPointerEquals(&oldslot->tts_tuple->t_self,
- &conflicting_tid))
- {
- /* FIXME: improve logging here */
- elog(ERROR, "diverging uniqueness conflict");
- }
- else if (found)
- {
- ItemPointerCopy(&oldslot->tts_tuple->t_self, &conflicting_tid);
- conflict = true;
- break;
- }
- else
- ItemPointerSetInvalid(&conflicts[i]);
-
- CHECK_FOR_INTERRUPTS();
- }
-
- /*
- * If there's a conflict use the version created later, otherwise do a
- * plain insert.
- */
- if (conflict)
- {
- TransactionId xmin;
- TimestampTz local_ts;
- RepNodeId local_node_id;
- bool apply_update;
- bool log_update;
- CommitExtraData local_node_id_raw;
-
- /* refetch tuple, check for old commit ts & origin */
- xmin = HeapTupleHeaderGetXmin(oldslot->tts_tuple->t_data);
-
- /*
- * We now need to determine whether to keep the original version of
- * the row, or apply the insert (as an update) we received. We use
- * the last-update-wins strategy for this, except when the new update
- * comes from the same node that originated the previous version of
- * the tuple.
- */
- TransactionIdGetCommitTsData(xmin, &local_ts, &local_node_id_raw);
- local_node_id = local_node_id_raw;
-
- check_apply_update(local_node_id, local_ts,
- &apply_update, &log_update);
-
- elog(LOG, "insert vs insert conflict: %s",
- apply_update ? "update" : "ignore");
-
- if (apply_update)
- {
- simple_heap_update(rel,
- &oldslot->tts_tuple->t_self,
- slot->tts_tuple);
- /* races will be resolved by abort/retry */
- UserTableUpdateOpenIndexes(estate, slot);
- }
- }
- else
- {
- simple_heap_insert(rel, slot->tts_tuple);
- /* races will be resolved by abort/retry */
- UserTableUpdateOpenIndexes(estate, slot);
+ if (apply_update)
+ {
+ simple_heap_update(rel,
+ &oldslot->tts_tuple->t_self,
+ slot->tts_tuple);
+ /* races will be resolved by abort/retry */
+ UserTableUpdateOpenIndexes(estate, slot);
+ }
+ }
+ else
+ {
+ simple_heap_insert(rel, slot->tts_tuple);
+ /* races will be resolved by abort/retry */
+ UserTableUpdateOpenIndexes(estate, slot);
}
ExecCloseIndices(estate->es_result_relation_info);
CommandCounterIncrement();
}
-static void
-fetch_sysid_via_node_id(RepNodeId node_id, uint64 *sysid, TimeLineID *tli)
-{
- if (node_id == InvalidRepNodeId)
- {
- *sysid = GetSystemIdentifier();
- *tli = ThisTimeLineID;
- }
- else
- {
- HeapTuple node;
- Form_pg_replication_identifier node_class;
- char *ident;
-
- uint64 remote_sysid;
- Oid remote_dboid;
- TimeLineID remote_tli;
- Oid local_dboid;
- NameData replication_name;
-
- node = GetReplicationInfoByIdentifier(node_id, false);
-
- node_class = (Form_pg_replication_identifier) GETSTRUCT(node);
-
- ident = text_to_cstring(&node_class->riname);
-
- if (sscanf(ident, BDR_NODE_ID_FORMAT,
- &remote_sysid, &remote_tli, &remote_dboid, &local_dboid,
- NameStr(replication_name)) != 4)
- elog(ERROR, "could not parse sysid: %s", ident);
- ReleaseSysCache(node);
- pfree(ident);
-
- *sysid = remote_sysid;
- *tli = remote_tli;
- }
-}
-
void
process_remote_update(StringInfo s)
{
}
else
{
- initStringInfo(&o);
- tuple_to_stringinfo(&o, RelationGetDescr(rel),
- oldslot->tts_tuple);
- bdr_count_update_conflict();
+ initStringInfo(&o);
+ tuple_to_stringinfo(&o, RelationGetDescr(rel),
+ oldslot->tts_tuple);
+ bdr_count_update_conflict();
+
+ ereport(ERROR,
+ (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
+ errmsg("CONFLICT: could not find existing tuple for pkey %s",
+ o.data)));
+ }
+
+ PopActiveSnapshot();
+
+ check_sequencer_wakeup(rel);
+
+ /* release locks upon commit */
+ index_close(idxrel, NoLock);
+ heap_close(rel, NoLock);
+
+ ExecResetTupleTable(estate->es_tupleTable, true);
+ FreeExecutorState(estate);
+
+ CommandCounterIncrement();
+}
+
+void
+process_remote_delete(StringInfo s)
+{
+#ifdef VERBOSE_DELETE
+ StringInfoData o;
+#endif
+ char action;
+ EState *estate;
+ TupleTableSlot *slot;
+ Oid idxoid;
+ Relation rel;
+ Relation idxrel;
+ ScanKeyData skey[INDEX_MAX_KEYS];
+ bool found_old;
+
+ Assert(bdr_apply_worker != NULL);
+
+ bdr_performing_work();
+
+ rel = read_rel(s, RowExclusiveLock);
+
+ action = pq_getmsgbyte(s);
+
+ if (action != 'K' && action != 'E')
+ elog(ERROR, "expected action K or E got %c", action);
+
+ if (action == 'E')
+ {
+ elog(WARNING, "got delete without pkey");
+ return;
+ }
+
+ estate = bdr_create_rel_estate(rel);
+ slot = ExecInitExtraTupleSlot(estate);
+ ExecSetSlotDescriptor(slot, RelationGetDescr(rel));
+
+ read_tuple(s, rel, slot);
+
+ /* lookup index to build scankey */
+ if (rel->rd_indexvalid == 0)
+ RelationGetIndexList(rel);
+ idxoid = rel->rd_replidindex;
+ if (!OidIsValid(idxoid))
+ {
+ elog(ERROR, "could not find primary key for table with oid %u",
+ RelationGetRelid(rel));
+ return;
+ }
+
+ /* Now open the primary key index */
+ idxrel = index_open(idxoid, RowExclusiveLock);
+
+ if (rel->rd_rel->relkind != RELKIND_RELATION)
+ elog(ERROR, "unexpected relkind '%c' rel \"%s\"",
+ rel->rd_rel->relkind, RelationGetRelationName(rel));
+
+#ifdef VERBOSE_DELETE
+ initStringInfo(&o);
+ tuple_to_stringinfo(&o, RelationGetDescr(idxrel), slot->tts_tuple);
+ elog(LOG, "DELETE old-key:%s", o.data);
+ resetStringInfo(&o);
+#endif
+
+ PushActiveSnapshot(GetTransactionSnapshot());
+
+ build_index_scan_key(skey, rel, idxrel, slot);
+
+ /* try to find tuple via a (candidate|primary) key */
+ found_old = find_pkey_tuple(skey, rel, idxrel, slot, true, LockTupleExclusive);
+
+ if (found_old)
+ {
+ simple_heap_delete(rel, &slot->tts_tuple->t_self);
+ bdr_count_delete();
+ }
+ else
+ {
+ StringInfoData s_key;
+
+ bdr_count_delete_conflict();
+
+ initStringInfo(&s_key);
+ tuple_to_stringinfo(&s_key, RelationGetDescr(idxrel), slot->tts_tuple);
ereport(ERROR,
(errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
- errmsg("CONFLICT: could not find existing tuple for pkey %s",
- o.data)));
+ errmsg("CONFLICT: DELETE could not find existing tuple for pkey %s", s_key.data)));
+ resetStringInfo(&s_key);
}
PopActiveSnapshot();
check_sequencer_wakeup(rel);
- /* release locks upon commit */
index_close(idxrel, NoLock);
heap_close(rel, NoLock);
fetch_sysid_via_node_id(bdr_apply_worker->origin_id,
&remote_sysid, &remote_tli);
- Assert(remote_sysid == bdr_apply_worker->sysid);
- Assert(remote_tli == bdr_apply_worker->timeline);
+ Assert(remote_sysid == bdr_apply_worker->sysid);
+ Assert(remote_tli == bdr_apply_worker->timeline);
+
+ memcpy(remote_ts, timestamptz_to_str(replication_origin_timestamp),
+ MAXDATELEN);
+ memcpy(local_ts, timestamptz_to_str(ts),
+ MAXDATELEN);
+
+ initStringInfo(&s_key);
+ tuple_to_stringinfo(&s_key, RelationGetDescr(idxrel), old_key);
+
+ ereport(LOG,
+ (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
+ errmsg("CONFLICT: %s remote update originating at node " UINT64_FORMAT ":%u at ts %s; row was previously updated at %s node " UINT64_FORMAT ":%u at ts %s. PKEY:%s",
+ apply_update ? "applying" : "skipping",
+ remote_sysid, remote_tli, remote_ts,
+ local_node_id == InvalidRepNodeId ? "local" : "remote",
+ local_sysid, local_tli, local_ts, s_key.data)));
+ resetStringInfo(&s_key);
+}
+
+static void
+do_apply_update(Relation rel, EState *estate,
+ TupleTableSlot *oldslot,
+ TupleTableSlot *newslot,
+ BDRTupleData new_tuple)
+{
+ HeapTuple nt;
+
+ nt = heap_modify_tuple(oldslot->tts_tuple, RelationGetDescr(rel),
+ new_tuple.values, new_tuple.isnull,
+ new_tuple.changed);
+ ExecStoreTuple(nt, newslot, InvalidBuffer, true);
+ simple_heap_update(rel, &oldslot->tts_tuple->t_self, newslot->tts_tuple);
+ UserTableUpdateIndexes(estate, newslot);
+ bdr_count_update();
+}
+
+
+static void
+process_queued_ddl_command(HeapTuple cmdtup, bool tx_just_started)
+{
+ Relation cmdsrel;
+#ifdef NOT_YET
+ HeapTuple newtup;
+#endif
+ Datum datum;
+ char *type;
+ char *identstr;
+ char *cmdstr;
+ bool isnull;
+
+ List *commands;
+ ListCell *command_i;
+ bool isTopLevel;
+ MemoryContext oldcontext;
+
+ /* ----
+ * We can't use spi here, because it implicitly assumes a transaction
+ * context. As we want to be able to replicate CONCURRENTLY commands,
+ * that's not going to work...
+ * So instead do all the work manually, being careful about managing the
+ * lifecycle of objects.
+ * ----
+ */
+ oldcontext = MemoryContextSwitchTo(MessageContext);
+
+ cmdsrel = heap_open(QueuedDDLCommandsRelid, NoLock);
+
+ /* fetch the object type */
+ datum = heap_getattr(cmdtup, 1,
+ RelationGetDescr(cmdsrel),
+ &isnull);
+ if (isnull)
+ elog(ERROR, "null object type in command tuple in \"%s\"",
+ RelationGetRelationName(cmdsrel));
+ type = TextDatumGetCString(datum);
+
+ /* fetch the object identity */
+ datum = heap_getattr(cmdtup, 2,
+ RelationGetDescr(cmdsrel),
+ &isnull);
+ if (isnull)
+ elog(ERROR, "null identity in command tuple for object of type %s",
+ RelationGetRelationName(cmdsrel));
+
+ identstr = TextDatumGetCString(datum);
+
+ /* finally fetch and execute the command */
+ datum = heap_getattr(cmdtup, 3,
+ RelationGetDescr(cmdsrel),
+ &isnull);
+ if (isnull)
+ elog(ERROR, "null command in tuple for %s \"%s\"", type, identstr);
+
+ cmdstr = TextDatumGetCString(datum);
+
+ /* close relation, command execution might end/start xact */
+ heap_close(cmdsrel, NoLock);
+
+ commands = pg_parse_query(cmdstr);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ /*
+ * Do a limited amount of safety checking against CONCURRENTLY commands
+ * executed in situations where they aren't allowed. The sender side shoul
+ * provide protection, but better be safe than sorry.
+ */
+ isTopLevel = (list_length(commands) == 1) && tx_just_started;
+
+ foreach(command_i, commands)
+ {
+ List *plantree_list;
+ List *querytree_list;
+ Node *command = (Node *) lfirst(command_i);
+ const char *commandTag;
+ Portal portal;
+ DestReceiver *receiver;
+
+ /* temporarily push snapshot for parse analysis/planning */
+ PushActiveSnapshot(GetTransactionSnapshot());
+
+ oldcontext = MemoryContextSwitchTo(MessageContext);
+
+ commandTag = CreateCommandTag(command);
+
+ querytree_list = pg_analyze_and_rewrite(
+ command, cmdstr, NULL, 0);
+
+ plantree_list = pg_plan_queries(
+ querytree_list, 0, NULL);
+
+ PopActiveSnapshot();
+
+ portal = CreatePortal("", true, true);
+ PortalDefineQuery(portal, NULL,
+ cmdstr, commandTag,
+ plantree_list, NULL);
+ PortalStart(portal, NULL, 0, InvalidSnapshot);
+
+ receiver = CreateDestReceiver(DestNone);
+
+ (void) PortalRun(portal, FETCH_ALL,
+ isTopLevel,
+ receiver, receiver,
+ NULL);
+ (*receiver->rDestroy) (receiver);
+
+ PortalDrop(portal, false);
+
+ CommandCounterIncrement();
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+#ifdef NOT_YET
+ /* FIXME: update tuple to set set "executed" to true */
+ // newtup = heap_modify_tuple( .. );
+ newtup = cmdtup;
+#endif
+}
+
+static HeapTuple
+process_queued_drop(HeapTuple cmdtup)
+{
+ Relation cmdsrel;
+ HeapTuple newtup;
+ Datum arrayDatum;
+ ArrayType *array;
+ bool null;
+ Oid elmtype;
+ int16 elmlen;
+ bool elmbyval;
+ char elmalign;
+ Oid elmoutoid;
+ bool elmisvarlena;
+ TupleDesc elemdesc;
+ Datum *values;
+ int nelems;
+ int i;
+ ObjectAddresses *addresses;
+
+ cmdsrel = heap_open(QueuedDropsRelid, AccessShareLock);
+ arrayDatum = heap_getattr(cmdtup, 1,
+ RelationGetDescr(cmdsrel),
+ &null);
+ if (null)
+ {
+ elog(WARNING, "null dropped object array in command tuple in \"%s\"",
+ RelationGetRelationName(cmdsrel));
+ return cmdtup;
+ }
+ array = DatumGetArrayTypeP(arrayDatum);
+ elmtype = ARR_ELEMTYPE(array);
+
+ get_typlenbyvalalign(elmtype, &elmlen, &elmbyval, &elmalign);
+ deconstruct_array(array, elmtype,
+ elmlen, elmbyval, elmalign,
+ &values, NULL, &nelems);
+
+ getTypeOutputInfo(elmtype, &elmoutoid, &elmisvarlena);
+ elemdesc = TypeGetTupleDesc(elmtype, NIL);
+
+ addresses = new_object_addresses();
+
+ for (i = 0; i < nelems; i++)
+ {
+ HeapTupleHeader elemhdr;
+ HeapTupleData tmptup;
+ ObjectType objtype;
+ Datum datum;
+ bool isnull;
+ char *type;
+ List *objnames;
+ List *objargs = NIL;
+ Relation objrel;
+ ObjectAddress addr;
+
+ elemhdr = (HeapTupleHeader) DatumGetPointer(values[i]);
+ tmptup.t_len = HeapTupleHeaderGetDatumLength(elemhdr);
+ ItemPointerSetInvalid(&(tmptup.t_self));
+ tmptup.t_tableOid = InvalidOid;
+ tmptup.t_data = elemhdr;
+
+ /* obtain the object type as a C-string ... */
+ datum = heap_getattr(&tmptup, 1, elemdesc, &isnull);
+ if (isnull)
+ {
+ elog(WARNING, "null type !?");
+ continue;
+ }
+ type = TextDatumGetCString(datum);
+ objtype = unstringify_objtype(type);
+
+ if (objtype == OBJECT_TYPE ||
+ objtype == OBJECT_DOMAIN)
+ {
+ Datum *values;
+ bool *nulls;
+ int nelems;
+ char *typestring;
+ TypeName *typeName;
- memcpy(remote_ts, timestamptz_to_str(replication_origin_timestamp),
- MAXDATELEN);
- memcpy(local_ts, timestamptz_to_str(ts),
- MAXDATELEN);
+ datum = heap_getattr(&tmptup, 2, elemdesc, &isnull);
+ if (isnull)
+ {
+ elog(WARNING, "null typename !?");
+ continue;
+ }
- initStringInfo(&s_key);
- tuple_to_stringinfo(&s_key, RelationGetDescr(idxrel), old_key);
+ deconstruct_array(DatumGetArrayTypeP(datum),
+ TEXTOID, -1, false, 'i',
+ &values, &nulls, &nelems);
- ereport(LOG,
- (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
- errmsg("CONFLICT: %s remote update originating at node " UINT64_FORMAT ":%u at ts %s; row was previously updated at %s node " UINT64_FORMAT ":%u at ts %s. PKEY:%s",
- apply_update ? "applying" : "skipping",
- remote_sysid, remote_tli, remote_ts,
- local_node_id == InvalidRepNodeId ? "local" : "remote",
- local_sysid, local_tli, local_ts, s_key.data)));
- resetStringInfo(&s_key);
-}
+ typestring = TextDatumGetCString(values[0]);
+ typeName = typeStringToTypeName(typestring);
+ objnames = typeName->names;
+ }
+ else if (objtype == OBJECT_FUNCTION ||
+ objtype == OBJECT_AGGREGATE ||
+ objtype == OBJECT_OPERATOR)
+ {
+ Datum *values;
+ bool *nulls;
+ int nelems;
+ int i;
+ char *typestring;
-static void
-do_apply_update(Relation rel, EState *estate,
- TupleTableSlot *oldslot,
- TupleTableSlot *newslot,
- BDRTupleData new_tuple)
-{
- HeapTuple nt;
+ /* objname */
+ objnames = NIL;
+ datum = heap_getattr(&tmptup, 2, elemdesc, &isnull);
+ if (isnull)
+ {
+ elog(WARNING, "null objname !?");
+ continue;
+ }
- nt = heap_modify_tuple(oldslot->tts_tuple, RelationGetDescr(rel),
- new_tuple.values, new_tuple.isnull,
- new_tuple.changed);
- ExecStoreTuple(nt, newslot, InvalidBuffer, true);
- simple_heap_update(rel, &oldslot->tts_tuple->t_self, newslot->tts_tuple);
- UserTableUpdateIndexes(estate, newslot);
- bdr_count_update();
-}
+ deconstruct_array(DatumGetArrayTypeP(datum),
+ TEXTOID, -1, false, 'i',
+ &values, &nulls, &nelems);
+ for (i = 0; i < nelems; i++)
+ objnames = lappend(objnames,
+ makeString(TextDatumGetCString(values[i])));
-void
-process_remote_delete(StringInfo s)
-{
-#ifdef VERBOSE_DELETE
- StringInfoData o;
-#endif
- char action;
- EState *estate;
- TupleTableSlot *slot;
- Oid idxoid;
- Relation rel;
- Relation idxrel;
- ScanKeyData skey[INDEX_MAX_KEYS];
- bool found_old;
+ /* objargs are type names */
+ datum = heap_getattr(&tmptup, 3, elemdesc, &isnull);
+ if (isnull)
+ {
+ elog(WARNING, "null typename !?");
+ continue;
+ }
- Assert(bdr_apply_worker != NULL);
+ deconstruct_array(DatumGetArrayTypeP(datum),
+ TEXTOID, -1, false, 'i',
+ &values, &nulls, &nelems);
- bdr_performing_work();
+ for (i = 0; i < nelems; i++)
+ {
+ typestring = TextDatumGetCString(values[i]);
+ objargs = lappend(objargs, typeStringToTypeName(typestring));
+ }
+ }
+ else
+ {
+ Datum *values;
+ bool *nulls;
+ int nelems;
+ int i;
- rel = read_rel(s, RowExclusiveLock);
+ /* objname */
+ objnames = NIL;
+ datum = heap_getattr(&tmptup, 2, elemdesc, &isnull);
+ if (isnull)
+ {
+ elog(WARNING, "null objname !?");
+ continue;
+ }
- action = pq_getmsgbyte(s);
+ deconstruct_array(DatumGetArrayTypeP(datum),
+ TEXTOID, -1, false, 'i',
+ &values, &nulls, &nelems);
+ for (i = 0; i < nelems; i++)
+ objnames = lappend(objnames,
+ makeString(TextDatumGetCString(values[i])));
- if (action != 'K' && action != 'E')
- elog(ERROR, "expected action K or E got %c", action);
+ datum = heap_getattr(&tmptup, 3, elemdesc, &isnull);
+ if (!isnull)
+ {
+ Datum *values;
+ bool *nulls;
+ int nelems;
+ int i;
- if (action == 'E')
- {
- elog(WARNING, "got delete without pkey");
- return;
- }
+ deconstruct_array(DatumGetArrayTypeP(datum),
+ TEXTOID, -1, false, 'i',
+ &values, &nulls, &nelems);
+ for (i = 0; i < nelems; i++)
+ objargs = lappend(objargs,
+ makeString(TextDatumGetCString(values[i])));
+ }
+ }
- estate = bdr_create_rel_estate(rel);
- slot = ExecInitExtraTupleSlot(estate);
- ExecSetSlotDescriptor(slot, RelationGetDescr(rel));
+ addr = get_object_address(objtype, objnames, objargs, &objrel,
+ AccessExclusiveLock, false);
+ /* unsupported object? */
+ if (addr.classId == InvalidOid)
+ continue;
- read_tuple(s, rel, slot);
+ /*
+ * For certain objects, get_object_address returned us an open and
+ * locked relation. Close it because we have no use for it; but
+ * keeping the lock seems easier than figure out lock level to release.
+ */
+ if (objrel != NULL)
+ relation_close(objrel, NoLock);
- /* lookup index to build scankey */
- if (rel->rd_indexvalid == 0)
- RelationGetIndexList(rel);
- idxoid = rel->rd_replidindex;
- if (!OidIsValid(idxoid))
- {
- elog(ERROR, "could not find primary key for table with oid %u",
- RelationGetRelid(rel));
- return;
+ add_exact_object_address(&addr, addresses);
}
- /* Now open the primary key index */
- idxrel = index_open(idxoid, RowExclusiveLock);
-
- if (rel->rd_rel->relkind != RELKIND_RELATION)
- elog(ERROR, "unexpected relkind '%c' rel \"%s\"",
- rel->rd_rel->relkind, RelationGetRelationName(rel));
-
-#ifdef VERBOSE_DELETE
- initStringInfo(&o);
- tuple_to_stringinfo(&o, RelationGetDescr(idxrel), slot->tts_tuple);
- elog(LOG, "DELETE old-key:%s", o.data);
- resetStringInfo(&o);
-#endif
+ performMultipleDeletions(addresses, DROP_RESTRICT, 0);
- PushActiveSnapshot(GetTransactionSnapshot());
+ newtup = cmdtup;
- build_index_scan_key(skey, rel, idxrel, slot);
+ heap_close(cmdsrel, AccessShareLock);
- /* try to find tuple via a (candidate|primary) key */
- found_old = find_pkey_tuple(skey, rel, idxrel, slot, true, LockTupleExclusive);
+ return newtup;
+}
- if (found_old)
+static void
+fetch_sysid_via_node_id(RepNodeId node_id, uint64 *sysid, TimeLineID *tli)
+{
+ if (node_id == InvalidRepNodeId)
{
- simple_heap_delete(rel, &slot->tts_tuple->t_self);
- bdr_count_delete();
+ *sysid = GetSystemIdentifier();
+ *tli = ThisTimeLineID;
}
else
{
- StringInfoData s_key;
+ HeapTuple node;
+ Form_pg_replication_identifier node_class;
+ char *ident;
- bdr_count_delete_conflict();
+ uint64 remote_sysid;
+ Oid remote_dboid;
+ TimeLineID remote_tli;
+ Oid local_dboid;
+ NameData replication_name;
- initStringInfo(&s_key);
- tuple_to_stringinfo(&s_key, RelationGetDescr(idxrel), slot->tts_tuple);
+ node = GetReplicationInfoByIdentifier(node_id, false);
- ereport(ERROR,
- (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
- errmsg("CONFLICT: DELETE could not find existing tuple for pkey %s", s_key.data)));
- resetStringInfo(&s_key);
- }
+ node_class = (Form_pg_replication_identifier) GETSTRUCT(node);
- PopActiveSnapshot();
+ ident = text_to_cstring(&node_class->riname);
- check_sequencer_wakeup(rel);
+ if (sscanf(ident, BDR_NODE_ID_FORMAT,
+ &remote_sysid, &remote_tli, &remote_dboid, &local_dboid,
+ NameStr(replication_name)) != 4)
+ elog(ERROR, "could not parse sysid: %s", ident);
+ ReleaseSysCache(node);
+ pfree(ident);
- index_close(idxrel, NoLock);
- heap_close(rel, NoLock);
+ *sysid = remote_sysid;
+ *tli = remote_tli;
+ }
+}
- ExecResetTupleTable(estate->es_tupleTable, true);
- FreeExecutorState(estate);
+static bool
+bdr_performing_work(void)
+{
+ if (started_transaction)
+ return false;
- CommandCounterIncrement();
+ started_transaction = true;
+ StartTransactionCommand();
+
+ return true;
}
static void