cmd_handlers = (
(('create-root', 'create-branch', 'create-leaf', 'members', 'tag-dead', 'tag-alive',
'change-provider', 'rename-node', 'status', 'pause', 'resume',
- 'switchover', 'failover', 'drop-node', 'takeover'), londiste.LondisteSetup),
+ 'drop-node', 'takeover'), londiste.LondisteSetup),
(('add-table', 'remove-table', 'add-seq', 'remove-seq', 'tables', 'seqs',
'missing', 'resync', 'check', 'fkeys', 'execute'), londiste.LondisteSetup),
(('worker', 'replay'), londiste.Replicator),
help = "switchover: target node")
g.add_option("--merge",
help = "create-leaf: combined queue name")
- g.add_option("--dead", action = 'store_true',
- help = "takeover: old one is dead")
+ g.add_option("--dead", action = 'append',
+ help = "cascade: assume node is dead")
+ g.add_option("--dead-root", action = 'store_true',
+ help = "takeover: old node was root")
+ g.add_option("--dead-branch", action = 'store_true',
+ help = "takeover: old node was branch")
p.add_option_group(g)
return p
g.add_option("--consumer",
help = "specify consumer name")
g.add_option("--target",
- help = "switchover: specify replacement node")
+ help = "takeover: specify node to take over")
g.add_option("--merge",
help = "create-node: combined queue name")
- g.add_option("--dead", action="store_true",
+ g.add_option("--dead", action="append",
+ help = "tag some node as dead")
+ g.add_option("--dead-root", action="store_true",
+ help = "tag some node as dead")
+ g.add_option("--dead-branch", action="store_true",
help = "tag some node as dead")
p.add_option_group(g)
return p
def cmd_status(self):
"""Show set status."""
- root_db = self.find_root_db()
- sinf = self.load_queue_info(root_db)
+ self.load_local_info()
+
+ for mname, minf in self.queue_info.member_map.iteritems():
+ #inf = self.get_node_info(mname)
+ #self.queue_info.add_node(inf)
+ #continue
- for mname, minf in sinf.member_map.iteritems():
+ if not self.node_alive(mname):
+ node = NodeInfo(self.queue_name, None, node_name = mname)
+ self.queue_info.add_node(node)
+ continue
db = self.get_database('look_db', connstr = minf.location, autocommit = 1)
curs = db.cursor()
curs.execute("select * from pgq_node.get_node_info(%s)", [self.queue_name])
node = NodeInfo(self.queue_name, curs.fetchone())
node.load_status(curs)
self.load_extra_status(curs, node)
- sinf.add_node(node)
+ self.queue_info.add_node(node)
self.close_database('look_db')
- sinf.print_tree()
+ self.queue_info.print_tree()
def load_extra_status(self, curs, node):
"""Fetch extra info."""
return info
- def takeover_root(self, old_info, new_info, failover = False):
+ def takeover_root(self, old_node_name, new_node_name, failover = False):
"""Root switchover."""
- old = old_info.name
- new = new_info.name
- if self.node_alive(old):
- self.pause_node(old)
- self.demote_node(old, 1, new)
- last_tick = self.demote_node(old, 2, new)
- self.wait_for_catchup(new, last_tick)
+ new_info = self.get_node_info(new_node_name)
+ old_info = None
- self.pause_node(new)
- self.promote_branch(new)
+ if self.node_alive(old_node_name):
+ old_info = self.get_node_info(old_node_name)
+ self.pause_node(old_node_name)
+ self.demote_node(old_node_name, 1, new_node_name)
+ last_tick = self.demote_node(old_node_name, 2, new_node_name)
+ self.wait_for_catchup(new_node_name, last_tick)
- #self.subscribe_node(new, old, old_info.completed_tick)
- q = 'select * from pgq_node.register_subscriber(%s, %s, %s, %s)'
- self.node_cmd(new, q, [self.queue_name, old, old_info.worker_name, last_tick])
+ self.pause_node(new_node_name)
+ self.promote_branch(new_node_name)
+
+ if self.node_alive(old_node_name):
+ q = 'select * from pgq_node.register_subscriber(%s, %s, %s, %s)'
+ self.node_cmd(new_node_name, q, [self.queue_name, old_node_name, old_info.worker_name, last_tick])
- #self.unsubscribe_node(new_node.parent_node, new_node.name)
q = "select * from pgq_node.unregister_subscriber(%s, %s)"
- self.node_cmd(new_info.provider_node, q, [self.queue_name, new])
+ self.node_cmd(new_info.provider_node, q, [self.queue_name, new_node_name])
- self.resume_node(new)
+ self.resume_node(new_node_name)
- if self.node_alive(old):
- self.demote_node(old, 3, new)
- self.resume_node(old)
+ if self.node_alive(old_node_name):
+ self.demote_node(old_node_name, 3, new_node_name)
+ self.resume_node(old_node_name)
def takeover_nonroot(self, old_node_name, new_node_name, failover):
"""Non-root switchover."""
if not old_node_name:
raise UsageError('old node not given')
+ if old_node_name not in self.queue_info.member_map:
+ raise UsageError('Unknown node: %s' % old_node_name)
+
if self.options.dead_root:
otype = 'root'
failover = True
if not m:
self.log.error("get_node_database: cannot resolve %s" % node_name)
sys.exit(1)
- self.log.info("%s: dead=%s" % (m.name, m.dead))
+ #self.log.info("%s: dead=%s" % (m.name, m.dead))
if m.dead:
return None
loc = m.location
def node_alive(self, node_name):
m = self.queue_info.get_member(node_name)
if not m:
- return False
- if m.dead:
- return False
- return True
+ res = False
+ elif m.dead:
+ res = False
+ else:
+ res = True
+ #self.log.warning('node_alive(%s) = %s' % (node_name, res))
+ return res
def close_node_database(self, node_name):
"""Disconnect node's connection."""
q = "select * from pgq_node.get_queue_locations(%s)"
member_list = self.exec_query(db, q, [self.queue_name])
- return QueueInfo(self.queue_name, info, member_list)
+ qinf = QueueInfo(self.queue_name, info, member_list)
+ if self.options.dead:
+ for node in self.options.dead:
+ self.log.info("Assuming node '%s' as dead" % node)
+ qinf.tag_dead(node)
+ return qinf
def get_node_subscriber_list(self, node_name):
"""Fetch subscriber list from a node."""
class NodeInfo:
"""Detailed info about set node."""
- def __init__(self, queue_name, row, main_worker = True):
+
+ name = None
+ type = None
+ global_watermark = None
+ local_watermark = None
+ completed_tick = None
+ provider_node = None
+ provider_location = None
+ consumer_name = None #?
+ worker_name = None #?
+ paused = False
+ uptodate = True
+ combined_queue = None
+ combined_type = None
+
+ def __init__(self, queue_name, row, main_worker = True, node_name = None):
self.queue_name = queue_name
self.member_map = {}
self.main_worker = main_worker
+ self.parent = None
+ self.consumer_map = {}
+ self.queue_info = {}
+ self._info_lines = []
+
+ self._row = row
+
+ if not row:
+ self.name = node_name
+ self.type = 'dead'
+ return
+
self.name = row['node_name']
self.type = row['node_type']
self.global_watermark = row['global_watermark']
self.combined_queue = row['combined_queue']
self.combined_type = row['combined_type']
- self.parent = None
- self.consumer_map = {}
- self.queue_info = {}
-
- self._row = row
-
- self._info_lines = []
-
def __get_target_queue(self):
qname = None
if self.type == LEAF:
root = self.parent
while root.parent:
root = root.parent
- tick_time = self.parent.consumer_map[self.consumer_name]['tick_time']
- root_time = root.queue_info['now']
- lag = root_time - tick_time
- else:
+ cinfo = self.parent.consumer_map.get(self.consumer_name)
+ if cinfo and root.queue_info:
+ tick_time = cinfo['tick_time']
+ root_time = root.queue_info['now']
+ lag = root_time - tick_time
+ else:
+ lag = "(n/a?)"
+ elif self.queue_info:
lag = self.queue_info['ticker_lag']
+ else:
+ lag = "(n/a)"
txt = "lag: %s" % lag
if self.paused:
txt += ", PAUSED"
. ../testlib.sh
+v=-q
+
db_list="db1 db2 db3 db4 db5"
+( cd ../..; make -s install )
+
echo " * create configs * "
# create ticker conf
EOF
done
+psql -d template1 -c 'drop database if exists db2x'
+createdb db2
+
for db in $db_list; do
cleardb $db
done
-echo "clean logs"
-rm -f log/*.log
+clearlogs
set -e
done
run sleep 20
+if false; then
+
msg "Add column on root"
run cat ddl.sql
run londiste $v conf/londiste_db1.ini execute ddl.sql
-
msg "Insert data into new column"
for n in 5 6 7 8; do
run psql -d db1 -c "insert into mytable values ($n, 'row$n', 'data2')"
done
msg "Wait a bit"
run sleep 20
+msg "Check table structure"
+run psql -d db5 -c '\d mytable'
+run psql -d db5 -c 'select * from mytable'
+run sleep 5
+../zcheck.sh
+fi
-run psql -d db3 -c '\d mytable'
-run psql -d db3 -c 'select * from mytable'
+msg "Change provider"
+run londiste $v conf/londiste_db4.ini status
+run londiste $v conf/londiste_db4.ini change-provider --provider=node3
+run londiste $v conf/londiste_db4.ini status
+run londiste $v conf/londiste_db5.ini change-provider --provider=node2
+run londiste $v conf/londiste_db5.ini status
-run sleep 10
-../zcheck.sh
msg "Change topology"
run londiste $v conf/londiste_db1.ini status
-run londiste $v conf/londiste_db3.ini change-provider --provider=node1
-run londiste $v conf/londiste_db1.ini status
-run londiste $v conf/londiste_db1.ini switchover --target=node2
-run londiste $v conf/londiste_db1.ini status
+run londiste $v conf/londiste_db3.ini takeover node2
+run londiste $v conf/londiste_db2.ini status
+run londiste $v conf/londiste_db2.ini takeover node1
+run londiste $v conf/londiste_db2.ini status
run sleep 10
../zcheck.sh
-msg "Change topology"
-ps aux | grep "postres[:].* db2 " | awk '{print $2}' | xargs -r kill
+msg "Change topology / failover"
+ps aux | grep 'postgres[:].* db2 ' | awk '{print $2}' | xargs -r kill
+sleep 3
+ps aux | grep 'postgres[:].* db2 ' | awk '{print $2}' | xargs -r kill -9
+sleep 3
run psql -d db1 -c 'alter database db2 rename to db2x'
-run londiste $v conf/londiste_db4.ini takeover db2 --dead
-run londiste $v conf/londiste_db1.ini status
+run londiste $v conf/londiste_db1.ini status --dead=node2
+run londiste $v conf/londiste_db3.ini takeover db2 --dead-root || true
+run londiste $v conf/londiste_db3.ini takeover node2 --dead-root
+run londiste $v conf/londiste_db1.ini status --dead=node2
run sleep 10
../zcheck.sh