Support tracking the dirty state of replication slots.
authorAndres Freund <andres@anarazel.de>
Tue, 28 Jan 2014 20:51:38 +0000 (21:51 +0100)
committerAndres Freund <andres@anarazel.de>
Tue, 28 Jan 2014 21:59:51 +0000 (22:59 +0100)
src/backend/replication/slot.c
src/backend/replication/walsender.c
src/include/replication/slot.h

index e4c12b9b1bae93c68cbdd7ccbc542fe0ba7b8b77..91ebc231ce37c9a80a62c0c0a2925c0e80bf2fc2 100644 (file)
@@ -492,6 +492,28 @@ ReplicationSlotSave(void)
        SaveSlotToPath(MyReplicationSlot, path);
 }
 
+/*
+ * Signal that it would be useful if the currently acquired slot would be
+ * flushed out to disk.
+ *
+ * Note that the actual flush to disk can be delayed for a long time, if
+ * required for correctness explicitly do a ReplicationSlotSave().
+ */
+void
+ReplicationSlotMarkDirty(void)
+{
+       Assert(MyReplicationSlot != NULL);
+
+       {
+               volatile ReplicationSlot *vslot = MyReplicationSlot;
+
+               SpinLockAcquire(&vslot->mutex);
+               MyReplicationSlot->just_dirtied = true;
+               MyReplicationSlot->dirty = true;
+               SpinLockRelease(&vslot->mutex);
+       }
+}
+
 /*
  * Compute the oldest xmin across all slots and store it in the ProcArray.
  */
@@ -731,6 +753,7 @@ CreateSlotOnDisk(ReplicationSlot *slot)
 
        fsync_fname(tmppath, true);
 
+       slot->dirty = true; /* signal that we really need to write */
        SaveSlotToPath(slot, tmppath);
 
        if (rename(tmppath, path) != 0)
@@ -762,6 +785,20 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir)
        char            path[MAXPGPATH];
        int                     fd;
        ReplicationSlotOnDisk cp;
+       bool            was_dirty;
+
+       /* first check whether there's something to write out */
+       {
+               volatile ReplicationSlot *vslot = slot;
+
+               SpinLockAcquire(&vslot->mutex);
+               was_dirty = vslot->dirty;
+               vslot->just_dirtied = false;
+               SpinLockRelease(&vslot->mutex);
+       }
+
+       if (!was_dirty)
+               return;
 
        LWLockAcquire(slot->io_in_progress_lock, LW_EXCLUSIVE);
 
@@ -834,6 +871,19 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir)
 
        END_CRIT_SECTION();
 
+       /*
+        * Successfully wrote, unset dirty bit, unless somebody dirtied again
+        * already.
+        */
+       {
+               volatile ReplicationSlot *vslot = slot;
+
+               SpinLockAcquire(&vslot->mutex);
+               if (!vslot->just_dirtied)
+                       vslot->dirty = false;
+               SpinLockRelease(&vslot->mutex);
+       }
+
        LWLockRelease(slot->io_in_progress_lock);
 }
 
index 87e4abf1f27500465af122226c34ea9e226ea4f5..d1edf7ed30674f941e45c39a131c2ae5e8eccc01 100644 (file)
@@ -951,7 +951,10 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
        SpinLockRelease(&slot->mutex);
 
        if (changed)
+       {
+               ReplicationSlotMarkDirty();
                ReplicationSlotsComputeRequiredLSN();
+       }
 
        /*
         * One could argue that the slot should saved to disk now, but that'd be
@@ -1046,7 +1049,7 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 
        if (changed)
        {
-               ReplicationSlotSave();
+               ReplicationSlotMarkDirty();
                ReplicationSlotsComputeRequiredXmin();
        }
 }
index 642a4d085bfe8a8ecabcef65f847e3d9aef20ea6..68c3a8019e85bb8764dfe266523851dccbebc234 100644 (file)
@@ -52,6 +52,7 @@ typedef struct ReplicationSlot
        bool            active;
 
        /* any outstanding modifications? */
+       bool            just_dirtied;
        bool            dirty;
 
        /*
@@ -101,6 +102,7 @@ extern void ReplicationSlotDrop(const char *name);
 extern void ReplicationSlotAcquire(const char *name);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotSave(void);
+extern void ReplicationSlotMarkDirty(void);
 
 /* misc stuff */
 extern bool ReplicationSlotValidateName(const char *name, int elevel);