*
* NOTES
*
- * Replication slots are used to keep state about replicas of the cluster
- * they are allocated on, primarily to avoid removing resources that are
- * still required on replicas, but also to keep information for monitoring
- * purposes.
- * They need to be permanent (to allow restarts), crash-safe and
- * allocatable on standbys (to support cascading setups). Thus they need to
- * be stored outside the catalog as writing to the catalog in a crashsafe
- * manner isn't possible on standbys.
- * Such slots are used both for streaming replication and changeset
- * extraction. For the latter sometimes slot specific data needs to be
- * serialized to disk to avoid exhausting memory.
- * For both, changeset extraction and streaming replication we need to
- * prevent that still required WAL will be removed/recycled and that rows
- * we still need get vacuumed away.
+ * Replication slots are used to keep state about replication streams
+ * originating from this cluster. Their primary purpose is to prevent the
+ * premature removal of WAL or of old tuple versions in a manner that would
+ * interfere with replication; they are also useful for monitoring purposes.
+ * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
+ * on standbys (to support cascading setups). The requirement that slots be
+ * usable on standbys precludes storing them in the system catalogs.
*
- * To allow slots to be created on standbys and to allow additional data to
- * be stored per slot, each replication slot gets its own directory inside
- * the $PGDATA/pg_replslot directory. Inside that directory the /state file
- * will contain the slot's own data. Additional data can be stored
- * alongside that file if required.
+ * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
+ * directory. Inside that directory the /state file will contain the slot's
+ * own data. Additional data can be stored alongside that file if required.
+ * While the server is running, the state data is also cached in memory for
+ * efficiency.
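+ *
+ * For example, a slot named "node_a_slot" (a hypothetical name, used here
+ * only for illustration) would keep its serialized state in
+ * $PGDATA/pg_replslot/node_a_slot/state, per the path construction in
+ * ReplicationSlotDrop() below.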
*
- * While the server is running it would be inefficient always read the
- * state files from disk, instead the data is loaded into memory at startup
- * and most of the time only that data is accessed. Only when it is
- * required that a certain value needs to be the same after a restart
- * individual slots are serialized to disk.
+ * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
+ * or free a slot. ReplicationSlotControlLock must be taken in shared mode to
+ * iterate over the slots, and in exclusive mode to change the in_use flag of
+ * a slot; the remaining data in each slot is protected by the slot's
+ * spinlock. Thus, a process which acquires ReplicationSlotControlLock in
+ * shared mode can iterate over the slot array and check whether each slot is
+ * in use. Any other examination of or change to a slot requires taking its
+ * spinlock.
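+ *
+ * As a sketch of that pattern (it is exactly what the compute-horizon
+ * functions further down do), a reader of per-slot data would look like:
+ *
+ *		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ *		for (i = 0; i < max_replication_slots; i++)
+ *		{
+ *			ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+ *
+ *			if (!s->in_use)
+ *				continue;
+ *			SpinLockAcquire(&s->mutex);
+ *			... read the fields of interest ...
+ *			SpinLockRelease(&s->mutex);
+ *		}
+ *		LWLockRelease(ReplicationSlotControlLock);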
*
- * Since the individual resources that need to be managed can be described
- * by a simple number (xmin horizon, minimal required LSN), we compute the
- * minimum value across all slots and store it in a separate struct, so we
- * don't have to access all slots when accessing the global minimum.
- *
- * The shared memory data structures are protected by the
- * ReplicationSlotCtlLock lwlock protecting the global values in
- * ReplicationSlotCtlData and each slot's name and in_use
- * fields. Additionally each slot has a spinlock protecting the remaining
- * values. That means that the existance of slots and their names can be
- * tested while holding the lwlock in shared mode, without holding the
- * individual spinlocks.
- * -------------------------------------------------------------------------
+ *-------------------------------------------------------------------------
*/
#include "postgres.h"
static void RestoreSlot(const char *name);
static void CreateSlot(ReplicationSlot *slot);
static void SaveSlotGuts(ReplicationSlot *slot, const char *path);
-static void DeleteSlot(ReplicationSlot *slot);
static void SaveSlot(ReplicationSlot *slot);
/*
void
ReplicationSlotCreate(const char *name, bool db_specific)
{
- ReplicationSlot *slot;
- bool name_in_use;
+ ReplicationSlot *slot = NULL;
int i;
Assert(MyReplicationSlot == NULL);
ReplicationSlotValidateName(name, ERROR);
/*
- * Prevent concurrent creation of slots, so we can safely prevent
- * duplicate names.
+ * If some other backend ran this code concurrently with us, we'd likely
+ * both allocate the same slot, and that would be bad. We'd also be
+ * at risk of missing a name collision. Also, we don't want to try to
+ * create a new slot while somebody's busy cleaning up an old one, because
+ * we might both be monkeying with the same directory.
*/
- LWLockAcquire(ReplicationSlotCtlLock, LW_EXCLUSIVE);
+ LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
/*
- * First, make sure the requested name is not in use. No other slots can
- * be created while we're holding ReplicationSlotCtlLock.
+ * Check for name collision, and identify an allocatable slot. We need
+ * to hold ReplicationSlotControlLock in shared mode for this, so that
+ * nobody else can change the in_use flags while we're looking at them.
*/
- name_in_use = false;
- for (i = 0; i < max_replication_slots && !name_in_use; i++)
- {
- ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
-
- if (s->in_use && strcmp(name, NameStr(s->name)) == 0)
- name_in_use = true;
- }
-
- if (name_in_use)
- {
- LWLockRelease(ReplicationSlotCtlLock);
- ereport(ERROR,
- (errcode(ERRCODE_DUPLICATE_OBJECT),
- errmsg("replication slot \"%s\" already exists", name)));
- }
-
- /*
- * Find the first slot which is not in use (and thus not active either).
- */
- slot = NULL;
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (i = 0; i < max_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
- if (!s->in_use)
- {
- Assert(!s->active);
- /* NOT releasing the lock yet */
+ if (s->in_use && strcmp(name, NameStr(s->name)) == 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("replication slot \"%s\" already exists", name)));
+ if (!s->in_use && slot == NULL)
slot = s;
- break;
- }
}
+ LWLockRelease(ReplicationSlotControlLock);
- if (!slot)
- {
- LWLockRelease(ReplicationSlotCtlLock);
+ /* If all slots are in use, we're out of luck. */
+ if (slot == NULL)
ereport(ERROR,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("all replication slots are in use"),
errhint("Free one or increase max_replication_slots.")));
- }
/*
- * we can be sure that creation of the slot succeeds now, so mark it as
- * created
+ * Since this slot is not in use, nobody should be looking at any
+ * part of it other than the in_use field unless they're trying to allocate
+ * it. And since we hold ReplicationSlotAllocationLock, nobody except us
+ * can be doing that. So it's safe to initialize the slot.
*/
- SpinLockAcquire(&slot->mutex);
-
- slot->in_use = true;
- slot->active = true;
- if (db_specific)
- slot->database = MyDatabaseId;
- else
- slot->database = InvalidOid;
-
+ Assert(!slot->in_use);
+ Assert(!slot->active);
+ slot->data_xmin = InvalidTransactionId;
+ slot->effective_data_xmin = InvalidTransactionId;
strncpy(NameStr(slot->name), name, NAMEDATALEN);
NameStr(slot->name)[NAMEDATALEN - 1] = '\0';
+ slot->database = db_specific ? MyDatabaseId : InvalidOid;
+ slot->restart_decoding = InvalidXLogRecPtr;
+ slot->confirmed_flush = InvalidXLogRecPtr;
- /* release spinlock so it can be examined by others */
- SpinLockRelease(&slot->mutex);
+ /*
+ * Create the slot on disk. We haven't actually marked the slot allocated
+ * yet, so no special cleanup is required if this errors out.
+ */
+ CreateSlot(slot);
- LWLockRelease(ReplicationSlotCtlLock);
+ /*
+ * We need to briefly prevent any other backend from iterating over the
+ * slots while we flip the in_use flag.
+ */
+ LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+ slot->in_use = true;
+ LWLockRelease(ReplicationSlotControlLock);
/*
- * Now create the on-disk data structures for this replication slot,
- * thereby guaranteeing it's persistency. If we fail while manipulating
- * the on-disk state in a acceptable manner, we cleanup and mark this slot
- * as unused.
+ * Now that the slot has been marked as in_use, it's safe to let somebody
+ * else try to allocate a slot.
*/
- PG_TRY();
- {
- CreateSlot(slot);
- }
- PG_CATCH();
+ LWLockRelease(ReplicationSlotAllocationLock);
+
+ /* We can now mark the slot active, and that makes it our slot. */
{
- LWLockAcquire(ReplicationSlotCtlLock, LW_EXCLUSIVE);
+ volatile ReplicationSlot *vslot = slot;
+
SpinLockAcquire(&slot->mutex);
- slot->in_use = false;
- slot->active = false;
+ vslot->active = true;
SpinLockRelease(&slot->mutex);
- LWLockRelease(ReplicationSlotCtlLock);
-
- PG_RE_THROW();
+ MyReplicationSlot = slot;
}
- PG_END_TRY();
-
- MyReplicationSlot = slot;
}
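+
+/*
+ * Example usage (hypothetical caller, shown only for illustration): creating
+ * a slot leaves it acquired, i.e. MyReplicationSlot already points at it, so
+ * a backend setting up a new replication stream might simply do
+ *
+ *		ReplicationSlotCreate("node_a_slot", false);
+ *		... stream, updating the slot's fields under its spinlock ...
+ *		ReplicationSlotRelease();
+ *
+ * No separate ReplicationSlotAcquire() call is needed after creation.
+ */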
/*
void
ReplicationSlotAcquire(const char *name)
{
- ReplicationSlot *slot;
+ ReplicationSlot *slot = NULL;
int i;
+ bool active = false;
Assert(MyReplicationSlot == NULL);
ReplicationSlotValidateName(name, ERROR);
- LWLockAcquire(ReplicationSlotCtlLock, LW_SHARED);
-
+ /* Search for the named slot and mark it active if we find it. */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (i = 0; i < max_replication_slots; i++)
{
- slot = &ReplicationSlotCtl->replication_slots[i];
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
- if (slot->in_use && strcmp(name, NameStr(slot->name)) == 0)
+ if (s->in_use && strcmp(name, NameStr(s->name)) == 0)
 {
- /* NOT releasing the lock yet */
+ volatile ReplicationSlot *vslot = s;
+
+ SpinLockAcquire(&s->mutex);
+ active = vslot->active;
+ vslot->active = true;
+ SpinLockRelease(&s->mutex);
+ slot = s;
break;
}
}
+ LWLockRelease(ReplicationSlotControlLock);
+ /* If we did not find the slot or it was already active, error out. */
if (slot == NULL)
- {
- LWLockRelease(ReplicationSlotCtlLock);
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("replication slot \"%s\" does not exist", name)));
- }
-
- /* acquire spinlock so we can test and set ->active safely */
- SpinLockAcquire(&slot->mutex);
-
- if (slot->active)
- {
- SpinLockRelease(&slot->mutex);
- LWLockRelease(ReplicationSlotCtlLock);
+ if (active)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
errmsg("replication slot \"%s\" is already active", name)));
- }
-
- /* we definitely have the slot, no errors possible anymore */
- slot->active = true;
- SpinLockRelease(&slot->mutex);
-
- LWLockRelease(ReplicationSlotCtlLock);
-
- PG_TRY();
- {
- /*
- * We don't really need to save here, but doing so guarantees the slot
- * is in a good state.
- */
- SaveSlot(slot);
- }
- PG_CATCH();
- {
- SpinLockAcquire(&slot->mutex);
- slot->active = false;
- SpinLockRelease(&slot->mutex);
- PG_RE_THROW();
- }
- PG_END_TRY();
+ /* We made this slot active, so it's ours now. */
MyReplicationSlot = slot;
+
+ /*
+ * XXX. There used to be code to save the state here with the following
+ * comment: "We don't really need to save here, but doing so guarantees
+ * the slot is in a good state." It's not clear to me what is meant by
+ * a good state, and it seems like the slot had better already be in
+ * such a state, or we're screwed anyway. From the point of view of
+ * crash recovery, merely acquiring the slot (without updating its
+ * contents) is a non-event.
+ */
}
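+
+/*
+ * Example usage (hypothetical caller): to reconnect to a previously created
+ * slot, a backend would do
+ *
+ *		ReplicationSlotAcquire("node_a_slot");
+ *		... stream using the slot ...
+ *		ReplicationSlotRelease();
+ */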
/*
void
ReplicationSlotRelease(void)
{
- ReplicationSlot *slot;
-
- slot = MyReplicationSlot;
+ ReplicationSlot *slot = MyReplicationSlot;
Assert(slot != NULL && slot->active);
- /*
- * Note that we do not need to aquire the lwlock here, we're only marking
- * the slot as inactive, not as unused.
- */
- SpinLockAcquire(&slot->mutex);
- slot->active = false;
- MyReplicationSlot = NULL;
- SpinLockRelease(&slot->mutex);
+ /* Mark slot inactive. We're not freeing it, just disconnecting. */
+ {
+ volatile ReplicationSlot *vslot = slot;
+ SpinLockAcquire(&slot->mutex);
+ vslot->active = false;
+ SpinLockRelease(&slot->mutex);
+ MyReplicationSlot = NULL;
+ }
/*
* XXX: There's not actually any need to save the slot to disk, it will be
{
ReplicationSlot *slot = NULL;
int i;
+ bool active = false;
+ char path[MAXPGPATH];
+ char tmppath[MAXPGPATH];
ReplicationSlotValidateName(name, ERROR);
- LWLockAcquire(ReplicationSlotCtlLock, LW_EXCLUSIVE);
+ /*
+ * If some other backend ran this code concurrently with us, we might both
+ * try to free the same slot at the same time. Or we might try to delete
+ * a slot with a certain name while someone else was trying to create a
+ * slot with the same name.
+ */
+ LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
+ /* Search for the named slot and mark it active if we find it. */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (i = 0; i < max_replication_slots; i++)
{
- slot = &ReplicationSlotCtl->replication_slots[i];
-
- if (slot->in_use && strcmp(name, NameStr(slot->name)) == 0)
- break;
- slot = NULL;
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+ if (s->in_use && strcmp(name, NameStr(s->name)) == 0)
+ {
+ volatile ReplicationSlot *vslot = s;
+
+ SpinLockAcquire(&s->mutex);
+ active = vslot->active;
+ vslot->active = true;
+ SpinLockRelease(&s->mutex);
+ slot = s;
+ break;
+ }
}
+ LWLockRelease(ReplicationSlotControlLock);
+ /* If we did not find the slot or it was already active, error out. */
if (slot == NULL)
- {
- LWLockRelease(ReplicationSlotCtlLock);
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("replication slot \"%s\" does not exist", name)));
- }
-
- /*
- * Test whether the slot's currently acquired, we only need the spinlock
- * for that test it cannot be newly acquired as that would require taking
- * the lwlock.
- */
- SpinLockAcquire(&slot->mutex);
- if (slot->active)
- {
- SpinLockRelease(&slot->mutex);
- LWLockRelease(ReplicationSlotCtlLock);
+ if (active)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
- errmsg("could not drop active replication slot \"%s\"", name)));
- }
-
- /* mark slot as active while we're releasing the SlotCtl lock */
- slot->active = true;
- SpinLockRelease(&slot->mutex);
+ errmsg("replication slot \"%s\" is already active", name)));
- /*
- * Intermittently release the slot control lock. We've marked the slot as
- * active by this slot so it is safe to do so. We can't easily hold the
- * ctl lock across the PG_TRY/CATCH below since elog.c manipulates
- * InterruptHoldoffCount.
- */
- LWLockRelease(ReplicationSlotCtlLock);
+ /* Generate pathnames. */
+ sprintf(path, "pg_replslot/%s", NameStr(slot->name));
+ sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->name));
/*
- * Now try to delete the slot's on-disk state. If we fail but not PANIC
- * the in-memory state will still be valid.
+ * Rename the slot directory on disk, so that we'll no longer recognize
+ * this as a valid slot. Note that if this fails, we've got to mark the
+ * slot inactive again before bailing out.
*/
- PG_TRY();
- {
- DeleteSlot(slot);
- }
- PG_CATCH();
+ if (rename(path, tmppath) != 0)
{
+ volatile ReplicationSlot *vslot = slot;
+
SpinLockAcquire(&slot->mutex);
- slot->active = false;
+ vslot->active = false;
SpinLockRelease(&slot->mutex);
- PG_RE_THROW();
+
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not rename \"%s\" to \"%s\": %m",
+ path, tmppath)));
}
- PG_END_TRY();
+ /*
+ * We need to fsync() the directory we just renamed and its parent to make
+ * sure that our changes are on disk in a crash-safe fashion. If fsync()
+ * fails, we can't be sure whether the changes are on disk or not. For
+ * now, we handle that by panicking; StartupReplicationSlots() will
+ * try to straighten it out after restart.
+ */
+ START_CRIT_SECTION();
+ fsync_fname(tmppath, true);
+ fsync_fname("pg_replslot", true);
+ END_CRIT_SECTION();
/*
- * Ok, everything gone, after a crash we now wouldn't restore this slot,
- * so remove the in-memory state as well.
+ * The slot is definitely gone. Lock out concurrent scans of the array
+ * long enough to kill it. It's OK to clear the active flag here without
+ * grabbing the mutex because nobody else can be scanning the array here,
+ * and nobody can be attached to this slot and thus access it without
+ * scanning the array.
*/
- LWLockAcquire(ReplicationSlotCtlLock, LW_EXCLUSIVE);
- SpinLockAcquire(&slot->mutex);
+ LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
slot->active = false;
slot->in_use = false;
- SpinLockRelease(&slot->mutex);
- LWLockRelease(ReplicationSlotCtlLock);
+ LWLockRelease(ReplicationSlotControlLock);
/* slot is dead and doesn't nail the xmin anymore, recompute horizon */
ReplicationSlotsComputeRequiredXmin(false);
+
+ /*
+ * If removing the directory fails, the worst thing that will happen is
+ * that the user won't be able to create a new slot with the same name
+ * until the next server restart. We warn about it, but that's all.
+ */
+ if (!rmtree(tmppath, true))
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("could not remove directory \"%s\"", tmppath)));
+
+ /*
+ * We release this at the very end, so that nobody starts trying to create
+ * a slot while we're still cleaning up the detritus of the old one.
+ */
+ LWLockRelease(ReplicationSlotAllocationLock);
}
/*
Assert(ReplicationSlotCtl != NULL);
- /* make sure slots aren't concurrently dropped/created */
- LWLockAcquire(ReplicationSlotCtlLock, LW_SHARED);
-
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (i = 0; i < max_replication_slots; i++)
{
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
TransactionId effective_data_xmin;
- /* extra block to ensure nothing is accessed without spinlock */
+ if (!s->in_use)
+ continue;
+
{
- ReplicationSlot *slot;
- slot = &ReplicationSlotCtl->replication_slots[i];
- if (!slot->in_use)
- continue;
+ volatile ReplicationSlot *vslot = s;
- SpinLockAcquire(&slot->mutex);
- effective_data_xmin = slot->effective_data_xmin;
- SpinLockRelease(&slot->mutex);
+ SpinLockAcquire(&s->mutex);
+ effective_data_xmin = vslot->effective_data_xmin;
+ SpinLockRelease(&s->mutex);
}
/* check the data xmin */
TransactionIdPrecedes(effective_data_xmin, agg_data_xmin)))
agg_data_xmin = effective_data_xmin;
}
- LWLockRelease(ReplicationSlotCtlLock);
+ LWLockRelease(ReplicationSlotControlLock);
ProcArraySetPeggedXmin(agg_data_xmin, already_locked);
}
Assert(ReplicationSlotCtl != NULL);
- /* make sure slots aren't concurrently dropped/created */
- LWLockAcquire(ReplicationSlotCtlLock, LW_SHARED);
-
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (i = 0; i < max_replication_slots; i++)
{
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
XLogRecPtr restart_decoding;
- /* extra block to ensure nothing is accessed without spinlock */
+ if (!s->in_use)
+ continue;
+
{
- ReplicationSlot *slot;
- slot = &ReplicationSlotCtl->replication_slots[i];
- if (!slot->in_use)
- continue;
+ volatile ReplicationSlot *vslot = s;
- SpinLockAcquire(&slot->mutex);
- restart_decoding = slot->restart_decoding;
- SpinLockRelease(&slot->mutex);
+ SpinLockAcquire(&s->mutex);
+ restart_decoding = vslot->restart_decoding;
+ SpinLockRelease(&s->mutex);
}
if (restart_decoding != InvalidXLogRecPtr &&
restart_decoding < min_required))
min_required = restart_decoding;
}
-
- LWLockRelease(ReplicationSlotCtlLock);
+ LWLockRelease(ReplicationSlotControlLock);
XLogSetPeggedLSN(min_required);
}
END_CRIT_SECTION();
}
-/*
- * Delete a single slot from disk, not touching the in-memory state.
- */
-static void
-DeleteSlot(ReplicationSlot *slot)
-{
- char path[MAXPGPATH];
- char tmppath[MAXPGPATH];
-
- sprintf(path, "pg_replslot/%s", NameStr(slot->name));
- sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->name));
-
- if (rename(path, tmppath) != 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not rename \"%s\" to \"%s\": %m",
- path, tmppath)));
-
- /* Check CreateSlot() for the reasoning of using a crit. section. */
- START_CRIT_SECTION();
-
- fsync_fname(tmppath, true);
- fsync_fname("pg_replslot", true);
-
- END_CRIT_SECTION();
-
- /*
- * If we fail during removal of the directory, we'll just be unable to
- * create new slots of this name till a restart of the server. Don't
- * panic.
- */
- if (!rmtree(tmppath, true))
- {
- ereport(WARNING,
- (errcode_for_file_access(),
- errmsg("could not remove directory \"%s\"", tmppath),
- errmsg("some useless files may be left behind in that directory preventing creation of new slots")));
- }
-}
-
/*
* Load a single slot from disk into memory.
*/