pgq.CoopConsumer for Python
authorMarko Kreen <markokr@gmail.com>
Wed, 9 Sep 2009 10:52:58 +0000 (13:52 +0300)
committerMarko Kreen <markokr@gmail.com>
Wed, 9 Sep 2009 10:52:58 +0000 (13:52 +0300)
Simply wrapper around Consumer which redirects few calls to pgq_coop schema.

python/pgq/__init__.py
python/pgq/consumer.py
python/pgq/coopconsumer.py [new file with mode: 0644]

index b34ef625c1296553fcf9134170a7528fe3b918f5..638ee372e5545a593326586ebaf251ccc51517f3 100644 (file)
@@ -19,6 +19,7 @@ import pgq.cascade.worker
 
 from pgq.event import *
 from pgq.consumer import *
+from pgq.coopconsumer import *
 from pgq.remoteconsumer import *
 from pgq.producer import *
 
@@ -34,6 +35,7 @@ from pgq.cascade.worker import *
 __all__ = (
     pgq.event.__all__ +
     pgq.consumer.__all__ +
+    pgq.coopconsumer.__all__ +
     pgq.remoteconsumer.__all__ +
     pgq.cascade.nodeinfo.__all__ +
     pgq.cascade.admin.__all__ +
index a23b882e1d5f8e71d7efadfb8131c551393eff52..32fa9f76f97f11408b120043fad03ed4d38e89af 100644 (file)
@@ -245,8 +245,8 @@ class Consumer(skytools.DBScript):
         curs.execute(q, [self.queue_name, self.consumer_name])
         return curs.fetchone()[0]
 
-    def _finish_batch(self, curs, batch_id, list):
-        """Tag events and notify that the batch is done."""
+    def _flush_retry(self, curs, list):
+        """Tag retry events."""
 
         retry = 0
         if self.pgq_lazy_fetch:
@@ -269,6 +269,11 @@ class Consumer(skytools.DBScript):
         if retry:
             self.stat_increase('retry-events', retry)
 
+    def _finish_batch(self, curs, batch_id, list):
+        """Tag events and notify that the batch is done."""
+
+        self._flush_retry(curs, batch_id, list)
+
         curs.execute("select pgq.finish_batch(%s)", [batch_id])
 
     def _tag_retry(self, cx, batch_id, ev_id, retry_time):
diff --git a/python/pgq/coopconsumer.py b/python/pgq/coopconsumer.py
new file mode 100644 (file)
index 0000000..3e6cdec
--- /dev/null
@@ -0,0 +1,63 @@
+
+"""PgQ cooperative consumer for Python.
+"""
+
+from pgq.consumer import Consumer
+
+__all__ = ['CoopConsumer']
+
+class CoopConsumer(Consumer):
+    """Cooperative Consumer base class.
+
+    There will be one dbscript process per subconsumer.
+    """
+
+    def __init__(self, service_name, db_name, args):
+        """Initialize new subconsumer.
+
+        @param service_name: service_name for DBScript
+        @param db_name: name of database for get_database()
+        @param args: cmdline args for DBScript
+        """
+
+        Consumer.__init__(self, service_name, db_name, args)
+
+        self.subconsumer_name = self.cf.get("subconsumer_name")
+
+    def register_consumer(self):
+        """Registration for subconsumer."""
+
+        self.log.info("Registering consumer on source queue")
+        db = self.get_database(self.db_name)
+        cx = db.cursor()
+        cx.execute("select pgq_coop.register_subconsumer(%s, %s, %s)",
+                [self.queue_name, self.consumer_name, self.subconsumer_name])
+        res = cx.fetchone()[0]
+        db.commit()
+
+        return res
+
+    def unregister_consumer(self):
+        """Unregistration for subconsumer."""
+
+        self.log.info("Unregistering consumer from source queue")
+        db = self.get_database(self.db_name)
+        cx = db.cursor()
+        cx.execute("select pgq_coop.unregister_consumer(%s, %s, %s)",
+                    [self.queue_name, self.consumer_name, self.subconsumer_name])
+        db.commit()
+
+
+    def _load_next_batch(self, curs):
+        """Allocate next batch. (internal)"""
+
+        q = "select pgq_coop.next_batch(%s, %s, %s)"
+        curs.execute(q, [self.queue_name, self.consumer_name, self.subconsumer_name])
+        return curs.fetchone()[0]
+
+    def _finish_batch(self, curs, batch_id, list):
+        """Finish batch. (internal)"""
+
+        self._flush_retry(curs, batch_id, list)
+        curs.execute("select pgq_coop.finish_batch(%s)", [batch_id])
+