pgq_set: pause/resume/change-provider/rename-node work now
authorMarko Kreen <markokr@gmail.com>
Fri, 25 Apr 2008 14:48:32 +0000 (14:48 +0000)
committerMarko Kreen <markokr@gmail.com>
Fri, 25 Apr 2008 14:48:32 +0000 (14:48 +0000)
31 files changed:
python/londiste.py
python/londiste/setup.py
python/pgq/setadmin.py
python/pgq/setconsumer.py
python/pgq/setinfo.py
python/skytools/adminscript.py
sql/londiste/expected/londiste_fkeys.out
sql/londiste/expected/londiste_provider.out
sql/londiste/expected/londiste_subscriber.out
sql/londiste/functions/londiste.node_add_table.sql
sql/londiste/functions/londiste.node_disable_triggers.sql
sql/londiste/functions/londiste.node_prepare_triggers.sql
sql/londiste/functions/londiste.node_refresh_triggers.sql
sql/londiste/functions/londiste.node_remove_seq.sql
sql/londiste/functions/londiste.node_remove_table.sql
sql/londiste/functions/londiste.set_add_table.sql
sql/londiste/functions/londiste.set_remove_table.sql
sql/pgq_set/expected/pgq_set.out
sql/pgq_set/functions/pgq_set.add_member.sql
sql/pgq_set/functions/pgq_set.change_provider.sql [new file with mode: 0644]
sql/pgq_set/functions/pgq_set.create_node.sql
sql/pgq_set/functions/pgq_set.drop_member.sql [new file with mode: 0644]
sql/pgq_set/functions/pgq_set.get_node_info.sql
sql/pgq_set/functions/pgq_set.rename_node.sql [new file with mode: 0644]
sql/pgq_set/functions/pgq_set.set_node_paused.sql [new file with mode: 0644]
sql/pgq_set/functions/pgq_set.set_node_uptodate.sql
sql/pgq_set/functions/pgq_set.subscribe_node.sql
sql/pgq_set/sql/pgq_set.sql
sql/pgq_set/structure/functions.sql
sql/pgq_set/structure/pgq_set.sql
tests/londiste/gendb.sh

index 06ec17db6a81f6066f6793199521d967a726af8c..e7e4f95ea01e4d7eabe9ecbe092c717fb9f108a0 100755 (executable)
@@ -25,13 +25,18 @@ Node Initialization:
     fetching, later actual provider's connect string is used.
 
 Node Administration:
+  status                Show set state
   members               Show members in set
+  rename-node OLD NEW   Rename a node
+  change-provider NODE NEWSRC   
+  pause NODE
+  resume NODE
+
+  switchover NEWROOT
+  failover NEWROOT
   tag-dead NODE ..      Tag node as dead
   tag-alive NODE ..     Tag node as alive
 
-  redirect              Switch provider
-  make-root             Promote to root
-
 Replication Daemon:
   worker                replay events to subscriber
 
@@ -57,7 +62,8 @@ Internal Commands:
 
 cmd_handlers = (
     (('init-root', 'init-branch', 'init-leaf', 'members', 'tag-dead', 'tag-alive',
-      'redirect', 'promote-root', 'status'), londiste.LondisteSetup),
+      'change-provider', 'rename-node', 'status', 'pause', 'resume',
+      'switchover', 'failover'), londiste.LondisteSetup),
     (('add', 'remove', 'add-seq', 'remove-seq', 'tables', 'seqs',
       'missing', 'resync', 'check', 'fkeys'), londiste.LondisteSetup),
     (('worker', 'replay'), londiste.Replicator),
index 7ab6093d6dd2008dd45db7e6da32767eb5638bb2..31394e8d742d8dd1f081a3304855cafa3c78ccf8 100644 (file)
@@ -45,27 +45,27 @@ class LondisteSetup(pgq.setadmin.SetAdmin):
     def cmd_add(self, args = []):
         q = "select * from londiste.node_add_table(%s, %s)"
         db = self.get_database('node_db')
-        self.db_cmd_many(db, q, [self.set_name], args)
+        self.exec_cmd_many(db, q, [self.set_name], args)
 
     def cmd_remove(self, args = []):
         q = "select * from londiste.node_remove_table(%s, %s)"
         db = self.get_database('node_db')
-        self.db_cmd_many(db, q, [self.set_name], args)
+        self.exec_cmd_many(db, q, [self.set_name], args)
 
     def cmd_add_seq(self, args = []):
         q = "select * from londiste.node_add_seq(%s, %s)"
         db = self.get_database('node_db')
-        self.db_cmd_many(db, q, [self.set_name], args)
+        self.exec_cmd_many(db, q, [self.set_name], args)
 
     def cmd_remove_seq(self, args = []):
         q = "select * from londiste.node_remove_seq(%s, %s)"
         db = self.get_database('node_db')
-        self.db_cmd_many(db, q, [self.set_name], args)
+        self.exec_cmd_many(db, q, [self.set_name], args)
 
     def cmd_resync(self, args = []):
         q = "select * from londiste.node_resync_table(%s, %s)"
         db = self.get_database('node_db')
-        self.db_cmd_many(db, q, [self.set_name], args)
+        self.exec_cmd_many(db, q, [self.set_name], args)
 
     def cmd_tables(self, args = []):
         q = "select table_name, merge_state from londiste.node_get_table_list(%s)"
index 7dba1959609270a27c1cd886621201743d9f2388..4f33794a99abff336b4e7526d2c7e76188c64557 100644 (file)
@@ -1,6 +1,6 @@
 #! /usr/bin/env python
 
-import sys, optparse, skytools
+import sys, time, optparse, skytools
 
 from pgq.setinfo import *
 
@@ -13,9 +13,6 @@ commands:
 """
 
 class SetAdmin(skytools.AdminScript):
-    root_name = None
-    root_info = None
-    member_map = {}
     set_name = None
     extra_objs = []
     initial_db_name = 'node_db'
@@ -36,6 +33,10 @@ class SetAdmin(skytools.AdminScript):
         skytools.AdminScript.reload(self)
         self.set_name = self.cf.get('set_name')
 
+    #
+    # Node initialization.
+    #
+
     def cmd_init_root(self, args):
         if len(args) != 2:
             raise Exception('init-root needs 2 args')
@@ -74,9 +75,9 @@ class SetAdmin(skytools.AdminScript):
             global_watermark = None
             combined_set = None
             provider_name = None
-            self.exec_sql(db, "select pgq_set.add_member(%s, %s, %s, false)",
+            self.exec_cmd(db, "select * from pgq_set.add_member(%s, %s, %s, false)",
                           [self.set_name, node_name, node_location])
-            self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s)",
+            self.exec_cmd(db, "select * from pgq_set.create_node(%s, %s, %s, %s, %s, %s)",
                           [self.set_name, node_type, node_name, provider_name, global_watermark, combined_set])
             provider_db = None
         else:
@@ -88,22 +89,19 @@ class SetAdmin(skytools.AdminScript):
                 self.log.error("Node '%s' already exists" % node_name)
                 sys.exit(1)
 
-            global_watermark = set.global_watermark
             combined_set = None
 
             provider_db = self.get_database('provider_db', connstr = provider_loc)
-            curs = provider_db.cursor()
-            curs.execute("select node_type, node_name from pgq_set.get_node_info(%s)", [self.set_name])
-            provider_db.commit()
-            row = curs.fetchone()
-            if not row:
+            q = "select node_type, node_name from pgq_set.get_node_info(%s)"
+            res = self.exec_query(provider_db, q, [self.set_name])
+            row = res[0]
+            if not row['node_name']:
                 raise Exception("provider node not found")
             provider_name = row['node_name']
 
             # register member on root
-            self.exec_sql(root_db, "select pgq_set.add_member(%s, %s, %s, false)",
+            self.exec_cmd(root_db, "select * from pgq_set.add_member(%s, %s, %s, false)",
                           [self.set_name, node_name, node_location])
-            root_db.commit()
 
             # lookup provider
             provider = set.get_member(provider_name)
@@ -112,21 +110,26 @@ class SetAdmin(skytools.AdminScript):
                 sys.exit(1)
 
             # register on provider
-            self.exec_sql(provider_db, "select pgq_set.add_member(%s, %s, %s, false)",
+            self.exec_cmd(provider_db, "select * from pgq_set.add_member(%s, %s, %s, false)",
                           [self.set_name, node_name, node_location])
-            self.exec_sql(provider_db, "select pgq_set.subscribe_node(%s, %s)",
-                          [self.set_name, node_name])
-            provider_db.commit()
+            rows = self.exec_cmd(provider_db, "select * from pgq_set.subscribe_node(%s, %s)",
+                                 [self.set_name, node_name])
+            global_watermark = rows[0]['global_watermark']
 
             # initialize node itself
-            self.exec_sql(db, "select pgq_set.add_member(%s, %s, %s, false)",
+
+            # insert members
+            self.exec_cmd(db, "select * from pgq_set.add_member(%s, %s, %s, false)",
                           [self.set_name, node_name, node_location])
-            self.exec_sql(db, "select pgq_set.add_member(%s, %s, %s, false)",
-                          [self.set_name, provider_name, provider.location])
-            self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s)",
+            for m in set.member_map.values():
+                self.exec_cmd(db, "select * from pgq_set.add_member(%s, %s, %s, %s)",
+                              [self.set_name, m.name, m.location, m.dead])
+
+            # real init
+            self.exec_cmd(db, "select * from pgq_set.create_node(%s, %s, %s, %s, %s, %s)",
                           [self.set_name, node_type, node_name, provider_name,
                            global_watermark, combined_set])
-            db.commit()
+
 
         self.extra_init(node_type, db, provider_db)
 
@@ -145,11 +148,6 @@ class SetAdmin(skytools.AdminScript):
             db = self.get_database('root_db', connstr = loc)
 
 
-            if 1:
-                curs = db.cursor()
-                curs.execute("select current_database()")
-                n = curs.fetchone()[0]
-                self.log.debug("real dbname=%s" % n)
             # query current status
             res = self.exec_query(db, "select * from pgq_set.get_node_info(%s)", [self.set_name])
             info = res[0]
@@ -169,20 +167,9 @@ class SetAdmin(skytools.AdminScript):
                 raise Exception("find_root_db: got loop: %s" % loc)
             loc = info['provider_location']
             if loc is None:
-                self.log.info("Sub node provider not initialized?")
+                self.log.error("Sub node provider not initialized?")
                 sys.exit(1)
 
-    def load_set_info(self, db):
-        res = self.exec_query(db, "select * from pgq_set.get_node_info(%s)", [self.set_name])
-        info = res[0]
-
-        q = "select * from pgq_set.get_member_info(%s)"
-        member_list = self.exec_query(db, q, [self.set_name])
-
-        db.commit()
-
-        return SetInfo(self.set_name, info, member_list)
-
     def install_code(self, db):
         objs = [
             skytools.DBLanguage("plpgsql"),
@@ -195,6 +182,10 @@ class SetAdmin(skytools.AdminScript):
         skytools.db_install(db.cursor(), objs, self.log)
         db.commit()
 
+    #
+    # Print status of whole set.
+    #
+
     def cmd_status(self, args):
         root_db = self.find_root_db()
         sinf = self.load_set_info(root_db)
@@ -214,17 +205,80 @@ class SetAdmin(skytools.AdminScript):
     def load_extra_status(self, curs, node):
         pass
 
-    def cmd_switch(self, node_name, new_provider):
+    #
+    # Normal commands.
+    #
+
+    def cmd_change_provider(self, args):
+        node_name = args[0]
+        new_provider = args[1]
+        old_provider = None
+
+        self.load_local_info()
+        node_location = self.set_info.get_member(node_name).location
         node_db = self.get_node_database(node_name)
-        new_provider_db = self.get_node_database(new_provider)
-        node_info = self.load_set_info(node_db)
+        node_set_info = self.load_set_info(node_db)
+        node = node_set_info.local_node
+        old_provider = node.provider_node
 
-        # 
-        [['node', 'PAUSE']]
-        [['new_parent', 'select * from pgq_set.subscribe_node(%(set_name)s, %(node_name)s, %(node_pos)s)']]
-        [['node', 'select * from pgq_set.change_provider(%(set_name)s, %(new_provider)s)']]
-        [['old_parent', 'select * from pgq_set.unsubscribe_node(%(set_name)s, %(node_name)s, %(node_pos)s)']]
-        [['node', 'RESUME']]
+        if old_provider == new_provider:
+            self.log.info("Node %s has already %s as provider" % (node_name, new_provider))
+
+        # pause target node
+        self.pause_node(node_name)
+
+        # reload node info
+        node_set_info = self.load_set_info(node_db)
+        node = node_set_info.local_node
+
+        # subscribe on new provider
+        q = "select * from pgq_set.add_member(%s, %s, %s, false)"
+        self.node_cmd(new_provider, q, [self.set_name, node_name, node_location])
+        q = 'select * from pgq_set.subscribe_node(%s, %s, %s)'
+        self.node_cmd(new_provider, q, [self.set_name, node_name, node.completed_tick])
+
+        # change provider on node
+        q = 'select * from pgq_set.change_provider(%s, %s)'
+        self.node_cmd(node_name, q, [self.set_name, new_provider])
+
+        # unsubscribe from old provider
+        q = "select * from pgq_set.unsubscribe_node(%s, %s)"
+        self.node_cmd(old_provider, q, [self.set_name, node_name])
+
+        # resume node
+        self.resume_node(node_name)
+
+    def cmd_rename_node(self, args):
+        old_name = args[0]
+        new_name = args[1]
+
+        self.load_local_info()
+
+        root_db = self.find_root_db()
+
+        # pause target node
+        self.pause_node(old_name)
+        node = self.load_node_info(old_name)
+        provider_node = node.provider_node
+
+
+        # create copy of member info / subscriber+queue info
+        step1 = 'select * from pgq_set.rename_node_step1(%s, %s, %s)'
+        # rename node itself, drop copies
+        step2 = 'select * from pgq_set.rename_node_step2(%s, %s, %s)'
+
+        # step1
+        self.exec_cmd(root_db, step1, [self.set_name, old_name, new_name])
+        self.node_cmd(provider_node, step1, [self.set_name, old_name, new_name])
+        self.node_cmd(old_name, step1, [self.set_name, old_name, new_name])
+
+        # step1
+        self.node_cmd(old_name, step2, [self.set_name, old_name, new_name])
+        self.node_cmd(provider_node, step1, [self.set_name, old_name, new_name])
+        self.exec_cmd(root_db, step2, [self.set_name, old_name, new_name])
+
+        # resume node
+        self.resume_node(old_name)
 
     def cmd_promote(self):
         old_root = 'foo'
@@ -243,27 +297,111 @@ class SetAdmin(skytools.AdminScript):
         [['old_parent', 'select * from pgq_set.unsubscribe_node(%(set_name)s, %(node_name)s, %(node_pos)s)']]
         [['node', 'RESUME']]
 
+    def cmd_pause(self, args):
+        self.load_local_info()
+        self.pause_node(args[0])
+
+    def cmd_resume(self, args):
+        self.load_local_info()
+        self.resume_node(args[0])
+
+    def cmd_members(self, args):
+        db = self.get_database(self.initial_db_name)
+        q = "select node_name from pgq_set.get_node_info(%s)"
+        rows = self.exec_query(db, q, [self.set_name])
+
+        desc = 'Member info on %s:' % rows[0]['node_name']
+        q = "select node_name, dead, node_location"\
+            " from pgq_set.get_member_info(%s) order by 1"
+        self.display_table(db, desc, q, [self.set_name])
+
+    #
+    # Shortcuts for operating on nodes.
+    #
+
+    def load_local_info(self):
+        """fetch set info from local node."""
+        db = self.get_database(self.initial_db_name)
+        self.set_info = self.load_set_info(db)
+
+    def get_node_database(self, node_name):
+        """Connect to node."""
+        if node_name == self.set_info.local_node.name:
+            db = self.get_database(self.initial_db_name)
+        else:
+            m = self.set_info.get_member(node_name)
+            if not m:
+                self.log.error("cannot resolve %s" % node_name)
+                sys.exit(1)
+            loc = m.location
+            db = self.get_database('node.' + node_name, connstr = loc)
+        return db
+
+    def close_node_database(self, node_name):
+        """Disconnect node's connection."""
+        if node_name == self.set_info.local_node.name:
+            self.close_database(self.initial_db_name)
+        else:
+            self.close_database("node." + node_name)
+
+    def node_cmd(self, node_name, sql, args):
+        """Execute SQL command on particular node."""
+        db = self.get_node_database(node_name)
+        return self.exec_cmd(db, sql, args)
+
+    #
+    # Various operation on nodes.
+    #
+
+    def set_paused(self, db, pause_flag):
+        q = "select * from pgq_set.set_node_paused(%s, %s)"
+        self.exec_cmd(db, q, [self.set_name, pause_flag])
+
+        self.log.info('Waiting for worker to accept')
+        while 1:
+            q = "select * from pgq_set.get_node_info(%s)"
+            stat = self.exec_query(db, q, [self.set_name])[0]
+            if stat['paused'] != pause_flag:
+                raise Exception('operation canceled? %s <> %s' % (repr(stat['paused']), repr(pause_flag)))
+
+            if stat['uptodate']:
+                break
+            time.sleep(1)
+
+        op = pause_flag and "paused" or "resumed"
+
+        self.log.info("Node %s %s" % (stat['node_name'], op))
+
+    def pause_node(self, node_name):
+        db = self.get_node_database(node_name)
+        self.set_paused(db, True)
+
+    def resume_node(self, node_name):
+        db = self.get_node_database(node_name)
+        self.set_paused(db, False)
+
     def subscribe_node(self, target_node, subscriber_node, tick_pos):
         q = "select * from pgq_set.subscribe_node(%s, %s, %s)"
-        self.node_exec(target_node, q, [self.set_name, target_node, tick_pos])
+        self.node_cmd(target_node, q, [self.set_name, target_node, tick_pos])
 
     def unsubscribe_node(self, target_node, subscriber_node, tick_pos):
         q = "select * from pgq_set.subscribe_node(%s, %s, %s)"
-        self.node_exec(target_node, q, [self.set_name, target_node, tick_pos])
+        self.node_cmd(target_node, q, [self.set_name, target_node, tick_pos])
 
-    def node_cmd(self, node_name, sql, args, commit = True):
-        m = self.lookup_member(node_name)
-        db = self.get_database('node_'+node_name)
-        self.db_cmd(db, sql, args, commit = commit)
+    def load_node_info(self, node_name):
+        db = self.get_node_database(node_name)
+        q = "select * from pgq_set.get_node_info(%s)"
+        rows = self.exec_query(db, q, [self.set_name])
+        return NodeInfo(self.set_name, rows[0])
 
-    def connect_node(self, node_name):
-        sinf = self.get_set_info()
-        m = sinf.get_member(node_name)
-        loc = m.node_location
-        db = self.get_database("node." + node_name, connstr = loc)
+    def load_set_info(self, db):
+        res = self.exec_query(db, "select * from pgq_set.get_node_info(%s)", [self.set_name])
+        info = res[0]
 
-    def disconnect_node(self, node_name):
-        self.close_database("node." + node_name)
+        q = "select * from pgq_set.get_member_info(%s)"
+        member_list = self.exec_query(db, q, [self.set_name])
+
+        return SetInfo(self.set_name, info, member_list)
 
 if __name__ == '__main__':
     script = SetAdmin('set_admin', sys.argv[1:])
index baa18b28553c66cf75504354c84bacb04a967794..4ec754e78b7e7eb4516be8396da901921a13c4e2 100644 (file)
@@ -28,7 +28,7 @@ class SetConsumer(skytools.DBScript):
         dst_node = self.load_node_info(dst_db)
         if self.main_worker:
             self.consumer_name = dst_node.name
-            if not dst_node.up_to_date:
+            if not dst_node.uptodate:
                 self.tag_node_uptodate(dst_db)
 
         if dst_node.paused:
@@ -108,7 +108,7 @@ class SetConsumer(skytools.DBScript):
     def process_set_event(self, dst_curs, ev):
         if ev.type == 'set-tick':
             self.handle_set_tick(dst_curs, ev)
-        elif ev.type == 'set-member-info':
+        elif ev.type == 'member-info':
             self.handle_member_info(dst_curs, ev)
         elif ev.type == 'global-watermark':
             self.handle_global_watermark(dst_curs, ev)
@@ -133,11 +133,10 @@ class SetConsumer(skytools.DBScript):
             dst_curs.execute(q, [self.set_name, set_name, tick_id])
 
     def handle_member_info(self, dst_curs, ev):
-        data = skytools.db_urldecode(ev.data)
-        set_name = data['set_name']
-        node_name = data['node_name']
-        node_location = data['node_location']
-        dead = data['dead']
+        node_name = ev.ev_data
+        set_name = ev.ev_extra1
+        node_location = ev.ev_extra2
+        dead = ev.ev_extra3
         # this can also be member for part set, ignore then
         if set_name != self.set_name:
             return
index da2e1ba91de06e4c60336a94679f9305ea63ddc6..17ff781c8e5114f2ef8d476037a2a1966a223c1f 100644 (file)
@@ -1,11 +1,10 @@
 #! /usr/bin/env python
 
-import skytools
-
 __all__ = ['MemberInfo', 'NodeInfo', 'SetInfo',
         'ROOT', 'BRANCH', 'LEAF', 'COMBINED_ROOT',
         'COMBINED_BRANCH', 'MERGE_LEAF']
 
+# node types
 ROOT = 'root'
 BRANCH = 'branch'
 LEAF = 'leaf'
@@ -13,6 +12,7 @@ COMBINED_ROOT = 'combined-root'
 COMBINED_BRANCH = 'combined-branch'
 MERGE_LEAF = 'merge-leaf'
 
+# which nodes need to do what actions
 action_map = {
 'process-batch':   {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':1},
 'process-events':  {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0},
@@ -46,7 +46,7 @@ class NodeInfo:
         self.provider_location = row['provider_location']
         self.paused = row['paused']
         self.resync = row['resync']
-        self.up_to_date = row['up_to_date']
+        self.uptodate = row['uptodate']
         self.combined_set = row['combined_set']
         self.combined_type = row['combined_type']
         self.combined_queue = row['combined_queue']
@@ -56,15 +56,18 @@ class NodeInfo:
         self._info_lines = []
 
     def need_action(self, action_name):
+        """Returns True if worker for this node needs
+        to do specified action.
+        """
         if not self.main_worker:
             return action_name in ('process-batch', 'process-events')
 
         typ = self.type
-        if type == 'merge-leaf':
-            if self.target_type == 'combined-branch':
-                typ += "merge-leaf-to-branch"
-            elif self.target_type == 'combined-root':
-                typ += "merge-leaf-to-root"
+        if type == MERGE_LEAF:
+            if self.target_type == COMBINED_BRANCH:
+                typ = "merge-leaf-to-branch"
+            elif self.target_type == COMBINED_ROOT:
+                typ = "merge-leaf-to-root"
             else:
                 raise Exception('bad target type')
 
@@ -94,7 +97,12 @@ class NodeInfo:
             lag = root_time - tick_time
         else:
             lag = self.queue_info['ticker_lag']
-        lst.append("lag: %s" % lag)
+        txt = "lag: %s" % lag
+        if self.paused:
+            txt += ", PAUSED"
+        if not self.uptodate:
+            txt += ", NOT UPTODATE"
+        lst.append(txt)
         return lst
     
     def add_info_line(self, ln):
@@ -119,13 +127,11 @@ class NodeInfo:
 
 class SetInfo:
     def __init__(self, set_name, info_row, member_rows):
-        self.root_info = info_row
+        self.local_node = NodeInfo(set_name, info_row)
         self.set_name = set_name
         self.member_map = {}
         self.node_map = {}
-        self.root_name = info_row['node_name']
-        self.root_type = info_row['node_type']
-        self.global_watermark = info_row['global_watermark']
+        self.add_node(self.local_node)
 
         for r in member_rows:
             n = MemberInfo(r)
@@ -137,22 +143,23 @@ class SetInfo:
     def get_node(self, name):
         return self.node_map.get(name)
 
-    def get_root_node(self):
-        return self.get_node(self.root_name)
-
     def add_node(self, node):
         self.node_map[node.name] = node
 
     _DATAFMT = "%-30s%s"
     def print_tree(self):
-        self._prepare_tree()
-        root = self.get_root_node()
+        """Print ascii-tree for set.
+        Expects that data for all nodes is filled in."""
+
+        root = self._prepare_tree()
         self._tree_calc(root)
         datalines = self._print_node(root, '', [])
         for ln in datalines:
             print self._DATAFMT % (' ', ln)
 
     def _print_node(self, node, pfx, datalines):
+        # print a tree fragment for node and info
+        # returns list of unprinted data rows
         for ln in datalines:
             print self._DATAFMT % (_setpfx(pfx, '|'), ln)
         datalines = node.get_infolines()
@@ -160,16 +167,20 @@ class SetInfo:
 
         for i, n in enumerate(node.child_list):
             sfx = ((i < len(node.child_list) - 1) and '  |' or '   ')
-            tmppfx = pfx + sfx
-            datalines = self._print_node(n, tmppfx, datalines)
+            datalines = self._print_node(n, pfx + sfx, datalines)
 
         return datalines
 
     def _prepare_tree(self):
+        # reset vars, fill parent and child_list for each node
+        # returns root
+        root = None
         for node in self.node_map.itervalues():
             node.total_childs = 0
             node.levels = 0
             node.child_list = []
+            if node.type in (ROOT, COMBINED_ROOT):
+                root = node
         for node in self.node_map.itervalues():
             if node.provider_node:
                 p = self.node_map[node.provider_node]
@@ -178,7 +189,13 @@ class SetInfo:
             else:
                 node.parent = None
 
+        if root is None:
+            raise Exception("root nod enot found")
+        return root
+
     def _tree_calc(self, node):
+        # calculate levels and count total childs
+        # sort the tree based on them
         total = len(node.child_list)
         levels = 1
         for subnode in node.child_list:
@@ -199,7 +216,7 @@ def _setpfx(pfx, sfx):
 def _cmp_node(n1, n2):
     # returns neg if n1 smaller
     cmp = n1.levels - n2.levels
-    if cmp:
-        return cmp
-    return n1.total_childs - n2.total_childs
+    if cmp == 0:
+        cmp = n1.total_childs - n2.total_childs
+    return cmp
 
index 15aa13e7e499aa84e6aae31bfe09023c773bdcbc..838b990f16c5a82fef73aa84f554b8c17c523162 100644 (file)
@@ -3,9 +3,10 @@
 """Admin scripting.
 """
 
-import sys, os, skytools
+import sys, os
 
 from skytools.scripting import DBScript
+from skytools.quoting import quote_statement
 
 __all__ = ['AdminScript']
 
@@ -28,20 +29,25 @@ class AdminScript(DBScript):
             self.log.error('bad subcommand, see --help for usage')
             sys.exit(1)
 
-    def fetch_list(self, curs, sql, args, keycol = None):
+    def fetch_list(self, db, sql, args, keycol = None):
+        curs = db.cursor()
         curs.execute(sql, args)
-        rows = curs.dictfetchall()
+        rows = curs.fetchall()
+        db.commit()
         if not keycol:
             res = rows
         else:
             res = [r[keycol] for r in rows]
         return res
 
-    def display_table(self, desc, curs, sql, args = [], fields = []):
+    def display_table(self, db, desc, sql, args = [], fields = []):
         """Display multirow query as a table."""
 
+        self.log.debug("display_table: %s" % quote_statement(sql, args))
+        curs = db.cursor()
         curs.execute(sql, args)
         rows = curs.fetchall()
+        db.commit()
         if len(rows) == 0:
             return 0
 
@@ -67,70 +73,80 @@ class AdminScript(DBScript):
         print '\n'
         return 1
 
-    def db_display_table(self, db, desc, sql, args = [], fields = []):
-        curs = db.cursor()
-        res = self.display_table(desc, curs, sql, args, fields)
-        db.commit()
-        return res
-        
 
-    def exec_checked(self, curs, sql, args):
+    def _exec_cmd(self, curs, sql, args):
+        self.log.debug("exec_cmd: %s" % quote_statement(sql, args))
         curs.execute(sql, args)
         ok = True
-        for row in curs.fetchall():
-            level = row['ret_code'] / 100
+        rows = curs.fetchall()
+        for row in rows:
+            try:
+                code = row['ret_code']
+                msg = row['ret_note']
+            except KeyError:
+                self.log.error("Query does not conform to exec_cmd API:")
+                self.log.error("SQL: %s" % quote_statement(sql, args))
+                self.log.error("Row: %s" % repr(row.copy()))
+                sys.exit(1)
+            level = code / 100
             if level == 1:
-                self.log.debug("%d %s" % (row[0], row[1]))
+                self.log.debug("%d %s" % (code, msg))
             elif level == 2:
-                self.log.info("%d %s" % (row[0], row[1]))
+                self.log.info("%d %s" % (code, msg))
             elif level == 3:
-                self.log.warning("%d %s" % (row[0], row[1]))
+                self.log.warning("%d %s" % (code, msg))
             else:
-                self.log.error("%d %s" % (row[0], row[1]))
+                self.log.error("%d %s" % (code, msg))
+                self.log.error("Query was: %s" % skytools.quote_statement(sql, args))
                 ok = False
-        return ok
+        return (ok, rows)
 
-    def exec_many(self, curs, sql, baseargs, extra_list):
+    def _exec_cmd_many(self, curs, sql, baseargs, extra_list):
         ok = True
+        rows = []
         for a in extra_list:
-            tmp = self.exec_checked(curs, sql, baseargs + [a])
-            ok = tmp and ok
-        return ok
+            (tmp_ok, tmp_rows) = self._exec_cmd(curs, sql, baseargs + [a])
+            ok = tmp_ok and ok
+            rows += tmp_rows
+        return (ok, rows)
 
-    def db_cmd(self, db, q, args, commit = True):
-        ok = self.exec_checked(db.cursor(), q, args)
+    def exec_cmd(self, db, q, args, commit = True):
+        (ok, rows) = self._exec_cmd(db.cursor(), q, args)
         if ok:
             if commit:
                 self.log.info("COMMIT")
                 db.commit()
+            return rows
         else:
             self.log.info("ROLLBACK")
             db.rollback()
             raise EXception("rollback")
 
-    def db_cmd_many(self, db, sql, baseargs, extra_list, commit = True):
+    def exec_cmd_many(self, db, sql, baseargs, extra_list, commit = True):
         curs = db.cursor()
-        ok = self.exec_many(curs, sql, baseargs, extra_list)
+        (ok, rows) = self._exec_cmd_many(curs, sql, baseargs, extra_list)
         if ok:
             if commit:
                 self.log.info("COMMIT")
                 db.commit()
+            return rows
         else:
             self.log.info("ROLLBACK")
             db.rollback()
+            raise EXception("rollback")
 
 
-    def exec_sql(self, db, q, args):
-        self.log.debug(q)
+    def exec_stmt(self, db, sql, args):
+        self.log.debug("exec_stmt: %s" % quote_statement(sql, args))
         curs = db.cursor()
-        curs.execute(q, args)
+        curs.execute(sql, args)
         db.commit()
 
-    def exec_query(self, db, q, args):
-        self.log.debug(q)
+    def exec_query(self, db, sql, args):
+        self.log.debug("exec_query: %s" % quote_statement(sql, args))
         curs = db.cursor()
-        curs.execute(q, args)
-        res = curs.dictfetchall()
+        curs.execute(sql, args)
+        res = curs.fetchall()
         db.commit()
         return res
 
index 5c2a0cfbf711516413373ee745c63de46791cec0..ed220cc827e68c23414bfb37fec3946aadafeac2 100644 (file)
@@ -17,37 +17,37 @@ create table ref_3 (
 );
 NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "ref_3_pkey" for table "ref_3"
 select * from londiste.set_add_table('branch_set', 'public.ref_1');
- ret_code | ret_desc 
+ ret_code | ret_note 
 ----------+----------
       200 | OK
 (1 row)
 
 select * from londiste.set_add_table('branch_set', 'public.ref_2');
- ret_code | ret_desc 
+ ret_code | ret_note 
 ----------+----------
       200 | OK
 (1 row)
 
 select * from londiste.set_add_table('branch_set', 'public.ref_3');
- ret_code | ret_desc 
+ ret_code | ret_note 
 ----------+----------
       200 | OK
 (1 row)
 
 select * from londiste.node_add_table('branch_set', 'public.ref_1');
- ret_code |         ret_desc          
+ ret_code |         ret_note          
 ----------+---------------------------
       200 | Table added: public.ref_1
 (1 row)
 
 select * from londiste.node_add_table('branch_set', 'public.ref_2');
- ret_code |         ret_desc          
+ ret_code |         ret_note          
 ----------+---------------------------
       200 | Table added: public.ref_2
 (1 row)
 
 select * from londiste.node_add_table('branch_set', 'public.ref_3');
- ret_code |         ret_desc          
+ ret_code |         ret_note          
 ----------+---------------------------
       200 | Table added: public.ref_3
 (1 row)
index 52f12e88a0a0cd3644597cd2c345406418022246..bfa0d4a4716ef81314f9d229aae8016e5d03fe1f 100644 (file)
@@ -24,19 +24,19 @@ select * from pgq_set.add_member('aset', 'rnode', 'dbname=db', false);
 (1 row)
 
 select * from pgq_set.create_node('aset', 'root', 'rnode', 'londiste_root', null::text, null::int8, null::text);
- ret_code | ret_desc 
+ ret_code | ret_note 
 ----------+----------
       200 | Ok
 (1 row)
 
 select * from londiste.node_add_table('aset', 'public.testdata_nopk');
- ret_code |                      ret_desc                      
+ ret_code |                      ret_note                      
 ----------+----------------------------------------------------
       400 | Primary key missing on table: public.testdata_nopk
 (1 row)
 
 select * from londiste.node_add_table('aset', 'public.testdata');
- ret_code |           ret_desc           
+ ret_code |           ret_note           
 ----------+------------------------------
       200 | Table added: public.testdata
 (1 row)
@@ -49,13 +49,13 @@ select * from londiste.node_get_table_list('aset');
 (1 row)
 
 select * from londiste.node_remove_table('aset', 'public.testdata');
- ret_code |            ret_desc            
+ ret_code |            ret_note            
 ----------+--------------------------------
       200 | Table removed: public.testdata
 (1 row)
 
 select * from londiste.node_remove_table('aset', 'public.testdata');
- ret_code |          ret_desc          
+ ret_code |          ret_note          
 ----------+----------------------------
       400 | Not found: public.testdata
 (1 row)
index b09399ae1b61ece5e4627fcbf13466530cc26188..feb73397c33a7516704ce086e5b560f140ce5368 100644 (file)
@@ -26,25 +26,25 @@ select * from pgq_set.add_member('branch_set', 'pnode', 'dbname=db2', false);
 (1 row)
 
 select * from pgq_set.create_node('branch_set', 'branch', 'snode', 'londiste_branch', 'pnode', 100, null::text);
- ret_code | ret_desc 
+ ret_code | ret_note 
 ----------+----------
       200 | Ok
 (1 row)
 
 select * from londiste.node_add_table('branch_set', 'public.slavedata');
- ret_code |                   ret_desc                    
+ ret_code |                   ret_note                    
 ----------+-----------------------------------------------
       400 | Table not registered in set: public.slavedata
 (1 row)
 
 select * from londiste.set_add_table('branch_set', 'public.slavedata');
- ret_code | ret_desc 
+ ret_code | ret_note 
 ----------+----------
       200 | OK
 (1 row)
 
 select * from londiste.node_add_table('branch_set', 'public.slavedata');
- ret_code |           ret_desc            
+ ret_code |           ret_note            
 ----------+-------------------------------
       200 | Table added: public.slavedata
 (1 row)
@@ -56,13 +56,13 @@ select * from londiste.node_get_table_list('branch_set');
 (1 row)
 
 select * from londiste.node_remove_table('branch_set', 'public.slavedata');
- ret_code |            ret_desc             
+ ret_code |            ret_note             
 ----------+---------------------------------
       200 | Table removed: public.slavedata
 (1 row)
 
 select * from londiste.node_remove_table('branch_set', 'public.slavedata');
- ret_code |          ret_desc           
+ ret_code |          ret_note           
 ----------+-----------------------------
       400 | Not found: public.slavedata
 (1 row)
index a2cebe050313b03dd29c41db9b80b54a3497dc78..061251674c0d09e3d7a079ca93cacc14b31834c6 100644 (file)
@@ -2,7 +2,7 @@ create or replace function londiste.node_add_table(
     in i_set_name       text,
     in i_table_name     text,
     out ret_code        int4,
-    out ret_desc        text)
+    out ret_note        text)
 as $$
 -- ----------------------------------------------------------------------
 -- Function: londiste.node_add_table(2)
@@ -21,24 +21,24 @@ begin
     fq_table_name := londiste.make_fqname(i_table_name);
     col_types := londiste.find_column_types(fq_table_name);
     if position('k' in col_types) < 1 then
-        select 400, 'Primary key missing on table: ' || fq_table_name into ret_code, ret_desc;
+        select 400, 'Primary key missing on table: ' || fq_table_name into ret_code, ret_note;
         return;
     end if;
 
     perform 1 from pgq_set.set_info where set_name = i_set_name;
     if not found then
-        select 400, 'No such set: ' || i_set_name into ret_code, ret_desc;
+        select 400, 'No such set: ' || i_set_name into ret_code, ret_note;
         return;
     end if;
 
     perform 1 from londiste.node_table where set_name = i_set_name and table_name = fq_table_name;
     if found then
-        select 200, 'Table already added: ' || fq_table_name into ret_code, ret_desc;
+        select 200, 'Table already added: ' || fq_table_name into ret_code, ret_note;
         return;
     end if;
 
     if pgq_set.is_root(i_set_name) then
-        select * into ret_code, ret_desc
+        select * into ret_code, ret_note
             from londiste.set_add_table(i_set_name, fq_table_name);
         if ret_code <> 200 then
             return;
@@ -48,7 +48,7 @@ begin
     else
         perform 1 from londiste.set_table where set_name = i_set_name and table_name = fq_table_name;
         if not found then
-            select 400, 'Table not registered in set: ' || fq_table_name into ret_code, ret_desc;
+            select 400, 'Table not registered in set: ' || fq_table_name into ret_code, ret_note;
             return;
         end if;
         new_state := NULL;
@@ -57,8 +57,8 @@ begin
     insert into londiste.node_table (set_name, table_name, merge_state)
         values (i_set_name, fq_table_name, new_state);
 
-    for ret_code, ret_desc in
-        select f.ret_code, f.ret_desc
+    for ret_code, ret_note in
+        select f.ret_code, f.ret_note
         from londiste.node_prepare_triggers(i_set_name, fq_table_name) f
     loop
         if ret_code > 299 then
@@ -66,8 +66,8 @@ begin
         end if;
     end loop;
 
-    for ret_code, ret_desc in
-        select f.ret_code, f.ret_desc
+    for ret_code, ret_note in
+        select f.ret_code, f.ret_note
         from londiste.node_refresh_triggers(i_set_name, fq_table_name) f
     loop
         if ret_code > 299 then
@@ -75,7 +75,7 @@ begin
         end if;
     end loop;
 
-    select 200, 'Table added: ' || fq_table_name into ret_code, ret_desc;
+    select 200, 'Table added: ' || fq_table_name into ret_code, ret_note;
     return;
 end;
 $$ language plpgsql strict;
index df5a0759c13813683867d89ec425cea3a852f0e5..f86dc8699b4eabbe1d532866d387cd25bc8f33de 100644 (file)
@@ -3,7 +3,7 @@ create or replace function londiste.node_disable_triggers(
     in i_set_name   text,
     in i_table_name text,
     out ret_code    int4,
-    out ret_desc    text)
+    out ret_note    text)
 returns setof record strict as $$
 -- ----------------------------------------------------------------------
 -- Function: londiste.node_disable_triggers(2)
@@ -38,7 +38,7 @@ begin
                 || ' on ' || londiste.quote_fqname(fq_table_name);
             select 200, 'Dropped trigger ' || tg.tg_name
                 || ' from table ' || fq_table_name
-                into ret_code, ret_desc;
+                into ret_code, ret_note;
                 return next;
         end if;
     end loop;
@@ -49,7 +49,7 @@ $$ language plpgsql security definer;
 create or replace function londiste.node_disable_triggers(
     in i_set_name   text,
     out ret_code    int4,
-    out ret_desc    text)
+    out ret_note    text)
 returns setof record strict as $$
 -- ----------------------------------------------------------------------
 -- Function: londiste.node_disable_triggers(1)
@@ -64,8 +64,8 @@ begin
          where set_name = i_set_name
          order by nr
     loop
-        for ret_code, ret_desc in
-            select f.ret_code, f.ret_desc
+        for ret_code, ret_note in
+            select f.ret_code, f.ret_note
                 from londiste.node_disable_triggers(i_set_name, t.table_name) f
         loop
             return next;
index ade870d46436038cdadaa201582bd31b5421641f..6a82f1dd442836221f5b51327ba3fc9288efe596 100644 (file)
@@ -3,7 +3,7 @@ create or replace function londiste.node_prepare_triggers(
     in i_set_name   text,
     in i_table_name text,
     out ret_code    int4,
-    out ret_desc    text)
+    out ret_note    text)
 returns setof record strict as $$
 -- ----------------------------------------------------------------------
 -- Function: londiste.node_prepare_triggers(2)
@@ -22,7 +22,7 @@ begin
     fq_table_name := londiste.make_fqname(i_table_name);
     select queue_name into qname from pgq_set.set_info where set_name = i_set_name;
     if not found then
-        select 400, 'Set not found: ' || i_set_name into ret_code, ret_desc;
+        select 400, 'Set not found: ' || i_set_name into ret_code, ret_note;
         return next;
         return;
     end if;
@@ -33,7 +33,7 @@ begin
             || ' for each row execute procedure pgq.sqltriga(' || quote_literal(qname) || ')';
         insert into londiste.node_trigger (set_name, table_name, tg_name, tg_type, tg_def)
         values (i_set_name, fq_table_name, logtrg_name, 'root', logtrg);
-        select 200, logtrg into ret_code, ret_desc;
+        select 200, logtrg into ret_code, ret_note;
         return next;
     end if;
 
@@ -43,7 +43,7 @@ begin
         || ' for each row execute procedure pgq.denytriga(' || quote_literal(i_set_name) || ')';
     insert into londiste.node_trigger (set_name, table_name, tg_name, tg_type, tg_def)
     values (i_set_name, fq_table_name, denytrg_name, 'non-root', denytrg);
-    select 200, denytrg into ret_code, ret_desc;
+    select 200, denytrg into ret_code, ret_note;
     return next;
 
     return;
index 76da2d229ce979a915ebe979bdb2566ec1633fb4..68d3f985a3fb085eddd26314c999ed2a22552164 100644 (file)
@@ -3,7 +3,7 @@ create or replace function londiste.node_refresh_triggers(
     in i_set_name   text,
     in i_table_name text,
     out ret_code    int4,
-    out ret_desc    text)
+    out ret_note    text)
 returns setof record strict as $$
 -- ----------------------------------------------------------------------
 -- Function: londiste.node_refresh_triggers(2)
@@ -35,7 +35,7 @@ begin
             select 400, 'trigger ' || tg.tg_name
                 || ' on table ' || fq_table_name
                 || ' had unsupported type: ' || tg.tg_type
-                into ret_code, ret_desc;
+                into ret_code, ret_note;
             return next;
         else
             -- check if active
@@ -51,7 +51,7 @@ begin
                     execute tg.tg_def;
                     select 200, 'Created trigger ' || tg.tg_name
                         || ' on table ' || fq_table_name
-                        into ret_code, ret_desc;
+                        into ret_code, ret_note;
                     return next;
                 end if;
             else
@@ -61,7 +61,7 @@ begin
                         || ' on ' || londiste.quote_fqname(fq_table_name);
                     select 200, 'Dropped trigger ' || tg.tg_name
                         || ' from table ' || fq_table_name
-                        into ret_code, ret_desc;
+                        into ret_code, ret_note;
                     return next;
                 end if;
             end if;
@@ -74,7 +74,7 @@ $$ language plpgsql security definer;
 create or replace function londiste.node_refresh_triggers(
     in i_set_name   text,
     out ret_code    int4,
-    out ret_desc    text)
+    out ret_note    text)
 returns setof record strict as $$
 -- ----------------------------------------------------------------------
 -- Function: londiste.node_refresh_triggers(2)
@@ -89,8 +89,8 @@ begin
          where set_name = i_set_name
          order by nr
     loop
-        for ret_code, ret_desc in
-            select f.ret_code, f.ret_desc
+        for ret_code, ret_note in
+            select f.ret_code, f.ret_note
                 from londiste.node_refresh_triggers(i_set_name, t.table_name) f
         loop
             return next;
index 3906ca5d7fadc4032618842e2b62044aa65bb448..ae14254736fab020b454db3c7c0c9b3e134dae95 100644 (file)
@@ -1,19 +1,19 @@
 
 create or replace function londiste.provider_remove_seq(
     in i_set_name text, in i_seq_name text,
-    out ret_code int4, out ret_desc text)
+    out ret_code int4, out ret_note text)
 as $$
 begin
     delete from londiste.node_seq
         where set_name = i_set_name
           and seq_name = i_seq_name;
     if not found then
-        select 400, 'Not found: '||i_seq_name into ret_code, ret_desc;
+        select 400, 'Not found: '||i_seq_name into ret_code, ret_note;
         return;
     end if;
 
     -- perform londiste.provider_notify_change(i_queue_name);
-    select 200, 'OK' into ret_code, ret_desc;
+    select 200, 'OK' into ret_code, ret_note;
     return;
 end;
 $$ language plpgsql strict;
index 93b7d21b05378089b2a640f3e229ab5cf286e1a8..9e4ff637ebc43879662d07f794c0188b2113fdcf 100644 (file)
@@ -1,15 +1,15 @@
 
 create or replace function londiste.node_remove_table(
     in i_set_name text, in i_table_name text,
-    out ret_code int4, out ret_desc text)
+    out ret_code int4, out ret_note text)
 as $$
 declare
     fq_table_name text;
 begin
     fq_table_name := londiste.make_fqname(i_table_name);
 
-    for ret_code, ret_desc in
-        select f.ret_code, f.ret_desc from londiste.node_disable_triggers(i_set_name, fq_table_name) f
+    for ret_code, ret_note in
+        select f.ret_code, f.ret_note from londiste.node_disable_triggers(i_set_name, fq_table_name) f
     loop
         if ret_code > 299 then
             return;
@@ -22,7 +22,7 @@ begin
         where set_name = i_set_name
           and table_name = fq_table_name;
     if not found then
-        select 400, 'Not found: ' || fq_table_name into ret_code, ret_desc;
+        select 400, 'Not found: ' || fq_table_name into ret_code, ret_note;
         return;
     end if;
 
@@ -31,7 +31,7 @@ begin
         perform londiste.root_notify_change(i_set_name, 'remove-table', fq_table_name);
     end if;
 
-    select 200, 'Table removed: ' || fq_table_name into ret_code, ret_desc;
+    select 200, 'Table removed: ' || fq_table_name into ret_code, ret_note;
     return;
 end;
 $$ language plpgsql strict;
index 7e11a84cad3781eaeb124e5b03e1790096444dc9..8a6461c83b9e456c55d7564d568c7c5fb16c03a8 100644 (file)
@@ -3,7 +3,7 @@ create or replace function londiste.set_add_table(
     in i_set_name       text,
     in i_table_name     text,
     out ret_code        int4,
-    out ret_desc        text)
+    out ret_note        text)
 as $$
 -- ----------------------------------------------------------------------
 -- Function: londiste.node_add_table(x)
@@ -29,19 +29,19 @@ begin
 
     perform 1 from pgq_set.set_info where set_name = i_set_name;
     if not found then
-        select 400, 'No such set: ' || i_set_name into ret_code, ret_desc;
+        select 400, 'No such set: ' || i_set_name into ret_code, ret_note;
         return;
     end if;
 
     perform 1 from londiste.set_table where set_name = i_set_name and table_name = fq_table_name;
     if found then
-        select 200, 'OK, already added: ' || fq_table_name into ret_code, ret_desc;
+        select 200, 'OK, already added: ' || fq_table_name into ret_code, ret_note;
         return;
     end if;
 
     insert into londiste.set_table (set_name, table_name)
         values (i_set_name, fq_table_name);
-    select 200, 'OK' into ret_code, ret_desc;
+    select 200, 'OK' into ret_code, ret_note;
     return;
 end;
 $$ language plpgsql strict;
index 080785a4f99aa10d40df8d6e6f92998895347742..683ea837ee799c33d843c5f05417fe98e3f26e0f 100644 (file)
@@ -1,7 +1,7 @@
 
 create or replace function londiste.set_remove_table(
     in i_set_name text, in i_table_name text,
-    out ret_code int4, out ret_desc text)
+    out ret_code int4, out ret_note text)
 as $$
 -- ----------------------------------------------------------------------
 -- Function: londiste.set_remove_table(2)
@@ -29,10 +29,10 @@ begin
         where set_name = i_set_name
           and table_name = fq_table_name;
     if not found then
-        select 400, 'Not found: '||fq_table_name into ret_code, ret_desc;
+        select 400, 'Not found: '||fq_table_name into ret_code, ret_note;
         return;
     end if;
-    select 200, 'OK' into ret_code, ret_desc;
+    select 200, 'OK' into ret_code, ret_note;
     return;
 end;
 $$ language plpgsql strict;
index e99129f0418238572bd6c1fb168bd8829c71ee5f..5f57f970e66d24e127e231b532b713ab8337345a 100644 (file)
@@ -2,25 +2,25 @@
 select * from pgq_set.add_member('aset', 'node1', 'dbname=node1', false);
  ret_code | ret_note 
 ----------+----------
-      200 | Ok
+      100 | Ok
 (1 row)
 
 select * from pgq_set.add_member('aset', 'node2', 'dbname=node2', false);
  ret_code | ret_note 
 ----------+----------
-      200 | Ok
+      100 | Ok
 (1 row)
 
 select * from pgq_set.add_member('aset', 'node3', 'dbname=node3', false);
  ret_code | ret_note 
 ----------+----------
-      200 | Ok
+      100 | Ok
 (1 row)
 
 select * from pgq_set.add_member('aset', 'node4', 'dbname=node4', false);
  ret_code | ret_note 
 ----------+----------
-      200 | Ok
+      100 | Ok
 (1 row)
 
 select * from pgq_set.get_member_info('aset');
@@ -47,15 +47,33 @@ select * from pgq_set.get_member_info('aset');
 (3 rows)
 
 select * from pgq_set.create_node('aset', 'root', 'node1', null, null, null);
- ret_code | ret_desc 
+ ret_code |                     ret_note                      
+----------+---------------------------------------------------
+      200 | Node "node1" added to set "aset" with type "root"
+(1 row)
+
+select * from pgq_set.subscribe_node('aset', 'node2');
+ ret_code | ret_note | global_watermark 
+----------+----------+------------------
+      200 | Ok       |                1
+(1 row)
+
+select * from pgq_set.subscribe_node('aset', 'node3', 1);
+ ret_code | ret_note | global_watermark 
+----------+----------+------------------
+      200 | Ok       |                1
+(1 row)
+
+select * from pgq_set.unsubscribe_node('aset', 'node3');
+ ret_code | ret_note 
 ----------+----------
       200 | Ok
 (1 row)
 
 select * from pgq_set.get_node_info('aset');
node_type | node_name | queue_name | global_watermark | local_watermark | completed_tick | provider_node | provider_location | paused | resync | up_to_date | combined_set | combined_type | combined_queue 
------------+-----------+------------+------------------+-----------------+----------------+---------------+-------------------+--------+--------+------------+--------------+---------------+----------------
root      | node1     | aset       |                1 |               1 |                |               |                   | f      | f      | f          |              |               | 
ret_code | ret_note | node_type | node_name | queue_name | global_watermark | local_watermark | completed_tick | provider_node | provider_location | paused | resync | uptodate | combined_set | combined_type | combined_queue 
+----------+----------+-----------+-----------+------------+------------------+-----------------+----------------+---------------+-------------------+--------+--------+----------+--------------+---------------+----------------
     100 | Ok       | root      | node1     | aset       |                1 |               1 |                |               |                   | f      | f      | f        |              |               | 
 (1 row)
 
 select * from pgq_set.is_root('q');
@@ -68,3 +86,15 @@ select * from pgq_set.is_root('aset');
 
 select * from pgq_set.is_root(null);
 ERROR:  set does not exist: <NULL>
+select * from pgq_set.rename_node_step1('aset', 'node2', 'node2x');
+ ret_code | ret_note 
+----------+----------
+      200 | Ok
+(1 row)
+
+select * from pgq_set.rename_node_step2('aset', 'node2', 'node2x');
+ ret_code | ret_note 
+----------+----------
+      200 | Ok
+(1 row)
+
index 064b65e81dda89fc7b11a0496a519bd80e359455..53cb2733a070ecf958efc990691ae7a8eeb5f779 100644 (file)
@@ -28,7 +28,13 @@ returns record as $$
 -- ----------------------------------------------------------------------
 declare
     o  record;
+    node record;
 begin
+    select node_type in ('root', 'combined-root') as is_root
+      into node
+      from pgq_set.set_info where set_name = i_set_name
+       for update;
+
     select node_location into o
       from pgq_set.member_info
      where set_name = i_set_name
@@ -43,7 +49,15 @@ begin
         insert into pgq_set.member_info (set_name, node_name, node_location, dead)
         values (i_set_name, i_node_name, i_node_location, i_dead);
     end if;
-    select 200, 'Ok' into ret_code, ret_note;
+
+    if node.is_root then
+        perform pgq.insert_event(s.queue_name, 'member-info',
+                                 i_node_name, i_set_name, i_node_location, i_dead::text, null)
+           from pgq_set.set_info s
+         where s.set_name = i_set_name;
+    end if;
+
+    select 100, 'Ok' into ret_code, ret_note;
     return;
 end;
 $$ language plpgsql security definer;
diff --git a/sql/pgq_set/functions/pgq_set.change_provider.sql b/sql/pgq_set/functions/pgq_set.change_provider.sql
new file mode 100644 (file)
index 0000000..29ed310
--- /dev/null
@@ -0,0 +1,30 @@
+
+create or replace function pgq_set.change_provider(
+    in i_set_name text,
+    in i_new_provider text,
+    out ret_code int4,
+    out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.change_provider(2)
+--
+--      Change provider for this node.
+--
+-- Parameters:
+--      i_set_name  - set name
+--      i_new_provider - node name for new provider
+-- ----------------------------------------------------------------------
+begin
+    update pgq_set.set_info
+       set provider_node = i_new_provider,
+           uptodate = false
+     where set_name = i_set_name;
+    if not found then
+        select 404, 'Unknown set: ' || i_set_name into ret_code, ret_note;
+        return;
+    end if;
+    select 200, 'Node provider set to : ' || i_new_provider into ret_code, ret_note;
+    return;
+end;
+$$ language plpgsql security definer;
+
index 84ada82c80a401923f356922db850079a6249597..65a2dd253bee0e88ed8d5a8b3f8b71219c10288e 100644 (file)
@@ -7,7 +7,7 @@ create or replace function pgq_set.create_node(
     in i_global_watermark bigint,
     in i_combined_set text,
     out ret_code int4,
-    out ret_desc text)
+    out ret_note  text)
 returns record as $$
 -- ----------------------------------------------------------------------
 -- Function: pgq_set.create_node(7)
@@ -23,7 +23,7 @@ returns record as $$
 --      i_combined_set - merge-leaf: target set
 --
 -- Returns:
---      desc
+--      401 - node already initialized
 --
 -- Node Types:
 --      root - master node
@@ -38,6 +38,12 @@ declare
     _wm_consumer text;
     _global_wm bigint;
 begin
+    perform 1 from pgq_set.set_info where set_name = i_set_name;
+    if found then
+        select 401, 'Node already initialized' into ret_code, ret_note;
+        return;
+    end if;
+
     if i_node_type in ('root', 'combined-root') then
         if coalesce(i_provider_name, i_global_watermark::text,
                     i_combined_set) is not null then
@@ -84,7 +90,9 @@ begin
             values (i_set_name, i_node_name, _global_wm);
     end if;
 
-    select 200, 'Ok' into ret_code, ret_desc;
+    select 200, 'Node "' || i_node_name || '" added to set "'
+           || i_set_name || '" with type "' || i_node_type || '"'
+        into ret_code, ret_note;
     return;
 end;
 $$ language plpgsql security definer;
diff --git a/sql/pgq_set/functions/pgq_set.drop_member.sql b/sql/pgq_set/functions/pgq_set.drop_member.sql
new file mode 100644 (file)
index 0000000..35584c7
--- /dev/null
@@ -0,0 +1,59 @@
+
+create or replace function pgq_set.drop_member(
+    in i_set_name text,
+    in i_node_name text,
+    out ret_code int4,
+    out ret_note text)
+returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.drop_member(1)
+--
+--      Drop unreferenced member.
+--
+-- Parameters:
+--      i_set_name - set name
+--      i_node_name - node to drop
+--
+-- Returns:
+--      ret_code - error code
+--      ret_note - error description
+--
+-- Return Codes:
+--      200 - Ok
+--      404 - No such set
+-- ----------------------------------------------------------------------
+declare
+    _queue_name  text;
+    _wm_consumer text;
+    _global_wm   bigint;
+    sub          record;
+    node         record;
+begin
+    select * into node from pgq_set.set_info
+      where set_name = i_set_name;
+    if not found then
+        select 404, 'No such set: ' || i_set_name into ret_code, ret_note;
+        return;
+    end if;
+    if node.node_name = i_node_name then
+        select 403, 'Cannot use drop_member on node itself' into ret_code, ret_note;
+        return;
+    end if;
+    if node.provider_node = i_node_name then
+        select 403, 'Cannot use drop_member on node child' into ret_code, ret_note;
+        return;
+    end if;
+
+    perform 1 from pgq_set.subscriber_info
+      where set_name = i_set_name
+        and node_name = i_node_name;
+    if found then
+        select f.ret_code, f.ret_note into ret_code, ret_note
+          from pgq_set.unsubscribe_node(i_set_name, i_node_name) f;
+    end if;
+    perform * from pgq_set.remove_member(i_set_name, i_node_name);
+    select 200, 'Ok' into ret_code, ret_note;
+    return;
+end;
+$$ language plpgsql security definer;
+
index a738eb14f56426deb2c6d6a8abe93d1c91eab79a..701e8ff8866fb84781707f6eca944cbd0821cb9c 100644 (file)
@@ -2,6 +2,8 @@
 create or replace function pgq_set.get_node_info(
     in i_set_name text,
 
+    out ret_code int4,
+    out ret_note text,
     out node_type text,
     out node_name text,
     out queue_name text,
@@ -13,7 +15,7 @@ create or replace function pgq_set.get_node_info(
     out provider_location text,
     out paused boolean,
     out resync boolean,
-    out up_to_date boolean,
+    out uptodate boolean,
 
     out combined_set text,
     out combined_type text,
@@ -38,7 +40,7 @@ create or replace function pgq_set.get_node_info(
 --      provider_location - connect string to provider node
 --      paused - this node should not do any work
 --      resync - re-register on provider queue (???)
---      up_to_date - if consumer has loaded last changes
+--      uptodate - if consumer has loaded last changes
 --      combined_set - target set name for merge-leaf
 --      combined_type - node type of target set
 --      combined_queue - queue name for target set
@@ -46,19 +48,23 @@ create or replace function pgq_set.get_node_info(
 declare
     sql text;
 begin
-    select n.node_type, n.node_name, t.tick_id, n.queue_name,
+    select 100, 'Ok', n.node_type, n.node_name, t.tick_id, n.queue_name,
            c.set_name, c.node_type, c.queue_name, n.global_watermark,
-           n.provider_node, n.paused, n.resync, n.up_to_date,
+           n.provider_node, n.paused, n.resync, n.uptodate,
            p.node_location
-      into node_type, node_name, completed_tick, queue_name,
+      into ret_code, ret_note, node_type, node_name, completed_tick, queue_name,
            combined_set, combined_type, combined_queue, global_watermark,
-           provider_node, paused, resync, up_to_date,
+           provider_node, paused, resync, uptodate,
            provider_location
       from pgq_set.set_info n
            left join pgq_set.completed_tick t on (t.set_name = n.set_name and t.worker_name = n.node_name)
            left join pgq_set.set_info c on (c.set_name = n.combined_set)
            left join pgq_set.member_info p on (p.set_name = n.set_name and p.node_name = n.provider_node)
       where n.set_name = i_set_name;
+    if not found then
+        select 404, 'Unknown set: ' || i_set_name into ret_code, ret_note;
+        return;
+    end if;
 
     select min(u.tick_id) into local_watermark
       from (select tick_id
diff --git a/sql/pgq_set/functions/pgq_set.rename_node.sql b/sql/pgq_set/functions/pgq_set.rename_node.sql
new file mode 100644 (file)
index 0000000..b48a2d0
--- /dev/null
@@ -0,0 +1,190 @@
+
+create or replace function pgq_set.rename_node_step1(
+    in i_set_name text,
+    in i_node_name_old text,
+    in i_node_name_new text,
+    out ret_code int4,
+    out ret_note text)
+returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.rename_node_step1(3)
+--
+--      Rename a node - step1.
+--
+-- Parameters:
+--      i_set_name - set name
+--      i_node_name_old - node name
+--      i_node_name_new - node connect string
+--
+-- Returns:
+--      ret_code - error code
+--      ret_note - error description
+--
+-- Return Codes:
+--      200 - Ok
+--      404 - No such set
+-- ----------------------------------------------------------------------
+declare
+    n  record;
+    reg record;
+begin
+    select s.node_name, s.node_type, s.paused, s.uptodate, s.queue_name
+      into n from pgq_set.set_info s
+     where s.set_name = i_set_name for update;
+    if not found then
+        select 404, 'Unknown set: ' || i_set_name into ret_code, ret_note;
+        return;
+    end if;
+
+    -- make copy of member info
+    perform 1 from pgq_set.member_info
+      where set_name = i_set_name
+        and node_name = i_node_name_new;
+    if not found then
+        insert into pgq_set.member_info
+              (set_name, node_name, node_location, dead)
+        select set_name, i_node_name_new, node_location, dead
+          from pgq_set.member_info
+         where set_name = i_set_name
+           and node_name = i_node_name_old;
+    end if;
+
+    -- make copy of subscriber info
+    perform 1 from pgq_set.subscriber_info
+      where set_name = i_set_name
+        and node_name = i_node_name_new;
+    if not found then
+        insert into pgq_set.subscriber_info
+              (set_name, node_name, local_watermark)
+        select set_name, i_node_name_new, local_watermark
+          from pgq_set.subscriber_info
+         where set_name = i_set_name
+           and node_name = i_node_name_old;
+    end if;
+
+    if n.queue_name is not null then
+        select f.last_tick into reg
+          from pgq.get_consumer_info(n.queue_name, i_node_name_old) f;
+        if found then
+            perform 1 from pgq.get_consumer_info(n.queue_name, i_node_name_new);
+            if not found then
+                perform pgq.register_consumer_at(n.queue_name, i_node_name_new, reg.last_tick);
+            end if;
+        end if;
+    end if;
+
+    -- FIXME: on root insert event about new node
+
+    select 200, 'Ok' into ret_code, ret_note;
+    return;
+end;
+$$ language plpgsql security definer;
+
+
+create or replace function pgq_set.rename_node_step2(
+    in i_set_name text,
+    in i_node_name_old text,
+    in i_node_name_new text,
+    out ret_code int4,
+    out ret_note text)
+returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.rename_node_step2(3)
+--
+--      Rename a node - step2.
+--
+-- Parameters:
+--      i_set_name - set name
+--      i_node_name_old - node name
+--      i_node_name_new - node connect string
+--
+-- Returns:
+--      ret_code - error code
+--      ret_note - error description
+--
+-- Return Codes:
+--      200 - Ok
+--      404 - No such set
+-- ----------------------------------------------------------------------
+declare
+    n  record;
+    det record;
+    reg record;
+begin
+    select s.node_name, s.node_type, s.paused, s.uptodate, s.queue_name, s.provider_node
+      into n from pgq_set.set_info s
+     where s.set_name = i_set_name for update;
+    if not found then
+        select 404, 'Unknown set: ' || i_set_name into ret_code, ret_note;
+        return;
+    end if;
+
+    if n.node_name = i_node_name_old then
+        if not n.paused or not n.uptodate then
+            select 401, 'Bad node state during rename' into ret_code, ret_note;
+            return;
+        end if;
+        update pgq_set.set_info
+           set node_name = i_node_name_new,
+               uptodate = false
+         where set_name = i_set_name;
+    elsif n.provider_node = i_node_name_old then
+        update pgq_set.set_into
+           set provider_node = i_node_name_new,
+               uptodate = false
+         where set_name = i_set_name;
+    end if;
+
+    -- delete old copy of subscriber info
+    select into det
+      (select count(1) from pgq_set.subscriber_info
+        where set_name = i_set_name
+          and node_name = i_node_name_old) as got_old,
+      (select count(1) from pgq_set.subscriber_info
+        where set_name = i_set_name
+          and node_name = i_node_name_new) as got_new;
+    if det.got_old > 0 and det.got_new > 0 then
+        delete from pgq_set.subscriber_info
+         where set_name = i_set_name
+           and node_name = i_node_name_old;
+    elsif det.got_old > 0 then
+        select 401, 'got old subscriber but not new' into ret_code, ret_note;
+        return;
+    end if;
+    
+    -- delete old copy of subscriber info
+    select into det
+      (select count(1) from pgq_set.member_info
+        where set_name = i_set_name
+          and node_name = i_node_name_old) > 0 as got_old,
+      (select count(1) from pgq_set.member_info
+        where set_name = i_set_name
+          and node_name = i_node_name_new) > 0 as got_new;
+    if det.got_old and det.got_new then
+        delete from pgq_set.member_info
+         where set_name = i_set_name
+           and node_name = i_node_name_old;
+    elsif det.got_old then
+        select 401, 'got old member but not new' into ret_code, ret_note;
+        return;
+    end if;
+    
+    if n.queue_name is not null then
+        select f.last_tick into reg
+          from pgq.get_consumer_info(n.queue_name, i_node_name_old) f;
+        if found then
+            perform 1 from pgq.get_consumer_info(n.queue_name, i_node_name_new);
+            if not found then
+                perform pgq.register_consumer_at(n.queue_name, i_node_name_new, reg.last_tick);
+            end if;
+        end if;
+    end if;
+
+    -- FIXME: on parent remove old registration
+    -- FIXME: on root insert event about old node delete
+
+    select 200, 'Ok' into ret_code, ret_note;
+    return;
+end;
+$$ language plpgsql security definer;
+
diff --git a/sql/pgq_set/functions/pgq_set.set_node_paused.sql b/sql/pgq_set/functions/pgq_set.set_node_paused.sql
new file mode 100644 (file)
index 0000000..dd659b3
--- /dev/null
@@ -0,0 +1,46 @@
+
+create or replace function pgq_set.set_node_paused(
+    in i_set_name text,
+    in i_paused boolean,
+    out ret_code int4,
+    out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.set_node_paused(2)
+--
+--      Set node paused flag.
+--
+-- Parameters:
+--      i_set_name - set name
+--      i_paused   - new flag state
+-- ----------------------------------------------------------------------
+declare
+    cur_paused  boolean;
+    nname       text;
+    op          text;
+begin
+    op := case when i_paused then 'paused' else 'resumed' end;
+    select paused, node_name into cur_paused, nname
+      from pgq_set.set_info
+     where set_name = i_set_name
+       for update;
+    if not found then
+        select 400, 'Set does not exist: ' || i_set_name into ret_code, ret_note;
+        return;
+    end if;
+
+    if cur_paused = i_paused then
+        select 200, 'Node already '|| op || ': ' || nname into ret_code, ret_note;
+        return;
+    end if;
+
+    update pgq_set.set_info
+       set paused = i_paused,
+           uptodate = false
+     where set_name = i_set_name;
+    select 200, 'Node ' || op || ': ' || nname into ret_code, ret_note;
+    return;
+end;
+$$ language plpgsql security definer;
+
+
index ce99083ade6ef5c78c92a1a3e1268caa5dabac0e..d578c6a5bd94dc5d880ae8772532a00a6d276b80 100644 (file)
@@ -17,7 +17,7 @@ returns int4 as $$
 -- ----------------------------------------------------------------------
 begin
     update pgq_set.set_info
-       set up_to_date = i_uptodate
+       set uptodate = i_uptodate
      where set_name = i_set_name;
     if not found then
         raise exception 'no such set: %', i_set_name;
index cdf6da47d0129b92f6ca501cb33a194f6447b153..01a5af77df91176027a82156c235b3cf6be35cbf 100644 (file)
@@ -2,29 +2,37 @@
 create or replace function pgq_set.subscribe_node(
     in i_set_name text,
     in i_remote_node_name text,
+    in i_custom_tick_id int8,
     out ret_code int4,
     out ret_note text,
     out global_watermark bigint)
 returns record as $$
 -- ----------------------------------------------------------------------
--- Function: pgq_set.subscribe_node(2)
+-- Function: pgq_set.subscribe_node(3)
 --
---      Subscribe remote node to local node.
+--      Subscribe remote node to local node at custom position.
+--      Should be used when changing provider for existing node.
 --
 -- Parameters:
 --      i_set_name - set name
 --      i_remote_node_name - node name
+--      i_custom_tick_id - tick id
 --
 -- Returns:
 --      ret_code - error code
 --      ret_note - description
---      global_watermark - minimal watermark, also subscription pos
+--      global_watermark - minimal watermark
 -- ----------------------------------------------------------------------
 declare
     n record;
 begin
     select s.node_type, s.global_watermark, s.queue_name into n
-      from pgq_set.set_info s where s.set_name = i_set_name;
+      from pgq_set.set_info s where s.set_name = i_set_name
+       for update;
+    if not found then
+        select 404, 'Unknown set: ' || i_set_name into ret_code, ret_note;
+        return;
+    end if;
     global_watermark := n.global_watermark;
 
     if n.node_type in ('leaf', 'merge-leaf') then
@@ -33,7 +41,7 @@ begin
         return;
     end if;
 
-    perform pgq.register_consumer_at(n.queue_name, i_remote_node_name, n.global_watermark);
+    perform pgq.register_consumer_at(n.queue_name, i_remote_node_name, i_custom_tick_id);
 
     insert into pgq_set.subscriber_info (set_name, node_name, local_watermark)
     values (i_set_name, i_remote_node_name, n.global_watermark);
@@ -43,3 +51,40 @@ begin
 end;
 $$ language plpgsql security definer;
 
+create or replace function pgq_set.subscribe_node(
+    in i_set_name text,
+    in i_remote_node_name text,
+    out ret_code int4,
+    out ret_note text,
+    out global_watermark bigint)
+returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.subscribe_node(2)
+--
+--      Subscribe remote node to local node at global_watermark position,
+--      so it receivec all events.
+--
+-- Parameters:
+--      i_set_name - set name
+--      i_remote_node_name - node name
+--
+-- Returns:
+--      ret_code - error code
+--      ret_note - description
+--      global_watermark - minimal watermark, also subscription pos
+-- ----------------------------------------------------------------------
+declare
+    n record;
+begin
+    select s.global_watermark into n
+      from pgq_set.set_info s where s.set_name = i_set_name
+       for update;
+
+    select f.ret_code, f.ret_note, f.global_watermark
+      into ret_code, ret_note, global_watermark
+      from pgq_set.subscribe_node(i_set_name, i_remote_node_name, n.global_watermark) f;
+
+    return;
+end;
+$$ language plpgsql security definer;
+
index 941e315ce04f91e144cef89d65e38e57c0649581..d7aa3fb4e6219f396523b547802ffd8bb71a2564 100644 (file)
@@ -17,9 +17,16 @@ select * from pgq_set.get_member_info('aset');
 
 select * from pgq_set.create_node('aset', 'root', 'node1', null, null, null);
 
+select * from pgq_set.subscribe_node('aset', 'node2');
+select * from pgq_set.subscribe_node('aset', 'node3', 1);
+select * from pgq_set.unsubscribe_node('aset', 'node3');
+
 select * from pgq_set.get_node_info('aset');
 
 select * from pgq_set.is_root('q');
 select * from pgq_set.is_root('aset');
 select * from pgq_set.is_root(null);
 
+select * from pgq_set.rename_node_step1('aset', 'node2', 'node2x');
+select * from pgq_set.rename_node_step2('aset', 'node2', 'node2x');
+
index fd9d27c7f6e94247e60423f176f7cddc4c55ef16..c19b7449c8e44ffe30d8dd6339fbe57f44d01ec4 100644 (file)
 \i functions/pgq_set.subscribe_node.sql
 \i functions/pgq_set.unsubscribe_node.sql
 \i functions/pgq_set.set_node_uptodate.sql
+\i functions/pgq_set.set_node_paused.sql
+\i functions/pgq_set.change_provider.sql
+\i functions/pgq_set.drop_member.sql
+\i functions/pgq_set.rename_node.sql
 
 -- Group: Node Info
 \i functions/pgq_set.get_node_info.sql
index 75f8a01986519090f5078cf180e7c59f9b0ce8b8..ed5d09e3a0f0c8f064cd3c1340cec3b61621656b 100644 (file)
@@ -1,3 +1,13 @@
+-- ----------------------------------------------------------------------
+-- Section: Londiste internals
+--
+--      Londiste storage: tables/seqs/fkeys/triggers/events.
+--
+-- pgq_set event types:
+--      member-info         - ev_data: node_name, extra1: set_name, extra2: location, extra3: dead
+--      global-watermark    - ev_data: tick_id,  extra1: set_name
+--      tick-id             - ev_data: tick_id,  extra1: set_name
+-- ----------------------------------------------------------------------
 
 create schema pgq_set;
 grant usage on schema pgq_set to public;
@@ -37,7 +47,7 @@ create table pgq_set.member_info (
 --      global_watermark    - set's global watermark, set by root node
 --      paused              - true if worker for this node should sleep
 --      resync              - true if worker for this node needs to re-register itself on provider queue
---      up_to_date          - true if worker for this node has seen table changes
+--      uptodate            - true if worker for this node has seen table changes
 --
 -- Node types:
 --      root            - data + batches is generated here
@@ -59,7 +69,7 @@ create table pgq_set.set_info (
 
     paused          boolean not null default false,
     resync          boolean not null default false,
-    up_to_date      boolean not null default false,
+    uptodate        boolean not null default false,
 
     foreign key (set_name, node_name) references pgq_set.member_info,
     foreign key (set_name, provider_node) references pgq_set.member_info,
index 2942b4c1494275ff28d875f5145748204d39c52b..9e66879cd8cd6b03f67caca8c9bfac73a79cb966 100755 (executable)
@@ -3,7 +3,7 @@
 . ../env.sh
 
 ./stop.sh
-rm -f sys/*.log sys/*.ini
+rm -f sys/*.log sys/*.ini sys/*.log.[0-9]
 
 set -e
 
@@ -13,6 +13,8 @@ set -e
 ./makenode.sh test_set node1 branch root
 londiste.py sys/worker_root.ini status
 
+londiste.py sys/worker_root.ini rename-node n_node1 node1renamed
+
 #exit 0
 
 ./makenode.sh test_set node2 branch root