#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "replication/basebackup.h"
+#include "replication/slot.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
sendFile = -1;
}
+ if (MyReplicationSlot != NULL)
+ ReplicationSlotRelease();
+
+ /* do a minimal amount of cleanup for the !ready_to_stop cases */
+ LWLockReleaseAll();
+
replication_active = false;
if (walsender_ready_to_stop)
proc_exit(0);
* written at wal_level='minimal'.
*/
+ if (cmd->slotname)
+ {
+ ReplicationSlotAcquire(cmd->slotname);
+ if (MyReplicationSlot->database != InvalidOid)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ (errmsg("cannot use a replication slot created for changeset extraction for streaming replication"))));
+ }
+
/*
* Select the timeline. If it was given explicitly by the client, use
* that. Otherwise use the timeline of the last replayed record, which is
Assert(streamingDoneSending && streamingDoneReceiving);
}
+ if (cmd->slotname)
+ ReplicationSlotRelease();
+
/*
* Copy is finished now. Send a single-row result set indicating the next
* timeline.
pq_puttextmessage('C', "START_STREAMING");
}
+/*
+ * Initialize a logical or physical replication slot and wait for an initial
+ * consistent point to start sending changes from.
+ */
+static void
+CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
+{
+ const char *slot_name;
+ StringInfoData buf;
+ char xpos[MAXFNAMELEN];
+ const char *snapshot_name = NULL;
+
+ Assert(!MyReplicationSlot);
+
+ /* setup state for XLogReadPage */
+ sendTimeLineIsHistoric = false;
+ sendTimeLine = ThisTimeLineID;
+
+ ReplicationSlotCreate(cmd->slotname, cmd->kind == REPLICATION_KIND_LOGICAL);
+
+ initStringInfo(&output_message);
+
+ slot_name = NameStr(MyReplicationSlot->name);
+ snprintf(xpos, sizeof(xpos), "%X/%X",
+ (uint32) (MyReplicationSlot->confirmed_flush >> 32),
+ (uint32) MyReplicationSlot->confirmed_flush);
+
+ pq_beginmessage(&buf, 'T');
+ pq_sendint(&buf, 4, 2); /* 4 fields */
+
+ /* first field: slot name */
+ pq_sendstring(&buf, "replication_id"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
+ /* second field: LSN at which we became consistent */
+ pq_sendstring(&buf, "consistent_point"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
+ /* third field: exported snapshot's name */
+ pq_sendstring(&buf, "snapshot_name"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
+ /* fourth field: output plugin */
+ pq_sendstring(&buf, "plugin"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
+ pq_endmessage(&buf);
+
+ /* Send a DataRow message */
+ pq_beginmessage(&buf, 'D');
+ pq_sendint(&buf, 4, 2); /* # of columns */
+
+ /* slot_name */
+ pq_sendint(&buf, strlen(slot_name), 4); /* col1 len */
+ pq_sendbytes(&buf, slot_name, strlen(slot_name));
+
+ /* consistent wal location */
+ pq_sendint(&buf, strlen(xpos), 4); /* col2 len */
+ pq_sendbytes(&buf, xpos, strlen(xpos));
+
+ /* snapshot name */
+ if (snapshot_name != NULL)
+ {
+ pq_sendint(&buf, strlen(snapshot_name), 4); /* col3 len */
+ pq_sendbytes(&buf, snapshot_name, strlen(snapshot_name));
+ }
+ else
+ pq_sendint(&buf, -1, 4); /* col3 len, NULL */
+
+ /* plugin */
+ if (cmd->plugin != NULL)
+ {
+ pq_sendint(&buf, strlen(cmd->plugin), 4); /* col4 len */
+ pq_sendbytes(&buf, cmd->plugin, strlen(cmd->plugin));
+ }
+ else
+ pq_sendint(&buf, -1, 4); /* col4 len, NULL */
+
+ pq_endmessage(&buf);
+
+ /*
+ * release active status again, START_REPLICATION will reacquire it
+ */
+ ReplicationSlotRelease();
+}
+
+/*
+ * Free permanent state by a now inactive but defined logical slot.
+ */
+static void
+DropReplicationSlot(DropReplicationSlotCmd *cmd)
+{
+ /* no need to check decoding requirements here */;
+ ReplicationSlotDrop(cmd->slotname);
+ EndCommand("DROP_REPLICATION_SLOT", DestRemote);
+}
/*
* Execute an incoming replication command.
*/
IdentifySystem();
break;
- case T_StartReplicationCmd:
- StartReplication((StartReplicationCmd *) cmd_node);
- break;
-
case T_BaseBackupCmd:
SendBaseBackup((BaseBackupCmd *) cmd_node);
break;
+ case T_CreateReplicationSlotCmd:
+ CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
+ break;
+
+ case T_DropReplicationSlotCmd:
+ DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
+ break;
+
+ case T_StartReplicationCmd:
+ {
+ StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
+ if (cmd->kind == REPLICATION_KIND_PHYSICAL)
+ StartReplication(cmd);
+ else
+ elog(ERROR, "cannot handle changeset extraction yet");
+ break;
+ }
+
case T_TimeLineHistoryCmd:
SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
break;
}
}
+/*
+ * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
+ */
+static void
+PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
+{
+ bool changed = false;
+ /* use volatile pointer to prevent code rearrangement */
+ volatile ReplicationSlot *slot = MyReplicationSlot;
+
+ Assert(lsn != InvalidXLogRecPtr);
+ SpinLockAcquire(&slot->mutex);
+ if (slot->restart_decoding != lsn)
+ {
+ changed = true;
+ slot->restart_decoding = lsn;
+ }
+ SpinLockRelease(&slot->mutex);
+
+ if (changed)
+ ReplicationSlotsComputeRequiredLSN();
+
+ /*
+ * One could argue that the slot should saved to disk now, but that'd be
+ * energy wasted - the worst lost information can do here is give us wrong
+ * information in a statistics view - we'll just potentially be more
+ * conservative in removing files.
+ */
+}
+
/*
* Regular reply from standby advising of WAL positions on standby server.
*/
if (!am_cascading_walsender)
SyncRepReleaseWaiters();
+
+ /*
+ * Advance our local xmin horizon when the client confirmed a flush.
+ */
+ if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
+ {
+ if (MyReplicationSlot->database != InvalidOid)
+ elog(ERROR, "cannot handle changeset extraction yet");
+ else
+ PhysicalConfirmReceivedLocation(flushPtr);
+ }
+}
+
+/* compute new replication slot xmin horizon if needed */
+static void
+PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
+{
+ bool changed = false;
+ volatile ReplicationSlot *slot = MyReplicationSlot;
+ SpinLockAcquire(&slot->mutex);
+
+ MyPgXact->xmin = InvalidTransactionId;
+
+ /*
+ * For physical replication we don't need the the interlock provided
+ * by data_xmin and effective_data_xmin since the consequences of a
+ * missed increase aren't bad, so set both at once.
+ */
+ if (!TransactionIdIsNormal(slot->data_xmin) ||
+ !TransactionIdIsNormal(feedbackXmin) ||
+ TransactionIdPrecedes(slot->data_xmin, feedbackXmin))
+ {
+ changed = true;
+ slot->data_xmin = feedbackXmin;
+ slot->effective_data_xmin = feedbackXmin;
+ }
+ SpinLockRelease(&slot->mutex);
+
+ if (changed)
+ {
+ ReplicationSlotSave();
+ ReplicationSlotsComputeRequiredXmin(false);
+ }
}
/*
if (!TransactionIdIsNormal(feedbackXmin))
{
MyPgXact->xmin = InvalidTransactionId;
+ if (MyReplicationSlot != NULL)
+ PhysicalReplicationSlotNewXmin(feedbackXmin);
return;
}
* GetOldestXmin. (If we're moving our xmin forward, this is obviously
* safe, and if we're moving it backwards, well, the data is at risk
* already since a VACUUM could have just finished calling GetOldestXmin.)
+ *
+ * If we're using a replication slot we reserve the xmin via that,
+ * otherwise via the walsender's PGXACT entry.
+
+ * XXX: It might make sense to introduce ephemeral slots and always use
+ * the slot mechanism.
*/
- MyPgXact->xmin = feedbackXmin;
+ if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
+ PhysicalReplicationSlotNewXmin(feedbackXmin);
+ else
+ MyPgXact->xmin = feedbackXmin;
}
/* Main loop of walsender process that streams the WAL over Copy messages. */