*
* It is possible that the origin is not yet created for
* tablesync worker, this can happen for the states before
- * SUBREL_STATE_FINISHEDCOPY. The apply worker can also
+ * SUBREL_STATE_DATASYNC. The apply worker can also
* concurrently try to drop the origin and by this time
* the origin might be already removed. For these reasons,
* passing missing_ok = true.
*
* It is possible that the origin is not yet created for tablesync
* worker so passing missing_ok = true. This can happen for the states
- * before SUBREL_STATE_FINISHEDCOPY.
+ * before SUBREL_STATE_DATASYNC.
*/
ReplicationOriginNameForTablesync(subid, relid, originname,
sizeof(originname));
MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
SpinLockRelease(&MyLogicalRepWorker->relmutex);
- /* Update the state and make it visible to others. */
+ /*
+ * Update the state, create the replication origin, and make them visible
+ * to others.
+ */
StartTransactionCommand();
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn);
+
+ /*
+ * Create the replication origin in a separate transaction from the one
+ * that sets up the origin in shared memory. This prevents the risk that
+ * changes to the origin in shared memory cannot be rolled back if the
+ * transaction aborts.
+ */
+ originid = replorigin_by_name(originname, true);
+ if (!OidIsValid(originid))
+ originid = replorigin_create(originname);
+
CommitTransactionCommand();
pgstat_report_stat(true);
CRS_USE_SNAPSHOT, origin_startpos);
/*
- * Setup replication origin tracking. The purpose of doing this before the
- * copy is to avoid doing the copy again due to any error in setting up
- * origin tracking.
+ * Advance the origin to the LSN got from walrcv_create_slot and then set
+ * up the origin. The advancement is WAL logged for the purpose of
+ * recovery. Locks are to prevent the replication origin from vanishing
+ * while advancing.
+ *
+ * The purpose of doing these before the copy is to avoid doing the copy
+ * again due to any error in advancing or setting up origin tracking.
*/
- originid = replorigin_by_name(originname, true);
- if (!OidIsValid(originid))
- {
- /*
- * Origin tracking does not exist, so create it now.
- *
- * Then advance to the LSN got from walrcv_create_slot. This is WAL
- * logged for the purpose of recovery. Locks are to prevent the
- * replication origin from vanishing while advancing.
- */
- originid = replorigin_create(originname);
-
- LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
- replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
- true /* go backward */ , true /* WAL log */ );
- UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+ LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+ replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
+ true /* go backward */ , true /* WAL log */ );
+ UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
- replorigin_session_setup(originid);
- replorigin_session_origin = originid;
- }
- else
- {
- ereport(ERROR,
- (errcode(ERRCODE_DUPLICATE_OBJECT),
- errmsg("replication origin \"%s\" already exists",
- originname)));
- }
+ replorigin_session_setup(originid);
+ replorigin_session_origin = originid;
/* Now do the initial data copy */
PushActiveSnapshot(GetTransactionSnapshot());
'postgres', 'SELECT count(*) = 0 FROM pg_replication_slots'),
'DROP SUBSCRIPTION during error can clean up the slots on the publisher');
+# After dropping the subscription, all replication origins, whether created by
+# an apply worker or table sync worker, should have been cleaned up.
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_replication_origin_status");
+is($result, qq(0), 'all replication origins have been cleaned up');
+
$node_subscriber->stop('fast');
$node_publisher->stop('fast');