python experiments on set handling
author Marko Kreen <markokr@gmail.com>
Wed, 5 Dec 2007 15:47:38 +0000 (15:47 +0000)
committer Marko Kreen <markokr@gmail.com>
Wed, 5 Dec 2007 15:47:38 +0000 (15:47 +0000)
python/pgq/rawconsumer.py [new file with mode: 0644]
python/pgq/setconsumer.py [new file with mode: 0644]
python/setadm.py [new file with mode: 0755]

diff --git a/python/pgq/rawconsumer.py b/python/pgq/rawconsumer.py
new file mode 100644 (file)
index 0000000..0cff4b2
--- /dev/null
@@ -0,0 +1,51 @@
+
+
+import sys, time, skytools, pgq.consumer
+
+class RawQueue:
+    """Low-level access to one PgQ queue on behalf of one consumer.
+
+    Keeps track of the currently open batch and its tick range, and
+    buffers events that are to be written into this queue in bulk.
+    """
+
+    queue_name = None
+    consumer_name = None
+    # id of the batch opened by the last next_batch() call
+    batch_id = None
+    # tick range of that batch, filled in by next_batch()
+    cur_tick = None
+    prev_tick = None
+
+    def __init__(self, queue_name, consumer_name):
+        """Remember queue/consumer names and start with an empty buffer."""
+        self.queue_name = queue_name
+        self.consumer_name = consumer_name
+        # rows waiting to be written by finish_bulk_insert()
+        self.bulk_insert_buf = []
+        # bulk_insert() flushes once the buffer holds this many rows
+        self.bulk_insert_size = 200
+        # event attributes copied into every buffered row, in column order
+        self.bulk_insert_fields = ['ev_id', 'ev_time', 'ev_type', 'ev_data', 'ev_extra1', 'ev_extra2', 'ev_extra3', 'ev_extra4']
+
+    def next_batch(self, curs):
+        """Open the next batch and load its tick range.
+
+        Stores the result of pgq.next_batch() in self.batch_id.  When
+        that value is false (no batch) it is returned right away and
+        cur_tick / prev_tick are left untouched; otherwise they are
+        filled from pgq.get_batch_info() before the id is returned.
+        """
+        q = "select * from pgq.next_batch(%s, %s)"
+        curs.execute(q, [self.queue_name, self.consumer_name])
+        self.batch_id = curs.fetchone()[0]
+
+        if not self.batch_id:
+            return self.batch_id
+
+        q = "select tick_id, prev_tick_id from pgq.get_batch_info(%s)"
+        curs.execute(q, [self.batch_id])
+        inf = curs.dictfetchone()
+        self.cur_tick = inf['tick_id']
+        self.prev_tick = inf['prev_tick_id']
+
+        return self.batch_id
+
+    def finish_batch(self, curs, batch_id = None):
+        """Close a batch with pgq.finish_batch().
+
+        batch_id defaults to the batch opened by the last next_batch()
+        call.  Committing is left to the caller.
+        """
+        if batch_id is None:
+            batch_id = self.batch_id
+        q = "select * from pgq.finish_batch(%s)"
+        curs.execute(q, [batch_id])
+
+    def get_batch_events(self, curs):
+        """Return a pgq.consumer._BatchWalker over the current batch."""
+        return pgq.consumer._BatchWalker(curs, self.batch_id, self.queue_name)
+
+    def bulk_insert(self, curs, ev):
+        """Buffer one event; flush when the buffer reaches bulk_insert_size."""
+        # getattr() instead of ev.__getattribute__: it also honours
+        # __getattr__ and works on old-style instances
+        row = [getattr(ev, fld) for fld in self.bulk_insert_fields]
+        self.bulk_insert_buf.append(row)
+        if len(self.bulk_insert_buf) >= self.bulk_insert_size:
+            self.finish_bulk_insert(curs)
+
+    def finish_bulk_insert(self, curs):
+        """Write all buffered rows into the queue and empty the buffer."""
+        pgq.bulk_insert_events(curs, self.bulk_insert_buf,
+                               self.bulk_insert_fields, self.queue_name)
+        self.bulk_insert_buf = []
+
diff --git a/python/pgq/setconsumer.py b/python/pgq/setconsumer.py
new file mode 100644 (file)
index 0000000..6acb230
--- /dev/null
@@ -0,0 +1,243 @@
+#! /usr/bin/env python
+
+import sys, time, skytools
+
+# Node type names: the values NodeInfo.type is compared against and the
+# keys of node_properties.
+ROOT = 'root'
+BRANCH = 'branch'
+LEAF = 'leaf'
+COMBINED_ROOT = 'combined-root'
+COMBINED_BRANCH = 'combined-branch'
+MERGE_LEAF = 'merge-leaf'
+
+class MemberInfo:
+    """Name, location and dead flag of a single set member."""
+    def __init__(self, row):
+        """Copy the member columns out of a dict-like row."""
+        for attr, column in (('name', 'node_name'),
+                             ('location', 'node_location'),
+                             ('dead', 'dead')):
+            setattr(self, attr, row[column])
+
+class NodeInfo:
+    """State of one set node together with the members it knows about."""
+    def __init__(self, row, member_list):
+        """Build from a node-info row and a list of member rows.
+
+        Both are dict-like rows; each member row is wrapped in a
+        MemberInfo and stored in member_map under the member's name.
+        """
+        self.member_map = {}
+        for r in member_list:
+            m = MemberInfo(r)
+            self.member_map[m.name] = m
+
+        self.name = row['node_name']
+        self.type = row['node_type']
+        self.queue_name = row['queue_name']
+        self.global_watermark = row['global_watermark']
+        self.local_watermark = row['local_watermark']
+        self.completed_tick = row['completed_tick']
+        self.provider_node = row['provider_node']
+        self.paused = row['paused']
+        self.resync = row['resync']
+        self.up_to_date = row['up_to_date']
+        self.combined_set = row['combined_set']
+        self.combined_type = row['combined_type']
+        self.combined_queue = row['combined_queue']
+
+    def need_action(self, action_name):
+        """Return the action_map flag for action_name on this node.
+
+        A merge-leaf node has no action_map column of its own: it is
+        looked up as 'merge-leaf-to-branch' or 'merge-leaf-to-root',
+        depending on the type of the combined set it feeds.
+
+        Raises Exception when the action or node type is unknown, or
+        when a merge-leaf node has an unexpected combined type.
+        """
+        typ = self.type
+        if typ == 'merge-leaf':
+            # the target of a merge-leaf is its combined set
+            if self.combined_type == 'combined-branch':
+                typ = "merge-leaf-to-branch"
+            elif self.combined_type == 'combined-root':
+                typ = "merge-leaf-to-root"
+            else:
+                raise Exception('bad target type')
+
+        try:
+            return action_map[action_name][typ]
+        except KeyError:
+            raise Exception('need_action(name=%s, type=%s) unknown' % (action_name, typ))
+
+    def get_target_queue(self):
+        """Return the name of the queue this node writes events into.
+
+        That is the combined queue for a merge-leaf node and the node's
+        own queue otherwise.  Raises Exception if the name is not set.
+        """
+        qname = None
+        if self.type == 'merge-leaf':
+            qname = self.combined_queue
+        else:
+            qname = self.queue_name
+        if qname is None:
+            raise Exception("no target queue")
+        return qname
+
+# action_map[action_name][node_kind] -> 1 if a node of that kind performs
+# the action, 0 if not.  Looked up by NodeInfo.need_action().  There is no
+# plain 'merge-leaf' column, only the two 'merge-leaf-to-*' variants.
+action_map = {
+'process-batch':   {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':1},
+'process-events':  {'root':0, 'branch':1, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0},
+'copy-events':     {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
+'tick-event':      {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':0, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0},
+'global-wm-event': {'root':1, 'branch':0, 'leaf':0, 'combined-root':1, 'combined-branch':0, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
+'wait-behind':     {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':0, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':1},
+'sync-part-pos':   {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
+}
+
+# node_properties[property][node_type] -> 1/0 flag per node type.
+# NOTE(review): not referenced by any code shown here; presumably records
+# which node types have pgq installed / own a queue -- confirm.
+node_properties = {
+'pgq':     {'root':1, 'branch':1, 'leaf':0, 'combined-root':1, 'combined-branch':1, 'merge-leaf':1},
+'queue':   {'root':1, 'branch':1, 'leaf':0, 'combined-root':1, 'combined-branch':1, 'merge-leaf':0},
+}
+
+class SetConsumer(skytools.DBScript):
+    """Worker that moves one set node forward, one provider batch per loop.
+
+    NOTE(review): self.consumer_name, self.set_tick_complete() and
+    NodeInfo.should_wait() are used by work() but are not defined by any
+    code in this file -- they have to be supplied before this can run.
+    """
+    last_global_wm_event = 0
+
+    def work(self):
+        """Process at most one batch from the provider.
+
+        Returns 1 when a batch was processed (more may be waiting) and
+        0 when there was nothing to do.
+        """
+        # imported here because this module has no top-level import of it
+        from pgq.rawconsumer import RawQueue
+
+        # set name -> last tick id seen in a 'set-tick' event
+        self.tick_id_cache = {}
+
+        self.set_name = self.cf.get('set_name')
+        target_db = self.get_database('subscriber_db')
+        target_curs = target_db.cursor()
+
+        node = self.load_node_info(target_db)
+
+        if not node.up_to_date:
+            self.tag_node_uptodate(target_db)
+
+        if node.paused:
+            return 0
+
+        if node.need_action('global-wm-event'):
+            self.set_global_watermark(target_curs, node.local_watermark)
+            target_db.commit()
+
+        if not node.need_action('process-batch'):
+            return 0
+
+        #
+        # batch processing follows
+        #
+
+        # NOTE(review): the config key 'provider_db' is assumed as the
+        # counterpart of 'subscriber_db' -- confirm
+        source_db = self.get_database('provider_db')
+        source_curs = source_db.cursor()
+        srcnode = self.load_node_info(source_db)
+
+        # get batch
+        srcqueue = RawQueue(srcnode.queue_name, self.consumer_name)
+
+        batch_id = srcqueue.next_batch(source_curs)
+        source_db.commit()
+        if batch_id is None:
+            return 0
+
+        if node.need_action('wait-behind'):
+            if node.should_wait(srcqueue.cur_tick):
+                return 0
+
+        if node.need_action('process-events'):
+            # load and process batch data
+            ev_list = srcqueue.get_batch_events(source_curs)
+
+            copy_queue = None
+            if node.need_action('copy-events'):
+                copy_queue = RawQueue(node.get_target_queue(), self.consumer_name)
+            self.process_set_batch(source_db, target_db, ev_list, copy_queue)
+            if copy_queue is not None:
+                # flush what is still buffered, then repeat the source tick
+                copy_queue.finish_bulk_insert(target_curs)
+                self.copy_tick(target_curs, srcqueue, copy_queue)
+
+            # COMBINED_BRANCH needs to sync with part sets
+            if node.need_action('sync-part-pos'):
+                self.move_part_positions(target_curs)
+
+        # we are done on target
+        self.set_tick_complete(target_curs)
+        target_db.commit()
+
+        # done on source
+        srcqueue.finish_batch(source_curs, batch_id)
+        source_db.commit()
+
+        # occasionally send watermark upwards
+        self.send_local_watermark_upwards(target_db, source_db, node.name)
+
+        # got a batch so there can be more
+        return 1
+
+    def process_set_batch(self, src_db, dst_db, ev_list, copy_queue = None):
+        """Apply every event of a batch on the destination database.
+
+        Each event goes through process_set_event(); when copy_queue is
+        given the event is also buffered into it.  Nothing is committed
+        here.  src_db is accepted but not used.
+        """
+        curs = dst_db.cursor()
+        for ev in ev_list:
+            self.process_set_event(curs, ev)
+            if copy_queue:
+                copy_queue.bulk_insert(curs, ev)
+        self.stat_add('count', len(ev_list))
+
+    def process_set_event(self, curs, ev):
+        """Dispatch one event by its type; unknown types raise Exception."""
+        if ev.type == 'set-tick':
+            self.handle_set_tick(curs, ev)
+        elif ev.type == 'set-member-info':
+            self.handle_member_info(curs, ev)
+        elif ev.type == 'global-watermark':
+            self.handle_global_watermark(curs, ev)
+        else:
+            raise Exception('bad event for set consumer')
+
+    def handle_global_watermark(self, curs, ev):
+        """Apply a 'global-watermark' event if it is about our own set."""
+        set_name = ev.extra1
+        tick_id = ev.data
+        if set_name == self.set_name:
+            self.set_global_watermark(curs, tick_id)
+
+    def handle_set_tick(self, curs, ev):
+        """Remember the tick id carried by a 'set-tick' event, per set."""
+        data = skytools.db_urldecode(ev.data)
+        set_name = data['set_name']
+        tick_id = data['tick_id']
+        self.tick_id_cache[set_name] = tick_id
+
+    def move_part_positions(self, curs):
+        """Store every cached per-set tick id as a partition watermark."""
+        q = "select * from pgq_set.set_partition_watermark(%s, %s, %s)"
+        for set_name, tick_id in self.tick_id_cache.items():
+            curs.execute(q, [self.set_name, set_name, tick_id])
+
+    def handle_member_info(self, curs, ev):
+        """Register the member described by a 'set-member-info' event."""
+        data = skytools.db_urldecode(ev.data)
+        set_name = data['set_name']
+        node_name = data['node_name']
+        node_location = data['node_location']
+        dead = data['dead']
+        # this can also be member for part set, ignore then
+        if set_name != self.set_name:
+            return
+
+        q = "select * from pgq_set.add_member(%s, %s, %s, %s)"
+        curs.execute(q, [set_name, node_name, node_location, dead])
+
+    def send_local_watermark_upwards(self, target_db, source_db, node_name = None):
+        """Report this node's local watermark on the source database.
+
+        node_name is the name of this node; when omitted it is read
+        from target_db with load_node_info().
+        """
+        if node_name is None:
+            node_name = self.load_node_info(target_db).name
+
+        target_curs = target_db.cursor()
+        q = "select pgq_ext.get_local_watermark(%s)"
+        target_curs.execute(q, [self.set_name])
+        wm = target_curs.fetchone()[0]
+        target_db.commit()
+
+        # NOTE(review): the argument order (set, node, watermark) is
+        # inferred from the three placeholders -- confirm against the
+        # SQL function
+        source_curs = source_db.cursor()
+        q = "select pgq_ext.set_subscriber_watermark(%s, %s, %s)"
+        source_curs.execute(q, [self.set_name, node_name, wm])
+        source_db.commit()
+
+    def set_global_watermark(self, curs, tick_id):
+        """Call pgq_set.set_global_watermark() for our set; no commit."""
+        q = "select pgq_set.set_global_watermark(%s, %s)"
+        curs.execute(q, [self.set_name, tick_id])
+
+    def load_node_info(self, db):
+        """Load node and member info for our set from db as a NodeInfo.
+
+        Raises Exception if the node-info query returns no row.
+        """
+        curs = db.cursor()
+
+        q = "select * from pgq_set.get_node_info(%s)"
+        curs.execute(q, [self.set_name])
+        node_row = curs.dictfetchone()
+        if not node_row:
+            raise Exception('node not initialized')
+
+        q = "select * from pgq_set.get_member_info(%s)"
+        curs.execute(q, [self.set_name])
+        mbr_list = curs.dictfetchall()
+        db.commit()
+
+        return NodeInfo(node_row, mbr_list)
+
+    def tag_node_uptodate(self, db):
+        """Mark the node as up to date in db and commit."""
+        curs = db.cursor()
+        q = "select * from pgq_set.set_node_uptodate(%s, true)"
+        curs.execute(q, [self.set_name])
+        db.commit()
+
+    def copy_tick(self, curs, src_queue, dst_queue):
+        """Call pgq.ticker() on dst_queue with the source queue's current tick."""
+        q = "select * from pgq.ticker(%s, %s)"
+        curs.execute(q, [dst_queue.queue_name, src_queue.cur_tick])
+
+if __name__ == '__main__':
+    # config file(s) come from the command line; fall back to the
+    # previously hard-coded 'test.ini' when none are given
+    script = SetConsumer('setconsumer', sys.argv[1:] or ['test.ini'])
+    script.start()
+
diff --git a/python/setadm.py b/python/setadm.py
new file mode 100755 (executable)
index 0000000..d9c44b9
--- /dev/null
@@ -0,0 +1,212 @@
+#! /usr/bin/env python
+
+import sys, optparse, skytools
+
+from pgq.setconsumer import MemberInfo, NodeInfo
+
+
+class MemberInfo:
+    """Name, location and dead flag of a single set member.
+
+    NOTE(review): this definition shadows the MemberInfo imported from
+    pgq.setconsumer at the top of this file.
+    """
+    def __init__(self, row):
+        """Copy the member columns out of a dict-like row."""
+        for attr, column in (('name', 'node_name'),
+                             ('location', 'node_location'),
+                             ('dead', 'dead')):
+            setattr(self, attr, row[column])
+
+class SetInfo:
+    """Root node info of a set plus a name -> MemberInfo map."""
+    def __init__(self, set_name, info_row, member_rows):
+        """Store the root's info row and index the member rows by name."""
+        self.root_info = info_row
+        self.set_name = set_name
+        self.root_name = info_row['node_name']
+        self.root_type = info_row['node_type']
+        self.global_watermark = info_row['global_watermark']
+
+        members = [MemberInfo(member_row) for member_row in member_rows]
+        self.member_map = dict([(member.name, member) for member in members])
+
+    def get_member(self, name):
+        """Return the MemberInfo called name, or None if there is none."""
+        if name in self.member_map:
+            return self.member_map[name]
+        return None
+
+# Usage text installed on the option parser by SetAdmin.init_optparse()
+# (stripped of surrounding whitespace there).  The command list is empty.
+command_usage = """
+%prog [options] INI CMD [subcmd args]
+
+commands:
+"""
+
+class SetAdmin(skytools.DBScript):
+    """Command-line tool that initializes the nodes of a set.
+
+    Commands (see work()): init-root, init-branch and init-leaf, each
+    taking a node name and a node location.
+    """
+    root_name = None
+    root_info = None
+    member_map = {}
+    set_name = None
+
+    def init_optparse(self, parser = None):
+        """Extend the DBScript option parser with the setadm options."""
+        p = skytools.DBScript.init_optparse(self, parser)
+        p.set_usage(command_usage.strip())
+
+        g = optparse.OptionGroup(p, "actual setadm options")
+        # NOTE(review): --connstr is not read anywhere in this class and
+        # store_true looks wrong for a connect string -- confirm intent
+        g.add_option("--connstr", action="store_true",
+                     help = "reserved, currently unused")
+        g.add_option("--provider",
+                     help = "init-branch/init-leaf: name of the provider node")
+        p.add_option_group(g)
+        return p
+
+    def work(self):
+        """Run the command given on the command line, once."""
+        self.set_single_loop(1)
+
+        self.set_name = self.cf.get('set_name')
+
+        # is_cmd(.., 2) guarantees that args[2] and args[3] exist
+        if self.is_cmd("init-root", 2):
+            self.init_node("root", self.args[2], self.args[3])
+        elif self.is_cmd("init-branch", 2):
+            self.init_node("branch", self.args[2], self.args[3])
+        elif self.is_cmd("init-leaf", 2):
+            self.init_node("leaf", self.args[2], self.args[3])
+        else:
+            self.log.info("need command")
+
+    def is_cmd(self, name, argcnt):
+        """Tell whether the command on the command line is name.
+
+        Returns False when no command was given or it is a different
+        one.  When the command matches but does not have exactly argcnt
+        arguments, logs an error and exits with status 1.
+        """
+        if len(self.args) < 2:
+            return False
+        if self.args[1] != name:
+            return False
+        if len(self.args) != argcnt + 2:
+            self.log.error("cmd %s needs %d args" % (name, argcnt))
+            sys.exit(1)
+        return True
+
+    def init_node(self, node_type, node_name, node_location):
+        """Install the code on a new node and register it in the set.
+
+        A root node is registered only in its own database.  Any other
+        node is registered on the root, on its provider (--provider)
+        and in its own database.  Returns early if the node is already
+        initialized; exits with status 1 if the node name is taken or
+        the provider is unknown.
+        """
+        # connect to database
+        db = self.get_database("new_node", connstr = node_location)
+
+        # check if code is installed
+        self.install_code(db)
+
+        # query current status
+        res = self.exec_query(db, "select * from pgq_set.get_node_info(%s)", [self.set_name])
+        info = res[0]
+        if info['node_type'] is not None:
+            self.log.info("Node is already initialized as %s" % info['node_type'])
+            return
+
+        # register member
+        if node_type in ('root', 'combined-root'):
+            global_watermark = None
+            combined_set = None
+            provider_name = None
+            self.exec_sql(db, "select pgq_set.add_member(%s, %s, %s, false)",
+                          [self.set_name, node_name, node_location])
+            self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s)",
+                          [self.set_name, node_type, node_name, provider_name, global_watermark, combined_set])
+        else:
+            root_db = self.find_root_db()
+            set_info = self.load_root_info(root_db)
+
+            # check if member already exists
+            if set_info.get_member(node_name) is not None:
+                self.log.error("Node '%s' already exists" % node_name)
+                sys.exit(1)
+
+            global_watermark = set_info.global_watermark
+            combined_set = None
+            provider_name = self.options.provider
+
+            # lookup provider before anything is written, so that a bad
+            # --provider does not leave the node half-registered
+            provider = set_info.get_member(provider_name)
+            if not provider:
+                self.log.error("Node %s does not exist" % provider_name)
+                sys.exit(1)
+
+            # register member on root
+            self.exec_sql(root_db, "select pgq_set.add_member(%s, %s, %s, false)",
+                          [self.set_name, node_name, node_location])
+            root_db.commit()
+
+            # register on provider
+            # NOTE(review): placeholder worker name, needs a real value
+            worker_name = "qweqweqwe"
+            provider_db = self.get_database('provider_db', connstr = provider.location)
+            self.exec_sql(provider_db, "select pgq_set.add_member(%s, %s, %s, false)",
+                          [self.set_name, node_name, node_location])
+            self.exec_sql(provider_db, "select pgq_set.subscribe_node(%s, %s, %s)",
+                          [self.set_name, node_name, worker_name])
+            provider_db.commit()
+
+            # initialize node itself
+            self.exec_sql(db, "select pgq_set.add_member(%s, %s, %s, false)",
+                          [self.set_name, node_name, node_location])
+            self.exec_sql(db, "select pgq_set.add_member(%s, %s, %s, false)",
+                          [self.set_name, provider_name, provider.location])
+            self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s)",
+                          [self.set_name, node_type, node_name, provider_name, global_watermark, combined_set])
+            db.commit()
+
+        self.log.info("Done")
+
+    def find_root_db(self):
+        """Return a connection to the root node of the set.
+
+        Starts at the configured 'root_db' and follows provider_location
+        upwards until a node of type root or combined-root is found.
+        Exits with status 1 when a node on the way is not initialized.
+        """
+        db = self.get_database('root_db')
+
+        while 1:
+            # query current status
+            res = self.exec_query(db, "select * from pgq_set.get_node_info(%s)", [self.set_name])
+            info = res[0]
+            node_type = info['node_type']
+            if node_type is None:
+                self.log.error("Root node not initialized?")
+                sys.exit(1)
+
+            # configured db may not be root anymore, walk upwards then
+            if node_type in ('root', 'combined-root'):
+                db.commit()
+                return db
+
+            # drop the connection registered as 'root_db' so that the
+            # next get_database() call connects to the provider instead
+            self.close_database('root_db')
+            loc = info['provider_location']
+            if loc is None:
+                self.log.error("Sub node provider not initialized?")
+                sys.exit(1)
+
+            # walk upwards
+            db = self.get_database('root_db', connstr = loc)
+
+    def load_root_info(self, db):
+        """Load node and member info for the set from db as a SetInfo."""
+        res = self.exec_query(db, "select * from pgq_set.get_node_info(%s)", [self.set_name])
+        info = res[0]
+
+        q = "select * from pgq_set.get_member_info(%s)"
+        node_list = self.exec_query(db, q, [self.set_name])
+
+        db.commit()
+
+        return SetInfo(self.set_name, info, node_list)
+
+    def exec_sql(self, db, q, args):
+        """Execute a statement on db and commit; the result is discarded."""
+        self.log.debug(q)
+        curs = db.cursor()
+        curs.execute(q, args)
+        db.commit()
+
+    def exec_query(self, db, q, args):
+        """Execute a query on db, commit, and return all rows as dicts."""
+        self.log.debug(q)
+        curs = db.cursor()
+        curs.execute(q, args)
+        res = curs.dictfetchall()
+        db.commit()
+        return res
+
+    def install_code(self, db):
+        """Install the listed language, function and schemas into db and commit."""
+        objs = [
+            skytools.DBLanguage("plpgsql"),
+            skytools.DBFunction("txid_current_snapshot", 0, sql_file="txid.sql"),
+            skytools.DBSchema("pgq", sql_file="pgq.sql"),
+            skytools.DBSchema("pgq_set", sql_file="pgq_set.sql"),
+        ]
+        skytools.db_install(db.cursor(), objs, self.log)
+        db.commit()
+
+if __name__ == '__main__':
+    # everything after the program name is handed to the script
+    cmdline_args = sys.argv[1:]
+    script = SetAdmin('set_admin', cmdline_args)
+    script.start()
+