pgq.Consumer: use get_batch_cursor()
authorMarko Kreen <markokr@gmail.com>
Fri, 11 Sep 2009 11:20:48 +0000 (14:20 +0300)
committerMarko Kreen <markokr@gmail.com>
Fri, 11 Sep 2009 11:20:48 +0000 (14:20 +0300)
python/pgq/consumer.py

index 57a74a3ecc2173c49db935b5fe6505ff97d70527..5a3a029cb7ee043847385d07d48dbdead5b189cb 100644 (file)
@@ -46,8 +46,6 @@ class _BatchWalker(object):
         self.curs = curs
         self.length = 0
         self.status_map = {}
-        curs.execute("select pgq.batch_event_sql(%s)", [batch_id])
-        self.batch_sql = curs.fetchone()[0]
         self.fetch_status = 0 # 0-not started, 1-in-progress, 2-done
 
     def __iter__(self):
@@ -55,12 +53,12 @@ class _BatchWalker(object):
             raise Exception("BatchWalker: double fetch? (%d)" % self.fetch_status)
         self.fetch_status = 1
 
-        q = "declare %s no scroll cursor for %s" % (self.sql_cursor, self.batch_sql)
-        self.curs.execute(q)
+        q = "select * from pgq.get_batch_cursor(%s, %s, %s)"
+        self.curs.execute(q, [self.queue_name, self.sql_cursor, self.fetch_size])
+        # this will return first batch of rows
 
         q = "fetch %d from batch_walker" % self.fetch_size
         while 1:
-            self.curs.execute(q)
             rows = self.curs.dictfetchall()
             if not len(rows):
                 break
@@ -70,6 +68,13 @@ class _BatchWalker(object):
                 ev = _WalkerEvent(self, self.queue_name, row)
                 yield ev
 
+            # if less rows than requested, it was final block
+            if len(rows) < self.fetch_size:
+                break
+
+            # request next block of rows
+            self.curs.execute(q)
+
         self.curs.execute("close %s" % self.sql_cursor)
 
         self.fetch_status = 2