pgq.Consumer - complain on untagged events, instead of sending them to retry queue
authorMarko Kreen <markokr@gmail.com>
Tue, 22 Apr 2008 12:45:35 +0000 (12:45 +0000)
committerMarko Kreen <markokr@gmail.com>
Tue, 22 Apr 2008 12:45:35 +0000 (12:45 +0000)
python/pgq/consumer.py
python/pgq/event.py

index cf47875c4deb69a85c53d2be9063db701ac47a6c..3b8230317be009f3b93ec3ded428aad9c54d90a9 100644 (file)
@@ -29,6 +29,8 @@ class _WalkerEvent(Event):
         self._walker.tag_event_retry(self, retry_time)
     def tag_failed(self, reason):
         self._walker.tag_failed(self, reason)
+    def get_status(self):
+        self._walker.get_status(self)
 
 class _BatchWalker(object):
     """Lazy iterator over batch events.
@@ -36,8 +38,8 @@ class _BatchWalker(object):
     Events are loaded using cursor.  It will be given
     as ev_list to process_batch(). It allows:
 
-    - one for loop over events
-    - len() after that
+     - one for loop over events
+     - len() after that
     """
     def __init__(self, curs, batch_id, queue_name, fetch_size = 300):
         self.queue_name = queue_name
@@ -68,7 +70,7 @@ class _BatchWalker(object):
             self.length += len(rows)
             for row in rows:
                 ev = _WalkerEvent(self, self.queue_name, row)
-                ev.tag_retry()
+                self.status_map[ev.id] = (EV_UNTAGGED, None)
                 yield ev
 
         self.curs.execute("close %s" % self.sql_cursor)
@@ -89,6 +91,9 @@ class _BatchWalker(object):
     def tag_event_failed(self, event, reason):
         self.status_map[event.id] = (EV_FAILED, reason)
 
+    def get_status(self, event):
+        return self.status_map[event.id][0]
+
     def iter_status(self):
         for res in self.status_map.iteritems():
             yield res
@@ -249,6 +254,8 @@ class Consumer(skytools.DBScript):
                 elif stat[0] == EV_FAILED:
                     self._tag_failed(curs, batch_id, ev_id, stat[1])
                     failed += 1
+                elif stat[0] != EV_DONE:
+                    raise Exception("Untagged event: %d" % ev_id)
         else:
             for ev in list:
                 if ev.status == EV_FAILED:
@@ -257,6 +264,8 @@ class Consumer(skytools.DBScript):
                 elif ev.status == EV_RETRY:
                     self._tag_retry(curs, batch_id, ev.id, ev.retry_time)
                     retry += 1
+                elif stat[0] != EV_DONE:
+                    raise Exception("Untagged event: %d" % ev_id)
 
         # report weird events
         if retry:
index d7b2d7ee60d507edf49968c4660fe8d4adc6da3e..0f14298cd8966ee9e257a3f109e49eb7f9e68238 100644 (file)
@@ -2,9 +2,10 @@
 """PgQ event container.
 """
 
-__all__ = ('EV_RETRY', 'EV_DONE', 'EV_FAILED', 'Event')
+__all__ = ['EV_RETRY', 'EV_DONE', 'EV_FAILED', 'Event']
 
 # Event status codes
+EV_UNTAGGED = -1
 EV_RETRY = 0
 EV_DONE = 1
 EV_FAILED = 2
@@ -39,7 +40,7 @@ class Event(object):
     """
     def __init__(self, queue_name, row):
         self._event_row = row
-        self.status = EV_RETRY
+        self._status = EV_UNTAGGED
         self.retry_time = 60
         self.fail_reason = "Buggy consumer"
         self.queue_name = queue_name
@@ -48,13 +49,16 @@ class Event(object):
         return self._event_row[_fldmap[key]]
 
     def tag_done(self):
-        self.status = EV_DONE
+        self._status = EV_DONE
 
     def tag_retry(self, retry_time = 60):
-        self.status = EV_RETRY
+        self._status = EV_RETRY
         self.retry_time = retry_time
 
     def tag_failed(self, reason):
-        self.status = EV_FAILED
+        self._status = EV_FAILED
         self.fail_reason = reason
 
+    def get_status(self):
+        return self._status
+