--- /dev/null
+/*--------------------------------------------------------------------------
+ *
+ * parallel_dummy.c
+ * Test harness code for parallel mode code.
+ *
+ * Copyright (C) 2013-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/parallel_dummy/parallel_dummy.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/parallel.h"
+#include "access/relscan.h"
+#include "access/xact.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "storage/condition_variable.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/snapmgr.h"
+#include "lib/ilist.h"
+#include "storage/spin.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(parallel_count);
+
+#define TOC_SCAN_KEY 1
+#define TOC_RESULT_KEY 2
+
+typedef struct shared_state
+{
+ int32 flag;
+ ConditionVariable cv;
+ slock_t mutex;
+}shared_state;
+void _PG_init(void);
+void count_worker_main(dsm_segment *seg, shm_toc *toc);
+void count_parallelscan_initialize(shared_state *sha_state);
+
+static void count_helper(shared_state *sha_state,
+ int64 *resultp);
+
+Size count_parallelscan_estimate(void);
+
+Size count_parallelscan_estimate()
+{
+ return add_size(offsetof(shared_state, mutex),
+ sizeof(slock_t));
+}
+void count_parallelscan_initialize(shared_state *sha_state)
+{
+ sha_state->flag = 1;
+ ConditionVariableInit(&sha_state->cv);
+ SpinLockInit(&sha_state->mutex);
+}
+
+Datum
+parallel_count(PG_FUNCTION_ARGS)
+{
+/* Oid relid = PG_GETARG_OID(0); */
+ int32 nworkers = PG_GETARG_INT32(1);
+ int64 *resultp;
+ int64 result;
+ ParallelContext *pcxt;
+ shared_state *sha_state;
+ Size sz;
+
+ if (nworkers <= 0)
+ ereport(ERROR,
+ (errmsg("number of parallel workers must be positive")));
+
+
+ EnterParallelMode();
+
+ pcxt = CreateParallelContextForExternalFunction("parallel_dummy",
+ "count_worker_main",
+ nworkers);
+ sz = count_parallelscan_estimate();
+ shm_toc_estimate_chunk(&pcxt->estimator, sz);
+ shm_toc_estimate_chunk(&pcxt->estimator, sizeof(int64));
+ shm_toc_estimate_keys(&pcxt->estimator, 2);
+ InitializeParallelDSM(pcxt);
+ sha_state = shm_toc_allocate(pcxt->toc, sz);
+ count_parallelscan_initialize(sha_state);
+ shm_toc_insert(pcxt->toc, TOC_SCAN_KEY, sha_state);
+ resultp = shm_toc_allocate(pcxt->toc, sizeof(int64));
+ shm_toc_insert(pcxt->toc, TOC_RESULT_KEY, resultp);
+
+ LaunchParallelWorkers(pcxt);
+
+ /* here's where we do the "real work" ... */
+ count_helper(sha_state, resultp);
+
+ WaitForParallelWorkersToFinish(pcxt);
+
+ result = *resultp;
+
+ DestroyParallelContext(pcxt);
+
+ ExitParallelMode();
+
+ PG_RETURN_INT64(result);
+}
+
+void
+count_worker_main(dsm_segment *seg, shm_toc *toc)
+{
+ shared_state *sha_state;
+ int64 *resultp;
+
+ sha_state = shm_toc_lookup(toc, TOC_SCAN_KEY);
+ resultp = shm_toc_lookup(toc, TOC_RESULT_KEY);
+ Assert(sha_state != NULL && resultp != NULL);
+
+ count_helper(sha_state, resultp);
+}
+
+static void
+count_helper(shared_state *sha_state, int64 *resultp)
+{
+ int64 mytuples = 0;
+ int32 local_flag;
+ int i;
+ bool queued_self = false;
+ bool done;
+ bool woke_somebody = false;
+ int nprepared = 0;
+ int nslept = 0;
+
+retry:
+ done = false;
+ for (;;)
+ {
+ SpinLockAcquire(&sha_state->mutex);
+ local_flag = sha_state->flag;
+ if (local_flag == 1)
+ {
+ sha_state->flag = 2;
+ done = true;
+ }
+ SpinLockRelease(&sha_state->mutex);
+
+ if (done)
+ break;
+
+ if (queued_self)
+ {
+ ++nslept;
+ ConditionVariableSleep();
+ queued_self = false;
+ }
+ else
+ {
+ ++nprepared;
+ ConditionVariablePrepareToSleep(&sha_state->cv);
+ queued_self = true;
+ }
+ }
+ if (queued_self)
+ {
+ ConditionVariableCancelSleep();
+ queued_self = false;
+ }
+
+ for (i=0; i < 2000000; i++)
+ {
+ mytuples++;
+ }
+
+ SpinLockAcquire(&sha_state->mutex);
+ *resultp += mytuples;
+ sha_state->flag = 1;
+ SpinLockRelease(&sha_state->mutex);
+ woke_somebody = ConditionVariableSignal(&sha_state->cv);
+
+ /*
+ * If nothing interesting happened from a contention point of view, loop
+ * around again so that we get some.
+ */
+ if (nprepared == 0 && nslept == 0 && !woke_somebody)
+ goto retry;
+
+ elog(NOTICE, "PID %d counted " INT64_FORMAT " tuples (prepared: %d, slept: %d, woke: %d)",
+ MyProcPid, mytuples, nprepared, nslept, (int) woke_somebody);
+}