more setconsumer/londiste work.
author    Marko Kreen <markokr@gmail.com>
          Tue, 15 Apr 2008 13:02:47 +0000 (13:02 +0000)
committer Marko Kreen <markokr@gmail.com>
          Tue, 15 Apr 2008 13:02:47 +0000 (13:02 +0000)
Simple init, event processing, and copy seem to work.

27 files changed:
python/londiste.py
python/londiste/playback.py
python/londiste/setup.py
python/londiste/table_copy.py
python/pgq/rawconsumer.py
python/pgq/setadmin.py
python/pgq/setconsumer.py
python/skytools/scripting.py
python/skytools/sqltools.py
sql/londiste/functions/londiste.node_prepare_triggers.sql
sql/londiste/functions/londiste.set_get_table_list.sql [new file with mode: 0644]
sql/londiste/structure/functions.sql
sql/pgq_set/Makefile
sql/pgq_set/functions/pgq_set.set_global_watermark.sql
sql/pgq_set/functions/pgq_set.set_subscriber_watermark.sql
sql/pgq_set/functions/pgq_set.track_tick.sql [new file with mode: 0644]
sql/pgq_set/structure/functions.sql
sql/pgq_set/structure/pgq_set.sql
tests/londiste/Makefile [new file with mode: 0644]
tests/londiste/conf/ticker_branch.ini [new file with mode: 0644]
tests/londiste/conf/ticker_leaf.ini [new file with mode: 0644]
tests/londiste/conf/w_branch.ini [new file with mode: 0644]
tests/londiste/conf/w_leaf.ini [new file with mode: 0644]
tests/londiste/conf/w_root.ini [new file with mode: 0644]
tests/londiste/conf/w_root.ini.rej [new file with mode: 0644]
tests/londiste/gendb.sh
tests/londiste/stop.sh

diff --git a/python/londiste.py b/python/londiste.py
index 82e49281e8d88cabd2d42813806055904a3d0f02..e2ac37df4cc2c1b6a05ee1a7093a1e8b1406e7e0 100755 (executable)
@@ -61,6 +61,20 @@ class NodeSetup(pgq.setadmin.SetAdmin):
     extra_objs = [ skytools.DBSchema("londiste", sql_file="londiste.sql") ]
     def __init__(self, args):
         pgq.setadmin.SetAdmin.__init__(self, 'londiste', args)
+    def extra_init(self, node_type, node_db, provider_db):
+        if not provider_db:
+            return
+        pcurs = provider_db.cursor()
+        ncurs = node_db.cursor()
+        q = "select table_name from londiste.set_get_table_list(%s)"
+        pcurs.execute(q, [self.set_name])
+        for row in pcurs.fetchall():
+            tbl = row['table_name']
+            q = "select * from londiste.set_add_table(%s, %s)"
+            ncurs.execute(q, [self.set_name, tbl])
+        node_db.commit()
+        provider_db.commit()
+
 
 cmd_handlers = (
     (('init-root', 'init-branch', 'init-leaf', 'members', 'tag-dead', 'tag-alive',
@@ -70,6 +84,7 @@ cmd_handlers = (
       'missing', 'resync', 'check', 'fkeys'), londiste.LondisteSetup),
     (('compare',), londiste.Comparator),
     (('repair',), londiste.Repairer),
+    (('copy',), londiste.CopyTable),
 )
 
 class Londiste(skytools.DBScript):
@@ -105,6 +120,8 @@ class Londiste(skytools.DBScript):
                 help = "add: no copy needed", default=False)
         g.add_option("--skip-truncate", action="store_true", dest="skip_truncate",
                 help = "add: keep old data", default=False)
+        g.add_option("--provider",
+                help = "init: upstream node temp connect string", default=None)
         p.add_option_group(g)
 
         return p
diff --git a/python/londiste/playback.py b/python/londiste/playback.py
index 34513c4a7fd9290ffba2e88731f8c30b27213220..67b409515c5343d499623f4e3149bf95bbd654e8 100644 (file)
@@ -263,18 +263,18 @@ class Replicator(pgq.SetConsumer):
         self.copy_thread = 0
         self.seq_cache = SeqCache()
 
-    def process_set_batch(self, src_db, dst_db, ev_list, copy_queue):
+    def process_set_batch(self, src_db, dst_db, ev_list):
         "All work for a batch.  Entry point from SetConsumer."
 
         # this part can play freely with transactions
 
         dst_curs = dst_db.cursor()
         
-        self.cur_tick = self.cur_batch_info['tick_id']
-        self.prev_tick = self.cur_batch_info['prev_tick_id']
+        self.cur_tick = self.src_queue.cur_tick
+        self.prev_tick = self.src_queue.prev_tick
 
         self.load_table_state(dst_curs)
-        self.sync_tables(dst_db)
+        self.sync_tables(src_db, dst_db)
 
         self.copy_snapshot_cleanup(dst_db)
 
@@ -292,13 +292,14 @@ class Replicator(pgq.SetConsumer):
         self.handle_seqs(dst_curs)
 
         self.sql_list = []
-        SetConsumer.process_set_batch(self, src_db, dst_db, ev_list, copy_queue)
+        pgq.SetConsumer.process_set_batch(self, src_db, dst_db, ev_list)
         self.flush_sql(dst_curs)
 
         # finalize table changes
         self.save_table_state(dst_curs)
 
     def handle_seqs(self, dst_curs):
+        return # FIXME
         if self.copy_thread:
             return
 
@@ -313,7 +314,7 @@ class Replicator(pgq.SetConsumer):
         src_curs = self.get_database('provider_db').cursor()
         self.seq_cache.resync(src_curs, dst_curs)
 
-    def sync_tables(self, dst_db):
+    def sync_tables(self, src_db, dst_db):
         """Table sync loop.
         
         Calls the appropriate handler, which is expected to
@@ -323,13 +324,14 @@ class Replicator(pgq.SetConsumer):
         while 1:
             cnt = Counter(self.table_list)
             if self.copy_thread:
-                res = self.sync_from_copy_thread(cnt, dst_db)
+                res = self.sync_from_copy_thread(cnt, src_db, dst_db)
             else:
-                res = self.sync_from_main_thread(cnt, dst_db)
+                res = self.sync_from_main_thread(cnt, src_db, dst_db)
 
             if res == SYNC_EXIT:
                 self.log.debug('Sync tables: exit')
-                self.detach()
+                self.unregister_consumer(src_db.cursor())
+                src_db.commit()
                 sys.exit(0)
             elif res == SYNC_OK:
                 return
@@ -342,7 +344,7 @@ class Replicator(pgq.SetConsumer):
             self.load_table_state(dst_db.cursor())
             dst_db.commit()
     
-    def sync_from_main_thread(self, cnt, dst_db):
+    def sync_from_main_thread(self, cnt, src_db, dst_db):
         "Main thread sync logic."
 
         #
@@ -386,7 +388,7 @@ class Replicator(pgq.SetConsumer):
             # seems everything is in sync
             return SYNC_OK
 
-    def sync_from_copy_thread(self, cnt, dst_db):
+    def sync_from_copy_thread(self, cnt, src_db, dst_db):
         "Copy thread sync logic."
 
         #
@@ -419,7 +421,7 @@ class Replicator(pgq.SetConsumer):
         elif cnt.copy:
             # table is not copied yet, do it
             t = self.get_table_by_state(TABLE_IN_COPY)
-            self.do_copy(t)
+            self.do_copy(t, src_db, dst_db)
 
             # forget previous value
             self.work_state = 1
@@ -431,21 +433,27 @@ class Replicator(pgq.SetConsumer):
 
     def process_set_event(self, dst_curs, ev):
         """handle one event"""
-        if not self.interesting(ev):
-            self.stat_inc('ignored_events')
-            ev.tag_done()
-        elif ev.type in ('I', 'U', 'D'):
+        self.log.debug("New event: id=%s / type=%s / data=%s / extra1=%s" % (ev.id, ev.type, ev.data, ev.extra1))
+        if ev.type in ('I', 'U', 'D'):
             self.handle_data_event(ev, dst_curs)
+        elif ev.type == 'add-table':
+            self.add_set_table(dst_curs, ev.data)
+        elif ev.type == 'remove-table':
+            self.remove_set_table(dst_curs, ev.data)
         else:
-            self.handle_system_event(ev, dst_curs)
+            pgq.SetConsumer.process_set_event(self, dst_curs, ev)
 
     def handle_data_event(self, ev, dst_curs):
-        # buffer SQL statements, then send them together
-        fmt = self.sql_command[ev.type]
-        sql = fmt % (ev.extra1, ev.data)
-        self.sql_list.append(sql)
-        if len(self.sql_list) > 200:
-            self.flush_sql(dst_curs)
+        t = self.get_table_by_name(ev.extra1)
+        if t and t.interesting(ev, self.cur_tick, self.copy_thread):
+            # buffer SQL statements, then send them together
+            fmt = self.sql_command[ev.type]
+            sql = fmt % (ev.extra1, ev.data)
+            self.sql_list.append(sql)
+            if len(self.sql_list) > 200:
+                self.flush_sql(dst_curs)
+        else:
+            self.stat_inc('ignored_events')
         ev.tag_done()
 
     def flush_sql(self, dst_curs):
@@ -461,44 +469,24 @@ class Replicator(pgq.SetConsumer):
 
     def interesting(self, ev):
         if ev.type not in ('I', 'U', 'D'):
-            return 1
+            raise Exception('bug - bad event type in .interesting')
         t = self.get_table_by_name(ev.extra1)
         if t:
             return t.interesting(ev, self.cur_tick, self.copy_thread)
         else:
             return 0
 
-    def handle_system_event(self, ev, dst_curs):
-        "System event."
-
-        if ev.type == "T":
-            self.log.info("got new table event: "+ev.data)
-            # check tables to be dropped
-            name_list = []
-            for name in ev.data.split(','):
-                name_list.append(name.strip())
-
-            del_list = []
-            for tbl in self.table_list:
-                if tbl.name in name_list:
-                    continue
-                del_list.append(tbl)
+    def add_set_table(self, dst_curs, tbl):
+        q = "select londiste.set_add_table(%s, %s)"
+        dst_curs.execute(q, [self.set_name, tbl])
 
-            # separate loop to avoid changing while iterating
-            for tbl in del_list:
-                self.log.info("Removing table %s from set" % tbl.name)
-                self.remove_table(tbl, dst_curs)
-
-            ev.tag_done()
-        else:
-            self.log.warning("Unknows op %s" % ev.type)
-            ev.tag_failed("Unknown operation")
-
-    def remove_table(self, tbl, dst_curs):
-        del self.table_map[tbl.name]
-        self.table_list.remove(tbl)
-        q = "select londiste.subscriber_remove_table(%s, %s)"
-        dst_curs.execute(q, [self.pgq_queue_name, tbl.name])
+    def remove_set_table(self, dst_curs, tbl):
+        if tbl in self.table_map:
+            t = self.table_map[tbl]
+            del self.table_map[tbl]
+            self.table_list.remove(t)
+        q = "select londiste.set_remove_table(%s, %s)"
+        dst_curs.execute(q, [self.set_name, tbl])
 
     def load_table_state(self, curs):
         """Load table state from database.
@@ -507,9 +495,9 @@ class Replicator(pgq.SetConsumer):
         to load state on every batch.
         """
 
-        q = "select table_name, snapshot, merge_state, skip_truncate"\
-            "  from londiste.subscriber_get_table_list(%s)"
-        curs.execute(q, [self.pgq_queue_name])
+        q = "select table_name, custom_snapshot, merge_state, skip_truncate"\
+            "  from londiste.node_get_table_list(%s)"
+        curs.execute(q, [self.set_name])
 
         new_list = []
         new_map = {}
@@ -517,7 +505,7 @@ class Replicator(pgq.SetConsumer):
             t = self.get_table_by_name(row['table_name'])
             if not t:
                 t = TableState(row['table_name'], self.log)
-            t.loaded_state(row['merge_state'], row['snapshot'], row['skip_truncate'])
+            t.loaded_state(row['merge_state'], row['custom_snapshot'], row['skip_truncate'])
             new_list.append(t)
             new_map[t.name] = t
 
@@ -534,8 +522,8 @@ class Replicator(pgq.SetConsumer):
             merge_state = t.render_state()
             self.log.info("storing state of %s: copy:%d new_state:%s" % (
                             t.name, self.copy_thread, merge_state))
-            q = "select londiste.subscriber_set_table_state(%s, %s, %s, %s)"
-            curs.execute(q, [self.pgq_queue_name,
+            q = "select londiste.node_set_table_state(%s, %s, %s, %s)"
+            curs.execute(q, [self.set_name,
                              t.name, t.str_snapshot, merge_state])
             t.changed = 0
             got_changes = 1
@@ -563,18 +551,6 @@ class Replicator(pgq.SetConsumer):
             return self.table_map[name]
         return None
 
-    def fill_mirror_queue(self, ev_list, dst_curs):
-        # insert events
-        rows = []
-        fields = ['ev_type', 'ev_data', 'ev_extra1']
-        for ev in mirror_list:
-            rows.append((ev.type, ev.data, ev.extra1))
-        pgq.bulk_insert_events(dst_curs, rows, fields, self.mirror_queue)
-
-        # create tick
-        q = "select pgq.ticker(%s, %s)"
-        dst_curs.execute(q, [self.mirror_queue, self.cur_tick])
-
     def launch_copy(self, tbl_stat):
         self.log.info("Launching copy process")
         script = sys.argv[0]
@@ -631,12 +607,12 @@ class Replicator(pgq.SetConsumer):
         """Restore fkeys that have both tables on sync."""
         dst_curs = dst_db.cursor()
         # restore fkeys -- one at a time
-        q = "select * from londiste.subscriber_get_queue_valid_pending_fkeys(%s)"
-        dst_curs.execute(q, [self.pgq_queue_name])
+        q = "select * from londiste.node_get_valid_pending_fkeys(%s)"
+        dst_curs.execute(q, [self.set_name])
         list = dst_curs.dictfetchall()
         for row in list:
             self.log.info('Creating fkey: %(fkey_name)s (%(from_table)s --> %(to_table)s)' % row)
-            q2 = "select londiste.subscriber_restore_table_fkey(%(from_table)s, %(fkey_name)s)"
+            q2 = "select londiste.restore_table_fkey(%(from_table)s, %(fkey_name)s)"
             dst_curs.execute(q2, row)
             dst_db.commit()
     
@@ -649,7 +625,7 @@ class Replicator(pgq.SetConsumer):
         list = dst_curs.dictfetchall()
         for row in list:
             self.log.info('Dropping fkey: %s' % row['fkey_name'])
-            q2 = "select londiste.subscriber_drop_table_fkey(%(from_table)s, %(fkey_name)s)"
+            q2 = "select londiste.drop_table_fkey(%(from_table)s, %(fkey_name)s)"
             dst_curs.execute(q2, row)
             dst_db.commit()
         
diff --git a/python/londiste/setup.py b/python/londiste/setup.py
index ef3acd023522e8ac7aa735e7385305b098118273..15991311c9ebe5846ceb8c75c66dda1107f4fc20 100644 (file)
@@ -93,6 +93,8 @@ class LondisteSetup(skytools.DBScript):
                     help="force", default=False)
         p.add_option("--all", action="store_true",
                     help="include all tables", default=False)
+        p.add_option("--provider",
+                help="init: upstream node temp connect string", default=None)
         return p
 
     def exec_checked(self, curs, sql, args):
diff --git a/python/londiste/table_copy.py b/python/londiste/table_copy.py
index 5e21483a37fe74a73039d4e7dbcadee4d3580238..c712d8bd34434cfdacf773554791963847a9e530 100644 (file)
@@ -18,12 +18,11 @@ class CopyTable(Replicator):
 
         if copy_thread:
             self.pidfile += ".copy"
-            self.consumer_id += "_copy"
+            self.consumer_name += "_copy"
             self.copy_thread = 1
+            self.main_worker = False
 
-    def do_copy(self, tbl_stat):
-        src_db = self.get_database('provider_db')
-        dst_db = self.get_database('subscriber_db')
+    def do_copy(self, tbl_stat, src_db, dst_db):
 
         # it should not matter to pgq
         src_db.commit()
@@ -84,7 +83,7 @@ class CopyTable(Replicator):
         # to make state juggling faster.  on mostly idle db-s
         # each step may take tickers idle_timeout secs, which is pain.
         q = "select pgq.force_tick(%s)"
-        src_curs.execute(q, [self.pgq_queue_name])
+        src_curs.execute(q, [self.src_queue.queue_name])
         src_db.commit()
 
     def real_copy(self, srccurs, dstcurs, tbl_stat):
diff --git a/python/pgq/rawconsumer.py b/python/pgq/rawconsumer.py
index c1df916db55f8a5aa640b86edc810c4dda85bc8f..1ab452fc919055b271c3480d56578056b8f351b0 100644 (file)
@@ -31,7 +31,7 @@ class RawQueue:
 
         return self.batch_id
 
-    def finish_batch(self, curs, batch_id):
+    def finish_batch(self, curs):
         q = "select * from pgq.finish_batch(%s)"
         curs.execute(q, [self.batch_id])
 
@@ -39,7 +39,7 @@ class RawQueue:
         return pgq.consumer._BatchWalker(curs, self.batch_id, self.queue_name)
 
     def bulk_insert(self, curs, ev):
-        row = map(ev.__getattribute__, self.bulk_insert_fields)
+        row = map(ev.__getattr__, self.bulk_insert_fields)
         self.bulk_insert_buf.append(row)
         if len(self.bulk_insert_buf) >= self.bulk_insert_size:
             self.finish_bulk_insert(curs)
diff --git a/python/pgq/setadmin.py b/python/pgq/setadmin.py
index a389f3892627fb113fd6dd8770e08916f4f5a516..d1b10e33bc4f387844dd3d55593993156e98dee9 100644 (file)
@@ -40,9 +40,9 @@ class SetAdmin(skytools.DBScript):
 
         g = optparse.OptionGroup(p, "actual setadm options")
         g.add_option("--connstr", action="store_true",
-                     help = "add: ignore table differences, repair: ignore lag")
+                     help = "initial connect string")
         g.add_option("--provider",
-                     help = "add: ignore table differences, repair: ignore lag")
+                     help = "init: connect string for provider")
         p.add_option_group(g)
         return p
 
@@ -71,6 +71,8 @@ class SetAdmin(skytools.DBScript):
         return True
 
     def init_node(self, node_type, node_name, node_location):
+        provider_loc = self.options.provider
+
         # connect to database
         db = self.get_database("new_node", connstr = node_location)
 
@@ -98,8 +100,9 @@ class SetAdmin(skytools.DBScript):
                           [self.set_name, node_name, node_location])
             self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s, %s)",
                           [self.set_name, node_type, node_name, worker_name, provider_name, global_watermark, combined_set])
+            provider_db = None
         else:
-            root_db = self.find_root_db()
+            root_db = self.find_root_db(provider_loc)
             set = self.load_root_info(root_db)
 
             # check if member already exists
@@ -109,7 +112,15 @@ class SetAdmin(skytools.DBScript):
 
             global_watermark = set.global_watermark
             combined_set = None
-            provider_name = self.options.provider
+
+            provider_db = self.get_database('provider_db', connstr = provider_loc)
+            curs = provider_db.cursor()
+            curs.execute("select node_type, node_name from pgq_set.get_node_info(%s)", [self.set_name])
+            provider_db.commit()
+            row = curs.fetchone()
+            if not row:
+                raise Exception("provider node not found")
+            provider_name = row['node_name']
 
             # register member on root
             self.exec_sql(root_db, "select pgq_set.add_member(%s, %s, %s, false)",
@@ -123,7 +134,6 @@ class SetAdmin(skytools.DBScript):
                 sys.exit(1)
 
             # register on provider
-            provider_db = self.get_database('provider_db', connstr = provider.location)
             self.exec_sql(provider_db, "select pgq_set.add_member(%s, %s, %s, false)",
                           [self.set_name, node_name, node_location])
             self.exec_sql(provider_db, "select pgq_set.subscribe_node(%s, %s, %s)",
@@ -140,10 +150,18 @@ class SetAdmin(skytools.DBScript):
                            global_watermark, combined_set])
             db.commit()
 
+        self.extra_init(node_type, db, provider_db)
+
         self.log.info("Done")
 
-    def find_root_db(self):
-        loc = self.cf.get(self.initial_db_name)
+    def extra_init(self, node_type, node_db, provider_db):
+        pass
+
+    def find_root_db(self, initial_loc):
+        if initial_loc:
+            loc = initial_loc
+        else:
+            loc = self.cf.get(self.initial_db_name)
 
         while 1:
             db = self.get_database('root_db', connstr = loc)
diff --git a/python/pgq/setconsumer.py b/python/pgq/setconsumer.py
index 6e01ccf6a96feab196701a0d08233b06553bbf68..3b87cbfd571ca54c42cfd357752187a366f3dfea 100644 (file)
@@ -20,8 +20,9 @@ class MemberInfo:
         self.dead = row['dead']
 
 class NodeInfo:
-    def __init__(self, row, member_list):
+    def __init__(self, row, member_list, main_worker = True):
         self.member_map = {}
+        self.main_worker = main_worker
         for r in member_list:
             m = MemberInfo(r)
             self.member_map[m.name] = m
@@ -43,6 +44,9 @@ class NodeInfo:
         self.worker_name = row['worker_name']
 
     def need_action(self, action_name):
+        if not self.main_worker:
+            return action_name in ('process-batch', 'process-events')
+
         typ = self.type
         if typ == 'merge-leaf':
             if self.target_type == 'combined-branch':
@@ -69,12 +73,13 @@ class NodeInfo:
 
 action_map = {
 'process-batch':   {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':1},
-'process-events':  {'root':0, 'branch':1, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0},
-'copy-events':     {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
+'process-events':  {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0},
+'copy-events':     {'root':0, 'branch':1, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
 'tick-event':      {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':0, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0},
 'global-wm-event': {'root':1, 'branch':0, 'leaf':0, 'combined-root':1, 'combined-branch':0, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
 'wait-behind':     {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':0, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':1},
 'sync-part-pos':   {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
+'local-wm-publish':{'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':1},
 }
 
 node_properties = {
@@ -83,84 +88,101 @@ node_properties = {
 }
 
 class SetConsumer(skytools.DBScript):
-    last_global_wm_event = 0
+    last_local_wm_publish_time = 0
+    last_global_wm_publish_time = 0
+    main_worker = True
+    reg_ok = False
+    def __init__(self, service_name, args,
+                 node_db_name = 'node_db'):
+        skytools.DBScript.__init__(self, service_name, args)
+        self.node_db_name = node_db_name
+        self.consumer_name = self.cf.get('consumer_name', self.job_name)
+
     def work(self):
         self.tick_id_cache = {}
 
         self.set_name = self.cf.get('set_name')
-        target_db = self.get_database('subscriber_db')
-
-        node = self.load_node_info(target_db)
-        self.consumer_name = node.worker_name
+        dst_db = self.get_database(self.node_db_name)
+        dst_curs = dst_db.cursor()
 
-        if not node.up_to_date:
-            self.tag_node_uptodate(target_db)
+        dst_node = self.load_node_info(dst_db)
+        if self.main_worker:
+            self.consumer_name = dst_node.worker_name
+            if not dst_node.up_to_date:
+                self.tag_node_uptodate(dst_db)
 
-        if node.paused:
+        if dst_node.paused:
             return 0
 
-        if node.need_action('global-wm-event'):
-            curs = target_db.cursor()
-            self.set_global_watermark(curs, node.local_watermark)
-            target_db.commit()
+        if dst_node.need_action('global-wm-event'):
+            self.publish_global_watermark(dst_db, dst_node.local_watermark)
 
-        if not node.need_action('process-batch'):
+        if not dst_node.need_action('process-batch'):
             return 0
 
         #
         # batch processing follows
         #
 
-        source_db = self.get_database('source_db', connstr = node.provider_location)
-        srcnode = self.load_node_info(source_db)
+        src_db = self.get_database('src_db', connstr = dst_node.provider_location)
+        src_curs = src_db.cursor()
+        src_node = self.load_node_info(src_db)
         
         # get batch
-        srcqueue = RawQueue(srcnode.queue_name, self.consumer_name)
+        src_queue = RawQueue(src_node.queue_name, self.consumer_name)
+        self.src_queue = src_queue
+        self.dst_queue = None
+
+        if not self.main_worker and not self.reg_ok:
+            self.register_consumer(src_curs)
 
-        batch_id = srcqueue.next_batch(source_db.cursor())
-        source_db.commit()
+        batch_id = src_queue.next_batch(src_curs)
+        src_db.commit()
         if batch_id is None:
             return 0
 
-        if node.need_action('wait-behind'):
-            if node.should_wait(queue.cur_tick):
+        self.log.debug("New batch: tick_id=%d / batch_id=%d" % (src_queue.cur_tick, batch_id))
+
+        if dst_node.need_action('wait-behind'):
+            if dst_node.should_wait(src_queue.cur_tick):
                 return 0
 
-        if node.need_action('process-event'):
+        if dst_node.need_action('process-events'):
             # load and process batch data
-            ev_list = self.get_batch_events(source_db, batch_id)
+            ev_list = src_queue.get_batch_events(src_curs)
 
-            copy_queue = None
-            if node.need_action('copy-events'):
-                copy_queue = node.get_target_queue()
-            self.process_set_batch(target_db, ev_list, copy_queue)
-            if copy_queue:
-                copy_queue.finish_bulk_insert(curs)
-                self.copy_tick(target_curs, srcqueue, copy_queue)
+            if dst_node.need_action('copy-events'):
+                self.dst_queue = RawQueue(dst_node.get_target_queue(), self.consumer_name)
+            self.process_set_batch(src_db, dst_db, ev_list)
+            if self.dst_queue:
+                self.dst_queue.finish_bulk_insert(dst_curs)
+                self.copy_tick(dst_curs, src_queue, self.dst_queue)
 
             # COMBINED_BRANCH needs to sync with part sets
-            if node.need_action('sync-part-pos'):
-                self.move_part_positions(target_curs)
+            if dst_node.need_action('sync-part-pos'):
+                self.move_part_positions(dst_curs)
 
         # we are done on target
-        self.set_tick_complete(target_curs)
-        target_db.commit()
+        self.set_tick_complete(dst_curs, src_queue.cur_tick)
+        dst_db.commit()
 
         # done on source
-        self.finish_batch(source_db, batch_id)
+        src_queue.finish_batch(src_curs)
+        src_db.commit()
 
         # occasionally send watermark upwards
-        self.send_local_watermark_upwards(target_db, source_db)
+        if dst_node.need_action('local-wm-publish'):
+            self.send_local_watermark_upwards(src_db, dst_node)
 
         # got a batch so there can be more
         return 1
 
-    def process_set_batch(self, src_db, dst_db, ev_list, copy_queue = None):
+    def process_set_batch(self, src_db, dst_db, ev_list):
         dst_curs = dst_db.cursor()
         for ev in ev_list:
             self.process_set_event(dst_curs, ev)
-            if copy_queue:
-                copy_queue.bulk_insert(dst_curs, ev)
+            if self.dst_queue:
+                self.dst_queue.bulk_insert(dst_curs, ev)
         self.stat_add('count', len(ev_list))
 
     def process_set_event(self, dst_curs, ev):
@@ -203,21 +225,35 @@ class SetConsumer(skytools.DBScript):
         q = "select * from pgq_set.add_member(%s, %s, %s, %s)"
         dst_curs.execute(q, [set_name, node_name, node_location, dead])
 
-    def send_local_watermark_upwards(self, target_db, source_db):
-        target_curs = target_db.cursor()
-        source_curs = source_db.cursor()
-        q = "select pgq_ext.get_local_watermark(%s)"
-        target_curs.execute(q, [self.set_name])
-        wm = target_curs.fetchone()[0]
-        target_db.commit()
-    
-        q = "select pgq_ext.set_subscriber_watermark(%s, %s, %s)"
-        source_curs.execute(q, [self.set_name])
+    def send_local_watermark_upwards(self, src_db, node):
+        # fixme - delay
+        now = time.time()
+        delay = now - self.last_local_wm_publish_time
+        if delay < 1*60:
+            return
+        self.last_local_wm_publish_time = now
+
+        self.log.debug("send_local_watermark_upwards")
+        src_curs = src_db.cursor()
+        q = "select pgq_set.set_subscriber_watermark(%s, %s, %s)"
+        src_curs.execute(q, [self.set_name, node.name, node.local_watermark])
+        src_db.commit()
 
     def set_global_watermark(self, dst_curs, tick_id):
+        self.log.debug("set_global_watermark: %s" % tick_id)
         q = "select pgq_set.set_global_watermark(%s, %s)"
         dst_curs.execute(q, [self.set_name, tick_id])
 
+    def publish_global_watermark(self, dst_db, watermark):
+        now = time.time()
+        delay = now - self.last_global_wm_publish_time
+        if delay < 1*60:
+            return
+        self.last_global_wm_publish_time = now
+
+        self.set_global_watermark(dst_db.cursor(), watermark)
+        dst_db.commit()
+
     def load_node_info(self, db):
         curs = db.cursor()
 
@@ -232,10 +268,10 @@ class SetConsumer(skytools.DBScript):
         mbr_list = curs.dictfetchall()
         db.commit()
 
-        return NodeInfo(node_row, mbr_list)
+        return NodeInfo(node_row, mbr_list, self.main_worker)
 
     def tag_node_uptodate(self, dst_db):
-        dst_curs = db.cursor()
+        dst_curs = dst_db.cursor()
         q = "select * from pgq_set.set_node_uptodate(%s, true)"
         dst_curs.execute(q, [self.set_name])
         dst_db.commit()
@@ -244,6 +280,24 @@ class SetConsumer(skytools.DBScript):
         q = "select * from pgq.ticker(%s, %s)"
         dst_curs.execute(q, [dst_queue.queue_name, src_queue.cur_tick])
 
+    def set_tick_complete(self, dst_curs, tick_id):
+        q = "select * from pgq_set.set_completed_tick(%s, %s, %s)"
+        dst_curs.execute(q, [self.set_name, self.consumer_name, tick_id])
+
+    def register_consumer(self, src_curs):
+        if self.main_worker:
+            raise Exception('main set worker should not play with registrations')
+
+        q = "select * from pgq.register_consumer(%s, %s)"
+        src_curs.execute(q, [self.src_queue.queue_name, self.consumer_name])
+
+    def unregister_consumer(self, src_curs):
+        if self.main_worker:
+            raise Exception('main set worker should not play with registrations')
+
+        q = "select * from pgq.unregister_consumer(%s, %s)"
+        src_curs.execute(q, [self.src_queue.queue_name, self.consumer_name])
+
 if __name__ == '__main__':
     script = SetConsumer('setconsumer', sys.argv[1:])
     script.start()
diff --git a/python/skytools/scripting.py b/python/skytools/scripting.py
index cb6ee80217b8d9831d4676f25f21b46f6e356234..3a3556a2854f661f39dff910168a38779ee04b33 100644 (file)
@@ -510,6 +510,8 @@ class DBScript(object):
             if self.looping and not self.do_single_loop:
                 time.sleep(20)
                 return 1
+            else:
+                sys.exit(1)
 
     def work(self):
         """Here should user's processing happen.
diff --git a/python/skytools/sqltools.py b/python/skytools/sqltools.py
index 726f94f7ec4ce0eed7f0f4fc5e9177f69f98ab64..566576eb5b140528d243a195ead789ad5a1f4bc0 100644 (file)
@@ -330,7 +330,10 @@ class DBObject(object):
             sql = open(fn, "r").read()
         else:
             raise Exception('object not defined')
-        curs.execute(sql)
+        for stmt in skytools.parse_statements(sql):
+            if log:
+                log.debug(repr(stmt))
+            curs.execute(stmt)
 
     def find_file(self):
         full_fn = None
diff --git a/sql/londiste/functions/londiste.node_prepare_triggers.sql b/sql/londiste/functions/londiste.node_prepare_triggers.sql
index 8e658fa8452a86397d7c69f8ef7805e619f8c85f..53b2563501cfbf06499e6008435a052d3b4ccda9 100644 (file)
@@ -26,19 +26,21 @@ begin
         return next;
         return;
     end if;
-    logtrg_name := i_set_name || '_logtrigger';
-    denytrg_name := i_set_name || '_denytrigger';
-    logtrg := 'create trigger ' || quote_ident(logtrg_name)
-        || ' after insert or update or delete on ' || londiste.quote_fqname(fq_table_name)
-        || ' for each row execute procedure pgq.sqltriga(' || quote_literal(qname) || ')';
-    insert into londiste.node_trigger (set_name, table_name, tg_name, tg_type, tg_def)
-    values (i_set_name, fq_table_name, logtrg_name, 'root', logtrg);
-    select 200, logtrg into ret_code, ret_desc;
-    return next;
+    if qname is not null then
+        logtrg_name := i_set_name || '_logtrigger';
+        logtrg := 'create trigger ' || quote_ident(logtrg_name)
+            || ' after insert or update or delete on ' || londiste.quote_fqname(fq_table_name)
+            || ' for each row execute procedure pgq.sqltriga(' || quote_literal(qname) || ')';
+        insert into londiste.node_trigger (set_name, table_name, tg_name, tg_type, tg_def)
+        values (i_set_name, fq_table_name, logtrg_name, 'root', logtrg);
+        select 200, logtrg into ret_code, ret_desc;
+        return next;
+    end if;
 
+    denytrg_name := i_set_name || '_denytrigger';
     denytrg := 'create trigger ' || quote_ident(denytrg_name)
         || ' before insert or update or delete on ' || londiste.quote_fqname(fq_table_name)
-        || ' for each row execute procedure pgq.denytriga(' || quote_literal(qname) || ')';
+        || ' for each row execute procedure pgq.denytriga(' || quote_literal(i_set_name) || ')';
     insert into londiste.node_trigger (set_name, table_name, tg_name, tg_type, tg_def)
     values (i_set_name, fq_table_name, denytrg_name, 'non-root', denytrg);
     select 200, denytrg into ret_code, ret_desc;
@@ -46,5 +48,5 @@ begin
 
     return;
 end;
-$$ language plpgsql security definer;
+$$ language plpgsql;
 
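For reference, a sketch of the definitions this now generates for a set named
test_set on table public.data1 (names taken from the tests in this commit; the
queue name is a placeholder):

    create trigger "test_set_logtrigger"
        after insert or update or delete on public.data1
        for each row execute procedure pgq.sqltriga('<queue_name>');

    create trigger "test_set_denytrigger"
        before insert or update or delete on public.data1
        for each row execute procedure pgq.denytriga('test_set');

The log trigger is created only when a queue name is known; the deny trigger
now takes the set name instead of the possibly-null queue name.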
diff --git a/sql/londiste/functions/londiste.set_get_table_list.sql b/sql/londiste/functions/londiste.set_get_table_list.sql
new file mode 100644 (file)
index 0000000..6082706
--- /dev/null
@@ -0,0 +1,30 @@
+
+create or replace function londiste.set_get_table_list(
+    in i_set_name       text,
+    out table_name      text,
+    out is_local        bool)
+returns setof record as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.set_get_table_list(1)
+--
+--      Show tables registered for set.
+--
+--      This means its available from root, events for it appear
+--      in queue and nodes can attach to it.
+--
+-- Called by:
+--      Admin tools.
+-- ----------------------------------------------------------------------
+begin
+    for table_name, is_local in
+        select t.table_name, n.table_name is not null
+          from londiste.set_table t left join londiste.node_table n
+               on (t.set_name = n.set_name and t.table_name = n.table_name)
+         where t.set_name = i_set_name
+    loop
+        return next;
+    end loop;
+    return;
+end;
+$$ language plpgsql strict security definer;
+
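A minimal usage sketch (set name taken from the test configs in this commit):

    select table_name, is_local
      from londiste.set_get_table_list('test_set');

is_local is true when the table is also registered in londiste.node_table on
the node being queried.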
diff --git a/sql/londiste/structure/functions.sql b/sql/londiste/structure/functions.sql
index e40330ade57ad5b7bf3797d8562cd6e617eba558..0bb20e20c1404626628db4f20633f4121f163098 100644 (file)
@@ -12,6 +12,7 @@
 -- Group: Set object registrations
 \i functions/londiste.set_add_table.sql
 \i functions/londiste.set_remove_table.sql
+\i functions/londiste.set_get_table_list.sql
 
 -- Group: FKey handling
 \i functions/londiste.handle_fkeys.sql
diff --git a/sql/pgq_set/Makefile b/sql/pgq_set/Makefile
index 24c89961fea90e3735438120627e570f006873f7..3fbd423b5d91f7af294d2b955528bd1436cd0bf8 100644 (file)
@@ -7,7 +7,7 @@ SRCS = structure/pgq_set.sql structure/functions.sql $(FUNCS)
 REGRESS = pgq_set
 REGRESS_OPTS = --load-language=plpgsql
 
-PGXS = $(shell pg_config --pgxs)
+include ../../config.mak
 include $(PGXS)
 
 NDOC = NaturalDocs
diff --git a/sql/pgq_set/functions/pgq_set.set_global_watermark.sql b/sql/pgq_set/functions/pgq_set.set_global_watermark.sql
index 445ca13bfac0a0bfc461f5c5579e73a3ff845109..ceac1bb4ba004b0707a849b9524c012962dab781 100644 (file)
@@ -41,7 +41,7 @@ begin
     end if;
 
     if this.node_type in ('root', 'combined-root') then
-        perform pgq.insert_event(this.queue_name, 'global-watermark', i_watermark,
+        perform pgq.insert_event(this.queue_name, 'global-watermark', i_watermark::text,
                                  i_set_name, null, null, null);
     end if;
     return i_watermark;
diff --git a/sql/pgq_set/functions/pgq_set.set_subscriber_watermark.sql b/sql/pgq_set/functions/pgq_set.set_subscriber_watermark.sql
index 3f4d85f9d5bfa10ecb333093fa3ee3534ea7e3db..c0cbd2f037f71d7b61885030c93a6cf0e3177b19 100644 (file)
@@ -7,9 +7,7 @@ returns bigint as $$
 -- ----------------------------------------------------------------------
 -- Function: pgq_set.set_subscriber_watermark(3)
 --
---      Notify provider about subscribers lowest watermark.  On root
---      node, changes global_watermark and sends event about that
---      to the queue.
+--      Notify provider about the subscriber's lowest watermark.
 --
 -- Parameters:
 --      i_set_name - set name
@@ -23,15 +21,6 @@ declare
     m       record;
     cur_wm  bigint;
 begin
-    select node_type, global_watermark, local_queue
-      into m
-      from pgq_set.local_node
-     where set_name = i_set_name
-       for update;
-    if not found then
-        raise exception 'no such set: %', i_set_name;
-    end if;
-
     update pgq_set.subscriber_info
        set local_watermark = i_watermark
      where set_name = i_set_name
@@ -40,15 +29,6 @@ begin
         raise exception 'node % not subscribed to set %', i_node_name, i_set_name;
     end if;
 
-    if m.node_type in ('root', 'combined-root') then
-        cur_wm := pgq_set.get_local_watermark(i_set_name);
-        if cur_wm > m.global_watermark then
-            update pgq_set.local_node set global_watermark = cur_wm
-                where set_name = i_set_name;
-            perform pgq.insert_event(m.local_queue, 'global-watermark', cur_wm);
-        end if;
-    end if;
-
     return i_watermark;
 end;
 $$ language plpgsql security definer;
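A usage sketch of the trimmed-down function (node name from the tests in this
commit, tick id hypothetical):

    select pgq_set.set_subscriber_watermark('test_set', 'n_branch', 1200);

It now only records the subscriber's watermark in pgq_set.subscriber_info;
advancing the global watermark on root nodes happens elsewhere.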
diff --git a/sql/pgq_set/functions/pgq_set.track_tick.sql b/sql/pgq_set/functions/pgq_set.track_tick.sql
new file mode 100644 (file)
index 0000000..17831ea
--- /dev/null
@@ -0,0 +1,33 @@
+
+create or replace function pgq_set.get_completed_tick(a_set text, a_consumer text)
+returns int8 as $$
+declare
+    res   int8;
+begin
+    select tick_id into res
+      from pgq_set.completed_tick
+     where set_name = a_set and worker_name = a_consumer;
+    return res;
+end;
+$$ language plpgsql security definer;
+
+create or replace function pgq_set.set_completed_tick(a_set text, a_consumer text, a_tick_id bigint)
+returns integer as $$
+begin
+    if a_tick_id is null then
+        delete from pgq_set.completed_tick
+         where set_name = a_set and worker_name = a_consumer;
+    else
+        update pgq_set.completed_tick
+           set tick_id = a_tick_id
+         where set_name = a_set and worker_name = a_consumer;
+        if not found then
+            insert into pgq_set.completed_tick (set_name, worker_name, tick_id)
+                values (a_set, a_consumer, a_tick_id);
+        end if;
+    end if;
+
+    return 1;
+end;
+$$ language plpgsql security definer;
+
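Usage sketch for the new tick-tracking helpers (consumer name from the test
configs in this commit, tick ids hypothetical):

    select pgq_set.set_completed_tick('test_set', 'worker_branch', 5001);
    select pgq_set.get_completed_tick('test_set', 'worker_branch');   -- returns 5001
    select pgq_set.set_completed_tick('test_set', 'worker_branch', null);  -- forgets position

set_completed_tick() updates pgq_set.completed_tick and inserts the row if it
does not exist yet; passing a null tick id deletes it.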
diff --git a/sql/pgq_set/structure/functions.sql b/sql/pgq_set/structure/functions.sql
index 70b44a918770f8e2e6e2d286690ccbfffd39d592..fd9d27c7f6e94247e60423f176f7cddc4c55ef16 100644 (file)
@@ -20,4 +20,5 @@
 \i functions/pgq_set.set_global_watermark.sql
 \i functions/pgq_set.set_partition_watermark.sql
 
+\i functions/pgq_set.track_tick.sql
 
diff --git a/sql/pgq_set/structure/pgq_set.sql b/sql/pgq_set/structure/pgq_set.sql
index 7344be9f2e06b1f7d4baf1af155c6f8ec3bb1d79..211a8ab803bc785a428b2bc396ce9a757a4d3816 100644 (file)
@@ -23,7 +23,7 @@ create table pgq_set.member_info (
 );
 
 -- ----------------------------------------------------------------------
--- Table: pgq_set.local_node
+-- Table: pgq_set.set_info
 --
 --      Local node info.
 --
diff --git a/tests/londiste/Makefile b/tests/londiste/Makefile
new file mode 100644 (file)
index 0000000..fb5f2db
--- /dev/null
@@ -0,0 +1,72 @@
+
+PYTHONPATH := ../../python:$(PYTHONPATH)
+PATH := ../../python:../../scripts:$(PATH)
+
+contrib=/usr/share/postgresql/8.1/contrib
+contrib=/opt/apps/pgsql-dev/share/contrib
+contrib=/opt/pgsql/share/contrib
+
+
+tests: gendb
+       pgqadm.py conf/ticker_root.ini ticker -v -d
+       pgqadm.py conf/ticker_branch.ini ticker -v -d
+       pgqadm.py conf/ticker_leaf.ini ticker -v -d
+       londiste.py conf/w_root.ini replay -d -v
+       londiste.py conf/w_branch.ini replay -d -v
+       londiste.py conf/w_leaf.ini replay -d -v
+       echo everything is running now
+       sleep 10
+       grep -E 'WARN|ERR|CRIT' sys/log.*
+
+gendb: init db_root db_branch db_leaf
+
+init:
+       mkdir -p sys
+       ./stop.sh
+       sleep 1
+       rm -rf file_logs sys
+       mkdir -p sys
+
+db_root:
+       echo "creating database: $@"
+       dropdb $@ && sleep 1 || true
+       sleep 1
+       createdb $@
+       londiste.py conf/w_root.ini init-root n_root "dbname=$@"
+       pgqadm.py -v conf/ticker_root.ini install
+       psql -q $@ -f data.sql
+       londiste.py -v conf/w_root.ini add data1
+       #londiste.py -v conf/w_root.ini add data1
+       #londiste.py -v conf/w_root.ini remove data1
+       #londiste.py -v conf/w_root.ini remove data1
+       #londiste.py -v conf/w_root.ini add data1
+       londiste.py -v conf/w_root.ini tables
+
+db_branch:
+       echo "creating database: $@"
+       dropdb $@ && sleep 1 || true
+       createdb $@
+       pgqadm.py conf/ticker_branch.ini install
+       londiste.py conf/w_branch.ini init-branch n_branch "dbname=$@" --provider="dbname=db_root"
+       psql -q $@ -f data.sql
+       londiste.py conf/w_branch.ini add data1
+       #londiste.py conf/w_branch.ini add data1
+       #londiste.py conf/w_branch.ini remove data1
+       #londiste.py conf/w_branch.ini remove data1
+       #londiste.py conf/w_branch.ini add data1
+       londiste.py conf/w_branch.ini tables
+
+db_leaf:
+       echo "creating database: $@"
+       dropdb $@ && sleep 1 || true
+       createdb $@
+       pgqadm.py conf/ticker_leaf.ini install
+       londiste.py conf/w_leaf.ini init-leaf n_leaf "dbname=$@" --provider="dbname=db_root"
+       psql -q $@ -f data.sql
+       londiste.py conf/w_leaf.ini add data1
+       #londiste.py conf/w_leaf.ini add data1
+       #londiste.py conf/w_leaf.ini remove data1
+       #londiste.py conf/w_leaf.ini remove data1
+       #londiste.py conf/w_leaf.ini add data1
+       londiste.py conf/w_leaf.ini tables
+
diff --git a/tests/londiste/conf/ticker_branch.ini b/tests/londiste/conf/ticker_branch.ini
new file mode 100644 (file)
index 0000000..63210ee
--- /dev/null
@@ -0,0 +1,21 @@
+[pgqadm]
+
+job_name = ticker_branch
+
+db = dbname=db_branch
+
+# how often to run maintenance [minutes]
+maint_delay_min = 1
+
+# how often to check for activity [secs]
+loop_delay = 0.5
+
+logfile = sys/log.%(job_name)s
+pidfile = sys/pid.%(job_name)s
+
+use_skylog = 0
+
+connection_lifetime = 21
+
+queue_refresh_period = 10
+
diff --git a/tests/londiste/conf/ticker_leaf.ini b/tests/londiste/conf/ticker_leaf.ini
new file mode 100644 (file)
index 0000000..25ee7ba
--- /dev/null
@@ -0,0 +1,21 @@
+[pgqadm]
+
+job_name = ticker_leaf
+
+db = dbname=db_leaf
+
+# how often to run maintenance [minutes]
+maint_delay_min = 1
+
+# how often to check for activity [secs]
+loop_delay = 0.5
+
+logfile = sys/log.%(job_name)s
+pidfile = sys/pid.%(job_name)s
+
+use_skylog = 0
+
+connection_lifetime = 21
+
+queue_refresh_period = 10
+
diff --git a/tests/londiste/conf/w_branch.ini b/tests/londiste/conf/w_branch.ini
new file mode 100644 (file)
index 0000000..f5445a9
--- /dev/null
@@ -0,0 +1,14 @@
+[londiste]
+job_name = worker_branch
+
+set_name = test_set
+
+node_db = dbname=db_branch
+
+pidfile = sys/pid.%(job_name)s
+logfile = sys/log.%(job_name)s
+
+loop_delay = 1
+
+connection_lifetime = 30
+
diff --git a/tests/londiste/conf/w_leaf.ini b/tests/londiste/conf/w_leaf.ini
new file mode 100644 (file)
index 0000000..eaa7ad4
--- /dev/null
@@ -0,0 +1,14 @@
+[londiste]
+job_name = worker_leaf
+
+set_name = test_set
+
+node_db = dbname=db_leaf
+
+pidfile = sys/pid.%(job_name)s
+logfile = sys/log.%(job_name)s
+
+loop_delay = 1
+
+connection_lifetime = 30
+
diff --git a/tests/londiste/conf/w_root.ini b/tests/londiste/conf/w_root.ini
new file mode 100644 (file)
index 0000000..3aa733d
--- /dev/null
@@ -0,0 +1,15 @@
+
+[londiste]
+job_name = worker_root
+
+set_name = test_set
+
+node_db = dbname=db_root
+
+pidfile = sys/pid.%(job_name)s
+logfile = sys/log.%(job_name)s
+
+loop_delay = 1
+
+connection_lifetime = 30
+
diff --git a/tests/londiste/conf/w_root.ini.rej b/tests/londiste/conf/w_root.ini.rej
new file mode 100644 (file)
index 0000000..c38182c
--- /dev/null
@@ -0,0 +1,17 @@
+***************
+*** 0 ****
+--- 1,14 ----
++ [londiste]
++ job_name = worker_root
++ 
++ set_name = test_set
++ 
++ node_db = dbname=db_root
++ 
++ pidfile = sys/pid.%(job_name)s
++ logfile = sys/log.%(job_name)s
++ 
++ loop_delay = 1
++ 
++ connection_lifetime = 30
++ 
diff --git a/tests/londiste/gendb.sh b/tests/londiste/gendb.sh
index 2e4b5a3614ddbb1515c408ba711eec33693a2c71..25950d8d43ef2f8292407eb45e3a17f56ef3be33 100755 (executable)
@@ -6,6 +6,7 @@ contrib=/usr/share/postgresql/8.1/contrib
 contrib=/opt/apps/pgsql-dev/share/contrib
 contrib=/opt/pgsql/share/contrib
 
+set -e
 
 
 mkdir -p sys
@@ -17,15 +18,12 @@ mkdir -p sys
 
 db=db_root
 echo "creating database: $db"
-dropdb $db
+dropdb $db && sleep 1 || true
 sleep 1
 createdb $db
-
 londiste.py conf/w_root.ini init-root n_root "dbname=$db"
-
 pgqadm.py conf/ticker_root.ini install
 psql -q $db -f data.sql
-
 londiste.py conf/w_root.ini add data1
 londiste.py conf/w_root.ini add data1
 londiste.py conf/w_root.ini remove data1
@@ -33,6 +31,20 @@ londiste.py conf/w_root.ini remove data1
 londiste.py conf/w_root.ini add data1
 londiste.py conf/w_root.ini tables
 
+db=db_branch
+echo "creating database: $db"
+dropdb $db && sleep 1 || true
+createdb $db
+pgqadm.py conf/ticker_branch.ini install
+londiste.py conf/w_branch.ini init-branch n_branch "dbname=$db" --provider="dbname=db_root"
+psql -q $db -f data.sql
+londiste.py conf/w_branch.ini add data1
+londiste.py conf/w_branch.ini add data1
+londiste.py conf/w_branch.ini remove data1
+londiste.py conf/w_branch.ini remove data1
+londiste.py conf/w_branch.ini add data1
+londiste.py conf/w_branch.ini tables
+
 exit 0
 
 db=subscriber
diff --git a/tests/londiste/stop.sh b/tests/londiste/stop.sh
index b6b951e4256c021d12495e9c5926c0253d720711..f808281d72f80ad8135555681ec7dc141d14bc07 100755 (executable)
@@ -2,11 +2,20 @@
 
 . ../env.sh
 ./testing.py -s conf/tester.ini
-londiste.py -s conf/fwrite.ini
-londiste.py -s conf/replic.ini
+londiste.py -s conf/w_leaf.ini
+londiste.py -s conf/w_branch.ini
+londiste.py -s conf/w_root.ini
 
 sleep 1
 
-pgqadm.py -s conf/ticker.ini
-pgqadm.py -s conf/linkticker.ini
+pgqadm.py -s conf/ticker_root.ini
+pgqadm.py -s conf/ticker_branch.ini
+pgqadm.py -s conf/ticker_leaf.ini
+
+sleep 1
+
+for f in sys/pid.*; do
+  test -f "$f" || continue
+  kill `cat $f`
+done