From 050cbe6ad4a39d8d3800965b3383f09ff5b7b2c9 Mon Sep 17 00:00:00 2001
From: Robert Haas
Date: Mon, 27 Jan 2014 16:14:28 -0500
Subject: [PATCH] Hack on slot error recovery.

---
 src/backend/replication/slot.c | 467 ++++++++++++++------------------
 src/include/storage/lwlock.h   |  14 +-
 2 files changed, 200 insertions(+), 281 deletions(-)

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index ca51eef433..f69af0a075 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -12,46 +12,29 @@
  *
  * NOTES
  *
- * Replication slots are used to keep state about replicas of the cluster
- * they are allocated on, primarily to avoid removing resources that are
- * still required on replicas, but also to keep information for monitoring
- * purposes.
- * They need to be permanent (to allow restarts), crash-safe and
- * allocatable on standbys (to support cascading setups). Thus they need to
- * be stored outside the catalog as writing to the catalog in a crashsafe
- * manner isn't possible on standbys.
- * Such slots are used both for streaming replication and changeset
- * extraction. For the latter sometimes slot specific data needs to be
- * serialized to disk to avoid exhausting memory.
- * For both, changeset extraction and streaming replication we need to
- * prevent that still required WAL will be removed/recycled and that rows
- * we still need get vacuumed away.
+ * Replication slots are used to keep state about replication streams
+ * originating from this cluster.  Their primary purpose is to prevent the
+ * premature removal of WAL or of old tuple versions in a manner that would
+ * interfere with replication; they are also useful for monitoring purposes.
+ * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
+ * on standbys (to support cascading setups).  The requirement that slots be
+ * usable on standbys precludes storing them in the system catalogs.
 *
- * To allow slots to be created on standbys and to allow additional data to
- * be stored per slot, each replication slot gets its own directory inside
- * the $PGDATA/pg_replslot directory. Inside that directory the /state file
- * will contain the slot's own data. Additional data can be stored
- * alongside that file if required.
+ * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
+ * directory.  Inside that directory the /state file will contain the slot's
+ * own data.  Additional data can be stored alongside that file if required.
+ * While the server is running, the state data is also cached in memory for
+ * efficiency.
 *
- * While the server is running it would be inefficient always read the
- * state files from disk, instead the data is loaded into memory at startup
- * and most of the time only that data is accessed. Only when it is
- * required that a certain value needs to be the same after a restart
- * individual slots are serialized to disk.
+ * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
+ * or free a slot.  ReplicationSlotControlLock must be taken in exclusive mode
+ * to change the name or in_use flag of a slot; otherwise, the state can be
+ * changed by a process that holds the slot's spinlock.  Thus, a process which
+ * acquires ReplicationSlotControlLock in shared mode can iterate over the
+ * slot array and check whether each slot is free.  Any other examination of
+ * or change to the slot requires taking the spinlock.
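+ *
+ * For example, to look up a slot by name, a backend takes
+ * ReplicationSlotControlLock in shared mode, scans the array for an in_use
+ * slot with a matching name, and only then takes that slot's spinlock to
+ * examine or change any of its other fields.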
 *
- * Since the individual resources that need to be managed can be described
- * by a simple number (xmin horizon, minimal required LSN), we compute the
- * minimum value across all slots and store it in a separate struct, so we
- * don't have to access all slots when accessing the global minimum.
- *
- * The shared memory data structures are protected by the
- * ReplicationSlotCtlLock lwlock protecting the global values in
- * ReplicationSlotCtlData and each slot's name and in_use
- * fields. Additionally each slot has a spinlock protecting the remaining
- * values. That means that the existance of slots and their names can be
- * tested while holding the lwlock in shared mode, without holding the
- * individual spinlocks.
- * -------------------------------------------------------------------------
+ *-------------------------------------------------------------------------
 */

 #include "postgres.h"

@@ -116,7 +99,6 @@ int			max_replication_slots = 0;	/* the maximum number of replication slots */
 static void RestoreSlot(const char *name);
 static void CreateSlot(ReplicationSlot *slot);
 static void SaveSlotGuts(ReplicationSlot *slot, const char *path);
-static void DeleteSlot(ReplicationSlot *slot);
 static void SaveSlot(ReplicationSlot *slot);
 
 /*
@@ -229,8 +211,7 @@ ReplicationSlotValidateName(const char *name, int elevel)
 void
 ReplicationSlotCreate(const char *name, bool db_specific)
 {
-	ReplicationSlot *slot;
-	bool		name_in_use;
+	ReplicationSlot *slot = NULL;
 	int			i;
 
 	Assert(MyReplicationSlot == NULL);
@@ -238,103 +219,85 @@ ReplicationSlotCreate(const char *name, bool db_specific)
 	ReplicationSlotValidateName(name, ERROR);
 
 	/*
-	 * Prevent concurrent creation of slots, so we can safely prevent
-	 * duplicate names.
+	 * If some other backend ran this code concurrently with us, we'd likely
+	 * both allocate the same slot, and that would be bad.  We'd also be at
+	 * risk of missing a name collision.  Also, we don't want to try to
+	 * create a new slot while somebody's busy cleaning up an old one,
+	 * because we might both be monkeying with the same directory.
 	 */
-	LWLockAcquire(ReplicationSlotCtlLock, LW_EXCLUSIVE);
+	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
 
 	/*
-	 * First, make sure the requested name is not in use. No other slots can
-	 * be created while we're holding ReplicationSlotCtlLock.
+	 * Check for name collision, and identify an allocatable slot.  We need
+	 * to hold ReplicationSlotControlLock in shared mode for this, so that
+	 * nobody else can change the in_use flags while we're looking at them.
 	 */
-	name_in_use = false;
-	for (i = 0; i < max_replication_slots && !name_in_use; i++)
-	{
-		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
-
-		if (s->in_use && strcmp(name, NameStr(s->name)) == 0)
-			name_in_use = true;
-	}
-
-	if (name_in_use)
-	{
-		LWLockRelease(ReplicationSlotCtlLock);
-		ereport(ERROR,
-				(errcode(ERRCODE_DUPLICATE_OBJECT),
-				 errmsg("replication slot \"%s\" already exists", name)));
-	}
-
-	/*
-	 * Find the first slot which is not in use (and thus not active either).
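+	/*
+	 * Lock order: ReplicationSlotAllocationLock first, then
+	 * ReplicationSlotControlLock; ReplicationSlotDrop() takes them in the
+	 * same order.
+	 */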
-	 */
-	slot = NULL;
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 	for (i = 0; i < max_replication_slots; i++)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 
-		if (!s->in_use)
-		{
-			Assert(!s->active);
-			/* NOT releasing the lock yet */
+		if (s->in_use && strcmp(name, NameStr(s->name)) == 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_DUPLICATE_OBJECT),
+					 errmsg("replication slot \"%s\" already exists", name)));
+		if (!s->in_use && slot == NULL)
 			slot = s;
-			break;
-		}
 	}
+	LWLockRelease(ReplicationSlotControlLock);
 
-	if (!slot)
-	{
-		LWLockRelease(ReplicationSlotCtlLock);
+	/* If all slots are in use, we're out of luck. */
+	if (slot == NULL)
 		ereport(ERROR,
 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
 				 errmsg("all replication slots are in use"),
 				 errhint("Free one or increase max_replication_slots.")));
-	}
 
 	/*
-	 * we can be sure that creation of the slot succeeds now, so mark it as
-	 * created
+	 * Since this slot is not in use, nobody should be looking at any part of
+	 * it other than the in_use field unless they're trying to allocate it.
+	 * And since we hold ReplicationSlotAllocationLock, nobody except us can
+	 * be doing that.  So it's safe to initialize the slot.
 	 */
-	SpinLockAcquire(&slot->mutex);
-
-	slot->in_use = true;
-	slot->active = true;
-	if (db_specific)
-		slot->database = MyDatabaseId;
-	else
-		slot->database = InvalidOid;
-
+	Assert(!slot->in_use);
+	Assert(!slot->active);
+	slot->data_xmin = InvalidTransactionId;
+	slot->effective_data_xmin = InvalidTransactionId;
 	strncpy(NameStr(slot->name), name, NAMEDATALEN);
 	NameStr(slot->name)[NAMEDATALEN - 1] = '\0';
+	slot->database = db_specific ? MyDatabaseId : InvalidOid;
+	slot->restart_decoding = InvalidXLogRecPtr;
+	slot->confirmed_flush = InvalidXLogRecPtr;
 
-	/* release spinlock so it can be examined by others */
-	SpinLockRelease(&slot->mutex);
+	/*
+	 * Create the slot on disk.  We haven't actually marked the slot
+	 * allocated yet, so no special cleanup is required if this errors out.
+	 */
+	CreateSlot(slot);
 
-	LWLockRelease(ReplicationSlotCtlLock);
+	/*
+	 * We need to briefly prevent any other backend from iterating over the
+	 * slots while we flip the in_use flag.
+	 */
+	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+	slot->in_use = true;
+	LWLockRelease(ReplicationSlotControlLock);
 
 	/*
-	 * Now create the on-disk data structures for this replication slot,
-	 * thereby guaranteeing it's persistency. If we fail while manipulating
-	 * the on-disk state in a acceptable manner, we cleanup and mark this slot
-	 * as unused.
+	 * Now that the slot has been marked as in_use, it's safe to let somebody
+	 * else try to allocate a slot.
 	 */
-	PG_TRY();
-	{
-		CreateSlot(slot);
-	}
-	PG_CATCH();
+	LWLockRelease(ReplicationSlotAllocationLock);
+
+	/* We can now mark the slot active, and that makes it our slot. */
 	{
-		LWLockAcquire(ReplicationSlotCtlLock, LW_EXCLUSIVE);
+		volatile ReplicationSlot *vslot = slot;
+
 		SpinLockAcquire(&slot->mutex);
-		slot->in_use = false;
-		slot->active = false;
+		vslot->active = true;
 		SpinLockRelease(&slot->mutex);
-		LWLockRelease(ReplicationSlotCtlLock);
-
-		PG_RE_THROW();
+		MyReplicationSlot = slot;
 	}
-	PG_END_TRY();
-
-	MyReplicationSlot = slot;
 }
 
 /*
@@ -343,69 +306,54 @@ ReplicationSlotCreate(const char *name, bool db_specific)
 void
 ReplicationSlotAcquire(const char *name)
 {
-	ReplicationSlot *slot;
+	ReplicationSlot *slot = NULL;
 	int			i;
+	bool		active = false;
 
 	Assert(MyReplicationSlot == NULL);
 
 	ReplicationSlotValidateName(name, ERROR);
 
-	LWLockAcquire(ReplicationSlotCtlLock, LW_SHARED);
-
+	/* Search for the named slot and mark it active if we find it. */
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 	for (i = 0; i < max_replication_slots; i++)
 	{
-		slot = &ReplicationSlotCtl->replication_slots[i];
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 
-		if (slot->in_use && strcmp(name, NameStr(slot->name)) == 0)
+		if (s->in_use && strcmp(name, NameStr(s->name)) == 0)
 		{
-			/* NOT releasing the lock yet */
+			volatile ReplicationSlot *vslot = s;
+
+			SpinLockAcquire(&s->mutex);
+			active = vslot->active;
+			vslot->active = true;
+			SpinLockRelease(&s->mutex);
+			slot = s;
 			break;
 		}
 	}
+	LWLockRelease(ReplicationSlotControlLock);
 
+	/* If we did not find the slot or it was already active, error out. */
 	if (slot == NULL)
-	{
-		LWLockRelease(ReplicationSlotCtlLock);
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
 				 errmsg("replication slot \"%s\" does not exist", name)));
-	}
-
-	/* acquire spinlock so we can test and set ->active safely */
-	SpinLockAcquire(&slot->mutex);
-
-	if (slot->active)
-	{
-		SpinLockRelease(&slot->mutex);
-		LWLockRelease(ReplicationSlotCtlLock);
+	if (active)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_IN_USE),
 				 errmsg("replication slot \"%s\" is already active", name)));
-	}
-
-	/* we definitely have the slot, no errors possible anymore */
-	slot->active = true;
-	SpinLockRelease(&slot->mutex);
-
-	LWLockRelease(ReplicationSlotCtlLock);
-
-	PG_TRY();
-	{
-		/*
-		 * We don't really need to save here, but doing so guarantees the slot
-		 * is in a good state.
-		 */
-		SaveSlot(slot);
-	}
-	PG_CATCH();
-	{
-		SpinLockAcquire(&slot->mutex);
-		slot->active = false;
-		SpinLockRelease(&slot->mutex);
-		PG_RE_THROW();
-	}
-	PG_END_TRY();
 
+	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = slot;
+
+	/*
+	 * XXX. There used to be code to save the state here with the following
+	 * comment: "We don't really need to save here, but doing so guarantees
+	 * the slot is in a good state."  It's not clear to me what is meant by
+	 * a good state, and it seems like the slot had better already be in
+	 * such a state, or we're screwed anyway.  From the point of view of
+	 * crash recovery, merely acquiring the slot (without updating its
+	 * contents) is a non-event.
+	 */
 }
 
 /*
@@ -415,20 +363,18 @@ ReplicationSlotAcquire(const char *name)
 void
 ReplicationSlotRelease(void)
 {
-	ReplicationSlot *slot;
-
-	slot = MyReplicationSlot;
+	ReplicationSlot *slot = MyReplicationSlot;
 
 	Assert(slot != NULL && slot->active);
 
-	/*
-	 * Note that we do not need to aquire the lwlock here, we're only marking
-	 * the slot as inactive, not as unused.
-	 */
-	SpinLockAcquire(&slot->mutex);
-	slot->active = false;
-	MyReplicationSlot = NULL;
-	SpinLockRelease(&slot->mutex);
+	/* Mark slot inactive.  We're not freeing it, just disconnecting. */
+	{
+		volatile ReplicationSlot *vslot = slot;
+
+		SpinLockAcquire(&slot->mutex);
+		vslot->active = false;
+		SpinLockRelease(&slot->mutex);
+		MyReplicationSlot = NULL;
+	}
 
 	/*
 	 * XXX: There's not actually any need to save the slot to disk, it will be
@@ -446,87 +392,113 @@ ReplicationSlotDrop(const char *name)
 {
 	ReplicationSlot *slot = NULL;
 	int			i;
+	bool		active;
+	char		path[MAXPGPATH];
+	char		tmppath[MAXPGPATH];
 
 	ReplicationSlotValidateName(name, ERROR);
 
-	LWLockAcquire(ReplicationSlotCtlLock, LW_EXCLUSIVE);
+	/*
+	 * If some other backend ran this code concurrently with us, we might
+	 * both try to free the same slot at the same time.  Or we might try to
+	 * delete a slot with a certain name while someone else was trying to
+	 * create a slot with the same name.
+	 */
+	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
 
+	/* Search for the named slot and mark it active if we find it. */
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 	for (i = 0; i < max_replication_slots; i++)
 	{
-		slot = &ReplicationSlotCtl->replication_slots[i];
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 
-		if (slot->in_use && strcmp(name, NameStr(slot->name)) == 0)
-			break;
+		if (s->in_use && strcmp(name, NameStr(s->name)) == 0)
+		{
+			volatile ReplicationSlot *vslot = s;
 
-		slot = NULL;
+			SpinLockAcquire(&s->mutex);
+			active = vslot->active;
+			vslot->active = true;
+			SpinLockRelease(&s->mutex);
+			slot = s;
+			break;
+		}
 	}
+	LWLockRelease(ReplicationSlotControlLock);
 
+	/* If we did not find the slot or it was already active, error out. */
 	if (slot == NULL)
-	{
-		LWLockRelease(ReplicationSlotCtlLock);
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
 				 errmsg("replication slot \"%s\" does not exist", name)));
-	}
-
-	/*
-	 * Test whether the slot's currently acquired, we only need the spinlock
-	 * for that test it cannot be newly acquired as that would require taking
-	 * the lwlock.
-	 */
-	SpinLockAcquire(&slot->mutex);
-	if (slot->active)
-	{
-		SpinLockRelease(&slot->mutex);
-		LWLockRelease(ReplicationSlotCtlLock);
+	if (active)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_IN_USE),
-				 errmsg("could not drop active replication slot \"%s\"", name)));
-	}
-
-	/* mark slot as active while we're releasing the SlotCtl lock */
-	slot->active = true;
-	SpinLockRelease(&slot->mutex);
+				 errmsg("replication slot \"%s\" is already active", name)));
 
-	/*
-	 * Intermittently release the slot control lock. We've marked the slot as
-	 * active by this slot so it is safe to do so. We can't easily hold the
-	 * ctl lock across the PG_TRY/CATCH below since elog.c manipulates
-	 * InterruptHoldoffCount.
-	 */
-	LWLockRelease(ReplicationSlotCtlLock);
+	/* Generate pathnames. */
+	sprintf(path, "pg_replslot/%s", NameStr(slot->name));
+	sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->name));
 
 	/*
-	 * Now try to delete the slot's on-disk state. If we fail but not PANIC
-	 * the in-memory state will still be valid.
+	 * Rename the slot directory on disk, so that we'll no longer recognize
+	 * this as a valid slot.  Note that if this fails, we've got to mark the
+	 * slot inactive again before bailing out.
 	 */
-	PG_TRY();
-	{
-		DeleteSlot(slot);
-	}
-	PG_CATCH();
+	if (rename(path, tmppath) != 0)
 	{
+		volatile ReplicationSlot *vslot = slot;
+
 		SpinLockAcquire(&slot->mutex);
-		slot->active = false;
+		vslot->active = false;
 		SpinLockRelease(&slot->mutex);
-		PG_RE_THROW();
+
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not rename \"%s\" to \"%s\": %m",
+						path, tmppath)));
 	}
-	PG_END_TRY();
 
+	/*
+	 * We need to fsync() the directory we just renamed and its parent to
+	 * make sure that our changes are on disk in a crash-safe fashion.
+	 * If fsync() fails, we can't be sure whether the changes are on disk or
+	 * not.  For now, we handle that by panicking; StartupReplicationSlots()
+	 * will try to straighten it out after restart.
+	 */
+	START_CRIT_SECTION();
+	fsync_fname(tmppath, true);
+	fsync_fname("pg_replslot", true);
+	END_CRIT_SECTION();
 
 	/*
-	 * Ok, everything gone, after a crash we now wouldn't restore this slot,
-	 * so remove the in-memory state as well.
+	 * The slot is definitely gone.  Lock out concurrent scans of the array
+	 * long enough to kill it.  It's OK to clear the active flag here without
+	 * grabbing the mutex because nobody else can be scanning the array here,
+	 * and nobody can be attached to this slot and thus access it without
+	 * scanning the array.
 	 */
-	LWLockAcquire(ReplicationSlotCtlLock, LW_EXCLUSIVE);
-	SpinLockAcquire(&slot->mutex);
+	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 	slot->active = false;
 	slot->in_use = false;
-	SpinLockRelease(&slot->mutex);
-	LWLockRelease(ReplicationSlotCtlLock);
+	LWLockRelease(ReplicationSlotControlLock);
 
 	/* slot is dead and doesn't nail the xmin anymore, recompute horizon */
 	ReplicationSlotsComputeRequiredXmin(false);
+
+	/*
+	 * If removing the directory fails, the worst thing that will happen is
+	 * that the user won't be able to create a new slot with the same name
+	 * until the next server restart.  We warn about it, but that's all.
+	 */
+	if (!rmtree(tmppath, true))
+		ereport(WARNING,
+				(errcode_for_file_access(),
+				 errmsg("could not remove directory \"%s\"", tmppath)));
+
+	/*
+	 * We release this at the very end, so that nobody starts trying to
+	 * create a slot while we're still cleaning up the detritus of the old
+	 * one.
+	 */
+	LWLockRelease(ReplicationSlotAllocationLock);
 }
 
 /*
@@ -558,23 +530,21 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 
 	Assert(ReplicationSlotCtl != NULL);
 
-	/* make sure slots aren't concurrently dropped/created */
-	LWLockAcquire(ReplicationSlotCtlLock, LW_SHARED);
-
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 	for (i = 0; i < max_replication_slots; i++)
 	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 		TransactionId effective_data_xmin;
 
-		/* extra block to ensure nothing is accessed without spinlock */
+		if (!s->in_use)
+			continue;
+
 		{
-			ReplicationSlot *slot;
-			slot = &ReplicationSlotCtl->replication_slots[i];
-			if (!slot->in_use)
-				continue;
+			volatile ReplicationSlot *vslot = s;
 
-			SpinLockAcquire(&slot->mutex);
-			effective_data_xmin = slot->effective_data_xmin;
-			SpinLockRelease(&slot->mutex);
+			SpinLockAcquire(&s->mutex);
+			effective_data_xmin = vslot->effective_data_xmin;
+			SpinLockRelease(&s->mutex);
 		}
 
 		/* check the data xmin */
@@ -583,7 +553,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 			 TransactionIdPrecedes(effective_data_xmin, agg_data_xmin)))
 			agg_data_xmin = effective_data_xmin;
 	}
-	LWLockRelease(ReplicationSlotCtlLock);
+	LWLockRelease(ReplicationSlotControlLock);
 
 	ProcArraySetPeggedXmin(agg_data_xmin, already_locked);
 }
@@ -600,23 +570,21 @@ ReplicationSlotsComputeRequiredLSN(void)
 
 	Assert(ReplicationSlotCtl != NULL);
 
-	/* make sure slots aren't concurrently dropped/created */
-	LWLockAcquire(ReplicationSlotCtlLock, LW_SHARED);
-
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 	for (i = 0; i < max_replication_slots; i++)
 	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 		XLogRecPtr	restart_decoding;
 
-		/* extra block to ensure nothing is accessed without spinlock */
+		if (!s->in_use)
+			continue;
+
 		{
-			ReplicationSlot *slot;
-			slot = &ReplicationSlotCtl->replication_slots[i];
-			if (!slot->in_use)
-				continue;
+			volatile ReplicationSlot *vslot = s;
 
-			SpinLockAcquire(&slot->mutex);
-			restart_decoding = slot->restart_decoding;
-			SpinLockRelease(&slot->mutex);
+			SpinLockAcquire(&s->mutex);
+			restart_decoding = vslot->restart_decoding;
+			SpinLockRelease(&s->mutex);
 		}
 
 		if (restart_decoding != InvalidXLogRecPtr &&
@@ -624,8 +592,7 @@ ReplicationSlotsComputeRequiredLSN(void)
 			 restart_decoding < min_required))
 			min_required = restart_decoding;
 	}
-
-	LWLockRelease(ReplicationSlotCtlLock);
+	LWLockRelease(ReplicationSlotControlLock);
 
 	XLogSetPeggedLSN(min_required);
 }
@@ -897,46 +864,6 @@ SaveSlotGuts(ReplicationSlot *slot, const char *dir)
 	END_CRIT_SECTION();
 }
 
-/*
- * Delete a single slot from disk, not touching the in-memory state.
- */
-static void
-DeleteSlot(ReplicationSlot *slot)
-{
-	char		path[MAXPGPATH];
-	char		tmppath[MAXPGPATH];
-
-	sprintf(path, "pg_replslot/%s", NameStr(slot->name));
-	sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->name));
-
-	if (rename(path, tmppath) != 0)
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not rename \"%s\" to \"%s\": %m",
-						path, tmppath)));
-
-	/* Check CreateSlot() for the reasoning of using a crit. section. */
-	START_CRIT_SECTION();
-
-	fsync_fname(tmppath, true);
-	fsync_fname("pg_replslot", true);
-
-	END_CRIT_SECTION();
-
-	/*
-	 * If we fail during removal of the directory, we'll just be unable to
-	 * create new slots of this name till a restart of the server. Don't
-	 * panic.
-	 */
-	if (!rmtree(tmppath, true))
-	{
-		ereport(WARNING,
-				(errcode_for_file_access(),
-				 errmsg("could not remove directory \"%s\"", tmppath),
-				 errmsg("some useless files may be left behind in that directory preventing creation of new slots")));
-	}
-}
-
 /*
  * Load a single slot from disk into memory.
  */
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index cc63836081..83ccb15df8 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -125,8 +125,9 @@ extern LWLockPadded *MainLWLockArray;
 #define BackgroundWorkerLock		(&MainLWLockArray[33].lock)
 #define DynamicSharedMemoryControlLock	(&MainLWLockArray[34].lock)
 #define AutoFileLock				(&MainLWLockArray[35].lock)
-#define ReplicationSlotCtlLock		(&MainLWLockArray[36].lock)
-#define NUM_INDIVIDUAL_LWLOCKS		37
+#define ReplicationSlotAllocationLock	(&MainLWLockArray[36].lock)
+#define ReplicationSlotControlLock	(&MainLWLockArray[37].lock)
+#define NUM_INDIVIDUAL_LWLOCKS		38
 
 /*
  * It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS
@@ -145,15 +146,6 @@ extern LWLockPadded *MainLWLockArray;
 #define LOG2_NUM_PREDICATELOCK_PARTITIONS  4
 #define NUM_PREDICATELOCK_PARTITIONS  (1 << LOG2_NUM_PREDICATELOCK_PARTITIONS)
 
-/* Offsets for various chunks of preallocated lwlocks. */
-#define BUFFER_MAPPING_LWLOCK_OFFSET	NUM_INDIVIDUAL_LWLOCKS
-#define LOCK_MANAGER_LWLOCK_OFFSET \
-	(BUFFER_MAPPING_LWLOCK_OFFSET + NUM_BUFFER_PARTITIONS)
-#define PREDICATELOCK_MANAGER_LWLOCK_OFFSET \
-	(NUM_INDIVIDUAL_LWLOCKS + NUM_LOCK_PARTITIONS)
-#define NUM_FIXED_LWLOCKS \
-	(PREDICATELOCK_MANAGER_LWLOCK_OFFSET + NUM_PREDICATELOCK_PARTITIONS)
-
 typedef enum LWLockMode
 {
 	LW_EXCLUSIVE,
-- 
2.39.5
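
A note on the drop path above: it relies on the well-known rename-then-fsync
recipe for crash-safe directory removal. Rename the directory out of the way,
fsync() the renamed directory and its parent so the rename is durable, and
only then delete it, treating a failed delete as harmless. The sketch below
restates that recipe as standalone POSIX C. The helper names fsync_dir() and
durable_rename_away() are hypothetical, invented for this illustration; the
patch itself uses PostgreSQL's fsync_fname() and rmtree() inside a critical
section instead.

	#include <fcntl.h>
	#include <stdio.h>
	#include <unistd.h>

	/* Hypothetical helper: fsync a directory by path. */
	static int
	fsync_dir(const char *path)
	{
		int			fd = open(path, O_RDONLY);

		if (fd < 0)
			return -1;
		if (fsync(fd) != 0)
		{
			close(fd);
			return -1;
		}
		return close(fd);
	}

	/*
	 * Hypothetical helper: durably rename "path" to "tmppath" within
	 * "parent".  After a crash we see either the old name (the drop never
	 * happened) or the .tmp name (the drop was in progress and can be
	 * finished at startup).  Only the rename and the fsyncs must succeed;
	 * actually deleting tmppath afterwards is mere tidying, which is why
	 * ReplicationSlotDrop() downgrades a failed rmtree() to a WARNING.
	 */
	static int
	durable_rename_away(const char *path, const char *tmppath,
						const char *parent)
	{
		if (rename(path, tmppath) != 0)
			return -1;
		if (fsync_dir(tmppath) != 0 || fsync_dir(parent) != 0)
			return -1;
		return 0;
	}

The same ordering explains why ReplicationSlotDrop() clears in_use only after
the fsyncs: by the time the shared-memory entry disappears, the on-disk
removal is already durable, so a crash at any later point cannot resurrect
the slot.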