def __init__(self, service_name, db_name, remote_db, args):
Consumer.__init__(self, service_name, db_name, args)
self.remote_db = remote_db
- self.dst_completed_table = "pgq_ext.completed_tick"
+ self.dst_schema = "pgq_ext"
self.cur_batch_info = None
def startup(self):
prev_tick = self.cur_batch_info['prev_tick_id']
- q = "select last_tick_id from %s where consumer_id = %%s" % (
- self.dst_completed_table ,)
+ q = "select %s.get_last_tick(%%s)" % self.dst_schema
dst_curs.execute(q, [self.consumer_id])
res = dst_curs.fetchone()
in external database.
"""
tick_id = self.cur_batch_info['tick_id']
- q = "delete from %s where consumer_id = %%s; "\
- "insert into %s (consumer_id, last_tick_id) values (%%s, %%s)" % (
- self.dst_completed_table,
- self.dst_completed_table)
- dst_curs.execute(q, [ self.consumer_id,
- self.consumer_id, tick_id ])
+ q = "select %s.set_last_tick(%%s, %%s)" % self.dst_schema
+ dst_curs.execute(q, [ self.consumer_id, tick_id ])
def attach(self):
new = Consumer.attach(self)
self.log.info("removing completed tick from dst")
dst_db = self.get_database(self.remote_db)
dst_curs = dst_db.cursor()
-
- q = "delete from %s where consumer_id = %%s" % (
- self.dst_completed_table,)
+ q = "select %s.set_last_tick(%%s, NULL)" % self.dst_schema
dst_curs.execute(q, [self.consumer_id])
dst_db.commit()
src_curs = src_db.cursor()
dst_curs = dst_db.cursor()
- q = "select last_tick_id from %s where consumer_id = %%s" % (
- self.dst_completed_table,)
+ q = "select %s.get_last_tick(%%s)" % self.dst_schema
dst_curs.execute(q, [self.consumer_id])
row = dst_curs.fetchone()
if row:
dst_tick = row[0]
- q = "select pgq.register_consumer(%s, %s, %s)"
+ q = "select pgq.register_consumer_at(%s, %s, %s)"
src_curs.execute(q, [self.pgq_queue_name, self.consumer_id, dst_tick])
else:
self.log.warning('No tick found on dst side')
self.log.info("Resetting queue tracking on dst side")
dst_db = self.get_database(self.remote_db)
dst_curs = dst_db.cursor()
-
- q = "delete from %s where consumer_id = %%s" % (
- self.dst_completed_table,)
+ q = "select %s.set_last_tick(%%s, NULL)" % self.dst_schema
dst_curs.execute(q, [self.consumer_id])
dst_db.commit()