more pgq_set/londiste cleanup
authorMarko Kreen <markokr@gmail.com>
Tue, 22 Apr 2008 12:46:13 +0000 (12:46 +0000)
committerMarko Kreen <markokr@gmail.com>
Tue, 22 Apr 2008 12:46:13 +0000 (12:46 +0000)
doc/TODO.txt
python/londiste.py
python/londiste/setup.py
python/pgq/setadmin.py
python/pgq/setconsumer.py
tests/env.sh
tests/londiste/checkerr.sh
tests/londiste/env.sh [deleted file]
tests/londiste/gendb.sh
tests/londiste/makenode.sh
tests/londiste/stop.sh

index 1e0001abe9513de472a673ee5d95105b20d58836..601fb7bbcb652949404450f68b1654eb4404425c 100644 (file)
@@ -12,7 +12,6 @@
 
  * cascaded replication, switchover, failover [marko]
    - add --create
-   - before adding, check if table is 'ok' on provider
    - root worker:
      - insert seq pos in queue
      - seq add/remove events
      - failover
      - pause
      - resume
-     - status
+     - node-status
+     - set-status
    - standard msg/error handling for all sql functions
    - compare/repair
    - check if table is in other sets? [NAK]
+   - setconsumer/pgq - insert tick with original date (+evid?)
 
  * drop support for 8.1 ??
 
index b4678e04d11bc1d69ed3f5f68a835107e8ed9936..06ec17db6a81f6066f6793199521d967a726af8c 100755 (executable)
@@ -3,10 +3,9 @@
 """Londiste launcher.
 """
 
-import sys, os, optparse, skytools, pgq, pgq.setadmin
+import sys, os, os.path, optparse, skytools
 
 # python 2.3 will try londiste.py first...
-import sys, os.path
 if os.path.exists(os.path.join(sys.path[0], 'londiste.py')) \
     and not os.path.exists(os.path.join(sys.path[0], 'londiste')):
     del sys.path[0]
@@ -56,32 +55,12 @@ Internal Commands:
   copy                  copy table logic
 """
 
-class NodeSetup(pgq.setadmin.SetAdmin):
-    initial_db_name = 'node_db'
-    extra_objs = [ skytools.DBSchema("londiste", sql_file="londiste.sql") ]
-    def __init__(self, args):
-        pgq.setadmin.SetAdmin.__init__(self, 'londiste', args)
-    def extra_init(self, node_type, node_db, provider_db):
-        if not provider_db:
-            return
-        pcurs = provider_db.cursor()
-        ncurs = node_db.cursor()
-        q = "select table_name from londiste.set_get_table_list(%s)"
-        pcurs.execute(q, [self.set_name])
-        for row in pcurs.fetchall():
-            tbl = row['table_name']
-            q = "select * from londiste.set_add_table(%s, %s)"
-            ncurs.execute(q, [self.set_name, tbl])
-        node_db.commit()
-        provider_db.commit()
-
-
 cmd_handlers = (
     (('init-root', 'init-branch', 'init-leaf', 'members', 'tag-dead', 'tag-alive',
-      'redirect', 'promote-root'), NodeSetup),
-    (('worker', 'replay'), londiste.Replicator),
+      'redirect', 'promote-root', 'status'), londiste.LondisteSetup),
     (('add', 'remove', 'add-seq', 'remove-seq', 'tables', 'seqs',
       'missing', 'resync', 'check', 'fkeys'), londiste.LondisteSetup),
+    (('worker', 'replay'), londiste.Replicator),
     (('compare',), londiste.Comparator),
     (('repair',), londiste.Repairer),
     (('copy',), londiste.CopyTable),
@@ -122,8 +101,8 @@ class Londiste(skytools.DBScript):
                 help = "add: keep old data", default=False)
         g.add_option("--provider",
                 help = "init: upstream node temp connect string")
-        g.add_option("--create", action = 'callback', callback = self.opt_create_cb, type='string',
-                help = "add: create table/seq if not exist")
+        g.add_option("--create",
+                help = "add: create table/seq if not exist (seq,pkey,full,indexes,fkeys)")
         p.add_option_group(g)
 
         return p
index 15991311c9ebe5846ceb8c75c66dda1107f4fc20..7ab6093d6dd2008dd45db7e6da32767eb5638bb2 100644 (file)
@@ -5,86 +5,19 @@
 
 import sys, os, skytools
 
+import pgq.setadmin
+
 __all__ = ['LondisteSetup']
 
-class LondisteSetup(skytools.DBScript):
+class LondisteSetup(pgq.setadmin.SetAdmin):
+    initial_db_name = 'node_db'
+    extra_objs = [ skytools.DBSchema("londiste", sql_file="londiste.sql") ]
     def __init__(self, args):
-        skytools.DBScript.__init__(self, 'londiste', args)
-        self.set_single_loop(1)
-        self.pidfile = self.pidfile + ".setup"
-
+        pgq.setadmin.SetAdmin.__init__(self, 'londiste', args)
         self.set_name = self.cf.get("set_name")
-        self.consumer_id = self.cf.get("pgq_consumer_id", self.job_name)
-
-        if len(self.args) < 2:
-            self.log.error("need command")
-            sys.exit(1)
-
-    def run(self):
-        cmd = self.args[1]
-        fname = "cmd_" + cmd.replace('-', '_')
-        if hasattr(self, fname):
-            getattr(self, fname)(self.args[2:])
-        else:
-            self.log.error('bad subcommand')
-            sys.exit(1)
-
-    def fetch_list(self, curs, sql, args, keycol = None):
-        curs.execute(sql, args)
-        rows = curs.dictfetchall()
-        if not keycol:
-            res = rows
-        else:
-            res = [r[keycol] for r in rows]
-        return res
-
-    def db_fetch_list(self, sql, args, keycol = None):
-        db = self.get_database('node_db')
-        curs = db.cursor()
-        res = self.fetch_list(curs, sql, keycol)
-        db.commit()
-        return res
-
-    def display_table(self, desc, curs, sql, args = [], fields = []):
-        """Display multirow query as a table."""
-
-        curs.execute(sql, args)
-        rows = curs.fetchall()
-        if len(rows) == 0:
-            return 0
-
-        if not fields:
-            fields = [f[0] for f in curs.description]
-        
-        widths = [15] * len(fields)
-        for row in rows:
-            for i, k in enumerate(fields):
-                rlen = row[k] and len(row) or 0
-                widths[i] = widths[i] > rlen and widths[i] or rlen
-        widths = [w + 2 for w in widths]
-
-        fmt = '%%-%ds' * (len(widths) - 1) + '%%s'
-        fmt = fmt % tuple(widths[:-1])
-        if desc:
-            print desc
-        print fmt % tuple(fields)
-        print fmt % tuple(['-'*15] * len(fields))
-            
-        for row in rows:
-            print fmt % tuple([row[k] for k in fields])
-        print '\n'
-        return 1
-
-    def db_display_table(self, desc, sql, args = [], fields = []):
-        db = self.get_database('node_db')
-        curs = db.cursor()
-        res = self.display_table(desc, curs, sql, args, fields)
-        db.commit()
-        return res
-        
 
     def init_optparse(self, parser=None):
-        p = skytools.DBScript.init_optparse(self, parser)
+        p = pgq.setadmin.SetAdmin.init_optparse(self, parser)
         p.add_option("--expect-sync", action="store_true", dest="expect_sync",
                     help = "no copy needed", default=False)
         p.add_option("--skip-truncate", action="store_true", dest="skip_truncate",
@@ -93,71 +26,61 @@ class LondisteSetup(skytools.DBScript):
                     help="force", default=False)
         p.add_option("--all", action="store_true",
                     help="include all tables", default=False)
-        p.add_option("--provider",
-                help="init: upstream node temp connect string", default=None)
         return p
 
-    def exec_checked(self, curs, sql, args):
-        curs.execute(sql, args)
-        ok = True
-        for row in curs.fetchall():
-            if (row[0] / 100) == 2:
-                self.log.info("%d %s" % (row[0], row[1]))
-            else:
-                self.log.error("%d %s" % (row[0], row[1]))
-                ok = False
-        return ok
-
-    def exec_many(self, curs, sql, baseargs, extra_list):
-        res = True
-        for a in extra_list:
-            ok = self.exec_checked(curs, sql, baseargs + [a])
-            if not ok:
-                res = False
-        return res
-
-    def db_exec_many(self, sql, baseargs, extra_list):
-        db = self.get_database('node_db')
-        curs = db.cursor()
-        ok = self.exec_many(curs, sql, baseargs, extra_list)
-        if ok:
-            self.log.info("COMMIT")
-            db.commit()
-        else:
-            self.log.info("ROLLBACK")
-            db.rollback()
+    def extra_init(self, node_type, node_db, provider_db):
+        if not provider_db:
+            return
+        pcurs = provider_db.cursor()
+        ncurs = node_db.cursor()
+        q = "select table_name from londiste.set_get_table_list(%s)"
+        pcurs.execute(q, [self.set_name])
+        for row in pcurs.fetchall():
+            tbl = row['table_name']
+            q = "select * from londiste.set_add_table(%s, %s)"
+            ncurs.execute(q, [self.set_name, tbl])
+        node_db.commit()
+        provider_db.commit()
 
     def cmd_add(self, args = []):
         q = "select * from londiste.node_add_table(%s, %s)"
-        self.db_exec_many(q, [self.set_name], args)
+        db = self.get_database('node_db')
+        self.db_cmd_many(db, q, [self.set_name], args)
 
     def cmd_remove(self, args = []):
         q = "select * from londiste.node_remove_table(%s, %s)"
-        self.db_exec_many(q, [self.set_name], args)
+        db = self.get_database('node_db')
+        self.db_cmd_many(db, q, [self.set_name], args)
 
     def cmd_add_seq(self, args = []):
         q = "select * from londiste.node_add_seq(%s, %s)"
-        self.db_exec_many(q, [self.set_name], args)
+        db = self.get_database('node_db')
+        self.db_cmd_many(db, q, [self.set_name], args)
 
     def cmd_remove_seq(self, args = []):
         q = "select * from londiste.node_remove_seq(%s, %s)"
-        self.db_exec_many(q, [self.set_name], args)
+        db = self.get_database('node_db')
+        self.db_cmd_many(db, q, [self.set_name], args)
 
     def cmd_resync(self, args = []):
         q = "select * from londiste.node_resync_table(%s, %s)"
-        self.db_exec_many(q, [self.set_name], args)
+        db = self.get_database('node_db')
+        self.db_cmd_many(db, q, [self.set_name], args)
 
     def cmd_tables(self, args = []):
         q = "select table_name, merge_state from londiste.node_get_table_list(%s)"
-        self.db_display_table("Tables on node", q, [self.set_name])
+        db = self.get_database('node_db')
+        self.db_display_table(db, "Tables on node", q, [self.set_name])
 
     def cmd_seqs(self, args = []):
         q = "select seq_namefrom londiste.node_get_seq_list(%s)"
-        self.db_display_table("Sequences on node", q, [self.set_name])
+        db = self.get_database('node_db')
+        self.db_display_table(db, "Sequences on node", q, [self.set_name])
 
     def cmd_missing(self, args = []):
         q = "select * from londiste.node_show_missing(%s)"
-        self.db_display_table("MIssing objects on node", q, [self.set_name])
+        db = self.get_database('node_db')
+        self.db_display_table(db, "Missing objects on node", q, [self.set_name])
 
     def cmd_check(self, args = []):
         pass
index 47f70de8ed6893d9440bca7d7a8b532da99e51ed..77c07a3eb64c14efbda8ec5dca713cf7337ef8bc 100644 (file)
@@ -2,23 +2,9 @@
 
 import sys, optparse, skytools
 
-from pgq.setconsumer import MemberInfo, NodeInfo
+from pgq.setinfo import *
 
-class SetInfo:
-    def __init__(self, set_name, info_row, member_rows):
-        self.root_info = info_row
-        self.set_name = set_name
-        self.member_map = {}
-        self.root_name = info_row['node_name']
-        self.root_type = info_row['node_type']
-        self.global_watermark = info_row['global_watermark']
-
-        for r in member_rows:
-            n = MemberInfo(r)
-            self.member_map[n.name] = n
-
-    def get_member(self, name):
-        return self.member_map.get(name)
+__all__ = ['SetAdmin']
 
 command_usage = """
 %prog [options] INI CMD [subcmd args]
@@ -26,7 +12,7 @@ command_usage = """
 commands:
 """
 
-class SetAdmin(skytools.DBScript):
+class SetAdmin(skytools.AdminScript):
     root_name = None
     root_info = None
     member_map = {}
@@ -35,7 +21,7 @@ class SetAdmin(skytools.DBScript):
     initial_db_name = 'node_db'
 
     def init_optparse(self, parser = None):
-        p = skytools.DBScript.init_optparse(self, parser)
+        p = skytools.AdminScript.init_optparse(self, parser)
         p.set_usage(command_usage.strip())
 
         g = optparse.OptionGroup(p, "actual setadm options")
@@ -46,29 +32,24 @@ class SetAdmin(skytools.DBScript):
         p.add_option_group(g)
         return p
 
-    def work(self):
-        self.set_single_loop(1)
-
+    def reload(self):
+        skytools.AdminScript.reload(self)
         self.set_name = self.cf.get('set_name')
 
-        if self.is_cmd("init-root", 2):
-            self.init_node("root", self.args[2], self.args[3])
-        elif self.is_cmd("init-branch", 2):
-            self.init_node("branch", self.args[2], self.args[3])
-        elif self.is_cmd("init-leaf", 2):
-            self.init_node("leaf", self.args[2], self.args[3])
-        else:
-            self.log.info("need command")
-
-    def is_cmd(self, name, argcnt):
-        if len(self.args) < 2:
-            return False
-        if self.args[1] != name:
-            return False
-        if len(self.args) != argcnt + 2:
-            self.log.error("cmd %s needs %d args" % (name, argcnt))
-            sys.exit(1)
-        return True
+    def cmd_init_root(self, args):
+        if len(args) != 2:
+            raise Exception('init-root needs 2 args')
+        self.init_node('root', args[0], args[1])
+
+    def cmd_init_branch(self, args):
+        if len(args) != 2:
+            raise Exception('init-branch needs 2 args')
+        self.init_node('branch', args[0], args[1])
+
+    def cmd_init_leaf(self, args):
+        if len(args) != 2:
+            raise Exception('init-leaf needs 2 args')
+        self.init_node('leaf', args[0], args[1])
 
     def init_node(self, node_type, node_name, node_location):
         provider_loc = self.options.provider
@@ -157,7 +138,7 @@ class SetAdmin(skytools.DBScript):
     def extra_init(self, node_type, node_db, provider_db):
         pass
 
-    def find_root_db(self, initial_loc):
+    def find_root_db(self, initial_loc = None):
         if initial_loc:
             loc = initial_loc
         else:
@@ -205,20 +186,6 @@ class SetAdmin(skytools.DBScript):
 
         return SetInfo(self.set_name, info, node_list)
 
-    def exec_sql(self, db, q, args):
-        self.log.debug(q)
-        curs = db.cursor()
-        curs.execute(q, args)
-        db.commit()
-
-    def exec_query(self, db, q, args):
-        self.log.debug(q)
-        curs = db.cursor()
-        curs.execute(q, args)
-        res = curs.dictfetchall()
-        db.commit()
-        return res
-
     def install_code(self, db):
         objs = [
             skytools.DBLanguage("plpgsql"),
@@ -231,6 +198,62 @@ class SetAdmin(skytools.DBScript):
         skytools.db_install(db.cursor(), objs, self.log)
         db.commit()
 
+    def cmd_status(self, args):
+        root_db = self.find_root_db()
+        sinf = self.load_root_info(root_db)
+
+        for mname, minf in sinf.member_map.iteritems():
+            db = self.get_database('look_db', connstr = minf.location, autocommit = 1)
+            curs = db.cursor()
+            curs.execute("select * from pgq_set.get_node_info(%s)", [self.set_name])
+            node = NodeInfo(curs.fetchone())
+            sinf.add_node(node)
+            self.close_database('look_db')
+
+        sinf.print_tree()
+
+    def cmd_switch(self):
+        [['node', 'PAUSE']]
+        [['new_parent', 'select * from pgq_set.subscribe_node(%(set_name)s, %(node_name)s, %(node_pos)s)']]
+        [['node', 'select * from pgq_set.change_provider(%(set_name)s, %(new_provider)s)']]
+        [['old_parent', 'select * from pgq_set.unsubscribe_node(%(set_name)s, %(node_name)s, %(node_pos)s)']]
+        [['node', 'RESUME']]
+
+    def cmd_promote(self):
+        [['old-root', 'PAUSE']]
+        [['old-root', 'demote, set-provider?']]
+        [['new-root', 'wait-for-catch-up']]
+        [['new-root', 'pause']]
+        [['new-root', 'promote']]
+        [['new-root', 'resume']]
+        [['old-root', 'resume']]
+        [['new_parent', 'select * from pgq_set.subscribe_node(%(set_name)s, %(node_name)s, %(node_pos)s)']]
+        [['node', 'select * from pgq_set.change_provider(%(set_name)s, %(new_provider)s)']]
+        [['old_parent', 'select * from pgq_set.unsubscribe_node(%(set_name)s, %(node_name)s, %(node_pos)s)']]
+        [['node', 'RESUME']]
+
+    def subscribe_node(self, target_node, subscriber_node, tick_pos):
+        q = "select * from pgq_set.subscribe_node(%s, %s, %s)"
+        self.node_exec(target_node, q, [self.set_name, target_node, tick_pos])
+
+    def unsubscribe_node(self, target_node, subscriber_node, tick_pos):
+        q = "select * from pgq_set.subscribe_node(%s, %s, %s)"
+        self.node_exec(target_node, q, [self.set_name, target_node, tick_pos])
+
+    def node_cmd(self, node_name, sql, args, commit = True):
+        m = self.lookup_member(node_name)
+        db = self.get_database('node_'+node_name)
+        self.db_cmd(db, sql, args, commit = commit)
+
+    def connect_node(self, node_name):
+        sinf = self.get_set_info()
+        m = sinf.get_member(node_name)
+        loc = m.node_location
+        db = self.get_database("node." + node_name, connstr = loc)
+
+    def disconnect_node(self, node_name):
+        self.close_database("node." + node_name)
+
 if __name__ == '__main__':
     script = SetAdmin('set_admin', sys.argv[1:])
     script.start()
index 76b8dbe69361dbb47873e78052899920f223c570..268cd7a898f1a578ec63ecdf1e471bb82c1e6dee 100644 (file)
@@ -3,90 +3,10 @@
 import sys, time, skytools
 
 from pgq.rawconsumer import RawQueue
+from pgq.setinfo import *
 
 __all__ = ['SetConsumer']
 
-ROOT = 'root'
-BRANCH = 'branch'
-LEAF = 'leaf'
-COMBINED_ROOT = 'combined-root'
-COMBINED_BRANCH = 'combined-branch'
-MERGE_LEAF = 'merge-leaf'
-
-class MemberInfo:
-    def __init__(self, row):
-        self.name = row['node_name']
-        self.location = row['node_location']
-        self.dead = row['dead']
-
-class NodeInfo:
-    def __init__(self, row, member_list, main_worker = True):
-        self.member_map = {}
-        self.main_worker = main_worker
-        for r in member_list:
-            m = MemberInfo(r)
-            self.member_map[m.name] = m
-
-        self.name = row['node_name']
-        self.type = row['node_type']
-        self.queue_name = row['queue_name']
-        self.global_watermark = row['global_watermark']
-        self.local_watermark = row['local_watermark']
-        self.completed_tick = row['completed_tick']
-        self.provider_node = row['provider_node']
-        self.provider_location = row['provider_location']
-        self.paused = row['paused']
-        self.resync = row['resync']
-        self.up_to_date = row['up_to_date']
-        self.combined_set = row['combined_set']
-        self.combined_type = row['combined_type']
-        self.combined_queue = row['combined_queue']
-        self.worker_name = row['worker_name']
-
-    def need_action(self, action_name):
-        if not self.main_worker:
-            return action_name in ('process-batch', 'process-events')
-
-        typ = self.type
-        if type == 'merge-leaf':
-            if self.target_type == 'combined-branch':
-                typ += "merge-leaf-to-branch"
-            elif self.target_type == 'combined-root':
-                typ += "merge-leaf-to-root"
-            else:
-                raise Exception('bad target type')
-
-        try:
-            return action_map[action_name][typ]
-        except KeyError, d:
-            raise Exception('need_action(name=%s, type=%s) unknown' % (action_name, typ))
-
-    def get_target_queue(self):
-        qname = None
-        if self.type == 'merge-leaf':
-            qname = self.combined_queue
-        else:
-            qname = self.queue_name
-        if qname is None:
-            raise Exception("no target queue")
-        return qname
-
-action_map = {
-'process-batch':   {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':1},
-'process-events':  {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0},
-'copy-events':     {'root':0, 'branch':1, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
-'tick-event':      {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':0, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0},
-'global-wm-event': {'root':1, 'branch':0, 'leaf':0, 'combined-root':1, 'combined-branch':0, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
-'wait-behind':     {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':0, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':1},
-'sync-part-pos':   {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
-'local-wm-publish':{'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':1},
-}
-
-node_properties = {
-'pgq':     {'root':1, 'branch':1, 'leaf':0, 'combined-root':1, 'combined-branch':1, 'merge-leaf':1},
-'queue':   {'root':1, 'branch':1, 'leaf':0, 'combined-root':1, 'combined-branch':1, 'merge-leaf':0},
-}
-
 class SetConsumer(skytools.DBScript):
     last_local_wm_publish_time = 0
     last_global_wm_publish_time = 0
@@ -268,7 +188,7 @@ class SetConsumer(skytools.DBScript):
         mbr_list = curs.dictfetchall()
         db.commit()
 
-        return NodeInfo(node_row, mbr_list, self.main_worker)
+        return NodeInfo(node_row, self.main_worker)
 
     def tag_node_uptodate(self, dst_db):
         dst_curs = dst_db.cursor()
index 05f61e8de9f67ddfed434dca1d1dbf58ef4d7898..11d82a3c5d86b8c3c7530fa7fcc34e5c9d4535fe 100644 (file)
@@ -1,6 +1,6 @@
 
 PYTHONPATH=../../python:$PYTHONPATH
-PATH=../../python:../../scripts:$PATH
+PATH=../../python:../../python/bin:../../scripts:$PATH
 export PYTHONPATH PATH
 
 
index ce26c71a47a73a99157d353a5b6b7c42cad620d1..eed0dfa0343cee4e6e8beb67c2e99146e8c3d77b 100755 (executable)
@@ -1,4 +1,4 @@
 #! /bin/sh
 
-grep -E 'WARN|ERR|CRIT' sys/log.*
+grep -E 'WARN|ERR|CRIT' sys/*log*
 
diff --git a/tests/londiste/env.sh b/tests/londiste/env.sh
deleted file mode 100644 (file)
index 45c82d8..0000000
+++ /dev/null
@@ -1,7 +0,0 @@
-
-PYTHONPATH=../../python:$PYTHONPATH
-PATH=../../python:../../scripts:$PATH
-export PYTHONPATH PATH
-
-#. /opt/apps/pgsql-dev/env
-
index 38678bbf6258f95ee7f1c2ea41d15270006527b6..1f1673eab3f4cb0c33c05aeb857a53aebfd655d8 100755 (executable)
@@ -3,16 +3,40 @@
 . ../env.sh
 
 ./stop.sh
-rm -f sys/log.*
+rm -f sys/log.* sys/*.log
 
 set -e
 
 
 ./makenode.sh test_set root root 
 
-last=root
-for n in `seq 1 10`; do
-  ./makenode.sh test_set node$n branch $last
-  last=node$n
-done
+./makenode.sh test_set node1 branch root
+londiste.py sys/worker_root.ini status
+
+#exit 0
+
+./makenode.sh test_set node2 branch root
+./makenode.sh test_set node3 branch root
+
+./makenode.sh test_set node4 branch node1
+./makenode.sh test_set node5 branch node1
+./makenode.sh test_set node6 branch node1
+
+./makenode.sh test_set node7 branch node5
+
+./makenode.sh test_set node8 branch node2
+
+./makenode.sh test_set node9 branch node3
+./makenode.sh test_set node10 branch node3
+./makenode.sh test_set node11 branch node3
+#./makenode.sh test_set node12 branch node3
+#./makenode.sh test_set node13 branch node3
+
+londiste.py sys/worker_root.ini status
+
+#last=root
+#for n in `seq 1 10`; do
+#  ./makenode.sh test_set node$n branch $last
+#  last=node$n
+#done
 
index caf11dab8eb30f5f268bc2e5ccd0e6936a53b4eb..e6bf148e368aaac6a0cf777a433959ca360a4533 100755 (executable)
@@ -11,6 +11,7 @@ run () {
   "$@"
 }
 
+verbose=-v
 
 # usage: makenode <set_name> <base_name> <type> <provider_base_name>
 set_name="$1"
@@ -26,7 +27,10 @@ londiste_conf="sys/worker_$base_name.ini"
 
 for pf in sys/pid.ticker_$base_name \
   sys/pid.worker_$base_name \
-  sys/pid.worker_$base_name.*
+  sys/pid.worker_$base_name.* \
+  sys/ticker_$base_name.pid \
+  sys/worker_$base_name.pid \
+  sys/worker_$base_name.*.pid
 do
   test -f $pf || continue
   msg "Killing $pf"
@@ -41,8 +45,8 @@ job_name = ticker_$base_name
 db = $connstr
 maint_delay_min = 1
 loop_delay = 0.5
-logfile = sys/log.%(job_name)s
-pidfile = sys/pid.%(job_name)s
+logfile = sys/%(job_name)s.log
+pidfile = sys/%(job_name)s.pid
 use_skylog = 0
 connection_lifetime = 10
 queue_refresh_period = 10
@@ -54,8 +58,8 @@ cat > "$londiste_conf" <<EOF
 job_name = worker_$base_name
 set_name = $set_name
 node_db = $connstr
-pidfile = sys/pid.%(job_name)s
-logfile = sys/log.%(job_name)s
+pidfile = sys/%(job_name)s.pid
+logfile = sys/%(job_name)s.log
 loop_delay = 1
 connection_lifetime = 10
 parallel_copies = 4
@@ -67,16 +71,16 @@ dropdb $db 2>&1 | grep -v 'not exist' || true
 createdb $db
 
 msg "Installing pgq"
-pgqadm.py $ticker_conf install
+pgqadm.py $ticker_conf install $verbose
 msg "Launching ticker"
-pgqadm.py $ticker_conf ticker -d
+pgqadm.py $ticker_conf ticker -d $verbose
 
 msg "Initializing node"
 run londiste.py $londiste_conf "init-$node_type" "$node_name" "$connstr" -v \
   --provider="dbname=db_$provider_base_name host=127.0.0.1"
 
 msg "Launching Londiste"
-londiste.py $londiste_conf worker -d -v
+londiste.py $londiste_conf worker -d $verbose
 
 for n in `seq 1 16`; do
   tbl="manytable$n"
@@ -92,7 +96,7 @@ select '$tbl-$base_name'
 EOF
 
   msg "Adding $tbl to n_$base_name"
-  londiste.py $londiste_conf add $tbl
+  londiste.py $londiste_conf add $tbl $verbose
 
 done
 
index 2bf9220a7cd0ec50571389577a9c109bc6c0f0ff..cc0586a1f0b11b3506d82b354be3c3a5e8e66d0b 100755 (executable)
@@ -1,7 +1,7 @@
 #! /bin/sh
 
 got=0
-for pf in sys/pid.*; do
+for pf in sys/*pid*; do
   test -f "$pf" || continue
   echo " * Killing $pf"
   kill `cat $pf`