Fix orphaned origin in shared memory after DROP SUBSCRIPTION REL_15_STABLE github/REL_15_STABLE
authorMichael Paquier <michael@paquier.xyz>
Tue, 23 Dec 2025 05:32:24 +0000 (14:32 +0900)
committerMichael Paquier <michael@paquier.xyz>
Tue, 23 Dec 2025 05:32:24 +0000 (14:32 +0900)
Since ce0fdbfe9722, a replication slot and an origin are created by each
tablesync worker, whose information is stored in both a catalog and
shared memory (once the origin is set up in the latter case).  The
transaction where the origin is created is the same as the one that runs
the initial COPY, with the catalog state of the origin becoming visible
for other sessions only once the COPY transaction has committed.  The
catalog state is coupled with a state in shared memory, initialized at
the same time as the origin created in the catalogs.  Note that the
transaction doing the initial data sync can take a long time, time that
depends on the amount of data to transfer from a publication node to its
subscriber node.

Now, when a DROP SUBSCRIPTION is executed, all its workers are stopped
with the origins removed.  The removal of each origin relies on a
catalog lookup.  A worker still running the initial COPY would fail its
transaction, with the catalog state of the origin rolled back while the
shared memory state remains around.  The session running the DROP
SUBSCRIPTION should be in charge of cleaning up the catalog and the
shared memory state, but as there is no data in the catalogs the shared
memory state is not removed.  This issue would leave orphaned origin
data in shared memory, leading to a confusing state as it would still
show up in pg_replication_origin_status.  Note that this shared memory
data is sticky, being flushed on disk in replorigin_checkpoint at
checkpoint.  This prevents other origins from reusing a slot position
in the shared memory data.

To address this problem, the commit moves the creation of the origin at
the end of the transaction that precedes the one executing the initial
COPY, making the origin immediately visible in the catalogs for other
sessions, giving DROP SUBSCRIPTION a way to know about it.  A different
solution would have been to clean up the shared memory state using an
abort callback within the tablesync worker.  The solution of this commit
is more consistent with the apply worker that creates an origin in a
short transaction.

A test is added in the subscription test 004_sync.pl, which was able to
display the problem.  The test fails when this commit is reverted.

Reported-by: Tenglong Gu <brucegu@amazon.com>
Reported-by: Daisuke Higuchi <higudai@amazon.com>
Analyzed-by: Michael Paquier <michael@paquier.xyz>
Author: Hou Zhijie <houzj.fnst@fujitsu.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Discussion: https://postgr.es/m/aUTekQTg4OYnw-Co@paquier.xyz
Backpatch-through: 14

src/backend/commands/subscriptioncmds.c
src/backend/replication/logical/tablesync.c
src/test/subscription/t/004_sync.pl

index cb9867c96d49d1207f0e702ae8b8593a67361291..27cbb5870770236b9e76a35d4dc1da177e3e3dfd 100644 (file)
@@ -901,7 +901,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
                     *
                     * It is possible that the origin is not yet created for
                     * tablesync worker, this can happen for the states before
-                    * SUBREL_STATE_FINISHEDCOPY. The apply worker can also
+                    * SUBREL_STATE_DATASYNC. The apply worker can also
                     * concurrently try to drop the origin and by this time
                     * the origin might be already removed. For these reasons,
                     * passing missing_ok = true.
@@ -1478,7 +1478,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
         *
         * It is possible that the origin is not yet created for tablesync
         * worker so passing missing_ok = true. This can happen for the states
-        * before SUBREL_STATE_FINISHEDCOPY.
+        * before SUBREL_STATE_DATASYNC.
         */
        ReplicationOriginNameForTablesync(subid, relid, originname,
                                          sizeof(originname));
index dc0526e2da25cd42cf830a2d937cac6d5f064c6a..eb02cdd47505673666cbb0e6239b06be61b18da7 100644 (file)
@@ -1326,12 +1326,26 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
    MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
    SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
-   /* Update the state and make it visible to others. */
+   /*
+    * Update the state, create the replication origin, and make them visible
+    * to others.
+    */
    StartTransactionCommand();
    UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                               MyLogicalRepWorker->relid,
                               MyLogicalRepWorker->relstate,
                               MyLogicalRepWorker->relstate_lsn);
+
+   /*
+    * Create the replication origin in a separate transaction from the one
+    * that sets up the origin in shared memory. This prevents the risk that
+    * changes to the origin in shared memory cannot be rolled back if the
+    * transaction aborts.
+    */
+   originid = replorigin_by_name(originname, true);
+   if (!OidIsValid(originid))
+       originid = replorigin_create(originname);
+
    CommitTransactionCommand();
    pgstat_report_stat(true);
 
@@ -1395,37 +1409,21 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                       CRS_USE_SNAPSHOT, origin_startpos);
 
    /*
-    * Setup replication origin tracking. The purpose of doing this before the
-    * copy is to avoid doing the copy again due to any error in setting up
-    * origin tracking.
+    * Advance the origin to the LSN got from walrcv_create_slot and then set
+    * up the origin. The advancement is WAL logged for the purpose of
+    * recovery. Locks are to prevent the replication origin from vanishing
+    * while advancing.
+    *
+    * The purpose of doing these before the copy is to avoid doing the copy
+    * again due to any error in advancing or setting up origin tracking.
     */
-   originid = replorigin_by_name(originname, true);
-   if (!OidIsValid(originid))
-   {
-       /*
-        * Origin tracking does not exist, so create it now.
-        *
-        * Then advance to the LSN got from walrcv_create_slot. This is WAL
-        * logged for the purpose of recovery. Locks are to prevent the
-        * replication origin from vanishing while advancing.
-        */
-       originid = replorigin_create(originname);
-
-       LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
-       replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
-                          true /* go backward */ , true /* WAL log */ );
-       UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+   LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+   replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
+                      true /* go backward */ , true /* WAL log */ );
+   UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
 
-       replorigin_session_setup(originid);
-       replorigin_session_origin = originid;
-   }
-   else
-   {
-       ereport(ERROR,
-               (errcode(ERRCODE_DUPLICATE_OBJECT),
-                errmsg("replication origin \"%s\" already exists",
-                       originname)));
-   }
+   replorigin_session_setup(originid);
+   replorigin_session_origin = originid;
 
    /* Now do the initial data copy */
    PushActiveSnapshot(GetTransactionSnapshot());
index 6251c07b7888aa09a774b9dd61aa539adf87e1a7..fb692c1177c222d5f532b7e7447f814a14620521 100644 (file)
@@ -172,6 +172,12 @@ ok( $node_publisher->poll_query_until(
        'postgres', 'SELECT count(*) = 0 FROM pg_replication_slots'),
    'DROP SUBSCRIPTION during error can clean up the slots on the publisher');
 
+# After dropping the subscription, all replication origins, whether created by
+# an apply worker or table sync worker, should have been cleaned up.
+$result = $node_subscriber->safe_psql('postgres',
+   "SELECT count(*) FROM pg_replication_origin_status");
+is($result, qq(0), 'all replication origins have been cleaned up');
+
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');