pgq.SerialConsumer: use functions instead of direct access at tables
authorMarko Kreen <markokr@gmail.com>
Fri, 20 Jul 2007 16:25:12 +0000 (16:25 +0000)
committerMarko Kreen <markokr@gmail.com>
Fri, 20 Jul 2007 16:25:12 +0000 (16:25 +0000)
python/pgq/consumer.py

index 8160f1d1beaea237ea5532dc2dc92b563e705f0d..4b9c221b2bceff0f6588abcc18c3a2d18ff9d0f6 100644 (file)
@@ -265,7 +265,7 @@ class SerialConsumer(Consumer):
     def __init__(self, service_name, db_name, remote_db, args):
         Consumer.__init__(self, service_name, db_name, args)
         self.remote_db = remote_db
-        self.dst_completed_table = "pgq_ext.completed_tick"
+        self.dst_schema = "pgq_ext"
         self.cur_batch_info = None
 
     def startup(self):
@@ -319,8 +319,7 @@ class SerialConsumer(Consumer):
 
         prev_tick = self.cur_batch_info['prev_tick_id']
 
-        q = "select last_tick_id from %s where consumer_id = %%s" % (
-                self.dst_completed_table ,)
+        q = "select %s.get_last_tick(%%s)" % self.dst_schema
         dst_curs.execute(q, [self.consumer_id])
         res = dst_curs.fetchone()
 
@@ -347,12 +346,8 @@ class SerialConsumer(Consumer):
         in external database.
         """
         tick_id = self.cur_batch_info['tick_id']
-        q = "delete from %s where consumer_id = %%s; "\
-            "insert into %s (consumer_id, last_tick_id) values (%%s, %%s)" % (
-                    self.dst_completed_table,
-                    self.dst_completed_table)
-        dst_curs.execute(q, [ self.consumer_id,
-                              self.consumer_id, tick_id ])
+        q = "select %s.set_last_tick(%%s, %%s)" % self.dst_schema
+        dst_curs.execute(q, [ self.consumer_id, tick_id ])
 
     def attach(self):
         new = Consumer.attach(self)
@@ -369,9 +364,7 @@ class SerialConsumer(Consumer):
         self.log.info("removing completed tick from dst")
         dst_db = self.get_database(self.remote_db)
         dst_curs = dst_db.cursor()
-
-        q = "delete from %s where consumer_id = %%s" % (
-                self.dst_completed_table,)
+        q = "select %s.set_last_tick(%%s, NULL)" % self.dst_schema
         dst_curs.execute(q, [self.consumer_id])
         dst_db.commit()
 
@@ -385,13 +378,12 @@ class SerialConsumer(Consumer):
         src_curs = src_db.cursor()
         dst_curs = dst_db.cursor()
 
-        q = "select last_tick_id from %s where consumer_id = %%s" % (
-                self.dst_completed_table,)
+        q = "select %s.get_last_tick(%%s)" % self.dst_schema
         dst_curs.execute(q, [self.consumer_id])
         row = dst_curs.fetchone()
         if row:
             dst_tick = row[0]
-            q = "select pgq.register_consumer(%s, %s, %s)"
+            q = "select pgq.register_consumer_at(%s, %s, %s)"
             src_curs.execute(q, [self.pgq_queue_name, self.consumer_id, dst_tick])
         else:
             self.log.warning('No tick found on dst side')
@@ -403,9 +395,7 @@ class SerialConsumer(Consumer):
         self.log.info("Resetting queue tracking on dst side")
         dst_db = self.get_database(self.remote_db)
         dst_curs = dst_db.cursor()
-
-        q = "delete from %s where consumer_id = %%s" % (
-                self.dst_completed_table,)
+        q = "select %s.set_last_tick(%%s, NULL)" % self.dst_schema
         dst_curs.execute(q, [self.consumer_id])
         dst_db.commit()