self.consumer_map = {}
self.queue_info = {}
self._info_lines = []
+ self.cascaded_consumer_map = {}
self._row = row
if not self.uptodate:
txt += ", NOT UPTODATE"
lst.append(txt)
+
+ for cname, row in self.cascaded_consumer_map.items():
+ err = row['cur_error']
+ if err:
+ lst.append("ERR: %s: %s" % (cname, err))
return lst
def add_info_line(self, ln):
def load_status(self, curs):
self.consumer_map = {}
self.queue_info = {}
+ self.cascaded_consumer_map = {}
if self.queue_name:
q = "select consumer_name, current_timestamp - lag as tick_time,"\
" lag, last_seen, last_tick "\
curs.execute(q, [self.queue_name])
self.queue_info = curs.fetchone()
+ q = "select * from pgq_node.get_consumer_info(%s)"
+ curs.execute(q, [self.queue_name])
+ for row in curs.fetchall():
+ cname = row['consumer_name']
+ self.cascaded_consumer_map[cname] = row
+
class QueueInfo:
"""Info about cascaded queue.