self._walker.tag_event_retry(self, retry_time)
def tag_failed(self, reason):
self._walker.tag_failed(self, reason)
+ def get_status(self):
+ self._walker.get_status(self)
class _BatchWalker(object):
"""Lazy iterator over batch events.
Events are loaded using cursor. It will be given
as ev_list to process_batch(). It allows:
- - one for loop over events
- - len() after that
+ - one for loop over events
+ - len() after that
"""
def __init__(self, curs, batch_id, queue_name, fetch_size = 300):
self.queue_name = queue_name
self.length += len(rows)
for row in rows:
ev = _WalkerEvent(self, self.queue_name, row)
- ev.tag_retry()
+ self.status_map[ev.id] = (EV_UNTAGGED, None)
yield ev
self.curs.execute("close %s" % self.sql_cursor)
def tag_event_failed(self, event, reason):
self.status_map[event.id] = (EV_FAILED, reason)
+ def get_status(self, event):
+ return self.status_map[event.id][0]
+
def iter_status(self):
for res in self.status_map.iteritems():
yield res
elif stat[0] == EV_FAILED:
self._tag_failed(curs, batch_id, ev_id, stat[1])
failed += 1
+ elif stat[0] != EV_DONE:
+ raise Exception("Untagged event: %d" % ev_id)
else:
for ev in list:
if ev.status == EV_FAILED:
elif ev.status == EV_RETRY:
self._tag_retry(curs, batch_id, ev.id, ev.retry_time)
retry += 1
+ elif stat[0] != EV_DONE:
+ raise Exception("Untagged event: %d" % ev_id)
# report weird events
if retry:
"""PgQ event container.
"""
-__all__ = ('EV_RETRY', 'EV_DONE', 'EV_FAILED', 'Event')
+__all__ = ['EV_RETRY', 'EV_DONE', 'EV_FAILED', 'Event']
# Event status codes
+EV_UNTAGGED = -1
EV_RETRY = 0
EV_DONE = 1
EV_FAILED = 2
"""
def __init__(self, queue_name, row):
self._event_row = row
- self.status = EV_RETRY
+ self._status = EV_UNTAGGED
self.retry_time = 60
self.fail_reason = "Buggy consumer"
self.queue_name = queue_name
return self._event_row[_fldmap[key]]
def tag_done(self):
- self.status = EV_DONE
+ self._status = EV_DONE
def tag_retry(self, retry_time = 60):
- self.status = EV_RETRY
+ self._status = EV_RETRY
self.retry_time = retry_time
def tag_failed(self, reason):
- self.status = EV_FAILED
+ self._status = EV_FAILED
self.fail_reason = reason
+ def get_status(self):
+ return self._status
+