python/pgq update
author Marko Kreen <markokr@gmail.com>
Fri, 13 Feb 2009 12:16:59 +0000 (14:16 +0200)
committer Marko Kreen <markokr@gmail.com>
Fri, 13 Feb 2009 13:20:31 +0000 (15:20 +0200)
- Remove the obsolete setconsumer code
- Add new CascadedConsumer / CascadedWorker classes,
  based on the regular pgq.Consumer

- Move RemoteConsumer / SerialConsumer out of pgq/consumer.py
  into pgq/remoteconsumer.py

pgq.Consumer:
- Rename config params and instance variables:
  pgq_queue_name -> queue_name
  pgq_consumer_id -> consumer_name
- Disable automatic registration on the queue; the script
  must now be invoked with the --register / --unregister switches
- Drop support for reading from multiple queues at once

pgq.ticker:
- Drop the event sequence tracking code; this is now done in the database
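
For illustration, a minimal consumer under the renamed API might look like
the sketch below (the service name, db name and ini file are placeholders;
queue_name / consumer_name are assumed to be set in the ini):

    import sys, pgq

    class LogConsumer(pgq.Consumer):
        # log each event and mark it done
        def process_event(self, db, ev):
            self.log.info('event: %s' % ev.ev_type)
            ev.tag_done()

    if __name__ == '__main__':
        script = LogConsumer('log_consumer', 'src_db', sys.argv[1:])
        script.start()

As registration is no longer automatic, such a script is first run once
with --register, then started normally.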

18 files changed:
python/conf/pgqadm.ini
python/pgq/__init__.py
python/pgq/cascade/__init__.py [new file with mode: 0644]
python/pgq/cascade/admin.py [new file with mode: 0644]
python/pgq/cascade/consumer.py [new file with mode: 0644]
python/pgq/cascade/nodeinfo.py [moved from python/pgq/setinfo.py with 54% similarity]
python/pgq/cascade/worker.py [new file with mode: 0644]
python/pgq/consumer.py
python/pgq/event.py
python/pgq/maint.py
python/pgq/rawconsumer.py [deleted file]
python/pgq/remoteconsumer.py [new file with mode: 0644]
python/pgq/setadmin.py [deleted file]
python/pgq/setconsumer.py [deleted file]
python/pgq/status.py
python/pgq/ticker.py
python/pgqadm.py
python/setadm.py

index a2e92f6b0cb5895c20276e01b654b3a60cf2bfc6..24ac4fc4e1ec58e9be64be5772f21ad9227e9049 100644 (file)
@@ -1,6 +1,7 @@
 
 [pgqadm]
 
+# should be globally unique
 job_name = pgqadm_somedb
 
 db = dbname=provider port=6000 host=127.0.0.1
index bb78abc0af4f017141f787bd9a0a2be2945413e6..b34ef625c1296553fcf9134170a7528fe3b918f5 100644 (file)
@@ -1,18 +1,47 @@
 """PgQ framework for Python."""
 
+__pychecker__ = 'no-miximport'
+
 import pgq.event
 import pgq.consumer
-import pgq.setconsumer
+import pgq.remoteconsumer
 import pgq.producer
 
+import pgq.ticker
+import pgq.maint
+import pgq.status
+
+import pgq.cascade
+import pgq.cascade.nodeinfo
+import pgq.cascade.admin
+import pgq.cascade.consumer
+import pgq.cascade.worker
+
 from pgq.event import *
 from pgq.consumer import *
-from pgq.setconsumer import *
+from pgq.remoteconsumer import *
 from pgq.producer import *
 
+from pgq.ticker import *
+from pgq.maint import *
+from pgq.status import *
+
+from pgq.cascade.nodeinfo import *
+from pgq.cascade.admin import *
+from pgq.cascade.consumer import *
+from pgq.cascade.worker import *
+
 __all__ = (
     pgq.event.__all__ +
     pgq.consumer.__all__ +
-    pgq.setconsumer.__all__ +
-    pgq.producer.__all__ )
+    pgq.remoteconsumer.__all__ +
+    pgq.cascade.nodeinfo.__all__ +
+    pgq.cascade.admin.__all__ +
+    pgq.cascade.consumer.__all__ +
+    pgq.cascade.worker.__all__ +
+    pgq.producer.__all__ +
+    pgq.ticker.__all__ +
+    pgq.maint.__all__ +
+    pgq.status.__all__ )
+
 
diff --git a/python/pgq/cascade/__init__.py b/python/pgq/cascade/__init__.py
new file mode 100644 (file)
index 0000000..051c462
--- /dev/null
@@ -0,0 +1,2 @@
+"""Cascaded Queue support."""
+
diff --git a/python/pgq/cascade/admin.py b/python/pgq/cascade/admin.py
new file mode 100644 (file)
index 0000000..e1cb482
--- /dev/null
@@ -0,0 +1,701 @@
+#! /usr/bin/env python
+
+## NB: not all commands work ##
+
+"""cascaded queue administration.
+
+londiste.py INI pause [NODE [CONS]]
+
+setadm.py INI pause NODE [CONS]
+
+
+"""
+
+import sys, time, optparse, skytools
+
+from pgq.cascade.nodeinfo import *
+
+__all__ = ['CascadeAdmin']
+
+command_usage = """\
+%prog [options] INI CMD [subcmd args]
+
+Node Initialization:
+  create-root   NAME CONNSTR
+  create-branch NAME CONNSTR --provider=<public_connstr>
+  create-leaf   NAME CONNSTR --provider=<public_connstr>
+    Initializes node.
+
+    setadm: give worker name with switch --worker.
+
+Node Administration:
+  pause                             Pause a consumer.
+  resume                            Resume a consumer.
+  change-provider --provider NEW    Change where consumer reads from
+
+     setadm: use the --node and/or --consumer switch to specify
+     the node or consumer.
+
+Working, but names are not final:
+
+  status                Show set state      [set-status]
+  members               Show members in set [nodes]
+  switchover --target NODE [--all]
+
+Broken:
+
+  rename-node OLD NEW   Rename a node
+  show-consumers [--node]
+  failover NEWROOT
+  tag-dead NODE ..      Tag node as dead
+  tag-alive NODE ..     Tag node as alive
+"""
+
+class CascadeAdmin(skytools.AdminScript):
+    """Cascaded pgq administration."""
+    queue_name = None
+    queue_info = None
+    extra_objs = []
+    local_node = None
+
+    def __init__(self, svc_name, dbname, args, worker_setup = False):
+        skytools.AdminScript.__init__(self, svc_name, args)
+        self.initial_db_name = dbname
+        if worker_setup:
+            self.options.worker = self.job_name
+            self.options.consumer = self.job_name
+
+    def init_optparse(self, parser = None):
+        """Add SetAdmin switches to parser."""
+        p = skytools.AdminScript.init_optparse(self, parser)
+        p.set_usage(command_usage.strip())
+
+        g = optparse.OptionGroup(p, "actual queue admin options")
+        g.add_option("--connstr", action="store_true",
+                     help = "initial connect string")
+        g.add_option("--provider",
+                     help = "init: connect string for provider")
+        g.add_option("--queue",
+                     help = "specify queue name")
+        g.add_option("--worker",
+                     help = "create: specify worker name")
+        g.add_option("--node",
+                     help = "specify node name")
+        g.add_option("--consumer",
+                     help = "specify consumer name")
+        g.add_option("--target",
+                    help = "switchover: specify replacement node")
+        g.add_option("--merge",
+                    help = "create-node: combined queue name")
+        p.add_option_group(g)
+        return p
+
+    def reload(self):
+        """Reload config."""
+        skytools.AdminScript.reload(self)
+        if self.options.queue:
+            self.queue_name = self.options.queue
+        else:
+            self.queue_name = self.cf.get('queue_name', '')
+            if not self.queue_name:
+                self.queue_name = self.cf.get('pgq_queue_name', '')
+                if not self.queue_name:
+                    raise Exception('"queue_name" not specified in config')
+
+    #
+    # Node initialization.
+    #
+
+    def cmd_create_root(self, node_name, node_location):
+        return self.create_node('root', node_name, node_location)
+
+    def cmd_create_branch(self, node_name, node_location):
+        return self.create_node('branch', node_name, node_location)
+
+    def cmd_create_leaf(self, node_name, node_location):
+        return self.create_node('leaf', node_name, node_location)
+
+    def create_node(self, node_type, node_name, node_location):
+        """Generic node init."""
+        provider_loc = self.options.provider
+
+        if node_type not in ('root', 'branch', 'leaf'):
+            raise Exception('unknown node type')
+
+        # connect to database
+        db = self.get_database("new_node", connstr = node_location)
+
+        # check if code is installed
+        self.install_code(db)
+
+        # query current status
+        res = self.exec_query(db, "select * from pgq_node.get_node_info(%s)", [self.queue_name])
+        info = res[0]
+        if info['node_type'] is not None:
+            self.log.info("Node is already initialized as %s" % info['node_type'])
+            return
+        
+        self.log.info("Initializing node")
+
+        worker_name = self.options.worker
+        if not worker_name:
+            raise Exception('--worker required')
+        combined_queue = self.options.merge
+        if combined_queue and node_type != 'leaf':
+            raise Exception('--merge can be used only for leaf nodes')
+
+        # register member
+        if node_type == 'root':
+            global_watermark = None
+            combined_queue = None
+            provider_name = None
+            self.exec_cmd(db, "select * from pgq_node.register_location(%s, %s, %s, false)",
+                          [self.queue_name, node_name, node_location])
+            self.exec_cmd(db, "select * from pgq_node.create_node(%s, %s, %s, %s, %s, %s, %s)",
+                          [self.queue_name, node_type, node_name, worker_name, provider_name, global_watermark, combined_queue])
+            provider_db = None
+        else:
+            root_db = self.find_root_db(provider_loc)
+            queue_info = self.load_queue_info(root_db)
+
+            # check if member already exists
+            if queue_info.get_member(node_name) is not None:
+                self.log.error("Node '%s' already exists" % node_name)
+                sys.exit(1)
+
+            combined_set = None
+
+            provider_db = self.get_database('provider_db', connstr = provider_loc)
+            q = "select node_type, node_name from pgq_node.get_node_info(%s)"
+            res = self.exec_query(provider_db, q, [self.queue_name])
+            row = res[0]
+            if not row['node_name']:
+                raise Exception("provider node not found")
+            provider_name = row['node_name']
+
+            # register member on root
+            self.exec_cmd(root_db, "select * from pgq_node.register_location(%s, %s, %s, false)",
+                          [self.queue_name, node_name, node_location])
+
+            # lookup provider
+            provider = queue_info.get_member(provider_name)
+            if not provider:
+                self.log.error("Node %s does not exist" % provider_name)
+                sys.exit(1)
+
+            # register on provider
+            self.exec_cmd(provider_db, "select * from pgq_node.register_location(%s, %s, %s, false)",
+                          [self.queue_name, node_name, node_location])
+            rows = self.exec_cmd(provider_db, "select * from pgq_node.register_subscriber(%s, %s, %s, null)",
+                                 [self.queue_name, node_name, worker_name])
+            global_watermark = rows[0]['global_watermark']
+
+            # initialize node itself
+
+            # insert members
+            self.exec_cmd(db, "select * from pgq_node.register_location(%s, %s, %s, false)",
+                          [self.queue_name, node_name, node_location])
+            for m in queue_info.member_map.values():
+                self.exec_cmd(db, "select * from pgq_node.register_location(%s, %s, %s, %s)",
+                              [self.queue_name, m.name, m.location, m.dead])
+
+            # real init
+            self.exec_cmd(db, "select * from pgq_node.create_node(%s, %s, %s, %s, %s, %s, %s)",
+                          [ self.queue_name, node_type, node_name, worker_name,
+                            provider_name, global_watermark, combined_queue ])
+
+
+        self.extra_init(node_type, db, provider_db)
+
+        self.log.info("Done")
+
+    def extra_init(self, node_type, node_db, provider_db):
+        """Callback to do specific init."""
+        pass
+
+    def find_root_db(self, initial_loc = None):
+        """Find root node, having start point."""
+        if initial_loc:
+            loc = initial_loc
+        else:
+            loc = self.cf.get(self.initial_db_name)
+
+        while self.looping:
+            db = self.get_database('root_db', connstr = loc)
+
+            # query current status
+            res = self.exec_query(db, "select * from pgq_node.get_node_info(%s)", [self.queue_name])
+            info = res[0]
+            node_type = info['node_type']
+            if node_type is None:
+                self.log.info("Root node not initialized?")
+                sys.exit(1)
+
+            self.log.debug("db='%s' -- type='%s' provider='%s'" % (loc, node_type, info['provider_location']))
+            # configured db may not be root anymore, walk upwards then
+            if node_type in ('root', 'combined-root'):
+                db.commit()
+                return db
+
+            self.close_database('root_db')
+            if loc == info['provider_location']:
+                raise Exception("find_root_db: got loop: %s" % loc)
+            loc = info['provider_location']
+            if loc is None:
+                self.log.error("Sub node provider not initialized?")
+                sys.exit(1)
+        raise Exception('process canceled')
+
+    def find_consumer_check(self, node, consumer):
+        cmap = self.get_node_consumer_map(node)
+        return (consumer in cmap)
+
+    def find_consumer(self, node = None, consumer = None):
+        if not node and not consumer:
+            node = self.options.node
+            consumer = self.options.consumer
+        if not node and not consumer:
+            raise Exception('Need either --node or --consumer')
+
+        # specific node given
+        if node:
+            if consumer:
+                if not self.find_consumer_check(node, consumer):
+                    raise Exception('Consumer not found')
+            else:
+                state = self.get_node_info(node)
+                consumer = state.worker_name
+            return (node, consumer)
+        
+        # global consumer search
+        if self.find_consumer_check(self.local_node, consumer):
+            return (self.local_node, consumer)
+
+        # fixme: dead node handling?
+        nodelist = self.queue_info.member_map.keys()
+        for node in nodelist:
+            if node == self.local_node:
+                continue
+            if self.find_consumer_check(node, consumer):
+                return (node, consumer)
+        
+        raise Exception('Consumer not found')
+
+    def install_code(self, db):
+        """Install cascading code to db."""
+        objs = [
+            skytools.DBLanguage("plpgsql"),
+            skytools.DBFunction("txid_current_snapshot", 0, sql_file="txid.sql"),
+            skytools.DBSchema("pgq", sql_file="pgq.sql"),
+            skytools.DBSchema("pgq_ext", sql_file="pgq_ext.sql"), # not needed actually
+            skytools.DBSchema("pgq_node", sql_file="pgq_node.sql"),
+        ]
+        objs += self.extra_objs
+        skytools.db_install(db.cursor(), objs, self.log)
+        db.commit()
+
+    #
+    # Print status of whole set.
+    #
+
+    def cmd_status(self):
+        """Show set status."""
+        root_db = self.find_root_db()
+        sinf = self.load_queue_info(root_db)
+
+        for mname, minf in sinf.member_map.iteritems():
+            db = self.get_database('look_db', connstr = minf.location, autocommit = 1)
+            curs = db.cursor()
+            curs.execute("select * from pgq_node.get_node_info(%s)", [self.queue_name])
+            node = NodeInfo(self.queue_name, curs.fetchone())
+            node.load_status(curs)
+            self.load_extra_status(curs, node)
+            sinf.add_node(node)
+            self.close_database('look_db')
+
+        sinf.print_tree()
+
+    def load_extra_status(self, curs, node):
+        """Fetch extra info."""
+        pass
+
+    #
+    # Normal commands.
+    #
+
+    def cmd_change_provider(self):
+        """Change node provider."""
+
+        self.load_local_info()
+        self.change_provider(
+                node = self.options.node,
+                consumer = self.options.consumer,
+                new_provider = self.options.provider)
+
+    def node_change_provider(self, node, new_provider):
+        self.change_provider(node, new_provider = new_provider)
+
+    def change_provider(self, node = None, consumer = None, new_provider = None):
+        old_provider = None
+        if not new_provider:
+            raise Exception('Please give --provider')
+
+        if not node or not consumer:
+            node, consumer = self.find_consumer(node = node, consumer = consumer)
+
+        cmap = self.get_node_consumer_map(node)
+        cinfo = cmap[consumer]
+        old_provider = cinfo['provider_node']
+
+        if old_provider == new_provider:
+            self.log.info("Consumer '%s' at node '%s' has already '%s' as provider" % (
+                            consumer, node, new_provider))
+            return
+
+        # pause target node
+        self.pause_consumer(node, consumer)
+
+        # reload node info
+        node_db = self.get_node_database(node)
+        qinfo = self.load_queue_info(node_db)
+        ninfo = qinfo.local_node
+
+        # reload consumer info
+        cmap = self.get_node_consumer_map(node)
+        cinfo = cmap[consumer]
+
+        # is it node worker or plain consumer?
+        is_worker = (ninfo.worker_name == consumer)
+
+        # fixme: expect the node to be described already
+        #q = "select * from pgq_node.add_member(%s, %s, %s, false)"
+        #self.node_cmd(new_provider, q, [self.queue_name, node_name, node_location])
+
+        # subscribe on new provider
+        if is_worker:
+            q = 'select * from pgq_node.register_subscriber(%s, %s, %s, %s)'
+            self.node_cmd(new_provider, q, [self.queue_name, node, consumer, cinfo['last_tick_id']])
+        else:
+            q = 'select * from pgq.register_consumer_at(%s, %s, %s)'
+            self.node_cmd(new_provider, q, [self.queue_name, consumer, cinfo['last_tick_id']])
+
+        # change provider on target node
+        q = 'select * from pgq_node.change_consumer_provider(%s, %s, %s)'
+        self.node_cmd(node, q, [self.queue_name, consumer, new_provider])
+
+        # unsubscribe from old provider
+        if is_worker:
+            q = "select * from pgq_node.unregister_subscriber(%s, %s)"
+            self.node_cmd(old_provider, q, [self.queue_name, node])
+        else:
+            q = "select * from pgq.unregister_consumer(%s, %s)"
+            self.node_cmd(old_provider, q, [self.queue_name, consumer])
+
+        # done
+        self.resume_consumer(node, consumer)
+
+    def cmd_rename_node(self, old_name, new_name):
+        """Rename node."""
+
+        self.load_local_info()
+
+        root_db = self.find_root_db()
+
+        # pause target node
+        self.pause_node(old_name)
+        node = self.load_node_info(old_name)
+        provider_node = node.provider_node
+        subscriber_list = self.get_node_subscriber_list(old_name)
+
+        # create copy of member info / subscriber+queue info
+        step1 = 'select * from pgq_node.rename_node_step1(%s, %s, %s)'
+        # rename node itself, drop copies
+        step2 = 'select * from pgq_node.rename_node_step2(%s, %s, %s)'
+
+        # step1
+        self.exec_cmd(root_db, step1, [self.queue_name, old_name, new_name])
+        self.node_cmd(provider_node, step1, [self.queue_name, old_name, new_name])
+        self.node_cmd(old_name, step1, [self.queue_name, old_name, new_name])
+        for child in subscriber_list:
+            self.node_cmd(child, step1, [self.queue_name, old_name, new_name])
+
+        # step2
+        self.node_cmd(old_name, step2, [self.queue_name, old_name, new_name])
+        self.node_cmd(provider_node, step2, [self.queue_name, old_name, new_name])
+        for child in subscriber_list:
+            self.node_cmd(child, step2, [self.queue_name, old_name, new_name])
+        self.exec_cmd(root_db, step2, [self.queue_name, old_name, new_name])
+
+        # resume node
+        self.resume_node(old_name)
+
+    def node_depends(self, sub_node, top_node):
+        cur_node = sub_node
+        # walk upstream
+        while 1:
+            info = self.get_node_info(cur_node)
+            if cur_node == top_node:
+                # yes, top_node is sub_node's provider
+                return True
+            if info.type == 'root':
+                # found root, no dependency
+                return False
+            # step upwards
+            cur_node = info.provider_node
+
+    def demote_node(self, oldnode, step, newnode):
+        """Downgrade old root?"""
+        q = "select * from pgq_node.demote_root(%s, %s, %s)"
+        res = self.node_cmd(oldnode, q, [self.queue_name, step, newnode])
+        return res[0]['last_tick']
+
+    def promote_branch(self, node):
+        """Promote old branch as root."""
+        q = "select * from pgq_node.promote_branch(%s)"
+        self.node_cmd(node, q, [self.queue_name])
+
+    def wait_for_catchup(self, new, last_tick):
+        """Wait until new_node catches up to old_node."""
+        # wait for it on subscriber
+        info = self.load_node_info(new)
+        if info.completed_tick >= last_tick:
+            self.log.info('tick already exists')
+            return info
+        if info.paused:
+            self.log.info('new node seems paused, resuming')
+            self.resume_node(new)
+        while 1:
+            self.log.debug('waiting for catchup: need=%d, cur=%d' % (last_tick, info.completed_tick))
+            time.sleep(1)
+            info = self.load_node_info(new)
+            if info.completed_tick >= last_tick:
+                return info
+
+
+    def switchover_root(self, old_info, new_info):
+        """Root switchover."""
+        old = old_info.name
+        new = new_info.name
+
+        self.pause_node(old)
+
+        self.demote_node(old, 1, new)
+
+        last_tick = self.demote_node(old, 2, new)
+
+        self.wait_for_catchup(new, last_tick)
+
+        self.pause_node(new)
+        self.promote_branch(new)
+
+        #self.subscribe_node(new, old, old_info.completed_tick)
+        q = 'select * from pgq_node.register_subscriber(%s, %s, %s, %s)'
+        self.node_cmd(new, q, [self.queue_name, old, old_info.worker_name, last_tick])
+
+        #self.unsubscribe_node(new_node.parent_node, new_node.name)
+        q = "select * from pgq_node.unregister_subscriber(%s, %s)"
+        self.node_cmd(new_info.provider_node, q, [self.queue_name, new])
+
+        self.resume_node(new)
+
+        self.demote_node(old, 3, new)
+
+        self.resume_node(old)
+
+    def switchover_nonroot(self, old_node, new_node):
+        """Non-root switchover."""
+        if self.node_depends(new_node.name, old_node.name):
+            # yes, old_node is new_node's provider,
+            # switch it around
+            self.node_change_provider(new_node.name, old_node.provider_node)
+
+        self.node_change_provider(old_node.name, new_node.name)
+
+    def cmd_switchover(self):
+        """Generic node switchover."""
+        self.load_local_info()
+        old_node_name = self.options.node
+        new_node_name = self.options.target
+        if not old_node_name:
+            worker = self.options.consumer
+            if not worker:
+                raise Exception('old node not given')
+            if self.queue_info.local_node.worker_name != worker:
+                raise Exception('old node not given')
+            old_node_name = self.local_node
+        if not new_node_name:
+            raise Exception('new node not given')
+        old_node = self.get_node_info(old_node_name)
+        new_node = self.get_node_info(new_node_name)
+
+        if old_node.name == new_node.name:
+            self.log.info("same node?")
+            return
+
+        if old_node.type == 'root':
+            self.switchover_root(old_node, new_node)
+        else:
+            self.switchover_nonroot(old_node, new_node)
+
+        # switch subscribers around
+        if self.options.all:
+            for n in self.get_node_subscriber_list(old_node.name):
+                self.node_change_provider(n, new_node.name)
+
+    def cmd_pause(self):
+        """Pause a node"""
+        self.load_local_info()
+        node, consumer = self.find_consumer()
+        self.pause_consumer(node, consumer)
+
+    def cmd_resume(self):
+        """Resume a node from pause."""
+        self.load_local_info()
+        node, consumer = self.find_consumer()
+        self.resume_consumer(node, consumer)
+
+    def cmd_members(self):
+        """Show member list."""
+        db = self.get_database(self.initial_db_name)
+        desc = 'Member info on %s:' % self.local_node
+        q = "select node_name, dead, node_location"\
+            " from pgq_node.get_queue_locations(%s) order by 1"
+        self.display_table(db, desc, q, [self.queue_name])
+
+    #
+    # Shortcuts for operating on nodes.
+    #
+
+    def load_local_info(self):
+        """fetch set info from local node."""
+        db = self.get_database(self.initial_db_name)
+        self.queue_info = self.load_queue_info(db)
+        self.local_node = self.queue_info.local_node.name
+
+    def get_node_database(self, node_name):
+        """Connect to node."""
+        if node_name == self.queue_info.local_node.name:
+            db = self.get_database(self.initial_db_name)
+        else:
+            m = self.queue_info.get_member(node_name)
+            if not m:
+                self.log.error("get_node_database: cannot resolve %s" % node_name)
+                sys.exit(1)
+            loc = m.location
+            db = self.get_database('node.' + node_name, connstr = loc)
+        return db
+
+    def close_node_database(self, node_name):
+        """Disconnect node's connection."""
+        if node_name == self.queue_info.local_node.name:
+            self.close_database(self.initial_db_name)
+        else:
+            self.close_database("node." + node_name)
+
+    def node_cmd(self, node_name, sql, args, quiet = False):
+        """Execute SQL command on particular node."""
+        db = self.get_node_database(node_name)
+        return self.exec_cmd(db, sql, args, quiet = quiet)
+
+    #
+    # Various operation on nodes.
+    #
+
+    def set_paused(self, node, consumer, pause_flag):
+        """Set node pause flag and wait for confirmation."""
+
+        q = "select * from pgq_node.set_consumer_paused(%s, %s, %s)"
+        self.node_cmd(node, q, [self.queue_name, consumer, pause_flag])
+
+        self.log.info('Waiting for worker to accept')
+        while self.looping:
+            q = "select * from pgq_node.get_consumer_state(%s, %s)"
+            stat = self.node_cmd(node, q, [self.queue_name, consumer], quiet = 1)[0]
+            if stat['paused'] != pause_flag:
+                raise Exception('operation canceled? %s <> %s' % (repr(stat['paused']), repr(pause_flag)))
+
+            if stat['uptodate']:
+                op = pause_flag and "paused" or "resumed"
+                self.log.info("Consumer '%s' on node '%s' %s" % (consumer, node, op))
+                return
+            time.sleep(1)
+        raise Exception('process canceled')
+
+
+    def pause_consumer(self, node, consumer):
+        """Shortcut for pausing by name."""
+        self.set_paused(node, consumer, True)
+
+    def resume_consumer(self, node, consumer):
+        """Shortcut for resuming by name."""
+        self.set_paused(node, consumer, False)
+
+    def pause_node(self, node):
+        """Shortcut for pausing by name."""
+        state = self.get_node_info(node)
+        self.pause_consumer(node, state.worker_name)
+
+    def resume_node(self, node):
+        """Shortcut for resuming by name."""
+        state = self.get_node_info(node)
+        self.resume_consumer(node, state.worker_name)
+
+    def subscribe_node(self, target_node, subscriber_node, tick_pos):
+        """Subscribing one node to another."""
+        q = "select * from pgq_node.subscribe_node(%s, %s, %s)"
+        self.node_cmd(target_node, q, [self.queue_name, subscriber_node, tick_pos])
+
+    def unsubscribe_node(self, target_node, subscriber_node):
+        """Unsubscribing one node from another."""
+        q = "select * from pgq_node.unsubscribe_node(%s, %s)"
+        self.node_cmd(target_node, q, [self.queue_name, subscriber_node])
+
+    _node_cache = {}
+    def get_node_info(self, node_name):
+        """Cached node info lookup."""
+        if node_name in self._node_cache:
+            return self._node_cache[node_name]
+        inf = self.load_node_info(node_name)
+        self._node_cache[node_name] = inf
+        return inf
+
+    def load_node_info(self, node_name):
+        """Non-cached node info lookup."""
+        db = self.get_node_database(node_name)
+        q = "select * from pgq_node.get_node_info(%s)"
+        rows = self.exec_query(db, q, [self.queue_name])
+        return NodeInfo(self.queue_name, rows[0])
+
+    def load_queue_info(self, db):
+        """Non-cached set info lookup."""
+        res = self.exec_query(db, "select * from pgq_node.get_node_info(%s)", [self.queue_name])
+        info = res[0]
+
+        q = "select * from pgq_node.get_queue_locations(%s)"
+        member_list = self.exec_query(db, q, [self.queue_name])
+
+        return QueueInfo(self.queue_name, info, member_list)
+
+    def get_node_subscriber_list(self, node_name):
+        """Fetch subscriber list from a node."""
+        q = "select node_name, local_watermark from pgq_node.get_subscriber_info(%s)"
+        db = self.get_node_database(node_name)
+        rows = self.exec_query(db, q, [self.queue_name])
+        return [r['node_name'] for r in rows]
+
+    def get_node_consumer_map(self, node_name):
+        """Fetch consumer list from a node."""
+        q = "select consumer_name, provider_node, last_tick_id from pgq_node.get_consumer_info(%s)"
+        db = self.get_node_database(node_name)
+        rows = self.exec_query(db, q, [self.queue_name])
+        res = {}
+        for r in rows:
+            res[r['consumer_name']] = r
+        return res
+
+if __name__ == '__main__':
+    script = CascadeAdmin('setadm', 'node_db', sys.argv[1:], worker_setup = False)
+    script.start()
+
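For example, node initialization with the commands above goes through
CascadeAdmin's command line; a sketch (ini file, node name and connect
string are placeholders):

    from pgq.cascade.admin import CascadeAdmin

    # equivalent of: setadm.py setadm.ini create-root node1 'dbname=rootdb' --worker=w1
    args = ['setadm.ini', 'create-root', 'node1', 'dbname=rootdb', '--worker=w1']
    script = CascadeAdmin('setadm', 'node_db', args, worker_setup = False)
    script.start()
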
diff --git a/python/pgq/cascade/consumer.py b/python/pgq/cascade/consumer.py
new file mode 100644 (file)
index 0000000..97b90c2
--- /dev/null
@@ -0,0 +1,216 @@
+"""Cascaded consumer.
+
+
+Does not maintain node, but is able to pause, resume and switch provider.
+"""
+
+import sys, time, skytools
+
+from pgq.consumer import Consumer
+
+PDB = '_provider_db'
+
+__all__ = ['CascadedConsumer']
+
+class CascadedConsumer(Consumer):
+    """CascadedConsumer base class.
+
+    Loads provider from target node, accepts pause/resume commands.
+    """
+
+    _batch_info = None
+    _consumer_state = None
+
+    def __init__(self, service_name, db_name, args):
+        """Initialize new consumer.
+        
+        @param service_name: service_name for DBScript
+        @param db_name: target database name for get_database()
+        @param args: cmdline args for DBScript
+        """
+
+        Consumer.__init__(self, service_name, PDB, args)
+
+        self.log.debug("__init__")
+
+        self.target_db = db_name
+        self.provider_connstr = None
+
+    def register_consumer(self, provider_loc = None):
+        """Register consumer on source node first, then target node."""
+
+        if not provider_loc:
+            provider_loc = self.options.provider
+        if not provider_loc:
+            self.log.error('Please give provider location with --provider=')
+            sys.exit(1)
+
+        dst_db = self.get_database(self.target_db)
+        dst_curs = dst_db.cursor()
+        src_db = self.get_database(PDB, connstr = provider_loc)
+        src_curs = src_db.cursor()
+
+        # check provider node info
+        q = "select * from pgq_node.get_node_info(%s)"
+        res = self.exec_cmd(src_db, q, [ self.queue_name ])
+        pnode = res[0]['node_name']
+        if not pnode:
+            raise Exception('parent node not initialized?')
+
+        # source queue
+        Consumer.register_consumer(self)
+
+        # fetch pos
+        q = "select last_tick from pgq.get_consumer_info(%s, %s)"
+        src_curs.execute(q, [self.queue_name, self.consumer_name])
+        last_tick = src_curs.fetchone()['last_tick']
+        if not last_tick:
+            raise Exception('registration failed?')
+        src_db.commit()
+
+        # target node
+        q = "select * from pgq_node.register_consumer(%s, %s, %s, %s)"
+        self.exec_cmd(dst_db, q, [self.queue_name, self.consumer_name, pnode, last_tick])
+
+    def unregister_consumer(self):
+        dst_db = self.get_database(self.target_db)
+        dst_curs = dst_db.cursor()
+
+        # fetch provider loc
+        q = "select * from pgq_node.get_consumer_state(%s, %s)"
+        rows = self.exec_cmd(dst_db, q, [ self.queue_name, self.consumer_name ])
+        state = rows[0]
+        provider_loc = state['provider_location']
+
+        # unregister on provider
+        src_db = self.get_database(PDB, connstr = provider_loc)
+        src_curs = src_db.cursor()
+        Consumer.unregister_consumer(self)
+
+        # unregister on subscriber
+        q = "select * from pgq_node.unregister_consumer(%s, %s)"
+        self.exec_cmd(dst_db, q, [ self.queue_name, self.consumer_name ])
+
+    def init_optparse(self, parser = None):
+        p = Consumer.init_optparse(self, parser)
+        p.add_option("--provider", help = "provider location for --register")
+        return p
+
+    def process_batch(self, src_db, batch_id, event_list):
+        self._batch_info = self.get_batch_info(batch_id)
+
+        state = self._consumer_state
+
+        if self.is_batch_done(state, self._batch_info):
+            for ev in event_list:
+                ev.tag_done()
+            return
+
+        dst_db = self.get_database(self.target_db)
+        tick_id = self._batch_info['tick_id']
+        self.process_remote_batch(src_db, tick_id, event_list, dst_db)
+
+        # this also commits
+        self.finish_remote_batch(src_db, dst_db, tick_id)
+
+    def process_root_node(self, dst_db):
+        """This is called on root node, where no processing should happen.
+        """
+        # extra sleep
+        time.sleep(10*self.loop_delay)
+
+    def work(self):
+        """Refresh state before calling Consumer.work()."""
+
+        dst_db = self.get_database(self.target_db)
+        self._consumer_state = self.refresh_state(dst_db)
+
+        if self._consumer_state['node_type'] == 'root':
+            self.log.info("target is root")
+            self.process_root_node(dst_db)
+            return
+
+        if not self.provider_connstr:
+            raise Exception('provider_connstr not set')
+        src_db = self.get_database(PDB, connstr = self.provider_connstr)
+
+        return Consumer.work(self)
+
+    def refresh_state(self, dst_db, full_logic = True):
+        """Fetch consumer state from target node.
+
+        This also sleeps if pause is set and updates
+        "uptodate" flag to notify that data is refreshed.
+        """
+
+        while 1:
+            q = "select * from pgq_node.get_consumer_state(%s, %s)"
+            rows = self.exec_cmd(dst_db, q, [ self.queue_name, self.consumer_name ])
+            state = rows[0]
+
+            # tag refreshed
+            if not state['uptodate'] and full_logic:
+                q = "select * from pgq_node.set_consumer_uptodate(%s, %s, true)"
+                self.exec_cmd(dst_db, q, [ self.queue_name, self.consumer_name ])
+
+            if not state['paused'] or not full_logic:
+                break
+            time.sleep(self.loop_delay)
+
+        # update connection
+        loc = state['provider_location']
+        if self.provider_connstr != loc:
+            self.close_database(PDB)
+            self.provider_connstr = loc
+
+        return state
+
+    def is_batch_done(self, state, batch_info):
+        cur_tick = batch_info['tick_id']
+        prev_tick = batch_info['prev_tick_id']
+        dst_tick = state['completed_tick']
+
+        if not dst_tick:
+            raise Exception('dst_tick NULL?')
+
+        if prev_tick == dst_tick:
+            # on track
+            return False
+
+        if cur_tick == dst_tick:
+            # current batch is already applied, skip it
+            return True
+
+        # anything else means problems
+        raise Exception('Lost position: batch %d..%d, dst has %d' % (
+                        prev_tick, cur_tick, dst_tick))
+
+    def process_remote_batch(self, src_db, tick_id, event_list, dst_db):
+        """Per-batch callback.
+        
+        By default just calls process_remote_event() in a loop."""
+        src_curs = src_db.cursor()
+        dst_curs = dst_db.cursor()
+        for ev in event_list:
+            self.process_remote_event(src_curs, dst_curs, ev)
+
+    def process_remote_event(self, src_curs, dst_curs, ev):
+        """Per-event callback.
+        
+        By default ignores cascading events and gives error on others.
+        Can be called from user handler to finish unprocessed events.
+        """
+        if ev.ev_type[:4] == "pgq.":
+            # ignore cascading events
+            ev.tag_done()
+        else:
+            raise Exception('Unhandled event type in queue: %s' % ev.ev_type)
+
+    def finish_remote_batch(self, src_db, dst_db, tick_id):
+        """Called after event processing.  This should finish
+        work on remote db and commit there.
+        """
+        # this also commits
+        q = "select * from pgq_node.set_consumer_completed(%s, %s, %s)"
+        self.exec_cmd(dst_db, q, [ self.queue_name, self.consumer_name, tick_id ])
+
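As a sketch of how CascadedConsumer is meant to be subclassed (all names
below are hypothetical), a handler overrides process_remote_event() and
defers cascade events back to the base class:

    from pgq.cascade.consumer import CascadedConsumer

    class CopyConsumer(CascadedConsumer):
        # apply data events to the target db; the base class
        # handles (ignores) pgq.* cascade events
        def process_remote_event(self, src_curs, dst_curs, ev):
            if ev.ev_type == 'data':
                q = "insert into copy_target (data) values (%s)"
                dst_curs.execute(q, [ev.ev_data])
                ev.tag_done()
            else:
                CascadedConsumer.process_remote_event(self, src_curs, dst_curs, ev)
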
similarity index 54%
rename from python/pgq/setinfo.py
rename to python/pgq/cascade/nodeinfo.py
index 25a7f3c3d02b7ef14ba80fd475cb45861c54f3f2..6d4017926fbd43627cd9e41cbcb46915d03a6dee 100644 (file)
@@ -1,86 +1,58 @@
 #! /usr/bin/env python
 
-__all__ = ['MemberInfo', 'NodeInfo', 'SetInfo',
-        'ROOT', 'BRANCH', 'LEAF', 'COMBINED_ROOT',
-        'COMBINED_BRANCH', 'MERGE_LEAF']
+"""Info about node/set/members.  For admin tool.
+"""
+
+__all__ = ['MemberInfo', 'NodeInfo', 'QueueInfo']
 
 # node types
 ROOT = 'root'
 BRANCH = 'branch'
 LEAF = 'leaf'
-COMBINED_ROOT = 'combined-root'
-COMBINED_BRANCH = 'combined-branch'
-MERGE_LEAF = 'merge-leaf'
-
-# which nodes need to do what actions
-action_map = {
-'process-batch':   {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':1},
-'process-events':  {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0},
-'update-event-seq':{'root':0, 'branch':1, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
-'copy-events':     {'root':0, 'branch':1, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
-'tick-event':      {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':0, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0},
-'global-wm-event': {'root':1, 'branch':0, 'leaf':0, 'combined-root':1, 'combined-branch':0, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
-'wait-behind':     {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':0, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':1},
-'sync-part-pos':   {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
-'local-wm-publish':{'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':1},
-}
 
 class MemberInfo:
+    """Info about set member."""
     def __init__(self, row):
         self.name = row['node_name']
         self.location = row['node_location']
         self.dead = row['dead']
 
 class NodeInfo:
-    def __init__(self, set_name, row, main_worker = True):
-        self.set_name = set_name
+    """Detailed info about set node."""
+    def __init__(self, queue_name, row, main_worker = True):
+        self.queue_name = queue_name
         self.member_map = {}
         self.main_worker = main_worker
 
         self.name = row['node_name']
         self.type = row['node_type']
-        self.queue_name = row['queue_name']
         self.global_watermark = row['global_watermark']
         self.local_watermark = row['local_watermark']
-        self.completed_tick = row['completed_tick']
+        self.completed_tick = row['worker_last_tick']
         self.provider_node = row['provider_node']
         self.provider_location = row['provider_location']
-        self.paused = row['paused']
-        self.resync = row['resync']
-        self.uptodate = row['uptodate']
-        self.combined_set = row['combined_set']
-        self.combined_type = row['combined_type']
+        self.consumer_name = row['worker_name']
+        self.worker_name = row['worker_name']
+        self.paused = row['worker_paused']
+        self.uptodate = row['worker_uptodate']
         self.combined_queue = row['combined_queue']
+        self.combined_type = row['combined_type']
+
+        self.parent = None
+        self.consumer_map = {}
+        self.queue_info = {}
 
         self._row = row
 
         self._info_lines = []
 
-    def need_action(self, action_name):
-        """Returns True if worker for this node needs
-        to do specified action.
-        """
-        if not self.main_worker:
-            return action_name in ('process-batch', 'process-events')
-
-        typ = self.type
-        if type == MERGE_LEAF:
-            if self.target_type == COMBINED_BRANCH:
-                typ = "merge-leaf-to-branch"
-            elif self.target_type == COMBINED_ROOT:
-                typ = "merge-leaf-to-root"
-            else:
-                raise Exception('bad target type')
-
-        try:
-            return action_map[action_name][typ]
-        except KeyError, d:
-            raise Exception('need_action(name=%s, type=%s) unknown' % (action_name, typ))
-
-    def get_target_queue(self):
+    def __get_target_queue(self):
         qname = None
-        if self.type == 'merge-leaf':
-            qname = self.combined_queue
+        if self.type == LEAF:
+            if self.combined_queue:
+                qname = self.combined_queue
+            else:
+                return None
         else:
             qname = self.queue_name
         if qname is None:
@@ -89,11 +61,12 @@ class NodeInfo:
 
     def get_infolines(self):
         lst = self._info_lines
+
         if self.parent:
             root = self.parent
             while root.parent:
                 root = root.parent
-            tick_time = self.parent.consumer_map[self.name]['tick_time']
+            tick_time = self.parent.consumer_map[self.consumer_name]['tick_time']
             root_time = root.queue_info['now']
             lag = root_time - tick_time
         else:
@@ -116,20 +89,25 @@ class NodeInfo:
             q = "select consumer_name, current_timestamp - lag as tick_time,"\
                 "  lag, last_seen, last_tick "\
                 "from pgq.get_consumer_info(%s)"
-            curs.execute(q, [self.set_name])
+            curs.execute(q, [self.queue_name])
             for row in curs.fetchall():
                 cname = row['consumer_name']
                 self.consumer_map[cname] = row
             q = "select current_timestamp - ticker_lag as tick_time,"\
                 "  ticker_lag, current_timestamp as now "\
                 "from pgq.get_queue_info(%s)"
-            curs.execute(q, [self.set_name])
+            curs.execute(q, [self.queue_name])
             self.queue_info = curs.fetchone()
 
-class SetInfo:
-    def __init__(self, set_name, info_row, member_rows):
-        self.local_node = NodeInfo(set_name, info_row)
-        self.set_name = set_name
+class QueueInfo:
+    """Info about cascaded queue.
+    
+    Slightly broken, as all info is per-node.
+    """
+
+    def __init__(self, queue_name, info_row, member_rows):
+        self.local_node = NodeInfo(queue_name, info_row)
+        self.queue_name = queue_name
         self.member_map = {}
         self.node_map = {}
         self.add_node(self.local_node)
@@ -147,6 +125,10 @@ class SetInfo:
     def add_node(self, node):
         self.node_map[node.name] = node
 
+    #
+    # Rest is about printing the tree
+    #
+
     _DATAFMT = "%-30s%s"
     def print_tree(self):
         """Print ascii-tree for set.
@@ -156,15 +138,15 @@ class SetInfo:
         self._tree_calc(root)
         datalines = self._print_node(root, '', [])
         for ln in datalines:
-            print self._DATAFMT % (' ', ln)
+            print(self._DATAFMT % (' ', ln))
 
     def _print_node(self, node, pfx, datalines):
         # print a tree fragment for node and info
         # returns list of unprinted data rows
         for ln in datalines:
-            print self._DATAFMT % (_setpfx(pfx, '|'), ln)
+            print(self._DATAFMT % (_setpfx(pfx, '|'), ln))
         datalines = node.get_infolines()
-        print "%s%s" % (_setpfx(pfx, '+--'), node.name)
+        print("%s%s" % (_setpfx(pfx, '+--'), node.name))
 
         for i, n in enumerate(node.child_list):
             sfx = ((i < len(node.child_list) - 1) and '  |' or '   ')
@@ -176,14 +158,14 @@ class SetInfo:
         # reset vars, fill parent and child_list for each node
         # returns root
         root = None
-        for node in self.node_map.itervalues():
+        for node in self.node_map.values():
             node.total_childs = 0
             node.levels = 0
             node.child_list = []
-            if node.type in (ROOT, COMBINED_ROOT):
+            if node.type == ROOT:
                 root = node
-        for node in self.node_map.itervalues():
-            if node.provider_node:
+        for node in self.node_map.values():
+            if node.provider_node and node.provider_node != node.name:
                 p = self.node_map[node.provider_node]
                 p.child_list.append(node)
                 node.parent = p
@@ -191,7 +173,7 @@ class SetInfo:
                 node.parent = None
 
         if root is None:
-            raise Exception("root nod enot found")
+            raise Exception("root nodnot found")
         return root
 
     def _tree_calc(self, node):
@@ -206,18 +188,13 @@ class SetInfo:
                 levels = subnode.levels + 1
         node.total_childs = total
         node.levels = levels
-        node.child_list.sort(_cmp_node)
+        node.child_list.sort(key = _node_key)
 
 def _setpfx(pfx, sfx):
     if pfx:
         pfx = pfx[:-1] + sfx
     return pfx
 
-
-def _cmp_node(n1, n2):
-    # returns neg if n1 smaller
-    cmp = n1.levels - n2.levels
-    if cmp == 0:
-        cmp = n1.total_childs - n2.total_childs
-    return cmp
+def _node_key(n):
+    return (n.levels, n.total_childs)
 
diff --git a/python/pgq/cascade/worker.py b/python/pgq/cascade/worker.py
new file mode 100644 (file)
index 0000000..caaa774
--- /dev/null
@@ -0,0 +1,260 @@
+"""Cascaded worker.
+
+CascadedConsumer that also maintains node.
+
+"""
+
+import sys, time, skytools
+
+from pgq.cascade.consumer import CascadedConsumer
+from pgq.producer import bulk_insert_events
+
+__all__ = ['CascadedWorker']
+
+class WorkerState:
+    """Depending on node state decides on actions worker needs to do."""
+    # node_type,
+    # node_name, provider_node,
+    # global_watermark, local_watermark
+    # combined_queue, combined_type
+    process_batch = 0       # handled in CascadedConsumer
+    copy_events = 0         # ok
+    global_wm_event = 0     # ok
+    local_wm_publish = 0    # ok
+
+    process_events = 0      # ok
+    send_tick_event = 0     # ok
+    wait_behind = 0         # ok
+    process_tick_event = 0  # ok
+    target_queue = ''       # ok
+    keep_event_ids = 0      # ok
+    create_tick = 0         # ok
+    def __init__(self, queue_name, nst):
+        # remember node fields that publish_local_wm() needs later
+        self.node_name = nst['node_name']
+        self.local_watermark = nst['local_watermark']
+        ntype = nst['node_type']
+        ctype = nst['combined_type']
+        if ntype == 'root':
+            self.global_wm_event = 1
+        elif ntype == 'branch':
+            self.target_queue = queue_name
+            self.process_batch = 1
+            self.process_events = 1
+            self.copy_events = 1
+            self.process_tick_event = 1
+            self.local_wm_publish = 1
+            self.keep_event_ids = 1
+            self.create_tick = 1
+        elif ntype == 'leaf' and not ctype:
+            self.process_batch = 1
+            self.process_events = 1
+        elif ntype == 'leaf' and ctype:
+            self.target_queue = nst['combined_queue']
+            if ctype == 'root':
+                self.process_batch = 1
+                self.process_events = 1
+                self.copy_events = 1
+                self.send_tick_event = 1
+            elif ctype == 'branch':
+                self.process_batch = 1
+                self.wait_behind = 1
+            else:
+                raise Exception('invalid state 1')
+        else:
+            raise Exception('invalid state 2')
+        if ctype and ntype != 'leaf':
+            raise Exception('invalid state 3')
+
+class CascadedWorker(CascadedConsumer):
+    """CascadedWorker base class.
+    """
+
+    global_wm_publish_time = 0
+    global_wm_publish_period = 5 * 60
+
+    local_wm_publish_time = 0
+    local_wm_publish_period = 5 * 60
+
+    max_evbuf = 500
+    cur_event_seq = 0
+    cur_max_id = 0
+    seq_buffer = 10000
+
+    main_worker = True
+
+    _worker_state = None
+    ev_buf = None
+
+    def __init__(self, service_name, db_name, args):
+        """Initialize new consumer.
+        
+        @param service_name: service_name for DBScript
+        @param db_name: target database name for get_database()
+        @param args: cmdline args for DBScript
+        """
+
+        CascadedConsumer.__init__(self, service_name, db_name, args)
+
+    def process_remote_batch(self, src_db, tick_id, event_list, dst_db):
+        """Worker-specific event processing."""
+        self.ev_buf = []
+        max_id = 0
+        st = self._worker_state
+
+        if st.wait_behind:
+            self.wait_for_tick(dst_db, tick_id)
+
+        src_curs = src_db.cursor()
+        dst_curs = dst_db.cursor()
+        for ev in event_list:
+            if st.copy_events:
+                self.copy_event(dst_curs, ev)
+            if ev.ev_type[:4] == "pgq.":
+                # process cascade events even on waiting leaf node
+                self.process_remote_event(src_curs, dst_curs, ev)
+            else:
+                if st.process_events:
+                    self.process_remote_event(src_curs, dst_curs, ev)
+                else:
+                    ev.tag_done()
+            if ev.ev_id > max_id:
+                max_id = ev.ev_id
+        if st.local_wm_publish:
+            self.publish_local_wm(src_db)
+        if max_id > self.cur_max_id:
+            self.cur_max_id = max_id
+
+    def wait_for_tick(self, dst_db, tick_id):
+        """On combined-branch leaf needs to wait from tick
+        to appear from combined-root.
+        """
+        while 1:
+            cst = self._consumer_state
+            if cst['completed_tick'] >= tick_id:
+                return
+            time.sleep(10 * self.loop_delay)
+            self._consumer_state = self.refresh_state(dst_db)
+
+    def publish_local_wm(self, src_db):
+        """Send local watermark to provider.
+        """
+        if not self.main_worker:
+            return
+        t = time.time()
+        if t - self.local_wm_publish_time >= self.local_wm_publish_period:
+            return
+
+        st = self._worker_state
+        src_curs = src_db.cursor()
+        q = "select * from pgq_node.set_subscriber_watermark(%s, %s, %s)"
+        src_curs.execute(q, [self.queue_name, st.node_name, st.local_watermark])
+        self.local_wm_publish_time = t
+
+    def process_remote_event(self, src_curs, dst_curs, ev):
+        """Handle cascading events.
+        """
+        # send non-cascade events to CascadedConsumer to error out
+        if ev.ev_type[:4] != 'pgq.':
+            CascadedConsumer.process_remote_event(self, src_curs, dst_curs, ev)
+            return
+
+        # ignore cascade events if not main worker
+        if not self.main_worker:
+            ev.tag_done()
+            return
+
+        # check if for right queue
+        t = ev.ev_type
+        if ev.ev_extra1 != self.queue_name and t != "pgq.tick-id":
+            raise Exception("bad event in queue: "+str(ev))
+
+        self.log.info("got cascade event: %s" % t)
+        if t == "pgq.location-info":
+            node = ev.ev_data
+            loc = ev.ev_extra2
+            dead = ev.ev_extra3
+            q = "select * from pgq_node.register_location(%s, %s, %s, %s)"
+            dst_curs.execute(q, [self.queue_name, node, loc, dead])
+        elif t == "pgq.global-watermark":
+            tick_id = int(ev.ev_data)
+            q = "select * from pgq_node.set_global_watermark(%s, %s)"
+            dst_curs.execute(q, [self.queue_name, tick_id])
+        elif t == "pgq.tick-id":
+            tick_id = int(ev.ev_data)
+            if ev.ev_extra1 == self.queue_name:
+                raise Exception('tick-id event for own queue?')
+            st = self._worker_state
+            if st.process_tick_event:
+                q = "select * from pgq_node.set_partition_watermark(%s, %s, %s)"
+                dst_curs.execute(q, [self.queue_name, ev.ev_extra1, tick_id])
+        else:
+            raise Exception("unknown cascade event: %s" % t)
+        ev.tag_done()
+
+    def finish_remote_batch(self, src_db, dst_db, tick_id):
+        """Worker-specific cleanup on target node.
+        """
+
+        if self.main_worker:
+            st = self._worker_state
+            dst_curs = dst_db.cursor()
+
+            self.flush_events(dst_curs)
+
+            # send tick event into queue
+            if st.send_tick_event:
+                q = "select pgq.insert_event(%s, 'pgq.tick-id', %s, %s, null, null, null)"
+                dst_curs.execute(q, [st.target_queue, str(tick_id), self.queue_name])
+            if st.create_tick:
+                # create actual tick
+                tick_id = self._batch_info['tick_id']
+                tick_time = self._batch_info['batch_end']
+                q = "select pgq.ticker(%s, %s, %s, %s)"
+                dst_curs.execute(q, [self.queue_name, tick_id, tick_time, self.cur_max_id])
+
+        CascadedConsumer.finish_remote_batch(self, src_db, dst_db, tick_id)
+
+    def copy_event(self, dst_curs, ev):
+        """Add event to copy buffer.
+        """
+        if not self.main_worker:
+            return
+        if ev.ev_type[:4] == "pgq.":
+            return
+        if len(self.ev_buf) >= self.max_evbuf:
+            self.flush_events(dst_curs)
+        self.ev_buf.append(ev)
+
+    def flush_events(self, dst_curs):
+        """Send copy buffer to target queue.
+        """
+        if len(self.ev_buf) == 0:
+            return
+        flds = ['ev_time', 'ev_type', 'ev_data', 'ev_extra1',
+                'ev_extra2', 'ev_extra3', 'ev_extra4']
+        st = self._worker_state
+        if st.keep_event_ids:
+            flds.append('ev_id')
+        bulk_insert_events(dst_curs, self.ev_buf, flds, st.target_queue)
+        self.ev_buf = []
+
+    def refresh_state(self, dst_db, full_logic = True):
+        """Load also node state from target node.
+        """
+        res = CascadedConsumer.refresh_state(self, dst_db, full_logic)
+        q = "select * from pgq_node.get_node_info(%s)"
+        st = self.exec_cmd(dst_db, q, [ self.queue_name ])
+        self._worker_state = WorkerState(self.queue_name, st[0])
+        return res
+
+    def process_root_node(self, dst_db):
+        """On root node send global watermark downstream.
+        """
+        t = time.time()
+        if t - self.global_wm_publish_time < self.global_wm_publish_period:
+            return
+
+        dst_curs = dst_db.cursor()
+        q = "select * from pgq_node.set_global_watermark(%s, NULL)"
+        dst_curs.execute(q, [self.queue_name])
+        dst_db.commit()
+        self.global_wm_publish_time = t
+
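To make the WorkerState decision logic concrete, a small sketch (the dict
mimics a pgq_node.get_node_info() row; all values are hypothetical):

    from pgq.cascade.worker import WorkerState

    # plain branch node: replays and copies events, creates local ticks
    nst = {'node_type': 'branch', 'node_name': 'node2',
           'local_watermark': 100,
           'combined_type': None, 'combined_queue': None}
    st = WorkerState('my_queue', nst)
    assert st.process_events and st.copy_events and st.create_tick
    assert st.target_queue == 'my_queue'
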
index 3b8230317be009f3b93ec3ded428aad9c54d90a9..8c001cbeb1298bc78043ebe79e2e6cc22e5693a8 100644 (file)
@@ -1,9 +1,9 @@
 
 """PgQ consumer framework for Python.
 
-API problems(?):
-    - process_event() and process_batch() should have db as argument.
-    - should ev.tag*() update db immidiately?
+todo:
+    - pgq.next_batch_details()
+    - tag_done() by default
 
 """
 
@@ -11,7 +11,7 @@ import sys, time, skytools
 
 from pgq.event import *
 
-__all__ = ['Consumer', 'RemoteConsumer', 'SerialConsumer']
+__all__ = ['Consumer']
 
 class _WalkerEvent(Event):
     """Redirects status flags to BatchWalker.
@@ -113,21 +113,41 @@ class Consumer(skytools.DBScript):
         skytools.DBScript.__init__(self, service_name, args)
 
         self.db_name = db_name
-        self.reg_list = []
-        self.consumer_id = self.cf.get("pgq_consumer_id", self.job_name)
-        self.pgq_queue_name = self.cf.get("pgq_queue_name")
+
+        # compat params
+        self.consumer_name = self.cf.get("pgq_consumer_id", '')
+        self.queue_name = self.cf.get("pgq_queue_name", '')
+
+        # proper params
+        if not self.consumer_name:
+            self.consumer_name = self.cf.get("consumer_name", self.job_name)
+        if not self.queue_name:
+            self.queue_name = self.cf.get("queue_name")
+
         self.pgq_lazy_fetch = self.cf.getint("pgq_lazy_fetch", 0)
+        self.stat_batch_start = 0
 
-    def attach(self):
-        """Attach consumer to interesting queues."""
-        res = self.register_consumer(self.pgq_queue_name)
-        return res
+        # compat vars
+        self.pgq_queue_name = self.queue_name
+        self.consumer_id = self.consumer_name
+
+    def startup(self):
+        """Handle commands here.  __init__ does not have error logging."""
+        if self.options.register:
+            self.register_consumer()
+            sys.exit(0)
+        if self.options.unregister:
+            self.unregister_consumer()
+            sys.exit(0)
+        return skytools.DBScript.startup(self)
 
-    def detach(self):
-        """Detach consumer from all queues."""
-        tmp = self.reg_list[:]
-        for q in tmp:
-            self.unregister_consumer(q)
+    def init_optparse(self, parser = None):
+        p = skytools.DBScript.init_optparse(self, parser)
+        p.add_option('--register', action='store_true',
+                     help = 'register consumer on queue')
+        p.add_option('--unregister', action='store_true',
+                     help = 'unregister consumer from queue')
+        return p
 
     def process_event(self, db, event):
         """Process one event.
@@ -152,66 +172,59 @@ class Consumer(skytools.DBScript):
             self.process_event(db, ev)
 
     def work(self):
-        """Do the work loop, once (internal)."""
-
-        if len(self.reg_list) == 0:
-            self.log.debug("Attaching")
-            self.attach()
+        """Do the work loop, once (internal).
+        Returns: true if it wants to be called again,
+        false if the script can sleep.
+        """
 
         db = self.get_database(self.db_name)
         curs = db.cursor()
 
-        data_avail = 0
-        for queue in self.reg_list:
-            self.stat_start()
-
-            # acquire batch
-            batch_id = self._load_next_batch(curs, queue)
-            db.commit()
-            if batch_id == None:
-                continue
-            data_avail = 1
-
-            # load events
-            list = self._load_batch_events(curs, batch_id, queue)
-            db.commit()
-            
-            # process events
-            self._launch_process_batch(db, batch_id, list)
-
-            # done
-            self._finish_batch(curs, batch_id, list)
-            db.commit()
-            self.stat_end(len(list))
-
-        # if false, script sleeps
-        return data_avail
-
-    def register_consumer(self, queue_name):
+        self.stat_start()
+
+        # acquire batch
+        batch_id = self._load_next_batch(curs)
+        db.commit()
+        if batch_id is None:
+            return 0
+
+        # load events
+        ev_list = self._load_batch_events(curs, batch_id)
+        db.commit()
+        
+        # process events
+        self._launch_process_batch(db, batch_id, ev_list)
+
+        # done
+        self._finish_batch(curs, batch_id, ev_list)
+        db.commit()
+        self.stat_end(len(ev_list))
+
+        return 1
+
+    def register_consumer(self):
+        self.log.info("Registering consumer on source queue")
         db = self.get_database(self.db_name)
         cx = db.cursor()
         cx.execute("select pgq.register_consumer(%s, %s)",
-                [queue_name, self.consumer_id])
+                [self.queue_name, self.consumer_name])
         res = cx.fetchone()[0]
         db.commit()
 
-        self.reg_list.append(queue_name)
-
         return res
 
-    def unregister_consumer(self, queue_name):
+    def unregister_consumer(self):
+        self.log.info("Unregistering consumer from source queue")
         db = self.get_database(self.db_name)
         cx = db.cursor()
         cx.execute("select pgq.unregister_consumer(%s, %s)",
-                    [queue_name, self.consumer_id])
+                    [self.queue_name, self.consumer_name])
         db.commit()
 
-        self.reg_list.remove(queue_name)
-
     def _launch_process_batch(self, db, batch_id, list):
         self.process_batch(db, batch_id, list)
 
-    def _load_batch_events_old(self, curs, batch_id, queue_name):
+    def _load_batch_events_old(self, curs, batch_id):
         """Fetch all events for this batch."""
 
         # load events
@@ -220,26 +233,26 @@ class Consumer(skytools.DBScript):
         rows = curs.dictfetchall()
 
         # map them to python objects
-        list = []
+        ev_list = []
         for r in rows:
-            ev = Event(queue_name, r)
-            list.append(ev)
+            ev = Event(self.queue_name, r)
+            ev_list.append(ev)
 
-        return list
+        return ev_list
 
-    def _load_batch_events(self, curs, batch_id, queue_name):
+    def _load_batch_events(self, curs, batch_id):
         """Fetch all events for this batch."""
 
         if self.pgq_lazy_fetch:
-            return _BatchWalker(curs, batch_id, queue_name, self.pgq_lazy_fetch)
+            return _BatchWalker(curs, batch_id, self.queue_name, self.pgq_lazy_fetch)
         else:
-            return self._load_batch_events_old(curs, batch_id, queue_name)
+            return self._load_batch_events_old(curs, batch_id)
 
-    def _load_next_batch(self, curs, queue_name):
+    def _load_next_batch(self, curs):
         """Allocate next batch. (internal)"""
 
         q = "select pgq.next_batch(%s, %s)"
-        curs.execute(q, [queue_name, self.consumer_id])
+        curs.execute(q, [self.queue_name, self.consumer_name])
         return curs.fetchone()[0]
 
     def _finish_batch(self, curs, batch_id, list):
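
When pgq_lazy_fetch is nonzero, _load_batch_events() returns a _BatchWalker that fetches events in chunks of that size instead of materializing the whole batch, which keeps memory flat on large batches.  Enabling it is a config change; a sketch of the ini, with section name and connstr values assumed:

    [log_consumer]
    job_name = log_consumer
    src_db = dbname=provider
    queue_name = myqueue
    # walk events 300 at a time instead of loading the whole batch
    pgq_lazy_fetch = 300
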
@@ -255,23 +268,24 @@ class Consumer(skytools.DBScript):
                     self._tag_failed(curs, batch_id, ev_id, stat[1])
                     failed += 1
                 elif stat[0] != EV_DONE:
-                    raise Exception("Untagged event: %d" % ev_id)
+                    raise Exception("Untagged event: id=%d" % ev_id)
         else:
             for ev in list:
-                if ev.status == EV_FAILED:
+                if ev._status == EV_FAILED:
                     self._tag_failed(curs, batch_id, ev.id, ev.fail_reason)
                     failed += 1
-                elif ev.status == EV_RETRY:
+                elif ev._status == EV_RETRY:
                     self._tag_retry(curs, batch_id, ev.id, ev.retry_time)
                     retry += 1
-                elif stat[0] != EV_DONE:
-                    raise Exception("Untagged event: %d" % ev_id)
+                elif ev._status != EV_DONE:
+                    raise Exception("Untagged event: (id=%d, type=%s, data=%s, ex1=%s" % (
+                                    ev.id, ev.type, ev.data, ev.extra1))
 
         # report weird events
         if retry:
-            self.stat_add('retry-events', retry)
+            self.stat_increase('retry-events', retry)
         if failed:
-            self.stat_add('failed-events', failed)
+            self.stat_increase('failed-events', failed)
 
         curs.execute("select pgq.finish_batch(%s)", [batch_id])
 
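
Since an untagged event now aborts the batch with a detailed exception, handlers that cannot finish an event should tag it explicitly.  A sketch, assuming tag_retry()/tag_failed() are the Event helpers behind the EV_RETRY/EV_FAILED statuses checked above; the exception classes are hypothetical:

    def process_event(self, db, ev):
        try:
            handle(ev)              # hypothetical application logic
            ev.tag_done()
        except TemporaryError:      # hypothetical transient failure
            ev.tag_retry(60)        # re-deliver after 60 seconds
        except FatalError, ex:      # hypothetical permanent failure
            ev.tag_failed(str(ex))
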
@@ -317,189 +331,3 @@ class Consumer(skytools.DBScript):
         self.stat_put('duration', t - self.stat_batch_start)
 
 
-class RemoteConsumer(Consumer):
-    """Helper for doing event processing in another database.
-
-    Requires that whole batch is processed in one TX.
-    """
-
-    def __init__(self, service_name, db_name, remote_db, args):
-        Consumer.__init__(self, service_name, db_name, args)
-        self.remote_db = remote_db
-
-    def process_batch(self, db, batch_id, event_list):
-        """Process all events in batch.
-        
-        By default calls process_event for each.
-        """
-        dst_db = self.get_database(self.remote_db)
-        curs = dst_db.cursor()
-
-        if self.is_last_batch(curs, batch_id):
-            for ev in event_list:
-                ev.tag_done()
-            return
-
-        self.process_remote_batch(db, batch_id, event_list, dst_db)
-
-        self.set_last_batch(curs, batch_id)
-        dst_db.commit()
-
-    def is_last_batch(self, dst_curs, batch_id):
-        """Helper function to keep track of last successful batch
-        in external database.
-        """
-        q = "select pgq_ext.is_batch_done(%s, %s)"
-        dst_curs.execute(q, [ self.consumer_id, batch_id ])
-        return dst_curs.fetchone()[0]
-
-    def set_last_batch(self, dst_curs, batch_id):
-        """Helper function to set last successful batch
-        in external database.
-        """
-        q = "select pgq_ext.set_batch_done(%s, %s)"
-        dst_curs.execute(q, [ self.consumer_id, batch_id ])
-
-    def process_remote_batch(self, db, batch_id, event_list, dst_db):
-        raise Exception('process_remote_batch not implemented')
-
-class SerialConsumer(Consumer):
-    """Consumer that applies batches sequentially in second database.
-
-    Requirements:
-     - Whole batch in one TX.
-     - Must not use retry queue.
-
-    Features:
-     - Can detect if several batches are already applied to dest db.
-     - If some ticks are lost. allows to seek back on queue.
-       Whether it succeeds, depends on pgq configuration.
-    """
-
-    def __init__(self, service_name, db_name, remote_db, args):
-        Consumer.__init__(self, service_name, db_name, args)
-        self.remote_db = remote_db
-        self.dst_schema = "pgq_ext"
-        self.cur_batch_info = None
-
-    def startup(self):
-        if self.options.rewind:
-            self.rewind()
-            sys.exit(0)
-        if self.options.reset:
-            self.dst_reset()
-            sys.exit(0)
-        return Consumer.startup(self)
-
-    def init_optparse(self, parser = None):
-        p = Consumer.init_optparse(self, parser)
-        p.add_option("--rewind", action = "store_true",
-                help = "change queue position according to destination")
-        p.add_option("--reset", action = "store_true",
-                help = "reset queue pos on destination side")
-        return p
-
-    def process_batch(self, db, batch_id, event_list):
-        """Process all events in batch.
-        """
-
-        dst_db = self.get_database(self.remote_db)
-        curs = dst_db.cursor()
-
-        self.cur_batch_info = self.get_batch_info(batch_id)
-
-        # check if done
-        if self.is_batch_done(curs):
-            for ev in event_list:
-                ev.tag_done()
-            return
-
-        # actual work
-        self.process_remote_batch(db, batch_id, event_list, dst_db)
-
-        # finish work
-        self.set_batch_done(curs)
-        dst_db.commit()
-
-    def is_batch_done(self, dst_curs):
-        """Helper function to keep track of last successful batch
-        in external database.
-        """
-
-        cur_tick = self.cur_batch_info['tick_id']
-        prev_tick = self.cur_batch_info['prev_tick_id']
-
-        dst_tick = self.get_last_tick(dst_curs)
-        if not dst_tick:
-            # seems this consumer has not run yet against dst_db
-            return False
-
-        if prev_tick == dst_tick:
-            # on track
-            return False
-
-        if prev_tick < dst_tick:
-            if dst_tick - prev_tick > 5:
-                 raise Exception('Difference too big, skipping dangerous')
-            self.log.warning('Got tick %d, dst has %d - skipping' % (prev_tick, dst_tick))
-            return True
-        else:
-            self.log.error('Got tick %d, dst has %d - ticks lost' % (prev_tick, dst_tick))
-            raise Exception('Lost ticks')
-
-    def set_batch_done(self, dst_curs):
-        """Helper function to set last successful batch
-        in external database.
-        """
-        tick_id = self.cur_batch_info['tick_id']
-        self.set_last_tick(dst_curs, tick_id)
-
-    def attach(self):
-        new = Consumer.attach(self)
-        if new:
-            self.dst_reset()
-
-    def detach(self):
-        """If detaching, also clean completed tick table on dest."""
-
-        Consumer.detach(self)
-        self.dst_reset()
-
-    def process_remote_batch(self, db, batch_id, event_list, dst_db):
-        raise Exception('process_remote_batch not implemented')
-
-    def rewind(self):
-        self.log.info("Rewinding queue")
-        src_db = self.get_database(self.db_name)
-        dst_db = self.get_database(self.remote_db)
-        src_curs = src_db.cursor()
-        dst_curs = dst_db.cursor()
-
-        dst_tick = self.get_last_tick(dst_curs)
-        if dst_tick:
-            q = "select pgq.register_consumer_at(%s, %s, %s)"
-            src_curs.execute(q, [self.pgq_queue_name, self.consumer_id, dst_tick])
-        else:
-            self.log.warning('No tick found on dst side')
-
-        dst_db.commit()
-        src_db.commit()
-        
-    def dst_reset(self):
-        self.log.info("Resetting queue tracking on dst side")
-        dst_db = self.get_database(self.remote_db)
-        dst_curs = dst_db.cursor()
-        self.set_last_tick(dst_curs, None)
-        dst_db.commit()
-        
-    def get_last_tick(self, dst_curs):
-        q = "select %s.get_last_tick(%%s)" % self.dst_schema
-        dst_curs.execute(q, [self.consumer_id])
-        res = dst_curs.fetchone()
-        return res[0]
-
-    def set_last_tick(self, dst_curs, tick_id):
-        q = "select %s.set_last_tick(%%s, %%s)" % self.dst_schema
-        dst_curs.execute(q, [ self.consumer_id, tick_id ])
-
-
index afaabbaf3e31f459b44be06c7f2a0e10d16ecc16..80801b16f6fec4a8a4278772a695708a0e5b7e51 100644 (file)
@@ -38,6 +38,9 @@ class Event(object):
     Consumer is supposed to tag them after processing.
     If not, events will stay in retry queue.
     """
+    __slots__ = ('_event_row', '_status', 'retry_time',
+                 'fail_reason', 'queue_name')
+
     def __init__(self, queue_name, row):
         self._event_row = row
         self._status = EV_UNTAGGED
@@ -62,3 +65,16 @@ class Event(object):
     def get_status(self):
         return self._status
 
+    # be also dict-like
+    def __getitem__(self, k): return self._event_row.__getitem__(k)
+    def __contains__(self, k): return self._event_row.__contains__(k)
+    def get(self, k, d=None): return self._event_row.get(k, d)
+    def has_key(self, k): return self._event_row.has_key(k)
+    def keys(self): return self._event_row.keys()
+    def values(self): return self._event_row.values()
+    def items(self): return self._event_row.items()
+    def iterkeys(self): return self._event_row.iterkeys()
+    def itervalues(self): return self._event_row.itervalues()
+    def __str__(self):
+        return "<id=%d type=%s data=%s e1=%s e2=%s e3=%s e4=%s>" % (
+                self.id, self.type, self.data, self.extra1, self.extra2, self.extra3, self.extra4)
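
The dict-style wrappers expose the raw ev_* row next to the attribute shortcuts, so generic code can inspect an event without knowing the column mapping.  A sketch:

    def dump_event(ev):
        # ev.type and ev['ev_type'] read the same underlying row field
        for k in ev.keys():
            print("%s=%r" % (k, ev.get(k)))
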
index 396f9c0a24a8c241229741f1b4afb5256ae945b0..89014dd4cadeed5768705380d963d33f3ae96992 100644 (file)
@@ -2,6 +2,8 @@
 
 import skytools, time
 
+__all__ = ['MaintenanceJob']
+
 def get_pgq_api_version(curs):
     q = "select count(1) from pg_proc p, pg_namespace n"\
         " where n.oid = p.pronamespace and n.nspname='pgq'"\
diff --git a/python/pgq/rawconsumer.py b/python/pgq/rawconsumer.py
deleted file mode 100644 (file)
index a43b86b..0000000
+++ /dev/null
@@ -1,52 +0,0 @@
-
-
-import sys, time, skytools, pgq.consumer
-
-class RawQueue:
-    queue_name = None
-    consumer_name = None
-    batch_id = None
-    cur_tick = None
-    prev_tick = None
-    def __init__(self, queue_name, consumer_name):
-        self.queue_name = queue_name
-        self.consumer_name = consumer_name
-        self.bulk_insert_buf = []
-        self.bulk_insert_size = 200
-        self.bulk_insert_fields = ['ev_id', 'ev_time', 'ev_type', 'ev_data', 'ev_extra1', 'ev_extra2', 'ev_extra3', 'ev_extra4']
-
-    def next_batch(self, curs):
-        q = "select * from pgq.next_batch(%s, %s)"
-        curs.execute(q, [self.queue_name, self.consumer_name])
-        self.batch_id = curs.fetchone()[0]
-
-        if not self.batch_id:
-            return self.batch_id
-
-        q = "select tick_id, prev_tick_id, batch_end from pgq.get_batch_info(%s)"
-        curs.execute(q, [self.batch_id])
-        inf = curs.dictfetchone()
-        self.cur_tick = inf['tick_id']
-        self.prev_tick = inf['prev_tick_id']
-        self.tick_time = inf['batch_end']
-
-        return self.batch_id
-
-    def finish_batch(self, curs):
-        q = "select * from pgq.finish_batch(%s)"
-        curs.execute(q, [self.batch_id])
-
-    def get_batch_events(self, curs):
-        return pgq.consumer._BatchWalker(curs, self.batch_id, self.queue_name)
-
-    def bulk_insert(self, curs, ev):
-        row = map(ev.__getattr__, self.bulk_insert_fields)
-        self.bulk_insert_buf.append(row)
-        if len(self.bulk_insert_buf) >= self.bulk_insert_size:
-            self.finish_bulk_insert(curs)
-
-    def finish_bulk_insert(self, curs):
-        pgq.bulk_insert_events(curs, self.bulk_insert_buf,
-                               self.bulk_insert_fields, self.queue_name)
-        self.bulk_insert_buf = []
-
diff --git a/python/pgq/remoteconsumer.py b/python/pgq/remoteconsumer.py
new file mode 100644 (file)
index 0000000..f5c2ced
--- /dev/null
@@ -0,0 +1,197 @@
+
+"""
+Old RemoteConsumer / SerialConsumer classes.
+
+"""
+
+import sys, time, skytools
+
+from pgq.consumer import Consumer
+
+__all__ = ['RemoteConsumer', 'SerialConsumer']
+
+class RemoteConsumer(Consumer):
+    """Helper for doing event processing in another database.
+
+    Requires that whole batch is processed in one TX.
+    """
+
+    def __init__(self, service_name, db_name, remote_db, args):
+        Consumer.__init__(self, service_name, db_name, args)
+        self.remote_db = remote_db
+
+    def process_batch(self, db, batch_id, event_list):
+        """Process all events in batch.
+        
+        Skips batches already applied on the target, otherwise calls process_remote_batch().
+        """
+        dst_db = self.get_database(self.remote_db)
+        curs = dst_db.cursor()
+
+        if self.is_last_batch(curs, batch_id):
+            for ev in event_list:
+                ev.tag_done()
+            return
+
+        self.process_remote_batch(db, batch_id, event_list, dst_db)
+
+        self.set_last_batch(curs, batch_id)
+        dst_db.commit()
+
+    def is_last_batch(self, dst_curs, batch_id):
+        """Helper function to keep track of last successful batch
+        in external database.
+        """
+        q = "select pgq_ext.is_batch_done(%s, %s)"
+        dst_curs.execute(q, [ self.consumer_name, batch_id ])
+        return dst_curs.fetchone()[0]
+
+    def set_last_batch(self, dst_curs, batch_id):
+        """Helper function to set last successful batch
+        in external database.
+        """
+        q = "select pgq_ext.set_batch_done(%s, %s)"
+        dst_curs.execute(q, [ self.consumer_name, batch_id ])
+
+    def process_remote_batch(self, db, batch_id, event_list, dst_db):
+        raise Exception('process_remote_batch not implemented')
+
+class SerialConsumer(Consumer):
+    """Consumer that applies batches sequentially in second database.
+
+    Requirements:
+     - Whole batch in one TX.
+     - Must not use retry queue.
+
+    Features:
+     - Can detect if several batches are already applied to dest db.
+     - If some ticks are lost, allows seeking back on the queue.
+       Whether that succeeds depends on pgq configuration.
+    """
+
+    def __init__(self, service_name, db_name, remote_db, args):
+        Consumer.__init__(self, service_name, db_name, args)
+        self.remote_db = remote_db
+        self.dst_schema = "pgq_ext"
+        self.cur_batch_info = None
+
+    def startup(self):
+        if self.options.rewind:
+            self.rewind()
+            sys.exit(0)
+        if self.options.reset:
+            self.dst_reset()
+            sys.exit(0)
+        return Consumer.startup(self)
+
+    def init_optparse(self, parser = None):
+        p = Consumer.init_optparse(self, parser)
+        p.add_option("--rewind", action = "store_true",
+                help = "change queue position according to destination")
+        p.add_option("--reset", action = "store_true",
+                help = "reset queue pos on destination side")
+        return p
+
+    def process_batch(self, db, batch_id, event_list):
+        """Process all events in batch.
+        """
+
+        dst_db = self.get_database(self.remote_db)
+        curs = dst_db.cursor()
+
+        self.cur_batch_info = self.get_batch_info(batch_id)
+
+        # check if done
+        if self.is_batch_done(curs):
+            for ev in event_list:
+                ev.tag_done()
+            return
+
+        # actual work
+        self.process_remote_batch(db, batch_id, event_list, dst_db)
+
+        # finish work
+        self.set_batch_done(curs)
+        dst_db.commit()
+
+    def is_batch_done(self, dst_curs):
+        """Helper function to keep track of last successful batch
+        in external database.
+        """
+
+        cur_tick = self.cur_batch_info['tick_id']
+        prev_tick = self.cur_batch_info['prev_tick_id']
+
+        dst_tick = self.get_last_tick(dst_curs)
+        if not dst_tick:
+            # seems this consumer has not run yet against dst_db
+            return False
+
+        if prev_tick == dst_tick:
+            # on track
+            return False
+
+        if cur_tick == dst_tick:
+            # current batch is already applied, skip it
+            return True
+
+        # anything else means problems
+        raise Exception('Lost position: batch %d..%d, dst has %d' % (
+                        prev_tick, cur_tick, dst_tick))
+
+    def set_batch_done(self, dst_curs):
+        """Helper function to set last successful batch
+        in external database.
+        """
+        tick_id = self.cur_batch_info['tick_id']
+        self.set_last_tick(dst_curs, tick_id)
+
+    def register_consumer(self):
+        new = Consumer.register_consumer(self)
+        if new: # fixme
+            self.dst_reset()
+
+    def unregister_consumer(self):
+        """If unregistering, also clean completed tick table on dest."""
+
+        Consumer.unregister_consumer(self)
+        self.dst_reset()
+
+    def process_remote_batch(self, db, batch_id, event_list, dst_db):
+        raise Exception('process_remote_batch not implemented')
+
+    def rewind(self):
+        self.log.info("Rewinding queue")
+        src_db = self.get_database(self.db_name)
+        dst_db = self.get_database(self.remote_db)
+        src_curs = src_db.cursor()
+        dst_curs = dst_db.cursor()
+
+        dst_tick = self.get_last_tick(dst_curs)
+        if dst_tick:
+            q = "select pgq.register_consumer_at(%s, %s, %s)"
+            src_curs.execute(q, [self.queue_name, self.consumer_name, dst_tick])
+        else:
+            self.log.warning('No tick found on dst side')
+
+        dst_db.commit()
+        src_db.commit()
+        
+    def dst_reset(self):
+        self.log.info("Resetting queue tracking on dst side")
+        dst_db = self.get_database(self.remote_db)
+        dst_curs = dst_db.cursor()
+        self.set_last_tick(dst_curs, None)
+        dst_db.commit()
+        
+    def get_last_tick(self, dst_curs):
+        q = "select %s.get_last_tick(%%s)" % self.dst_schema
+        dst_curs.execute(q, [self.consumer_name])
+        res = dst_curs.fetchone()
+        return res[0]
+
+    def set_last_tick(self, dst_curs, tick_id):
+        q = "select %s.set_last_tick(%%s, %%s)" % self.dst_schema
+        dst_curs.execute(q, [ self.consumer_name, tick_id ])
+
+
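
Subclasses supply only process_remote_batch(); the pgq_ext batch tracking and the one-TX-per-batch rule are inherited.  A sketch, where events carrying executable SQL is an assumption made for illustration:

    #! /usr/bin/env python

    import sys, pgq

    class SqlApplier(pgq.RemoteConsumer):
        """Apply SQL-payload events on the remote database."""
        def process_remote_batch(self, db, batch_id, event_list, dst_db):
            curs = dst_db.cursor()
            for ev in event_list:
                curs.execute(ev.data)
                ev.tag_done()

    if __name__ == '__main__':
        script = SqlApplier('sql_applier', 'src_db', 'dst_db', sys.argv[1:])
        script.start()
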
diff --git a/python/pgq/setadmin.py b/python/pgq/setadmin.py
deleted file mode 100644 (file)
index 0c50046..0000000
+++ /dev/null
@@ -1,463 +0,0 @@
-#! /usr/bin/env python
-
-import sys, time, optparse, skytools
-
-from pgq.setinfo import *
-
-__all__ = ['SetAdmin']
-
-command_usage = """\
-%prog [options] INI CMD [subcmd args]
-
-Node Initialization:
-  init-root   NODE_NAME NODE_CONSTR
-  init-branch NODE_NAME NODE_CONSTR --provider=<constr>
-  init-leaf   NODE_NAME NODE_CONSTR --provider=<constr>
-    Initializes node.  Given connstr is kept as global connstring
-    for that node.  Those commands ignore node_db in .ini.
-    The --provider connstr is used only for initial set info
-    fetching, later actual provider's connect string is used.
-
-Node Administration:
-  status                Show set state
-  members               Show members in set
-  rename-node OLD NEW   Rename a node
-  change-provider NODE NEWSRC   
-  pause NODE
-  resume NODE
-
-  switchover NEWROOT
-  failover NEWROOT
-  tag-dead NODE ..      Tag node as dead
-  tag-alive NODE ..     Tag node as alive
-"""
-
-class SetAdmin(skytools.AdminScript):
-    set_name = None
-    extra_objs = []
-    initial_db_name = 'node_db'
-
-    def init_optparse(self, parser = None):
-        p = skytools.AdminScript.init_optparse(self, parser)
-        p.set_usage(command_usage.strip())
-
-        g = optparse.OptionGroup(p, "actual setadm options")
-        g.add_option("--connstr", action="store_true",
-                     help = "initial connect string")
-        g.add_option("--provider",
-                     help = "init: connect string for provider")
-        p.add_option_group(g)
-        return p
-
-    def reload(self):
-        skytools.AdminScript.reload(self)
-        self.set_name = self.cf.get('set_name')
-
-    #
-    # Node initialization.
-    #
-
-    def cmd_init_root(self, node_name, node_location):
-        self.init_node('root', node_name, node_location)
-
-    def cmd_init_branch(self, node_name, node_location):
-        if len(args) != 2:
-            raise Exception('init-branch needs 2 args')
-        self.init_node('branch', node_name, node_location)
-
-    def cmd_init_leaf(self, node_name, node_location):
-        self.init_node('leaf', node_name, node_location)
-
-    def init_node(self, node_type, node_name, node_location):
-        provider_loc = self.options.provider
-
-        # connect to database
-        db = self.get_database("new_node", connstr = node_location)
-
-        # check if code is installed
-        self.install_code(db)
-
-        # query current status
-        res = self.exec_query(db, "select * from pgq_set.get_node_info(%s)", [self.set_name])
-        info = res[0]
-        if info['node_type'] is not None:
-            self.log.info("Node is already initialized as %s" % info['node_type'])
-            return
-        
-        self.log.info("Initializing node")
-
-        # register member
-        if node_type in ('root', 'combined-root'):
-            global_watermark = None
-            combined_set = None
-            provider_name = None
-            self.exec_cmd(db, "select * from pgq_set.add_member(%s, %s, %s, false)",
-                          [self.set_name, node_name, node_location])
-            self.exec_cmd(db, "select * from pgq_set.create_node(%s, %s, %s, %s, %s, %s)",
-                          [self.set_name, node_type, node_name, provider_name, global_watermark, combined_set])
-            provider_db = None
-        else:
-            root_db = self.find_root_db(provider_loc)
-            set = self.load_set_info(root_db)
-
-            # check if member already exists
-            if set.get_member(node_name) is not None:
-                self.log.error("Node '%s' already exists" % node_name)
-                sys.exit(1)
-
-            combined_set = None
-
-            provider_db = self.get_database('provider_db', connstr = provider_loc)
-            q = "select node_type, node_name from pgq_set.get_node_info(%s)"
-            res = self.exec_query(provider_db, q, [self.set_name])
-            row = res[0]
-            if not row['node_name']:
-                raise Exception("provider node not found")
-            provider_name = row['node_name']
-
-            # register member on root
-            self.exec_cmd(root_db, "select * from pgq_set.add_member(%s, %s, %s, false)",
-                          [self.set_name, node_name, node_location])
-
-            # lookup provider
-            provider = set.get_member(provider_name)
-            if not provider:
-                self.log.error("Node %s does not exist" % provider_name)
-                sys.exit(1)
-
-            # register on provider
-            self.exec_cmd(provider_db, "select * from pgq_set.add_member(%s, %s, %s, false)",
-                          [self.set_name, node_name, node_location])
-            rows = self.exec_cmd(provider_db, "select * from pgq_set.subscribe_node(%s, %s)",
-                                 [self.set_name, node_name])
-            global_watermark = rows[0]['global_watermark']
-
-            # initialize node itself
-
-            # insert members
-            self.exec_cmd(db, "select * from pgq_set.add_member(%s, %s, %s, false)",
-                          [self.set_name, node_name, node_location])
-            for m in set.member_map.values():
-                self.exec_cmd(db, "select * from pgq_set.add_member(%s, %s, %s, %s)",
-                              [self.set_name, m.name, m.location, m.dead])
-
-            # real init
-            self.exec_cmd(db, "select * from pgq_set.create_node(%s, %s, %s, %s, %s, %s)",
-                          [self.set_name, node_type, node_name, provider_name,
-                           global_watermark, combined_set])
-
-
-        self.extra_init(node_type, db, provider_db)
-
-        self.log.info("Done")
-
-    def extra_init(self, node_type, node_db, provider_db):
-        pass
-
-    def find_root_db(self, initial_loc = None):
-        if initial_loc:
-            loc = initial_loc
-        else:
-            loc = self.cf.get(self.initial_db_name)
-
-        while 1:
-            db = self.get_database('root_db', connstr = loc)
-
-
-            # query current status
-            res = self.exec_query(db, "select * from pgq_set.get_node_info(%s)", [self.set_name])
-            info = res[0]
-            type = info['node_type']
-            if type is None:
-                self.log.info("Root node not initialized?")
-                sys.exit(1)
-
-            self.log.debug("db='%s' -- type='%s' provider='%s'" % (loc, type, info['provider_location']))
-            # configured db may not be root anymore, walk upwards then
-            if type in ('root', 'combined-root'):
-                db.commit()
-                return db
-
-            self.close_database('root_db')
-            if loc == info['provider_location']:
-                raise Exception("find_root_db: got loop: %s" % loc)
-            loc = info['provider_location']
-            if loc is None:
-                self.log.error("Sub node provider not initialized?")
-                sys.exit(1)
-
-    def install_code(self, db):
-        objs = [
-            skytools.DBLanguage("plpgsql"),
-            skytools.DBFunction("txid_current_snapshot", 0, sql_file="txid.sql"),
-            skytools.DBSchema("pgq", sql_file="pgq.sql"),
-            skytools.DBSchema("pgq_ext", sql_file="pgq_ext.sql"),
-            skytools.DBSchema("pgq_set", sql_file="pgq_set.sql"),
-        ]
-        objs += self.extra_objs
-        skytools.db_install(db.cursor(), objs, self.log)
-        db.commit()
-
-    #
-    # Print status of whole set.
-    #
-
-    def cmd_status(self):
-        root_db = self.find_root_db()
-        sinf = self.load_set_info(root_db)
-
-        for mname, minf in sinf.member_map.iteritems():
-            db = self.get_database('look_db', connstr = minf.location, autocommit = 1)
-            curs = db.cursor()
-            curs.execute("select * from pgq_set.get_node_info(%s)", [self.set_name])
-            node = NodeInfo(self.set_name, curs.fetchone())
-            node.load_status(curs)
-            self.load_extra_status(curs, node)
-            sinf.add_node(node)
-            self.close_database('look_db')
-
-        sinf.print_tree()
-
-    def load_extra_status(self, curs, node):
-        pass
-
-    #
-    # Normal commands.
-    #
-
-    def cmd_change_provider(self, node_name, new_provider):
-        old_provider = None
-
-        self.load_local_info()
-        node_location = self.set_info.get_member(node_name).location
-        node_db = self.get_node_database(node_name)
-        node_set_info = self.load_set_info(node_db)
-        node = node_set_info.local_node
-        old_provider = node.provider_node
-
-        if old_provider == new_provider:
-            self.log.info("Node %s has already %s as provider" % (node_name, new_provider))
-
-        # pause target node
-        self.pause_node(node_name)
-
-        # reload node info
-        node_set_info = self.load_set_info(node_db)
-        node = node_set_info.local_node
-
-        # subscribe on new provider
-        q = "select * from pgq_set.add_member(%s, %s, %s, false)"
-        self.node_cmd(new_provider, q, [self.set_name, node_name, node_location])
-        q = 'select * from pgq_set.subscribe_node(%s, %s, %s)'
-        self.node_cmd(new_provider, q, [self.set_name, node_name, node.completed_tick])
-
-        # change provider on node
-        q = 'select * from pgq_set.change_provider(%s, %s)'
-        self.node_cmd(node_name, q, [self.set_name, new_provider])
-
-        # unsubscribe from old provider
-        q = "select * from pgq_set.unsubscribe_node(%s, %s)"
-        self.node_cmd(old_provider, q, [self.set_name, node_name])
-
-        # resume node
-        self.resume_node(node_name)
-
-    def cmd_rename_node(self, old_name, new_name):
-
-        self.load_local_info()
-
-        root_db = self.find_root_db()
-
-        # pause target node
-        self.pause_node(old_name)
-        node = self.load_node_info(old_name)
-        provider_node = node.provider_node
-        subscriber_list = self.get_node_subscriber_list(old_name)
-
-
-        # create copy of member info / subscriber+queue info
-        step1 = 'select * from pgq_set.rename_node_step1(%s, %s, %s)'
-        # rename node itself, drop copies
-        step2 = 'select * from pgq_set.rename_node_step2(%s, %s, %s)'
-
-        # step1
-        self.exec_cmd(root_db, step1, [self.set_name, old_name, new_name])
-        self.node_cmd(provider_node, step1, [self.set_name, old_name, new_name])
-        self.node_cmd(old_name, step1, [self.set_name, old_name, new_name])
-        for child in subscriber_list:
-            self.node_cmd(child, step1, [self.set_name, old_name, new_name])
-
-        # step1
-        self.node_cmd(old_name, step2, [self.set_name, old_name, new_name])
-        self.node_cmd(provider_node, step1, [self.set_name, old_name, new_name])
-        for child in subscriber_list:
-            self.node_cmd(child, step2, [self.set_name, old_name, new_name])
-        self.exec_cmd(root_db, step2, [self.set_name, old_name, new_name])
-
-        # resume node
-        self.resume_node(old_name)
-
-    def switchover_nonroot(self, old_node, new_node):
-        # see if we need to change new nodes' provider
-        tmp_node = new_node
-        while 1:
-            if tmp_node.is_root():
-                break
-            if tmp_node.name == old_node:
-                # yes, old_node is new_nodes provider,
-                # switch it around
-                self.change_provider(new_node, old_node.parent_node)
-                break
-        self.change_provider(old_node.name, new_node.name)
-
-    def switchover_root(self, old_node, new_node):
-        self.pause_node(old_node.name)
-        self.extra_lockdown(old_node)
-
-        self.wait_for_catchup(new_node, old_node)
-        self.pause_node(new_node.name)
-        self.promote_node(new_node.name)
-        self.subscribe_node(new_node.name, old_node.name, tick_pos)
-        self.unsubscribe_node(new_node.parent_node, new_node.name)
-        self.resume_node(new_node.name)
-
-        # demote & set provider on node
-        q = 'select * from pgq_set.demote_root(%s, %s)'
-        self.node_cmd(old_node.name, q, [self.set_name, new_node.name])
-
-        self.resume_node(old_node.name)
-
-    def cmd_switchover(self, old_node_name, new_node_name):
-        self.load_local_info()
-        old_node = self.get_node_info(old_node_name)
-        new_node = self.get_node_info(new_node_name)
-        if old_node.name == new_node.name:
-            self.log.info("same node?")
-            return
-
-        if old_node.is_root():
-            self.switchover_root(old_node, new_node)
-        else:
-            self.switchover_nonroot(old_node, new_node)
-
-        # switch subscribers around
-        if self.options.all:
-            for n in self.get_node_subscriber_list(old_node.name):
-                self.change_provider(n, new_node.name)
-
-    def cmd_pause(self, node_name):
-        self.load_local_info()
-        self.pause_node(node_name)
-
-    def cmd_resume(self, node_name):
-        self.load_local_info()
-        self.resume_node(node_name)
-
-    def cmd_members(self):
-        db = self.get_database(self.initial_db_name)
-        q = "select node_name from pgq_set.get_node_info(%s)"
-        rows = self.exec_query(db, q, [self.set_name])
-
-        desc = 'Member info on %s:' % rows[0]['node_name']
-        q = "select node_name, dead, node_location"\
-            " from pgq_set.get_member_info(%s) order by 1"
-        self.display_table(db, desc, q, [self.set_name])
-
-    #
-    # Shortcuts for operating on nodes.
-    #
-
-    def load_local_info(self):
-        """fetch set info from local node."""
-        db = self.get_database(self.initial_db_name)
-        self.set_info = self.load_set_info(db)
-
-    def get_node_database(self, node_name):
-        """Connect to node."""
-        if node_name == self.set_info.local_node.name:
-            db = self.get_database(self.initial_db_name)
-        else:
-            m = self.set_info.get_member(node_name)
-            if not m:
-                self.log.error("cannot resolve %s" % node_name)
-                sys.exit(1)
-            loc = m.location
-            db = self.get_database('node.' + node_name, connstr = loc)
-        return db
-
-    def close_node_database(self, node_name):
-        """Disconnect node's connection."""
-        if node_name == self.set_info.local_node.name:
-            self.close_database(self.initial_db_name)
-        else:
-            self.close_database("node." + node_name)
-
-    def node_cmd(self, node_name, sql, args):
-        """Execute SQL command on particular node."""
-        db = self.get_node_database(node_name)
-        return self.exec_cmd(db, sql, args)
-
-    #
-    # Various operation on nodes.
-    #
-
-    def set_paused(self, db, pause_flag):
-        q = "select * from pgq_set.set_node_paused(%s, %s)"
-        self.exec_cmd(db, q, [self.set_name, pause_flag])
-
-        self.log.info('Waiting for worker to accept')
-        while 1:
-            q = "select * from pgq_set.get_node_info(%s)"
-            stat = self.exec_query(db, q, [self.set_name])[0]
-            if stat['paused'] != pause_flag:
-                raise Exception('operation canceled? %s <> %s' % (repr(stat['paused']), repr(pause_flag)))
-
-            if stat['uptodate']:
-                break
-            time.sleep(1)
-
-        op = pause_flag and "paused" or "resumed"
-
-        self.log.info("Node %s %s" % (stat['node_name'], op))
-
-    def pause_node(self, node_name):
-        db = self.get_node_database(node_name)
-        self.set_paused(db, True)
-
-    def resume_node(self, node_name):
-        db = self.get_node_database(node_name)
-        self.set_paused(db, False)
-
-    def subscribe_node(self, target_node, subscriber_node, tick_pos):
-        q = "select * from pgq_set.subscribe_node(%s, %s, %s)"
-        self.node_cmd(target_node, q, [self.set_name, subscribe_node, tick_pos])
-
-    def unsubscribe_node(self, target_node, subscriber_node):
-        q = "select * from pgq_set.unsubscribe_node(%s, %s)"
-        self.node_cmd(target_node, q, [self.set_name, subscribe_node])
-
-    def load_node_info(self, node_name):
-        db = self.get_node_database(node_name)
-        q = "select * from pgq_set.get_node_info(%s)"
-        rows = self.exec_query(db, q, [self.set_name])
-        return NodeInfo(self.set_name, rows[0])
-
-    def load_set_info(self, db):
-        res = self.exec_query(db, "select * from pgq_set.get_node_info(%s)", [self.set_name])
-        info = res[0]
-
-        q = "select * from pgq_set.get_member_info(%s)"
-        member_list = self.exec_query(db, q, [self.set_name])
-
-        return SetInfo(self.set_name, info, member_list)
-
-    def get_node_subscriber_list(self, node_name):
-        q = "select node_name, local_watermark from pgq_set.get_subscriber_info(%s)"
-        db = self.get_node_database(node_name)
-        rows = self.exec_query(db, q, [self.set_name])
-        return [r['node_name'] for r in rows]
-
-if __name__ == '__main__':
-    script = SetAdmin('set_admin', sys.argv[1:])
-    script.start()
-
diff --git a/python/pgq/setconsumer.py b/python/pgq/setconsumer.py
deleted file mode 100644 (file)
index d17ecf5..0000000
+++ /dev/null
@@ -1,247 +0,0 @@
-#! /usr/bin/env python
-
-import sys, time, skytools
-
-from pgq.rawconsumer import RawQueue
-from pgq.setinfo import *
-
-__all__ = ['SetConsumer']
-
-class SetConsumer(skytools.DBScript):
-    last_local_wm_publish_time = 0
-    last_global_wm_publish_time = 0
-    main_worker = True
-    reg_ok = False
-    actual_dst_event_id = 0
-    batch_max_event_id = 0
-    seq_buffer = 10000
-    def __init__(self, service_name, args,
-                 node_db_name = 'node_db'):
-        skytools.DBScript.__init__(self, service_name, args)
-        self.node_db_name = node_db_name
-        self.consumer_name = self.cf.get('consumer_name', self.job_name)
-
-    def work(self):
-        self.tick_id_cache = {}
-
-        self.set_name = self.cf.get('set_name')
-        dst_db = self.get_database(self.node_db_name)
-        dst_curs = dst_db.cursor()
-
-        dst_node = self.load_node_info(dst_db)
-        if self.main_worker:
-            self.consumer_name = dst_node.name
-            if not dst_node.uptodate:
-                self.tag_node_uptodate(dst_db)
-
-        if dst_node.paused:
-            return 0
-
-        if dst_node.need_action('global-wm-event'):
-            self.publish_global_watermark(dst_db, dst_node.local_watermark)
-
-        if not dst_node.need_action('process-batch'):
-            return 0
-
-        #
-        # batch processing follows
-        #
-
-        src_db = self.get_database('src_db', connstr = dst_node.provider_location)
-        src_curs = src_db.cursor()
-        src_node = self.load_node_info(src_db)
-        
-        # get batch
-        src_queue = RawQueue(src_node.queue_name, self.consumer_name)
-        self.src_queue = src_queue
-        self.dst_queue = None
-
-        if not self.main_worker and not self.reg_ok:
-            self.register_consumer(src_curs)
-
-        batch_id = src_queue.next_batch(src_curs)
-        src_db.commit()
-        if batch_id is None:
-            return 0
-
-        self.log.debug("New batch: tick_id=%d / batch_id=%d" % (src_queue.cur_tick, batch_id))
-
-        if dst_node.need_action('wait-behind'):
-            if dst_node.should_wait(src_queue.cur_tick):
-                return 0
-
-        if dst_node.need_action('process-events'):
-            # load and process batch data
-            ev_list = src_queue.get_batch_events(src_curs)
-
-            if dst_node.need_action('copy-events'):
-                self.dst_queue = RawQueue(dst_node.get_target_queue(), self.consumer_name)
-            self.process_set_batch(src_db, dst_db, ev_list)
-            if self.dst_queue:
-                self.dst_queue.finish_bulk_insert(dst_curs)
-                self.copy_tick(dst_curs, src_queue, self.dst_queue)
-
-            # COMBINED_BRANCH needs to sync with part sets
-            if dst_node.need_action('sync-part-pos'):
-                self.move_part_positions(dst_curs)
-            if dst_node.need_action('update-event-seq'):
-                self.update_event_seq(dst_curs)
-
-        # we are done on target
-        self.set_tick_complete(dst_curs, src_queue.cur_tick)
-        dst_db.commit()
-
-        # done on source
-        src_queue.finish_batch(src_curs)
-        src_db.commit()
-
-        # occasinally send watermark upwards
-        if dst_node.need_action('local-wm-publish'):
-            self.send_local_watermark_upwards(src_db, dst_node)
-
-        # got a batch so there can be more
-        return 1
-
-    def process_set_batch(self, src_db, dst_db, ev_list):
-        dst_curs = dst_db.cursor()
-        max_id = 0
-        for ev in ev_list:
-            self.process_set_event(dst_curs, ev)
-            if self.dst_queue:
-                self.dst_queue.bulk_insert(dst_curs, ev)
-            if ev.id > max_id:
-                max_id = ev.id
-        self.batch_max_event_id = max_id
-        self.stat_increase('count', len(ev_list))
-
-    def update_event_seq(self, dst_curs):
-        qname = self.dst_queue.queue_name
-        if self.actual_dst_event_id == 0:
-            q = "select pgq.seq_getval(queue_event_seq) from pgq.queue where queue_name = %s"
-            dst_curs.execute(q, [qname])
-            self.actual_dst_event_id = dst_curs.fetchone()[0]
-            self.log.debug('got local event_id value = %d' % self.actual_dst_event_id)
-
-        if self.batch_max_event_id + self.seq_buffer >= self.actual_dst_event_id:
-            next_id = self.batch_max_event_id + 2 * self.seq_buffer
-            q = "select pgq.seq_setval(queue_event_seq, %s) from  pgq.queue where queue_name = %s"
-            self.log.debug('set local event_id value = %d' % next_id)
-            dst_curs.execute(q, [next_id, qname])
-            self.actual_dst_event_id = next_id
-
-    def process_set_event(self, dst_curs, ev):
-        if ev.type == 'set-tick':
-            self.handle_set_tick(dst_curs, ev)
-        elif ev.type == 'member-info':
-            self.handle_member_info(dst_curs, ev)
-        elif ev.type == 'global-watermark':
-            self.handle_global_watermark(dst_curs, ev)
-        else:
-            raise Exception('bad event for set consumer')
-
-    def handle_global_watermark(self, dst_curs, ev):
-        set_name = ev.extra1
-        tick_id = ev.data
-        if set_name == self.set_name:
-            self.set_global_watermark(dst_curs, tick_id)
-
-    def handle_set_tick(self, dst_curs, ev):
-        data = skytools.db_urldecode(ev.data)
-        set_name = data['set_name']
-        tick_id = data['tick_id']
-        self.tick_id_cache[set_name] = tick_id
-
-    def move_part_positions(self, dst_curs):
-        q = "select * from pgq_set.set_partition_watermark(%s, %s, %s)"
-        for set_name, tick_id in self.tick_id_cache.items():
-            dst_curs.execute(q, [self.set_name, set_name, tick_id])
-
-    def handle_member_info(self, dst_curs, ev):
-        node_name = ev.ev_data
-        set_name = ev.ev_extra1
-        node_location = ev.ev_extra2
-        dead = ev.ev_extra3
-        # this can also be member for part set, ignore then
-        if set_name != self.set_name:
-            return
-
-        q = "select * from pgq_set.add_member(%s, %s, %s, %s)"
-        dst_curs.execute(q, [set_name, node_name, node_location, dead])
-
-    def send_local_watermark_upwards(self, src_db, node):
-        # fixme - delay
-        now = time.time()
-        delay = now - self.last_local_wm_publish_time
-        if delay < 1*60:
-            return
-        self.last_local_wm_publish_time = now
-
-        self.log.debug("send_local_watermark_upwards")
-        src_curs = src_db.cursor()
-        q = "select pgq_set.set_subscriber_watermark(%s, %s, %s)"
-        src_curs.execute(q, [self.set_name, node.name, node.local_watermark])
-        src_db.commit()
-
-    def set_global_watermark(self, dst_curs, tick_id):
-        self.log.debug("set_global_watermark: %s" % tick_id)
-        q = "select pgq_set.set_global_watermark(%s, %s)"
-        dst_curs.execute(q, [self.set_name, tick_id])
-
-    def publish_global_watermark(self, dst_db, watermark):
-        now = time.time()
-        delay = now - self.last_global_wm_publish_time
-        if delay < 1*60:
-            return
-        self.last_global_wm_publish_time = now
-
-        self.set_global_watermark(dst_db.cursor(), watermark)
-        dst_db.commit()
-
-    def load_node_info(self, db):
-        curs = db.cursor()
-
-        q = "select * from pgq_set.get_node_info(%s)"
-        curs.execute(q, [self.set_name])
-        node_row = curs.dictfetchone()
-        if not node_row:
-            raise Exception('node not initialized')
-
-        q = "select * from pgq_set.get_member_info(%s)"
-        curs.execute(q, [self.set_name])
-        mbr_list = curs.dictfetchall()
-        db.commit()
-
-        return NodeInfo(self.set_name, node_row, self.main_worker)
-
-    def tag_node_uptodate(self, dst_db):
-        dst_curs = dst_db.cursor()
-        q = "select * from pgq_set.set_node_uptodate(%s, true)"
-        dst_curs.execute(q, [self.set_name])
-        dst_db.commit()
-
-    def copy_tick(self, dst_curs, src_queue, dst_queue):
-        q = "select * from pgq.ticker(%s, %s, %s)"
-        dst_curs.execute(q, [dst_queue.queue_name, src_queue.cur_tick, src_queue.tick_time])
-
-    def set_tick_complete(self, dst_curs, tick_id):
-        q = "select * from pgq_set.set_completed_tick(%s, %s, %s)"
-        dst_curs.execute(q, [self.set_name, self.consumer_name, tick_id])
-
-    def register_consumer(self, src_curs):
-        if self.main_worker:
-            raise Exception('main set worker should not play with registrations')
-
-        q = "select * from pgq.register_consumer(%s, %s)"
-        src_curs.execute(q, [self.src_queue.queue_name, self.consumer_name])
-
-    def unregister_consumer(self, src_curs):
-        if self.main_worker:
-            raise Exception('main set worker should not play with registrations')
-
-        q = "select * from pgq.unregister_consumer(%s, %s)"
-        src_curs.execute(q, [self.src_queue.queue_name, self.consumer_name])
-
-if __name__ == '__main__':
-    script = SetConsumer('setconsumer', sys.argv[1:])
-    script.start()
-
index f17539def29c32004a4edae22af15f6371c88f1a..d3f285c0050179a53b2729f64b8b6885266f180c 100644 (file)
@@ -4,15 +4,18 @@
 
 import sys, os, skytools
 
-def ival(data, as = None):
+__all__ = ['PGQStatus']
+
+def ival(data, _as = None):
     "Format interval for output"
-    if not as:
-        as = data.split('.')[-1]
+    if not _as:
+        _as = data.split('.')[-1]
     numfmt = 'FM9999999'
     expr = "coalesce(to_char(extract(epoch from %s), '%s') || 's', 'NULL') as %s"
-    return expr % (data, numfmt, as)
+    return expr % (data, numfmt, _as)
 
 class PGQStatus(skytools.DBScript):
+    """Info gathering and display."""
     def __init__(self, args, check = 0):
         skytools.DBScript.__init__(self, 'pgqadm', args)
 
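
For reference, ival('f.ticker_lag') expands to the following SQL fragment; the output alias falls out of the last dotted component:

    coalesce(to_char(extract(epoch from f.ticker_lag), 'FM9999999') || 's', 'NULL') as ticker_lag
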
@@ -28,7 +31,7 @@ class PGQStatus(skytools.DBScript):
         pgver = cx.fetchone()[0]
         cx.execute("select pgq.version()")
         qver = cx.fetchone()[0]
-        print "Postgres version: %s   PgQ version: %s" % (pgver, qver)
+        print("Postgres version: %s   PgQ version: %s" % (pgver, qver))
 
         q = """select f.queue_name, f.queue_ntables, %s, %s,
                       %s, %s, q.queue_ticker_max_count
@@ -50,36 +53,37 @@ class PGQStatus(skytools.DBScript):
         cx.execute(q)
         consumer_rows = cx.dictfetchall()
 
-        print "\n%-45s %9s %13s %6s" % ('Event queue',
-                            'Rotation', 'Ticker', 'TLag')
-        print '-' * 78
+        print("\n%-45s %9s %13s %6s" % ('Event queue',
+                            'Rotation', 'Ticker', 'TLag'))
+        print('-' * 78)
         for ev_row in event_rows:
             tck = "%s/%s/%s" % (ev_row['queue_ticker_max_count'],
                     ev_row['queue_ticker_max_lag'],
                     ev_row['queue_ticker_idle_period'])
             rot = "%s/%s" % (ev_row['queue_ntables'], ev_row['queue_rotation_period'])
-            print   "%-45s %9s %13s %6s" % (
+            print("%-45s %9s %13s %6s" % (
                 ev_row['queue_name'],
                 rot,
                 tck,
                 ev_row['ticker_lag'],
-            )
-        print '-' * 78
-        print "\n%-56s %9s %9s" % (
-                'Consumer', 'Lag', 'LastSeen')
-        print '-' * 78
+            ))
+        print('-' * 78)
+        print("\n%-56s %9s %9s" % (
+                'Consumer', 'Lag', 'LastSeen'))
+        print('-' * 78)
         for ev_row in event_rows:
             cons = self.pick_consumers(ev_row, consumer_rows)
             self.show_queue(ev_row, cons)
-        print '-' * 78
+        print('-' * 78)
         db.commit()
 
     def show_consumer(self, cons):
-        print "  %-54s %9s %9s" % (
+        print("  %-54s %9s %9s" % (
                     cons['consumer_name'],
-                    cons['lag'], cons['last_seen'])
+                    cons['lag'], cons['last_seen']))
+
     def show_queue(self, ev_row, consumer_rows):
-        print "%(queue_name)s:" % ev_row
+        print("%(queue_name)s:" % ev_row)
         for cons in consumer_rows:
             self.show_consumer(cons)
 
index 327864d5cfa36961ab6c253cf197b988a02d42f2..ba8aadb27bd65f4f7fa3d627b22fb4ce9d89ca48 100644 (file)
@@ -8,74 +8,10 @@ import skytools
 
 from maint import MaintenanceJob
 
-__all__ = ['SmartTicker']
+__all__ = ['SmallTicker']
 
-def is_txid_sane(curs):
-    curs.execute("select txid_current()")
-    txid = curs.fetchone()[0]
-
-    # on 8.2 theres no such table
-    if not skytools.exists_table(curs, 'txid.epoch'):
-        return 1
-
-    curs.execute("select epoch, last_value from txid.epoch")
-    epoch, last_val = curs.fetchone()
-    stored_val = (epoch << 32) | last_val
-
-    if stored_val <= txid:
-        return 1
-    else:
-        return 0
-
-class QueueStatus(object):
-    def __init__(self, name):
-        self.queue_name = name
-        self.seq_name = None
-        self.idle_period = 60
-        self.max_lag = 3
-        self.max_count = 200
-        self.last_tick_time = 0
-        self.last_count = 0
-        self.quiet_count = 0
-
-    def set_data(self, row):
-        self.seq_name = row['queue_event_seq']
-        self.idle_period = row['queue_ticker_idle_period']
-        self.max_lag = row['queue_ticker_max_lag']
-        self.max_count = row['queue_ticker_max_count']
-
-    def need_tick(self, cur_count, cur_time):
-        # check if tick is needed
-        need_tick = 0
-        lag = cur_time - self.last_tick_time
-
-        if cur_count == self.last_count:
-            # totally idle database
-
-            # don't go immidiately to big delays, as seq grows before commit
-            if self.quiet_count < 5:
-                if lag >= self.max_lag:
-                    need_tick = 1
-                    self.quiet_count += 1
-            else:
-                if lag >= self.idle_period:
-                    need_tick = 1
-        else:
-            self.quiet_count = 0
-            # somewhat loaded machine
-            if cur_count - self.last_count >= self.max_count:
-                need_tick = 1
-            elif lag >= self.max_lag:
-                need_tick = 1
-        if need_tick:
-            self.last_tick_time = cur_time
-            self.last_count = cur_count
-        return need_tick
-
-class SmartTicker(skytools.DBScript):
-    last_tick_event = 0
-    last_tick_time = 0
-    quiet_count = 0
+class SmallTicker(skytools.DBScript):
+    """Ticker that periodically calls pgq.ticker()."""
     tick_count = 0
     maint_thread = None
 
@@ -84,8 +20,6 @@ class SmartTicker(skytools.DBScript):
 
         self.ticker_log_time = 0
         self.ticker_log_delay = 5*60
-        self.queue_map = {}
-        self.refresh_time = 0
 
     def reload(self):
         skytools.DBScript.reload(self)
@@ -95,79 +29,22 @@ class SmartTicker(skytools.DBScript):
         if self.maint_thread:
             return
 
-        db = self.get_database("db", autocommit = 1)
-        cx = db.cursor()
-        ok = is_txid_sane(cx)
-        if not ok:
-            self.log.error('txid in bad state')
-            sys.exit(1)
-
+        # launch maint thread
         self.maint_thread = MaintenanceJob(self, [self.cf.filename])
         t = threading.Thread(name = 'maint_thread',
                              target = self.maint_thread.run)
         t.setDaemon(1)
         t.start()
 
-    def refresh_queues(self, cx):
-        q = "select queue_name, queue_event_seq,"\
-            " extract('epoch' from queue_ticker_idle_period) as queue_ticker_idle_period,"\
-            " extract('epoch' from queue_ticker_max_lag) as queue_ticker_max_lag,"\
-            " queue_ticker_max_count"\
-            " from pgq.queue"\
-            " where not queue_external_ticker"
-        cx.execute(q)
-        new_map = {}
-        data_list = []
-        from_list = []
-        for row in cx.dictfetchall():
-            queue_name = row['queue_name']
-            try:
-                que = self.queue_map[queue_name]
-            except KeyError, x:
-                que = QueueStatus(queue_name)
-            que.set_data(row)
-            new_map[queue_name] = que
-
-            p1 = "'%s', %s.last_value" % (queue_name, que.seq_name)
-            data_list.append(p1)
-            from_list.append(que.seq_name)
-
-        self.queue_map = new_map
-        self.seq_query = "select %s from %s" % (
-                ", ".join(data_list),
-                ", ".join(from_list))
-
-        if len(from_list) == 0:
-            self.seq_query = None
-
-        self.refresh_time = time.time()
-        
     def work(self):
         db = self.get_database("db", autocommit = 1)
         cx = db.cursor()
-        queue_refresh = self.cf.getint('queue_refresh_period', 30)
 
-        cur_time = time.time()
-
-        if cur_time >= self.refresh_time + queue_refresh:
-            self.refresh_queues(cx)
-
-        if not self.seq_query:
-            return
-
-        # now check seqs
-        cx.execute(self.seq_query)
-        res = cx.fetchone()
-        pos = 0
-        while pos < len(res):
-            id = res[pos]
-            val = res[pos + 1]
-            pos += 2
-            que = self.queue_map[id]
-            if que.need_tick(val, cur_time):
-                cx.execute("select pgq.ticker(%s)", [que.queue_name])
-                self.tick_count += 1
+        # run ticker
+        cx.execute("select pgq.ticker()")
+        self.tick_count += cx.fetchone()[0]
 
+        cur_time = time.time()
         if cur_time > self.ticker_log_time + self.ticker_log_delay:
             self.ticker_log_time = cur_time
             self.stat_increase('ticks', self.tick_count)
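
The rewrite above moves tick scheduling entirely into the database: instead of polling each queue's event sequence and applying lag/count heuristics in Python, the script now calls pgq.ticker() with no arguments, which ticks whichever queues are due and returns the number of ticks generated. A minimal standalone sketch of the resulting loop, outside the DBScript framework (connect string and loop delay are hypothetical):

    # sketch only: the bare loop behind SmallTicker.work(), without the
    # DBScript config/logging and the maintenance thread
    import time
    import psycopg2

    db = psycopg2.connect("dbname=provider host=127.0.0.1")
    db.set_isolation_level(0)       # autocommit, matching autocommit=1 above
    cx = db.cursor()
    tick_count = 0
    while True:
        # no-argument form: ticks every queue that is due, returns tick count
        cx.execute("select pgq.ticker()")
        tick_count += cx.fetchone()[0]
        time.sleep(1)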
index 7c2bf74022e99f818277c45398aebb6630c8ce49..2687ed8adbe60aa6a4025f9b81a251bb130a00cc 100755 (executable)
@@ -4,11 +4,8 @@
 """
 
 import sys
-import skytools
-
-from pgq.ticker import SmartTicker
-from pgq.status import PGQStatus
-#from pgq.admin import PGQAdmin
+import skytools, pgq
+from pgq.cascade.admin import CascadeAdmin
 
 """TODO:
 pgqadm ini check
@@ -17,7 +14,7 @@ pgqadm ini check
 command_usage = """
 %prog [options] INI CMD [subcmd args]
 
-commands:
+local queue commands:
   ticker                   start ticking & maintenance process
 
   status                   show overview of queue health
@@ -28,6 +25,17 @@ commands:
   register QNAME CONS      register consumer on queue
   unregister QNAME CONS    unregister consumer from queue
   config QNAME [VAR=VAL]   show or change queue config
+
+cascaded queue commands:
+  create-node
+  rename-node
+  pause-node
+  resume-node
+  change-provider
+  tag-alive
+  tag-dead
+  switchover
+  failover
 """
 
 config_allowed_list = {
@@ -38,12 +46,14 @@ config_allowed_list = {
 }
 
 class PGQAdmin(skytools.DBScript):
+    """PgQ admin + maint script."""
     def __init__(self, args):
+        """Initialize pgqadm."""
         skytools.DBScript.__init__(self, 'pgqadm', args)
         self.set_single_loop(1)
 
         if len(self.args) < 2:
-            print "need command"
+            print("need command")
             sys.exit(1)
 
         int_cmds = {
@@ -55,16 +65,20 @@ class PGQAdmin(skytools.DBScript):
             'config': self.change_config,
         }
 
+        cascade_cmds = ['create-node']
+
         cmd = self.args[1]
         if cmd == "ticker":
-            script = SmartTicker(args)
+            script = pgq.SmallTicker(args)
         elif cmd == "status":
-            script = PGQStatus(args)
+            script = pgq.PGQStatus(args)
+        elif cmd in cascade_cmds:
+            script = CascadeAdmin(self.service_name, 'db', args)
         elif cmd in int_cmds:
             script = None
             self.work = int_cmds[cmd]
         else:
-            print "unknown command"
+            print("unknown command")
             sys.exit(1)
 
         if self.pidfile:
@@ -80,6 +94,7 @@ class PGQAdmin(skytools.DBScript):
     def init_optparse(self, parser=None):
         p = skytools.DBScript.init_optparse(self, parser)
         p.set_usage(command_usage.strip())
+        p.add_option("--queue", help = 'cascading: specify queue name')
         return p
 
     def installer(self):
@@ -118,8 +133,8 @@ class PGQAdmin(skytools.DBScript):
 
     def change_config(self):
         if len(self.args) < 3:
-            list = self.get_queue_list()
-            for qname in list:
+            qlist = self.get_queue_list()
+            for qname in qlist:
                 self.show_config(qname)
             return
 
@@ -139,7 +154,7 @@ class PGQAdmin(skytools.DBScript):
             expr = "%s=%s" % (k, skytools.quote_literal(v))
             alist.append(expr)
         self.log.info('Change queue %s config to: %s' % (qname, ", ".join(alist)))
-        sql = "update pgq.queue set %s where queue_name = %s" % ( 
+        sql = "update pgq.queue set %s where queue_name = %s" % (
                         ", ".join(alist), skytools.quote_literal(qname))
         self.exec_sql(sql, [])
 
@@ -168,15 +183,15 @@ class PGQAdmin(skytools.DBScript):
         db.commit()
 
         if res is None:
-            print "no such queue:", qname
+            print("no such queue: " + qname)
             return
 
-        print qname
+        print(qname)
         for k in config_allowed_list:
             n = k
             if k[:6] == "queue_":
                 n = k[6:]
-            print "    %s\t=%7s" % (n, res[k])
+            print("    %s\t=%7s" % (n, res[k]))
 
     def get_queue_list(self):
         db = self.get_database('db')
@@ -185,10 +200,10 @@ class PGQAdmin(skytools.DBScript):
         rows = curs.fetchall()
         db.commit()
         
-        list = []
+        qlist = []
         for r in rows:
-            list.append(r[0])
-        return list
+            qlist.append(r[0])
+        return qlist
 
 if __name__ == '__main__':
     script = PGQAdmin(sys.argv[1:])
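
After these changes, pgqadm.py dispatches three ways: "ticker" and "status" construct their own scripts, the internal commands run in-process, and any command in cascade_cmds (only create-node so far, although the usage text already lists the rest) hands the full argument list over to CascadeAdmin. A hypothetical trace of the cascade branch (ini file and queue name are invented):

    # sketch of the cascade dispatch; 'mydb.ini' and 'myqueue' are made up
    from pgq.cascade.admin import CascadeAdmin

    args = ['mydb.ini', 'create-node', '--queue=myqueue']
    cmd = args[1]                       # 'create-node'
    if cmd in ['create-node']:          # cascade_cmds
        # 'pgqadm' is the service name (ini section to read), 'db' is the
        # config key CascadeAdmin takes its connect string from
        script = CascadeAdmin('pgqadm', 'db', args)
        script.start()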
index 61cfdc7753a27e8cdfeb4cf5ed243d894e625b21..2c93d4ef115da3b69e1db497b8e7e1d3cf7408e8 100755 (executable)
@@ -1,8 +1,11 @@
 #! /usr/bin/env python
 
-import sys, pgq.setadmin
+"""SetAdmin launcher.
+"""
+
+import sys, pgq.cascade.admin
 
 if __name__ == '__main__':
-    script = pgq.setadmin.SetAdmin('set_admin', sys.argv[1:])
+    script = pgq.cascade.admin.CascadeAdmin('cascade_admin', 'node_db', sys.argv[1:], worker_setup = False)
     script.start()
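
The launcher stays a thin wrapper: only the module path and the constructor arguments changed. For comparison, the same pattern with invented names (worker_setup is passed through exactly as above; its semantics are not shown in this patch):

    #! /usr/bin/env python
    # sketch: the same launcher shape for a hypothetical admin service
    import sys
    import pgq.cascade.admin

    if __name__ == '__main__':
        script = pgq.cascade.admin.CascadeAdmin(
                    'my_cascade_admin',     # service name / ini section (invented)
                    'node_db',              # config key naming the node database
                    sys.argv[1:],
                    worker_setup = False)   # as in setadm.py above
        script.start()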