bdr: Log conflicts to a bdr.bdr_conflict_history table
author: Craig Ringer <craig@2ndquadrant.com>
Mon, 5 May 2014 06:54:27 +0000 (14:54 +0800)
committer: Andres Freund <andres@anarazel.de>
Thu, 3 Jul 2014 15:55:33 +0000 (17:55 +0200)
Creates a bdr.bdr_conflict_history table and logs conflicts to it when they're
detected, with details on both sides of the error.

Currently only logs insert/insert conflicts. The infrastructure for reporting
ERROR conflicts is present but not yet used.

contrib/bdr/bdr--0.5.sql
contrib/bdr/bdr.c
contrib/bdr/bdr.h
contrib/bdr/bdr_apply.c
contrib/bdr/bdr_conflict_logging.c [new file with mode: 0644]
contrib/bdr/bdr_executor.c
contrib/bdr/worker.mk
src/tools/msvc/Mkvcbuild.pm

index 45e7adfc137c91153b8a8552118fa8f22d66dd44..5b43052e04f52823e1497d45f2e97aac6b71356e 100644 (file)
@@ -321,6 +321,99 @@ COMMENT ON COLUMN bdr_nodes.node_status IS 'Readiness of the node: [i]nitializin
 
 SELECT pg_catalog.pg_extension_config_dump('bdr_nodes', '');
 
+CREATE TYPE bdr_conflict_type AS ENUM -- labels are looked up by name from C (bdr_conflict_type_get_datum)
+(
+    'insert_insert',       -- conflicting inserts
+    'update_update',       -- concurrent updates
+    'update_delete',       -- update vs delete
+    'unhandled_tx_abort'   -- unhandled apply error; details go in the error_* columns of bdr_conflict_history
+);
+
+COMMENT ON TYPE bdr_conflict_type IS 'The nature of a BDR apply conflict - concurrent updates (update_update), conflicting inserts, etc.';
+
+CREATE TYPE bdr_conflict_resolution AS ENUM -- labels are looked up by name from C (bdr_conflict_resolution_get_datum)
+(
+    'conflict_trigger_skip_change',     -- a conflict trigger asked for the remote change to be skipped
+    'conflict_trigger_returned_tuple',  -- a conflict trigger supplied the tuple to apply
+    'last_update_wins_keep_local',      -- last-update-wins kept the local tuple
+    'last_update_wins_keep_remote',     -- last-update-wins applied the remote tuple
+    'unhandled_tx_abort'                -- presumably paired with conflict_type 'unhandled_tx_abort'; confirm against the C caller
+);
+
+COMMENT ON TYPE bdr_conflict_resolution IS 'Resolution of a bdr conflict - if a conflict was resolved by a conflict trigger, by last-update-wins tests on commit timestamps, etc.';
+
+--
+-- bdr_conflict_history records apply conflicts so they can be queried and
+-- analysed by administrators.
+--
+-- Column order and count must remain in sync with bdr_conflict_log_table(...),
+-- BDR_CONFLICT_HISTORY_COLS and struct BdrApplyConflict in bdr_conflict_logging.c
+--
+CREATE SEQUENCE bdr_conflict_history_id_seq; -- source of conflict_id; tied to that column by OWNED BY below
+
+CREATE TABLE bdr_conflict_history (
+    conflict_id         bigint not null default nextval('bdr_conflict_history_id_seq'),
+    local_node_sysid    text not null, -- really uint64 but we don't have the type for it
+    PRIMARY KEY (local_node_sysid, conflict_id),
+
+    local_conflict_xid  xid not null,     -- xid of conflicting apply tx
+    local_conflict_lsn  pg_lsn not null,  -- lsn of local node at the time the conflict was detected
+    local_conflict_time timestamptz not null, -- time the conflict was detected on the applying node
+    object_schema       text,             -- nullable: the C side stores NULL when it has no schema name
+    object_name         text,             -- nullable: likewise for the object name
+    remote_node_sysid   text not null, -- again, really uint64
+    remote_txid         xid not null, -- NOTE(review): bdr_conflict_log_table() stores NULL for an invalid remote xid; confirm which side is intended
+    remote_commit_time  timestamptz not null,
+    remote_commit_lsn   pg_lsn not null,
+    conflict_type       bdr_conflict_type not null,
+    conflict_resolution bdr_conflict_resolution not null,
+    local_tuple         json,             -- conflicting local tuple as json, if logged
+    remote_tuple        json,             -- conflicting remote tuple as json, if logged
+    local_tuple_xmin    xid,              -- NULL when the C side has no valid xmin
+    local_tuple_origin_sysid text,        -- also really uint64
+
+    -- The following apply only for unhandled apply errors and
+    -- correspond to fields in ErrorData in elog.h .
+    error_message       text,
+    error_sqlstate      text CHECK (length(error_sqlstate) = 5), -- always exactly five characters when present
+    error_querystring   text,
+    error_cursorpos     integer,
+    error_detail        text,
+    error_hint          text,
+    error_context       text,
+    error_columnname    text, -- schema and table in object_schema, object_name above
+    error_typename      text,
+    error_constraintname text,
+    error_filename      text,
+    error_lineno        integer,
+    error_funcname      text
+);
+
+ALTER SEQUENCE bdr_conflict_history_id_seq OWNED BY bdr_conflict_history.conflict_id;
+
+COMMENT ON TABLE bdr_conflict_history IS 'Log of all conflicts in this BDR group';
+COMMENT ON COLUMN bdr_conflict_history.local_node_sysid IS 'sysid of the local node where the apply conflict occurred';
+COMMENT ON COLUMN bdr_conflict_history.remote_node_sysid IS 'sysid of the remote node the conflicting transaction originated from';
+COMMENT ON COLUMN bdr_conflict_history.object_schema IS 'Schema of the object involved in the conflict';
+COMMENT ON COLUMN bdr_conflict_history.object_name IS 'Name of the object (table, etc) involved in the conflict';
+COMMENT ON COLUMN bdr_conflict_history.local_conflict_xid IS 'Transaction ID of the apply transaction that encountered the conflict';
+COMMENT ON COLUMN bdr_conflict_history.local_conflict_lsn IS 'xlog position at the time the conflict occurred on the applying node';
+COMMENT ON COLUMN bdr_conflict_history.local_conflict_time IS 'The time the conflict was detected on the applying node';
+COMMENT ON COLUMN bdr_conflict_history.remote_txid IS 'xid of the remote transaction involved in the conflict';
+COMMENT ON COLUMN bdr_conflict_history.remote_commit_time IS 'The time the remote transaction involved in this conflict committed';
+COMMENT ON COLUMN bdr_conflict_history.remote_commit_lsn IS 'LSN on remote node at which conflicting transaction committed';
+COMMENT ON COLUMN bdr_conflict_history.conflict_type IS 'Nature of the conflict - insert/insert, update/delete, etc';
+COMMENT ON COLUMN bdr_conflict_history.local_tuple IS 'For DML conflicts, the conflicting tuple from the local DB (as json), if logged';
+COMMENT ON COLUMN bdr_conflict_history.local_tuple_xmin IS 'If local_tuple is set, the xmin of the conflicting local tuple';
+COMMENT ON COLUMN bdr_conflict_history.local_tuple_origin_sysid IS 'The node id for the true origin of the local tuple. Differs from local_node_sysid if the tuple was originally replicated from another node.';
+COMMENT ON COLUMN bdr_conflict_history.remote_tuple IS 'For DML conflicts, the conflicting tuple from the remote DB (as json), if logged';
+COMMENT ON COLUMN bdr_conflict_history.conflict_resolution IS 'How the conflict was resolved/handled; see the enum definition';
+COMMENT ON COLUMN bdr_conflict_history.error_message IS 'On apply error, the error message from ereport/elog. Other error fields match.';
+
+SELECT pg_catalog.pg_extension_config_dump('bdr_conflict_history', '');
+REVOKE ALL ON TABLE bdr_conflict_history FROM PUBLIC;
+
+
 -- This type is tailored to use as input to get_object_address
 CREATE TYPE bdr.dropped_object AS
   (objtype text, objnames text[], objargs text[]);
index c4e24f1d3a2f79f6c82629ca453caf3513277fbe..349444d2651af13b1b69a94aeff892010ea08770 100644 (file)
@@ -48,6 +48,7 @@
 #include "storage/shmem.h"
 
 #include "utils/builtins.h"
+#include "utils/elog.h"
 #include "utils/guc.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -64,6 +65,7 @@ static int   n_configured_bdr_nodes = 0;
 ResourceOwner bdr_saved_resowner;
 static bool bdr_is_restart = false;
 Oid   BdrNodesRelid;
+Oid   BdrConflictHistoryRelId;
 BdrConnectionConfig  **bdr_connection_configs;
 
 /* GUC storage */
@@ -584,6 +586,8 @@ bdr_apply_main(Datum main_arg)
                                           ALLOCSET_DEFAULT_INITSIZE,
                                           ALLOCSET_DEFAULT_MAXSIZE);
 
+   bdr_conflict_logging_startup();
+
    while (!exit_worker)
    {
        /* int       ret; */
@@ -1387,6 +1391,8 @@ _PG_init(void)
                             0,
                             NULL, NULL, NULL);
 
+   bdr_conflict_logging_create_gucs();
+
    /* if nothing is configured, we're done */
    if (connections == NULL)
    {
@@ -1630,6 +1636,8 @@ bdr_maintain_schema(void)
 
        BdrNodesRelid = bdr_lookup_relid("bdr_nodes", schema_oid);
 
+       BdrConflictHistoryRelId = bdr_lookup_relid("bdr_conflict_history", schema_oid);
+
        QueuedDropsRelid = bdr_lookup_relid("bdr_queued_drops", schema_oid);
    }
    else
index dd29f9201b82638040e5458eeace0e1e3e62b558..776e0ebae0e319f05d1d9f289665f98204647e41 100644 (file)
@@ -34,7 +34,6 @@ struct EState; /* from nodes/execnodes.h */
 struct ScanKeyData; /* from access/skey.h for ScanKey */
 enum LockTupleMode; /* from access/heapam.h */
 
-
 /*
  * Flags to indicate which fields are present in a commit record sent by the
  * output plugin.
@@ -52,6 +51,57 @@ typedef enum BDRConflictHandlerType
    BDRInsertUpdateConflictHandler
 }  BDRConflictHandlerType;
 
+/*
+ * BDR conflict detection: type of conflict that was identified.
+ *
+ * Must correspond to bdr.bdr_conflict_type SQL enum and
+ * bdr_conflict_type_get_datum (...), which maps each value to its SQL label.
+ */
+typedef enum BdrConflictType
+{
+   BdrConflictType_InsertInsert,       /* SQL label 'insert_insert' */
+   BdrConflictType_UpdateUpdate,       /* SQL label 'update_update' */
+   BdrConflictType_UpdateDelete,       /* SQL label 'update_delete' */
+   BdrConflictType_UnhandledTxAbort    /* SQL label 'unhandled_tx_abort' */
+} BdrConflictType;
+
+/*
+ * BDR conflict detection: how the conflict was resolved (if it was).
+ *
+ * Must correspond to bdr.bdr_conflict_resolution SQL enum and
+ * bdr_conflict_resolution_get_datum(...), which maps each value to its label.
+ */
+typedef enum BdrConflictResolution
+{
+   BdrConflictResolution_ConflictTriggerSkipChange,    /* 'conflict_trigger_skip_change' */
+   BdrConflictResolution_ConflictTriggerReturnedTuple, /* 'conflict_trigger_returned_tuple' */
+   BdrConflictResolution_LastUpdateWins_KeepLocal,     /* 'last_update_wins_keep_local' */
+   BdrConflictResolution_LastUpdateWins_KeepRemote,    /* 'last_update_wins_keep_remote' */
+   BdrConflictResolution_UnhandledTxAbort              /* 'unhandled_tx_abort' */
+} BdrConflictResolution;
+
+typedef struct BDRConflictHandler   /* one entry of BDRRelation.conflict_handlers */
+{
+   Oid         handler_oid;        /* presumably the oid of the handler function -- TODO confirm */
+   BDRConflictHandlerType handler_type;   /* kind of handler, see BDRConflictHandlerType */
+   uint64      timeframe;          /* NOTE(review): meaning and units are not visible here -- confirm */
+}  BDRConflictHandler;
+
+/*
+ * This structure is for caching relation specific information, such as
+ * conflict handlers.
+ */
+typedef struct BDRRelation
+{
+   /* hash key */
+   Oid         reloid;
+
+   Relation    rel;                /* read via rel->rel by the apply and conflict logging code */
+
+   BDRConflictHandler *conflict_handlers;  /* presumably an array of handlers -- TODO confirm */
+   size_t      conflict_handlers_len;      /* presumably the entry count of conflict_handlers */
+} BDRRelation;
+
 /*
  * BdrApplyWorker describes a BDR worker connection.
  *
@@ -97,7 +147,6 @@ typedef struct BdrPerdbWorker
 
 } BdrPerdbWorker;
 
-
 /*
  * Type of BDR worker in a BdrWorker struct
  */
@@ -149,29 +198,6 @@ typedef struct BdrConnectionConfig
    bool is_valid;
 } BdrConnectionConfig;
 
-typedef struct BDRConflictHandler
-{
-   Oid         handler_oid;
-   BDRConflictHandlerType handler_type;
-   uint64      timeframe;
-}  BDRConflictHandler;
-
-/*
- * This structure is for caching relation specific information, such as
- * conflict handlers.
- */
-typedef struct BDRRelation
-{
-   /* hash key */
-   Oid         reloid;
-
-   Relation    rel;
-
-   BDRConflictHandler *conflict_handlers;
-   size_t      conflict_handlers_len;
-}  BDRRelation;
-
-
 /*
  * Params for every connection in bdr.connections.
  *
@@ -206,6 +232,7 @@ extern ResourceOwner bdr_saved_resowner;
 
 /* bdr_nodes table oid */
 extern Oid BdrNodesRelid;
+extern Oid  BdrConflictHistoryRelId;
 
 /* DDL replication support */
 extern Oid QueuedDDLCommandsRelid;
@@ -237,6 +264,19 @@ extern bool find_pkey_tuple(struct ScanKeyData *skey, BDRRelation *rel,
                            Relation idxrel, struct TupleTableSlot *slot, bool
                            lock, enum LockTupleMode mode);
 
+/* conflict logging (usable in apply only) */
+extern void bdr_conflict_logging_startup(void);
+extern void bdr_conflict_logging_create_gucs(void);
+
+extern void
+bdr_conflict_log(BdrConflictType conflict_type,
+                BdrConflictResolution resolution, TransactionId remote_txid,
+                BDRRelation *conflict_relation,
+                struct TupleTableSlot *local_tuple,
+                RepNodeId local_tuple_origin_id,
+                struct TupleTableSlot *remote_tuple,
+                struct ErrorData *apply_error);
+
 /* sequence support */
 extern void bdr_sequencer_shmem_init(int nnodes, int sequencers);
 extern void bdr_sequencer_init(int seq_slot);
index 5866a58c931099b0232f340a7df36e11ec2e4acd..01aa295a4db175560a20aad04cf34d74bae2debc 100644 (file)
@@ -77,7 +77,7 @@ Oid           QueuedDropsRelid = InvalidOid;
 bool       exit_worker = false;
 
 /* During apply, holds xid of remote transaction */
-static TransactionId replication_origin_xid = InvalidTransactionId;
+TransactionId replication_origin_xid = InvalidTransactionId;
 
 /*
  * This code only runs within an apply bgworker, so we can stash a pointer to our
@@ -103,7 +103,8 @@ static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple
 static void check_apply_update(RepNodeId local_node_id, TimestampTz local_ts,
                   BDRRelation *rel, HeapTuple local_tuple,
                   HeapTuple remote_tuple, HeapTuple *new_tuple,
-                  bool *perform_update, bool *log_update);
+                  bool *perform_update, bool *log_update,
+                  BdrConflictResolution *resolution);
 static void do_log_update(RepNodeId local_node_id, bool apply_update,
              TimestampTz ts, Relation idxrel, BDRRelation *rel,
              HeapTuple old_key, HeapTuple user_tuple);
@@ -400,25 +401,26 @@ process_remote_insert(StringInfo s)
        bool        apply_update;
        bool        log_update;
        CommitExtraData local_node_id_raw;
+       BdrConflictResolution resolution;
 
        /* refetch tuple, check for old commit ts & origin */
        xmin = HeapTupleHeaderGetXmin(oldslot->tts_tuple->t_data);
 
        /*
-        * We now need to determine whether to keep the original version of
-        * the row, or apply the insert (as an update) we received.  We use
-        * the last-update-wins strategy for this, except when the new update
-        * comes from the same node that originated the previous version of
-        * the tuple.
+        * Use conflict triggers and/or last-update-wins to decide which tuple
+        * to retain.
         */
        TransactionIdGetCommitTsData(xmin, &local_ts, &local_node_id_raw);
        local_node_id = local_node_id_raw;
 
        check_apply_update(local_node_id, local_ts, rel,
-                          NULL, NULL, NULL, &apply_update, &log_update);
+                          NULL, NULL, NULL, &apply_update, &log_update,
+                          &resolution);
 
-       elog(LOG, "insert vs insert conflict: %s",
-            apply_update ? "update" : "ignore");
+       if (log_update)
+           /* TODO: Roll into conflict logging code */
+           elog(DEBUG2, "bdr: insert vs insert conflict: %s",
+                apply_update ? "update" : "ignore");
 
        if (apply_update)
        {
@@ -428,6 +430,13 @@ process_remote_insert(StringInfo s)
            /* races will be resolved by abort/retry */
            UserTableUpdateOpenIndexes(estate, slot);
        }
+
+       if (log_update)
+       {
+           bdr_conflict_log(BdrConflictType_InsertInsert, resolution,
+                            replication_origin_xid, rel, oldslot,
+                            local_node_id, slot, NULL /*no error*/);
+       }
    }
    else
    {
@@ -590,6 +599,7 @@ process_remote_update(StringInfo s)
        RepNodeId   local_node_id;
        bool        apply_update;
        bool        log_update;
+       BdrConflictResolution resolution;
 
        CommitExtraData local_node_id_raw;
 
@@ -614,18 +624,15 @@ process_remote_update(StringInfo s)
        xmin = HeapTupleHeaderGetXmin(oldslot->tts_tuple->t_data);
 
        /*
-        * We now need to determine whether to keep the original version of
-        * the row, or apply the update we received.  We use the
-        * last-update-wins strategy for this, except when the new update
-        * comes from the same node that originated the previous version of
-        * the tuple.
+        * Use conflict triggers and/or last-update-wins to decide which tuple
+        * to retain.
         */
        TransactionIdGetCommitTsData(xmin, &local_ts, &local_node_id_raw);
        local_node_id = local_node_id_raw;
 
        check_apply_update(local_node_id, local_ts, rel, oldslot->tts_tuple,
                           remote_tuple, &user_tuple, &apply_update,
-                          &log_update);
+                          &log_update, &resolution);
 
        if (log_update)
            do_log_update(local_node_id, apply_update, local_ts,
@@ -653,6 +660,11 @@ process_remote_update(StringInfo s)
    }
    else
    {
+       /*
+        * Update target is missing. We don't know if this is an update-vs-delete
+        * conflict or if the target tuple came from some 3rd node and hasn't yet
+        * been applied to the local node.
+        */
        initStringInfo(&o);
        tuple_to_stringinfo(&o, RelationGetDescr(rel->rel),
                            oldslot->tts_tuple);
@@ -782,15 +794,22 @@ process_remote_delete(StringInfo s)
 }
 
 /*
- * Check whether a remote update conflicts with the local row version.
+ * Check whether a remote insert or update conflicts with the local row
+ * version.
+ *
+ * User-defined conflict triggers get invoked here.
  *
  * perform_update, log_update is set to true if the update should be performed
  * and logged respectively
+ *
+ * resolution is set to indicate how the conflict was resolved if log_update
+ * is true. Its value is undefined if log_update is false.
  */
 static void
 check_apply_update(RepNodeId local_node_id, TimestampTz local_ts,
                 BDRRelation *rel, HeapTuple local_tuple, HeapTuple remote_tuple,
-               HeapTuple *new_tuple, bool *perform_update, bool *log_update)
+               HeapTuple *new_tuple, bool *perform_update, bool *log_update,
+               BdrConflictResolution *resolution)
 {
    uint64      local_sysid,
                remote_sysid;
@@ -823,19 +842,24 @@ check_apply_update(RepNodeId local_node_id, TimestampTz local_ts,
    else
    {
        /*
-        * Decide what update wins based on transaction timestamp difference.
-        * The later transaction wins.  If the timestamps compare equal, use
-        * sysid + TLI to discern.
+        * Decide whether to keep the remote or local tuple based on a conflict
+        * trigger (if defined) or last-update-wins.
+        *
+        * If the caller doesn't provide storage for the conflict handler to
+        * store a new tuple in, don't fire any conflict triggers.
         */
+       *log_update = true;
 
        if (new_tuple)
        {
            /*
-            * We let users decide how the conflict should be resolved; if no
-            * trigger could be found or if the trigger decided not to care,
-            * we fall back to „last update wins“
+            * --------------
+            * Conflict trigger conflict handling - let the user decide whether to:
+            * - Ignore the remote update;
+            * - Supply a new tuple to replace the current tuple; or
+            * - Take no action and fall through to the next handling option
+            * --------------
             */
-
            TimestampDifference(replication_origin_timestamp, local_ts,
                                &secs, &microsecs);
 
@@ -848,43 +872,52 @@ check_apply_update(RepNodeId local_node_id, TimestampTz local_ts,
            if (skip)
            {
                *perform_update = false;
-               *log_update = false;
+               *resolution = BdrConflictResolution_ConflictTriggerSkipChange;
                return;
            }
            else if (*new_tuple)
            {
                *perform_update = true;
-               *log_update = true;
+               *resolution = BdrConflictResolution_ConflictTriggerReturnedTuple;
                return;
            }
 
-
            /*
             * if user decided not to skip the conflict but didn't provide a
             * resolving tuple we fall back to default handling
             */
        }
 
-
+       /* Last update wins conflict handling */
        cmp = timestamptz_cmp_internal(replication_origin_timestamp, local_ts);
-
        if (cmp > 0)
        {
+           /* The most recent update is the remote one; apply it */
            *perform_update = true;
-           *log_update = false;
+           *resolution = BdrConflictResolution_LastUpdateWins_KeepRemote;
+           return;
+       }
+       else if (cmp < 0)
+       {
+           /* The most recent update is the local one; retain it */
+           *perform_update = false;
+           *resolution = BdrConflictResolution_LastUpdateWins_KeepLocal;
            return;
        }
        else if (cmp == 0)
        {
+           /*
+            * Timestamps are equal. Use sysid + timeline id to decide which
+            * tuple to retain.
+            */
            fetch_sysid_via_node_id(local_node_id,
                                    &local_sysid, &local_tli);
            fetch_sysid_via_node_id(bdr_apply_worker->origin_id,
                                    &remote_sysid, &remote_tli);
 
            /*
-            * Always ignore this update if the user decides to; otherwise
-            * apply the user tuple or, if none supplied, fall back to „last
-            * update wins“
+            * Apply the user tuple or, if none supplied, fall back to "last
+            * update wins"
             */
            if (local_sysid < remote_sysid)
                *perform_update = true;
@@ -898,13 +931,16 @@ check_apply_update(RepNodeId local_node_id, TimestampTz local_ts,
                /* shouldn't happen */
                elog(ERROR, "unsuccessful node comparison");
 
-           *log_update = true;
-           return;
-       }
-       else
-       {
-           *perform_update = false;
-           *log_update = true;
+           /*
+            * We don't log whether we used timestamp, sysid or timeline id to
+            * decide which tuple to retain. That'll be in the log record
+            * anyway, so we can reconstruct the decision from the log record
+            * later.
+            */
+           if (*perform_update)
+               *resolution = BdrConflictResolution_LastUpdateWins_KeepRemote;
+           else
+               *resolution = BdrConflictResolution_LastUpdateWins_KeepLocal;
 
            return;
        }
diff --git a/contrib/bdr/bdr_conflict_logging.c b/contrib/bdr/bdr_conflict_logging.c
new file mode 100644 (file)
index 0000000..4bf3b59
--- /dev/null
@@ -0,0 +1,544 @@
+#include "postgres.h"
+
+#include "bdr.h"
+
+#include "funcapi.h"
+
+#include "access/xact.h"
+
+#include "catalog/index.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_namespace.h"
+#include "catalog/pg_type.h"
+
+#include "commands/sequence.h"
+
+#include "replication/replication_identifier.h"
+
+#include "tcop/tcopprot.h"
+
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/json.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/pg_lsn.h"
+#include "utils/syscache.h"
+
+static Oid BdrConflictTypeOid = InvalidOid;        /* oid of type bdr.bdr_conflict_type */
+static Oid BdrConflictResolutionOid = InvalidOid;  /* oid of type bdr.bdr_conflict_resolution */
+static Oid BdrConflictHistorySeqId = InvalidOid;   /* oid of bdr_conflict_history_id_seq */
+
+static bool bdr_log_conflicts_to_table = false;    /* GUC bdr.log_conflicts_to_table */
+static bool bdr_conflict_logging_include_tuples = true; /* GUC storage; initializer matches the boot value */
+
+/*
+ * All this code runs only in the context of an apply worker, so
+ * we can access the apply worker state global safely
+ */
+extern BdrApplyWorker *bdr_apply_worker;
+
+#define BDR_CONFLICT_HISTORY_COLS 30   /* number of columns in bdr.bdr_conflict_history */
+#define SYSID_DIGITS 33                /* size of the buffers a uint64 sysid is printed into */
+
+/*
+ * Details of a conflict detected by an apply process, destined for logging
+ * output and/or conflict triggers.
+ *
+ * Closely related to bdr.bdr_conflict_history SQL table.
+ */
+typedef struct BdrApplyConflict
+{
+   TransactionId           local_conflict_txid;   /* stored in column local_conflict_xid */
+   XLogRecPtr              local_conflict_lsn;
+   TimestampTz             local_conflict_time;
+   const char             *object_schema; /* unused if apply_error */
+   const char             *object_name;   /* unused if apply_error */
+   uint64                  remote_sysid;           /* written to the table as text */
+   TransactionId           remote_txid;            /* InvalidTransactionId is logged as NULL */
+   TimestampTz             remote_commit_time;
+   XLogRecPtr              remote_commit_lsn;
+   BdrConflictType         conflict_type;
+   BdrConflictResolution   conflict_resolution;
+   bool                    local_tuple_null;       /* true: local_tuple is not converted, NULL is logged */
+   Datum                   local_tuple;    /* composite */
+   TransactionId           local_tuple_xmin;       /* InvalidTransactionId is logged as NULL */
+   uint64                  local_tuple_origin_sysid; /* init to 0 if unknown */
+   bool                    remote_tuple_null;      /* true: remote_tuple is not converted, NULL is logged */
+   Datum                   remote_tuple;   /* composite */
+   ErrorData              *apply_error;            /* NULL if no error; then all error_* columns are NULL */
+} BdrApplyConflict;
+
+/*
+ * Cache the oids of the bdr.bdr_conflict_type and bdr.bdr_conflict_resolution
+ * enum types and of the bdr.bdr_conflict_history_id_seq sequence in the
+ * file-level statics used when forming conflict log rows.
+ *
+ * Must be called during apply worker startup, after schema maintenance.
+ * Raises ERROR if any of the objects can't be found. Runs even if
+ * !bdr_log_conflicts_to_table as that can be toggled at runtime.
+ */
+void
+bdr_conflict_logging_startup(void)
+{
+   Oid schema_oid;
+
+   StartTransactionCommand();      /* the lookups below run inside this transaction */
+
+   schema_oid = get_namespace_oid("bdr", false);
+
+   BdrConflictTypeOid = GetSysCacheOid2(TYPENAMENSP,
+       CStringGetDatum("bdr_conflict_type"), ObjectIdGetDatum(schema_oid));
+
+   if (BdrConflictTypeOid == InvalidOid)
+       elog(ERROR, "Type cache lookup failed for bdr_conflict_type in schema %u",
+            schema_oid);           /* Oid is unsigned, hence %u */
+
+   BdrConflictResolutionOid = GetSysCacheOid2(TYPENAMENSP,
+       CStringGetDatum("bdr_conflict_resolution"),
+       ObjectIdGetDatum(schema_oid));
+
+   if (BdrConflictResolutionOid == InvalidOid)
+       elog(ERROR, "Type cache lookup failed for bdr_conflict_resolution in schema %u",
+            schema_oid);
+
+   BdrConflictHistorySeqId = get_relname_relid("bdr_conflict_history_id_seq",
+       schema_oid);                /* takes a plain Oid, not a Datum */
+
+   if (BdrConflictHistorySeqId == InvalidOid)
+       elog(ERROR, "Relcache lookup failed for bdr_conflict_history_id_seq in schema %u",
+            schema_oid);
+
+   CommitTransactionCommand();
+}
+
+/* Define the GUCs that control conflict logging; called from _PG_init() */
+void
+bdr_conflict_logging_create_gucs(void)
+{
+   DefineCustomBoolVariable("bdr.log_conflicts_to_table",
+                            "Log BDR conflicts to bdr.bdr_conflict_history table",
+                            NULL,
+                            &bdr_log_conflicts_to_table,
+                            false,
+                            PGC_SIGHUP, 0,
+                            NULL, NULL, NULL);
+
+   /* Boot value is true, unlike bdr.log_conflicts_to_table above */
+   DefineCustomBoolVariable("bdr.conflict_logging_include_tuples",
+                            "Log whole tuples when logging BDR conflicts",
+                            NULL,
+                            &bdr_conflict_logging_include_tuples,
+                            true,
+                            PGC_SIGHUP, 0,
+                            NULL, NULL, NULL);
+}
+
+/* Return, as a Datum, the oid of the bdr.bdr_conflict_type label for conflict_type; ERROR if not found */
+static Datum
+bdr_conflict_type_get_datum(BdrConflictType conflict_type)
+{
+   Oid conflict_type_oid;
+   const char *enumname = NULL;    /* only ever points at string literals */
+
+   switch(conflict_type)
+   {
+       case BdrConflictType_InsertInsert:
+           enumname = "insert_insert";
+           break;
+       case BdrConflictType_UpdateUpdate:
+           enumname = "update_update";
+           break;
+       case BdrConflictType_UpdateDelete:
+           enumname = "update_delete";
+           break;
+       case BdrConflictType_UnhandledTxAbort:
+           enumname = "unhandled_tx_abort";
+           break;
+   }
+   Assert(enumname != NULL);       /* every BdrConflictType value must be handled above */
+   conflict_type_oid = GetSysCacheOid2(ENUMTYPOIDNAME,
+       ObjectIdGetDatum(BdrConflictTypeOid), CStringGetDatum(enumname));
+   if (conflict_type_oid == InvalidOid)
+       elog(ERROR, "syscache lookup for enum %s of type "
+            "bdr.bdr_conflict_type failed", enumname);
+   return ObjectIdGetDatum(conflict_type_oid);    /* explicit Oid -> Datum */
+}
+
+/* Return, as a Datum, the oid of the bdr.bdr_conflict_resolution label for conflict_resolution; ERROR if not found */
+static Datum
+bdr_conflict_resolution_get_datum(BdrConflictResolution conflict_resolution)
+{
+   Oid conflict_resolution_oid;
+   const char *enumname = NULL;    /* only ever points at string literals */
+
+   switch (conflict_resolution)
+   {
+       case BdrConflictResolution_ConflictTriggerSkipChange:
+           enumname = "conflict_trigger_skip_change";
+           break;
+       case BdrConflictResolution_ConflictTriggerReturnedTuple:
+           enumname = "conflict_trigger_returned_tuple";
+           break;
+       case BdrConflictResolution_LastUpdateWins_KeepLocal:
+           enumname = "last_update_wins_keep_local";
+           break;
+       case BdrConflictResolution_LastUpdateWins_KeepRemote:
+           enumname = "last_update_wins_keep_remote";
+           break;
+       case BdrConflictResolution_UnhandledTxAbort:
+           enumname = "unhandled_tx_abort";
+           break;
+   }
+   Assert(enumname != NULL);       /* every BdrConflictResolution value must be handled above */
+   conflict_resolution_oid = GetSysCacheOid2(ENUMTYPOIDNAME,
+       ObjectIdGetDatum(BdrConflictResolutionOid), CStringGetDatum(enumname));
+   if (conflict_resolution_oid == InvalidOid)
+       elog(ERROR, "syscache lookup for enum %s of type "
+            "bdr.bdr_conflict_resolution failed", enumname);
+   return ObjectIdGetDatum(conflict_resolution_oid);  /* explicit Oid -> Datum */
+}
+
+/*
+ * Produce the json form of a composite row, or a null Datum (flagged via
+ * *ret_isnull) when the caller says there is no row.
+ */
+static Datum
+bdr_conflict_row_to_json(Datum row, bool row_isnull, bool *ret_isnull)
+{
+   Datum   json_datum;
+
+   if (row_isnull)
+   {
+       *ret_isnull = true;
+       return (Datum) 0;
+   }
+
+   /*
+    * Errors raised by row_to_json are deliberately not trapped with PG_TRY /
+    * PG_CATCH: that alone wouldn't leave the transaction usable, since the
+    * failure may come from user defined casts etc. A full savepoint would be
+    * needed and is too expensive, so any exception simply propagates and
+    * aborts the apply transaction.
+    *
+    * It shouldn't fail unless something's pretty broken anyway.
+    */
+   json_datum = DirectFunctionCall1(row_to_json, row);
+   *ret_isnull = false;
+   return json_datum;
+}
+
+/*
+ * Fill slot idx of a values/nulls array pair from a C string: a NULL
+ * pointer becomes a SQL NULL, anything else a text Datum.
+ */
+static void
+bdr_conflict_strtodatum(bool *nulls, Datum *values, int idx,
+                       const char *in_str)
+{
+   bool        str_missing = (in_str == NULL);
+
+   nulls[idx] = str_missing;
+   values[idx] = str_missing
+       ? (Datum) 0
+       : CStringGetTextDatum(in_str);
+}
+
+/*
+ * Log a BDR apply conflict to the bdr.bdr_conflict_history table.
+ *
+ * The change will then be replicated to other nodes.
+ */
+static void
+bdr_conflict_log_table(BdrApplyConflict *conflict)
+{
+   Datum           values[BDR_CONFLICT_HISTORY_COLS];
+   bool            nulls[BDR_CONFLICT_HISTORY_COLS];
+   int             attno;
+   int             object_schema_attno, object_name_attno;
+   char            sqlstate[12];
+   Relation        log_rel;
+   HeapTuple       log_tup;
+   TupleTableSlot *log_slot;
+   EState         *log_estate;
+   char            local_sysid[SYSID_DIGITS];
+   char            remote_sysid[SYSID_DIGITS];
+   char            origin_sysid[SYSID_DIGITS];
+
+   if (!bdr_log_conflicts_to_table)
+       /* No logging enabled and we don't own any memory, just bail */
+       return;
+
+   /* Pg has no uint64 SQL type so we have to store all them as text */
+   snprintf(local_sysid, sizeof(local_sysid), UINT64_FORMAT,
+            GetSystemIdentifier());
+
+   snprintf(remote_sysid, sizeof(remote_sysid), UINT64_FORMAT,
+            conflict->remote_sysid);
+
+   if (conflict->local_tuple_origin_sysid != 0)
+       snprintf(origin_sysid, sizeof(origin_sysid), UINT64_FORMAT,
+                conflict->local_tuple_origin_sysid);
+   else
+       origin_sysid[0] = '\0';
+
+   memset(nulls, 0, sizeof(bool) * BDR_CONFLICT_HISTORY_COLS);
+   memset(values, 0, sizeof(Datum) * BDR_CONFLICT_HISTORY_COLS);
+
+   /* Begin forming the tuple. See the extension SQL file for field info. */
+   attno = 0;
+   values[attno++] = DirectFunctionCall1(nextval_oid,
+       BdrConflictHistorySeqId);
+   values[attno++] = CStringGetTextDatum(local_sysid);
+   values[attno++] = TransactionIdGetDatum(conflict->local_conflict_txid);
+   values[attno++] = LSNGetDatum(conflict->local_conflict_lsn);
+   values[attno++] = TimestampTzGetDatum(conflict->local_conflict_time);
+
+   object_schema_attno = attno;
+   bdr_conflict_strtodatum(nulls, values, attno++, conflict->object_schema);
+
+   object_name_attno = attno;
+   bdr_conflict_strtodatum(nulls, values, attno++, conflict->object_name);
+
+   values[attno++] = CStringGetTextDatum(remote_sysid);
+   if (conflict->remote_txid != InvalidTransactionId)
+       values[attno] = TransactionIdGetDatum(conflict->remote_txid);
+   else
+       nulls[attno] = 1;
+   attno++;
+
+   values[attno++] = TimestampTzGetDatum(conflict->remote_commit_time);
+   values[attno++] = LSNGetDatum(conflict->remote_commit_lsn);
+   values[attno++] = bdr_conflict_type_get_datum(conflict->conflict_type);
+
+   values[attno++] =
+       bdr_conflict_resolution_get_datum(conflict->conflict_resolution);
+
+   values[attno] = bdr_conflict_row_to_json(conflict->local_tuple,
+       conflict->local_tuple_null, &nulls[attno]);
+   attno++;
+
+   values[attno] = bdr_conflict_row_to_json(conflict->remote_tuple,
+       conflict->remote_tuple_null, &nulls[attno]);
+   attno++;
+
+   if (conflict->local_tuple_xmin != InvalidTransactionId)
+       values[attno] = TransactionIdGetDatum(conflict->local_tuple_xmin);
+   else
+       nulls[attno] = 1;
+   attno++;
+
+   if (conflict->local_tuple_origin_sysid != 0)
+       values[attno] = CStringGetTextDatum(origin_sysid);
+   else
+       nulls[attno] = 1;
+   attno++;
+
+   if (conflict->apply_error == NULL)
+   {
+       /* all the 13 remaining cols are error_ cols and are all null */
+       memset(&nulls[attno], 1, sizeof(bool) * 13);
+       attno += 13;
+   }
+   else
+   {
+       /*
+        * There's error data to log. We don't attempt to log it selectively,
+        * as bdr apply errors are not supposed to be routine anyway.
+        */
+       ErrorData *edata = conflict->apply_error;
+
+       bdr_conflict_strtodatum(nulls, values, attno++, edata->message);
+
+       /*
+        * Always log the SQLSTATE. If it's ERRCODE_INTERNAL_ERROR - like after
+        * an elog(...) - we'll just be writing XX000, but that's still better
+        * than nothing.
+        */
+       strncpy(sqlstate, unpack_sql_state(edata->sqlerrcode), 12);
+       sqlstate[sizeof(sqlstate)-1] = '\0';
+       values[attno] = CStringGetTextDatum(sqlstate);
+
+       /*
+        * We'd like to log the statement running at the time of the ERROR (for
+        * DDL apply errors) but have no reliable way to acquire it yet. So for
+        * now...
+        */
+       nulls[attno] = 1;
+       attno++;
+
+       if (edata->cursorpos != 0)
+           values[attno] = Int32GetDatum(edata->cursorpos);
+       else
+           nulls[attno] = 1;
+       attno++;
+
+       bdr_conflict_strtodatum(nulls, values, attno++, edata->detail);
+       bdr_conflict_strtodatum(nulls, values, attno++, edata->hint);
+       bdr_conflict_strtodatum(nulls, values, attno++, edata->context);
+       bdr_conflict_strtodatum(nulls, values, attno++, edata->column_name);
+       bdr_conflict_strtodatum(nulls, values, attno++, edata->datatype_name);
+       bdr_conflict_strtodatum(nulls, values, attno++, edata->constraint_name);
+       bdr_conflict_strtodatum(nulls, values, attno++, edata->filename);
+       values[attno++] = Int32GetDatum(edata->lineno);
+       bdr_conflict_strtodatum(nulls, values, attno++, edata->funcname);
+
+       /* Set schema and table name based on the error, not arg values */
+       bdr_conflict_strtodatum(nulls, values, object_schema_attno,
+                               edata->schema_name);
+       bdr_conflict_strtodatum(nulls, values, object_name_attno,
+                               edata->table_name);
+
+       /* note: do NOT free the errordata, it's the caller's responsibility */
+   }
+
+   /* Make sure assignments match allocated tuple size */
+   Assert(attno == BDR_CONFLICT_HISTORY_COLS);
+
+   /*
+    * Construct a bdr.bdr_conflict_history tuple from the conflict info we've
+    * been passed and insert it into bdr.bdr_conflict_history.
+    */
+   log_rel = heap_open(BdrConflictHistoryRelId, RowExclusiveLock);
+
+   /* Prepare executor state for index updates */
+   log_estate = bdr_create_rel_estate(log_rel);
+   log_slot = ExecInitExtraTupleSlot(log_estate);
+   ExecSetSlotDescriptor(log_slot, RelationGetDescr(log_rel));
+   /* Construct the tuple and insert it */
+   log_tup = heap_form_tuple(RelationGetDescr(log_rel), values, nulls);
+   ExecStoreTuple(log_tup, log_slot, InvalidBuffer, true);
+   simple_heap_insert(log_rel, log_slot->tts_tuple);
+   /* Then do any index maintenance required */
+   UserTableUpdateIndexes(log_estate, log_slot);
+   /* and finish up */
+   heap_close(log_rel, RowExclusiveLock);
+   FreeExecutorState(log_estate);
+}
+
+/*
+ * Log a BDR apply conflict to the bdr.bdr_conflict_history table and/or
+ * system log.
+ *
+ * If a transaction is in progress, the current transaction is used, in which
+ * case the write is lost if that transaction subsequently aborts. If no
+ * transaction is already open the caller must open one and commit it after
+ * calling.
+ *
+ * Any open aborted transaction must be rolled back before calling.
+ *
+ * If apply_error is passed then this call is because of an unhandled error. In
+ * this case the passed object_schema and object_name are ignored in favour of
+ * those in the ErrorData struct and all the error_ fields are populated from
+ * the ErrorData struct.
+ *
+ * local_tuple and remote_tuple may each be NULL. Tuple contents are only
+ * recorded when bdr_conflict_logging_include_tuples is set; the local tuple's
+ * xmin is recorded whenever local_tuple is supplied.
+ *
+ * Any palloc'd or copied values passed must be freed by the caller after
+ * bdr_conflict_log returns. It won't retain references to any values and won't
+ * free any of the values itself.
+ *
+ * The origin id - i.e. the id of the node the local tuple was first created
+ * on, in case the local tuple was originally replicated from another node -
+ * may be obtained with TransactionIdGetCommitTsData on the xmin of the
+ * TupleTableSlot for the tuple. It isn't done here because the caller
+ * frequently already has the node id to hand.
+ */
+void
+bdr_conflict_log(BdrConflictType conflict_type,
+                BdrConflictResolution resolution, TransactionId remote_txid,
+                BDRRelation *conflict_relation, TupleTableSlot *local_tuple,
+                RepNodeId local_tuple_origin_id, TupleTableSlot *remote_tuple,
+                ErrorData *apply_error)
+{
+   MemoryContext   log_context, old_context;
+   BdrApplyConflict conflict;
+   TimeLineID tli;
+
+   if (IsAbortedTransactionBlockState())
+       elog(ERROR, "bdr: attempt to log conflict in aborted transaction");
+
+   if (!IsTransactionState())
+       elog(ERROR, "bdr: attempt to log conflict without surrounding transaction");
+
+   /* We want our own memory ctx to clean up easily & reliably */
+   log_context = AllocSetContextCreate(CurrentMemoryContext,
+       "bdr_log_conflict_ctx", ALLOCSET_DEFAULT_MINSIZE,
+       ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE);
+   old_context = MemoryContextSwitchTo(log_context);
+
+   /* Populate the conflict record we're going to log */
+   conflict.conflict_type = conflict_type;
+   conflict.conflict_resolution = resolution;
+
+   conflict.local_conflict_txid = GetTopTransactionIdIfAny();
+   conflict.local_conflict_lsn = GetXLogInsertRecPtr();
+   conflict.local_conflict_time = GetCurrentTimestamp();
+
+   /* Schema and relation name come from the conflicting relation, if any */
+   if (conflict_relation == NULL)
+   {
+       conflict.object_schema = NULL;
+       conflict.object_name = NULL;
+   }
+   else
+   {
+       conflict.object_name = RelationGetRelationName(conflict_relation->rel);
+       conflict.object_schema =
+           get_namespace_name(RelationGetNamespace(conflict_relation->rel));
+   }
+
+   /* TODO: May make sense to cache the remote sysid in a global too... */
+   fetch_sysid_via_node_id(bdr_apply_worker->origin_id,
+           &conflict.remote_sysid, &tli);
+   conflict.remote_commit_time = replication_origin_timestamp;
+   conflict.remote_txid = remote_txid;
+   conflict.remote_commit_lsn = replication_origin_lsn;
+
+   /*
+    * Start from "no local tuple" so that every local_tuple* field is
+    * initialised whichever branch is taken below. In particular, when a
+    * local tuple is supplied but tuple logging is disabled, the tuple datum
+    * must be flagged as null rather than left as uninitialised stack data.
+    */
+   conflict.local_tuple_null = true;
+   conflict.local_tuple = (Datum) 0;
+   conflict.local_tuple_xmin = InvalidTransactionId;
+
+   if (local_tuple != NULL)
+   {
+       /* Log local tuple xmin even if actual tuple value logging is off */
+       conflict.local_tuple_xmin =
+           HeapTupleHeaderGetXmin(local_tuple->tts_tuple->t_data);
+       Assert(conflict.local_tuple_xmin >= FirstNormalTransactionId ||
+              conflict.local_tuple_xmin == FrozenTransactionId);
+       if (bdr_conflict_logging_include_tuples)
+       {
+           conflict.local_tuple = ExecFetchSlotTupleDatum(local_tuple);
+           conflict.local_tuple_null = false;
+       }
+   }
+
+   /* Translate the local tuple's origin node id to a sysid; 0 means unknown */
+   if (local_tuple_origin_id != InvalidRepNodeId)
+   {
+       fetch_sysid_via_node_id(local_tuple_origin_id,
+                               &conflict.local_tuple_origin_sysid, &tli);
+   }
+   else
+   {
+       conflict.local_tuple_origin_sysid = 0;
+   }
+
+   if (remote_tuple != NULL && bdr_conflict_logging_include_tuples)
+   {
+       conflict.remote_tuple = ExecFetchSlotTupleDatum(remote_tuple);
+       conflict.remote_tuple_null = false;
+   }
+   else
+   {
+       conflict.remote_tuple_null = true;
+       conflict.remote_tuple = (Datum) 0;
+   }
+
+   /* Not copied: the ErrorData remains owned by the caller */
+   conflict.apply_error = apply_error;
+
+   bdr_conflict_log_table(&conflict);
+
+   /* Drop everything allocated while building and writing the record */
+   MemoryContextSwitchTo(old_context);
+   MemoryContextDelete(log_context);
+}
index 70b1098f1ae846ade6fe1579d6b54b2d5bd5162a..cce81bac04e2c405968d89c8481e4352d85779df 100644 (file)
@@ -262,4 +262,3 @@ retry:
 
    return found;
 }
-
index 042863f5f29ff9f906bf192745ffe84321375530..f70f0454c5639ac523e8a86587f146abdfc0a712 100644 (file)
@@ -3,7 +3,7 @@
 MODULE_big = bdr
 OBJS = bdr.o bdr_apply.o bdr_compat.o bdr_commandfilter.o bdr_count.o \
    bdr_seq.o bdr_init_replica.o bdr_relcache.o bdr_conflict_handlers.o \
-   bdr_executor.o
+   bdr_conflict_logging.o bdr_executor.o
 
 EXTENSION = bdr
 DATA = bdr--0.5.sql
index 3148d9d560f59bc66d3a5f2dd6f0856ee892bacc..6543d41cd6cbfc65bfc3816862c55f9ff86a273f 100644 (file)
@@ -521,7 +521,7 @@ sub mkvcbuild
                 'bdr_commandfilter.c', 'bdr_compat.c',
                 'bdr_count.c', 'bdr_seq.c', 'bdr_init_replica.c',
                 'bdr_relcache.c', 'bdr_conflict_handlers.c',
-                 'bdr_executor.c');
+                 'bdr_conflict_logging.c', 'bdr_executor.c');
    $bdr_apply->AddReference($postgres);
    $bdr_apply->AddLibrary('wsock32.lib');
    $bdr_apply->AddIncludeDir('src\interfaces\libpq');