nodeinfo: show failed consumers
authorMarko Kreen <markokr@gmail.com>
Thu, 10 Sep 2009 14:54:34 +0000 (17:54 +0300)
committerMarko Kreen <markokr@gmail.com>
Thu, 10 Sep 2009 14:54:34 +0000 (17:54 +0300)
python/pgq/cascade/nodeinfo.py

index 44e988a33ba0ad6d47d0a2c7fd957469c7685219..97d24f36062938a97f9a4b58eb4998ed262ae48c 100644 (file)
@@ -43,6 +43,7 @@ class NodeInfo:
         self.consumer_map = {}
         self.queue_info = {}
         self._info_lines = []
+        self.cascaded_consumer_map = {}
 
         self._row = row
 
@@ -105,6 +106,11 @@ class NodeInfo:
         if not self.uptodate:
             txt += ", NOT UPTODATE"
         lst.append(txt)
+
+        for cname, row in self.cascaded_consumer_map.items():
+            err = row['cur_error']
+            if err:
+                lst.append("ERR: %s: %s" % (cname, err))
         return lst
     
     def add_info_line(self, ln):
@@ -113,6 +119,7 @@ class NodeInfo:
     def load_status(self, curs):
         self.consumer_map = {}
         self.queue_info = {}
+        self.cascaded_consumer_map = {}
         if self.queue_name:
             q = "select consumer_name, current_timestamp - lag as tick_time,"\
                 "  lag, last_seen, last_tick "\
@@ -127,6 +134,12 @@ class NodeInfo:
             curs.execute(q, [self.queue_name])
             self.queue_info = curs.fetchone()
 
+            q = "select * from pgq_node.get_consumer_info(%s)"
+            curs.execute(q, [self.queue_name])
+            for row in curs.fetchall():
+                cname = row['consumer_name']
+                self.cascaded_consumer_map[cname] = row
+
 class QueueInfo:
     """Info about cascaded queue.