Oid BdrNodesRelid;
Oid BdrConflictHistoryRelId;
BdrConnectionConfig **bdr_connection_configs;
+/* All databases for which BDR is configured, valid after _PG_init */
+char **bdr_distinct_dbnames;
+uint32 bdr_distinct_dbnames_count = 0;
/* GUC storage */
static char *connections = NULL;
List *apply_workers;
ListCell *c;
BdrPerdbWorker *bdr_perdb_worker;
+ BdrWorker *bdr_worker_slot;
Assert(IsBackgroundWorker);
- /* FIXME: won't work with EXEC_BACKEND, change to index into shm array */
- bdr_perdb_worker = (BdrPerdbWorker *) DatumGetPointer(main_arg);
+ bdr_worker_slot = &BdrWorkerCtl->slots[ DatumGetInt32(main_arg) ];
+ Assert(bdr_worker_slot->worker_type == BDR_WORKER_PERDB);
+ bdr_perdb_worker = &bdr_worker_slot->worker_data.perdb_worker;
bdr_worker_init(NameStr(bdr_perdb_worker->dbname));
static void
bdr_worker_shmem_create_workers(void)
{
- int off;
+ uint32 off;
+
+ /*
+ * Create a BdrPerdbWorker for each distinct database found during
+ * _PG_init. The bgworker for each has already been registered and assigned
+ * a slot position during _PG_init, but the slot doesn't have anything
+ * useful in it yet.
+ *
+ * Because these slots are pre-assigned before shmem is bought up they
+ * MUST be reserved first, before any shmem entries are allocated, so
+ * they get the first slots.
+ *
+ * When started, this worker will continue setup - doing any required
+ * initialization of the database, then registering dynamic bgworkers for
+ * the DB's individual BDR connections.
+ *
+ * If we ever want to support dynamically adding/removing DBs from BDR at
+ * runtime, this'll need to move into a static bgworker because dynamic
+ * bgworkers can't be launched directly from the postmaster. We'll need a
+ * "bdr manager" static bgworker.
+ */
+
+ for (off = 0; off < bdr_distinct_dbnames_count; off++)
+ {
+ BdrWorker *shmworker;
+ BdrPerdbWorker *perdb;
+ uint32 ctl_idx;
+
+ shmworker = (BdrWorker *) bdr_worker_shmem_alloc(BDR_WORKER_PERDB, &ctl_idx);
+ Assert(shmworker->worker_type == BDR_WORKER_PERDB);
+ /*
+ * The workers have already been assigned shmem indexes during
+ * _PG_init, so they MUST get the same index here. So long as these
+ * entries are assigned before any other shmem slots they will.
+ */
+ Assert(ctl_idx == off);
+ perdb = &shmworker->worker_data.perdb_worker;
+
+ strncpy(NameStr(perdb->dbname), bdr_distinct_dbnames[off], NAMEDATALEN);
+ NameStr(perdb->dbname)[NAMEDATALEN-1] = '\0';
+
+ perdb->seq_slot = off;
+
+ elog(DEBUG1, "Assigning shmem bdr database worker for db %s",
+ NameStr(perdb->dbname));
+ }
/*
* Create a BdrApplyWorker for each valid BdrConnectionConfig found during
if (cfg == NULL || !cfg->is_valid)
continue;
- shmworker = (BdrWorker *) bdr_worker_shmem_alloc(BDR_WORKER_APPLY);
+ shmworker = (BdrWorker *) bdr_worker_shmem_alloc(BDR_WORKER_APPLY, NULL);
Assert(shmworker->worker_type == BDR_WORKER_APPLY);
worker = &shmworker->worker_data.apply_worker;
worker->connection_config_idx = off;
*
* The block is zeroed. The worker type is set in the header.
*
+ * ctl_idx, if passed, is set to the index of the worker within BdrWorkerCtl.
+ *
* To release a block, use bdr_worker_shmem_release(...)
*/
BdrWorker*
-bdr_worker_shmem_alloc(BdrWorkerType worker_type)
+bdr_worker_shmem_alloc(BdrWorkerType worker_type, uint32 *ctl_idx)
{
- int i;
+ uint32 i;
LWLockAcquire(BdrWorkerCtl->lock, LW_EXCLUSIVE);
for (i = 0; i < bdr_max_workers; i++)
{
memset(new_entry, 0, sizeof(BdrWorker));
new_entry->worker_type = worker_type;
LWLockRelease(BdrWorkerCtl->lock);
+ if (ctl_idx)
+ *ctl_idx = i;
return new_entry;
}
}
void
_PG_init(void)
{
- BackgroundWorker perdb_worker;
List *connames;
ListCell *c;
MemoryContext old_context;
- size_t off;
char *connections_tmp;
char **used_databases;
char **database_initcons;
Size num_used_databases = 0;
int connection_config_idx;
+ BackgroundWorker bgw;
+ uint32 off;
if (!process_shared_preload_libraries_in_progress)
ereport(ERROR,
pfree(database_initcons);
/*
- * We now need to register one static bgworker per database. When started,
- * this worker will continue setup - doing any required initialization of
- * the database, then registering dynamic bgworkers for the DB's individual
- * BDR connections.
- *
- * These workers get started *after* the shm init callback is run, we're
- * just registering them for launch.
- *
- * If we ever want to support dynamically adding/removing DBs from BDR at
- * runtime, this'll need to move into a static bgworker or code called by
- * the shm startup hook and a guc reload hook.
- *
- * TODO: Move this into the shared memory based init.
+ * Copy the list of used databases into a global where we can
+ * use it for registering the per-database workers during shmem init.
+ */
+ bdr_distinct_dbnames = palloc(sizeof(char*)*num_used_databases);
+ memcpy(bdr_distinct_dbnames, used_databases,
+ sizeof(char*)*num_used_databases);
+ bdr_distinct_dbnames_count = num_used_databases;
+ pfree(used_databases);
+ num_used_databases = 0;
+ used_databases = NULL;
+
+ /*
+ * Register the per-db workers and assign them an index in shmem. The
+ * memory doesn't actually exist yet, it'll be allocated in shmem init.
*/
- perdb_worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
+ bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
- perdb_worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
+ bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
/* TODO: For EXEC_BACKEND we must use bgw_library_name & bgw_function_name */
- perdb_worker.bgw_main = bdr_perdb_worker_main;
- perdb_worker.bgw_restart_time = 5;
- perdb_worker.bgw_notify_pid = 0;
-
- for (off = 0; off < num_used_databases; off++)
+ bgw.bgw_main = bdr_perdb_worker_main;
+ bgw.bgw_restart_time = 5;
+ bgw.bgw_notify_pid = 0;
+ for (off = 0; off < bdr_distinct_dbnames_count; off++)
{
- /* Start a worker for this db */
- BdrPerdbWorker *con = palloc(sizeof(BdrPerdbWorker));
-
- strncpy(NameStr(con->dbname), used_databases[off], NAMEDATALEN);
- NameStr(con->dbname)[NAMEDATALEN-1] = '\0';
-
- con->seq_slot = off;
-
- elog(DEBUG1, "starting bdr database worker for db %s", NameStr(con->dbname));
- snprintf(perdb_worker.bgw_name, BGW_MAXLEN,
- "bdr: %s", NameStr(con->dbname));
- perdb_worker.bgw_main_arg = PointerGetDatum(con);
- RegisterBackgroundWorker(&perdb_worker);
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "bdr: %s", bdr_distinct_dbnames[off]);
+ /*
+ * This index into BdrWorkerCtl shmem hasn't been populated yet. It'll
+ * be set up in bdr_worker_shmem_create_workers .
+ */
+ bgw.bgw_main_arg = Int32GetDatum(off);
+ RegisterBackgroundWorker(&bgw);
}
EmitWarningsOnPlaceholders("bdr");
- for (off = 0; off < num_used_databases; off++)
- pfree(used_databases[off]);
- pfree(used_databases);
-
pfree(connections_tmp);
out: