{
        dsm_handle      handle;
        uint32          refcnt;                 /* 2+ = active, 1 = moribund, 0 = gone */
+       void       *impl_private_pm_handle; /* only needed on Windows */
+       bool            pinned;
 } dsm_control_item;
 
 /* Layout of the dynamic shared memory control segment. */
                        dsm_control->item[i].handle = seg->handle;
                        /* refcnt of 1 triggers destruction, so start at 2 */
                        dsm_control->item[i].refcnt = 2;
+                       dsm_control->item[i].impl_private_pm_handle = NULL;
+                       dsm_control->item[i].pinned = false;
                        seg->control_slot = i;
                        LWLockRelease(DynamicSharedMemoryControlLock);
                        return seg;
        dsm_control->item[nitems].handle = seg->handle;
        /* refcnt of 1 triggers destruction, so start at 2 */
        dsm_control->item[nitems].refcnt = 2;
+       dsm_control->item[nitems].impl_private_pm_handle = NULL;
+       dsm_control->item[nitems].pinned = false;
        seg->control_slot = nitems;
        dsm_control->nitems++;
        LWLockRelease(DynamicSharedMemoryControlLock);
                /* If new reference count is 1, try to destroy the segment. */
                if (refcnt == 1)
                {
+                       /* A pinned segment should never reach 1. */
+                       Assert(!dsm_control->item[control_slot].pinned);
+
                        /*
                         * If we fail to destroy the segment here, or are killed before we
                         * finish doing so, the reference count will remain at 1, which
 }
 
 /*
- * Keep a dynamic shared memory segment until postmaster shutdown.
+ * Keep a dynamic shared memory segment until postmaster shutdown, or until
+ * dsm_unpin_segment is called.
  *
- * This function should not be called more than once per segment;
- * on Windows, doing so will create unnecessary handles which will
- * consume system resources to no benefit.
+ * This function should not be called more than once per segment, unless the
+ * segment is explicitly unpinned with dsm_unpin_segment in between calls.
  *
  * Note that this function does not arrange for the current process to
  * keep the segment mapped indefinitely; if that behavior is desired,
 void
 dsm_pin_segment(dsm_segment *seg)
 {
+       void       *handle;
+
        /*
         * Bump reference count for this segment in shared memory. This will
         * ensure that even if there is no session which is attached to this
-        * segment, it will remain until postmaster shutdown.
+        * segment, it will remain until postmaster shutdown or an explicit call
+        * to unpin.
         */
        LWLockAcquire(DynamicSharedMemoryControlLock, LW_EXCLUSIVE);
+       if (dsm_control->item[seg->control_slot].pinned)
+               elog(ERROR, "cannot pin a segment that is already pinned");
+       dsm_impl_pin_segment(seg->handle, seg->impl_private, &handle);
+       dsm_control->item[seg->control_slot].pinned = true;
        dsm_control->item[seg->control_slot].refcnt++;
+       dsm_control->item[seg->control_slot].impl_private_pm_handle = handle;
        LWLockRelease(DynamicSharedMemoryControlLock);
+}
+
+/*
+ * Unpin a dynamic shared memory segment that was previously pinned with
+ * dsm_pin_segment.  This function should not be called unless dsm_pin_segment
+ * was previously called for this segment.
+ *
+ * The argument is a dsm_handle rather than a dsm_segment in case you want
+ * to unpin a segment to which you haven't attached.  This turns out to be
+ * useful if, for example, a reference to one shared memory segment is stored
+ * within another shared memory segment.  You might want to unpin the
+ * referenced segment before destroying the referencing segment.
+ */
+void
+dsm_unpin_segment(dsm_handle handle)
+{
+       uint32          control_slot = INVALID_CONTROL_SLOT;
+       bool            destroy = false;
+       uint32          i;
 
-       dsm_impl_pin_segment(seg->handle, seg->impl_private);
+       /* Find the control slot for the given handle. */
+       LWLockAcquire(DynamicSharedMemoryControlLock, LW_EXCLUSIVE);
+       for (i = 0; i < dsm_control->nitems; ++i)
+       {
+               /* Skip unused slots. */
+               if (dsm_control->item[i].refcnt == 0)
+                       continue;
+
+               /* If we've found our handle, we can stop searching. */
+               if (dsm_control->item[i].handle == handle)
+               {
+                       control_slot = i;
+                       break;
+               }
+       }
+
+       /*
+        * We should definitely have found the slot, and it should not already be
+        * in the process of going away, because this function should only be
+        * called on a segment which is pinned.
+        */
+       if (control_slot == INVALID_CONTROL_SLOT)
+               elog(ERROR, "cannot unpin unknown segment handle");
+       if (!dsm_control->item[control_slot].pinned)
+               elog(ERROR, "cannot unpin a segment that is not pinned");
+       Assert(dsm_control->item[control_slot].refcnt > 1);
+
+       /*
+        * Allow implementation-specific code to run.  We have to do this before
+        * releasing the lock, because impl_private_pm_handle may get modified by
+        * dsm_impl_unpin_segment.
+        */
+       dsm_impl_unpin_segment(handle,
+                                       &dsm_control->item[control_slot].impl_private_pm_handle);
+
+       /* Note that 1 means no references (0 means unused slot). */
+       if (--dsm_control->item[control_slot].refcnt == 1)
+               destroy = true;
+       dsm_control->item[control_slot].pinned = false;
+
+       /* Now we can release the lock. */
+       LWLockRelease(DynamicSharedMemoryControlLock);
+
+       /* Clean up resources if that was the last reference. */
+       if (destroy)
+       {
+               void       *junk_impl_private = NULL;
+               void       *junk_mapped_address = NULL;
+               Size            junk_mapped_size = 0;
+
+               /*
+                * For an explanation of how error handling works in this case, see
+                * comments in dsm_detach.  Note that if we reach this point, the
+                * current process certainly does not have the segment mapped, because
+                * if it did, the reference count would have still been greater than 1
+                * even after releasing the reference count held by the pin.  The fact
+                * that there can't be a dsm_segment for this handle makes it OK to
+                * pass the mapped size, mapped address, and private data as NULL
+                * here.
+                */
+               if (dsm_impl_op(DSM_OP_DESTROY, handle, 0, &junk_impl_private,
+                                               &junk_mapped_address, &junk_mapped_size, WARNING))
+               {
+                       LWLockAcquire(DynamicSharedMemoryControlLock, LW_EXCLUSIVE);
+                       Assert(dsm_control->item[control_slot].handle == handle);
+                       Assert(dsm_control->item[control_slot].refcnt == 1);
+                       dsm_control->item[control_slot].refcnt = 0;
+                       LWLockRelease(DynamicSharedMemoryControlLock);
+               }
+       }
 }
 
 /*
 
 #endif
 
 /*
- * Implementation-specific actions that must be performed when a segment
- * is to be preserved until postmaster shutdown.
+ * Implementation-specific actions that must be performed when a segment is to
+ * be preserved even when no backend has it attached.
  *
  * Except on Windows, we don't need to do anything at all.  But since Windows
  * cleans up segments automatically when no references remain, we duplicate
  * do anything to receive the handle; Windows transfers it automatically.
  */
 void
-dsm_impl_pin_segment(dsm_handle handle, void *impl_private)
+dsm_impl_pin_segment(dsm_handle handle, void *impl_private,
+                                        void **impl_private_pm_handle)
 {
        switch (dynamic_shared_memory_type)
        {
                                                  errmsg("could not duplicate handle for \"%s\": %m",
                                                                 name)));
                                }
+
+                               /*
+                                * Here, we remember the handle that we created in the
+                                * postmaster process.  This handle isn't actually usable in
+                                * any process other than the postmaster, but that doesn't
+                                * matter.  We're just holding onto it so that, if the segment
+                                * is unpinned, dsm_impl_unpin_segment can close it.
+                                */
+                               *impl_private_pm_handle = hmap;
+                               break;
+                       }
+#endif
+               default:
+                       break;
+       }
+}
+
+/*
+ * Implementation-specific actions that must be performed when a segment is no
+ * longer to be preserved, so that it will be cleaned up when all backends
+ * have detached from it.
+ *
+ * Except on Windows, we don't need to do anything at all.  For Windows, we
+ * close the extra handle that dsm_impl_pin_segment created in the
+ * postmaster's process space.
+ */
+void
+dsm_impl_unpin_segment(dsm_handle handle, void **impl_private)
+{
+       switch (dynamic_shared_memory_type)
+       {
+#ifdef USE_DSM_WINDOWS
+               case DSM_IMPL_WINDOWS:
+                       {
+                               if (*impl_private &&
+                                       !DuplicateHandle(PostmasterHandle, *impl_private,
+                                                                        NULL, NULL, 0, FALSE,
+                                                                        DUPLICATE_CLOSE_SOURCE))
+                               {
+                                       char            name[64];
+
+                                       snprintf(name, 64, "%s.%u", SEGMENT_NAME_PREFIX, handle);
+                                       _dosmaperr(GetLastError());
+                                       ereport(ERROR,
+                                                       (errcode_for_dynamic_shared_memory(),
+                                                 errmsg("could not duplicate handle for \"%s\": %m",
+                                                                name)));
+                               }
+
+                               *impl_private = NULL;
                                break;
                        }
 #endif