cascade: takeover
authorMarko Kreen <markokr@gmail.com>
Fri, 24 Apr 2009 14:33:39 +0000 (17:33 +0300)
committerMarko Kreen <markokr@gmail.com>
Mon, 1 Jun 2009 07:13:47 +0000 (10:13 +0300)
Instead of switchover/failover commands have 'takeover' command,
to be launched from new node.

python/londiste.py
python/pgq/cascade/admin.py
python/pgq/cascade/nodeinfo.py
tests/londiste/init.sh
tests/londiste/regen.sh

index 8f284b2bc3a85baa6823d38a48984803ec05b88c..8b783c13933fd886a806cd27c1542671dfc1b60a 100755 (executable)
@@ -40,7 +40,7 @@ Internal Commands:
 cmd_handlers = (
     (('create-root', 'create-branch', 'create-leaf', 'members', 'tag-dead', 'tag-alive',
       'change-provider', 'rename-node', 'status', 'pause', 'resume',
-      'switchover', 'failover', 'drop-node'), londiste.LondisteSetup),
+      'switchover', 'failover', 'drop-node', 'takeover'), londiste.LondisteSetup),
     (('add-table', 'remove-table', 'add-seq', 'remove-seq', 'tables', 'seqs',
       'missing', 'resync', 'check', 'fkeys', 'execute'), londiste.LondisteSetup),
     (('worker', 'replay'), londiste.Replicator),
@@ -92,6 +92,8 @@ class Londiste(skytools.DBScript):
                 help = "switchover: target node")
         g.add_option("--merge",
                 help = "create-leaf: combined queue name")
+        g.add_option("--dead", action = 'store_true',
+                help = "takeover: old one is dead")
         p.add_option_group(g)
         return p
 
index f37027687fa6b1c6f6d6b6a7deeebade4d45d4ed..9d9c485fc2c342580a665e74fc671d4b51e6798b 100644 (file)
@@ -42,6 +42,7 @@ Works, naming problems:
   status                Show set state      [set-status]
   members               Show members in set [nodes]
   switchover --target NODE [--all]
+  takeover FROMNODE [--all] [--dead]
 
 Broken:
 
@@ -59,6 +60,7 @@ class CascadeAdmin(skytools.AdminScript):
     queue_info = None
     extra_objs = []
     local_node = None
+    root_node_name = None
 
     def __init__(self, svc_name, dbname, args, worker_setup = False):
         skytools.AdminScript.__init__(self, svc_name, args)
@@ -89,6 +91,8 @@ class CascadeAdmin(skytools.AdminScript):
                     help = "switchover: specify replacement node")
         g.add_option("--merge",
                     help = "create-node: combined queue name")
+        g.add_option("--dead", action="store_true",
+                    help = "tag some node as dead")
         p.add_option_group(g)
         return p
 
@@ -108,6 +112,10 @@ class CascadeAdmin(skytools.AdminScript):
     # Node initialization.
     #
 
+    def cmd_install(self):
+        db = self.get_database("db")
+        self.install_code(db)
+
     def cmd_create_root(self, node_name, node_location):
         return self.create_node('root', node_name, node_location)
 
@@ -241,6 +249,7 @@ class CascadeAdmin(skytools.AdminScript):
             # configured db may not be root anymore, walk upwards then
             if node_type in ('root', 'combined-root'):
                 db.commit()
+                self.root_node_name = info['node_name']
                 return db
 
             self.close_database('root_db')
@@ -252,6 +261,10 @@ class CascadeAdmin(skytools.AdminScript):
                 sys.exit(1)
         raise Exception('process canceled')
 
+    def find_root_node(self):
+        self.find_root_db()
+        return self.root_node_name
+
     def find_consumer_check(self, node, consumer):
         cmap = self.get_node_consumer_map(node)
         return (consumer in cmap)
@@ -471,7 +484,8 @@ class CascadeAdmin(skytools.AdminScript):
         """Downgrade old root?"""
         q = "select * from pgq_node.demote_root(%s, %s, %s)"
         res = self.node_cmd(oldnode, q, [self.queue_name, step, newnode])
-        return res[0]['last_tick']
+        if res:
+            return res[0]['last_tick']
 
     def promote_branch(self, node):
         """Promote old branch as root."""
@@ -496,18 +510,16 @@ class CascadeAdmin(skytools.AdminScript):
                 return info
 
 
-    def switchover_root(self, old_info, new_info):
+    def takeover_root(self, old_info, new_info, failover = False):
         """Root switchover."""
         old = old_info.name
         new = new_info.name
 
-        self.pause_node(old)
-
-        self.demote_node(old, 1, new)
-
-        last_tick = self.demote_node(old, 2, new)
-
-        self.wait_for_catchup(new, last_tick)
+        if self.node_alive(old):
+            self.pause_node(old)
+            self.demote_node(old, 1, new)
+            last_tick = self.demote_node(old, 2, new)
+            self.wait_for_catchup(new, last_tick)
 
         self.pause_node(new)
         self.promote_branch(new)
@@ -522,49 +534,101 @@ class CascadeAdmin(skytools.AdminScript):
 
         self.resume_node(new)
 
-        self.demote_node(old, 3, new)
+        if self.node_alive(old):
+            self.demote_node(old, 3, new)
+            self.resume_node(old)
 
-        self.resume_node(old)
-
-    def switchover_nonroot(self, old_node, new_node):
+    def takeover_nonroot(self, old_node_name, new_node_name, failover):
         """Non-root switchover."""
-        if self.node_depends(new_node.name, old_node.name):
+        if self.node_depends(new_node_name, old_node_name):
             # yes, old_node is new_nodes provider,
             # switch it around
-            self.node_change_provider(new_node.name, old_node.provider_node)
+            pnode = self.find_provider(old_node_name)
+            self.node_change_provider(new_node_name, pnode)
 
-        self.node_change_provider(old_node.name, new_node.name)
+        self.node_change_provider(old_node_name, new_node_name)
 
-    def cmd_switchover(self):
+    def cmd_takeover(self, old_node_name):
         """Generic node switchover."""
+        self.log.info("old: %s" % old_node_name)
         self.load_local_info()
-        old_node_name = self.options.node
-        new_node_name = self.options.target
-        if not old_node_name:
+        new_node_name = self.options.node
+        if not new_node_name:
             worker = self.options.consumer
             if not worker:
-                raise Exception('old node not given')
+                raise UsageError('old node not given')
             if self.queue_info.local_node.worker_name != worker:
-                raise Exception('old node not given')
-            old_node_name = self.local_node
-        if not new_node_name:
-            raise Exception('new node not given')
-        old_node = self.get_node_info(old_node_name)
-        new_node = self.get_node_info(new_node_name)
+                raise UsageError('old node not given')
+            new_node_name = self.local_node
+        if not old_node_name:
+            raise UsageError('old node not given')
+
+        if self.options.dead_root:
+            otype = 'root'
+            failover = True
+        elif self.options.dead_branch:
+            otype = 'branch'
+            failover = True
+        else:
+            onode = self.get_node_info(old_node_name)
+            otype = onode.type
+            failover = False
+
+        if failover:
+            self.cmd_tag_dead(old_node_name)
 
-        if old_node.name == new_node.name:
+        new_node = self.get_node_info(new_node_name)
+        if old_node_name == new_node.name:
             self.log.info("same node?")
             return
 
-        if old_node.type == 'root':
-            self.switchover_root(old_node, new_node)
+        if otype == 'root':
+            self.takeover_root(old_node_name, new_node_name, failover)
         else:
-            self.switchover_nonroot(old_node, new_node)
+            self.takeover_nonroot(old_node_name, new_node_name, failover)
 
         # switch subscribers around
-        if self.options.all:
-            for n in self.get_node_subscriber_list(old_node.name):
-                self.node_change_provider(n, new_node.name)
+        if self.options.all or failover:
+            for n in self.find_subscribers_for(old_node_name):
+                self.node_change_provider(n, new_node_name)
+
+    def find_provider(self, node_name):
+        if self.node_alive(node_name):
+            info = self.get_node_info(node_name)
+            return info.provider_name
+        nodelist = self.queue_info.member_map.keys()
+        for n in nodelist:
+            if n == node_name:
+                continue
+            if not self.node_alive(n):
+                continue
+            if node_name in self.get_node_subscriber_list(n):
+                return n
+        return self.find_root_node()
+
+    def find_subscribers_for(self, parent_node_name):
+        """Find root node, having start point."""
+
+        # use dict to eliminate duplicates
+        res = {}
+
+        nodelist = self.queue_info.member_map.keys()
+        for node_name in nodelist:
+            if node_name == parent_node_name:
+                continue
+            if not self.node_alive(node_name):
+                continue
+            n = self.get_node_info(node_name)
+            if not n:
+                continue
+            if n.provider_node == parent_node_name:
+                res[n.name] = 1
+        return res.keys()
+
+    def cmd_tag_dead(self, node_name):
+        # todo: write to db
+        self.log.info("Tagging node '%s' as dead" % node_name)
+        self.queue_info.tag_dead(node_name)
 
     def cmd_pause(self):
         """Pause a node"""
@@ -605,10 +669,21 @@ class CascadeAdmin(skytools.AdminScript):
             if not m:
                 self.log.error("get_node_database: cannot resolve %s" % node_name)
                 sys.exit(1)
+            self.log.info("%s: dead=%s" % (m.name, m.dead))
+            if m.dead:
+                return None
             loc = m.location
             db = self.get_database('node.' + node_name, connstr = loc)
         return db
 
+    def node_alive(self, node_name):
+        m = self.queue_info.get_member(node_name)
+        if not m:
+            return False
+        if m.dead:
+            return False
+        return True
+
     def close_node_database(self, node_name):
         """Disconnect node's connection."""
         if node_name == self.queue_info.local_node.name:
@@ -619,6 +694,10 @@ class CascadeAdmin(skytools.AdminScript):
     def node_cmd(self, node_name, sql, args, quiet = False):
         """Execute SQL command on particular node."""
         db = self.get_node_database(node_name)
+        if not db:
+            self.log.warning("ignoring cmd for dead node '%s': %s" % (
+                node_name, skytools.quote_statement(sql, args)))
+            return None
         return self.exec_cmd(db, sql, args, quiet = quiet)
 
     #
@@ -662,7 +741,8 @@ class CascadeAdmin(skytools.AdminScript):
     def resume_node(self, node):
         """Shortcut for resuming by name."""
         state = self.get_node_info(node)
-        self.resume_consumer(node, state.worker_name)
+        if state:
+            self.resume_consumer(node, state.worker_name)
 
     def subscribe_node(self, target_node, subscriber_node, tick_pos):
         """Subscribing one node to another."""
@@ -686,6 +766,9 @@ class CascadeAdmin(skytools.AdminScript):
     def load_node_info(self, node_name):
         """Non-cached node info lookup."""
         db = self.get_node_database(node_name)
+        if not db:
+            self.log.warning('load_node_info(%s): ignoring dead node' % node_name)
+            return None
         q = "select * from pgq_node.get_node_info(%s)"
         rows = self.exec_query(db, q, [self.queue_name])
         return NodeInfo(self.queue_name, rows[0])
index 6d4017926fbd43627cd9e41cbcb46915d03a6dee..6b661c07b1536f98edcf3a4256115bed84d77d30 100644 (file)
@@ -125,6 +125,13 @@ class QueueInfo:
     def add_node(self, node):
         self.node_map[node.name] = node
 
+    def tag_dead(self, node_name):
+        if node_name in self.node_map:
+            self.member_map[node_name].dead = True
+        else:
+            row = {'node_name': node_name, 'node_location': None, 'dead': True}
+            m = MemberInfo(row)
+            self.member_map[node_name] = m
     #
     # Rest is about printing the tree
     #
index b6519723bb692e2eac99c4b38911092fe70f2af3..bd022d1afbfc258e1224088c7b5de2c6f6d242f5 100755 (executable)
@@ -2,9 +2,9 @@
 
 . ../env.sh
 
-lst="db1 db2 db3 db4"
+lst="db1 db2 db3 db4 db5"
 
-for db in $lst; do
+for db in $lst db2x; do
   echo dropdb $db
   dropdb $db
 done
index 6017fbcf3f6af20c59697fc971eff5dac1c8c555..f0646f6a6693523e7f627eb6896e9cce77ee475b 100755 (executable)
@@ -35,7 +35,7 @@ msg() {
   echo "##"
 }
 
-db_list="db1 db2 db3 db4"
+db_list="db1 db2 db3 db4 db5"
 
 echo " * create configs * "
 
@@ -88,6 +88,7 @@ run londiste $v conf/londiste_db1.ini create-root node1 'dbname=db1'
 run londiste $v conf/londiste_db2.ini create-branch node2 'dbname=db2' --provider='dbname=db1'
 run londiste $v conf/londiste_db3.ini create-branch node3 'dbname=db3' --provider='dbname=db1'
 run londiste $v conf/londiste_db4.ini create-leaf node4 'dbname=db4' --provider='dbname=db2'
+run londiste $v conf/londiste_db5.ini create-branch node5 'dbname=db5' --provider='dbname=db3'
 
 msg "See topology"
 run londiste $v conf/londiste_db4.ini status
@@ -107,7 +108,7 @@ msg "Register table on root node"
 run londiste $v conf/londiste_db1.ini add-table mytable
 
 msg "Register table on other node with creation"
-for db in db2 db3 db4; do
+for db in db2 db3 db4 db5; do
   run londiste $v conf/londiste_$db.ini add-table mytable --create
 done
 run sleep 20
@@ -138,3 +139,12 @@ run londiste $v conf/londiste_db1.ini status
 run sleep 10
 ./zcheck.sh
 
+msg "Change topology"
+ps aux | grep "postres[:].* db2 " | awk '{print $2}' | xargs kill
+run psql -d db1 -c 'alter database db2 rename to db2x'
+run londiste $v conf/londiste_db4.ini takeover db2 --dead
+run londiste $v conf/londiste_db1.ini status
+
+run sleep 10
+./zcheck.sh
+