python/pgq: more set work
authorMarko Kreen <markokr@gmail.com>
Wed, 23 Apr 2008 15:07:04 +0000 (15:07 +0000)
committerMarko Kreen <markokr@gmail.com>
Wed, 23 Apr 2008 15:07:04 +0000 (15:07 +0000)
- use worker_name == node_name
- make status command show lag
- pass root tick_time downstream

python/pgq/rawconsumer.py
python/pgq/setadmin.py
python/pgq/setconsumer.py
python/pgq/setinfo.py

index 1ab452fc919055b271c3480d56578056b8f351b0..a43b86b7d1a3089d8620e3284dd6fc4b33726b2c 100644 (file)
@@ -23,11 +23,12 @@ class RawQueue:
         if not self.batch_id:
             return self.batch_id
 
-        q = "select tick_id, prev_tick_id from pgq.get_batch_info(%s)"
+        q = "select tick_id, prev_tick_id, batch_end from pgq.get_batch_info(%s)"
         curs.execute(q, [self.batch_id])
         inf = curs.dictfetchone()
         self.cur_tick = inf['tick_id']
         self.prev_tick = inf['prev_tick_id']
+        self.tick_time = inf['batch_end']
 
         return self.batch_id
 
index 77c07a3eb64c14efbda8ec5dca713cf7337ef8bc..7dba1959609270a27c1cd886621201743d9f2388 100644 (file)
@@ -69,9 +69,6 @@ class SetAdmin(skytools.AdminScript):
         
         self.log.info("Initializing node")
 
-        # fixme
-        worker_name = "%s_%s_worker" % (self.set_name, node_name)
-
         # register member
         if node_type in ('root', 'combined-root'):
             global_watermark = None
@@ -79,12 +76,12 @@ class SetAdmin(skytools.AdminScript):
             provider_name = None
             self.exec_sql(db, "select pgq_set.add_member(%s, %s, %s, false)",
                           [self.set_name, node_name, node_location])
-            self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s, %s)",
-                          [self.set_name, node_type, node_name, worker_name, provider_name, global_watermark, combined_set])
+            self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s)",
+                          [self.set_name, node_type, node_name, provider_name, global_watermark, combined_set])
             provider_db = None
         else:
             root_db = self.find_root_db(provider_loc)
-            set = self.load_root_info(root_db)
+            set = self.load_set_info(root_db)
 
             # check if member already exists
             if set.get_member(node_name) is not None:
@@ -117,8 +114,8 @@ class SetAdmin(skytools.AdminScript):
             # register on provider
             self.exec_sql(provider_db, "select pgq_set.add_member(%s, %s, %s, false)",
                           [self.set_name, node_name, node_location])
-            self.exec_sql(provider_db, "select pgq_set.subscribe_node(%s, %s, %s)",
-                          [self.set_name, node_name, worker_name])
+            self.exec_sql(provider_db, "select pgq_set.subscribe_node(%s, %s)",
+                          [self.set_name, node_name])
             provider_db.commit()
 
             # initialize node itself
@@ -126,8 +123,8 @@ class SetAdmin(skytools.AdminScript):
                           [self.set_name, node_name, node_location])
             self.exec_sql(db, "select pgq_set.add_member(%s, %s, %s, false)",
                           [self.set_name, provider_name, provider.location])
-            self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s, %s)",
-                          [self.set_name, node_type, node_name, worker_name, provider_name,
+            self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s)",
+                          [self.set_name, node_type, node_name, provider_name,
                            global_watermark, combined_set])
             db.commit()
 
@@ -175,16 +172,16 @@ class SetAdmin(skytools.AdminScript):
                 self.log.info("Sub node provider not initialized?")
                 sys.exit(1)
 
-    def load_root_info(self, db):
+    def load_set_info(self, db):
         res = self.exec_query(db, "select * from pgq_set.get_node_info(%s)", [self.set_name])
         info = res[0]
 
         q = "select * from pgq_set.get_member_info(%s)"
-        node_list = self.exec_query(db, q, [self.set_name])
+        member_list = self.exec_query(db, q, [self.set_name])
 
         db.commit()
 
-        return SetInfo(self.set_name, info, node_list)
+        return SetInfo(self.set_name, info, member_list)
 
     def install_code(self, db):
         objs = [
@@ -200,19 +197,29 @@ class SetAdmin(skytools.AdminScript):
 
     def cmd_status(self, args):
         root_db = self.find_root_db()
-        sinf = self.load_root_info(root_db)
+        sinf = self.load_set_info(root_db)
 
         for mname, minf in sinf.member_map.iteritems():
             db = self.get_database('look_db', connstr = minf.location, autocommit = 1)
             curs = db.cursor()
             curs.execute("select * from pgq_set.get_node_info(%s)", [self.set_name])
-            node = NodeInfo(curs.fetchone())
+            node = NodeInfo(self.set_name, curs.fetchone())
+            node.load_status(curs)
+            self.load_extra_status(curs, node)
             sinf.add_node(node)
             self.close_database('look_db')
 
         sinf.print_tree()
 
-    def cmd_switch(self):
+    def load_extra_status(self, curs, node):
+        pass
+
+    def cmd_switch(self, node_name, new_provider):
+        node_db = self.get_node_database(node_name)
+        new_provider_db = self.get_node_database(new_provider)
+        node_info = self.load_set_info(node_db)
+
+        # 
         [['node', 'PAUSE']]
         [['new_parent', 'select * from pgq_set.subscribe_node(%(set_name)s, %(node_name)s, %(node_pos)s)']]
         [['node', 'select * from pgq_set.change_provider(%(set_name)s, %(new_provider)s)']]
@@ -220,6 +227,10 @@ class SetAdmin(skytools.AdminScript):
         [['node', 'RESUME']]
 
     def cmd_promote(self):
+        old_root = 'foo'
+        new_root = ''
+        self.pause_node(old_root)
+        ctx = self.load_node_info(old_root)
         [['old-root', 'PAUSE']]
         [['old-root', 'demote, set-provider?']]
         [['new-root', 'wait-for-catch-up']]
index 268cd7a898f1a578ec63ecdf1e471bb82c1e6dee..baa18b28553c66cf75504354c84bacb04a967794 100644 (file)
@@ -27,7 +27,7 @@ class SetConsumer(skytools.DBScript):
 
         dst_node = self.load_node_info(dst_db)
         if self.main_worker:
-            self.consumer_name = dst_node.worker_name
+            self.consumer_name = dst_node.name
             if not dst_node.up_to_date:
                 self.tag_node_uptodate(dst_db)
 
@@ -188,7 +188,7 @@ class SetConsumer(skytools.DBScript):
         mbr_list = curs.dictfetchall()
         db.commit()
 
-        return NodeInfo(node_row, self.main_worker)
+        return NodeInfo(self.set_name, node_row, self.main_worker)
 
     def tag_node_uptodate(self, dst_db):
         dst_curs = dst_db.cursor()
@@ -197,8 +197,8 @@ class SetConsumer(skytools.DBScript):
         dst_db.commit()
 
     def copy_tick(self, dst_curs, src_queue, dst_queue):
-        q = "select * from pgq.ticker(%s, %s)"
-        dst_curs.execute(q, [dst_queue.queue_name, src_queue.cur_tick])
+        q = "select * from pgq.ticker(%s, %s, %s)"
+        dst_curs.execute(q, [dst_queue.queue_name, src_queue.cur_tick, src_queue.tick_time])
 
     def set_tick_complete(self, dst_curs, tick_id):
         q = "select * from pgq_set.set_completed_tick(%s, %s, %s)"
index 8013e114d9de9d0ffad7509c04aeeaa5df2d867f..da2e1ba91de06e4c60336a94679f9305ea63ddc6 100644 (file)
@@ -31,7 +31,8 @@ class MemberInfo:
         self.dead = row['dead']
 
 class NodeInfo:
-    def __init__(self, row, main_worker = True):
+    def __init__(self, set_name, row, main_worker = True):
+        self.set_name = set_name
         self.member_map = {}
         self.main_worker = main_worker
 
@@ -49,10 +50,11 @@ class NodeInfo:
         self.combined_set = row['combined_set']
         self.combined_type = row['combined_type']
         self.combined_queue = row['combined_queue']
-        self.worker_name = row['worker_name']
 
         self._row = row
 
+        self._info_lines = []
+
     def need_action(self, action_name):
         if not self.main_worker:
             return action_name in ('process-batch', 'process-events')
@@ -82,7 +84,38 @@ class NodeInfo:
         return qname
 
     def get_infolines(self):
-        return ['somestuff = 100.2', '']
+        lst = self._info_lines
+        if self.parent:
+            root = self.parent
+            while root.parent:
+                root = root.parent
+            tick_time = self.parent.consumer_map[self.name]['tick_time']
+            root_time = root.queue_info['now']
+            lag = root_time - tick_time
+        else:
+            lag = self.queue_info['ticker_lag']
+        lst.append("lag: %s" % lag)
+        return lst
+    
+    def add_info_line(self, ln):
+        self._info_lines.append(ln)
+
+    def load_status(self, curs):
+        self.consumer_map = {}
+        self.queue_info = {}
+        if self.queue_name:
+            q = "select consumer_name, current_timestamp - lag as tick_time,"\
+                "  lag, last_seen, last_tick "\
+                "from pgq.get_consumer_info(%s)"
+            curs.execute(q, [self.set_name])
+            for row in curs.fetchall():
+                cname = row['consumer_name']
+                self.consumer_map[cname] = row
+            q = "select current_timestamp - ticker_lag as tick_time,"\
+                "  ticker_lag, current_timestamp as now "\
+                "from pgq.get_queue_info(%s)"
+            curs.execute(q, [self.set_name])
+            self.queue_info = curs.fetchone()
 
 class SetInfo:
     def __init__(self, set_name, info_row, member_rows):
@@ -141,6 +174,9 @@ class SetInfo:
             if node.provider_node:
                 p = self.node_map[node.provider_node]
                 p.child_list.append(node)
+                node.parent = p
+            else:
+                node.parent = None
 
     def _tree_calc(self, node):
         total = len(node.child_list)