bdr: implement per-db worker in shmem
authorCraig Ringer <craig@2ndquadrant.com>
Tue, 13 May 2014 01:45:52 +0000 (09:45 +0800)
committerAndres Freund <andres@anarazel.de>
Thu, 3 Jul 2014 15:55:34 +0000 (17:55 +0200)
contrib/bdr/bdr.c
contrib/bdr/bdr.h

index 8db929a6486062d9eb8b3773b9a59702f9f293cd..82a20f4c2cad84bca6ca8ffa43ab5e150891a327 100644 (file)
@@ -67,6 +67,9 @@ static bool bdr_is_restart = false;
 Oid   BdrNodesRelid;
 Oid   BdrConflictHistoryRelId;
 BdrConnectionConfig  **bdr_connection_configs;
+/* All databases for which BDR is configured, valid after _PG_init */
+char **bdr_distinct_dbnames;
+uint32 bdr_distinct_dbnames_count = 0;
 
 /* GUC storage */
 static char *connections = NULL;
@@ -1018,11 +1021,13 @@ bdr_perdb_worker_main(Datum main_arg)
    List             *apply_workers;
    ListCell         *c;
    BdrPerdbWorker   *bdr_perdb_worker;
+   BdrWorker        *bdr_worker_slot;
 
    Assert(IsBackgroundWorker);
 
-   /* FIXME: won't work with EXEC_BACKEND, change to index into shm array */
-   bdr_perdb_worker = (BdrPerdbWorker *) DatumGetPointer(main_arg);
+   bdr_worker_slot = &BdrWorkerCtl->slots[ DatumGetInt32(main_arg) ];
+   Assert(bdr_worker_slot->worker_type == BDR_WORKER_PERDB);
+   bdr_perdb_worker = &bdr_worker_slot->worker_data.perdb_worker;
 
    bdr_worker_init(NameStr(bdr_perdb_worker->dbname));
 
@@ -1188,7 +1193,52 @@ bdr_worker_shmem_startup(void)
 static void
 bdr_worker_shmem_create_workers(void)
 {
-   int off;
+   uint32 off;
+
+   /*
+    * Create a BdrPerdbWorker for each distinct database found during
+    * _PG_init. The bgworker for each has already been registered and assigned
+    * a slot position during _PG_init, but the slot doesn't have anything
+    * useful in it yet.
+    *
+    * Because these slots are pre-assigned before shmem is bought up they
+    * MUST be reserved first, before any shmem entries are allocated, so
+    * they get the first slots.
+    *
+    * When started, this worker will continue setup - doing any required
+    * initialization of the database, then registering dynamic bgworkers for
+    * the DB's individual BDR connections.
+    *
+    * If we ever want to support dynamically adding/removing DBs from BDR at
+    * runtime, this'll need to move into a static bgworker because dynamic
+    * bgworkers can't be launched directly from the postmaster. We'll need a
+    * "bdr manager" static bgworker.
+    */
+
+   for (off = 0; off < bdr_distinct_dbnames_count; off++)
+   {
+       BdrWorker      *shmworker;
+       BdrPerdbWorker *perdb;
+       uint32      ctl_idx;
+
+       shmworker = (BdrWorker *) bdr_worker_shmem_alloc(BDR_WORKER_PERDB, &ctl_idx);
+       Assert(shmworker->worker_type == BDR_WORKER_PERDB);
+       /*
+        * The workers have already been assigned shmem indexes during
+        * _PG_init, so they MUST get the same index here. So long as these
+        * entries are assigned before any other shmem slots they will.
+        */
+       Assert(ctl_idx == off);
+       perdb = &shmworker->worker_data.perdb_worker;
+
+       strncpy(NameStr(perdb->dbname), bdr_distinct_dbnames[off], NAMEDATALEN);
+       NameStr(perdb->dbname)[NAMEDATALEN-1] = '\0';
+
+       perdb->seq_slot = off;
+
+       elog(DEBUG1, "Assigning shmem bdr database worker for db %s",
+            NameStr(perdb->dbname));
+   }
 
    /*
     * Create a BdrApplyWorker for each valid BdrConnectionConfig found during
@@ -1209,7 +1259,7 @@ bdr_worker_shmem_create_workers(void)
        if (cfg == NULL || !cfg->is_valid)
            continue;
 
-       shmworker = (BdrWorker *) bdr_worker_shmem_alloc(BDR_WORKER_APPLY);
+       shmworker = (BdrWorker *) bdr_worker_shmem_alloc(BDR_WORKER_APPLY, NULL);
        Assert(shmworker->worker_type == BDR_WORKER_APPLY);
        worker = &shmworker->worker_data.apply_worker;
        worker->connection_config_idx = off;
@@ -1231,12 +1281,14 @@ bdr_worker_shmem_create_workers(void)
  *
  * The block is zeroed. The worker type is set in the header.
  *
+ * ctl_idx, if passed, is set to the index of the worker within BdrWorkerCtl.
+ *
  * To release a block, use bdr_worker_shmem_release(...)
  */
 BdrWorker*
-bdr_worker_shmem_alloc(BdrWorkerType worker_type)
+bdr_worker_shmem_alloc(BdrWorkerType worker_type, uint32 *ctl_idx)
 {
-   int i;
+   uint32 i;
    LWLockAcquire(BdrWorkerCtl->lock, LW_EXCLUSIVE);
    for (i = 0; i < bdr_max_workers; i++)
    {
@@ -1246,6 +1298,8 @@ bdr_worker_shmem_alloc(BdrWorkerType worker_type)
            memset(new_entry, 0, sizeof(BdrWorker));
            new_entry->worker_type = worker_type;
            LWLockRelease(BdrWorkerCtl->lock);
+           if (ctl_idx)
+               *ctl_idx = i;
            return new_entry;
        }
    }
@@ -1306,17 +1360,17 @@ bdr_worker_shmem_release(BdrWorker* worker, BackgroundWorkerHandle *handle)
 void
 _PG_init(void)
 {
-   BackgroundWorker perdb_worker;
    List       *connames;
    ListCell   *c;
    MemoryContext old_context;
-   size_t      off;
    char       *connections_tmp;
 
    char      **used_databases;
    char      **database_initcons;
    Size        num_used_databases = 0;
    int         connection_config_idx;
+   BackgroundWorker bgw;
+   uint32      off;
 
    if (!process_shared_preload_libraries_in_progress)
        ereport(ERROR,
@@ -1493,51 +1547,42 @@ _PG_init(void)
    pfree(database_initcons);
 
    /*
-    * We now need to register one static bgworker per database.  When started,
-    * this worker will continue setup - doing any required initialization of
-    * the database, then registering dynamic bgworkers for the DB's individual
-    * BDR connections.
-    *
-    * These workers get started *after* the shm init callback is run, we're
-    * just registering them for launch.
-    *
-    * If we ever want to support dynamically adding/removing DBs from BDR at
-    * runtime, this'll need to move into a static bgworker or code called by
-    * the shm startup hook and a guc reload hook.
-    *
-    * TODO: Move this into the shared memory based init.
+    * Copy the list of used databases into a global where we can
+    * use it for registering the per-database workers during shmem init.
+    */
+   bdr_distinct_dbnames = palloc(sizeof(char*)*num_used_databases);
+   memcpy(bdr_distinct_dbnames, used_databases,
+          sizeof(char*)*num_used_databases);
+   bdr_distinct_dbnames_count = num_used_databases;
+   pfree(used_databases);
+   num_used_databases = 0;
+   used_databases = NULL;
+
+   /*
+    * Register the per-db workers and assign them an index in shmem. The
+    * memory doesn't actually exist yet, it'll be allocated in shmem init.
     */
-   perdb_worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
+   bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
        BGWORKER_BACKEND_DATABASE_CONNECTION;
-   perdb_worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
+   bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
    /* TODO: For EXEC_BACKEND we must use bgw_library_name & bgw_function_name */
-   perdb_worker.bgw_main = bdr_perdb_worker_main;
-   perdb_worker.bgw_restart_time = 5;
-   perdb_worker.bgw_notify_pid = 0;
-
-   for (off = 0; off < num_used_databases; off++)
+   bgw.bgw_main = bdr_perdb_worker_main;
+   bgw.bgw_restart_time = 5;
+   bgw.bgw_notify_pid = 0;
+   for (off = 0; off < bdr_distinct_dbnames_count; off++)
    {
-       /* Start a worker for this db */
-       BdrPerdbWorker *con = palloc(sizeof(BdrPerdbWorker));
-
-       strncpy(NameStr(con->dbname), used_databases[off], NAMEDATALEN);
-       NameStr(con->dbname)[NAMEDATALEN-1] = '\0';
-
-       con->seq_slot = off;
-
-       elog(DEBUG1, "starting bdr database worker for db %s", NameStr(con->dbname));
-       snprintf(perdb_worker.bgw_name, BGW_MAXLEN,
-                "bdr: %s", NameStr(con->dbname));
-       perdb_worker.bgw_main_arg = PointerGetDatum(con);
-       RegisterBackgroundWorker(&perdb_worker);
+       snprintf(bgw.bgw_name, BGW_MAXLEN,
+                "bdr: %s", bdr_distinct_dbnames[off]);
+       /*
+        * This index into BdrWorkerCtl shmem hasn't been populated yet. It'll
+        * be set up in bdr_worker_shmem_create_workers .
+        */
+       bgw.bgw_main_arg = Int32GetDatum(off);
+       RegisterBackgroundWorker(&bgw);
    }
 
    EmitWarningsOnPlaceholders("bdr");
 
-   for (off = 0; off < num_used_databases; off++)
-       pfree(used_databases[off]);
-   pfree(used_databases);
-
    pfree(connections_tmp);
 
 out:
index 776e0ebae0e319f05d1d9f289665f98204647e41..e4987cfa3b034a642555df79c614717ea90c3b2f 100644 (file)
@@ -316,7 +316,8 @@ extern bool bdr_get_bigendian(void);
 extern void bdr_init_replica(Name dbname);
 
 /* shared memory management */
-extern BdrWorker* bdr_worker_shmem_alloc(BdrWorkerType worker_type);
+extern BdrWorker* bdr_worker_shmem_alloc(BdrWorkerType worker_type,
+                                        uint32 *ctl_idx);
 extern void bdr_worker_shmem_release(BdrWorker* worker, BackgroundWorkerHandle *handle);
 
 /* forbid commands we do not support currently (or never will) */