From d8fe8b8ca939089d3fbe31b8904aa532bac294a5 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Thu, 13 Oct 2016 13:41:43 +0200 Subject: [PATCH] fix lockups in PostgreSQL collector When the workers produce results and return them through a queue, we need to first drain the queue and then wait for the thread to join(). Otherwise it seems to lock up - the join() call will wait as long as there are items in the queue. This also happens because we do both things (reading from queue and waiting for the thread) in the same thread. --- client/collectors/postgres.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/client/collectors/postgres.py b/client/collectors/postgres.py index 306c2b7..7096c27 100644 --- a/client/collectors/postgres.py +++ b/client/collectors/postgres.py @@ -3,11 +3,10 @@ import multiprocessing import os import psycopg2 import psycopg2.extras -import Queue import time from multiprocessing import Process, Queue - +from utils.logging import log class PostgresCollector(object): 'collects basic PostgreSQL-level statistics (bgwriter, databases, tables, indexes)' @@ -17,22 +16,29 @@ class PostgresCollector(object): def start(self): - self._in_queue = multiprocessing.Queue() - self._out_queue = multiprocessing.Queue() + self._in_queue = Queue() + self._out_queue = Queue() self._worker = Process(target=run_collector, args=(self._in_queue, self._out_queue, self._dbname)) self._worker.start() def stop(self): + # signal the worker process to stop by writing a value into the queue self._in_queue.put(True) - # FIXME this gets stuck for some reason (but we'll wait for queue anyway) - # self._worker.join() + log("stopping the PostgreSQL statistics collector") - # and then read the result + # Wait for collector to place result into the output queue. This needs + # to happen before calling join() otherwise it causes a deadlock. 
+ log("waiting for collector result in a queue") self._result = self._out_queue.get() + # And wait for the worker to terminate. This should be pretty fast as + # the collector places result into the queue right before terminating. + log("waiting for collector process to terminate") + self._worker.join() + self._worker = None self._in_queue = None self._out_queue = None @@ -67,6 +73,7 @@ def run_collector(in_queue, out_queue, dbname, interval=1.0): # if we've received message in the input queue (not empty), terminate if not in_queue.empty(): + log("PostgreSQL collector received request to terminate") break # open connection to the benchmark database (if can't open, continue) @@ -128,6 +135,8 @@ def run_collector(in_queue, out_queue, dbname, interval=1.0): conn.close() + log("PostgreSQL collector generates CSV results") + # close the CSV writers bgwriter_log = None tables_log = None @@ -155,3 +164,5 @@ def run_collector(in_queue, out_queue, dbname, interval=1.0): os.remove('database.csv') out_queue.put(result) + + log("PostgreSQL collector put results into output queue and terminates") -- 2.39.5