status Show set state [set-status]
members Show members in set [nodes]
switchover --target NODE [--all]
+ takeover FROMNODE [--all] [--dead]
Broken:
queue_info = None
extra_objs = []
local_node = None
+ root_node_name = None
def __init__(self, svc_name, dbname, args, worker_setup = False):
skytools.AdminScript.__init__(self, svc_name, args)
help = "switchover: specify replacement node")
g.add_option("--merge",
help = "create-node: combined queue name")
+ g.add_option("--dead", action="store_true",
+ help = "tag some node as dead")
p.add_option_group(g)
return p
# Node initialization.
#
+ def cmd_install(self):
+ db = self.get_database("db")
+ self.install_code(db)
+
def cmd_create_root(self, node_name, node_location):
return self.create_node('root', node_name, node_location)
# configured db may not be root anymore, walk upwards then
if node_type in ('root', 'combined-root'):
db.commit()
+ self.root_node_name = info['node_name']
return db
self.close_database('root_db')
sys.exit(1)
raise Exception('process canceled')
+ def find_root_node(self):
+ self.find_root_db()
+ return self.root_node_name
+
def find_consumer_check(self, node, consumer):
cmap = self.get_node_consumer_map(node)
return (consumer in cmap)
"""Downgrade old root?"""
q = "select * from pgq_node.demote_root(%s, %s, %s)"
res = self.node_cmd(oldnode, q, [self.queue_name, step, newnode])
- return res[0]['last_tick']
+ if res:
+ return res[0]['last_tick']
def promote_branch(self, node):
"""Promote old branch as root."""
return info
- def switchover_root(self, old_info, new_info):
+ def takeover_root(self, old_info, new_info, failover = False):
"""Root switchover."""
old = old_info.name
new = new_info.name
- self.pause_node(old)
-
- self.demote_node(old, 1, new)
-
- last_tick = self.demote_node(old, 2, new)
-
- self.wait_for_catchup(new, last_tick)
+ if self.node_alive(old):
+ self.pause_node(old)
+ self.demote_node(old, 1, new)
+ last_tick = self.demote_node(old, 2, new)
+ self.wait_for_catchup(new, last_tick)
self.pause_node(new)
self.promote_branch(new)
self.resume_node(new)
- self.demote_node(old, 3, new)
+ if self.node_alive(old):
+ self.demote_node(old, 3, new)
+ self.resume_node(old)
- self.resume_node(old)
-
- def switchover_nonroot(self, old_node, new_node):
+ def takeover_nonroot(self, old_node_name, new_node_name, failover):
"""Non-root switchover."""
- if self.node_depends(new_node.name, old_node.name):
+ if self.node_depends(new_node_name, old_node_name):
# yes, old_node is new_nodes provider,
# switch it around
- self.node_change_provider(new_node.name, old_node.provider_node)
+ pnode = self.find_provider(old_node_name)
+ self.node_change_provider(new_node_name, pnode)
- self.node_change_provider(old_node.name, new_node.name)
+ self.node_change_provider(old_node_name, new_node_name)
- def cmd_switchover(self):
+ def cmd_takeover(self, old_node_name):
"""Generic node switchover."""
+ self.log.info("old: %s" % old_node_name)
self.load_local_info()
- old_node_name = self.options.node
- new_node_name = self.options.target
- if not old_node_name:
+ new_node_name = self.options.node
+ if not new_node_name:
worker = self.options.consumer
if not worker:
- raise Exception('old node not given')
+ raise UsageError('old node not given')
if self.queue_info.local_node.worker_name != worker:
- raise Exception('old node not given')
- old_node_name = self.local_node
- if not new_node_name:
- raise Exception('new node not given')
- old_node = self.get_node_info(old_node_name)
- new_node = self.get_node_info(new_node_name)
+ raise UsageError('old node not given')
+ new_node_name = self.local_node
+ if not old_node_name:
+ raise UsageError('old node not given')
+
+ if self.options.dead_root:
+ otype = 'root'
+ failover = True
+ elif self.options.dead_branch:
+ otype = 'branch'
+ failover = True
+ else:
+ onode = self.get_node_info(old_node_name)
+ otype = onode.type
+ failover = False
+
+ if failover:
+ self.cmd_tag_dead(old_node_name)
- if old_node.name == new_node.name:
+ new_node = self.get_node_info(new_node_name)
+ if old_node_name == new_node.name:
self.log.info("same node?")
return
- if old_node.type == 'root':
- self.switchover_root(old_node, new_node)
+ if otype == 'root':
+ self.takeover_root(old_node_name, new_node_name, failover)
else:
- self.switchover_nonroot(old_node, new_node)
+ self.takeover_nonroot(old_node_name, new_node_name, failover)
# switch subscribers around
- if self.options.all:
- for n in self.get_node_subscriber_list(old_node.name):
- self.node_change_provider(n, new_node.name)
+ if self.options.all or failover:
+ for n in self.find_subscribers_for(old_node_name):
+ self.node_change_provider(n, new_node_name)
+
+ def find_provider(self, node_name):
+ if self.node_alive(node_name):
+ info = self.get_node_info(node_name)
+ return info.provider_name
+ nodelist = self.queue_info.member_map.keys()
+ for n in nodelist:
+ if n == node_name:
+ continue
+ if not self.node_alive(n):
+ continue
+ if node_name in self.get_node_subscriber_list(n):
+ return n
+ return self.find_root_node()
+
+ def find_subscribers_for(self, parent_node_name):
+ """Find root node, having start point."""
+
+ # use dict to eliminate duplicates
+ res = {}
+
+ nodelist = self.queue_info.member_map.keys()
+ for node_name in nodelist:
+ if node_name == parent_node_name:
+ continue
+ if not self.node_alive(node_name):
+ continue
+ n = self.get_node_info(node_name)
+ if not n:
+ continue
+ if n.provider_node == parent_node_name:
+ res[n.name] = 1
+ return res.keys()
+
+ def cmd_tag_dead(self, node_name):
+ # todo: write to db
+ self.log.info("Tagging node '%s' as dead" % node_name)
+ self.queue_info.tag_dead(node_name)
def cmd_pause(self):
"""Pause a node"""
if not m:
self.log.error("get_node_database: cannot resolve %s" % node_name)
sys.exit(1)
+ self.log.info("%s: dead=%s" % (m.name, m.dead))
+ if m.dead:
+ return None
loc = m.location
db = self.get_database('node.' + node_name, connstr = loc)
return db
+ def node_alive(self, node_name):
+ m = self.queue_info.get_member(node_name)
+ if not m:
+ return False
+ if m.dead:
+ return False
+ return True
+
def close_node_database(self, node_name):
"""Disconnect node's connection."""
if node_name == self.queue_info.local_node.name:
def node_cmd(self, node_name, sql, args, quiet = False):
"""Execute SQL command on particular node."""
db = self.get_node_database(node_name)
+ if not db:
+ self.log.warning("ignoring cmd for dead node '%s': %s" % (
+ node_name, skytools.quote_statement(sql, args)))
+ return None
return self.exec_cmd(db, sql, args, quiet = quiet)
#
def resume_node(self, node):
"""Shortcut for resuming by name."""
state = self.get_node_info(node)
- self.resume_consumer(node, state.worker_name)
+ if state:
+ self.resume_consumer(node, state.worker_name)
def subscribe_node(self, target_node, subscriber_node, tick_pos):
"""Subscribing one node to another."""
def load_node_info(self, node_name):
"""Non-cached node info lookup."""
db = self.get_node_database(node_name)
+ if not db:
+ self.log.warning('load_node_info(%s): ignoring dead node' % node_name)
+ return None
q = "select * from pgq_node.get_node_info(%s)"
rows = self.exec_query(db, q, [self.queue_name])
return NodeInfo(self.queue_name, rows[0])