From: Andres Freund Date: Tue, 28 Jan 2014 19:16:27 +0000 (+0100) Subject: slot: Separate data stored on-disk from purely in-memory state X-Git-Url: http://waps.l3s.uni-hannover.de/gitweb/?a=commitdiff_plain;h=3c31fa0514406a618bb1cb1a312b266f7b76a665;p=users%2Frhaas%2Fpostgres.git slot: Separate data stored on-disk from purely in-memory state --- diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index e17eeeb9ae..78d2bf15e6 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -621,7 +621,7 @@ CREATE VIEW pg_replication_slots AS D.datname AS database, L.active, L.data_xmin, - L.restart_decoding_lsn + L.restart_lsn FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index cc376b8b0a..322b79a9c9 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -60,13 +60,12 @@ typedef struct ReplicationSlotOnDisk uint32 version; uint32 length; - /* data with potentially evolving format */ - ReplicationSlot slot; + ReplicationSlotPersistentData slotdata; } ReplicationSlotOnDisk; /* size of the part of the slot that is version independent */ #define ReplicationSlotOnDiskConstantSize \ - offsetof(ReplicationSlotOnDisk, slot) + offsetof(ReplicationSlotOnDisk, slotdata) /* size of the slots that is not version indepenent */ #define ReplicationSlotOnDiskDynamicSize \ sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize @@ -222,7 +221,7 @@ ReplicationSlotCreate(const char *name, bool db_specific) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; - if (s->in_use && strcmp(name, NameStr(s->name)) == 0) + if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0) ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("replication slot \"%s\" already exists", name))); @@ -246,12 +245,12 @@ ReplicationSlotCreate(const char *name, bool 
db_specific) */ Assert(!slot->in_use); Assert(!slot->active); - slot->data_xmin = InvalidTransactionId; + slot->data.data_xmin = InvalidTransactionId; slot->effective_data_xmin = InvalidTransactionId; - strncpy(NameStr(slot->name), name, NAMEDATALEN); - NameStr(slot->name)[NAMEDATALEN - 1] = '\0'; - slot->database = db_specific ? MyDatabaseId : InvalidOid; - slot->restart_decoding = InvalidXLogRecPtr; + strncpy(NameStr(slot->data.name), name, NAMEDATALEN); + NameStr(slot->data.name)[NAMEDATALEN - 1] = '\0'; + slot->data.database = db_specific ? MyDatabaseId : InvalidOid; + slot->data.restart_lsn = InvalidXLogRecPtr; /* * Create the slot on disk. We haven't actually marked the slot allocated @@ -309,7 +308,7 @@ ReplicationSlotAcquire(const char *name) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; - if (s->in_use && strcmp(name, NameStr(s->name)) == 0) + if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0) { volatile ReplicationSlot *vslot = s; @@ -396,7 +395,7 @@ ReplicationSlotDrop(const char *name) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; - if (s->in_use && strcmp(name, NameStr(s->name)) == 0) + if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0) { volatile ReplicationSlot *vslot = s; @@ -421,8 +420,8 @@ ReplicationSlotDrop(const char *name) errmsg("replication slot \"%s\" is already active", name))); /* Generate pathnames. 
*/ - sprintf(path, "pg_replslot/%s", NameStr(slot->name)); - sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->name)); + sprintf(path, "pg_replslot/%s", NameStr(slot->data.name)); + sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name)); /* * Rename the slot directory on disk, so that we'll no longer recognize @@ -498,7 +497,7 @@ ReplicationSlotSave(void) Assert(MyReplicationSlot != NULL); - sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->name)); + sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name)); SaveSlotToPath(MyReplicationSlot, path); } @@ -560,7 +559,7 @@ ReplicationSlotsComputeRequiredLSN(void) for (i = 0; i < max_replication_slots; i++) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; - XLogRecPtr restart_decoding; + XLogRecPtr restart_lsn; if (!s->in_use) continue; @@ -569,14 +568,14 @@ ReplicationSlotsComputeRequiredLSN(void) volatile ReplicationSlot *vslot = s; SpinLockAcquire(&s->mutex); - restart_decoding = vslot->restart_decoding; + restart_lsn = vslot->data.restart_lsn; SpinLockRelease(&s->mutex); } - if (restart_decoding != InvalidXLogRecPtr && + if (restart_lsn != InvalidXLogRecPtr && (min_required == InvalidXLogRecPtr || - restart_decoding < min_required)) - min_required = restart_decoding; + restart_lsn < min_required)) + min_required = restart_lsn; } LWLockRelease(ReplicationSlotControlLock); @@ -692,8 +691,8 @@ CreateSlotOnDisk(ReplicationSlot *slot) char tmppath[MAXPGPATH]; char path[MAXPGPATH]; - sprintf(path, "pg_replslot/%s", NameStr(slot->name)); - sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->name)); + sprintf(path, "pg_replslot/%s", NameStr(slot->data.name)); + sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name)); if (mkdir(tmppath, S_IRWXU) < 0) ereport(ERROR, @@ -758,15 +757,12 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir) SpinLockAcquire(&slot->mutex); - cp.slot.data_xmin = slot->data_xmin; - cp.slot.effective_data_xmin = 
slot->effective_data_xmin; + cp.slotdata.data_xmin = slot->data.data_xmin; - strcpy(NameStr(cp.slot.name), NameStr(slot->name)); + strcpy(NameStr(cp.slotdata.name), NameStr(slot->data.name)); - cp.slot.database = slot->database; - cp.slot.restart_decoding = slot->restart_decoding; - cp.slot.in_use = slot->in_use; - cp.slot.active = false; + cp.slotdata.database = slot->data.database; + cp.slotdata.restart_lsn = slot->data.restart_lsn; SpinLockRelease(&slot->mutex); @@ -943,18 +939,15 @@ RestoreSlotFromDisk(const char *name) if (slot->in_use) continue; - slot->data_xmin = cp.slot.data_xmin; - /* - * after a crash, always use xmin, not effective_xmin, the - * slot obviously survived - */ - slot->effective_data_xmin = cp.slot.data_xmin; - strcpy(NameStr(slot->name), NameStr(cp.slot.name)); - slot->database = cp.slot.database; - slot->restart_decoding = cp.slot.restart_decoding; - /* ignore previous values, they are transient */ + /* restore the entire set of persistent data */ + memcpy(&slot->data, &cp.slotdata, + sizeof(ReplicationSlotPersistentData)); + + /* initialize in memory state */ + slot->effective_data_xmin = cp.slotdata.data_xmin; slot->in_use = true; slot->active = false; + restored = true; break; } diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 5abac8debf..e89903ec62 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -60,13 +60,10 @@ create_physical_replication_slot(PG_FUNCTION_ARGS) if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) elog(ERROR, "return type must be a row type"); - /* - * Acquire a logical decoding slot, this will check for conflicting - * names. 
- */ + /* acquire replication slot, this will check for conflicting names*/ ReplicationSlotCreate(NameStr(*name), false); - values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->name)); + values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name)); nulls[0] = false; nulls[1] = true; @@ -142,12 +139,14 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno]; Datum values[PG_STAT_GET_LOGICAL_DECODING_SLOTS_COLS]; bool nulls[PG_STAT_GET_LOGICAL_DECODING_SLOTS_COLS]; - char location[MAXFNAMELEN]; - const char *slot_name; + TransactionId data_xmin; - XLogRecPtr last_lsn; + XLogRecPtr restart_lsn; bool active; Oid database; + const char *slot_name; + + char restart_lsn_s[MAXFNAMELEN]; int i; SpinLockAcquire(&slot->mutex); @@ -158,18 +157,19 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) } else { - data_xmin = slot->data_xmin; + data_xmin = slot->data.data_xmin; + database = slot->data.database; + restart_lsn = slot->data.restart_lsn; + slot_name = pstrdup(NameStr(slot->data.name)); + active = slot->active; - database = slot->database; - last_lsn = slot->restart_decoding; - slot_name = pstrdup(NameStr(slot->name)); } SpinLockRelease(&slot->mutex); memset(nulls, 0, sizeof(nulls)); - snprintf(location, sizeof(location), "%X/%X", - (uint32) (last_lsn >> 32), (uint32) last_lsn); + snprintf(restart_lsn_s, sizeof(restart_lsn_s), "%X/%X", + (uint32) (restart_lsn >> 32), (uint32) restart_lsn); i = 0; values[i++] = CStringGetTextDatum(slot_name); @@ -180,7 +180,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = database; values[i++] = BoolGetDatum(active); values[i++] = TransactionIdGetDatum(data_xmin); - values[i++] = CStringGetTextDatum(location); + values[i++] = CStringGetTextDatum(restart_lsn_s); tuplestore_putvalues(tupstore, tupdesc, values, nulls); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 1c593cefe3..bb51e4ee91 100644 --- 
a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -430,7 +430,7 @@ StartReplication(StartReplicationCmd *cmd) if (cmd->slotname) { ReplicationSlotAcquire(cmd->slotname); - if (MyReplicationSlot->database != InvalidOid) + if (MyReplicationSlot->data.database != InvalidOid) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), (errmsg("cannot use a replication slot created for changeset extraction for streaming replication")))); @@ -660,7 +660,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) initStringInfo(&output_message); - slot_name = NameStr(MyReplicationSlot->name); + slot_name = NameStr(MyReplicationSlot->data.name); pq_beginmessage(&buf, 'T'); pq_sendint(&buf, 1, 2); /* 1 field */ @@ -936,10 +936,10 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn) Assert(lsn != InvalidXLogRecPtr); SpinLockAcquire(&slot->mutex); - if (slot->restart_decoding != lsn) + if (slot->data.restart_lsn != lsn) { changed = true; - slot->restart_decoding = lsn; + slot->data.restart_lsn = lsn; } SpinLockRelease(&slot->mutex); @@ -1005,7 +1005,7 @@ ProcessStandbyReplyMessage(void) */ if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr) { - if (MyReplicationSlot->database != InvalidOid) + if (MyReplicationSlot->data.database != InvalidOid) elog(ERROR, "cannot handle changeset extraction yet"); else PhysicalConfirmReceivedLocation(flushPtr); @@ -1027,12 +1027,12 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin) * by data_xmin and effective_data_xmin since the consequences of a * missed increase aren't bad, so set both at once. 
*/ - if (!TransactionIdIsNormal(slot->data_xmin) || + if (!TransactionIdIsNormal(slot->data.data_xmin) || !TransactionIdIsNormal(feedbackXmin) || - TransactionIdPrecedes(slot->data_xmin, feedbackXmin)) + TransactionIdPrecedes(slot->data.data_xmin, feedbackXmin)) { changed = true; - slot->data_xmin = feedbackXmin; + slot->data.data_xmin = feedbackXmin; slot->effective_data_xmin = feedbackXmin; } SpinLockRelease(&slot->mutex); diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 48d4123c04..139128b627 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -4757,7 +4757,7 @@ DATA(insert OID = 3780 ( create_physical_replication_slot PGNSP PGUID 12 1 0 0 DESCR("create a physical replication slot"); DATA(insert OID = 3781 ( drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 23 "19" _null_ _null_ _null_ _null_ drop_replication_slot _null_ _null_ _null_ )); DESCR("drop a replication slot"); -DATA(insert OID = 3475 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{25,25,26,16,28,25}" "{o,o,o,o,o,o}" "{slot_name,slot_type,datoid,active,data_xmin,restart_decoding_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ )); +DATA(insert OID = 3475 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{25,25,26,16,28,25}" "{o,o,o,o,o,o}" "{slot_name,slot_type,datoid,active,data_xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ )); DESCR("information about replication slots currently in use"); /* event triggers */ diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 47132d8f52..dffbc5462f 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -15,36 +15,20 @@ #include "storage/shmem.h" #include "storage/spin.h" - -/* - * Shared memory state of a single replication slot. 
- */ -typedef struct ReplicationSlot +typedef struct ReplicationSlotPersistentData { - /* lock, on same cacheline as effective_xmin */ - slock_t mutex; - - /* on-disk xmin horizon, updated first */ - TransactionId data_xmin; - - /* in-memory xmin horizon, updated after syncing to disk, used for computations */ - TransactionId effective_data_xmin; - - /* is this slot defined */ - bool in_use; - - /* is somebody streaming out changes for this slot */ - bool active; - /* The slot's identifier */ NameData name; /* database the slot is active on */ Oid database; + /* xmin horizon for data */ + TransactionId data_xmin; + /* ---- - * For logical decoding, this contains the point where, after a shutdown, - * crash, whatever where do we have to restart decoding from to + * For logical decoding, this contains the point where, after a shutdown + * or crash, we have to restart decoding from to * a) find a valid & ready snapshot * b) the complete content for all in-progress xacts * @@ -55,7 +39,30 @@ typedef struct ReplicationSlot * be removed. * ---- */ - XLogRecPtr restart_decoding; + XLogRecPtr restart_lsn; + +} ReplicationSlotPersistentData; + +/* + * Shared memory state of a single replication slot. + */ +typedef struct ReplicationSlot +{ + /* lock, on same cacheline as effective_xmin */ + slock_t mutex; + + /* is this slot defined */ + bool in_use; + + /* is somebody streaming out changes for this slot */ + bool active; + + /* data surviving shutdowns and crashes */ + ReplicationSlotPersistentData data; + + /* in-memory xmin horizon, updated after syncing to disk, used for computations */ + TransactionId effective_data_xmin; + } ReplicationSlot; /*