self.curs = curs
self.length = 0
self.status_map = {}
- curs.execute("select pgq.batch_event_sql(%s)", [batch_id])
- self.batch_sql = curs.fetchone()[0]
self.fetch_status = 0 # 0-not started, 1-in-progress, 2-done
def __iter__(self):
raise Exception("BatchWalker: double fetch? (%d)" % self.fetch_status)
self.fetch_status = 1
- q = "declare %s no scroll cursor for %s" % (self.sql_cursor, self.batch_sql)
- self.curs.execute(q)
+ q = "select * from pgq.get_batch_cursor(%s, %s, %s)"
+ self.curs.execute(q, [self.queue_name, self.sql_cursor, self.fetch_size])
+ # this will return first batch of rows
q = "fetch %d from batch_walker" % self.fetch_size
while 1:
- self.curs.execute(q)
rows = self.curs.dictfetchall()
if not len(rows):
break
ev = _WalkerEvent(self, self.queue_name, row)
yield ev
+ # if less rows than requested, it was final block
+ if len(rows) < self.fetch_size:
+ break
+
+ # request next block of rows
+ self.curs.execute(q)
+
self.curs.execute("close %s" % self.sql_cursor)
self.fetch_status = 2