From: Craig Ringer
Date: Mon, 5 May 2014 06:54:27 +0000 (+0800)
Subject: bdr: Log conflicts to a bdr.bdr_conflict_history table
X-Git-Url: http://waps.l3s.uni-hannover.de/gitweb/?a=commitdiff_plain;h=ec926b9788f36fa78214989ef2cb3ea2facab5af;p=users%2Fandresfreund%2Fpostgres.git

bdr: Log conflicts to a bdr.bdr_conflict_history table

Creates a bdr.bdr_conflict_history table and logs conflicts to it when they're
detected, with details on both sides of the error.

Currently only logs insert/insert conflicts. The infrastructure for reporting
ERROR conflicts is present but not yet used.
---

diff --git a/contrib/bdr/bdr--0.5.sql b/contrib/bdr/bdr--0.5.sql
index 45e7adfc13..5b43052e04 100644
--- a/contrib/bdr/bdr--0.5.sql
+++ b/contrib/bdr/bdr--0.5.sql
@@ -321,6 +321,99 @@ COMMENT ON COLUMN bdr_nodes.node_status IS 'Readiness of the node: [i]nitializin
 
 SELECT pg_catalog.pg_extension_config_dump('bdr_nodes', '');
 
+CREATE TYPE bdr_conflict_type AS ENUM
+(
+    'insert_insert',
+    'update_update',
+    'update_delete',
+    'unhandled_tx_abort'
+);
+
+COMMENT ON TYPE bdr_conflict_type IS 'The nature of a BDR apply conflict - concurrent updates (update_update), conflicting inserts, etc.';
+
+CREATE TYPE bdr_conflict_resolution AS ENUM
+(
+    'conflict_trigger_skip_change',
+    'conflict_trigger_returned_tuple',
+    'last_update_wins_keep_local',
+    'last_update_wins_keep_remote',
+    'unhandled_tx_abort'
+);
+
+COMMENT ON TYPE bdr_conflict_resolution IS 'Resolution of a bdr conflict - if a conflict was resolved by a conflict trigger, by last-update-wins tests on commit timestamps, etc.';
+
+--
+-- bdr_conflict_history records apply conflicts so they can be queried and
+-- analysed by administrators.
+--
+-- Column order must remain in sync with bdr_conflict_log_table(...) and
+-- struct BdrApplyConflict in bdr_conflict_logging.c
+--
+CREATE SEQUENCE bdr_conflict_history_id_seq;
+
+CREATE TABLE bdr_conflict_history (
+    conflict_id bigint not null default nextval('bdr_conflict_history_id_seq'),
+    local_node_sysid text not null, -- really uint64 but we don't have the type for it
+    PRIMARY KEY (local_node_sysid, conflict_id),
+
+    local_conflict_xid xid not null, -- xid of conflicting apply tx
+    local_conflict_lsn pg_lsn not null, -- lsn of local node at the time the conflict was detected
+    local_conflict_time timestamptz not null,
+    object_schema text,
+    object_name text,
+    remote_node_sysid text not null, -- again, really uint64
+    remote_txid xid not null,
+    remote_commit_time timestamptz not null,
+    remote_commit_lsn pg_lsn not null,
+    conflict_type bdr_conflict_type not null,
+    conflict_resolution bdr_conflict_resolution not null,
+    local_tuple json,
+    remote_tuple json,
+    local_tuple_xmin xid,
+    local_tuple_origin_sysid text, -- also really uint64
+
+    -- The following apply only for unhandled apply errors and
+    -- correspond to fields in ErrorData in elog.h .
+    error_message text,
+    error_sqlstate text CHECK (length(error_sqlstate) = 5),
+    error_querystring text,
+    error_cursorpos integer,
+    error_detail text,
+    error_hint text,
+    error_context text,
+    error_columnname text, -- schema and table in object_schema, object_name above
+    error_typename text,
+    error_constraintname text,
+    error_filename text,
+    error_lineno integer,
+    error_funcname text
+);
+
+ALTER SEQUENCE bdr_conflict_history_id_seq OWNED BY bdr_conflict_history.conflict_id;
+
+COMMENT ON TABLE bdr_conflict_history IS 'Log of all conflicts in this BDR group';
+COMMENT ON COLUMN bdr_conflict_history.local_node_sysid IS 'sysid of the local node where the apply conflict occurred';
+COMMENT ON COLUMN bdr_conflict_history.remote_node_sysid IS 'sysid of the remote node the conflicting transaction originated from';
+COMMENT ON COLUMN bdr_conflict_history.object_schema IS 'Schema of the object involved in the conflict';
+COMMENT ON COLUMN bdr_conflict_history.object_name IS 'Name of the object (table, etc) involved in the conflict';
+COMMENT ON COLUMN bdr_conflict_history.local_conflict_xid IS 'Transaction ID of the apply transaction that encountered the conflict';
+COMMENT ON COLUMN bdr_conflict_history.local_conflict_lsn IS 'xlog position at the time the conflict occurred on the applying node';
+COMMENT ON COLUMN bdr_conflict_history.local_conflict_time IS 'The time the conflict was detected on the applying node';
+COMMENT ON COLUMN bdr_conflict_history.remote_txid IS 'xid of the remote transaction involved in the conflict';
+COMMENT ON COLUMN bdr_conflict_history.remote_commit_time IS 'The time the remote transaction involved in this conflict committed';
+COMMENT ON COLUMN bdr_conflict_history.remote_commit_lsn IS 'LSN on remote node at which conflicting transaction committed';
+COMMENT ON COLUMN bdr_conflict_history.conflict_type IS 'Nature of the conflict - insert/insert, update/delete, etc';
+COMMENT ON COLUMN bdr_conflict_history.local_tuple IS 'For DML conflicts, the conflicting tuple from the local DB (as json), if logged';
+COMMENT ON COLUMN bdr_conflict_history.local_tuple_xmin IS 'If local_tuple is set, the xmin of the conflicting local tuple';
+COMMENT ON COLUMN bdr_conflict_history.local_tuple_origin_sysid IS 'The node id for the true origin of the local tuple. Differs from local_node_sysid if the tuple was originally replicated from another node.';
+COMMENT ON COLUMN bdr_conflict_history.remote_tuple IS 'For DML conflicts, the conflicting tuple from the remote DB (as json), if logged';
+COMMENT ON COLUMN bdr_conflict_history.conflict_resolution IS 'How the conflict was resolved/handled; see the enum definition';
+COMMENT ON COLUMN bdr_conflict_history.error_message IS 'On apply error, the error message from ereport/elog. Other error fields match.';
+
+SELECT pg_catalog.pg_extension_config_dump('bdr_conflict_history', '');
+REVOKE ALL ON TABLE bdr_conflict_history FROM PUBLIC;
+
+
 -- This type is tailored to use as input to get_object_address
 CREATE TYPE bdr.dropped_object AS (objtype text, objnames text[], objargs text[]);
 
diff --git a/contrib/bdr/bdr.c b/contrib/bdr/bdr.c
index c4e24f1d3a..349444d265 100644
--- a/contrib/bdr/bdr.c
+++ b/contrib/bdr/bdr.c
@@ -48,6 +48,7 @@
 #include "storage/shmem.h"
 
 #include "utils/builtins.h"
+#include "utils/elog.h"
 #include "utils/guc.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -64,6 +65,7 @@ static int n_configured_bdr_nodes = 0;
 ResourceOwner bdr_saved_resowner;
 static bool bdr_is_restart = false;
 Oid BdrNodesRelid;
+Oid BdrConflictHistoryRelId;
 BdrConnectionConfig **bdr_connection_configs;
 
 /* GUC storage */
@@ -584,6 +586,8 @@ bdr_apply_main(Datum main_arg)
 										   ALLOCSET_DEFAULT_INITSIZE,
 										   ALLOCSET_DEFAULT_MAXSIZE);
 
+	bdr_conflict_logging_startup();
+
 	while (!exit_worker)
 	{
 		/* int ret; */
@@ -1387,6 +1391,8 @@ _PG_init(void)
 							 0,
 							 NULL, NULL, NULL);
 
+	bdr_conflict_logging_create_gucs();
+
 	/* if nothing is configured, we're done */
 	if (connections == NULL)
 	{
@@ -1630,6 +1636,8 @@ bdr_maintain_schema(void) BdrNodesRelid = bdr_lookup_relid("bdr_nodes", schema_oid); + BdrConflictHistoryRelId = bdr_lookup_relid("bdr_conflict_history", schema_oid); + QueuedDropsRelid = bdr_lookup_relid("bdr_queued_drops", schema_oid); } else diff --git a/contrib/bdr/bdr.h b/contrib/bdr/bdr.h index dd29f9201b..776e0ebae0 100644 --- a/contrib/bdr/bdr.h +++ b/contrib/bdr/bdr.h @@ -34,7 +34,6 @@ struct EState; /* from nodes/execnodes.h */ struct ScanKeyData; /* from access/skey.h for ScanKey */ enum LockTupleMode; /* from access/heapam.h */ - /* * Flags to indicate which fields are present in a commit record sent by the * output plugin. @@ -52,6 +51,57 @@ typedef enum BDRConflictHandlerType BDRInsertUpdateConflictHandler } BDRConflictHandlerType; +/* + * BDR conflict detection: type of conflict that was identified. + * + * Must correspond to bdr.bdr_conflict_type SQL enum and + * bdr_conflict_type_get_datum (...) + */ +typedef enum BdrConflictType +{ + BdrConflictType_InsertInsert, + BdrConflictType_UpdateUpdate, + BdrConflictType_UpdateDelete, + BdrConflictType_UnhandledTxAbort +} BdrConflictType; + +/* + * BDR conflict detection: how the conflict was resolved (if it was). + * + * Must correspond to bdr.bdr_conflict_resolution SQL enum and + * bdr_conflict_resolution_get_datum(...) + */ +typedef enum BdrConflictResolution +{ + BdrConflictResolution_ConflictTriggerSkipChange, + BdrConflictResolution_ConflictTriggerReturnedTuple, + BdrConflictResolution_LastUpdateWins_KeepLocal, + BdrConflictResolution_LastUpdateWins_KeepRemote, + BdrConflictResolution_UnhandledTxAbort +} BdrConflictResolution; + +typedef struct BDRConflictHandler +{ + Oid handler_oid; + BDRConflictHandlerType handler_type; + uint64 timeframe; +} BDRConflictHandler; + +/* + * This structure is for caching relation specific information, such as + * conflict handlers. 
+ */ +typedef struct BDRRelation +{ + /* hash key */ + Oid reloid; + + Relation rel; + + BDRConflictHandler *conflict_handlers; + size_t conflict_handlers_len; +} BDRRelation; + /* * BdrApplyWorker describes a BDR worker connection. * @@ -97,7 +147,6 @@ typedef struct BdrPerdbWorker } BdrPerdbWorker; - /* * Type of BDR worker in a BdrWorker struct */ @@ -149,29 +198,6 @@ typedef struct BdrConnectionConfig bool is_valid; } BdrConnectionConfig; -typedef struct BDRConflictHandler -{ - Oid handler_oid; - BDRConflictHandlerType handler_type; - uint64 timeframe; -} BDRConflictHandler; - -/* - * This structure is for caching relation specific information, such as - * conflict handlers. - */ -typedef struct BDRRelation -{ - /* hash key */ - Oid reloid; - - Relation rel; - - BDRConflictHandler *conflict_handlers; - size_t conflict_handlers_len; -} BDRRelation; - - /* * Params for every connection in bdr.connections. * @@ -206,6 +232,7 @@ extern ResourceOwner bdr_saved_resowner; /* bdr_nodes table oid */ extern Oid BdrNodesRelid; +extern Oid BdrConflictHistoryRelId; /* DDL replication support */ extern Oid QueuedDDLCommandsRelid; @@ -237,6 +264,19 @@ extern bool find_pkey_tuple(struct ScanKeyData *skey, BDRRelation *rel, Relation idxrel, struct TupleTableSlot *slot, bool lock, enum LockTupleMode mode); +/* conflict logging (usable in apply only) */ +extern void bdr_conflict_logging_startup(void); +extern void bdr_conflict_logging_create_gucs(void); + +extern void +bdr_conflict_log(BdrConflictType conflict_type, + BdrConflictResolution resolution, TransactionId remote_txid, + BDRRelation *conflict_relation, + struct TupleTableSlot *local_tuple, + RepNodeId local_tuple_origin_id, + struct TupleTableSlot *remote_tuple, + struct ErrorData *apply_error); + /* sequence support */ extern void bdr_sequencer_shmem_init(int nnodes, int sequencers); extern void bdr_sequencer_init(int seq_slot); diff --git a/contrib/bdr/bdr_apply.c b/contrib/bdr/bdr_apply.c index 5866a58c93..01aa295a4d 
100644 --- a/contrib/bdr/bdr_apply.c +++ b/contrib/bdr/bdr_apply.c @@ -77,7 +77,7 @@ Oid QueuedDropsRelid = InvalidOid; bool exit_worker = false; /* During apply, holds xid of remote transaction */ -static TransactionId replication_origin_xid = InvalidTransactionId; +TransactionId replication_origin_xid = InvalidTransactionId; /* * This code only runs within an apply bgworker, so we can stash a pointer to our @@ -103,7 +103,8 @@ static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple static void check_apply_update(RepNodeId local_node_id, TimestampTz local_ts, BDRRelation *rel, HeapTuple local_tuple, HeapTuple remote_tuple, HeapTuple *new_tuple, - bool *perform_update, bool *log_update); + bool *perform_update, bool *log_update, + BdrConflictResolution *resolution); static void do_log_update(RepNodeId local_node_id, bool apply_update, TimestampTz ts, Relation idxrel, BDRRelation *rel, HeapTuple old_key, HeapTuple user_tuple); @@ -400,25 +401,26 @@ process_remote_insert(StringInfo s) bool apply_update; bool log_update; CommitExtraData local_node_id_raw; + BdrConflictResolution resolution; /* refetch tuple, check for old commit ts & origin */ xmin = HeapTupleHeaderGetXmin(oldslot->tts_tuple->t_data); /* - * We now need to determine whether to keep the original version of - * the row, or apply the insert (as an update) we received. We use - * the last-update-wins strategy for this, except when the new update - * comes from the same node that originated the previous version of - * the tuple. + * Use conflict triggers and/or last-update-wins to decide which tuple + * to retain. */ TransactionIdGetCommitTsData(xmin, &local_ts, &local_node_id_raw); local_node_id = local_node_id_raw; check_apply_update(local_node_id, local_ts, rel, - NULL, NULL, NULL, &apply_update, &log_update); + NULL, NULL, NULL, &apply_update, &log_update, + &resolution); - elog(LOG, "insert vs insert conflict: %s", - apply_update ? 
"update" : "ignore"); + if (log_update) + /* TODO: Roll into conflict logging code */ + elog(DEBUG2, "bdr: insert vs insert conflict: %s", + apply_update ? "update" : "ignore"); if (apply_update) { @@ -428,6 +430,13 @@ process_remote_insert(StringInfo s) /* races will be resolved by abort/retry */ UserTableUpdateOpenIndexes(estate, slot); } + + if (log_update) + { + bdr_conflict_log(BdrConflictType_InsertInsert, resolution, + replication_origin_xid, rel, oldslot, + local_node_id, slot, NULL /*no error*/); + } } else { @@ -590,6 +599,7 @@ process_remote_update(StringInfo s) RepNodeId local_node_id; bool apply_update; bool log_update; + BdrConflictResolution resolution; CommitExtraData local_node_id_raw; @@ -614,18 +624,15 @@ process_remote_update(StringInfo s) xmin = HeapTupleHeaderGetXmin(oldslot->tts_tuple->t_data); /* - * We now need to determine whether to keep the original version of - * the row, or apply the update we received. We use the - * last-update-wins strategy for this, except when the new update - * comes from the same node that originated the previous version of - * the tuple. + * Use conflict triggers and/or last-update-wins to decide which tuple + * to retain. */ TransactionIdGetCommitTsData(xmin, &local_ts, &local_node_id_raw); local_node_id = local_node_id_raw; check_apply_update(local_node_id, local_ts, rel, oldslot->tts_tuple, remote_tuple, &user_tuple, &apply_update, - &log_update); + &log_update, &resolution); if (log_update) do_log_update(local_node_id, apply_update, local_ts, @@ -653,6 +660,11 @@ process_remote_update(StringInfo s) } else { + /* + * Update target is missing. We don't know if this is an update-vs-delete + * conflict or if the target tuple came from some 3rd node and hasn't yet + * been applied to the local node. 
+ */ initStringInfo(&o); tuple_to_stringinfo(&o, RelationGetDescr(rel->rel), oldslot->tts_tuple); @@ -782,15 +794,22 @@ process_remote_delete(StringInfo s) } /* - * Check whether a remote update conflicts with the local row version. + * Check whether a remote insert or update conflicts with the local row + * version. + * + * User-defined conflict triggers get invoked here. * * perform_update, log_update is set to true if the update should be performed * and logged respectively + * + * resolution is set to indicate how the conflict was resolved if log_update + * is true. Its value is undefined if log_update is false. */ static void check_apply_update(RepNodeId local_node_id, TimestampTz local_ts, BDRRelation *rel, HeapTuple local_tuple, HeapTuple remote_tuple, - HeapTuple *new_tuple, bool *perform_update, bool *log_update) + HeapTuple *new_tuple, bool *perform_update, bool *log_update, + BdrConflictResolution *resolution) { uint64 local_sysid, remote_sysid; @@ -823,19 +842,24 @@ check_apply_update(RepNodeId local_node_id, TimestampTz local_ts, else { /* - * Decide what update wins based on transaction timestamp difference. - * The later transaction wins. If the timestamps compare equal, use - * sysid + TLI to discern. + * Decide whether to keep the remote or local tuple based on a conflict + * trigger (if defined) or last-update-wins. + * + * If the caller doesn't provide storage for the conflict handler to + * store a new tuple in, don't fire any conflict triggers. 
*/ + *log_update = true; if (new_tuple) { /* - * We let users decide how the conflict should be resolved; if no - * trigger could be found or if the trigger decided not to care, - * we fall back to „last update wins“ + * -------------- + * Conflict trigger conflict handling - let the user decide whether to: + * - Ignore the remote update; + * - Supply a new tuple to replace the current tuple; or + * - Take no action and fall through to the next handling option + * -------------- */ - TimestampDifference(replication_origin_timestamp, local_ts, &secs, &microsecs); @@ -848,43 +872,52 @@ check_apply_update(RepNodeId local_node_id, TimestampTz local_ts, if (skip) { *perform_update = false; - *log_update = false; + *resolution = BdrConflictResolution_ConflictTriggerSkipChange; return; } else if (*new_tuple) { *perform_update = true; - *log_update = true; + *resolution = BdrConflictResolution_ConflictTriggerReturnedTuple; return; } - /* * if user decided not to skip the conflict but didn't provide a * resolving tuple we fall back to default handling */ } - + /* Last update wins conflict handling */ cmp = timestamptz_cmp_internal(replication_origin_timestamp, local_ts); - if (cmp > 0) { + /* The most recent update is the remote one; apply it */ *perform_update = true; - *log_update = false; + *resolution = BdrConflictResolution_LastUpdateWins_KeepRemote; + return; + } + else if (cmp < 0) + { + /* The most recent update is the local one; retain it */ + *perform_update = false; + *resolution = BdrConflictResolution_LastUpdateWins_KeepLocal; return; } else if (cmp == 0) { + /* + * Timestamps are equal. Use sysid + timeline id to decide which + * tuple to retain. 
+ */ fetch_sysid_via_node_id(local_node_id, &local_sysid, &local_tli); fetch_sysid_via_node_id(bdr_apply_worker->origin_id, &remote_sysid, &remote_tli); /* - * Always ignore this update if the user decides to; otherwise - * apply the user tuple or, if none supplied, fall back to „last - * update wins“ + * Apply the user tuple or, if none supplied, fall back to "last + * update wins" */ if (local_sysid < remote_sysid) *perform_update = true; @@ -898,13 +931,16 @@ check_apply_update(RepNodeId local_node_id, TimestampTz local_ts, /* shouldn't happen */ elog(ERROR, "unsuccessful node comparison"); - *log_update = true; - return; - } - else - { - *perform_update = false; - *log_update = true; + /* + * We don't log whether we used timestamp, sysid or timeline id to + * decide which tuple to retain. That'll be in the log record + * anyway, so we can reconstruct the decision from the log record + * later. + */ + if (*perform_update) + *resolution = BdrConflictResolution_LastUpdateWins_KeepRemote; + else + *resolution = BdrConflictResolution_LastUpdateWins_KeepLocal; return; } diff --git a/contrib/bdr/bdr_conflict_logging.c b/contrib/bdr/bdr_conflict_logging.c new file mode 100644 index 0000000000..4bf3b59d5c --- /dev/null +++ b/contrib/bdr/bdr_conflict_logging.c @@ -0,0 +1,544 @@ +#include "postgres.h" + +#include "bdr.h" + +#include "funcapi.h" + +#include "access/xact.h" + +#include "catalog/index.h" +#include "catalog/namespace.h" +#include "catalog/pg_namespace.h" +#include "catalog/pg_type.h" + +#include "commands/sequence.h" + +#include "replication/replication_identifier.h" + +#include "tcop/tcopprot.h" + +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/json.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/pg_lsn.h" +#include "utils/syscache.h" + +static Oid BdrConflictTypeOid = InvalidOid; +static Oid BdrConflictResolutionOid = InvalidOid; +static Oid BdrConflictHistorySeqId = InvalidOid; + +static bool 
bdr_log_conflicts_to_table = false;
+static bool bdr_conflict_logging_include_tuples = true;	/* same as the GUC boot value */
+
+/*
+ * All this code runs only in the context of an apply worker, so
+ * we can access the apply worker state global safely
+ */
+extern BdrApplyWorker *bdr_apply_worker;
+
+#define BDR_CONFLICT_HISTORY_COLS 30	/* columns in bdr.bdr_conflict_history */
+#define SYSID_DIGITS 33					/* buffer size for a uint64 sysid printed as text */
+
+/*
+ * Details of a conflict detected by an apply process, destined for logging
+ * output and/or conflict triggers.
+ *
+ * Closely related to bdr.bdr_conflict_history SQL table.
+ */
+typedef struct BdrApplyConflict
+{
+	TransactionId			local_conflict_txid;
+	XLogRecPtr				local_conflict_lsn;
+	TimestampTz				local_conflict_time;
+	const char			   *object_schema; /* unused if apply_error */
+	const char			   *object_name; /* unused if apply_error */
+	uint64					remote_sysid;
+	TransactionId			remote_txid;
+	TimestampTz				remote_commit_time;
+	XLogRecPtr				remote_commit_lsn;
+	BdrConflictType			conflict_type;
+	BdrConflictResolution	conflict_resolution;
+	bool					local_tuple_null;
+	Datum					local_tuple; /* composite */
+	TransactionId			local_tuple_xmin;
+	uint64					local_tuple_origin_sysid; /* init to 0 if unknown */
+	bool					remote_tuple_null;
+	Datum					remote_tuple; /* composite */
+	ErrorData			   *apply_error;
+} BdrApplyConflict;
+
+/*
+ * Perform syscache lookups etc for BDR conflict logging.
+ *
+ * Must be called during apply worker startup, after schema
+ * maintenance.
+ *
+ * Runs even if !bdr_log_conflicts_to_table as that can be
+ * toggled at runtime.
+ */
+void
+bdr_conflict_logging_startup(void)
+{
+	Oid			schema_oid;
+
+	StartTransactionCommand();
+
+	schema_oid = get_namespace_oid("bdr", false);
+
+	BdrConflictTypeOid = GetSysCacheOid2(TYPENAMENSP,
+		CStringGetDatum("bdr_conflict_type"), ObjectIdGetDatum(schema_oid));
+
+	if (BdrConflictTypeOid == InvalidOid)
+		elog(ERROR, "Type cache lookup failed for bdr_conflict_type in schema %u",
+			 schema_oid);
+
+	BdrConflictResolutionOid = GetSysCacheOid2(TYPENAMENSP,
+		CStringGetDatum("bdr_conflict_resolution"),
+		ObjectIdGetDatum(schema_oid));
+
+	if (BdrConflictResolutionOid == InvalidOid)
+		elog(ERROR, "Type cache lookup failed for bdr_conflict_resolution in schema %u",
+			 schema_oid);
+
+	BdrConflictHistorySeqId = get_relname_relid("bdr_conflict_history_id_seq",
+		schema_oid);
+
+	if (BdrConflictHistorySeqId == InvalidOid)
+		elog(ERROR, "Relcache lookup failed for bdr_conflict_history_id_seq in schema %u",
+			 schema_oid);
+
+	CommitTransactionCommand();
+}
+
+void
+bdr_conflict_logging_create_gucs(void)
+{
+	DefineCustomBoolVariable("bdr.log_conflicts_to_table",
+							 "Log BDR conflicts to bdr.bdr_conflict_history table",
+							 NULL,
+							 &bdr_log_conflicts_to_table,
+							 false,
+							 PGC_SIGHUP,
+							 0,
+							 NULL, NULL, NULL);
+
+	DefineCustomBoolVariable("bdr.conflict_logging_include_tuples",
+							 "Log whole tuples when logging BDR conflicts",
+							 NULL,
+							 &bdr_conflict_logging_include_tuples,
+							 true,
+							 PGC_SIGHUP,
+							 0,
+							 NULL, NULL, NULL);
+}
+
+/* Map a BdrConflictType to the Oid datum of its bdr.bdr_conflict_type label */
+static Datum
+bdr_conflict_type_get_datum(BdrConflictType conflict_type)
+{
+	Oid			conflict_type_oid;
+	char	   *enumname = NULL;
+
+	switch(conflict_type)
+	{
+		case BdrConflictType_InsertInsert:
+			enumname = "insert_insert";
+			break;
+		case BdrConflictType_UpdateUpdate:
+			enumname = "update_update";
+			break;
+		case BdrConflictType_UpdateDelete:
+			enumname = "update_delete";
+			break;
+		case BdrConflictType_UnhandledTxAbort:
+			enumname = "unhandled_tx_abort";
+			break;
+	}
+	Assert(enumname != NULL);
+	conflict_type_oid = GetSysCacheOid2(ENUMTYPOIDNAME,
+		ObjectIdGetDatum(BdrConflictTypeOid), CStringGetDatum(enumname));
+	if (conflict_type_oid == InvalidOid)
+		elog(ERROR, "syscache lookup for enum %s of type "
+			 "bdr.bdr_conflict_type failed", enumname);
+	return ObjectIdGetDatum(conflict_type_oid);
+}
+
+/* Map a BdrConflictResolution to the Oid datum of its bdr.bdr_conflict_resolution label */
+static Datum
+bdr_conflict_resolution_get_datum(BdrConflictResolution conflict_resolution)
+{
+	Oid			conflict_resolution_oid;
+	char	   *enumname = NULL;
+
+	switch (conflict_resolution)
+	{
+		case BdrConflictResolution_ConflictTriggerSkipChange:
+			enumname = "conflict_trigger_skip_change";
+			break;
+		case BdrConflictResolution_ConflictTriggerReturnedTuple:
+			enumname = "conflict_trigger_returned_tuple";
+			break;
+		case BdrConflictResolution_LastUpdateWins_KeepLocal:
+			enumname = "last_update_wins_keep_local";
+			break;
+		case BdrConflictResolution_LastUpdateWins_KeepRemote:
+			enumname = "last_update_wins_keep_remote";
+			break;
+		case BdrConflictResolution_UnhandledTxAbort:
+			enumname = "unhandled_tx_abort";
+			break;
+	}
+	Assert(enumname != NULL);
+	conflict_resolution_oid = GetSysCacheOid2(ENUMTYPOIDNAME,
+		ObjectIdGetDatum(BdrConflictResolutionOid), CStringGetDatum(enumname));
+	if (conflict_resolution_oid == InvalidOid)
+		elog(ERROR, "syscache lookup for enum %s of type "
+			 "bdr.bdr_conflict_resolution failed", enumname);
+	return ObjectIdGetDatum(conflict_resolution_oid);
+}
+
+/*
+ * Convert the target row to json form if it isn't null.
+ */
+static Datum
+bdr_conflict_row_to_json(Datum row, bool row_isnull, bool *ret_isnull)
+{
+	Datum		row_json;
+	if (row_isnull)
+	{
+		row_json = (Datum) 0;
+		*ret_isnull = 1;
+	}
+	else
+	{
+		/*
+		 * We don't handle errors with a PG_TRY / PG_CATCH here, because that's
+		 * not sufficient to make the transaction usable given that we might
+		 * fail in user defined casts, etc. We'd need a full savepoint, which
+		 * is too expensive. So if this fails we'll just propagate the exception
+		 * and abort the apply transaction.
+		 *
+		 * It shouldn't fail unless something's pretty broken anyway.
+		 */
+		row_json = DirectFunctionCall1(row_to_json, row);
+		*ret_isnull = 0;
+	}
+	return row_json;
+}
+
+static void
+bdr_conflict_strtodatum(bool *nulls, Datum *values, int idx,
+						const char *in_str)
+{
+	if (in_str == NULL)
+	{
+		nulls[idx] = true;		/* no string supplied: column idx is SQL NULL */
+		values[idx] = (Datum) 0;
+	}
+	else
+	{
+		nulls[idx] = false;
+		values[idx] = CStringGetTextDatum(in_str);	/* stored as a text datum */
+	}
+}
+
+/*
+ * Log a BDR apply conflict to the bdr.bdr_conflict_history table.
+ *
+ * The change will then be replicated to other nodes.
+ */
+static void
+bdr_conflict_log_table(BdrApplyConflict *conflict)
+{
+	Datum			values[BDR_CONFLICT_HISTORY_COLS];
+	bool			nulls[BDR_CONFLICT_HISTORY_COLS];
+	int				attno;
+	int				object_schema_attno, object_name_attno;
+	char			sqlstate[12];
+	Relation		log_rel;
+	HeapTuple		log_tup;
+	TupleTableSlot *log_slot;
+	EState		   *log_estate;
+	char			local_sysid[SYSID_DIGITS];
+	char			remote_sysid[SYSID_DIGITS];
+	char			origin_sysid[SYSID_DIGITS];
+
+	if (!bdr_log_conflicts_to_table)
+		/* No logging enabled and we don't own any memory, just bail */
+		return;
+
+	/* Pg has no uint64 SQL type so we have to store all them as text */
+	snprintf(local_sysid, sizeof(local_sysid), UINT64_FORMAT,
+			 GetSystemIdentifier());
+
+	snprintf(remote_sysid, sizeof(remote_sysid), UINT64_FORMAT,
+			 conflict->remote_sysid);
+
+	if (conflict->local_tuple_origin_sysid != 0)
+		snprintf(origin_sysid, sizeof(origin_sysid), UINT64_FORMAT,
+				 conflict->local_tuple_origin_sysid);
+	else
+		origin_sysid[0] = '\0';
+
+	memset(nulls, 0, sizeof(bool) * BDR_CONFLICT_HISTORY_COLS);
+	memset(values, 0, sizeof(Datum) * BDR_CONFLICT_HISTORY_COLS);
+
+	/* Begin forming the tuple. See the extension SQL file for field info. */
+	attno = 0;
+	values[attno++] = DirectFunctionCall1(nextval_oid,
+		ObjectIdGetDatum(BdrConflictHistorySeqId));
+	values[attno++] = CStringGetTextDatum(local_sysid);
+	values[attno++] = TransactionIdGetDatum(conflict->local_conflict_txid);
+	values[attno++] = LSNGetDatum(conflict->local_conflict_lsn);
+	values[attno++] = TimestampTzGetDatum(conflict->local_conflict_time);
+
+	object_schema_attno = attno;	/* remembered: the error path overwrites it */
+	bdr_conflict_strtodatum(nulls, values, attno++, conflict->object_schema);
+
+	object_name_attno = attno;		/* remembered: the error path overwrites it */
+	bdr_conflict_strtodatum(nulls, values, attno++, conflict->object_name);
+
+	values[attno++] = CStringGetTextDatum(remote_sysid);
+	if (conflict->remote_txid != InvalidTransactionId)
+		values[attno] = TransactionIdGetDatum(conflict->remote_txid);
+	else
+		nulls[attno] = 1;
+	attno++;
+
+	values[attno++] = TimestampTzGetDatum(conflict->remote_commit_time);
+	values[attno++] = LSNGetDatum(conflict->remote_commit_lsn);
+	values[attno++] = bdr_conflict_type_get_datum(conflict->conflict_type);
+
+	values[attno++] =
+		bdr_conflict_resolution_get_datum(conflict->conflict_resolution);
+
+	values[attno] = bdr_conflict_row_to_json(conflict->local_tuple,
+		conflict->local_tuple_null, &nulls[attno]);
+	attno++;
+
+	values[attno] = bdr_conflict_row_to_json(conflict->remote_tuple,
+		conflict->remote_tuple_null, &nulls[attno]);
+	attno++;
+
+	if (conflict->local_tuple_xmin != InvalidTransactionId)
+		values[attno] = TransactionIdGetDatum(conflict->local_tuple_xmin);
+	else
+		nulls[attno] = 1;
+	attno++;
+
+	if (conflict->local_tuple_origin_sysid != 0)
+		values[attno] = CStringGetTextDatum(origin_sysid);
+	else
+		nulls[attno] = 1;
+	attno++;
+
+	if (conflict->apply_error == NULL)
+	{
+		/* all the 13 remaining cols are error_ cols and are all null */
+		memset(&nulls[attno], 1, sizeof(bool) * 13);
+		attno += 13;
+	}
+	else
+	{
+		/*
+		 * There's error data to log. We don't attempt to log it selectively,
+		 * as bdr apply errors are not supposed to be routine anyway.
+		 */
+		ErrorData *edata = conflict->apply_error;
+
+		bdr_conflict_strtodatum(nulls, values, attno++, edata->message);
+
+		/*
+		 * Always log the SQLSTATE. If it's ERRCODE_INTERNAL_ERROR - like after
+		 * an elog(...) - we'll just be writing XX000, but that's still better
+		 * than nothing.
+		 */
+		strncpy(sqlstate, unpack_sql_state(edata->sqlerrcode), sizeof(sqlstate));
+		sqlstate[sizeof(sqlstate)-1] = '\0';
+		values[attno++] = CStringGetTextDatum(sqlstate);	/* advance past error_sqlstate */
+
+		/*
+		 * We'd like to log the statement running at the time of the ERROR (for
+		 * DDL apply errors) but have no reliable way to acquire it yet. So for
+		 * now...
+		 */
+		nulls[attno] = 1;		/* error_querystring is always NULL */
+		attno++;
+
+		if (edata->cursorpos != 0)
+			values[attno] = Int32GetDatum(edata->cursorpos);
+		else
+			nulls[attno] = 1;
+		attno++;
+
+		bdr_conflict_strtodatum(nulls, values, attno++, edata->detail);
+		bdr_conflict_strtodatum(nulls, values, attno++, edata->hint);
+		bdr_conflict_strtodatum(nulls, values, attno++, edata->context);
+		bdr_conflict_strtodatum(nulls, values, attno++, edata->column_name);
+		bdr_conflict_strtodatum(nulls, values, attno++, edata->datatype_name);
+		bdr_conflict_strtodatum(nulls, values, attno++, edata->constraint_name);
+		bdr_conflict_strtodatum(nulls, values, attno++, edata->filename);
+		values[attno++] = Int32GetDatum(edata->lineno);
+		bdr_conflict_strtodatum(nulls, values, attno++, edata->funcname);
+
+		/* Set schema and table name based on the error, not arg values */
+		bdr_conflict_strtodatum(nulls, values, object_schema_attno,
+								edata->schema_name);
+		bdr_conflict_strtodatum(nulls, values, object_name_attno,
+								edata->table_name);
+
+		/* note: do NOT free the errordata, it's the caller's responsibility */
+	}
+
+	/* Make sure assignments match allocated tuple size */
+	Assert(attno == BDR_CONFLICT_HISTORY_COLS);
+
+	/*
+	 * Construct a bdr.bdr_conflict_history tuple from the conflict info we've
+	 * been passed and insert it into bdr.bdr_conflict_history.
+	 */
+	log_rel = heap_open(BdrConflictHistoryRelId, RowExclusiveLock);
+
+	/* Prepare executor state for index updates */
+	log_estate = bdr_create_rel_estate(log_rel);
+	log_slot = ExecInitExtraTupleSlot(log_estate);
+	ExecSetSlotDescriptor(log_slot, RelationGetDescr(log_rel));
+	/* Construct the tuple and insert it */
+	log_tup = heap_form_tuple(RelationGetDescr(log_rel), values, nulls);
+	ExecStoreTuple(log_tup, log_slot, InvalidBuffer, true);
+	simple_heap_insert(log_rel, log_slot->tts_tuple);
+	/* Then do any index maintenance required */
+	UserTableUpdateIndexes(log_estate, log_slot);
+	/* and finish up */
+	heap_close(log_rel, RowExclusiveLock);
+	FreeExecutorState(log_estate);
+}
+
+/*
+ * Log a BDR apply conflict to the bdr.bdr_conflict_history table and/or
+ * system log.
+ *
+ * If a transaction is in progress, the current transaction is used, in which
+ * case the write is lost if that transaction subsequently aborts. If no
+ * transaction is already open the caller must open one and commit it after
+ * calling.
+ *
+ * Any open aborted transaction must be rolled back before calling.
+ *
+ * If apply_error is passed then this call is because of an unhandled error. In
+ * this case the passed object_schema and object_name are ignored in favour of
+ * those in the ErrorData struct and all the error_ fields are populated from
+ * the ErrorData struct.
+ *
+ * Any palloc'd or copied values passed must be freed by the caller after
+ * bdr_conflict_log returns. It won't retain references to any values and won't
+ * free any of the values itself.
+ *
+ * The origin id - i.e. the id of the node the local tuple was first created
+ * on, in case the local tuple was originally replicated from another node -
+ * may be obtained with TransactionIdGetCommitTsData on the xmin of the
+ * TupleTableSlot for the tuple. It isn't done here because the caller
+ * frequently already has the node id to hand.
+ */ +void +bdr_conflict_log(BdrConflictType conflict_type, + BdrConflictResolution resolution, TransactionId remote_txid, + BDRRelation *conflict_relation, TupleTableSlot *local_tuple, + RepNodeId local_tuple_origin_id, TupleTableSlot *remote_tuple, + ErrorData *apply_error) +{ + MemoryContext log_context, old_context; + BdrApplyConflict conflict; + TimeLineID tli; + + if (IsAbortedTransactionBlockState()) + elog(ERROR, "bdr: attempt to log conflict in aborted transaction"); + + if (!IsTransactionState()) + elog(ERROR, "bdr: attempt to log conflict without surrounding transaction"); + + /* We want our own memory ctx to clean up easily & reliably */ + log_context = AllocSetContextCreate(CurrentMemoryContext, + "bdr_log_conflict_ctx", ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); + old_context = MemoryContextSwitchTo(log_context); + + /* Populate the conflict record we're going to log */ + conflict.conflict_type = conflict_type; + conflict.conflict_resolution = resolution; + + conflict.local_conflict_txid = GetTopTransactionIdIfAny(); + conflict.local_conflict_lsn = GetXLogInsertRecPtr(); + conflict.local_conflict_time = GetCurrentTimestamp(); + conflict.remote_txid = remote_txid; + + /* set using bdr_conflict_setrel */ + if (conflict_relation == NULL) + { + conflict.object_schema = NULL; + conflict.object_name = NULL; + } + else + { + conflict.object_name = RelationGetRelationName(conflict_relation->rel); + conflict.object_schema = + get_namespace_name(RelationGetNamespace(conflict_relation->rel)); + } + + /* TODO: May make sense to cache the remote sysid in a global too... 
*/ + fetch_sysid_via_node_id(bdr_apply_worker->origin_id, + &conflict.remote_sysid, &tli); + conflict.remote_commit_time = replication_origin_timestamp; + conflict.remote_txid = remote_txid; + conflict.remote_commit_lsn = replication_origin_lsn; + + if (local_tuple != NULL) + { + /* Log local tuple xmin even if actual tuple value logging is off */ + conflict.local_tuple_xmin = + HeapTupleHeaderGetXmin(local_tuple->tts_tuple->t_data); + Assert(conflict.local_tuple_xmin >= FirstNormalTransactionId || + conflict.local_tuple_xmin == FrozenTransactionId); + if (bdr_conflict_logging_include_tuples) + { + conflict.local_tuple = ExecFetchSlotTupleDatum(local_tuple); + conflict.local_tuple_null = false; + } + } + else + { + conflict.local_tuple_null = true; + conflict.local_tuple = (Datum) 0; + conflict.local_tuple_xmin = InvalidTransactionId; + } + + if (local_tuple_origin_id != InvalidRepNodeId) + { + fetch_sysid_via_node_id(local_tuple_origin_id, + &conflict.local_tuple_origin_sysid, &tli); + } + else + { + conflict.local_tuple_origin_sysid = 0; + } + + if (remote_tuple != NULL && bdr_conflict_logging_include_tuples) + { + conflict.remote_tuple = ExecFetchSlotTupleDatum(remote_tuple); + conflict.remote_tuple_null = false; + } + else + { + conflict.remote_tuple_null = true; + conflict.remote_tuple = (Datum) 0; + } + + conflict.apply_error = apply_error; + + bdr_conflict_log_table(&conflict); + + MemoryContextSwitchTo(old_context); + MemoryContextDelete(log_context); +} diff --git a/contrib/bdr/bdr_executor.c b/contrib/bdr/bdr_executor.c index 70b1098f1a..cce81bac04 100644 --- a/contrib/bdr/bdr_executor.c +++ b/contrib/bdr/bdr_executor.c @@ -262,4 +262,3 @@ retry: return found; } - diff --git a/contrib/bdr/worker.mk b/contrib/bdr/worker.mk index 042863f5f2..f70f0454c5 100644 --- a/contrib/bdr/worker.mk +++ b/contrib/bdr/worker.mk @@ -3,7 +3,7 @@ MODULE_big = bdr OBJS = bdr.o bdr_apply.o bdr_compat.o bdr_commandfilter.o bdr_count.o \ bdr_seq.o bdr_init_replica.o 
bdr_relcache.o bdr_conflict_handlers.o \ - bdr_executor.o + bdr_conflict_logging.o bdr_executor.o EXTENSION = bdr DATA = bdr--0.5.sql diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm index 3148d9d560..6543d41cd6 100644 --- a/src/tools/msvc/Mkvcbuild.pm +++ b/src/tools/msvc/Mkvcbuild.pm @@ -521,7 +521,7 @@ sub mkvcbuild 'bdr_commandfilter.c', 'bdr_compat.c', 'bdr_count.c', 'bdr_seq.c', 'bdr_init_replica.c', 'bdr_relcache.c', 'bdr_conflict_handlers.c', - 'bdr_executor.c'); + 'bdr_conflict_logging.c', 'bdr_executor.c'); $bdr_apply->AddReference($postgres); $bdr_apply->AddLibrary('wsock32.lib'); $bdr_apply->AddIncludeDir('src\interfaces\libpq');