From 551a0d0f2adb892a0fb9c4fca5257cbc534ec558 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Thu, 5 Jul 2012 14:11:29 +0300 Subject: [PATCH] londiste: Wait commands for londiste add-table --wait-sync Don't return until all pending tables are synced wait-sync Wait until all pending tables are synced. wait-root Wait until root's next tick has appeared locally. wait-provider Wait until provider's next tick has appeared locally. --- doc/londiste3.txt | 16 +++++++++++ python/londiste.py | 6 +++- python/londiste/setup.py | 56 ++++++++++++++++++++++++++++++++++++ python/pgq/cascade/admin.py | 57 +++++++++++++++++++++++++++++++++++++ tests/londiste/regen.sh | 29 +++++-------------- 5 files changed, 141 insertions(+), 23 deletions(-) diff --git a/doc/londiste3.txt b/doc/londiste3.txt index 0a0405c9..92ca7a63 100644 --- a/doc/londiste3.txt +++ b/doc/londiste3.txt @@ -240,6 +240,9 @@ Do full copy of the table, again. --all:: Include all possible tables. + --wait-sync:: + Wait until newly added tables are synced fully. + --dest-table='table':: Redirect changes to different table. @@ -330,6 +333,19 @@ Execute SQL files on each nodes of the set. Show info about all or a specific handler. +=== wait-sync === + +Wait until all added tables are copied over. + +=== wait-provider === + +Wait until local node passes latest queue position on provider. + +=== wait-root === + +Wait until local node passes latest queue position on root. + + == INTERNAL COMMAND == === copy === diff --git a/python/londiste.py b/python/londiste.py index 3a183ab6..78d90d6f 100755 --- a/python/londiste.py +++ b/python/londiste.py @@ -30,6 +30,7 @@ Replication Administration: seqs show all sequences on provider missing list tables subscriber has not yet attached to resync TBL ... do full copy again + wait-sync wait until all tables are in sync Replication Extra: check compare table structure on both sides @@ -48,7 +49,8 @@ cmd_handlers = ( 'change-provider', 'rename-node', 'status', 'pause', 'resume', 'node-info', 'drop-node', 'takeover'), londiste.LondisteSetup), (('add-table', 'remove-table', 'add-seq', 'remove-seq', 'tables', 'seqs', - 'missing', 'resync', 'check', 'fkeys', 'execute'), londiste.LondisteSetup), + 'missing', 'resync', 'wait-sync', 'wait-root', 'wait-provider', + 'check', 'fkeys', 'execute'), londiste.LondisteSetup), (('show-handlers',), londiste.LondisteSetup), (('worker',), londiste.Replicator), (('compare',), londiste.Comparator), @@ -110,6 +112,8 @@ class Londiste(skytools.DBScript): g = optparse.OptionGroup(p, "options for add") g.add_option("--all", action="store_true", help = "add: include add possible tables") + g.add_option("--wait-sync", action="store_true", + help = "add: wait until all tables are in sync"), g.add_option("--dest-table", help = "add: redirect changes to different table") g.add_option("--expect-sync", action="store_true", dest="expect_sync", diff --git a/python/londiste/setup.py b/python/londiste/setup.py index 6976f60a..b055fcb1 100644 --- a/python/londiste/setup.py +++ b/python/londiste/setup.py @@ -53,6 +53,8 @@ class LondisteSetup(CascadeAdmin): help="force", default=False) p.add_option("--all", action="store_true", help="include all tables", default=False) + p.add_option("--wait-sync", action="store_true", + help = "add: wait until all tables are in sync"), p.add_option("--create", action="store_true", help="create, minimal", default=False) p.add_option("--create-full", action="store_true", @@ -161,6 +163,11 @@ class LondisteSetup(CascadeAdmin): for tbl in args: self.add_table(src_db, dst_db, tbl, create_flags, src_tbls) + # wait + if self.options.wait_sync: + self.wait_for_sync(dst_db) + + def add_table(self, src_db, dst_db, tbl, create_flags, src_tbls): # use full names tbl = skytools.fq_name(tbl) @@ -551,3 +558,52 @@ class LondisteSetup(CascadeAdmin): n_half += 1 node.add_info_line('Tables: %d/%d/%d' % (n_ok, n_half, n_ign)) + def cmd_wait_sync(self): + self.load_local_info() + + dst_db = self.get_database('db') + self.wait_for_sync(dst_db) + + def wait_for_sync(self, dst_db): + self.log.info("Waiting until all tables are in sync") + q = "select table_name, merge_state, local"\ + " from londiste.get_table_list(%s) where local" + dst_curs = dst_db.cursor() + + partial = {} + done_pos = 1 + startup_info = 0 + while 1: + dst_curs.execute(q, [self.queue_name]) + rows = dst_curs.fetchall() + dst_db.commit() + + cur_count = 0 + done_list = [] + for row in rows: + if not row['local']: + continue + tbl = row['table_name'] + if row['merge_state'] != 'ok': + partial[tbl] = 0 + cur_count += 1 + elif tbl in partial: + if partial[tbl] == 0: + partial[tbl] = 1 + done_list.append(tbl) + + if not startup_info: + self.log.info("%d table(s) to copy", len(partial)) + startup_info = 1 + + for done in done_list: + self.log.info("%s: finished (%d/%d)", done, done_pos, len(partial)) + done_pos += 1 + + if cur_count == 0: + break + + self.sleep(2) + + self.log.info("All done") + diff --git a/python/pgq/cascade/admin.py b/python/pgq/cascade/admin.py index e695ae6c..f8f294c8 100644 --- a/python/pgq/cascade/admin.py +++ b/python/pgq/cascade/admin.py @@ -766,6 +766,63 @@ class CascadeAdmin(skytools.AdminScript): if n.combined_queue: print('Combined Queue: %s (node type: %s)' % (n.combined_queue, n.combined_type)) + def cmd_wait_root(self): + """Wait for next tick from root.""" + + self.load_local_info() + + if self.queue_info.local_node.type == 'root': + self.log.info("Current node is root, no need to wait") + return + + self.log.info("Finding root node") + root_node = self.find_root_node() + self.log.info("Root is %s", root_node) + + dst_db = self.get_database('db') + self.wait_for_node(dst_db, root_node) + + def cmd_wait_provider(self): + """Wait for next tick from provider.""" + + self.load_local_info() + + if self.queue_info.local_node.type == 'root': + self.log.info("Current node is root, no need to wait") + return + + dst_db = self.get_database('db') + node = self.queue_info.local_node.provider_node + self.log.info("Provider is %s", node) + self.wait_for_node(dst_db, node) + + def wait_for_node(self, dst_db, node_name): + """Core logic for waiting.""" + + self.log.info("Fetching last tick for %s", node_name) + node_info = self.load_node_info(node_name) + tick_id = node_info.last_tick + + self.log.info("Waiting for tick > %d", tick_id) + + q = "select * from pgq_node.get_node_info(%s)" + dst_curs = dst_db.cursor() + + while 1: + dst_curs.execute(q, [self.queue_name]) + row = dst_curs.fetchone() + dst_db.commit() + + if row['ret_code'] >= 300: + self.log.warning("Problem: %s", row['ret_code'], row['ret_note']) + return + + if row['worker_last_tick'] > tick_id: + self.log.info("Got tick %d, exiting", row['worker_last_tick']) + break + + self.sleep(2) + # # Shortcuts for operating on nodes. # diff --git a/tests/londiste/regen.sh b/tests/londiste/regen.sh index 46741f7e..72e0d73f 100755 --- a/tests/londiste/regen.sh +++ b/tests/londiste/regen.sh @@ -103,7 +103,7 @@ run londiste3 $v conf/londiste_db1.ini add-seq mytable_id_seq msg "Register table on other node with creation" for db in db2 db3 db4 db5; do run psql -d $db -c "create sequence mytable_id_seq" - run londiste3 $v conf/londiste_db1.ini add-seq mytable_id_seq + run londiste3 $v conf/londiste_$db.ini add-seq mytable_id_seq run londiste3 $v conf/londiste_$db.ini add-table mytable --create-full done @@ -115,21 +115,14 @@ for db in db2 db3 db4 db5; do done msg "Wait until tables are in sync on db5" -cnt=0 -while test $cnt -ne 2; do - sleep 5 - cnt=`psql -A -t -d db5 -c "select count(*) from londiste.table_info where merge_state = 'ok'"` - echo " cnt=$cnt" -done + +run londiste3 conf/londiste_db5.ini wait-sync msg "Unregister table2 from root" run londiste3 $v conf/londiste_db1.ini remove-table mytable2 msg "Wait until unregister reaches db5" -while test $cnt -ne 1; do - sleep 5 - cnt=`psql -A -t -d db5 -c "select count(*) from londiste.table_info where merge_state = 'ok'"` - echo " cnt=$cnt" -done + +run londiste3 conf/londiste_db5.ini wait-root ## ## basic setup done @@ -156,16 +149,8 @@ run londiste3 $v conf/londiste_db2.ini worker -d run londiste3 $v conf/londiste_db2.ini change-provider --provider=node3 --dead=node1 msg "Wait until catchup" -top=$(psql -A -t -d db3 -c "select max(tick_id) from pgq.queue join pgq.tick on (tick_queue = queue_id) where queue_name = 'replika'") -echo " top=$top" -while test $cnt -ne 2; do - cur=$(psql -A -t -d db2 -c "select max(tick_id) from pgq.queue join pgq.tick on (tick_queue = queue_id) where queue_name = 'replika'") - echo " cur=$cur" - if test "$cur" = "$top"; then - break - fi - sleep 5 -done +run londiste3 $v conf/londiste_db2.ini wait-provider + msg "Promoting db2 to root" run londiste3 $v conf/londiste_db2.ini takeover node1 --dead-root run londiste3 $v conf/londiste_db2.ini tag-dead node1 -- 2.39.5