pgq.LocalConsumer: consumer that tracks processed ticks in local file
authorMarko Kreen <markokr@gmail.com>
Mon, 9 Jul 2012 11:46:48 +0000 (14:46 +0300)
committerMarko Kreen <markokr@gmail.com>
Fri, 13 Jul 2012 19:44:24 +0000 (22:44 +0300)
python/pgq/__init__.py
python/pgq/localconsumer.py [new file with mode: 0644]
tests/localconsumer/init.sh [new file with mode: 0755]
tests/localconsumer/regen.sh [new file with mode: 0755]
tests/localconsumer/testconsumer.py [new file with mode: 0755]

index 638ee372e5545a593326586ebaf251ccc51517f3..dc6ece295a57a22c5bc6562ceb55f8faf6829efe 100644 (file)
@@ -21,6 +21,7 @@ from pgq.event import *
 from pgq.consumer import *
 from pgq.coopconsumer import *
 from pgq.remoteconsumer import *
+from pgq.localconsumer import *
 from pgq.producer import *
 
 from pgq.ticker import *
@@ -37,6 +38,7 @@ __all__ = (
     pgq.consumer.__all__ +
     pgq.coopconsumer.__all__ +
     pgq.remoteconsumer.__all__ +
+    pgq.localconsumer.__all__ +
     pgq.cascade.nodeinfo.__all__ +
     pgq.cascade.admin.__all__ +
     pgq.cascade.consumer.__all__ +
diff --git a/python/pgq/localconsumer.py b/python/pgq/localconsumer.py
new file mode 100644 (file)
index 0000000..780f921
--- /dev/null
@@ -0,0 +1,211 @@
+
+"""
+Consumer that stores last applied position in local file.
+
+For cases where the consumer cannot use single database for remote tracking.
+
+"""
+
+import sys
+import os
+import errno
+import skytools
+import pgq
+
+__all__ = ['LocalConsumer']
+
+class LocalConsumer(pgq.Consumer):
+    """Consumer that applies batches sequentially in second database.
+
+    Requirements:
+     - Whole batch in one TX.
+     - Must not use retry queue.
+
+    Features:
+     - Can detect if several batches are already applied to dest db.
+     - If some ticks are lost. allows to seek back on queue.
+       Whether it succeeds, depends on pgq configuration.
+
+    Config options::
+
+        ## Parameters for LocalConsumer ##
+
+        # file location where last applied tick is tracked
+        local_tracking_file = ~/state/%(job_name)s.tick
+    """
+
+    def reload(self):
+        super(LocalConsumer, self).reload()
+
+        self.local_tracking_file = self.cf.getfile('local_tracking_file')
+
+    def init_optparse(self, parser = None):
+        p = super(LocalConsumer, self).init_optparse(parser)
+        p.add_option("--rewind", action = "store_true",
+                help = "change queue position according to local tick")
+        p.add_option("--reset", action = "store_true",
+                help = "reset local tick based on queue position")
+        return p
+
+    def startup(self):
+        if self.options.rewind:
+            self.rewind()
+            sys.exit(0)
+        if self.options.reset:
+            self.dst_reset()
+            sys.exit(0)
+        super(LocalConsumer, self).startup()
+
+        self.check_queue()
+
+    def check_queue(self):
+        queue_tick = -1
+        local_tick = self.load_local_tick()
+
+        db = self.get_database(self.db_name)
+        curs = db.cursor()
+        q = "select last_tick from pgq.get_consumer_info(%s, %s)"
+        curs.execute(q, [self.queue_name, self.consumer_name])
+        rows = curs.fetchall()
+        if len(rows) == 1:
+            queue_tick = rows[0]['last_tick']
+        db.commit()
+
+        if queue_tick < 0:
+            if local_tick >= 0:
+                self.log.info("Registering consumer at tick %d", local_tick)
+                q = "select * from pgq.register_consumer_at(%s, %s, %s)"
+                curs.execute(q, [self.queue_name, self.consumer_name, local_tick])
+            else:
+                self.log.info("Registering consumer at queue top")
+                q = "select * from pgq.register_consumer(%s, %s)"
+                curs.execute(q, [self.queue_name, self.consumer_name])
+        elif local_tick < 0:
+            self.log.info("Local tick missing, storing queueu tick %d", queue_tick)
+            self.save_local_tick(queue_tick)
+        elif local_tick > queue_tick:
+            self.log.warning("Tracking out of sync: queue=%d local=%d.  Repositioning on queue.  [Database failure?]",
+                             queue_tick, local_tick)
+            q = "select * from pgq.register_consumer_at(%s, %s, %s)"
+            curs.execute(q, [self.queue_name, self.consumer_name, local_tick])
+        elif local_tick < queue_tick:
+            self.log.warning("Tracking out of sync: queue=%d local=%d.  Rewinding queue.  [Lost file data?]",
+                             queue_tick, local_tick)
+            q = "select * from pgq.register_consumer_at(%s, %s, %s)"
+            curs.execute(q, [self.queue_name, self.consumer_name, local_tick])
+        else:
+            self.log.info("Ticks match: Queue=%d Local=%d", queue_tick, local_tick)
+
+    def work(self):
+        if self.work_state < 0:
+            self.check_queue()
+        return super(LocalConsumer, self).work()
+
+    def process_batch(self, db, batch_id, event_list):
+        """Process all events in batch.
+        """
+
+        # check if done
+        if self.is_batch_done():
+            return
+
+        # actual work
+        self.process_local_batch(db, batch_id, event_list)
+
+        # finish work
+        self.set_batch_done()
+
+    def process_local_batch(self, db, batch_id, event_list):
+        for ev in event_list:
+            self.process_local_event(db, batch_id, ev)
+
+    def process_local_event(self, db, batch_id, ev):
+        raise Exception('process_remote_batch not implemented')
+
+    def is_batch_done(self):
+        """Helper function to keep track of last successful batch
+        in external database.
+        """
+
+        local_tick = self.load_local_tick()
+
+        cur_tick = self.batch_info['tick_id']
+        prev_tick = self.batch_info['prev_tick_id']
+
+        if local_tick < 0:
+            # seems this consumer has not run yet?
+            return False
+
+        if prev_tick == local_tick:
+            # on track
+            return False
+
+        if cur_tick == local_tick:
+            # current batch is already applied, skip it
+            return True
+
+        # anything else means problems
+        raise Exception('Lost position: batch %d..%d, dst has %d' % (
+                        prev_tick, cur_tick, local_tick))
+
+    def set_batch_done(self):
+        """Helper function to set last successful batch
+        in external database.
+        """
+        tick_id = self.batch_info['tick_id']
+        self.save_local_tick(tick_id)
+
+    def register_consumer(self):
+        new = super(LocalConsumer, self).register_consumer()
+        if new: # fixme
+            self.dst_reset()
+
+    def unregister_consumer(self):
+        """If unregistering, also clean completed tick table on dest."""
+
+        super(LocalConsumer, self).unregister_consumer()
+        self.dst_reset()
+
+    def rewind(self):
+        dst_tick = self.load_local_tick()
+        if dst_tick >= 0:
+            src_db = self.get_database(self.db_name)
+            src_curs = src_db.cursor()
+
+            self.log.info("Rewinding queue to tick local tick %d", dst_tick)
+            q = "select pgq.register_consumer_at(%s, %s, %s)"
+            src_curs.execute(q, [self.queue_name, self.consumer_name, dst_tick])
+
+            src_db.commit()
+        else:
+            self.log.error('Cannot rewind, no tick found in local file')
+        
+    def dst_reset(self):
+        self.log.info("Removing local tracking file")
+        try:
+            os.remove(self.local_tracking_file)
+        except:
+            pass
+
+    def load_local_tick(self):
+        """Reads stored tick or -1."""
+        try:
+            f = open(self.local_tracking_file, 'r')
+            buf = f.read()
+            f.close()
+            data = buf.strip()
+            if data:
+                tick_id = int(data)
+            else:
+                tick_id = -1
+            return tick_id
+        except IOError, ex:
+            if ex.errno == errno.ENOENT:
+                return -1
+            raise
+
+    def save_local_tick(self, tick_id):
+        """Store tick in local file."""
+        data = str(tick_id)
+        skytools.write_atomic(self.local_tracking_file, data)
+
diff --git a/tests/localconsumer/init.sh b/tests/localconsumer/init.sh
new file mode 100755 (executable)
index 0000000..0b88415
--- /dev/null
@@ -0,0 +1,9 @@
+#! /bin/sh
+
+. ../env.sh
+
+mkdir -p log pid
+
+dropdb qdb
+createdb qdb
+
diff --git a/tests/localconsumer/regen.sh b/tests/localconsumer/regen.sh
new file mode 100755 (executable)
index 0000000..f0e6cf8
--- /dev/null
@@ -0,0 +1,47 @@
+#! /bin/sh
+
+. ../testlib.sh
+
+for db in qdb; do
+  cleardb $db
+done
+
+rm -f log/*.log
+mkdir -p state
+rm -f state/*
+
+set -e
+
+title LocalConsumer test
+
+title2 Initialization
+
+msg Install PgQ
+
+run_qadmin qdb "install pgq;"
+run_qadmin qdb "create queue test_queue;"
+
+msg Run ticker
+
+cat_file conf/pgqd.ini <<EOF
+[pgqd]
+database_list = qdb
+logfile = log/pgqd.log
+pidfile = pid/pgqd.pid
+EOF
+
+run pgqd -d conf/pgqd.ini
+
+msg Run consumer
+
+cat_file conf/testconsumer_qdb.ini <<EOF
+[testconsumer]
+queue_name = test_queue
+db = dbname=qdb
+logfile = log/%(job_name)s.log
+pidfile = pid/%(job_name)s.pid
+local_tracking_file = state/%(job_name)s.tick
+EOF
+
+run ./testconsumer.py -v conf/testconsumer_qdb.ini
+
diff --git a/tests/localconsumer/testconsumer.py b/tests/localconsumer/testconsumer.py
new file mode 100755 (executable)
index 0000000..bf4e836
--- /dev/null
@@ -0,0 +1,12 @@
+#! /usr/bin/env python
+
+import sys, time, skytools, pgq
+
+class TestLocalConsumer(pgq.LocalConsumer):
+    def process_local_event(self, src_db, batch_id, ev):
+        self.log.info("event: type=%s data=%s", ev.type, ev.data)
+
+if __name__ == '__main__':
+    script = TestLocalConsumer('testconsumer', 'db', sys.argv[1:])
+    script.start()
+