From 38016a166f0ae014bd6f341f61915eea0d08720a Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Fri, 11 Sep 2009 14:20:48 +0300 Subject: [PATCH] pgq.Consumer: use get_batch_cursor() --- python/pgq/consumer.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py index 57a74a3e..5a3a029c 100644 --- a/python/pgq/consumer.py +++ b/python/pgq/consumer.py @@ -46,8 +46,6 @@ class _BatchWalker(object): self.curs = curs self.length = 0 self.status_map = {} - curs.execute("select pgq.batch_event_sql(%s)", [batch_id]) - self.batch_sql = curs.fetchone()[0] self.fetch_status = 0 # 0-not started, 1-in-progress, 2-done def __iter__(self): @@ -55,12 +53,12 @@ class _BatchWalker(object): raise Exception("BatchWalker: double fetch? (%d)" % self.fetch_status) self.fetch_status = 1 - q = "declare %s no scroll cursor for %s" % (self.sql_cursor, self.batch_sql) - self.curs.execute(q) + q = "select * from pgq.get_batch_cursor(%s, %s, %s)" + self.curs.execute(q, [self.queue_name, self.sql_cursor, self.fetch_size]) + # this will return first batch of rows q = "fetch %d from batch_walker" % self.fetch_size while 1: - self.curs.execute(q) rows = self.curs.dictfetchall() if not len(rows): break @@ -70,6 +68,13 @@ class _BatchWalker(object): ev = _WalkerEvent(self, self.queue_name, row) yield ev + # if less rows than requested, it was final block + if len(rows) < self.fetch_size: + break + + # request next block of rows + self.curs.execute(q) + self.curs.execute("close %s" % self.sql_cursor) self.fetch_status = 2 -- 2.39.5