D.datname AS database,
L.active,
L.data_xmin,
- L.restart_decoding_lsn
+ L.restart_lsn
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);
uint32 version;
uint32 length;
- /* data with potentially evolving format */
- ReplicationSlot slot;
+ ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk;
/* size of the part of the slot that is version independent */
#define ReplicationSlotOnDiskConstantSize \
- offsetof(ReplicationSlotOnDisk, slot)
+ offsetof(ReplicationSlotOnDisk, slotdata)
/* size of the slots that is not version indepenent */
#define ReplicationSlotOnDiskDynamicSize \
sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
- if (s->in_use && strcmp(name, NameStr(s->name)) == 0)
+ if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("replication slot \"%s\" already exists", name)));
*/
Assert(!slot->in_use);
Assert(!slot->active);
- slot->data_xmin = InvalidTransactionId;
+ slot->data.data_xmin = InvalidTransactionId;
slot->effective_data_xmin = InvalidTransactionId;
- strncpy(NameStr(slot->name), name, NAMEDATALEN);
- NameStr(slot->name)[NAMEDATALEN - 1] = '\0';
- slot->database = db_specific ? MyDatabaseId : InvalidOid;
- slot->restart_decoding = InvalidXLogRecPtr;
+ strncpy(NameStr(slot->data.name), name, NAMEDATALEN);
+ NameStr(slot->data.name)[NAMEDATALEN - 1] = '\0';
+ slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
+ slot->data.restart_lsn = InvalidXLogRecPtr;
/*
* Create the slot on disk. We haven't actually marked the slot allocated
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
- if (s->in_use && strcmp(name, NameStr(s->name)) == 0)
+ if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
{
volatile ReplicationSlot *vslot = s;
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
- if (s->in_use && strcmp(name, NameStr(s->name)) == 0)
+ if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
{
volatile ReplicationSlot *vslot = s;
errmsg("replication slot \"%s\" is already active", name)));
/* Generate pathnames. */
- sprintf(path, "pg_replslot/%s", NameStr(slot->name));
- sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->name));
+ sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
+ sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
/*
* Rename the slot directory on disk, so that we'll no longer recognize
Assert(MyReplicationSlot != NULL);
- sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->name));
+ sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
SaveSlotToPath(MyReplicationSlot, path);
}
for (i = 0; i < max_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
- XLogRecPtr restart_decoding;
+ XLogRecPtr restart_lsn;
if (!s->in_use)
continue;
volatile ReplicationSlot *vslot = s;
SpinLockAcquire(&s->mutex);
- restart_decoding = vslot->restart_decoding;
+ restart_lsn = vslot->data.restart_lsn;
SpinLockRelease(&s->mutex);
}
- if (restart_decoding != InvalidXLogRecPtr &&
+ if (restart_lsn != InvalidXLogRecPtr &&
(min_required == InvalidXLogRecPtr ||
- restart_decoding < min_required))
- min_required = restart_decoding;
+ restart_lsn < min_required))
+ min_required = restart_lsn;
}
LWLockRelease(ReplicationSlotControlLock);
char tmppath[MAXPGPATH];
char path[MAXPGPATH];
- sprintf(path, "pg_replslot/%s", NameStr(slot->name));
- sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->name));
+ sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
+ sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
if (mkdir(tmppath, S_IRWXU) < 0)
ereport(ERROR,
SpinLockAcquire(&slot->mutex);
- cp.slot.data_xmin = slot->data_xmin;
- cp.slot.effective_data_xmin = slot->effective_data_xmin;
+ cp.slotdata.data_xmin = slot->data.data_xmin;
- strcpy(NameStr(cp.slot.name), NameStr(slot->name));
+ strcpy(NameStr(cp.slotdata.name), NameStr(slot->data.name));
- cp.slot.database = slot->database;
- cp.slot.restart_decoding = slot->restart_decoding;
- cp.slot.in_use = slot->in_use;
- cp.slot.active = false;
+ cp.slotdata.database = slot->data.database;
+ cp.slotdata.restart_lsn = slot->data.restart_lsn;
SpinLockRelease(&slot->mutex);
if (slot->in_use)
continue;
- slot->data_xmin = cp.slot.data_xmin;
- /*
- * after a crash, always use xmin, not effective_xmin, the
- * slot obviously survived
- */
- slot->effective_data_xmin = cp.slot.data_xmin;
- strcpy(NameStr(slot->name), NameStr(cp.slot.name));
- slot->database = cp.slot.database;
- slot->restart_decoding = cp.slot.restart_decoding;
- /* ignore previous values, they are transient */
+ /* restore the entire set of persistent data */
+ memcpy(&slot->data, &cp.slotdata,
+ sizeof(ReplicationSlotPersistentData));
+
+ /* initialize in memory state */
+ slot->effective_data_xmin = cp.slotdata.data_xmin;
slot->in_use = true;
slot->active = false;
+
restored = true;
break;
}
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
- /*
- * Acquire a logical decoding slot, this will check for conflicting
- * names.
- */
+ /* acquire replication slot, this will check for conflicting names*/
ReplicationSlotCreate(NameStr(*name), false);
- values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->name));
+ values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name));
nulls[0] = false;
nulls[1] = true;
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
Datum values[PG_STAT_GET_LOGICAL_DECODING_SLOTS_COLS];
bool nulls[PG_STAT_GET_LOGICAL_DECODING_SLOTS_COLS];
- char location[MAXFNAMELEN];
- const char *slot_name;
+
TransactionId data_xmin;
- XLogRecPtr last_lsn;
+ XLogRecPtr restart_lsn;
bool active;
Oid database;
+ const char *slot_name;
+
+ char restart_lsn_s[MAXFNAMELEN];
int i;
SpinLockAcquire(&slot->mutex);
}
else
{
- data_xmin = slot->data_xmin;
+ data_xmin = slot->data.data_xmin;
+ database = slot->data.database;
+ restart_lsn = slot->data.restart_lsn;
+ slot_name = pstrdup(NameStr(slot->data.name));
+
active = slot->active;
- database = slot->database;
- last_lsn = slot->restart_decoding;
- slot_name = pstrdup(NameStr(slot->name));
}
SpinLockRelease(&slot->mutex);
memset(nulls, 0, sizeof(nulls));
- snprintf(location, sizeof(location), "%X/%X",
- (uint32) (last_lsn >> 32), (uint32) last_lsn);
+ snprintf(restart_lsn_s, sizeof(restart_lsn_s), "%X/%X",
+ (uint32) (restart_lsn >> 32), (uint32) restart_lsn);
i = 0;
values[i++] = CStringGetTextDatum(slot_name);
values[i++] = database;
values[i++] = BoolGetDatum(active);
values[i++] = TransactionIdGetDatum(data_xmin);
- values[i++] = CStringGetTextDatum(location);
+ values[i++] = CStringGetTextDatum(restart_lsn_s);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
if (cmd->slotname)
{
ReplicationSlotAcquire(cmd->slotname);
- if (MyReplicationSlot->database != InvalidOid)
+ if (MyReplicationSlot->data.database != InvalidOid)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
(errmsg("cannot use a replication slot created for changeset extraction for streaming replication"))));
initStringInfo(&output_message);
- slot_name = NameStr(MyReplicationSlot->name);
+ slot_name = NameStr(MyReplicationSlot->data.name);
pq_beginmessage(&buf, 'T');
pq_sendint(&buf, 1, 2); /* 1 field */
Assert(lsn != InvalidXLogRecPtr);
SpinLockAcquire(&slot->mutex);
- if (slot->restart_decoding != lsn)
+ if (slot->data.restart_lsn != lsn)
{
changed = true;
- slot->restart_decoding = lsn;
+ slot->data.restart_lsn = lsn;
}
SpinLockRelease(&slot->mutex);
*/
if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
{
- if (MyReplicationSlot->database != InvalidOid)
+ if (MyReplicationSlot->data.database != InvalidOid)
elog(ERROR, "cannot handle changeset extraction yet");
else
PhysicalConfirmReceivedLocation(flushPtr);
* by data_xmin and effective_data_xmin since the consequences of a
* missed increase aren't bad, so set both at once.
*/
- if (!TransactionIdIsNormal(slot->data_xmin) ||
+ if (!TransactionIdIsNormal(slot->data.data_xmin) ||
!TransactionIdIsNormal(feedbackXmin) ||
- TransactionIdPrecedes(slot->data_xmin, feedbackXmin))
+ TransactionIdPrecedes(slot->data.data_xmin, feedbackXmin))
{
changed = true;
- slot->data_xmin = feedbackXmin;
+ slot->data.data_xmin = feedbackXmin;
slot->effective_data_xmin = feedbackXmin;
}
SpinLockRelease(&slot->mutex);
DESCR("create a physical replication slot");
DATA(insert OID = 3781 ( drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 23 "19" _null_ _null_ _null_ _null_ drop_replication_slot _null_ _null_ _null_ ));
DESCR("drop a replication slot");
-DATA(insert OID = 3475 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{25,25,26,16,28,25}" "{o,o,o,o,o,o}" "{slot_name,slot_type,datoid,active,data_xmin,restart_decoding_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ ));
+DATA(insert OID = 3475 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{25,25,26,16,28,25}" "{o,o,o,o,o,o}" "{slot_name,slot_type,datoid,active,data_xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ ));
DESCR("information about replication slots currently in use");
/* event triggers */
#include "storage/shmem.h"
#include "storage/spin.h"
-
-/*
- * Shared memory state of a single replication slot.
- */
-typedef struct ReplicationSlot
+typedef struct ReplicationSlotPersistentData
{
- /* lock, on same cacheline as effective_xmin */
- slock_t mutex;
-
- /* on-disk xmin horizon, updated first */
- TransactionId data_xmin;
-
- /* in-memory xmin horizon, updated after syncing to disk, used for computations */
- TransactionId effective_data_xmin;
-
- /* is this slot defined */
- bool in_use;
-
- /* is somebody streaming out changes for this slot */
- bool active;
-
/* The slot's identifier */
NameData name;
/* database the slot is active on */
Oid database;
+ /* xmin horizon for data */
+ TransactionId data_xmin;
+
/* ----
- * For logical decoding, this contains the point where, after a shutdown,
- * crash, whatever where do we have to restart decoding from to
+ * For logical decoding, this contains the point where, after a shutdown
+ * or crash, the to have to restart decoding from to
* a) find a valid & ready snapshot
* b) the complete content for all in-progress xacts
*
* be removed.
* ----
*/
- XLogRecPtr restart_decoding;
+ XLogRecPtr restart_lsn;
+
+} ReplicationSlotPersistentData;
+
+/*
+ * Shared memory state of a single replication slot.
+ */
+typedef struct ReplicationSlot
+{
+ /* lock, on same cacheline as effective_xmin */
+ slock_t mutex;
+
+ /* is this slot defined */
+ bool in_use;
+
+ /* is somebody streaming out changes for this slot */
+ bool active;
+
+ /* data surviving shutdowns and crashes */
+ ReplicationSlotPersistentData data;
+
+ /* in-memory xmin horizon, updated after syncing to disk, used for computations */
+ TransactionId effective_data_xmin;
+
} ReplicationSlot;
/*