cascade: more takeover work
author Marko Kreen <markokr@gmail.com>
Fri, 29 May 2009 11:46:01 +0000 (14:46 +0300)
committer Marko Kreen <markokr@gmail.com>
Mon, 1 Jun 2009 07:13:48 +0000 (10:13 +0300)
python/londiste.py
python/pgq/cascade/admin.py
python/pgq/cascade/nodeinfo.py
tests/londiste/regen.sh

diff --git a/python/londiste.py b/python/londiste.py
index 8b783c13933fd886a806cd27c1542671dfc1b60a..822df9e4e1ad3f0347ef9dcc12c49a32c26b31ab 100755
@@ -40,7 +40,7 @@ Internal Commands:
 cmd_handlers = (
     (('create-root', 'create-branch', 'create-leaf', 'members', 'tag-dead', 'tag-alive',
       'change-provider', 'rename-node', 'status', 'pause', 'resume',
-      'switchover', 'failover', 'drop-node', 'takeover'), londiste.LondisteSetup),
+      'drop-node', 'takeover'), londiste.LondisteSetup),
     (('add-table', 'remove-table', 'add-seq', 'remove-seq', 'tables', 'seqs',
       'missing', 'resync', 'check', 'fkeys', 'execute'), londiste.LondisteSetup),
     (('worker', 'replay'), londiste.Replicator),
@@ -92,8 +92,12 @@ class Londiste(skytools.DBScript):
                 help = "switchover: target node")
         g.add_option("--merge",
                 help = "create-leaf: combined queue name")
-        g.add_option("--dead", action = 'store_true',
-                help = "takeover: old one is dead")
+        g.add_option("--dead", action = 'append',
+                help = "cascade: assume node is dead")
+        g.add_option("--dead-root", action = 'store_true',
+                help = "takeover: old node was root")
+        g.add_option("--dead-branch", action = 'store_true',
+                help = "takeover: old node was branch")
         p.add_option_group(g)
         return p
 
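The londiste.py change above turns --dead from a plain boolean into a repeatable option, while the new --dead-root and --dead-branch switches stay simple flags. A minimal standalone optparse sketch (not the real londiste parser; the group title and argument list are made up) of what the parsed values look like:

    import optparse

    p = optparse.OptionParser()
    g = optparse.OptionGroup(p, "cascade options")
    g.add_option("--dead", action = 'append',
            help = "cascade: assume node is dead")
    g.add_option("--dead-root", action = 'store_true',
            help = "takeover: old node was root")
    g.add_option("--dead-branch", action = 'store_true',
            help = "takeover: old node was branch")
    p.add_option_group(g)

    # made-up argument list, just to show how --dead accumulates node names
    opts, args = p.parse_args(['--dead=node2', '--dead=node5', '--dead-root'])
    print opts.dead         # ['node2', 'node5']
    print opts.dead_root    # True
    print opts.dead_branch  # None
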
diff --git a/python/pgq/cascade/admin.py b/python/pgq/cascade/admin.py
index 9d9c485fc2c342580a665e74fc671d4b51e6798b..60378de711da254767407c9e2ec328b2d7660468 100644
@@ -88,10 +88,14 @@ class CascadeAdmin(skytools.AdminScript):
         g.add_option("--consumer",
                      help = "specify consumer name")
         g.add_option("--target",
-                    help = "switchover: specify replacement node")
+                    help = "takeover: specify node to take over")
         g.add_option("--merge",
                     help = "create-node: combined queue name")
-        g.add_option("--dead", action="store_true",
+        g.add_option("--dead", action="append",
+                    help = "tag some node as dead")
+        g.add_option("--dead-root", action="store_true",
+                    help = "tag some node as dead")
+        g.add_option("--dead-branch", action="store_true",
                     help = "tag some node as dead")
         p.add_option_group(g)
         return p
@@ -319,20 +323,27 @@ class CascadeAdmin(skytools.AdminScript):
 
     def cmd_status(self):
         """Show set status."""
-        root_db = self.find_root_db()
-        sinf = self.load_queue_info(root_db)
+        self.load_local_info()
+
+        for mname, minf in self.queue_info.member_map.iteritems():
+            #inf = self.get_node_info(mname)
+            #self.queue_info.add_node(inf)
+            #continue
 
-        for mname, minf in sinf.member_map.iteritems():
+            if not self.node_alive(mname):
+                node = NodeInfo(self.queue_name, None, node_name = mname)
+                self.queue_info.add_node(node)
+                continue
             db = self.get_database('look_db', connstr = minf.location, autocommit = 1)
             curs = db.cursor()
             curs.execute("select * from pgq_node.get_node_info(%s)", [self.queue_name])
             node = NodeInfo(self.queue_name, curs.fetchone())
             node.load_status(curs)
             self.load_extra_status(curs, node)
-            sinf.add_node(node)
+            self.queue_info.add_node(node)
             self.close_database('look_db')
 
-        sinf.print_tree()
+        self.queue_info.print_tree()
 
     def load_extra_status(self, curs, node):
         """Fetch extra info."""
@@ -510,33 +521,34 @@ class CascadeAdmin(skytools.AdminScript):
                 return info
 
 
-    def takeover_root(self, old_info, new_info, failover = False):
+    def takeover_root(self, old_node_name, new_node_name, failover = False):
         """Root switchover."""
-        old = old_info.name
-        new = new_info.name
 
-        if self.node_alive(old):
-            self.pause_node(old)
-            self.demote_node(old, 1, new)
-            last_tick = self.demote_node(old, 2, new)
-            self.wait_for_catchup(new, last_tick)
+        new_info = self.get_node_info(new_node_name)
+        old_info = None
 
-        self.pause_node(new)
-        self.promote_branch(new)
+        if self.node_alive(old_node_name):
+            old_info = self.get_node_info(old_node_name)
+            self.pause_node(old_node_name)
+            self.demote_node(old_node_name, 1, new_node_name)
+            last_tick = self.demote_node(old_node_name, 2, new_node_name)
+            self.wait_for_catchup(new_node_name, last_tick)
 
-        #self.subscribe_node(new, old, old_info.completed_tick)
-        q = 'select * from pgq_node.register_subscriber(%s, %s, %s, %s)'
-        self.node_cmd(new, q, [self.queue_name, old, old_info.worker_name, last_tick])
+        self.pause_node(new_node_name)
+        self.promote_branch(new_node_name)
+
+        if self.node_alive(old_node_name):
+            q = 'select * from pgq_node.register_subscriber(%s, %s, %s, %s)'
+            self.node_cmd(new_node_name, q, [self.queue_name, old_node_name, old_info.worker_name, last_tick])
 
-        #self.unsubscribe_node(new_node.parent_node, new_node.name)
         q = "select * from pgq_node.unregister_subscriber(%s, %s)"
-        self.node_cmd(new_info.provider_node, q, [self.queue_name, new])
+        self.node_cmd(new_info.provider_node, q, [self.queue_name, new_node_name])
 
-        self.resume_node(new)
+        self.resume_node(new_node_name)
 
-        if self.node_alive(old):
-            self.demote_node(old, 3, new)
-            self.resume_node(old)
+        if self.node_alive(old_node_name):
+            self.demote_node(old_node_name, 3, new_node_name)
+            self.resume_node(old_node_name)
 
     def takeover_nonroot(self, old_node_name, new_node_name, failover):
         """Non-root switchover."""
@@ -563,6 +575,9 @@ class CascadeAdmin(skytools.AdminScript):
         if not old_node_name:
             raise UsageError('old node not given')
 
+        if old_node_name not in self.queue_info.member_map:
+            raise UsageError('Unknown node: %s' % old_node_name)
+
         if self.options.dead_root:
             otype = 'root'
             failover = True
@@ -669,7 +684,7 @@ class CascadeAdmin(skytools.AdminScript):
             if not m:
                 self.log.error("get_node_database: cannot resolve %s" % node_name)
                 sys.exit(1)
-            self.log.info("%s: dead=%s" % (m.name, m.dead))
+            #self.log.info("%s: dead=%s" % (m.name, m.dead))
             if m.dead:
                 return None
             loc = m.location
@@ -679,10 +694,13 @@ class CascadeAdmin(skytools.AdminScript):
     def node_alive(self, node_name):
         m = self.queue_info.get_member(node_name)
         if not m:
-            return False
-        if m.dead:
-            return False
-        return True
+            res = False
+        elif m.dead:
+            res = False
+        else:
+            res = True
+        #self.log.warning('node_alive(%s) = %s' % (node_name, res))
+        return res
 
     def close_node_database(self, node_name):
         """Disconnect node's connection."""
@@ -781,7 +799,12 @@ class CascadeAdmin(skytools.AdminScript):
         q = "select * from pgq_node.get_queue_locations(%s)"
         member_list = self.exec_query(db, q, [self.queue_name])
 
-        return QueueInfo(self.queue_name, info, member_list)
+        qinf = QueueInfo(self.queue_name, info, member_list)
+        if self.options.dead:
+            for node in self.options.dead:
+                self.log.info("Assuming node '%s' as dead" % node)
+                qinf.tag_dead(node)
+        return qinf
 
     def get_node_subscriber_list(self, node_name):
         """Fetch subscriber list from a node."""
diff --git a/python/pgq/cascade/nodeinfo.py b/python/pgq/cascade/nodeinfo.py
index 9bd1cead077be51ae5d0261a77939bc3a1687a2a..44e988a33ba0ad6d47d0a2c7fd957469c7685219 100644
@@ -19,11 +19,38 @@ class MemberInfo:
 
 class NodeInfo:
     """Detailed info about set node."""
-    def __init__(self, queue_name, row, main_worker = True):
+
+    name = None
+    type = None
+    global_watermark = None
+    local_watermark = None
+    completed_tick = None
+    provider_node = None
+    provider_location = None
+    consumer_name = None #?
+    worker_name = None #?
+    paused = False
+    uptodate = True
+    combined_queue = None
+    combined_type = None
+
+    def __init__(self, queue_name, row, main_worker = True, node_name = None):
         self.queue_name = queue_name
         self.member_map = {}
         self.main_worker = main_worker
 
+        self.parent = None
+        self.consumer_map = {}
+        self.queue_info = {}
+        self._info_lines = []
+
+        self._row = row
+
+        if not row:
+            self.name = node_name
+            self.type = 'dead'
+            return
+
         self.name = row['node_name']
         self.type = row['node_type']
         self.global_watermark = row['global_watermark']
@@ -38,14 +65,6 @@ class NodeInfo:
         self.combined_queue = row['combined_queue']
         self.combined_type = row['combined_type']
 
-        self.parent = None
-        self.consumer_map = {}
-        self.queue_info = {}
-
-        self._row = row
-
-        self._info_lines = []
-
     def __get_target_queue(self):
         qname = None
         if self.type == LEAF:
@@ -69,11 +88,17 @@ class NodeInfo:
             root = self.parent
             while root.parent:
                 root = root.parent
-            tick_time = self.parent.consumer_map[self.consumer_name]['tick_time']
-            root_time = root.queue_info['now']
-            lag = root_time - tick_time
-        else:
+            cinfo = self.parent.consumer_map.get(self.consumer_name)
+            if cinfo and root.queue_info:
+                tick_time = cinfo['tick_time']
+                root_time = root.queue_info['now']
+                lag = root_time - tick_time
+            else:
+                lag = "(n/a?)"
+        elif self.queue_info:
             lag = self.queue_info['ticker_lag']
+        else:
+            lag = "(n/a)"
         txt = "lag: %s" % lag
         if self.paused:
             txt += ", PAUSED"
diff --git a/tests/londiste/regen.sh b/tests/londiste/regen.sh
index 9db4df8c033b5f285f49cd25f037b0aab31c713f..56d23e72c511977a92f895bffe034268543b9ff2 100755
@@ -2,8 +2,12 @@
 
 . ../testlib.sh
 
+v=-q
+
 db_list="db1 db2 db3 db4 db5"
 
+( cd ../..; make -s install )
+
 echo " * create configs * "
 
 # create ticker conf
@@ -29,12 +33,14 @@ pidfile = pid/%(job_name)s.pid
 EOF
 done
 
+psql -d template1 -c 'drop database if exists db2x'
+createdb db2
+
 for db in $db_list; do
   cleardb $db
 done
 
-echo "clean logs"
-rm -f log/*.log
+clearlogs
 
 set -e
 
@@ -80,37 +86,51 @@ for db in db2 db3 db4 db5; do
 done
 run sleep 20
 
+if false; then
+
 msg "Add column on root"
 run cat ddl.sql
 run londiste $v conf/londiste_db1.ini execute ddl.sql
-
 msg "Insert data into new column"
 for n in 5 6 7 8; do
   run psql -d db1 -c "insert into mytable values ($n, 'row$n', 'data2')"
 done
 msg "Wait a bit"
 run sleep 20
+msg "Check table structure"
+run psql -d db5 -c '\d mytable'
+run psql -d db5 -c 'select * from mytable'
+run sleep 5
+../zcheck.sh
+fi
 
-run psql -d db3 -c '\d mytable'
-run psql -d db3 -c 'select * from mytable'
+msg "Change provider"
+run londiste $v conf/londiste_db4.ini status
+run londiste $v conf/londiste_db4.ini change-provider --provider=node3
+run londiste $v conf/londiste_db4.ini status
+run londiste $v conf/londiste_db5.ini change-provider --provider=node2
+run londiste $v conf/londiste_db5.ini status
 
-run sleep 10
-../zcheck.sh
 msg "Change topology"
 run londiste $v conf/londiste_db1.ini status
-run londiste $v conf/londiste_db3.ini change-provider --provider=node1
-run londiste $v conf/londiste_db1.ini status
-run londiste $v conf/londiste_db1.ini switchover --target=node2
-run londiste $v conf/londiste_db1.ini status
+run londiste $v conf/londiste_db3.ini takeover node2
+run londiste $v conf/londiste_db2.ini status
+run londiste $v conf/londiste_db2.ini takeover node1
+run londiste $v conf/londiste_db2.ini status
 
 run sleep 10
 ../zcheck.sh
 
-msg "Change topology"
-ps aux | grep "postres[:].* db2 " | awk '{print $2}' | xargs -r kill
+msg "Change topology / failover"
+ps aux | grep 'postgres[:].* db2 ' | awk '{print $2}' | xargs -r kill
+sleep 3
+ps aux | grep 'postgres[:].* db2 ' | awk '{print $2}' | xargs -r kill -9
+sleep 3
 run psql -d db1 -c 'alter database db2 rename to db2x'
-run londiste $v conf/londiste_db4.ini takeover db2 --dead
-run londiste $v conf/londiste_db1.ini status
+run londiste $v conf/londiste_db1.ini status --dead=node2
+run londiste $v conf/londiste_db3.ini takeover db2 --dead-root || true
+run londiste $v conf/londiste_db3.ini takeover node2 --dead-root
+run londiste $v conf/londiste_db1.ini status --dead=node2
 
 run sleep 10
 ../zcheck.sh