From 0f571a13fed0fe8adffba9ee545b9add46075f34 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Tue, 22 Apr 2008 12:45:35 +0000 Subject: [PATCH] pgq.Consumer - complain on untagged events, instead of sending them to retry queue --- python/pgq/consumer.py | 15 ++++++++++++--- python/pgq/event.py | 14 +++++++++----- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py index cf47875c..3b823031 100644 --- a/python/pgq/consumer.py +++ b/python/pgq/consumer.py @@ -29,6 +29,8 @@ class _WalkerEvent(Event): self._walker.tag_event_retry(self, retry_time) def tag_failed(self, reason): self._walker.tag_failed(self, reason) + def get_status(self): + self._walker.get_status(self) class _BatchWalker(object): """Lazy iterator over batch events. @@ -36,8 +38,8 @@ class _BatchWalker(object): Events are loaded using cursor. It will be given as ev_list to process_batch(). It allows: - - one for loop over events - - len() after that + - one for loop over events + - len() after that """ def __init__(self, curs, batch_id, queue_name, fetch_size = 300): self.queue_name = queue_name @@ -68,7 +70,7 @@ class _BatchWalker(object): self.length += len(rows) for row in rows: ev = _WalkerEvent(self, self.queue_name, row) - ev.tag_retry() + self.status_map[ev.id] = (EV_UNTAGGED, None) yield ev self.curs.execute("close %s" % self.sql_cursor) @@ -89,6 +91,9 @@ class _BatchWalker(object): def tag_event_failed(self, event, reason): self.status_map[event.id] = (EV_FAILED, reason) + def get_status(self, event): + return self.status_map[event.id][0] + def iter_status(self): for res in self.status_map.iteritems(): yield res @@ -249,6 +254,8 @@ class Consumer(skytools.DBScript): elif stat[0] == EV_FAILED: self._tag_failed(curs, batch_id, ev_id, stat[1]) failed += 1 + elif stat[0] != EV_DONE: + raise Exception("Untagged event: %d" % ev_id) else: for ev in list: if ev.status == EV_FAILED: @@ -257,6 +264,8 @@ class Consumer(skytools.DBScript): elif ev.status == EV_RETRY: self._tag_retry(curs, batch_id, ev.id, ev.retry_time) retry += 1 + elif stat[0] != EV_DONE: + raise Exception("Untagged event: %d" % ev_id) # report weird events if retry: diff --git a/python/pgq/event.py b/python/pgq/event.py index d7b2d7ee..0f14298c 100644 --- a/python/pgq/event.py +++ b/python/pgq/event.py @@ -2,9 +2,10 @@ """PgQ event container. """ -__all__ = ('EV_RETRY', 'EV_DONE', 'EV_FAILED', 'Event') +__all__ = ['EV_RETRY', 'EV_DONE', 'EV_FAILED', 'Event'] # Event status codes +EV_UNTAGGED = -1 EV_RETRY = 0 EV_DONE = 1 EV_FAILED = 2 @@ -39,7 +40,7 @@ class Event(object): """ def __init__(self, queue_name, row): self._event_row = row - self.status = EV_RETRY + self._status = EV_UNTAGGED self.retry_time = 60 self.fail_reason = "Buggy consumer" self.queue_name = queue_name @@ -48,13 +49,16 @@ class Event(object): return self._event_row[_fldmap[key]] def tag_done(self): - self.status = EV_DONE + self._status = EV_DONE def tag_retry(self, retry_time = 60): - self.status = EV_RETRY + self._status = EV_RETRY self.retry_time = retry_time def tag_failed(self, reason): - self.status = EV_FAILED + self._status = EV_FAILED self.fail_reason = reason + def get_status(self): + return self._status + -- 2.39.5