slot: Separate data stored on-disk from purely in-memory state
authorAndres Freund <andres@anarazel.de>
Tue, 28 Jan 2014 19:16:27 +0000 (20:16 +0100)
committerAndres Freund <andres@anarazel.de>
Tue, 28 Jan 2014 19:17:28 +0000 (20:17 +0100)
src/backend/catalog/system_views.sql
src/backend/replication/slot.c
src/backend/replication/slotfuncs.c
src/backend/replication/walsender.c
src/include/catalog/pg_proc.h
src/include/replication/slot.h

index e17eeeb9aef0734d1716368b2f210896ff25877d..78d2bf15e68935b209a7b12972863c1e5586b8e0 100644 (file)
@@ -621,7 +621,7 @@ CREATE VIEW pg_replication_slots AS
             D.datname AS database,
             L.active,
             L.data_xmin,
-            L.restart_decoding_lsn
+            L.restart_lsn
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
index cc376b8b0a53aad54d3b21370b0d9e6944b4b955..322b79a9c988f80489b141dce24adc86ceb06eab 100644 (file)
@@ -60,13 +60,12 @@ typedef struct ReplicationSlotOnDisk
        uint32          version;
        uint32          length;
 
-       /* data with potentially evolving format */
-       ReplicationSlot slot;
+       ReplicationSlotPersistentData slotdata;
 } ReplicationSlotOnDisk;
 
 /* size of the part of the slot that is version independent */
 #define ReplicationSlotOnDiskConstantSize \
-       offsetof(ReplicationSlotOnDisk, slot)
+       offsetof(ReplicationSlotOnDisk, slotdata)
 /* size of the slots that is not version indepenent */
 #define ReplicationSlotOnDiskDynamicSize \
        sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
@@ -222,7 +221,7 @@ ReplicationSlotCreate(const char *name, bool db_specific)
        {
                ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 
-               if (s->in_use && strcmp(name, NameStr(s->name)) == 0)
+               if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
                        ereport(ERROR,
                                        (errcode(ERRCODE_DUPLICATE_OBJECT),
                                         errmsg("replication slot \"%s\" already exists", name)));
@@ -246,12 +245,12 @@ ReplicationSlotCreate(const char *name, bool db_specific)
         */
        Assert(!slot->in_use);
        Assert(!slot->active);
-       slot->data_xmin = InvalidTransactionId;
+       slot->data.data_xmin = InvalidTransactionId;
        slot->effective_data_xmin = InvalidTransactionId;
-       strncpy(NameStr(slot->name), name, NAMEDATALEN);
-       NameStr(slot->name)[NAMEDATALEN - 1] = '\0';
-       slot->database = db_specific ? MyDatabaseId : InvalidOid;
-       slot->restart_decoding = InvalidXLogRecPtr;
+       strncpy(NameStr(slot->data.name), name, NAMEDATALEN);
+       NameStr(slot->data.name)[NAMEDATALEN - 1] = '\0';
+       slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
+       slot->data.restart_lsn = InvalidXLogRecPtr;
 
        /*
         * Create the slot on disk.  We haven't actually marked the slot allocated
@@ -309,7 +308,7 @@ ReplicationSlotAcquire(const char *name)
        {
                ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 
-               if (s->in_use && strcmp(name, NameStr(s->name)) == 0)
+               if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
                {
                        volatile ReplicationSlot *vslot = s;
 
@@ -396,7 +395,7 @@ ReplicationSlotDrop(const char *name)
        {
                ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 
-               if (s->in_use && strcmp(name, NameStr(s->name)) == 0)
+               if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
                {
                        volatile ReplicationSlot *vslot = s;
 
@@ -421,8 +420,8 @@ ReplicationSlotDrop(const char *name)
                                 errmsg("replication slot \"%s\" is already active", name)));
 
        /* Generate pathnames. */
-       sprintf(path, "pg_replslot/%s", NameStr(slot->name));
-       sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->name));
+       sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
+       sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
 
        /*
         * Rename the slot directory on disk, so that we'll no longer recognize
@@ -498,7 +497,7 @@ ReplicationSlotSave(void)
 
        Assert(MyReplicationSlot != NULL);
 
-       sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->name));
+       sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
        SaveSlotToPath(MyReplicationSlot, path);
 }
 
@@ -560,7 +559,7 @@ ReplicationSlotsComputeRequiredLSN(void)
        for (i = 0; i < max_replication_slots; i++)
        {
                ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
-               XLogRecPtr restart_decoding;
+               XLogRecPtr restart_lsn;
 
                if (!s->in_use)
                        continue;
@@ -569,14 +568,14 @@ ReplicationSlotsComputeRequiredLSN(void)
                        volatile ReplicationSlot *vslot = s;
 
                        SpinLockAcquire(&s->mutex);
-                       restart_decoding = vslot->restart_decoding;
+                       restart_lsn = vslot->data.restart_lsn;
                        SpinLockRelease(&s->mutex);
                }
 
-               if (restart_decoding != InvalidXLogRecPtr &&
+               if (restart_lsn != InvalidXLogRecPtr &&
                        (min_required == InvalidXLogRecPtr ||
-                        restart_decoding < min_required))
-                       min_required = restart_decoding;
+                        restart_lsn < min_required))
+                       min_required = restart_lsn;
        }
        LWLockRelease(ReplicationSlotControlLock);
 
@@ -692,8 +691,8 @@ CreateSlotOnDisk(ReplicationSlot *slot)
        char            tmppath[MAXPGPATH];
        char            path[MAXPGPATH];
 
-       sprintf(path, "pg_replslot/%s", NameStr(slot->name));
-       sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->name));
+       sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
+       sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
 
        if (mkdir(tmppath, S_IRWXU) < 0)
                ereport(ERROR,
@@ -758,15 +757,12 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir)
 
        SpinLockAcquire(&slot->mutex);
 
-       cp.slot.data_xmin = slot->data_xmin;
-       cp.slot.effective_data_xmin = slot->effective_data_xmin;
+       cp.slotdata.data_xmin = slot->data.data_xmin;
 
-       strcpy(NameStr(cp.slot.name), NameStr(slot->name));
+       strcpy(NameStr(cp.slotdata.name), NameStr(slot->data.name));
 
-       cp.slot.database = slot->database;
-       cp.slot.restart_decoding = slot->restart_decoding;
-       cp.slot.in_use = slot->in_use;
-       cp.slot.active = false;
+       cp.slotdata.database = slot->data.database;
+       cp.slotdata.restart_lsn = slot->data.restart_lsn;
 
        SpinLockRelease(&slot->mutex);
 
@@ -943,18 +939,15 @@ RestoreSlotFromDisk(const char *name)
                if (slot->in_use)
                        continue;
 
-               slot->data_xmin = cp.slot.data_xmin;
-               /*
-                * after a crash, always use xmin, not effective_xmin, the
-                * slot obviously survived
-                */
-               slot->effective_data_xmin = cp.slot.data_xmin;
-               strcpy(NameStr(slot->name), NameStr(cp.slot.name));
-               slot->database = cp.slot.database;
-               slot->restart_decoding = cp.slot.restart_decoding;
-               /* ignore previous values, they are transient */
+               /* restore the entire set of persistent data */
+               memcpy(&slot->data, &cp.slotdata,
+                          sizeof(ReplicationSlotPersistentData));
+
+               /* initialize in memory state */
+               slot->effective_data_xmin = cp.slotdata.data_xmin;
                slot->in_use = true;
                slot->active = false;
+
                restored = true;
                break;
        }
index 5abac8debf56b8da0fa642e30a3d3adb9fbd1dbe..e89903ec62cf7664e3d94266223e03dccc332b49 100644 (file)
@@ -60,13 +60,10 @@ create_physical_replication_slot(PG_FUNCTION_ARGS)
        if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
                elog(ERROR, "return type must be a row type");
 
-       /*
-        * Acquire a logical decoding slot, this will check for conflicting
-        * names.
-        */
+       /* acquire replication slot, this will check for conflicting names*/
        ReplicationSlotCreate(NameStr(*name), false);
 
-       values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->name));
+       values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name));
 
        nulls[0] = false;
        nulls[1] = true;
@@ -142,12 +139,14 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
                ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
                Datum           values[PG_STAT_GET_LOGICAL_DECODING_SLOTS_COLS];
                bool            nulls[PG_STAT_GET_LOGICAL_DECODING_SLOTS_COLS];
-               char            location[MAXFNAMELEN];
-               const char *slot_name;
+
                TransactionId data_xmin;
-               XLogRecPtr      last_lsn;
+               XLogRecPtr      restart_lsn;
                bool            active;
                Oid                     database;
+               const char *slot_name;
+
+               char            restart_lsn_s[MAXFNAMELEN];
                int                     i;
 
                SpinLockAcquire(&slot->mutex);
@@ -158,18 +157,19 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
                }
                else
                {
-                       data_xmin = slot->data_xmin;
+                       data_xmin = slot->data.data_xmin;
+                       database = slot->data.database;
+                       restart_lsn = slot->data.restart_lsn;
+                       slot_name = pstrdup(NameStr(slot->data.name));
+
                        active = slot->active;
-                       database = slot->database;
-                       last_lsn = slot->restart_decoding;
-                       slot_name = pstrdup(NameStr(slot->name));
                }
                SpinLockRelease(&slot->mutex);
 
                memset(nulls, 0, sizeof(nulls));
 
-               snprintf(location, sizeof(location), "%X/%X",
-                                (uint32) (last_lsn >> 32), (uint32) last_lsn);
+               snprintf(restart_lsn_s, sizeof(restart_lsn_s), "%X/%X",
+                                (uint32) (restart_lsn >> 32), (uint32) restart_lsn);
 
                i = 0;
                values[i++] = CStringGetTextDatum(slot_name);
@@ -180,7 +180,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
                values[i++] = database;
                values[i++] = BoolGetDatum(active);
                values[i++] = TransactionIdGetDatum(data_xmin);
-               values[i++] = CStringGetTextDatum(location);
+               values[i++] = CStringGetTextDatum(restart_lsn_s);
 
                tuplestore_putvalues(tupstore, tupdesc, values, nulls);
        }
index 1c593cefe3577449c63692d636123cff8390f7cb..bb51e4ee91ec8eecf60cd62ab797c079b93ebb43 100644 (file)
@@ -430,7 +430,7 @@ StartReplication(StartReplicationCmd *cmd)
        if (cmd->slotname)
        {
                ReplicationSlotAcquire(cmd->slotname);
-               if (MyReplicationSlot->database != InvalidOid)
+               if (MyReplicationSlot->data.database != InvalidOid)
                        ereport(ERROR,
                                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                         (errmsg("cannot use a replication slot created for changeset extraction for streaming replication"))));
@@ -660,7 +660,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
        initStringInfo(&output_message);
 
-       slot_name = NameStr(MyReplicationSlot->name);
+       slot_name = NameStr(MyReplicationSlot->data.name);
 
        pq_beginmessage(&buf, 'T');
        pq_sendint(&buf, 1, 2);         /* 1 field */
@@ -936,10 +936,10 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 
        Assert(lsn != InvalidXLogRecPtr);
        SpinLockAcquire(&slot->mutex);
-       if (slot->restart_decoding != lsn)
+       if (slot->data.restart_lsn != lsn)
        {
                changed = true;
-               slot->restart_decoding = lsn;
+               slot->data.restart_lsn = lsn;
        }
        SpinLockRelease(&slot->mutex);
 
@@ -1005,7 +1005,7 @@ ProcessStandbyReplyMessage(void)
         */
        if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
        {
-               if (MyReplicationSlot->database != InvalidOid)
+               if (MyReplicationSlot->data.database != InvalidOid)
                        elog(ERROR, "cannot handle changeset extraction yet");
                else
                        PhysicalConfirmReceivedLocation(flushPtr);
@@ -1027,12 +1027,12 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
         * by data_xmin and effective_data_xmin since the consequences of a
         * missed increase aren't bad, so set both at once.
         */
-       if (!TransactionIdIsNormal(slot->data_xmin) ||
+       if (!TransactionIdIsNormal(slot->data.data_xmin) ||
                !TransactionIdIsNormal(feedbackXmin) ||
-               TransactionIdPrecedes(slot->data_xmin, feedbackXmin))
+               TransactionIdPrecedes(slot->data.data_xmin, feedbackXmin))
        {
                changed = true;
-               slot->data_xmin = feedbackXmin;
+               slot->data.data_xmin = feedbackXmin;
                slot->effective_data_xmin = feedbackXmin;
        }
        SpinLockRelease(&slot->mutex);
index 48d4123c04b125b81a81d30a7beeaa4901d00947..139128b627fee6c98e2299d8bb5321036ff820b4 100644 (file)
@@ -4757,7 +4757,7 @@ DATA(insert OID = 3780 (  create_physical_replication_slot PGNSP PGUID 12 1 0 0
 DESCR("create a physical replication slot");
 DATA(insert OID = 3781 (  drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 23 "19" _null_ _null_ _null_ _null_ drop_replication_slot _null_ _null_ _null_ ));
 DESCR("drop a replication slot");
-DATA(insert OID = 3475 (  pg_get_replication_slots     PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{25,25,26,16,28,25}" "{o,o,o,o,o,o}" "{slot_name,slot_type,datoid,active,data_xmin,restart_decoding_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ ));
+DATA(insert OID = 3475 (  pg_get_replication_slots     PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{25,25,26,16,28,25}" "{o,o,o,o,o,o}" "{slot_name,slot_type,datoid,active,data_xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ ));
 DESCR("information about replication slots currently in use");
 
 /* event triggers */
index 47132d8f52bd43b215ea53f69521575420c97485..dffbc5462fa8e383112262b0bfc164f826f08f0a 100644 (file)
 #include "storage/shmem.h"
 #include "storage/spin.h"
 
-
-/*
- * Shared memory state of a single replication slot.
- */
-typedef struct ReplicationSlot
+typedef struct ReplicationSlotPersistentData
 {
-       /* lock, on same cacheline as effective_xmin */
-       slock_t         mutex;
-
-       /* on-disk xmin horizon, updated first */
-       TransactionId data_xmin;
-
-       /* in-memory xmin horizon, updated after syncing to disk, used for computations */
-       TransactionId effective_data_xmin;
-
-       /* is this slot defined */
-       bool            in_use;
-
-       /* is somebody streaming out changes for this slot */
-       bool            active;
-
        /* The slot's identifier */
        NameData        name;
 
        /* database the slot is active on */
        Oid                     database;
 
+       /* xmin horizon for data */
+       TransactionId data_xmin;
+
        /* ----
-        * For logical decoding, this contains the point where, after a shutdown,
-        * crash, whatever where do we have to restart decoding from to
+        * For logical decoding, this contains the point where, after a shutdown
+        * or crash, the to have to restart decoding from to
         * a) find a valid & ready snapshot
         * b) the complete content for all in-progress xacts
         *
@@ -55,7 +39,30 @@ typedef struct ReplicationSlot
         * be removed.
         * ----
         */
-       XLogRecPtr      restart_decoding;
+       XLogRecPtr      restart_lsn;
+
+} ReplicationSlotPersistentData;
+
+/*
+ * Shared memory state of a single replication slot.
+ */
+typedef struct ReplicationSlot
+{
+       /* lock, on same cacheline as effective_xmin */
+       slock_t         mutex;
+
+       /* is this slot defined */
+       bool            in_use;
+
+       /* is somebody streaming out changes for this slot */
+       bool            active;
+
+       /* data surviving shutdowns and crashes */
+       ReplicationSlotPersistentData data;
+
+       /* in-memory xmin horizon, updated after syncing to disk, used for computations */
+       TransactionId effective_data_xmin;
+
 } ReplicationSlot;
 
 /*