convert Syncer to new API (untested...)
authorMarko Kreen <markokr@gmail.com>
Tue, 3 Apr 2007 11:41:13 +0000 (11:41 +0000)
committerMarko Kreen <markokr@gmail.com>
Tue, 3 Apr 2007 11:41:13 +0000 (11:41 +0000)
doc/TODO.txt
python/londiste/syncer.py

index efc8a54ecaeb43c3d62689e6afa88e14f3ed27ac..12f17b2320b4042431b6315e0e5b8df8993bdd77 100644 (file)
@@ -14,6 +14,7 @@ Immidiate
 * londiste swithcover support / deny triggers
 * deb: /etc/skylog.ini should be conffile
 * RemoteConsumer/SerialConsumer/pgq_ext sanity, too much duplication
+* londiste * remove tbl should work also if table is already dropped
 
 Near future
 ============
index eaee3468657aac407953ba6e847848658979cda4..6e1f2ce659926f4508ec30d1552c6fd4921ac59d 100644 (file)
@@ -26,14 +26,11 @@ class Syncer(skytools.DBScript):
         src_curs = src_db.cursor()
         
         # before locking anything check if consumer is working ok
-        q = "select extract(epoch from ticker_lag) from pgq.get_queue_list()"\
-                " where queue_name = %s"
+        q = "select extract(epoch from ticker_lag) from pgq.get_queue_info(%s)"
         src_curs.execute(q, [self.pgq_queue_name])
         ticker_lag = src_curs.fetchone()[0]
         q = "select extract(epoch from lag)"\
-            " from pgq.get_consumer_list()"\
-            " where queue_name = %s"\
-            "   and consumer_name = %s"
+            " from pgq.get_consumer_info(%s, %s)"
         src_curs.execute(q, [self.pgq_queue_name, self.pgq_consumer_id])
         res = src_curs.fetchall()
         src_db.commit()
@@ -129,11 +126,9 @@ class Syncer(skytools.DBScript):
         while 1:
             time.sleep(0.2)
 
-            q = """select now() - lag > %s, now(), lag
-                     from pgq.get_consumer_list()
-                   where consumer_name = %s
-                     and queue_name = %s"""
-            src_curs.execute(q, [tpos, self.pgq_consumer_id, self.pgq_queue_name])
+            q = "select now() - lag > %s, now(), lag"\
+                " from pgq.get_consumer_info(%s, %s)"
+            src_curs.execute(q, [tpos, self.pgq_queue_name, self.pgq_consumer_id])
             res = src_curs.fetchall()
             src_db.commit()