This allows a transaction abort to avoid killing those workers.
Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                 errmsg("cannot PREPARE a transaction that has exported snapshots")));
 
+       /*
+        * Don't allow PREPARE but for transaction that has/might kill logical
+        * replication workers.
+        */
+       if (XactManipulatesLogicalReplicationWorkers())
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("cannot PREPARE a transaction that has manipulated logical replication workers")));
+
        /* Prevent cancel/die interrupt while cleaning up */
        HOLD_INTERRUPTS();
 
 
 
                        RemoveSubscriptionRel(sub->oid, relid);
 
-                       logicalrep_worker_stop(sub->oid, relid);
+                       logicalrep_worker_stop_at_commit(sub->oid, relid);
 
                        namespace = get_namespace_name(get_rel_namespace(relid));
                        ereport(NOTICE,
        char       *subname;
        char       *conninfo;
        char       *slotname;
+       List       *subworkers;
+       ListCell   *lc;
        char            originname[NAMEDATALEN];
        char       *err = NULL;
        RepOriginId originid;
 
        ReleaseSysCache(tup);
 
+       /*
+        * If we are dropping the replication slot, stop all the subscription
+        * workers immediately, so that the slot becomes accessible.  Otherwise
+        * just schedule the stopping for the end of the transaction.
+        *
+        * New workers won't be started because we hold an exclusive lock on the
+        * subscription till the end of the transaction.
+        */
+       LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+       subworkers = logicalrep_workers_find(subid, false);
+       LWLockRelease(LogicalRepWorkerLock);
+       foreach (lc, subworkers)
+       {
+               LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+               if (slotname)
+                       logicalrep_worker_stop(w->subid, w->relid);
+               else
+                       logicalrep_worker_stop_at_commit(w->subid, w->relid);
+       }
+       list_free(subworkers);
+
        /* Clean up dependencies */
        deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
        /* Remove any associated relation synchronization states. */
        RemoveSubscriptionRel(subid, InvalidOid);
 
-       /* Kill the apply worker so that the slot becomes accessible. */
-       logicalrep_worker_stop(subid, InvalidOid);
-
        /* Remove the origin tracking if exists. */
        snprintf(originname, sizeof(originname), "pg_%u", subid);
        originid = replorigin_by_name(originname, true);
 
 
 LogicalRepCtxStruct *LogicalRepCtx;
 
+typedef struct LogicalRepWorkerId
+{
+       Oid     subid;
+       Oid relid;
+} LogicalRepWorkerId;
+
+static List *on_commit_stop_workers = NIL;
+
 static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
        return res;
 }
 
+/*
+ * Similar to logicalrep_worker_find(), but returns list of all workers for
+ * the subscription, instead just one.
+ */
+List *
+logicalrep_workers_find(Oid subid, bool only_running)
+{
+       int                     i;
+       List       *res = NIL;
+
+       Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
+       /* Search for attached worker for a given subscription id. */
+       for (i = 0; i < max_logical_replication_workers; i++)
+       {
+               LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+               if (w->in_use && w->subid == subid && (!only_running || w->proc))
+                       res = lappend(res, w);
+       }
+
+       return res;
+}
+
 /*
  * Start new apply background worker.
  */
        LWLockRelease(LogicalRepWorkerLock);
 }
 
+/*
+ * Request worker for specified sub/rel to be stopped on commit.
+ */
+void
+logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
+{
+       LogicalRepWorkerId *wid;
+       MemoryContext           oldctx;
+
+       /* Make sure we store the info in context that survives until commit. */
+       oldctx = MemoryContextSwitchTo(TopTransactionContext);
+
+       wid = palloc(sizeof(LogicalRepWorkerId));
+       wid->subid = subid;
+       wid->relid = relid;
+
+       on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+
+       MemoryContextSwitchTo(oldctx);
+}
+
 /*
  * Wake up (using latch) any logical replication worker for specified sub/rel.
  */
        }
 }
 
+/*
+ * Check whether current transaction has manipulated logical replication
+ * workers.
+ */
+bool
+XactManipulatesLogicalReplicationWorkers(void)
+{
+       return (on_commit_stop_workers != NIL);
+}
+
 /*
  * Wakeup the launcher on commit if requested.
  */
 void
 AtEOXact_ApplyLauncher(bool isCommit)
 {
-       if (isCommit && on_commit_launcher_wakeup)
-               ApplyLauncherWakeup();
+       if (isCommit)
+       {
+               ListCell *lc;
 
+               foreach (lc, on_commit_stop_workers)
+               {
+                       LogicalRepWorkerId *wid = lfirst(lc);
+                       logicalrep_worker_stop(wid->subid, wid->relid);
+               }
+
+               if (on_commit_launcher_wakeup)
+                       ApplyLauncherWakeup();
+       }
+
+       /*
+        * No need to pfree on_commit_stop_workers.  It was allocated in
+        * transaction memory context, which is going to be cleaned soon.
+        */
+       on_commit_stop_workers = NIL;
        on_commit_launcher_wakeup = false;
 }
 
 
 extern void ApplyLauncherShmemInit(void);
 
 extern void ApplyLauncherWakeupAtCommit(void);
+extern bool XactManipulatesLogicalReplicationWorkers(void);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
 
 extern bool IsLogicalLauncher(void);
 
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
                                           bool only_running);
+extern List *logicalrep_workers_find(Oid subid, bool only_running);
 extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
                                                 Oid userid, Oid relid);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);