}
/*
- * Initialize a logical or physical replication slot and wait for an initial
- * consistent point to start sending changes from.
+ * Create a new replication slot.
*/
static void
CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
slot_name = NameStr(MyReplicationSlot->data.name);
+ /*
+ * It may seem somewhat pointless to send back the same slot name the
+ * client just requested and nothing else, but logical replication
+ * will add more fields here. (We could consider removing the slot
+ * name from what's sent back, though, since the client has specified
+ * that.)
+ */
+
pq_beginmessage(&buf, 'T');
pq_sendint(&buf, 1, 2); /* 1 field */
/* first field: slot name */
- pq_sendstring(&buf, "replication_id"); /* col name */
+ pq_sendstring(&buf, "slot_name"); /* col name */
pq_sendint(&buf, 0, 4); /* table oid */
pq_sendint(&buf, 0, 2); /* attnum */
pq_sendint(&buf, TEXTOID, 4); /* type oid */
}
/*
- * Free permanent state by a now inactive but defined logical slot.
+ * Get rid of a replication slot that is no longer wanted.
*/
static void
DropReplicationSlot(DropReplicationSlotCmd *cmd)
{
- /* no need to check decoding requirements here */;
ReplicationSlotDrop(cmd->slotname);
EndCommand("DROP_REPLICATION_SLOT", DestRemote);
}
+
/*
* Execute an incoming replication command.
*/
/* database the slot is active on */
Oid database;
- /* xmin horizon for data */
- TransactionId data_xmin;
-
- /* ----
- * For logical decoding, this contains the point where, after a shutdown
- * or crash, the to have to restart decoding from to
- * a) find a valid & ready snapshot
- * b) the complete content for all in-progress xacts
- *
- * For streaming replication, this contains the oldest LSN (in any
- * timeline) the standb might ask for.
+ /*
+ * xmin horizon for data
*
- * For both only WAL segments that are smaller than restart_decoding, will
- * be removed.
- * ----
+ * NB: This may represent a value that hasn't been written to disk yet;
+ * see notes for effective_data_xmin, below.
*/
+ TransactionId data_xmin;
+
+ /* oldest LSN that might be required by this replication slot */
XLogRecPtr restart_lsn;
} ReplicationSlotPersistentData;
/* is somebody streaming out changes for this slot */
bool active;
- /* data surviving shutdowns and crashes */
- ReplicationSlotPersistentData data;
-
- /* in-memory xmin horizon, updated after syncing to disk, used for computations */
+ /*
+ * For logical decoding, it's extremely important that we never remove any
+ * data that's still needed for decoding purposes, even after a crash;
+ * otherwise, decoding will produce wrong answers. Ordinary streaming
+ * replication also needs to prevent old row versions from being removed
+ * too soon, but the worst consequence we might encounter there is unwanted
+ * query cancellations on the standby. Thus, for logical decoding,
+ * this value represents the latest data_xmin that has actually been
+ * written to disk, whereas for streaming replication, it's just the
+ * same as the persistent value (data.data_xmin).
+ */
TransactionId effective_data_xmin;
+ /* data surviving shutdowns and crashes */
+ ReplicationSlotPersistentData data;
} ReplicationSlot;
/*