CascadedConsumer.__init__(self, service_name, db_name, args)
+ def reload(self):
+ CascadedConsumer.reload(self)
+
+ self.global_wm_publish_period = self.cf.getfloat('global_wm_publish_period', CascadedWorker.global_wm_publish_period)
+ self.local_wm_publish_period = self.cf.getfloat('local_wm_publish_period', CascadedWorker.local_wm_publish_period)
+
def process_remote_batch(self, src_db, tick_id, event_list, dst_db):
"""Worker-specific event processing."""
self.ev_buf = []
if t - self.local_wm_publish_time < self.local_wm_publish_period:
return
- self.log.debug("Publishing local watermark: %d" % st.local_watermark)
st = self._worker_state
+ self.log.debug("Publishing local watermark: %d" % st.local_watermark)
src_curs = src_db.cursor()
q = "select * from pgq_node.set_subscriber_watermark(%s, %s, %s)"
src_curs.execute(q, [self.pgq_queue_name, st.node_name, st.local_watermark])