fix pgq brokenness related to recent commits
authorMarko Kreen <markokr@gmail.com>
Tue, 3 Nov 2009 12:26:19 +0000 (14:26 +0200)
committerMarko Kreen <markokr@gmail.com>
Tue, 3 Nov 2009 12:43:44 +0000 (14:43 +0200)
- get_batch_cursor()
- loop_delay

python/pgq/cascade/admin.py
python/pgq/consumer.py
python/pgqadm.py
sql/pgq/functions/pgq.get_batch_cursor.sql

index 48a582fe4aa373ac4a9b8fa6bd28f7d4bafa12e4..5f42a324159b6561831f6a46f69fa083ef0eeaf2 100644 (file)
@@ -237,7 +237,7 @@ class CascadeAdmin(skytools.AdminScript):
         else:
             loc = self.cf.get(self.initial_db_name)
 
-        while self.looping:
+        while 1:
             db = self.get_database('root_db', connstr = loc)
 
 
@@ -389,6 +389,7 @@ class CascadeAdmin(skytools.AdminScript):
         node_db = self.get_node_database(node)
         qinfo = self.load_queue_info(node_db)
         ninfo = qinfo.local_node
+        node_location = qinfo.get_member(node).location
 
         # reload consumer info
         cmap = self.get_node_consumer_map(node)
@@ -398,8 +399,8 @@ class CascadeAdmin(skytools.AdminScript):
         is_worker = (ninfo.worker_name == consumer)
 
         # fixme: expect the node to be described already
-        #q = "select * from pgq_node.add_member(%s, %s, %s, false)"
-        #self.node_cmd(new_provider, q, [self.queue_name, node_name, node_location])
+        q = "select * from pgq_node.register_location(%s, %s, %s, false)"
+        self.node_cmd(new_provider, q, [self.queue_name, node, node_location])
 
         # subscribe on new provider
         if is_worker:
@@ -729,7 +730,7 @@ class CascadeAdmin(skytools.AdminScript):
         self.node_cmd(node, q, [self.queue_name, consumer, pause_flag])
 
         self.log.info('Waiting for worker to accept')
-        while self.looping:
+        while 1:
             q = "select * from pgq_node.get_consumer_state(%s, %s)"
             stat = self.node_cmd(node, q, [self.queue_name, consumer], quiet = 1)[0]
             if stat['paused'] != pause_flag:
index ef6283d629c956d0bc235625e7a4aa66af9039f4..04915614e1532b6263c7788a822ea4772972646c 100644 (file)
@@ -46,6 +46,7 @@ class _BatchWalker(object):
         self.curs = curs
         self.length = 0
         self.status_map = {}
+        self.batch_id = batch_id
         self.fetch_status = 0 # 0-not started, 1-in-progress, 2-done
 
     def __iter__(self):
@@ -54,7 +55,7 @@ class _BatchWalker(object):
         self.fetch_status = 1
 
         q = "select * from pgq.get_batch_cursor(%s, %s, %s)"
-        self.curs.execute(q, [self.queue_name, self.sql_cursor, self.fetch_size])
+        self.curs.execute(q, [self.batch_id, self.sql_cursor, self.fetch_size])
         # this will return first batch of rows
 
         q = "fetch %d from batch_walker" % self.fetch_size
@@ -167,7 +168,7 @@ class Consumer(skytools.DBScript):
         self.consumer_id = self.consumer_name
 
     def reload(self):
-        DBScript.reload(self)
+        skytools.DBScript.reload(self)
 
         self.pgq_lazy_fetch = self.cf.getint("pgq_lazy_fetch", self.default_lazy_fetch)
         # set following ones to None if not set
index bd1ff8a656e13f73a244ecdabdaad9a1cbecda79..7fdfdc3b693b60639cd7c48be903b6b42e7a237f 100755 (executable)
@@ -118,6 +118,10 @@ class PGQAdmin(skytools.DBScript):
         else:
             skytools.DBScript.start(self)
 
+    def reload(self):
+        skytools.DBScript.reload(self)
+        self.set_single_loop(1)
+
     def init_optparse(self, parser=None):
         p = skytools.DBScript.init_optparse(self, parser)
         p.set_usage(command_usage.strip())
index 47746ac3f910f875f979c717119b8463f41b7d1d..ec60b6b7bf40e4d3f81bb8d2e7cb804b957c41ad 100644 (file)
@@ -54,11 +54,6 @@ begin
         return next;
     end loop;
 
-    -- close cursor if all events have been returned
-    if _rcnt < i_quick_limit then
-        execute 'close ' || _cname;
-    end if;
-
     return;
 end;
 $$ language plpgsql strict; -- no perms needed