SELECT pg_catalog.pg_extension_config_dump('bdr_nodes', '');
+CREATE TYPE bdr_conflict_type AS ENUM
+(
+ 'insert_insert',
+ 'update_update',
+ 'update_delete',
+ 'unhandled_tx_abort'
+);
+
+COMMENT ON TYPE bdr_conflict_type IS 'The nature of a BDR apply conflict - concurrent updates (update_update), conflicting inserts, etc.';
+
+CREATE TYPE bdr_conflict_resolution AS ENUM
+(
+ 'conflict_trigger_skip_change',
+ 'conflict_trigger_returned_tuple',
+ 'last_update_wins_keep_local',
+ 'last_update_wins_keep_remote',
+ 'unhandled_tx_abort'
+);
+
+COMMENT ON TYPE bdr_conflict_resolution IS 'Resolution of a bdr conflict - if a conflict was resolved by a conflict trigger, by last-update-wins tests on commit timestamps, etc.';
+
+--
+-- bdr_conflict_history records apply conflicts so they can be queried and
+-- analysed by administrators.
+--
+-- This must remain in sync with bdr_log_handled_conflict(...) and
+-- struct BdrApplyConflict
+--
+CREATE SEQUENCE bdr_conflict_history_id_seq;
+
+CREATE TABLE bdr_conflict_history (
+ conflict_id bigint not null default nextval('bdr_conflict_history_id_seq'),
+ local_node_sysid text not null, -- really uint64 but we don't have the type for it
+ PRIMARY KEY (local_node_sysid, conflict_id),
+
+ local_conflict_xid xid not null, -- xid of conflicting apply tx
+ local_conflict_lsn pg_lsn not null, -- lsn of local node at the time the conflict was detected
+ local_conflict_time timestamptz not null,
+ object_schema text,
+ object_name text,
+ remote_node_sysid text not null, -- again, really uint64
+ remote_txid xid not null,
+ remote_commit_time timestamptz not null,
+ remote_commit_lsn pg_lsn not null,
+ conflict_type bdr_conflict_type not null,
+ conflict_resolution bdr_conflict_resolution not null,
+ local_tuple json,
+ remote_tuple json,
+ local_tuple_xmin xid,
+ local_tuple_origin_sysid text, -- also really uint64
+
+ -- The following apply only for unhandled apply errors and
+ -- correspond to fields in ErrorData in elog.h .
+ error_message text,
+ error_sqlstate text CHECK (length(error_sqlstate) = 5),
+ error_querystring text,
+ error_cursorpos integer,
+ error_detail text,
+ error_hint text,
+ error_context text,
+ error_columnname text, -- schema and table in object_schema, object_name above
+ error_typename text,
+ error_constraintname text,
+ error_filename text,
+ error_lineno integer,
+ error_funcname text
+);
+
+ALTER SEQUENCE bdr_conflict_history_id_seq OWNED BY bdr_conflict_history.conflict_id;
+
+COMMENT ON TABLE bdr_conflict_history IS 'Log of all conflicts in this BDR group';
+COMMENT ON COLUMN bdr_conflict_history.local_node_sysid IS 'sysid of the local node where the apply conflict occurred';
+COMMENT ON COLUMN bdr_conflict_history.remote_node_sysid IS 'sysid of the remote node the conflicting transaction originated from';
+COMMENT ON COLUMN bdr_conflict_history.object_schema IS 'Schema of the object involved in the conflict';
+COMMENT ON COLUMN bdr_conflict_history.object_name IS 'Name of the object (table, etc) involved in the conflict';
+COMMENT ON COLUMN bdr_conflict_history.local_conflict_xid IS 'Transaction ID of the apply transaction that encountered the conflict';
+COMMENT ON COLUMN bdr_conflict_history.local_conflict_lsn IS 'xlog position at the time the conflict occured on the applying node';
+COMMENT ON COLUMN bdr_conflict_history.local_conflict_time IS 'The time the conflict was detected on the applying node';
+COMMENT ON COLUMN bdr_conflict_history.remote_txid IS 'xid of the remote transaction involved in the conflict';
+COMMENT ON COLUMN bdr_conflict_history.remote_commit_time IS 'The time the remote transaction involved in this conflict committed';
+COMMENT ON COLUMN bdr_conflict_history.remote_commit_lsn IS 'LSN on remote node at which conflicting transaction committed';
+COMMENT ON COLUMN bdr_conflict_history.conflict_type IS 'Nature of the conflict - insert/insert, update/delete, etc';
+COMMENT ON COLUMN bdr_conflict_history.local_tuple IS 'For DML conflicts, the conflicting tuple from the local DB (as json), if logged';
+COMMENT ON COLUMN bdr_conflict_history.local_tuple_xmin IS 'If local_tuple is set, the xmin of the conflicting local tuple';
+COMMENT ON COLUMN bdr_conflict_history.local_tuple_origin_sysid IS 'The node id for the true origin of the local tuple. Differs from local_node_sysid if the tuple was originally replicated from another node.';
+COMMENT ON COLUMN bdr_conflict_history.remote_tuple IS 'For DML conflicts, the conflicting tuple from the remote DB (as json), if logged';
+COMMENT ON COLUMN bdr_conflict_history.conflict_resolution IS 'How the conflict was resolved/handled; see the enum definition';
+COMMENT ON COLUMN bdr_conflict_history.error_message IS 'On apply error, the error message from ereport/elog. Other error fields match.';
+
+SELECT pg_catalog.pg_extension_config_dump('bdr_conflict_history', '');
+REVOKE ALL ON TABLE bdr_conflict_history FROM PUBLIC;
+
+
-- This type is tailored to use as input to get_object_address
CREATE TYPE bdr.dropped_object AS
(objtype text, objnames text[], objargs text[]);
bool exit_worker = false;
/* During apply, holds xid of remote transaction */
-static TransactionId replication_origin_xid = InvalidTransactionId;
+TransactionId replication_origin_xid = InvalidTransactionId;
/*
* This code only runs within an apply bgworker, so we can stash a pointer to our
static void check_apply_update(RepNodeId local_node_id, TimestampTz local_ts,
BDRRelation *rel, HeapTuple local_tuple,
HeapTuple remote_tuple, HeapTuple *new_tuple,
- bool *perform_update, bool *log_update);
+ bool *perform_update, bool *log_update,
+ BdrConflictResolution *resolution);
static void do_log_update(RepNodeId local_node_id, bool apply_update,
TimestampTz ts, Relation idxrel, BDRRelation *rel,
HeapTuple old_key, HeapTuple user_tuple);
bool apply_update;
bool log_update;
CommitExtraData local_node_id_raw;
+ BdrConflictResolution resolution;
/* refetch tuple, check for old commit ts & origin */
xmin = HeapTupleHeaderGetXmin(oldslot->tts_tuple->t_data);
/*
- * We now need to determine whether to keep the original version of
- * the row, or apply the insert (as an update) we received. We use
- * the last-update-wins strategy for this, except when the new update
- * comes from the same node that originated the previous version of
- * the tuple.
+ * Use conflict triggers and/or last-update-wins to decide which tuple
+ * to retain.
*/
TransactionIdGetCommitTsData(xmin, &local_ts, &local_node_id_raw);
local_node_id = local_node_id_raw;
check_apply_update(local_node_id, local_ts, rel,
- NULL, NULL, NULL, &apply_update, &log_update);
+ NULL, NULL, NULL, &apply_update, &log_update,
+ &resolution);
- elog(LOG, "insert vs insert conflict: %s",
- apply_update ? "update" : "ignore");
+ if (log_update)
+ /* TODO: Roll into conflict logging code */
+ elog(DEBUG2, "bdr: insert vs insert conflict: %s",
+ apply_update ? "update" : "ignore");
if (apply_update)
{
/* races will be resolved by abort/retry */
UserTableUpdateOpenIndexes(estate, slot);
}
+
+ if (log_update)
+ {
+ bdr_conflict_log(BdrConflictType_InsertInsert, resolution,
+ replication_origin_xid, rel, oldslot,
+ local_node_id, slot, NULL /*no error*/);
+ }
}
else
{
RepNodeId local_node_id;
bool apply_update;
bool log_update;
+ BdrConflictResolution resolution;
CommitExtraData local_node_id_raw;
xmin = HeapTupleHeaderGetXmin(oldslot->tts_tuple->t_data);
/*
- * We now need to determine whether to keep the original version of
- * the row, or apply the update we received. We use the
- * last-update-wins strategy for this, except when the new update
- * comes from the same node that originated the previous version of
- * the tuple.
+ * Use conflict triggers and/or last-update-wins to decide which tuple
+ * to retain.
*/
TransactionIdGetCommitTsData(xmin, &local_ts, &local_node_id_raw);
local_node_id = local_node_id_raw;
check_apply_update(local_node_id, local_ts, rel, oldslot->tts_tuple,
remote_tuple, &user_tuple, &apply_update,
- &log_update);
+ &log_update, &resolution);
if (log_update)
do_log_update(local_node_id, apply_update, local_ts,
}
else
{
+ /*
+ * Update target is missing. We don't know if this is an update-vs-delete
+ * conflict or if the target tuple came from some 3rd node and hasn't yet
+ * been applied to the local node.
+ */
initStringInfo(&o);
tuple_to_stringinfo(&o, RelationGetDescr(rel->rel),
oldslot->tts_tuple);
}
/*
- * Check whether a remote update conflicts with the local row version.
+ * Check whether a remote insert or update conflicts with the local row
+ * version.
+ *
+ * User-defined conflict triggers get invoked here.
*
* perform_update, log_update is set to true if the update should be performed
* and logged respectively
+ *
+ * resolution is set to indicate how the conflict was resolved if log_update
+ * is true. Its value is undefined if log_update is false.
*/
static void
check_apply_update(RepNodeId local_node_id, TimestampTz local_ts,
BDRRelation *rel, HeapTuple local_tuple, HeapTuple remote_tuple,
- HeapTuple *new_tuple, bool *perform_update, bool *log_update)
+ HeapTuple *new_tuple, bool *perform_update, bool *log_update,
+ BdrConflictResolution *resolution)
{
uint64 local_sysid,
remote_sysid;
else
{
/*
- * Decide what update wins based on transaction timestamp difference.
- * The later transaction wins. If the timestamps compare equal, use
- * sysid + TLI to discern.
+ * Decide whether to keep the remote or local tuple based on a conflict
+ * trigger (if defined) or last-update-wins.
+ *
+ * If the caller doesn't provide storage for the conflict handler to
+ * store a new tuple in, don't fire any conflict triggers.
*/
+ *log_update = true;
if (new_tuple)
{
/*
- * We let users decide how the conflict should be resolved; if no
- * trigger could be found or if the trigger decided not to care,
- * we fall back to „last update wins“
+ * --------------
+ * Conflict trigger conflict handling - let the user decide whether to:
+ * - Ignore the remote update;
+ * - Supply a new tuple to replace the current tuple; or
+ * - Take no action and fall through to the next handling option
+ * --------------
*/
-
TimestampDifference(replication_origin_timestamp, local_ts,
&secs, µsecs);
if (skip)
{
*perform_update = false;
- *log_update = false;
+ *resolution = BdrConflictResolution_ConflictTriggerSkipChange;
return;
}
else if (*new_tuple)
{
*perform_update = true;
- *log_update = true;
+ *resolution = BdrConflictResolution_ConflictTriggerReturnedTuple;
return;
}
-
/*
* if user decided not to skip the conflict but didn't provide a
* resolving tuple we fall back to default handling
*/
}
-
+ /* Last update wins conflict handling */
cmp = timestamptz_cmp_internal(replication_origin_timestamp, local_ts);
-
if (cmp > 0)
{
+ /* The most recent update is the remote one; apply it */
*perform_update = true;
- *log_update = false;
+ *resolution = BdrConflictResolution_LastUpdateWins_KeepRemote;
+ return;
+ }
+ else if (cmp < 0)
+ {
+ /* The most recent update is the local one; retain it */
+ *perform_update = false;
+ *resolution = BdrConflictResolution_LastUpdateWins_KeepLocal;
return;
}
else if (cmp == 0)
{
+ /*
+ * Timestamps are equal. Use sysid + timeline id to decide which
+ * tuple to retain.
+ */
fetch_sysid_via_node_id(local_node_id,
&local_sysid, &local_tli);
fetch_sysid_via_node_id(bdr_apply_worker->origin_id,
&remote_sysid, &remote_tli);
/*
- * Always ignore this update if the user decides to; otherwise
- * apply the user tuple or, if none supplied, fall back to „last
- * update wins“
+ * Apply the user tuple or, if none supplied, fall back to "last
+ * update wins"
*/
if (local_sysid < remote_sysid)
*perform_update = true;
/* shouldn't happen */
elog(ERROR, "unsuccessful node comparison");
- *log_update = true;
- return;
- }
- else
- {
- *perform_update = false;
- *log_update = true;
+ /*
+ * We don't log whether we used timestamp, sysid or timeline id to
+ * decide which tuple to retain. That'll be in the log record
+ * anyway, so we can reconstruct the decision from the log record
+ * later.
+ */
+ if (*perform_update)
+ *resolution = BdrConflictResolution_LastUpdateWins_KeepRemote;
+ else
+ *resolution = BdrConflictResolution_LastUpdateWins_KeepLocal;
return;
}
--- /dev/null
+#include "postgres.h"
+
+#include "bdr.h"
+
+#include "funcapi.h"
+
+#include "access/xact.h"
+
+#include "catalog/index.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_namespace.h"
+#include "catalog/pg_type.h"
+
+#include "commands/sequence.h"
+
+#include "replication/replication_identifier.h"
+
+#include "tcop/tcopprot.h"
+
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/json.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/pg_lsn.h"
+#include "utils/syscache.h"
+
+static Oid BdrConflictTypeOid = InvalidOid;
+static Oid BdrConflictResolutionOid = InvalidOid;
+static Oid BdrConflictHistorySeqId = InvalidOid;
+
+static bool bdr_log_conflicts_to_table = false;
+static bool bdr_conflict_logging_include_tuples = false;
+
+/*
+ * All this code runs only in the context of an apply worker, so
+ * we can access the apply worker state global safely
+ */
+extern BdrApplyWorker *bdr_apply_worker;
+
+#define BDR_CONFLICT_HISTORY_COLS 30
+#define SYSID_DIGITS 33
+
+/*
+ * Details of a conflict detected by an apply process, destined for logging
+ * output and/or conflict triggers.
+ *
+ * Closely related to bdr.bdr_conflict_history SQL table.
+ */
+typedef struct BdrApplyConflict
+{
+ TransactionId local_conflict_txid;
+ XLogRecPtr local_conflict_lsn;
+ TimestampTz local_conflict_time;
+ const char *object_schema; /* unused if apply_error */
+ const char *object_name; /* unused if apply_error */
+ uint64 remote_sysid;
+ TransactionId remote_txid;
+ TimestampTz remote_commit_time;
+ XLogRecPtr remote_commit_lsn;
+ BdrConflictType conflict_type;
+ BdrConflictResolution conflict_resolution;
+ bool local_tuple_null;
+ Datum local_tuple; /* composite */
+ TransactionId local_tuple_xmin;
+ uint64 local_tuple_origin_sysid; /* init to 0 if unknown */
+ bool remote_tuple_null;
+ Datum remote_tuple; /* composite */
+ ErrorData *apply_error;
+} BdrApplyConflict;
+
+/*
+ * Perform syscache lookups etc for BDR conflict logging.
+ *
+ * Must be called during apply worker startup, after schema
+ * maintenance.
+ *
+ * Runs even if !bdr_log_conflicts_to_table as that can be
+ * toggled at runtime.
+ */
+void
+bdr_conflict_logging_startup()
+{
+ Oid schema_oid;
+
+ StartTransactionCommand();
+
+ schema_oid = get_namespace_oid("bdr", false);
+
+ BdrConflictTypeOid = GetSysCacheOid2(TYPENAMENSP,
+ CStringGetDatum("bdr_conflict_type"), ObjectIdGetDatum(schema_oid));
+
+ if (BdrConflictTypeOid == InvalidOid)
+ elog(ERROR, "Type cache lookup failed for bdr_conflict_type in schema %d",
+ schema_oid);
+
+ BdrConflictResolutionOid = GetSysCacheOid2(TYPENAMENSP,
+ CStringGetDatum("bdr_conflict_resolution"),
+ ObjectIdGetDatum(schema_oid));
+
+ if (BdrConflictResolutionOid == InvalidOid)
+ elog(ERROR, "Type cache lookup failed for bdr_conflict_resolution in schema %d",
+ schema_oid);
+
+ BdrConflictHistorySeqId = get_relname_relid("bdr_conflict_history_id_seq",
+ ObjectIdGetDatum(schema_oid));
+
+ if (BdrConflictHistorySeqId == InvalidOid)
+ elog(ERROR, "Relcache lookup failed for bdr_conflict_history_id_seq in schema %d",
+ schema_oid);
+
+ CommitTransactionCommand();
+}
+
+void
+bdr_conflict_logging_create_gucs()
+{
+ DefineCustomBoolVariable("bdr.log_conflicts_to_table",
+ "Log BDR conflicts to bdr.conflict_history table",
+ NULL,
+ &bdr_log_conflicts_to_table,
+ false,
+ PGC_SIGHUP,
+ 0,
+ NULL, NULL, NULL);
+
+ DefineCustomBoolVariable("bdr.conflict_logging_include_tuples",
+ "Log whole tuples when logging BDR conflicts",
+ NULL,
+ &bdr_conflict_logging_include_tuples,
+ true,
+ PGC_SIGHUP,
+ 0,
+ NULL, NULL, NULL);
+}
+
+/* Get the enum oid for a given BdrConflictType */
+static Datum
+bdr_conflict_type_get_datum(BdrConflictType conflict_type)
+{
+ Oid conflict_type_oid;
+ char *enumname = NULL;
+
+ switch(conflict_type)
+ {
+ case BdrConflictType_InsertInsert:
+ enumname = "insert_insert";
+ break;
+ case BdrConflictType_UpdateUpdate:
+ enumname = "update_update";
+ break;
+ case BdrConflictType_UpdateDelete:
+ enumname = "update_delete";
+ break;
+ case BdrConflictType_UnhandledTxAbort:
+ enumname = "unhandled_tx_abort";
+ break;
+ }
+ Assert(enumname != NULL);
+ conflict_type_oid = GetSysCacheOid2(ENUMTYPOIDNAME,
+ BdrConflictTypeOid, CStringGetDatum(enumname));
+ if (conflict_type_oid == InvalidOid)
+ elog(ERROR, "syscache lookup for enum %s of type "
+ "bdr.bdr_conflict_type failed", enumname);
+ return conflict_type_oid;
+}
+
+/* Get the enum oid for a given BdrConflictResolution */
+static Datum
+bdr_conflict_resolution_get_datum(BdrConflictResolution conflict_resolution)
+{
+ Oid conflict_resolution_oid;
+ char *enumname = NULL;
+
+ switch (conflict_resolution)
+ {
+ case BdrConflictResolution_ConflictTriggerSkipChange:
+ enumname = "conflict_trigger_skip_change";
+ break;
+ case BdrConflictResolution_ConflictTriggerReturnedTuple:
+ enumname = "conflict_trigger_returned_tuple";
+ break;
+ case BdrConflictResolution_LastUpdateWins_KeepLocal:
+ enumname = "last_update_wins_keep_local";
+ break;
+ case BdrConflictResolution_LastUpdateWins_KeepRemote:
+ enumname = "last_update_wins_keep_remote";
+ break;
+ case BdrConflictResolution_UnhandledTxAbort:
+ enumname = "unhandled_tx_abort";
+ break;
+ }
+ Assert(enumname != NULL);
+ conflict_resolution_oid = GetSysCacheOid2(ENUMTYPOIDNAME,
+ BdrConflictResolutionOid, CStringGetDatum(enumname));
+ if (conflict_resolution_oid == InvalidOid)
+ elog(ERROR, "syscache lookup for enum %s of type "
+ "bdr.bdr_conflict_resolution failed", enumname);
+ return conflict_resolution_oid;
+}
+
+/*
+ * Convert the target row to json form if it isn't null.
+ */
+static Datum
+bdr_conflict_row_to_json(Datum row, bool row_isnull, bool *ret_isnull)
+{
+ Datum row_json;
+ if (row_isnull)
+ {
+ row_json = (Datum) 0;
+ *ret_isnull = 1;
+ }
+ else
+ {
+ /*
+ * We don't handle errors with a PG_TRY / PG_CATCH here, because that's
+ * not sufficient to make the transaction usable given that we might
+ * fail in user defined casts, etc. We'd need a full savepoint, which
+ * is too expensive. So if this fails we'll just propagate the exception
+ * and abort the apply transaction.
+ *
+ * It shouldn't fail unless something's pretty broken anyway.
+ */
+ row_json = DirectFunctionCall1(row_to_json, row);
+ *ret_isnull = 0;
+ }
+ return row_json;
+}
+
+static void
+bdr_conflict_strtodatum(bool *nulls, Datum *values, int idx,
+ const char *in_str)
+{
+ if (in_str == NULL)
+ {
+ nulls[idx] = true;
+ values[idx] = (Datum) 0;
+ }
+ else
+ {
+ nulls[idx] = false;
+ values[idx] = CStringGetTextDatum(in_str);
+ }
+}
+
+/*
+ * Log a BDR apply conflict to the bdr.bdr_conflict_history table.
+ *
+ * The change will then be replicated to other nodes.
+ */
+static void
+bdr_conflict_log_table(BdrApplyConflict *conflict)
+{
+ Datum values[BDR_CONFLICT_HISTORY_COLS];
+ bool nulls[BDR_CONFLICT_HISTORY_COLS];
+ int attno;
+ int object_schema_attno, object_name_attno;
+ char sqlstate[12];
+ Relation log_rel;
+ HeapTuple log_tup;
+ TupleTableSlot *log_slot;
+ EState *log_estate;
+ char local_sysid[SYSID_DIGITS];
+ char remote_sysid[SYSID_DIGITS];
+ char origin_sysid[SYSID_DIGITS];
+
+ if (!bdr_log_conflicts_to_table)
+ /* No logging enabled and we don't own any memory, just bail */
+ return;
+
+ /* Pg has no uint64 SQL type so we have to store all them as text */
+ snprintf(local_sysid, sizeof(local_sysid), UINT64_FORMAT,
+ GetSystemIdentifier());
+
+ snprintf(remote_sysid, sizeof(remote_sysid), UINT64_FORMAT,
+ conflict->remote_sysid);
+
+ if (conflict->local_tuple_origin_sysid != 0)
+ snprintf(origin_sysid, sizeof(origin_sysid), UINT64_FORMAT,
+ conflict->local_tuple_origin_sysid);
+ else
+ origin_sysid[0] = '\0';
+
+ memset(nulls, 0, sizeof(bool) * BDR_CONFLICT_HISTORY_COLS);
+ memset(values, 0, sizeof(Datum) * BDR_CONFLICT_HISTORY_COLS);
+
+ /* Begin forming the tuple. See the extension SQL file for field info. */
+ attno = 0;
+ values[attno++] = DirectFunctionCall1(nextval_oid,
+ BdrConflictHistorySeqId);
+ values[attno++] = CStringGetTextDatum(local_sysid);
+ values[attno++] = TransactionIdGetDatum(conflict->local_conflict_txid);
+ values[attno++] = LSNGetDatum(conflict->local_conflict_lsn);
+ values[attno++] = TimestampTzGetDatum(conflict->local_conflict_time);
+
+ object_schema_attno = attno;
+ bdr_conflict_strtodatum(nulls, values, attno++, conflict->object_schema);
+
+ object_name_attno = attno;
+ bdr_conflict_strtodatum(nulls, values, attno++, conflict->object_name);
+
+ values[attno++] = CStringGetTextDatum(remote_sysid);
+ if (conflict->remote_txid != InvalidTransactionId)
+ values[attno] = TransactionIdGetDatum(conflict->remote_txid);
+ else
+ nulls[attno] = 1;
+ attno++;
+
+ values[attno++] = TimestampTzGetDatum(conflict->remote_commit_time);
+ values[attno++] = LSNGetDatum(conflict->remote_commit_lsn);
+ values[attno++] = bdr_conflict_type_get_datum(conflict->conflict_type);
+
+ values[attno++] =
+ bdr_conflict_resolution_get_datum(conflict->conflict_resolution);
+
+ values[attno] = bdr_conflict_row_to_json(conflict->local_tuple,
+ conflict->local_tuple_null, &nulls[attno]);
+ attno++;
+
+ values[attno] = bdr_conflict_row_to_json(conflict->remote_tuple,
+ conflict->remote_tuple_null, &nulls[attno]);
+ attno++;
+
+ if (conflict->local_tuple_xmin != InvalidTransactionId)
+ values[attno] = TransactionIdGetDatum(conflict->local_tuple_xmin);
+ else
+ nulls[attno] = 1;
+ attno++;
+
+ if (conflict->local_tuple_origin_sysid != 0)
+ values[attno] = CStringGetTextDatum(origin_sysid);
+ else
+ nulls[attno] = 1;
+ attno++;
+
+ if (conflict->apply_error == NULL)
+ {
+ /* all the 13 remaining cols are error_ cols and are all null */
+ memset(&nulls[attno], 1, sizeof(bool) * 13);
+ attno += 13;
+ }
+ else
+ {
+ /*
+ * There's error data to log. We don't attempt to log it selectively,
+ * as bdr apply errors are not supposed to be routine anyway.
+ */
+ ErrorData *edata = conflict->apply_error;
+
+ bdr_conflict_strtodatum(nulls, values, attno++, edata->message);
+
+ /*
+ * Always log the SQLSTATE. If it's ERRCODE_INTERNAL_ERROR - like after
+ * an elog(...) - we'll just be writing XX0000, but that's still better
+ * than nothing.
+ */
+ strncpy(sqlstate, unpack_sql_state(edata->sqlerrcode), 12);
+ sqlstate[sizeof(sqlstate)-1] = '\0';
+ values[attno] = CStringGetTextDatum(sqlstate);
+
+ /*
+ * We'd like to log the statement running at the time of the ERROR (for
+ * DDL apply errors) but have no reliable way to acquire it yet. So for
+ * now...
+ */
+ nulls[attno] = 1;
+ attno++;
+
+ if (edata->cursorpos != 0)
+ values[attno] = Int32GetDatum(edata->cursorpos);
+ else
+ nulls[attno] = 1;
+ attno++;
+
+ bdr_conflict_strtodatum(nulls, values, attno++, edata->detail);
+ bdr_conflict_strtodatum(nulls, values, attno++, edata->hint);
+ bdr_conflict_strtodatum(nulls, values, attno++, edata->context);
+ bdr_conflict_strtodatum(nulls, values, attno++, edata->column_name);
+ bdr_conflict_strtodatum(nulls, values, attno++, edata->datatype_name);
+ bdr_conflict_strtodatum(nulls, values, attno++, edata->constraint_name);
+ bdr_conflict_strtodatum(nulls, values, attno++, edata->filename);
+ values[attno++] = Int32GetDatum(edata->lineno);
+ bdr_conflict_strtodatum(nulls, values, attno++, edata->funcname);
+
+ /* Set schema and table name based on the error, not arg values */
+ bdr_conflict_strtodatum(nulls, values, object_schema_attno,
+ edata->schema_name);
+ bdr_conflict_strtodatum(nulls, values, object_name_attno,
+ edata->table_name);
+
+ /* note: do NOT free the errordata, it's the caller's responsibility */
+ }
+
+ /* Make sure assignments match allocated tuple size */
+ Assert(attno == BDR_CONFLICT_HISTORY_COLS);
+
+ /*
+ * Construct a bdr.bdr_conflict_history tuple from the conflict info we've
+ * been passed and insert it into bdr.bdr_conflict_history.
+ */
+ log_rel = heap_open(BdrConflictHistoryRelId, RowExclusiveLock);
+
+ /* Prepare executor state for index updates */
+ log_estate = bdr_create_rel_estate(log_rel);
+ log_slot = ExecInitExtraTupleSlot(log_estate);
+ ExecSetSlotDescriptor(log_slot, RelationGetDescr(log_rel));
+ /* Construct the tuple and insert it */
+ log_tup = heap_form_tuple(RelationGetDescr(log_rel), values, nulls);
+ ExecStoreTuple(log_tup, log_slot, InvalidBuffer, true);
+ simple_heap_insert(log_rel, log_slot->tts_tuple);
+ /* Then do any index maintanence required */
+ UserTableUpdateIndexes(log_estate, log_slot);
+ /* and finish up */
+ heap_close(log_rel, RowExclusiveLock);
+ FreeExecutorState(log_estate);
+}
+
+/*
+ * Log a BDR apply conflict to the bdr.bdr_conflict_history table and/or
+ * system log.
+ *
+ * If a transaction is in progress, the current transaction is used, in which
+ * case the write is lost if that transaction subsequently aborts. If no
+ * transaction is already open the caller must open one and commit it after
+ * calling.
+ *
+ * Any open aborted transaction must be rolled back before calling.
+ *
+ * If apply_error is passed then this call is because of an unhandled error. In
+ * this case the passed object_schema and object_name are ignored in favour of
+ * those in the ErrorData struct and all the error_ fields are populated from
+ * the ErrorData struct.
+ *
+ * Any palloc'd or copied values passed must be freed by the caller after
+ * bdr_conflict_log returns. It won't retain references to any values and won't
+ * free any of the values its self.
+ *
+ * The origin id - i.e. the id of the node the local tuple was first created
+ * on, in case the local tuple was originally replicated from another node -
+ * may be obtained with TransactionIdGetCommitTsData on the xmin of the
+ * TupleTableSlot for the tuple. It isn't done here because the caller
+ * frequently already has the node id to hand.
+ */
+void
+bdr_conflict_log(BdrConflictType conflict_type,
+ BdrConflictResolution resolution, TransactionId remote_txid,
+ BDRRelation *conflict_relation, TupleTableSlot *local_tuple,
+ RepNodeId local_tuple_origin_id, TupleTableSlot *remote_tuple,
+ ErrorData *apply_error)
+{
+ MemoryContext log_context, old_context;
+ BdrApplyConflict conflict;
+ TimeLineID tli;
+
+ if (IsAbortedTransactionBlockState())
+ elog(ERROR, "bdr: attempt to log conflict in aborted transaction");
+
+ if (!IsTransactionState())
+ elog(ERROR, "bdr: attempt to log conflict without surrounding transaction");
+
+ /* We want our own memory ctx to clean up easily & reliably */
+ log_context = AllocSetContextCreate(CurrentMemoryContext,
+ "bdr_log_conflict_ctx", ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE);
+ old_context = MemoryContextSwitchTo(log_context);
+
+ /* Populate the conflict record we're going to log */
+ conflict.conflict_type = conflict_type;
+ conflict.conflict_resolution = resolution;
+
+ conflict.local_conflict_txid = GetTopTransactionIdIfAny();
+ conflict.local_conflict_lsn = GetXLogInsertRecPtr();
+ conflict.local_conflict_time = GetCurrentTimestamp();
+ conflict.remote_txid = remote_txid;
+
+ /* set using bdr_conflict_setrel */
+ if (conflict_relation == NULL)
+ {
+ conflict.object_schema = NULL;
+ conflict.object_name = NULL;
+ }
+ else
+ {
+ conflict.object_name = RelationGetRelationName(conflict_relation->rel);
+ conflict.object_schema =
+ get_namespace_name(RelationGetNamespace(conflict_relation->rel));
+ }
+
+ /* TODO: May make sense to cache the remote sysid in a global too... */
+ fetch_sysid_via_node_id(bdr_apply_worker->origin_id,
+ &conflict.remote_sysid, &tli);
+ conflict.remote_commit_time = replication_origin_timestamp;
+ conflict.remote_txid = remote_txid;
+ conflict.remote_commit_lsn = replication_origin_lsn;
+
+ if (local_tuple != NULL)
+ {
+ /* Log local tuple xmin even if actual tuple value logging is off */
+ conflict.local_tuple_xmin =
+ HeapTupleHeaderGetXmin(local_tuple->tts_tuple->t_data);
+ Assert(conflict.local_tuple_xmin >= FirstNormalTransactionId ||
+ conflict.local_tuple_xmin == FrozenTransactionId);
+ if (bdr_conflict_logging_include_tuples)
+ {
+ conflict.local_tuple = ExecFetchSlotTupleDatum(local_tuple);
+ conflict.local_tuple_null = false;
+ }
+ }
+ else
+ {
+ conflict.local_tuple_null = true;
+ conflict.local_tuple = (Datum) 0;
+ conflict.local_tuple_xmin = InvalidTransactionId;
+ }
+
+ if (local_tuple_origin_id != InvalidRepNodeId)
+ {
+ fetch_sysid_via_node_id(local_tuple_origin_id,
+ &conflict.local_tuple_origin_sysid, &tli);
+ }
+ else
+ {
+ conflict.local_tuple_origin_sysid = 0;
+ }
+
+ if (remote_tuple != NULL && bdr_conflict_logging_include_tuples)
+ {
+ conflict.remote_tuple = ExecFetchSlotTupleDatum(remote_tuple);
+ conflict.remote_tuple_null = false;
+ }
+ else
+ {
+ conflict.remote_tuple_null = true;
+ conflict.remote_tuple = (Datum) 0;
+ }
+
+ conflict.apply_error = apply_error;
+
+ bdr_conflict_log_table(&conflict);
+
+ MemoryContextSwitchTo(old_context);
+ MemoryContextDelete(log_context);
+}