cst = self._consumer_state
if cst['completed_tick'] >= tick_id:
return
- time.sleep(10 * self.loop_delay)
+ self.sleep(10 * self.loop_delay)
self._consumer_state = self.refresh_state(dst_db)
+ if not self.looping:
+ sys.exit(0)
def is_batch_done(self, state, batch_info):
wst = self._worker_state
"""Worker-specific cleanup on target node.
"""
+ # merge-leaf on branch should not update tick pos
+ wst = self._worker_state
+ if wst.wait_behind:
+ dst_db.commit()
+ return
+
if self.main_worker:
st = self._worker_state
dst_curs = dst_db.cursor()