static void
bdr_catchup_to_lsn_cleanup(int code, Datum offset)
{
- int worker_shmem_idx = DatumGetInt32(offset);
+ uint32 worker_shmem_idx = DatumGetInt32(offset);
- /* Clear the worker's shared memory struct now we're done with it */
+ /*
+ * Clear the worker's shared memory struct now we're done with it.
+ *
+ * There's no need to unregister the worker as it was registered with
+ * BGW_NEVER_RESTART.
+ */
bdr_worker_shmem_release(&BdrWorkerCtl->slots[worker_shmem_idx], NULL);
}
bdr_catchup_to_lsn(int cfg_index,
XLogRecPtr target_lsn)
{
- int worker_shmem_idx;
- pid_t bgw_pid;
- BdrApplyWorker *catchup_worker;
- BackgroundWorker bgw;
- BackgroundWorkerHandle *bgw_handle;
- BgwHandleStatus bgw_status;
+ uint32 worker_shmem_idx;
+ BdrWorker *worker;
BdrConnectionConfig *cfg;
cfg = bdr_connection_configs[cfg_index];
(uint32)(target_lsn>>32), (uint32)target_lsn);
/* Create the shmem entry for the catchup worker */
- LWLockAcquire(BdrWorkerCtl->lock, LW_SHARED);
- for (worker_shmem_idx = 0; worker_shmem_idx < bdr_max_workers; worker_shmem_idx++)
- {
- BdrWorker *worker = &BdrWorkerCtl->slots[worker_shmem_idx];
- if (worker->worker_type == BDR_WORKER_EMPTY_SLOT)
- break;
- }
- if (worker_shmem_idx == bdr_max_workers)
- {
- LWLockRelease(BdrWorkerCtl->lock);
- ereport(ERROR,
- (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
- errmsg("No free bdr worker slots, bdr_max_workers=%d too low",
- bdr_max_workers)));
- }
- BdrWorkerCtl->slots[worker_shmem_idx].worker_type = BDR_WORKER_APPLY;
- catchup_worker = &BdrWorkerCtl->slots[worker_shmem_idx].worker_data.apply_worker;
- LWLockRelease(BdrWorkerCtl->lock);
+ worker = bdr_worker_shmem_alloc(BDR_WORKER_APPLY, &worker_shmem_idx);
/*
* Launch the catchup worker, ensuring that we free the shmem slot for the
PG_ENSURE_ERROR_CLEANUP(bdr_catchup_to_lsn_cleanup,
Int32GetDatum(worker_shmem_idx));
{
+ BgwHandleStatus bgw_status;
+ BackgroundWorker bgw;
+ BackgroundWorkerHandle *bgw_handle;
+ pid_t bgw_pid;
pid_t prev_bgw_pid = 0;
+ BdrApplyWorker *catchup_worker = &worker->worker_data.apply_worker;
/* Make sure the catchup worker can find its bdr.xxx_ GUCs */
catchup_worker->connection_config_idx = cfg_index;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
/* TODO: For EXEC_BACKEND we must use bgw_library_name & bgw_function_name */
bgw.bgw_main = bdr_apply_main;
- /*
- * Would prefer this to be BGW_NEVER_RESTART but it's not honoured for
- * exit 0 anyway. Set a small delay to give us time to unregister the
- * worker after it exits, before the replacement is started. See below
- * in the switch handling the exit cases for more detail.
- */
- bgw.bgw_restart_time = 5;
+
+ bgw.bgw_restart_time = BGW_NEVER_RESTART;
bgw.bgw_notify_pid = MyProc->pid;
bgw.bgw_main_arg = Int32GetDatum(worker_shmem_idx);
proc_exit(1);
break;
case BGWH_STOPPED:
+ TerminateBackgroundWorker(bgw_handle);
+ break;
case BGWH_NOT_YET_STARTED:
case BGWH_STARTED:
- /*
- * You'd think we'd only get here in the STOPPED case, but Pg
- * restarts our bgworker even if we set BGW_NEVER_RESTART if we
- * exit 0.
- *
- * If the pid changes, we know the worker exited, even if it's
- * reported as running. So we make sure we always terminate it.
- */
- TerminateBackgroundWorker(bgw_handle);
+ /* Should be unreachable */
+ elog(ERROR, "Unreachable case, bgw status %d", bgw_status);
break;
}
pfree(bgw_handle);