process_batch = 0 # handled in CascadedConsumer
copy_events = 0 # ok
global_wm_event = 0 # ok
- local_wm_publish = 0 # ok
+ local_wm_publish = 1 # ok
process_events = 0 # ok
send_tick_event = 0 # ok
create_tick = 0 # ok
filtered_copy = 0 # ok
def __init__(self, queue_name, nst):
+ self.node_type = nst['node_type']
+ self.node_name = nst['node_name']
+ self.local_watermark = nst['local_watermark']
ntype = nst['node_type']
ctype = nst['combined_type']
if ntype == 'root':
self.global_wm_event = 1
+ self.local_wm_publish = 0
elif ntype == 'branch':
self.target_queue = queue_name
self.process_batch = 1
self.process_events = 1
self.copy_events = 1
self.process_tick_event = 1
- self.local_wm_publish = 1
self.keep_event_ids = 1
self.create_tick = 1
elif ntype == 'leaf' and not ctype:
if not self.main_worker:
return
t = time.time()
- if t - self.local_wm_publish_time >= self.local_wm_publish_period:
+ if t - self.local_wm_publish_time < self.local_wm_publish_period:
return
+ self.log.debug("Publishing local watermark: %d" % st.local_watermark)
st = self._worker_state
src_curs = src_db.cursor()
q = "select * from pgq_node.set_subscriber_watermark(%s, %s, %s)"
if t - self.global_wm_publish_time < self.global_wm_publish_period:
return
+ self.log.debug("Publishing global watermark")
dst_curs = dst_db.cursor()
q = "select * from pgq_node.set_global_watermark(%s, NULL)"
dst_curs.execute(q, [self.pgq_queue_name])
select f.ret_code, f.ret_note, f.local_watermark
into ret_code, ret_note, _wm
from pgq_node.get_node_info(i_queue_name) f;
- if ret_code <> 200 then
+ if ret_code >= 300 then
return;
end if;
if _wm is null then