/*-------------------------------------------------------------------------
  * tablesync.c
- *   PostgreSQL logical replication
+ *   PostgreSQL logical replication: initial table data synchronization
  *
  * Copyright (c) 2012-2020, PostgreSQL Global Development Group
  *
  *    - It allows us to synchronize any tables added after the initial
  *      synchronization has finished.
  *
- *   The stream position synchronization works in multiple steps.
- *    - Sync finishes copy and sets worker state as SYNCWAIT and waits for
- *      state to change in a loop.
- *    - Apply periodically checks tables that are synchronizing for SYNCWAIT.
- *      When the desired state appears, it will set the worker state to
- *      CATCHUP and starts loop-waiting until either the table state is set
- *      to SYNCDONE or the sync worker exits.
+ *   The stream position synchronization works in multiple steps:
+ *    - Apply worker requests a tablesync worker to start, setting the new
+ *      table state to INIT.
+ *    - Tablesync worker starts; changes table state from INIT to DATASYNC while
+ *      copying.
+ *    - Tablesync worker finishes the copy and sets table state to SYNCWAIT;
+ *      waits for state change.
+ *    - Apply worker periodically checks for tables in SYNCWAIT state.  When
+ *      any appear, it sets the table state to CATCHUP and starts loop-waiting
+ *      until either the table state is set to SYNCDONE or the sync worker
+ *      exits.
  *    - After the sync worker has seen the state change to CATCHUP, it will
  *      read the stream and apply changes (acting like an apply worker) until
  *      it catches up to the specified stream position.  Then it sets the
  *      state to SYNCDONE.  There might be zero changes applied between
  *      CATCHUP and SYNCDONE, because the sync worker might be ahead of the
  *      apply worker.
- *    - Once the state was set to SYNCDONE, the apply will continue tracking
+ *    - Once the state is set to SYNCDONE, the apply will continue tracking
  *      the table until it reaches the SYNCDONE stream position, at which
  *      point it sets state to READY and stops tracking.  Again, there might
  *      be zero changes in between.
  *
- *   So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP ->
- *   SYNCDONE -> READY.
+ *   So the state progression is always: INIT -> DATASYNC -> SYNCWAIT ->
+ *   CATCHUP -> SYNCDONE -> READY.
  *
  *   The catalog pg_subscription_rel is used to keep information about
  *   subscribed tables and their state.  Some transient state during data
  *         -> continue rep
  *       apply:11
  *         -> set in catalog READY
- *    - Sync in front:
+ *
+ *    - Sync is in front:
  *       sync:10
  *         -> set in memory SYNCWAIT
  *       apply:8
 }
 
 /*
- * Wait until the relation synchronization state is set in the catalog to the
- * expected one.
+ * Wait until the relation sync state is set in the catalog to the expected
+ * one; return true when it happens.
  *
- * Used when transitioning from CATCHUP state to SYNCDONE.
+ * Returns false if the table sync worker or the table itself have
+ * disappeared, or the table state has been reset.
  *
- * Returns false if the synchronization worker has disappeared or the table state
- * has been reset.
+ * Currently, this is used in the apply worker when transitioning from
+ * CATCHUP state to SYNCDONE.
  */
 static bool
 wait_for_relation_state_change(Oid relid, char expected_state)
 
        CHECK_FOR_INTERRUPTS();
 
-       /* XXX use cache invalidation here to improve performance? */
-       PushActiveSnapshot(GetLatestSnapshot());
+       InvalidateCatalogSnapshot();
        state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
-                                       relid, &statelsn, true);
-       PopActiveSnapshot();
+                                       relid, &statelsn);
 
        if (state == SUBREL_STATE_UNKNOWN)
-           return false;
+           break;
 
        if (state == expected_state)
            return true;
 
        /* Check if the sync worker is still running and bail if not. */
        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-
-       /* Check if the opposite worker is still running and bail if not. */
-       worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
-                                       am_tablesync_worker() ? InvalidOid : relid,
+       worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
                                        false);
        LWLockRelease(LogicalRepWorkerLock);
        if (!worker)
-           return false;
+           break;
 
        (void) WaitLatch(MyLatch,
                         WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
 /*
  * Start syncing the table in the sync worker.
  *
+ * If nothing needs to be done to sync the table, we exit the worker without
+ * any further action.
+ *
  * The returned slot name is palloc'ed in current memory context.
  */
 char *
    char       *err;
    char        relstate;
    XLogRecPtr  relstate_lsn;
+   Relation    rel;
+   WalRcvExecResult *res;
 
    /* Check the state of the table synchronization. */
    StartTransactionCommand();
    relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
                                       MyLogicalRepWorker->relid,
-                                      &relstate_lsn, true);
+                                      &relstate_lsn);
    CommitTransactionCommand();
 
    SpinLockAcquire(&MyLogicalRepWorker->relmutex);
    MyLogicalRepWorker->relstate_lsn = relstate_lsn;
    SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
+   /*
+    * If synchronization is already done or no longer necessary, exit now
+    * that we've updated shared memory state.
+    */
+   switch (relstate)
+   {
+       case SUBREL_STATE_SYNCDONE:
+       case SUBREL_STATE_READY:
+       case SUBREL_STATE_UNKNOWN:
+           finish_sync_worker();   /* doesn't return */
+   }
+
    /*
     * To build a slot name for the sync work, we are limited to NAMEDATALEN -
     * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
        ereport(ERROR,
                (errmsg("could not connect to the publisher: %s", err)));
 
-   switch (MyLogicalRepWorker->relstate)
-   {
-       case SUBREL_STATE_INIT:
-       case SUBREL_STATE_DATASYNC:
-           {
-               Relation    rel;
-               WalRcvExecResult *res;
+   Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
+          MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC);
 
-               SpinLockAcquire(&MyLogicalRepWorker->relmutex);
-               MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
-               MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
-               SpinLockRelease(&MyLogicalRepWorker->relmutex);
-
-               /* Update the state and make it visible to others. */
-               StartTransactionCommand();
-               UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-                                          MyLogicalRepWorker->relid,
-                                          MyLogicalRepWorker->relstate,
-                                          MyLogicalRepWorker->relstate_lsn);
-               CommitTransactionCommand();
-               pgstat_report_stat(false);
+   SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+   MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
+   MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
+   SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
-               /*
-                * We want to do the table data sync in a single transaction.
-                */
-               StartTransactionCommand();
+   /* Update the state and make it visible to others. */
+   StartTransactionCommand();
+   UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+                              MyLogicalRepWorker->relid,
+                              MyLogicalRepWorker->relstate,
+                              MyLogicalRepWorker->relstate_lsn);
+   CommitTransactionCommand();
+   pgstat_report_stat(false);
 
-               /*
-                * Use a standard write lock here. It might be better to
-                * disallow access to the table while it's being synchronized.
-                * But we don't want to block the main apply process from
-                * working and it has to open the relation in RowExclusiveLock
-                * when remapping remote relation id to local one.
-                */
-               rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
+   /*
+    * We want to do the table data sync in a single transaction.
+    */
+   StartTransactionCommand();
 
-               /*
-                * Create a temporary slot for the sync process. We do this
-                * inside the transaction so that we can use the snapshot made
-                * by the slot to get existing data.
-                */
-               res = walrcv_exec(wrconn,
-                                 "BEGIN READ ONLY ISOLATION LEVEL "
-                                 "REPEATABLE READ", 0, NULL);
-               if (res->status != WALRCV_OK_COMMAND)
-                   ereport(ERROR,
-                           (errmsg("table copy could not start transaction on publisher"),
-                            errdetail("The error was: %s", res->err)));
-               walrcv_clear_result(res);
+   /*
+    * Use a standard write lock here. It might be better to disallow access
+    * to the table while it's being synchronized. But we don't want to block
+    * the main apply process from working and it has to open the relation in
+    * RowExclusiveLock when remapping remote relation id to local one.
+    */
+   rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
 
-               /*
-                * Create new temporary logical decoding slot.
-                *
-                * We'll use slot for data copy so make sure the snapshot is
-                * used for the transaction; that way the COPY will get data
-                * that is consistent with the lsn used by the slot to start
-                * decoding.
-                */
-               walrcv_create_slot(wrconn, slotname, true,
-                                  CRS_USE_SNAPSHOT, origin_startpos);
+   /*
+    * Start a transaction in the remote node in REPEATABLE READ mode.  This
+    * ensures that both the replication slot we create (see below) and the
+    * COPY are consistent with each other.
+    */
+   res = walrcv_exec(wrconn,
+                     "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
+                     0, NULL);
+   if (res->status != WALRCV_OK_COMMAND)
+       ereport(ERROR,
+               (errmsg("table copy could not start transaction on publisher"),
+                errdetail("The error was: %s", res->err)));
+   walrcv_clear_result(res);
 
-               PushActiveSnapshot(GetTransactionSnapshot());
-               copy_table(rel);
-               PopActiveSnapshot();
+   /*
+    * Create a new temporary logical decoding slot.  This slot will be used
+    * for the catchup phase after COPY is done, so tell it to use the
+    * snapshot to make the final data consistent.
+    */
+   walrcv_create_slot(wrconn, slotname, true,
+                      CRS_USE_SNAPSHOT, origin_startpos);
 
-               res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
-               if (res->status != WALRCV_OK_COMMAND)
-                   ereport(ERROR,
-                           (errmsg("table copy could not finish transaction on publisher"),
-                            errdetail("The error was: %s", res->err)));
-               walrcv_clear_result(res);
+   /* Now do the initial data copy */
+   PushActiveSnapshot(GetTransactionSnapshot());
+   copy_table(rel);
+   PopActiveSnapshot();
 
-               table_close(rel, NoLock);
+   res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+   if (res->status != WALRCV_OK_COMMAND)
+       ereport(ERROR,
+               (errmsg("table copy could not finish transaction on publisher"),
+                errdetail("The error was: %s", res->err)));
+   walrcv_clear_result(res);
 
-               /* Make the copy visible. */
-               CommandCounterIncrement();
+   table_close(rel, NoLock);
 
-               /*
-                * We are done with the initial data synchronization, update
-                * the state.
-                */
-               SpinLockAcquire(&MyLogicalRepWorker->relmutex);
-               MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
-               MyLogicalRepWorker->relstate_lsn = *origin_startpos;
-               SpinLockRelease(&MyLogicalRepWorker->relmutex);
-
-               /* Wait for main apply worker to tell us to catchup. */
-               wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
-
-               /*----------
-                * There are now two possible states here:
-                * a) Sync is behind the apply.  If that's the case we need to
-                *    catch up with it by consuming the logical replication
-                *    stream up to the relstate_lsn.  For that, we exit this
-                *    function and continue in ApplyWorkerMain().
-                * b) Sync is caught up with the apply.  So it can just set
-                *    the state to SYNCDONE and finish.
-                *----------
-                */
-               if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
-               {
-                   /*
-                    * Update the new state in catalog.  No need to bother
-                    * with the shmem state as we are exiting for good.
-                    */
-                   UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-                                              MyLogicalRepWorker->relid,
-                                              SUBREL_STATE_SYNCDONE,
-                                              *origin_startpos);
-                   finish_sync_worker();
-               }
-               break;
-           }
-       case SUBREL_STATE_SYNCDONE:
-       case SUBREL_STATE_READY:
-       case SUBREL_STATE_UNKNOWN:
+   /* Make the copy visible. */
+   CommandCounterIncrement();
 
-           /*
-            * Nothing to do here but finish.  (UNKNOWN means the relation was
-            * removed from pg_subscription_rel before the sync worker could
-            * start.)
-            */
-           finish_sync_worker();
-           break;
-       default:
-           elog(ERROR, "unknown relation state \"%c\"",
-                MyLogicalRepWorker->relstate);
-   }
+   /*
+    * We are done with the initial data synchronization, update the state.
+    */
+   SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+   MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
+   MyLogicalRepWorker->relstate_lsn = *origin_startpos;
+   SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
+   /*
+    * Finally, wait until the main apply worker tells us to catch up and then
+    * return to let LogicalRepApplyLoop do it.
+    */
+   wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
    return slotname;
 }