Test code derived from Rahila's work.
authorRobert Haas <rhaas@postgresql.org>
Sat, 20 Feb 2016 05:04:37 +0000 (10:34 +0530)
committerRobert Haas <rhaas@postgresql.org>
Thu, 11 Aug 2016 19:18:11 +0000 (15:18 -0400)
contrib/parallel_dummy/Makefile [new file with mode: 0644]
contrib/parallel_dummy/parallel_dummy--1.0.sql [new file with mode: 0644]
contrib/parallel_dummy/parallel_dummy.c [new file with mode: 0644]
contrib/parallel_dummy/parallel_dummy.control [new file with mode: 0644]

diff --git a/contrib/parallel_dummy/Makefile b/contrib/parallel_dummy/Makefile
new file mode 100644 (file)
index 0000000..de00f50
--- /dev/null
@@ -0,0 +1,19 @@
+MODULE_big = parallel_dummy
+OBJS = parallel_dummy.o $(WIN32RES)
+PGFILEDESC = "parallel_dummy - dummy use of parallel infrastructure"
+
+EXTENSION = parallel_dummy
+DATA = parallel_dummy--1.0.sql
+
+REGRESS = parallel_dummy
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/parallel_dummy
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/parallel_dummy/parallel_dummy--1.0.sql b/contrib/parallel_dummy/parallel_dummy--1.0.sql
new file mode 100644 (file)
index 0000000..d49bd0f
--- /dev/null
@@ -0,0 +1,7 @@
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION parallel_dummy" to load this file. \quit
+
+CREATE FUNCTION parallel_count(rel pg_catalog.regclass,
+                                                         nworkers pg_catalog.int4)
+    RETURNS pg_catalog.int8 STRICT
+       AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/contrib/parallel_dummy/parallel_dummy.c b/contrib/parallel_dummy/parallel_dummy.c
new file mode 100644 (file)
index 0000000..6192834
--- /dev/null
@@ -0,0 +1,192 @@
+/*--------------------------------------------------------------------------
+ *
+ * parallel_dummy.c
+ *             Test harness code for parallel mode code.
+ *
+ * Copyright (C) 2013-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *             contrib/parallel_dummy/parallel_dummy.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/parallel.h"
+#include "access/relscan.h"
+#include "access/xact.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "storage/condition_variable.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/snapmgr.h"
+#include "lib/ilist.h"
+#include "storage/spin.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(parallel_count);
+
+#define                TOC_SCAN_KEY                    1
+#define                TOC_RESULT_KEY                  2
+
+typedef struct shared_state
+{
+       int32 flag;
+       ConditionVariable cv;
+       slock_t mutex;
+}shared_state;
+void           _PG_init(void);
+void           count_worker_main(dsm_segment *seg, shm_toc *toc);
+void count_parallelscan_initialize(shared_state *sha_state);
+
+static void count_helper(shared_state *sha_state,
+                                                int64 *resultp);
+
+Size count_parallelscan_estimate(void);
+
+Size count_parallelscan_estimate()
+{
+  return add_size(offsetof(shared_state, mutex),
+                                        sizeof(slock_t));
+}
+void count_parallelscan_initialize(shared_state *sha_state)
+{
+       sha_state->flag = 1;
+       ConditionVariableInit(&sha_state->cv);
+       SpinLockInit(&sha_state->mutex);
+}
+
+Datum
+parallel_count(PG_FUNCTION_ARGS)
+{
+/*     Oid                     relid = PG_GETARG_OID(0); */
+       int32           nworkers = PG_GETARG_INT32(1);
+       int64      *resultp;
+       int64           result;
+       ParallelContext *pcxt;
+       shared_state *sha_state;
+       Size            sz;
+
+       if (nworkers <= 0)
+               ereport(ERROR,
+                               (errmsg("number of parallel workers must be positive")));
+
+
+       EnterParallelMode();
+
+       pcxt = CreateParallelContextForExternalFunction("parallel_dummy",
+                                                                                        "count_worker_main",
+                                                                                        nworkers);
+       sz = count_parallelscan_estimate();
+       shm_toc_estimate_chunk(&pcxt->estimator, sz);
+       shm_toc_estimate_chunk(&pcxt->estimator, sizeof(int64));
+       shm_toc_estimate_keys(&pcxt->estimator, 2);
+       InitializeParallelDSM(pcxt);
+       sha_state = shm_toc_allocate(pcxt->toc, sz);
+       count_parallelscan_initialize(sha_state);
+       shm_toc_insert(pcxt->toc, TOC_SCAN_KEY, sha_state);
+       resultp = shm_toc_allocate(pcxt->toc, sizeof(int64));
+       shm_toc_insert(pcxt->toc, TOC_RESULT_KEY, resultp);
+
+       LaunchParallelWorkers(pcxt);
+
+       /* here's where we do the "real work" ... */
+       count_helper(sha_state, resultp);
+
+       WaitForParallelWorkersToFinish(pcxt);
+
+       result = *resultp;
+
+       DestroyParallelContext(pcxt);
+
+       ExitParallelMode();
+
+       PG_RETURN_INT64(result);
+}
+
+void
+count_worker_main(dsm_segment *seg, shm_toc *toc)
+{
+       shared_state    *sha_state;
+       int64      *resultp;
+
+       sha_state = shm_toc_lookup(toc, TOC_SCAN_KEY);
+       resultp = shm_toc_lookup(toc, TOC_RESULT_KEY);
+       Assert(sha_state != NULL && resultp != NULL);
+
+       count_helper(sha_state, resultp);
+}
+
+static void
+count_helper(shared_state *sha_state, int64 *resultp)
+{
+       int64           mytuples = 0;
+       int32           local_flag;
+       int i;
+       bool            queued_self = false;
+       bool            done;
+       bool            woke_somebody = false;
+       int                     nprepared = 0;
+       int                     nslept = 0;
+
+retry:
+       done = false;
+       for (;;)
+       {
+               SpinLockAcquire(&sha_state->mutex);
+               local_flag = sha_state->flag;
+               if (local_flag == 1)
+               {
+                       sha_state->flag = 2;
+                       done = true;
+               }
+               SpinLockRelease(&sha_state->mutex);
+
+               if (done)
+                       break;
+
+               if (queued_self)
+               {
+                       ++nslept;
+                       ConditionVariableSleep();
+                       queued_self = false;
+               }
+               else
+               {
+                       ++nprepared;
+                       ConditionVariablePrepareToSleep(&sha_state->cv);
+                       queued_self = true;
+               }
+       }
+       if (queued_self)
+       {
+               ConditionVariableCancelSleep();
+               queued_self = false;
+       }
+
+       for (i=0; i < 2000000; i++)
+       {
+               mytuples++;
+       }
+
+       SpinLockAcquire(&sha_state->mutex);
+       *resultp += mytuples;
+       sha_state->flag = 1;
+       SpinLockRelease(&sha_state->mutex);
+       woke_somebody = ConditionVariableSignal(&sha_state->cv);
+
+       /*
+        * If nothing interesting happened from a contention point of view, loop
+        * around again so that we get some.
+        */
+       if (nprepared == 0 && nslept == 0 && !woke_somebody)
+               goto retry;
+
+       elog(NOTICE, "PID %d counted " INT64_FORMAT " tuples (prepared: %d, slept: %d, woke: %d)",
+               MyProcPid, mytuples, nprepared, nslept, (int) woke_somebody);
+}
diff --git a/contrib/parallel_dummy/parallel_dummy.control b/contrib/parallel_dummy/parallel_dummy.control
new file mode 100644 (file)
index 0000000..90bae3f
--- /dev/null
@@ -0,0 +1,4 @@
+comment = 'Dummy parallel code'
+default_version = '1.0'
+module_pathname = '$libdir/parallel_dummy'
+relocatable = true