cascading work:
author Marko Kreen <markokr@gmail.com>
Wed, 30 Apr 2008 14:24:01 +0000 (14:24 +0000)
committer Marko Kreen <markokr@gmail.com>
Wed, 30 Apr 2008 14:24:01 +0000 (14:24 +0000)
- let AdminScript detect number of args by introspection
- track event_id_seq downstream
- on rename update subscribers too
- slightly more fleshed out switchover

python/londiste.py
python/londiste/setup.py
python/pgq/setadmin.py
python/pgq/setconsumer.py
python/pgq/setinfo.py
python/skytools/adminscript.py
sql/pgq/sql/pgq_core.sql
sql/pgq/structure/func_internal.sql
sql/pgq_set/expected/pgq_set.out
sql/pgq_set/sql/pgq_set.sql
sql/pgq_set/structure/functions.sql

index e7e4f95ea01e4d7eabe9ecbe092c717fb9f108a0..44818d8bb51fec2072ceaa6c13483bdf74b5e971 100755 (executable)
@@ -10,33 +10,9 @@ if os.path.exists(os.path.join(sys.path[0], 'londiste.py')) \
     and not os.path.exists(os.path.join(sys.path[0], 'londiste')):
     del sys.path[0]
 
-import londiste
-
-command_usage = """
-%prog [options] INI CMD [subcmd args]
-
-Node Initialization:
-  init-root   NODE_NAME NODE_CONSTR
-  init-branch NODE_NAME NODE_CONSTR --provider=<constr>
-  init-leaf   NODE_NAME NODE_CONSTR --provider=<constr>
-    Initializes node.  Given connstr is kept as global connstring
-    for that node.  Those commands ignore node_db in .ini.
-    The --provider connstr is used only for initial set info
-    fetching, later actual provider's connect string is used.
-
-Node Administration:
-  status                Show set state
-  members               Show members in set
-  rename-node OLD NEW   Rename a node
-  change-provider NODE NEWSRC   
-  pause NODE
-  resume NODE
-
-  switchover NEWROOT
-  failover NEWROOT
-  tag-dead NODE ..      Tag node as dead
-  tag-alive NODE ..     Tag node as alive
+import londiste, pgq.setadmin
 
+command_usage = pgq.setadmin.command_usage + """
 Replication Daemon:
   worker                replay events to subscriber
 
index 31394e8d742d8dd1f081a3304855cafa3c78ccf8..fea3e154b92b74e62ae1d2061e3170ba81fb61c3 100644 (file)
@@ -42,51 +42,51 @@ class LondisteSetup(pgq.setadmin.SetAdmin):
         node_db.commit()
         provider_db.commit()
 
-    def cmd_add(self, args = []):
+    def cmd_add(self, *args):
         q = "select * from londiste.node_add_table(%s, %s)"
         db = self.get_database('node_db')
         self.exec_cmd_many(db, q, [self.set_name], args)
 
-    def cmd_remove(self, args = []):
+    def cmd_remove(self, *args):
         q = "select * from londiste.node_remove_table(%s, %s)"
         db = self.get_database('node_db')
         self.exec_cmd_many(db, q, [self.set_name], args)
 
-    def cmd_add_seq(self, args = []):
+    def cmd_add_seq(self, *args):
         q = "select * from londiste.node_add_seq(%s, %s)"
         db = self.get_database('node_db')
         self.exec_cmd_many(db, q, [self.set_name], args)
 
-    def cmd_remove_seq(self, args = []):
+    def cmd_remove_seq(self, *args):
         q = "select * from londiste.node_remove_seq(%s, %s)"
         db = self.get_database('node_db')
         self.exec_cmd_many(db, q, [self.set_name], args)
 
-    def cmd_resync(self, args = []):
+    def cmd_resync(self, *args):
         q = "select * from londiste.node_resync_table(%s, %s)"
         db = self.get_database('node_db')
         self.exec_cmd_many(db, q, [self.set_name], args)
 
-    def cmd_tables(self, args = []):
+    def cmd_tables(self):
         q = "select table_name, merge_state from londiste.node_get_table_list(%s)"
         db = self.get_database('node_db')
         self.db_display_table(db, "Tables on node", q, [self.set_name])
 
-    def cmd_seqs(self, args = []):
-        q = "select seq_namefrom londiste.node_get_seq_list(%s)"
+    def cmd_seqs(self):
+        q = "select seq_name from londiste.node_get_seq_list(%s)"
         db = self.get_database('node_db')
         self.db_display_table(db, "Sequences on node", q, [self.set_name])
 
-    def cmd_missing(self, args = []):
+    def cmd_missing(self):
         q = "select * from londiste.node_show_missing(%s)"
         db = self.get_database('node_db')
         self.db_display_table(db, "Missing objects on node", q, [self.set_name])
 
-    def cmd_check(self, args = []):
+    def cmd_check(self):
         pass
-    def cmd_fkeys(self, args = []):
+    def cmd_fkeys(self):
         pass
-    def cmd_triggers(self, args = []):
+    def cmd_triggers(self):
         pass
 
 #
index 4f33794a99abff336b4e7526d2c7e76188c64557..0c500468340cec69480598537246ae820e2b63b2 100644 (file)
@@ -6,10 +6,30 @@ from pgq.setinfo import *
 
 __all__ = ['SetAdmin']
 
-command_usage = """
+command_usage = """\
 %prog [options] INI CMD [subcmd args]
 
-commands:
+Node Initialization:
+  init-root   NODE_NAME NODE_CONSTR
+  init-branch NODE_NAME NODE_CONSTR --provider=<constr>
+  init-leaf   NODE_NAME NODE_CONSTR --provider=<constr>
+    Initializes node.  Given connstr is kept as global connstring
+    for that node.  Those commands ignore node_db in .ini.
+    The --provider connstr is used only for initial set info
+    fetching, later actual provider's connect string is used.
+
+Node Administration:
+  status                Show set state
+  members               Show members in set
+  rename-node OLD NEW   Rename a node
+  change-provider NODE NEWSRC   
+  pause NODE
+  resume NODE
+
+  switchover NEWROOT
+  failover NEWROOT
+  tag-dead NODE ..      Tag node as dead
+  tag-alive NODE ..     Tag node as alive
 """
 
 class SetAdmin(skytools.AdminScript):
@@ -37,20 +57,16 @@ class SetAdmin(skytools.AdminScript):
     # Node initialization.
     #
 
-    def cmd_init_root(self, args):
-        if len(args) != 2:
-            raise Exception('init-root needs 2 args')
-        self.init_node('root', args[0], args[1])
+    def cmd_init_root(self, node_name, node_location):
+        self.init_node('root', node_name, node_location)
 
-    def cmd_init_branch(self, args):
-        if len(args) != 2:
-            raise Exception('init-branch needs 2 args')
-        self.init_node('branch', args[0], args[1])
+    def cmd_init_branch(self, node_name, node_location):
+        # Argument count is validated by AdminScript.work() via introspection;
+        # the old len(args) test referenced a name that no longer exists here.
+        self.init_node('branch', node_name, node_location)
 
-    def cmd_init_leaf(self, args):
-        if len(args) != 2:
-            raise Exception('init-leaf needs 2 args')
-        self.init_node('leaf', args[0], args[1])
+    def cmd_init_leaf(self, node_name, node_location):
+        self.init_node('leaf', node_name, node_location)
 
     def init_node(self, node_type, node_name, node_location):
         provider_loc = self.options.provider
@@ -186,7 +202,7 @@ class SetAdmin(skytools.AdminScript):
     # Print status of whole set.
     #
 
-    def cmd_status(self, args):
+    def cmd_status(self):
         root_db = self.find_root_db()
         sinf = self.load_set_info(root_db)
 
@@ -209,9 +225,7 @@ class SetAdmin(skytools.AdminScript):
     # Normal commands.
     #
 
-    def cmd_change_provider(self, args):
-        node_name = args[0]
-        new_provider = args[1]
+    def cmd_change_provider(self, node_name, new_provider):
         old_provider = None
 
         self.load_local_info()
@@ -248,9 +262,7 @@ class SetAdmin(skytools.AdminScript):
         # resume node
         self.resume_node(node_name)
 
-    def cmd_rename_node(self, args):
-        old_name = args[0]
-        new_name = args[1]
+    def cmd_rename_node(self, old_name, new_name):
 
         self.load_local_info()
 
@@ -260,6 +272,7 @@ class SetAdmin(skytools.AdminScript):
         self.pause_node(old_name)
         node = self.load_node_info(old_name)
         provider_node = node.provider_node
+        subscriber_list = self.get_node_subscriber_list(old_name)
 
 
         # create copy of member info / subscriber+queue info
@@ -271,41 +284,76 @@ class SetAdmin(skytools.AdminScript):
         self.exec_cmd(root_db, step1, [self.set_name, old_name, new_name])
         self.node_cmd(provider_node, step1, [self.set_name, old_name, new_name])
         self.node_cmd(old_name, step1, [self.set_name, old_name, new_name])
+        for child in subscriber_list:
+            self.node_cmd(child, step1, [self.set_name, old_name, new_name])
 
         # step1
         self.node_cmd(old_name, step2, [self.set_name, old_name, new_name])
         self.node_cmd(provider_node, step1, [self.set_name, old_name, new_name])
+        for child in subscriber_list:
+            self.node_cmd(child, step2, [self.set_name, old_name, new_name])
         self.exec_cmd(root_db, step2, [self.set_name, old_name, new_name])
 
         # resume node
         self.resume_node(old_name)
 
-    def cmd_promote(self):
-        old_root = 'foo'
-        new_root = ''
-        self.pause_node(old_root)
-        ctx = self.load_node_info(old_root)
-        [['old-root', 'PAUSE']]
-        [['old-root', 'demote, set-provider?']]
-        [['new-root', 'wait-for-catch-up']]
-        [['new-root', 'pause']]
-        [['new-root', 'promote']]
-        [['new-root', 'resume']]
-        [['old-root', 'resume']]
-        [['new_parent', 'select * from pgq_set.subscribe_node(%(set_name)s, %(node_name)s, %(node_pos)s)']]
-        [['node', 'select * from pgq_set.change_provider(%(set_name)s, %(new_provider)s)']]
-        [['old_parent', 'select * from pgq_set.unsubscribe_node(%(set_name)s, %(node_name)s, %(node_pos)s)']]
-        [['node', 'RESUME']]
-
-    def cmd_pause(self, args):
+    def switchover_nonroot(self, old_node, new_node):
+        # see if we need to change new nodes' provider
+        tmp_node = new_node
+        while 1:
+            if tmp_node.is_root():
+                break
+            if tmp_node.name == old_node.name:
+                # old_node is upstream of new_node, switch it around
+                self.change_provider(new_node.name, old_node.parent_node)
+                break
+            tmp_node = self.get_node_info(tmp_node.parent_node)
+        self.change_provider(old_node.name, new_node.name)
+
+    def switchover_root(self, old_node, new_node):
+        self.pause_node(old_node.name)
+        self.extra_lockdown(old_node)
+
+        self.wait_for_catchup(new_node, old_node)
+        self.pause_node(new_node.name)
+        self.promote_node(new_node.name)
+        self.subscribe_node(new_node.name, old_node.name, tick_pos)
+        self.unsubscribe_node(new_node.parent_node, new_node.name)
+        self.resume_node(new_node.name)
+
+        # demote & set provider on node
+        q = 'select * from pgq_set.demote_root(%s, %s)'
+        self.node_cmd(old_node.name, q, [self.set_name, new_node.name])
+
+        self.resume_node(old_node.name)
+
+    def cmd_switchover(self, old_node_name, new_node_name):
         self.load_local_info()
-        self.pause_node(args[0])
+        old_node = self.get_node_info(old_node_name)
+        new_node = self.get_node_info(new_node_name)
+        if old_node.name == new_node.name:
+            self.log.info("same node?")
+            return
 
-    def cmd_resume(self, args):
+        if old_node.is_root():
+            self.switchover_root(old_node, new_node)
+        else:
+            self.switchover_nonroot(old_node, new_node)
+
+        # switch subscribers around
+        if self.options.all:
+            for n in self.get_node_subscriber_list(old_node.name):
+                self.change_provider(n, new_node.name)
+
+    def cmd_pause(self, node_name):
+        self.load_local_info()
+        self.pause_node(node_name)
+
+    def cmd_resume(self, node_name):
         self.load_local_info()
-        self.resume_node(args[0])
+        self.resume_node(node_name)
 
-    def cmd_members(self, args):
+    def cmd_members(self):
         db = self.get_database(self.initial_db_name)
         q = "select node_name from pgq_set.get_node_info(%s)"
         rows = self.exec_query(db, q, [self.set_name])
@@ -382,11 +430,11 @@ class SetAdmin(skytools.AdminScript):
 
     def subscribe_node(self, target_node, subscriber_node, tick_pos):
         q = "select * from pgq_set.subscribe_node(%s, %s, %s)"
-        self.node_cmd(target_node, q, [self.set_name, target_node, tick_pos])
+        self.node_cmd(target_node, q, [self.set_name, subscribe_node, tick_pos])
 
-    def unsubscribe_node(self, target_node, subscriber_node, tick_pos):
-        q = "select * from pgq_set.subscribe_node(%s, %s, %s)"
-        self.node_cmd(target_node, q, [self.set_name, target_node, tick_pos])
+    def unsubscribe_node(self, target_node, subscriber_node):
+        q = "select * from pgq_set.unsubscribe_node(%s, %s)"
+        self.node_cmd(target_node, q, [self.set_name, subscriber_node])
 
     def load_node_info(self, node_name):
         db = self.get_node_database(node_name)
@@ -403,6 +451,12 @@ class SetAdmin(skytools.AdminScript):
 
         return SetInfo(self.set_name, info, member_list)
 
+    def get_node_subscriber_list(self, node_name):
+        q = "select node_name, local_watermark from pgq_set.get_subscriber_info(%s)"
+        db = self.get_node_database(node_name)
+        rows = self.exec_query(db, q, [self.set_name])
+        return [r['node_name'] for r in rows]
+
 if __name__ == '__main__':
     script = SetAdmin('set_admin', sys.argv[1:])
     script.start()
index 4ec754e78b7e7eb4516be8396da901921a13c4e2..d17ecf5f1d72146fd3bef52636cd77f45d1ddcee 100644 (file)
@@ -12,6 +12,9 @@ class SetConsumer(skytools.DBScript):
     last_global_wm_publish_time = 0
     main_worker = True
     reg_ok = False
+    actual_dst_event_id = 0
+    batch_max_event_id = 0
+    seq_buffer = 10000
     def __init__(self, service_name, args,
                  node_db_name = 'node_db'):
         skytools.DBScript.__init__(self, service_name, args)
@@ -81,6 +84,8 @@ class SetConsumer(skytools.DBScript):
             # COMBINED_BRANCH needs to sync with part sets
             if dst_node.need_action('sync-part-pos'):
                 self.move_part_positions(dst_curs)
+            if dst_node.need_action('update-event-seq'):
+                self.update_event_seq(dst_curs)
 
         # we are done on target
         self.set_tick_complete(dst_curs, src_queue.cur_tick)
@@ -99,12 +104,31 @@ class SetConsumer(skytools.DBScript):
 
     def process_set_batch(self, src_db, dst_db, ev_list):
         dst_curs = dst_db.cursor()
+        max_id = 0
         for ev in ev_list:
             self.process_set_event(dst_curs, ev)
             if self.dst_queue:
                 self.dst_queue.bulk_insert(dst_curs, ev)
+            if ev.id > max_id:
+                max_id = ev.id
+        self.batch_max_event_id = max_id
         self.stat_increase('count', len(ev_list))
 
+    def update_event_seq(self, dst_curs):
+        qname = self.dst_queue.queue_name
+        if self.actual_dst_event_id == 0:
+            q = "select pgq.seq_getval(queue_event_seq) from pgq.queue where queue_name = %s"
+            dst_curs.execute(q, [qname])
+            self.actual_dst_event_id = dst_curs.fetchone()[0]
+            self.log.debug('got local event_id value = %d' % self.actual_dst_event_id)
+
+        if self.batch_max_event_id + self.seq_buffer >= self.actual_dst_event_id:
+            next_id = self.batch_max_event_id + 2 * self.seq_buffer
+            q = "select pgq.seq_setval(queue_event_seq, %s) from  pgq.queue where queue_name = %s"
+            self.log.debug('set local event_id value = %d' % next_id)
+            dst_curs.execute(q, [next_id, qname])
+            self.actual_dst_event_id = next_id
+
     def process_set_event(self, dst_curs, ev):
         if ev.type == 'set-tick':
             self.handle_set_tick(dst_curs, ev)
index 17ff781c8e5114f2ef8d476037a2a1966a223c1f..25a7f3c3d02b7ef14ba80fd475cb45861c54f3f2 100644 (file)
@@ -16,6 +16,7 @@ MERGE_LEAF = 'merge-leaf'
 action_map = {
 'process-batch':   {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':1},
 'process-events':  {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0},
+'update-event-seq':{'root':0, 'branch':1, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
 'copy-events':     {'root':0, 'branch':1, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
 'tick-event':      {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':0, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0},
 'global-wm-event': {'root':1, 'branch':0, 'leaf':0, 'combined-root':1, 'combined-branch':0, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
index 838b990f16c5a82fef73aa84f554b8c17c523162..399e5cd35003f429f467df63aaea128309e6cf2f 100644 (file)
@@ -3,7 +3,7 @@
 """Admin scripting.
 """
 
-import sys, os
+import sys, os, inspect
 
 from skytools.scripting import DBScript
 from skytools.quoting import quote_statement
@@ -21,13 +21,30 @@ class AdminScript(DBScript):
 
     def work(self):
         self.set_single_loop(1)
+
         cmd = self.args[1]
+        cmdargs = self.args[2:]
+
+        # find function
         fname = "cmd_" + cmd.replace('-', '_')
-        if hasattr(self, fname):
-            getattr(self, fname)(self.args[2:])
-        else:
+        if not hasattr(self, fname):
             self.log.error('bad subcommand, see --help for usage')
             sys.exit(1)
+        fn = getattr(self, fname)
+
+        # check if correct number of arguments
+        (args, varargs, varkw, defaults) = inspect.getargspec(fn)
+        n_args = len(args) - 1 # drop 'self'
+        if varargs is None and n_args != len(cmdargs):
+            helpstr = ""
+            if n_args:
+                helpstr = ": " + " ".join(args[1:])
+            self.log.error("command '%s' got %d args, but expects %d%s"
+                    % (cmd, len(cmdargs), n_args, helpstr))
+            sys.exit(1)
+
+        # run command
+        fn(*cmdargs)
 
     def fetch_list(self, db, sql, args, keycol = None):
         curs = db.cursor()
@@ -53,7 +70,7 @@ class AdminScript(DBScript):
 
         if not fields:
             fields = [f[0] for f in curs.description]
-        
+
         widths = [15] * len(fields)
         for row in rows:
             for i, k in enumerate(fields):
@@ -67,7 +84,7 @@ class AdminScript(DBScript):
             print desc
         print fmt % tuple(fields)
         print fmt % tuple(['-'*15] * len(fields))
-            
+
         for row in rows:
             print fmt % tuple([row[k] for k in fields])
         print '\n'
index ca8de0cf43be49090c4b049977e185e89577da81..247b4a64afea8b6e5924de02a4fd0531615db82a 100644 (file)
@@ -77,3 +77,12 @@ select nextval(queue_event_seq) from pgq.queue where queue_name = 'myqueue';
 select pgq.force_tick('myqueue');
 select nextval(queue_event_seq) from pgq.queue where queue_name = 'myqueue';
 
+create sequence tmptest_seq;
+
+select pgq.seq_getval('tmptest_seq');
+select pgq.seq_setval('tmptest_seq', 10);
+select pgq.seq_setval('tmptest_seq', 5);
+select pgq.seq_setval('tmptest_seq', 15);
+select pgq.seq_getval('tmptest_seq');
+
+
index 8e7939c5f35601308d63181e7a15f0b8a7db2b62..1531cb5194f3a75ee178fdbb6a29c0b9b22a1921 100644 (file)
@@ -23,4 +23,5 @@
 
 \i functions/pgq.grant_perms.sql
 \i functions/pgq.force_tick.sql
+\i functions/pgq.seq_funcs.sql
 
index 5f57f970e66d24e127e231b532b713ab8337345a..00d5a0b261ed4be19f428d553b35f3b042c975e3 100644 (file)
@@ -98,3 +98,9 @@ select * from pgq_set.rename_node_step2('aset', 'node2', 'node2x');
       200 | Ok
 (1 row)
 
+select * from pgq_set.get_subscriber_info('aset');
+ node_name | local_watermark 
+-----------+-----------------
+ node2x    |               1
+(1 row)
+
index d7aa3fb4e6219f396523b547802ffd8bb71a2564..f8129c8159a836abe05e5416936e332aabcd582b 100644 (file)
@@ -30,3 +30,5 @@ select * from pgq_set.is_root(null);
 select * from pgq_set.rename_node_step1('aset', 'node2', 'node2x');
 select * from pgq_set.rename_node_step2('aset', 'node2', 'node2x');
 
+select * from pgq_set.get_subscriber_info('aset');
+
index c19b7449c8e44ffe30d8dd6339fbe57f44d01ec4..50070832f0d95c77cf25505bf7f3723c47e2c48e 100644 (file)
@@ -17,6 +17,7 @@
 
 -- Group: Node Info
 \i functions/pgq_set.get_node_info.sql
+\i functions/pgq_set.get_subscriber_info.sql
 \i functions/pgq_set.is_root.sql
 
 -- Group: Watermark tracking