Condition variable code.
author Robert Haas <rhaas@postgresql.org>
Sat, 20 Feb 2016 04:43:42 +0000 (10:13 +0530)
committer Robert Haas <rhaas@postgresql.org>
Thu, 11 Aug 2016 19:18:11 +0000 (15:18 -0400)
Reviewed in an earlier version by Rahila Syed.

12 files changed:
src/backend/access/transam/xact.c
src/backend/bootstrap/bootstrap.c
src/backend/postmaster/bgwriter.c
src/backend/postmaster/checkpointer.c
src/backend/postmaster/walwriter.c
src/backend/replication/walsender.c
src/backend/storage/lmgr/Makefile
src/backend/storage/lmgr/condition_variable.c [new file with mode: 0644]
src/backend/storage/lmgr/proc.c
src/include/storage/condition_variable.h [new file with mode: 0644]
src/include/storage/proc.h
src/include/storage/proclist.h

index 23f36ead7e54e86d0ef1e33a63aa60ade10f8b8f..b40b2e0ae8503f6f252772406a01b0a266c534fa 100644 (file)
@@ -45,6 +45,7 @@
 #include "replication/origin.h"
 #include "replication/syncrep.h"
 #include "replication/walsender.h"
+#include "storage/condition_variable.h"
 #include "storage/fd.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
@@ -2476,6 +2477,9 @@ AbortTransaction(void)
        /* Reset WAL record construction state */
        XLogResetInsertion();
 
+       /* Cancel condition variable sleep */
+       ConditionVariableCancelSleep();
+
        /*
         * Also clean up any open wait for lock, since the lock manager will choke
         * if we try to wait for another lock before doing this.
index e518e178bb4b43958a929aa0b998f09d92f5a1b6..9eeb49c676a1b6c6167cb0e943392b2c6168b1b1 100644 (file)
@@ -33,6 +33,7 @@
 #include "replication/walreceiver.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
+#include "storage/condition_variable.h"
 #include "storage/ipc.h"
 #include "storage/proc.h"
 #include "tcop/tcopprot.h"
@@ -535,6 +536,7 @@ static void
 ShutdownAuxiliaryProcess(int code, Datum arg)
 {
        LWLockReleaseAll();
+       ConditionVariableCancelSleep();
        pgstat_report_wait_end();
 }
 
index 00f03d8acbe7ba54b4fc91aa28c80f2c508850fb..40f3f809e5a10e6ffba3679db09217a358e29dad 100644 (file)
@@ -46,6 +46,7 @@
 #include "postmaster/bgwriter.h"
 #include "storage/bufmgr.h"
 #include "storage/buf_internals.h"
+#include "storage/condition_variable.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
@@ -189,6 +190,7 @@ BackgroundWriterMain(void)
                 * about in bgwriter, but we do have LWLocks, buffers, and temp files.
                 */
                LWLockReleaseAll();
+               ConditionVariableCancelSleep();
                AbortBufferIO();
                UnlockBuffers();
                /* buffer pins are released here: */
index 8d4b3539b1ec8f5d901f8fa4d8d05bf92c4aff67..0c072f387b7647bed89b1f16af36854e914702ec 100644 (file)
@@ -49,6 +49,7 @@
 #include "postmaster/bgwriter.h"
 #include "replication/syncrep.h"
 #include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
@@ -273,6 +274,7 @@ CheckpointerMain(void)
                 * files.
                 */
                LWLockReleaseAll();
+               ConditionVariableCancelSleep();
                pgstat_report_wait_end();
                AbortBufferIO();
                UnlockBuffers();
index 228190a836d52e59ed0f9aef0a578d4c0f906c29..e5de0199a36bd8e24dd0869484c1b3ad5872a7e6 100644 (file)
@@ -50,6 +50,7 @@
 #include "pgstat.h"
 #include "postmaster/walwriter.h"
 #include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
@@ -169,6 +170,7 @@ WalWriterMain(void)
                 * about in walwriter, but we do have LWLocks, and perhaps buffers?
                 */
                LWLockReleaseAll();
+               ConditionVariableCancelSleep();
                pgstat_report_wait_end();
                AbortBufferIO();
                UnlockBuffers();
index a0dba194a615b38bac046d66e76b800a244808c7..44143d7aefd163d368d634f1178c038fe51e498c 100644 (file)
@@ -66,6 +66,7 @@
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "replication/walsender_private.h"
+#include "storage/condition_variable.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/pmsignal.h"
@@ -253,6 +254,7 @@ void
 WalSndErrorCleanup(void)
 {
        LWLockReleaseAll();
+       ConditionVariableCancelSleep();
        pgstat_report_wait_end();
 
        if (sendFile >= 0)
index cd6ec73f08f10c10978e248d7c6194f205bd51ae..e1b787e838fce7727ae2fb538dfd356e86fe9611 100644 (file)
@@ -13,7 +13,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = lmgr.o lock.o proc.o deadlock.o lwlock.o lwlocknames.o spin.o \
-       s_lock.o predicate.o
+       s_lock.o predicate.o condition_variable.o
 
 include $(top_srcdir)/src/backend/common.mk
 
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
new file mode 100644 (file)
index 0000000..0639689
--- /dev/null
@@ -0,0 +1,157 @@
+/*-------------------------------------------------------------------------
+ *
+ * condition_variable.c
+ *       Implementation of condition variables.  Condition variables provide
+ *       a way for one process to wait until a specific condition occurs,
+ *       without needing to know the specific identity of the process for
+ *       which it is waiting.  Waits for condition variables can be
+ *       interrupted, unlike LWLock waits.  Condition variables are safe
+ *       to use within dynamic shared memory segments.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/storage/lmgr/condition_variable.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "storage/condition_variable.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
+#include "storage/proclist.h"
+#include "storage/spin.h"
+
+/*
+ * The condition variable this process has prepared to sleep on, or NULL if
+ * no sleep is currently prepared (the initial state).  Set by
+ * ConditionVariablePrepareToSleep() and reset to NULL by
+ * ConditionVariableSleep() and ConditionVariableCancelSleep().
+ */
+static ConditionVariable *cv_sleep_target = NULL;
+
+/*
+ * Set up a condition variable for first use by initializing both of its
+ * members: the wait list and the spinlock.
+ */
+void
+ConditionVariableInit(ConditionVariable *cv)
+{
+       /* Start with a freshly initialized wait list ... */
+       proclist_init(&cv->wakeup);
+
+       /* ... and the spinlock under which that list is manipulated. */
+       SpinLockInit(&cv->mutex);
+}
+
+/*
+ * Prepare to sleep on a condition variable.
+ *
+ * Records 'cv' as this process's sleep target, sets MyProc->cvSleeping, and
+ * links this process into cv's wait queue.  The caller is expected to
+ * recheck its condition afterwards and then either call
+ * ConditionVariableSleep() or ConditionVariableCancelSleep().
+ */
+void
+ConditionVariablePrepareToSleep(ConditionVariable *cv)
+{
+       int             pgprocno = MyProc->pgprocno;
+
+       /*
+        * It's not legal to prepare a sleep until the previous sleep has been
+        * completed or cancelled.
+        */
+       Assert(cv_sleep_target == NULL);
+
+       /* Record the condition variable on which we will sleep. */
+       cv_sleep_target = cv;
+
+       /*
+        * Mark myself as sleeping.  This is done before we are linked into the
+        * wait queue, i.e. before any other process can find us there.
+        */
+       MyProc->cvSleeping = true;
+
+       /*
+        * Add myself to the back of the wait queue.  ConditionVariableSignal()
+        * dequeues from the front, so queueing at the tail wakes waiters in
+        * arrival order; pushing onto the head would always wake the newest
+        * waiter first and could leave the oldest one waiting indefinitely.
+        */
+       SpinLockAcquire(&cv->mutex);
+       proclist_push_tail(&cv->wakeup, pgprocno, cvWaitLink);
+       SpinLockRelease(&cv->mutex);
+}
+
+/*
+ * Block until this process is no longer marked as sleeping on the condition
+ * variable it prepared for.
+ *
+ * The cvSleeping flag may already be clear by the time we get here, in
+ * which case we return without waiting at all.  Otherwise we wait on our
+ * process latch, servicing interrupts before each wait, and re-examine the
+ * flag every time the latch wakes us.
+ */
+void
+ConditionVariableSleep(void)
+{
+       Assert(cv_sleep_target != NULL);
+
+       for (;;)
+       {
+               /* Done as soon as we are no longer flagged as sleeping. */
+               if (!MyProc->cvSleeping)
+                       break;
+
+               CHECK_FOR_INTERRUPTS();
+               WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
+               ResetLatch(&MyProc->procLatch);
+       }
+
+       /* The prepared sleep has been consumed. */
+       cv_sleep_target = NULL;
+}
+
+/*
+ * Cancel any pending sleep operation.
+ *
+ * If we previously prepared a sleep, make sure we are no longer on that
+ * condition variable's wait queue and forget the sleep target.  Does
+ * nothing when no sleep is pending, so it can be called unconditionally
+ * from error-cleanup paths.
+ */
+void
+ConditionVariableCancelSleep(void)
+{
+       ConditionVariable *cv = cv_sleep_target;
+
+       if (cv == NULL)
+               return;
+
+       /*
+        * Once a sleep has been prepared, cvSleeping stays true for as long as
+        * we are linked into the wait queue: ConditionVariableSignal() clears
+        * it, under the mutex, when it dequeues us.  If that has already
+        * happened we must not unlink ourselves a second time, since deleting
+        * a node that is no longer on the list would corrupt the queue.
+        */
+       SpinLockAcquire(&cv->mutex);
+       if (MyProc->cvSleeping)
+       {
+               proclist_delete(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
+               MyProc->cvSleeping = false;
+       }
+       SpinLockRelease(&cv->mutex);
+
+       cv_sleep_target = NULL;
+}
+
+/*
+ * Wake up one sleeping process, assuming there is at least one.
+ *
+ * The return value indicates whether or not we woke somebody up.
+ */
+bool
+ConditionVariableSignal(ConditionVariable *cv)
+{
+       PGPROC  *proc = NULL;
+
+       /*
+        * Remove the first process from the wakeup queue (if any) and clear
+        * its cvSleeping flag while still holding the mutex.  Clearing the
+        * flag is what allows ConditionVariableSleep() to return; doing it
+        * under the mutex keeps the flag in step with queue membership, which
+        * ConditionVariableCancelSleep() relies on.
+        */
+       SpinLockAcquire(&cv->mutex);
+       if (!proclist_is_empty(&cv->wakeup))
+       {
+               proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
+               proc->cvSleeping = false;
+       }
+       SpinLockRelease(&cv->mutex);
+
+       /* No sleeping processes. */
+       if (proc == NULL)
+               return false;
+
+       /* We dequeued someone; set their latch so they notice the change. */
+       SetLatch(&proc->procLatch);
+       return true;
+}
+
+/*
+ * Wake up every process sleeping on the condition variable.
+ *
+ * Returns how many processes were woken.
+ */
+int
+ConditionVariableBroadcast(ConditionVariable *cv)
+{
+       int             count = 0;
+
+       /*
+        * Deliberately simple: signal one waiter at a time until the signal
+        * operation reports that nobody was left to wake.  Draining the whole
+        * queue in one go would save spinlock traffic, but is hard to get
+        * right when sleeps can be cancelled concurrently, and we don't want
+        * to loop while holding the mutex.
+        */
+       for (;;)
+       {
+               if (!ConditionVariableSignal(cv))
+                       break;
+               count++;
+       }
+
+       return count;
+}
index 9a758bd91600b0839afadf3e3907e06a0902f8d5..ec08091efe64d05f2f6abcb94957e04af435eb82 100644 (file)
@@ -42,6 +42,7 @@
 #include "postmaster/autovacuum.h"
 #include "replication/slot.h"
 #include "replication/syncrep.h"
+#include "storage/condition_variable.h"
 #include "storage/standby.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
@@ -805,6 +806,9 @@ ProcKill(int code, Datum arg)
         */
        LWLockReleaseAll();
 
+       /* Cancel any pending condition variable sleep, too */
+       ConditionVariableCancelSleep();
+
        /* Make sure active replication slots are released */
        if (MyReplicationSlot != NULL)
                ReplicationSlotRelease();
@@ -910,6 +914,9 @@ AuxiliaryProcKill(int code, Datum arg)
        /* Release any LW locks I am holding (see notes above) */
        LWLockReleaseAll();
 
+       /* Cancel any pending condition variable sleep, too */
+       ConditionVariableCancelSleep();
+
        /*
         * Reset MyLatch to the process local one.  This is so that signal
         * handlers et al can continue using the latch after the shared latch
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
new file mode 100644 (file)
index 0000000..54b7fba
--- /dev/null
@@ -0,0 +1,58 @@
+/*-------------------------------------------------------------------------
+ *
+ * condition_variable.h
+ *       Condition variables
+ *
+ * A condition variable is a method of waiting until a certain condition
+ * becomes true.  Conventionally, a condition variable supports three
+ * operations: (1) sleep; (2) signal, which wakes up one process sleeping
+ * on the condition variable; and (3) broadcast, which wakes up every
+ * process sleeping on the condition variable.  In our implementation,
+ * condition variables put a process into an interruptible sleep (so it
+ * can be cancelled prior to the fulfillment of the condition) and do not
+ * use pointers internally (so that they are safe to use within DSMs).
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/condition_variable.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CONDITION_VARIABLE_H
+#define CONDITION_VARIABLE_H
+
+#include "storage/s_lock.h"
+#include "storage/proclist_types.h"
+
+/*
+ * A condition variable: a wait list of processes plus the spinlock that is
+ * held whenever that list is manipulated.
+ */
+typedef struct
+{
+       slock_t         mutex;          /* spinlock protecting the wakeup list */
+       proclist_head   wakeup;         /* list of processes waiting to be woken */
+} ConditionVariable;
+
+/* Initialize a condition variable. */
+extern void ConditionVariableInit(ConditionVariable *);
+
+/*
+ * Sleep on a condition variable.  In order to avoid race conditions, a
+ * process should first prepare to sleep, then recheck whether the desired
+ * condition has been met.  If not, the process should then sleep.  If so,
+ * it should cancel the sleep.  A non-local exit via ERROR or FATAL will
+ * automatically cancel a pending sleep.
+ *
+ * Only one sleep may be prepared at a time: a new sleep must not be
+ * prepared until the previous one has been completed or cancelled.
+ * ConditionVariableCancelSleep does nothing if no sleep is pending.
+ *
+ * After sleeping, a process may or may not need to recheck the condition
+ * and possibly sleep again.  If the condition variable is never signalled
+ * or broadcast except when the condition is guaranteed to hold, then
+ * there is no need to recheck the condition.  Otherwise, it must be
+ * rechecked.
+ */
+extern void ConditionVariablePrepareToSleep(ConditionVariable *);
+extern void ConditionVariableSleep(void);
+extern void ConditionVariableCancelSleep(void);
+
+/*
+ * Wake up a single waiter (via signal) or all waiters (via broadcast).
+ * ConditionVariableSignal returns true if it woke a process and false if
+ * nobody was waiting; ConditionVariableBroadcast returns the number of
+ * processes it woke.
+ */
+extern bool ConditionVariableSignal(ConditionVariable *);
+extern int ConditionVariableBroadcast(ConditionVariable *);
+
+#endif   /* CONDITION_VARIABLE_H */
index f576f052dfe6b71737e84e663f89e0302b7b126b..812008ab2bc7efec22f7f30b285092d96eda7a10 100644 (file)
@@ -115,6 +115,10 @@ struct PGPROC
        uint8           lwWaitMode;             /* lwlock mode being waited for */
        proclist_node lwWaitLink;       /* position in LW lock wait list */
 
+       /* Support for condition variables. */
+       bool            cvSleeping;             /* true if sleeping on a condition variable */
+       proclist_node   cvWaitLink;     /* position in CV wait list */
+
        /* Info about lock the process is currently waiting for, if any. */
        /* waitLock and waitProcLock are NULL if not currently waiting. */
        LOCK       *waitLock;           /* Lock object we're sleeping on ... */
index 2013a406a3c2bd8e120424934194c95e687eb969..0d7935c903554a3e424aedc97dde9762da64c198 100644 (file)
@@ -119,6 +119,20 @@ proclist_delete_offset(proclist_head *list, int procno, size_t node_offset)
                proclist_node_get(node->next, node_offset)->prev = node->prev;
 }
 
+/*
+ * Detach the node at the front of 'list' and hand back the PGPROC it
+ * belongs to.  The list must not be empty.
+ */
+static inline PGPROC *
+proclist_pop_head_node_offset(proclist_head *list, size_t node_offset)
+{
+       int             head_procno;
+
+       Assert(!proclist_is_empty(list));
+
+       /* Remember which entry is at the head before unlinking changes it. */
+       head_procno = list->head;
+       proclist_delete_offset(list, head_procno, node_offset);
+
+       return GetPGProcByNumber(head_procno);
+}
+
+
 /*
  * Helper macros to avoid repetition of offsetof(PGPROC, <member>).
  * 'link_member' is the name of a proclist_node member in PGPROC.
@@ -129,6 +143,8 @@ proclist_delete_offset(proclist_head *list, int procno, size_t node_offset)
        proclist_push_head_offset((list), (procno), offsetof(PGPROC, link_member))
 #define proclist_push_tail(list, procno, link_member) \
        proclist_push_tail_offset((list), (procno), offsetof(PGPROC, link_member))
+/* Remove the first node of 'list' and return its PGPROC; list must be non-empty. */
+#define proclist_pop_head_node(list, link_member) \
+       proclist_pop_head_node_offset((list), offsetof(PGPROC, link_member))
 
 /*
  * Iterate through the list pointed at by 'lhead', storing the current