From c9de6870b604e6aa99c42ef1ba12bc4b06c64a31 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Tue, 3 Nov 2009 14:26:19 +0200 Subject: [PATCH] fix pgq brokenness related to recent commits - get_batch_cursor() - loop_delay --- python/pgq/cascade/admin.py | 9 +++++---- python/pgq/consumer.py | 5 +++-- python/pgqadm.py | 4 ++++ sql/pgq/functions/pgq.get_batch_cursor.sql | 5 ----- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/python/pgq/cascade/admin.py b/python/pgq/cascade/admin.py index 48a582fe..5f42a324 100644 --- a/python/pgq/cascade/admin.py +++ b/python/pgq/cascade/admin.py @@ -237,7 +237,7 @@ class CascadeAdmin(skytools.AdminScript): else: loc = self.cf.get(self.initial_db_name) - while self.looping: + while 1: db = self.get_database('root_db', connstr = loc) @@ -389,6 +389,7 @@ class CascadeAdmin(skytools.AdminScript): node_db = self.get_node_database(node) qinfo = self.load_queue_info(node_db) ninfo = qinfo.local_node + node_location = qinfo.get_member(node).location # reload consumer info cmap = self.get_node_consumer_map(node) @@ -398,8 +399,8 @@ class CascadeAdmin(skytools.AdminScript): is_worker = (ninfo.worker_name == consumer) # fixme: expect the node to be described already - #q = "select * from pgq_node.add_member(%s, %s, %s, false)" - #self.node_cmd(new_provider, q, [self.queue_name, node_name, node_location]) + q = "select * from pgq_node.register_location(%s, %s, %s, false)" + self.node_cmd(new_provider, q, [self.queue_name, node, node_location]) # subscribe on new provider if is_worker: @@ -729,7 +730,7 @@ class CascadeAdmin(skytools.AdminScript): self.node_cmd(node, q, [self.queue_name, consumer, pause_flag]) self.log.info('Waiting for worker to accept') - while self.looping: + while 1: q = "select * from pgq_node.get_consumer_state(%s, %s)" stat = self.node_cmd(node, q, [self.queue_name, consumer], quiet = 1)[0] if stat['paused'] != pause_flag: diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py index ef6283d6..04915614 100644 --- a/python/pgq/consumer.py +++ b/python/pgq/consumer.py @@ -46,6 +46,7 @@ class _BatchWalker(object): self.curs = curs self.length = 0 self.status_map = {} + self.batch_id = batch_id self.fetch_status = 0 # 0-not started, 1-in-progress, 2-done def __iter__(self): @@ -54,7 +55,7 @@ class _BatchWalker(object): self.fetch_status = 1 q = "select * from pgq.get_batch_cursor(%s, %s, %s)" - self.curs.execute(q, [self.queue_name, self.sql_cursor, self.fetch_size]) + self.curs.execute(q, [self.batch_id, self.sql_cursor, self.fetch_size]) # this will return first batch of rows q = "fetch %d from batch_walker" % self.fetch_size @@ -167,7 +168,7 @@ class Consumer(skytools.DBScript): self.consumer_id = self.consumer_name def reload(self): - DBScript.reload(self) + skytools.DBScript.reload(self) self.pgq_lazy_fetch = self.cf.getint("pgq_lazy_fetch", self.default_lazy_fetch) # set following ones to None if not set diff --git a/python/pgqadm.py b/python/pgqadm.py index bd1ff8a6..7fdfdc3b 100755 --- a/python/pgqadm.py +++ b/python/pgqadm.py @@ -118,6 +118,10 @@ class PGQAdmin(skytools.DBScript): else: skytools.DBScript.start(self) + def reload(self): + skytools.DBScript.reload(self) + self.set_single_loop(1) + def init_optparse(self, parser=None): p = skytools.DBScript.init_optparse(self, parser) p.set_usage(command_usage.strip()) diff --git a/sql/pgq/functions/pgq.get_batch_cursor.sql b/sql/pgq/functions/pgq.get_batch_cursor.sql index 47746ac3..ec60b6b7 100644 --- a/sql/pgq/functions/pgq.get_batch_cursor.sql +++ b/sql/pgq/functions/pgq.get_batch_cursor.sql @@ -54,11 +54,6 @@ begin return next; end loop; - -- close cursor if all events have been returned - if _rcnt < i_quick_limit then - execute 'close ' || _cname; - end if; - return; end; $$ language plpgsql strict; -- no perms needed -- 2.39.5