"""
-import sys, time, optparse, skytools
+import sys, time, optparse, skytools, os.path
from skytools import UsageError, DBError
from pgq.cascade.nodeinfo import *
__all__ = ['CascadeAdmin']
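+# "resurrect" dumps events found only on the old root into this file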
+RESURRECT_DUMP_FILE = "resurrect-lost-events.json"
+
command_usage = """\
%prog [options] INI CMD [subcmd args]
self.demote_node(old_node_name, 1, new_node_name)
last_tick = self.demote_node(old_node_name, 2, new_node_name)
self.wait_for_catchup(new_node_name, last_tick)
+ else:
+ q = "select * from pgq.get_queue_info(%s)"
+ db = self.get_node_database(new_node_name)
+ curs = db.cursor()
+ curs.execute(q, [self.queue_name])
+ row = curs.fetchone()
+ last_tick = row['last_tick_id']
+ db.commit()
self.pause_node(new_node_name)
self.promote_branch(new_node_name)
if self.node_alive(old_node_name):
- q = 'select * from pgq_node.register_subscriber(%s, %s, %s, %s)'
- self.node_cmd(new_node_name, q, [self.queue_name, old_node_name, old_info.worker_name, last_tick])
+ old_worker_name = old_info.worker_name
+ else:
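+            # dead root: its worker cannot register itself, so leave a
+            # "gravestone" consumer at last_tick to preserve the failover
+            # position for a later resurrect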
+ old_worker_name = self.failover_consumer_name(old_node_name)
+ q = 'select * from pgq_node.register_subscriber(%s, %s, %s, %s)'
+ self.node_cmd(new_node_name, q, [self.queue_name, old_node_name, old_worker_name, last_tick])
q = "select * from pgq_node.unregister_subscriber(%s, %s)"
self.node_cmd(new_info.provider_node, q, [self.queue_name, new_node_name])
self.sleep(2)
+ def cmd_resurrect(self):
+ """Convert out-of-sync old root to branch and sync queue contents.
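+
+        Finds the failover position left behind by dead-root takeover,
+        dumps and deletes the events the rest of the cascade never saw,
+        then re-subscribes the node under the new root.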
+ """
+ self.load_local_info()
+
+ db = self.get_database(self.initial_db_name)
+ curs = db.cursor()
+
+ # stop if leaf
+ if self.queue_info.local_node.type == 'leaf':
+ self.log.info("Current node is leaf, nothing to do")
+ return
+
+ # stop if dump file exists
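+        # (it contains events from an earlier run that were already
+        #  deleted from the queue; overwriting it would lose them)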
+ if os.path.lexists(RESURRECT_DUMP_FILE):
+ self.log.error("Dump file exists, cannot perform resurrection: %s", RESURRECT_DUMP_FILE)
+ sys.exit(1)
+
+ #
+ # Find failover position
+ #
+
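+        # The "gravestone" is the subscriber registration that dead-root
+        # takeover leaves behind for the old root (consumer name
+        # "<node>.gravestone", see failover_consumer_name); its last_tick
+        # marks the position where the failover happened.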
+ self.log.info("** Searching for gravestone **")
+
+ # load subscribers
+ sub_list = []
+ q = "select * from pgq_node.get_subscriber_info(%s)"
+ curs.execute(q, [self.queue_name])
+ for row in curs.fetchall():
+ sub_list.append(row['node_name'])
+ db.commit()
+
+ # find backup subscription
+ this_node = self.queue_info.local_node.name
+ failover_cons = self.failover_consumer_name(this_node)
+ full_list = self.queue_info.member_map.keys()
+ done_nodes = { this_node: 1 }
+ prov_node = None
+ root_node = None
+ for node_name in sub_list + full_list:
+ if node_name in done_nodes:
+ continue
+ done_nodes[node_name] = 1
+ if not self.node_alive(node_name):
+ self.log.info('Node %s is dead, skipping', node_name)
+ continue
+ self.log.info('Looking on node %s', node_name)
+ node_db = None
+ try:
+ node_db = self.get_node_database(node_name)
+ node_curs = node_db.cursor()
+ node_curs.execute("select * from pgq.get_consumer_info(%s, %s)", [self.queue_name, failover_cons])
+ cons_rows = node_curs.fetchall()
+ node_curs.execute("select * from pgq_node.get_node_info(%s)", [self.queue_name])
+ node_info = node_curs.fetchone()
+ node_db.commit()
+ if len(cons_rows) == 1:
+ if prov_node:
+                        raise Exception('Unexpected situation: there are two gravestones, on nodes %s and %s' % (prov_node, node_name))
+ prov_node = node_name
+ failover_tick = cons_rows[0]['last_tick']
+ self.log.info("Found gravestone on node: %s", node_name)
+ if node_info['node_type'] == 'root':
+ self.log.info("Found new root node: %s", node_name)
+ root_node = node_name
+ self.close_node_database(node_name)
+ node_db = None
+ if root_node and prov_node:
+ break
+ except skytools.DBError:
+ self.log.warning("failed to check node %s", node_name)
+ if node_db:
+ self.close_node_database(node_name)
+ node_db = None
+
+ if not root_node:
+            self.log.error("Cannot find new root node")
+ sys.exit(1)
+ if not prov_node:
+ self.log.error("Cannot find failover position (%s)", failover_cons)
+ sys.exit(1)
+
+ # load worker state
+ q = "select * from pgq_node.get_worker_state(%s)"
+ rows = self.exec_cmd(db, q, [self.queue_name])
+ state = rows[0]
+
+ # demote & pause
+ self.log.info("** Demote & pause local node **")
+ if self.queue_info.local_node.type == 'root':
+ self.log.info('Node %s is root, demoting', this_node)
+ q = "select * from pgq_node.demote_root(%s, %s, %s)"
+ self.exec_cmd(db, q, [self.queue_name, 1, prov_node])
+ self.exec_cmd(db, q, [self.queue_name, 2, prov_node])
+
+ # change node type and set worker paused in same TX
+ curs = db.cursor()
+ self.exec_cmd(curs, q, [self.queue_name, 3, prov_node])
+ q = "select * from pgq_node.set_consumer_paused(%s, %s, true)"
+ self.exec_cmd(curs, q, [self.queue_name, state['worker_name']])
+ db.commit()
+ elif not state['paused']:
+ # pause worker, don't wait for reaction, as it may be dead
+ self.log.info('Node %s is branch, pausing worker: %s', this_node, state['worker_name'])
+ q = "select * from pgq_node.set_consumer_paused(%s, %s, true)"
+ self.exec_cmd(db, q, [self.queue_name, state['worker_name']])
+ else:
+ self.log.info('Node %s is branch and worker is paused', this_node)
+
+ #
+ # Drop old consumers and subscribers
+ #
+ self.log.info("** Dropping old subscribers and consumers **")
+
+ # unregister subscriber nodes
+ q = "select pgq_node.unregister_subscriber(%s, %s)"
+ for node_name in sub_list:
+ self.log.info("Dropping old subscriber node: %s", node_name)
+ curs.execute(q, [self.queue_name, node_name])
+
+ # unregister consumers
+ q = "select consumer_name from pgq.get_consumer_info(%s)"
+ curs.execute(q, [self.queue_name])
+ for row in curs.fetchall():
+ cname = row['consumer_name']
+ if cname[0] == '.':
+ self.log.info("Keeping consumer: %s", cname)
+ continue
+ self.log.info("Dropping old consumer: %s", cname)
+            q = "select pgq.unregister_consumer(%s, %s)"
+ curs.execute(q, [self.queue_name, cname])
+ db.commit()
+
+ # dump events
+ self.log.info("** Dump & delete lost events **")
+ stats = self.resurrect_process_lost_events(db, failover_tick)
+
+ self.log.info("** Subscribing %s to %s **", this_node, prov_node)
+
+ # set local position
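+        # (the paused worker continues from failover_tick once resumed)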
+ self.log.info("Reset local completed pos")
+ q = "select * from pgq_node.set_consumer_completed(%s, %s, %s)"
+ self.exec_cmd(db, q, [self.queue_name, state['worker_name'], failover_tick])
+
+ # rename gravestone
+ self.log.info("Rename gravestone to worker: %s", state['worker_name'])
+ prov_db = self.get_node_database(prov_node)
+ prov_curs = prov_db.cursor()
+ q = "select * from pgq_node.unregister_subscriber(%s, %s)"
+ self.exec_cmd(prov_curs, q, [self.queue_name, this_node], quiet = True)
+ q = "select ret_code, ret_note, global_watermark"\
+ " from pgq_node.register_subscriber(%s, %s, %s, %s)"
+ res = self.exec_cmd(prov_curs, q, [self.queue_name, this_node, state['worker_name'], failover_tick], quiet = True)
+ global_wm = res[0]['global_watermark']
+ prov_db.commit()
+
+ # import new global watermark
+ self.log.info("Reset global watermark")
+ q = "select * from pgq_node.set_global_watermark(%s, %s)"
+ self.exec_cmd(db, q, [self.queue_name, global_wm], quiet = True)
+
+ # show stats
+ if stats:
+ self.log.info("** Statistics **")
+ klist = stats.keys()
+ klist.sort()
+ for k in klist:
+ v = stats[k]
+ self.log.info(" %s: %s", k, str(v))
+ self.log.info("** Resurrection done, worker paused **")
+
+ def resurrect_process_lost_events(self, db, failover_tick):
+ curs = db.cursor()
+ this_node = self.queue_info.local_node.name
+ cons_name = this_node + '.dumper'
+
+ self.log.info("Dumping lost events")
+
+ # register temp consumer on queue
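+        # (positioned at failover_tick, so its batches cover exactly
+        #  the events the rest of the cascade never saw)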
+ q = "select pgq.register_consumer_at(%s, %s, %s)"
+ curs.execute(q, [self.queue_name, cons_name, failover_tick])
+ db.commit()
+
+ # process events as usual
+ total_count = 0
+ final_tick_id = -1
+ stats = {}
+ while 1:
+ q = "select * from pgq.next_batch_info(%s, %s)"
+ curs.execute(q, [self.queue_name, cons_name])
+ b = curs.fetchone()
+ batch_id = b['batch_id']
+ if batch_id is None:
+ break
+ final_tick_id = b['cur_tick_id']
+ q = "select * from pgq.get_batch_events(%s)"
+ curs.execute(q, [batch_id])
+ cnt = 0
+ for ev in curs.fetchall():
+ cnt += 1
+ total_count += 1
+ self.resurrect_dump_event(ev, stats, b)
+
+ q = "select pgq.finish_batch(%s)"
+ curs.execute(q, [batch_id])
+ if cnt > 0:
+ db.commit()
+
+ stats['dumped_count'] = total_count
+
+ self.resurrect_dump_finish()
+
+ self.log.info("%s events dumped", total_count)
+
+        # unregister consumer
+ q = "select pgq.unregister_consumer(%s, %s)"
+ curs.execute(q, [self.queue_name, cons_name])
+ db.commit()
+
+        if final_tick_id < 0:
+ self.log.info("No batches found")
+ return None
+
+ #
+ # Delete the events from queue
+ #
+        # This is done using snapshots, to make sure we delete only
+        # events that were dumped out previously.  This uses the long-tx
+        # resistant logic described in pgq.batch_event_sql().
+ #
+
+ # find snapshots
+ q = "select t1.tick_snapshot as s1, t2.tick_snapshot as s2"\
+ " from pgq.tick t1, pgq.tick t2"\
+ " where t1.tick_id = %s"\
+ " and t2.tick_id = %s"
+ curs.execute(q, [failover_tick, final_tick_id])
+ ticks = curs.fetchone()
+ s1 = skytools.Snapshot(ticks['s1'])
+ s2 = skytools.Snapshot(ticks['s2'])
+
+ xlist = []
+ for tx in s1.txid_list:
+ if s2.contains(tx):
+ xlist.append(str(tx))
+
+ # create where clauses
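+        # W1: txids that were still in progress at failover_tick
+        #     but had committed by final_tick_id
+        # W2: txids between s1.xmax and s2.xmax that are visible
+        #     in the final snapshot but not in the failover snapshot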
+ W1 = None
+ if len(xlist) > 0:
+ W1 = "ev_txid in (%s)" % (",".join(xlist),)
+        W2 = "ev_txid >= %d and ev_txid <= %d"\
+ " and not txid_visible_in_snapshot(ev_txid, '%s')"\
+ " and txid_visible_in_snapshot(ev_txid, '%s')" % (
+ s1.xmax, s2.xmax, ticks['s1'], ticks['s2'])
+
+ # loop over all queue data tables
+ q = "select * from pgq.queue where queue_name = %s"
+ curs.execute(q, [self.queue_name])
+ row = curs.fetchone()
+ ntables = row['queue_ntables']
+ tbl_pfx = row['queue_data_pfx']
+ schema, table = tbl_pfx.split('.')
+ total_del_count = 0
+ self.log.info("Deleting lost events")
+ for i in range(ntables):
+ del_count = 0
+            self.log.debug("Deleting events from table %d", i)
+ qtbl = "%s.%s" % (skytools.quote_ident(schema),
+ skytools.quote_ident(table + '_' + str(i)))
+ q = "delete from " + qtbl + " where "
+ if W1:
+ self.log.debug(q + W1)
+ curs.execute(q + W1)
+ if curs.rowcount and curs.rowcount > 0:
+ del_count += curs.rowcount
+ self.log.debug(q + W2)
+ curs.execute(q + W2)
+ if curs.rowcount and curs.rowcount > 0:
+ del_count += curs.rowcount
+ total_del_count += del_count
+ self.log.debug('%d events deleted', del_count)
+ self.log.info('%d events deleted', total_del_count)
+ stats['deleted_count'] = total_del_count
+
+ # delete new ticks
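+        # (ticks made by the old root past the failover point would
+        #  clash with tick ids arriving from the new root)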
+ q = "delete from pgq.tick t using pgq.queue q"\
+ " where q.queue_name = %s"\
+ " and t.tick_queue = q.queue_id"\
+ " and t.tick_id > %s"\
+ " and t.tick_id <= %s"
+ curs.execute(q, [self.queue_name, failover_tick, final_tick_id])
+ self.log.info("%s ticks deleted", curs.rowcount)
+
+ db.commit()
+
+ return stats
+
+ _json_dump_file = None
+ def resurrect_dump_event(self, ev, stats, batch_info):
+ if self._json_dump_file is None:
+ self._json_dump_file = open(RESURRECT_DUMP_FILE, 'w')
+ sep = '['
+ else:
+ sep = ','
+
+        # create ordinary dict to avoid problems with row class and datetime
+ d = {
+ 'ev_id': ev.ev_id,
+ 'ev_type': ev.ev_type,
+ 'ev_data': ev.ev_data,
+ 'ev_extra1': ev.ev_extra1,
+ 'ev_extra2': ev.ev_extra2,
+ 'ev_extra3': ev.ev_extra3,
+ 'ev_extra4': ev.ev_extra4,
+ 'ev_time': ev.ev_time.isoformat(),
+ 'ev_txid': ev.ev_txid,
+ 'ev_retry': ev.ev_retry,
+ 'tick_id': batch_info['cur_tick_id'],
+ 'prev_tick_id': batch_info['prev_tick_id'],
+ }
+ jsev = skytools.json_encode(d)
+ s = sep + '\n' + jsev
+ self._json_dump_file.write(s)
+
+ def resurrect_dump_finish(self):
+ if self._json_dump_file:
+ self._json_dump_file.write('\n]\n')
+ self._json_dump_file.close()
+ self._json_dump_file = None
+
+ def failover_consumer_name(self, node_name):
+ return node_name + ".gravestone"
+
#
# Shortcuts for operating on nodes.
#
--- /dev/null
+#! /bin/bash
+
+. ../testlib.sh
+
+../zstop.sh
+
+rm -f resurrect-lost-events.json
+
+v='-q'
+v=''
+nocheck=1
+
+db_list="db1 db2 db3 db4 db5"
+
+kdb_list=`echo $db_list | sed 's/ /,/g'`
+
+#( cd ../..; make -s install )
+
+do_check() {
+ test $nocheck = 1 || ../zcheck.sh
+}
+
+title Resurrect test
+
+# create ticker conf
+cat > conf/pgqd.ini <<EOF
+[pgqd]
+database_list = $kdb_list
+logfile = log/pgqd.log
+pidfile = pid/pgqd.pid
+EOF
+
+# londiste3 configs
+for db in $db_list; do
+cat > conf/londiste_$db.ini <<EOF
+[londiste3]
+job_name = londiste_$db
+db = dbname=$db
+queue_name = replika
+logfile = log/%(job_name)s.log
+pidfile = pid/%(job_name)s.pid
+
+pgq_autocommit = 1
+pgq_lazy_fetch = 0
+EOF
+done
+
+for n in 1 2 3; do
+cat > conf/gen$n.ini <<EOF
+[loadgen]
+job_name = loadgen$n
+db = dbname=db$n
+logfile = log/%(job_name)s.log
+pidfile = pid/%(job_name)s.pid
+EOF
+done
+
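+# a previous run may have left db1/db2 renamed to db1x/db2x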
+psql -d template1 -c 'drop database if exists db1x'
+psql -d template1 -c 'drop database if exists db2x'
+createdb db1
+createdb db2
+
+for db in $db_list; do
+ cleardb $db
+done
+
+clearlogs
+
+set -e
+
+msg "Basic config"
+run cat conf/pgqd.ini
+run cat conf/londiste_db1.ini
+
+msg "Install londiste3 and initialize nodes"
+run londiste3 $v conf/londiste_db1.ini create-root node1 'dbname=db1'
+run londiste3 $v conf/londiste_db2.ini create-branch node2 'dbname=db2' --provider='dbname=db1'
+run londiste3 $v conf/londiste_db3.ini create-branch node3 'dbname=db3' --provider='dbname=db2'
+run londiste3 $v conf/londiste_db4.ini create-branch node4 'dbname=db4' --provider='dbname=db2'
+run londiste3 $v conf/londiste_db5.ini create-branch node5 'dbname=db5' --provider='dbname=db2'
+
+msg "Run ticker"
+run pgqd $v -d conf/pgqd.ini
+run sleep 5
+
+msg "See topology"
+run londiste3 $v conf/londiste_db1.ini status
+
+msg "Run londiste3 daemon for each node"
+for db in $db_list; do
+ run psql -d $db -c "update pgq.queue set queue_ticker_idle_period='2 secs'"
+ run londiste3 $v -d conf/londiste_$db.ini worker
+done
+
+msg "Create table on root node and fill couple of rows"
+run psql -d db1 -c "create table mytable (id serial primary key, data text)"
+for n in 1 2 3 4; do
+ run psql -d db1 -c "insert into mytable (data) values ('row$n')"
+done
+
+msg "Run loadgen on table"
+run ./loadgen.py -d conf/gen1.ini
+
+msg "Register table on root node"
+run londiste3 $v conf/londiste_db1.ini add-table mytable
+run londiste3 $v conf/londiste_db1.ini add-seq mytable_id_seq
+
+msg "Register table on other node with creation"
+for db in db2 db3 db4 db5; do
+ run psql -d $db -c "create sequence mytable_id_seq"
+ run londiste3 $v conf/londiste_$db.ini add-seq mytable_id_seq
+ run londiste3 $v conf/londiste_$db.ini add-table mytable --create-full
+done
+
+msg "Wait until tables are in sync on db3"
+
+run londiste3 conf/londiste_db2.ini wait-sync
+run londiste3 conf/londiste_db3.ini wait-sync
+
+run londiste3 conf/londiste_db3.ini status
+
+###################
+
+#msg "Stop Londiste on Node2"
+#run londiste3 conf/londiste_db2.ini worker -s
+#sleep 1
+
+#msg "Wait a bit"
+#run sleep 10
+#############################
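+# Stop db2's worker while loadgen keeps writing to db1, so db1 collects
+# events that never reach db2: the events resurrect must later dump.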
+msg "Force lag on db2"
+run londiste3 $v conf/londiste_db2.ini worker -s
+run sleep 20
+
+msg "Stop Londiste on Node1"
+run londiste3 conf/londiste_db1.ini worker -s
+
+msg "Stop loadgen"
+run sleep 5
+run ./loadgen.py -s conf/gen1.ini
+
+#msg "Kill old root"
+#ps aux | grep 'postgres[:].* db1 ' | awk '{print $2}' | xargs -r kill
+#sleep 3
+#ps aux | grep 'postgres[:].* db1 ' | awk '{print $2}' | xargs -r kill -9
+#sleep 3
+
+run londiste3 $v conf/londiste_db2.ini status --dead=node1
+
+msg "Stop ticker, rename db1 to db1x, restart ticker"
+run pgqd -s conf/pgqd.ini
+run psql -d db2 -c 'alter database db1 rename to db1x'
+run pgqd -d conf/pgqd.ini
+#run londiste3 $v conf/londiste_db2.ini tag-dead node1
+run londiste3 $v conf/londiste_db2.ini worker -d
+
+
+msg "Take over root role"
+run londiste3 $v conf/londiste_db2.ini takeover node1 --dead-root
+run londiste3 $v conf/londiste_db2.ini tag-dead node1
+run londiste3 $v conf/londiste_db2.ini status
+
+msg "Move database back"
+run psql -d db2 -c 'alter database db1x rename to db1'
+
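+# db1 still thinks it is root and holds events that never reached db2;
+# resurrect should demote it to branch, dump the lost events to
+# resurrect-lost-events.json and re-subscribe it under the new root.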
+msg "Do resurrection ritual"
+run londiste3 conf/londiste_db1.ini resurrect
+
+exit 0
+
+##
+## basic setup done
+##
+
+# test lagged takeover
+if true; then
+
+msg "Force lag on db2"
+run londiste3 $v conf/londiste_db2.ini worker -s
+run sleep 20
+
+msg "Kill old root"
+ps aux | grep 'postgres[:].* db1 ' | awk '{print $2}' | xargs -r kill
+sleep 3
+ps aux | grep 'postgres[:].* db1 ' | awk '{print $2}' | xargs -r kill -9
+sleep 3
+run psql -d db2 -c 'drop database db1'
+run psql -d db2 -c 'create database db1'
+run londiste3 $v conf/londiste_db2.ini status --dead=node1
+
+msg "Change db2 to read from db3"
+run londiste3 $v conf/londiste_db2.ini worker -d
+run londiste3 $v conf/londiste_db2.ini change-provider --provider=node3 --dead=node1
+
+msg "Wait until catchup"
+run londiste3 $v conf/londiste_db2.ini wait-provider
+
+msg "Promoting db2 to root"
+run londiste3 $v conf/londiste_db2.ini takeover node1 --dead-root
+run londiste3 $v conf/londiste_db2.ini tag-dead node1
+run londiste3 $v conf/londiste_db2.ini status
+
+run sleep 5
+
+msg "Done"
+
+do_check
+
+exit 0
+fi
+
+
+
+msg "Change provider"
+run londiste3 $v conf/londiste_db4.ini status
+run londiste3 $v conf/londiste_db4.ini change-provider --provider=node3
+run londiste3 $v conf/londiste_db4.ini status
+run londiste3 $v conf/londiste_db5.ini change-provider --provider=node2
+run londiste3 $v conf/londiste_db5.ini status
+
+msg "Change topology"
+run londiste3 $v conf/londiste_db1.ini status
+run londiste3 $v conf/londiste_db3.ini takeover node2
+run londiste3 $v conf/londiste_db2.ini status
+run londiste3 $v conf/londiste_db2.ini takeover node1
+run londiste3 $v conf/londiste_db2.ini status
+
+msg "Restart loadgen"
+run ./loadgen.py -s conf/gen1.ini
+run ./loadgen.py -d conf/gen2.ini
+
+run sleep 10
+do_check
+
+msg "Change topology / failover"
+ps aux | grep 'postgres[:].* db2 ' | awk '{print $2}' | xargs -r kill
+sleep 3
+ps aux | grep 'postgres[:].* db2 ' | awk '{print $2}' | xargs -r kill -9
+sleep 3
+run psql -d db1 -c 'alter database db2 rename to db2x'
+run londiste3 $v conf/londiste_db1.ini status --dead=node2
+run londiste3 $v conf/londiste_db3.ini takeover db2 --dead-root || true
+run londiste3 $v conf/londiste_db3.ini takeover node2 --dead-root
+run londiste3 $v conf/londiste_db1.ini status
+
+msg "Restart loadgen"
+run ./loadgen.py -s conf/gen2.ini
+run ./loadgen.py -d conf/gen3.ini
+
+
+run sleep 10
+do_check
+
+msg Done