ReplicationSlotReserveWal(void)
{
ReplicationSlot *slot = MyReplicationSlot;
+ XLogSegNo segno;
+ XLogRecPtr restart_lsn;
Assert(slot != NULL);
Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
Assert(!XLogRecPtrIsValid(slot->last_saved_restart_lsn));
/*
- * The replication slot mechanism is used to prevent removal of required
- * WAL. As there is no interlock between this routine and checkpoints, WAL
- * segments could concurrently be removed when a now stale return value of
- * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
- * this happens we'll just retry.
+ * The replication slot mechanism is used to prevent the removal of
+ * required WAL.
+ *
+ * Acquire an exclusive lock to prevent the checkpoint process from
+ * concurrently computing the minimum slot LSN (see
+ * CheckPointReplicationSlots). This ensures that the WAL reserved for
+ * replication cannot be removed during a checkpoint.
+ *
+ * The mechanism is reliable because if WAL reservation occurs first, the
+ * checkpoint must wait for the restart_lsn update before determining the
+ * minimum non-removable LSN. On the other hand, if the checkpoint happens
+ * first, subsequent WAL reservations will select positions at or beyond
+ * the redo pointer of that checkpoint.
*/
- while (true)
- {
- XLogSegNo segno;
- XLogRecPtr restart_lsn;
+ LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
- /*
- * For logical slots log a standby snapshot and start logical decoding
- * at exactly that position. That allows the slot to start up more
- * quickly. But on a standby we cannot do WAL writes, so just use the
- * replay pointer; effectively, an attempt to create a logical slot on
- * standby will cause it to wait for an xl_running_xact record to be
- * logged independently on the primary, so that a snapshot can be
- * built using the record.
- *
- * None of this is needed (or indeed helpful) for physical slots as
- * they'll start replay at the last logged checkpoint anyway. Instead
- * return the location of the last redo LSN. While that slightly
- * increases the chance that we have to retry, it's where a base
- * backup has to start replay at.
- */
- if (SlotIsPhysical(slot))
- restart_lsn = GetRedoRecPtr();
- else if (RecoveryInProgress())
- restart_lsn = GetXLogReplayRecPtr(NULL);
- else
- restart_lsn = GetXLogInsertRecPtr();
+ /*
+ * For logical slots log a standby snapshot and start logical decoding at
+ * exactly that position. That allows the slot to start up more quickly.
+ * But on a standby we cannot do WAL writes, so just use the replay
+ * pointer; effectively, an attempt to create a logical slot on standby
+ * will cause it to wait for an xl_running_xact record to be logged
+ * independently on the primary, so that a snapshot can be built using the
+ * record.
+ *
+ * None of this is needed (or indeed helpful) for physical slots as
+ * they'll start replay at the last logged checkpoint anyway. Instead,
+ * return the location of the last redo LSN, where a base backup has to
+ * start replay at.
+ */
+ if (SlotIsPhysical(slot))
+ restart_lsn = GetRedoRecPtr();
+ else if (RecoveryInProgress())
+ restart_lsn = GetXLogReplayRecPtr(NULL);
+ else
+ restart_lsn = GetXLogInsertRecPtr();
- SpinLockAcquire(&slot->mutex);
- slot->data.restart_lsn = restart_lsn;
- SpinLockRelease(&slot->mutex);
+ SpinLockAcquire(&slot->mutex);
+ slot->data.restart_lsn = restart_lsn;
+ SpinLockRelease(&slot->mutex);
- /* prevent WAL removal as fast as possible */
- ReplicationSlotsComputeRequiredLSN();
+ /* prevent WAL removal as fast as possible */
+ ReplicationSlotsComputeRequiredLSN();
- /*
- * If all required WAL is still there, great, otherwise retry. The
- * slot should prevent further removal of WAL, unless there's a
- * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
- * the new restart_lsn above, so normally we should never need to loop
- * more than twice.
- */
- XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
- if (XLogGetLastRemovedSegno() < segno)
- break;
- }
+ /* Checkpoint shouldn't remove the required WAL. */
+ XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
+ if (XLogGetLastRemovedSegno() >= segno)
+ elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
+ NameStr(slot->data.name));
+
+ LWLockRelease(ReplicationSlotAllocationLock);
if (!RecoveryInProgress() && SlotIsLogical(slot))
{
* acquiring a slot we cannot take the control lock - but that's OK,
* because holding ReplicationSlotAllocationLock is strictly stronger, and
* enough to guarantee that nobody can change the in_use bits on us.
+ *
+ * Additionally, acquiring the Allocation lock is necessary to serialize
+ * the slot flush process with concurrent slot WAL reservation. This
+ * ensures that the WAL position being reserved is either flushed to disk
+ * or is beyond or equal to the redo pointer of the current checkpoint
+ * (See ReplicationSlotReserveWal for details).
*/
LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);