Reviewed in an earlier version by Rahila Syed.
#include "replication/origin.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
/* Reset WAL record construction state */
XLogResetInsertion();
+ /* Cancel condition variable sleep */
+ ConditionVariableCancelSleep();
+
/*
* Also clean up any open wait for lock, since the lock manager will choke
* if we try to wait for another lock before doing this.
#include "replication/walreceiver.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
+#include "storage/condition_variable.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
ShutdownAuxiliaryProcess(int code, Datum arg)
{
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
}
#include "postmaster/bgwriter.h"
#include "storage/bufmgr.h"
#include "storage/buf_internals.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
* about in bgwriter, but we do have LWLocks, buffers, and temp files.
*/
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
AbortBufferIO();
UnlockBuffers();
/* buffer pins are released here: */
#include "postmaster/bgwriter.h"
#include "replication/syncrep.h"
#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
* files.
*/
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
AbortBufferIO();
UnlockBuffers();
#include "pgstat.h"
#include "postmaster/walwriter.h"
#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
* about in walwriter, but we do have LWLocks, and perhaps buffers?
*/
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
AbortBufferIO();
UnlockBuffers();
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
WalSndErrorCleanup(void)
{
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
if (sendFile >= 0)
include $(top_builddir)/src/Makefile.global
OBJS = lmgr.o lock.o proc.o deadlock.o lwlock.o lwlocknames.o spin.o \
- s_lock.o predicate.o
+ s_lock.o predicate.o condition_variable.o
include $(top_srcdir)/src/backend/common.mk
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * condition_variable.c
+ * Implementation of condition variables. Condition variables provide
+ * a way for one process to wait until a specific condition occurs,
+ * without needing to know the specific identity of the process for
+ * which they are waiting. Waits for condition variables can be
+ * interrupted, unlike LWLock waits. Condition variables are safe
+ * to use within dynamic shared memory segments.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/storage/lmgr/condition_variable.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "storage/condition_variable.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
+#include "storage/proclist.h"
+#include "storage/spin.h"
+
+/* Initially, we are not prepared to sleep on any condition variable. */
+static ConditionVariable *cv_sleep_target = NULL;
+
+/*
+ * Initialize a condition variable.
+ */
+void
+ConditionVariableInit(ConditionVariable *cv)
+{
+ SpinLockInit(&cv->mutex);
+ proclist_init(&cv->wakeup);
+}
+
+/*
+ * Add ourselves to the wait queue for a condition variable and mark
+ * ourselves as sleeping.
+ */
+void
+ConditionVariablePrepareToSleep(ConditionVariable *cv)
+{
+ int pgprocno = MyProc->pgprocno;
+
+ /*
+ * It's not legal to prepare a sleep until the previous sleep has been
+ * completed or cancelled.
+ */
+ Assert(cv_sleep_target == NULL);
+
+ /* Record the condition variable on which we will sleep. */
+ cv_sleep_target = cv;
+
+ /* Mark myself as sleeping. */
+ MyProc->cvSleeping = true;
+
+ /* Add myself to the wait queue. */
+ SpinLockAcquire(&cv->mutex);
+ proclist_push_head(&cv->wakeup, pgprocno, cvWaitLink);
+ SpinLockRelease(&cv->mutex);
+}
+
+/*
+ * Sleeping on a condition variable is extremely simple. We just repeatedly
+ * wait on our latch until someone clears our cvSleeping flag. This may
+ * even happen immediately, since a signal or broadcast operation could have
+ * happened after we prepared to sleep and before we reach this function.
+ */
+void
+ConditionVariableSleep(void)
+{
+ Assert(cv_sleep_target != NULL);
+
+ while (MyProc->cvSleeping)
+ {
+ CHECK_FOR_INTERRUPTS();
+ WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
+ ResetLatch(&MyProc->procLatch);
+ }
+
+ cv_sleep_target = NULL;
+}
+
+/*
+ * Cancel any pending sleep operation. We just need to remove ourselves
+ * from the wait queue of any condition variable for which we have previously
+ * prepared a sleep.
+ */
+void
+ConditionVariableCancelSleep(void)
+{
+ ConditionVariable *cv = cv_sleep_target;
+
+ if (cv_sleep_target == NULL)
+ return;
+
+ SpinLockAcquire(&cv->mutex);
+ proclist_delete(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
+ SpinLockRelease(&cv->mutex);
+
+ MyProc->cvSleeping = false;
+ cv_sleep_target = NULL;
+}
+
+/*
+ * Wake up one sleeping process, assuming there is at least one.
+ *
+ * The return value indicates whether or not we woke somebody up.
+ */
+bool
+ConditionVariableSignal(ConditionVariable *cv)
+{
+ PGPROC *proc = NULL;
+
+ /* Remove the first process from the wakeup queue (if any). */
+ SpinLockAcquire(&cv->mutex);
+ if (!proclist_is_empty(&cv->wakeup))
+ proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
+ SpinLockRelease(&cv->mutex);
+
+ /* If we found someone sleeping, set their latch to wake them up. */
+ if (proc != NULL)
+ {
+ SetLatch(&proc->procLatch);
+ return true;
+ }
+
+ /* No sleeping processes. */
+ return false;
+}
+
+/*
+ * Wake up all sleeping processes.
+ *
+ * The return value indicates the number of processes we woke.
+ */
+int
+ConditionVariableBroadcast(ConditionVariable *cv)
+{
+ int nwoken = 0;
+
+ /*
+ * Let's just do this the dumbest way possible. We could try to dequeue
+ * all the sleepers at once to save spinlock cycles, but it's a bit hard
+ * to get that right in the face of possible sleep cancellations, and
+ * we don't want to loop holding the mutex.
+ */
+ while (ConditionVariableSignal(cv))
+ ++nwoken;
+
+ return nwoken;
+}
#include "postmaster/autovacuum.h"
#include "replication/slot.h"
#include "replication/syncrep.h"
+#include "storage/condition_variable.h"
#include "storage/standby.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
*/
LWLockReleaseAll();
+ /* Cancel any pending condition variable sleep, too */
+ ConditionVariableCancelSleep();
+
/* Make sure active replication slots are released */
if (MyReplicationSlot != NULL)
ReplicationSlotRelease();
/* Release any LW locks I am holding (see notes above) */
LWLockReleaseAll();
+ /* Cancel any pending condition variable sleep, too */
+ ConditionVariableCancelSleep();
+
/*
* Reset MyLatch to the process local one. This is so that signal
* handlers et al can continue using the latch after the shared latch
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * condition_variable.h
+ * Condition variables
+ *
+ * A condition variable is a method of waiting until a certain condition
+ * becomes true. Conventionally, a condition variable supports three
+ * operations: (1) sleep; (2) signal, which wakes up one process sleeping
+ * on the condition variable; and (3) broadcast, which wakes up every
+ * process sleeping on the condition variable. In our implementation,
+ * condition variables put a process into an interruptible sleep (so it
+ * can be cancelled prior to the fulfillment of the condition) and do not
+ * use pointers internally (so that they are safe to use within DSMs).
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/condition_variable.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CONDITION_VARIABLE_H
+#define CONDITION_VARIABLE_H
+
+#include "storage/s_lock.h"
+#include "storage/proclist_types.h"
+
+typedef struct
+{
+ slock_t mutex;
+ proclist_head wakeup;
+} ConditionVariable;
+
+/* Initialize a condition variable. */
+extern void ConditionVariableInit(ConditionVariable *);
+
+/*
+ * Sleep on a condition variable. In order to avoid race conditions, a
+ * process should first prepare to sleep, then recheck whether the desired
+ * condition has been met. If not, the process should then sleep. If so,
+ * it should cancel the sleep. A non-local exit via ERROR or FATAL will
+ * automatically cancel a pending sleep.
+ *
+ * After sleeping, a process may or may not need to recheck the condition
+ * and possibly sleep again. If the condition variable is never signalled
+ * or broadcast except when the condition is guaranteed to hold, then
+ * there is no need to recheck the condition. Otherwise, it must be
+ * rechecked.
+ */
+extern void ConditionVariablePrepareToSleep(ConditionVariable *);
+extern void ConditionVariableSleep(void);
+extern void ConditionVariableCancelSleep(void);
+
+/* Wake up a single waiter (via signal) or all waiters (via broadcast). */
+extern bool ConditionVariableSignal(ConditionVariable *);
+extern int ConditionVariableBroadcast(ConditionVariable *);
+
+#endif /* CONDITION_VARIABLE_H */
uint8 lwWaitMode; /* lwlock mode being waited for */
proclist_node lwWaitLink; /* position in LW lock wait list */
+ /* Support for condition variables. */
+ bool cvSleeping; /* true if sleeping on a condition variable */
+ proclist_node cvWaitLink; /* position in CV wait list */
+
/* Info about lock the process is currently waiting for, if any. */
/* waitLock and waitProcLock are NULL if not currently waiting. */
LOCK *waitLock; /* Lock object we're sleeping on ... */
proclist_node_get(node->next, node_offset)->prev = node->prev;
}
+/*
+ * Remove and return the first node from a list (there must be one).
+ */
+static inline PGPROC *
+proclist_pop_head_node_offset(proclist_head *list, size_t node_offset)
+{
+ PGPROC *proc;
+
+ Assert(!proclist_is_empty(list));
+ proc = GetPGProcByNumber(list->head);
+ proclist_delete_offset(list, list->head, node_offset);
+ return proc;
+}
+
/*
* Helper macros to avoid repetition of offsetof(PGPROC, <member>).
* 'link_member' is the name of a proclist_node member in PGPROC.
proclist_push_head_offset((list), (procno), offsetof(PGPROC, link_member))
#define proclist_push_tail(list, procno, link_member) \
proclist_push_tail_offset((list), (procno), offsetof(PGPROC, link_member))
+#define proclist_pop_head_node(list, link_member) \
+ proclist_pop_head_node_offset((list), offsetof(PGPROC, link_member))
/*
* Iterate through the list pointed at by 'lhead', storing the current