keep_event_ids = 0 # ok
create_tick = 0 # ok
filtered_copy = 0 # ok
+ process_global_wm = 0 # ok
+
def __init__(self, queue_name, nst):
self.node_type = nst['node_type']
self.node_name = nst['node_name']
self.process_tick_event = 1
self.keep_event_ids = 1
self.create_tick = 1
+ self.process_global_wm = 1
elif ntype == 'leaf' and not ctype:
self.process_batch = 1
self.process_events = 1
raise Exception("bad event in queue: "+str(ev))
self.log.info("got cascade event: %s" % t)
+ st = self._worker_state
if t == "pgq.location-info":
node = ev.ev_data
loc = ev.ev_extra2
q = "select * from pgq_node.register_location(%s, %s, %s, %s)"
dst_curs.execute(q, [self.pgq_queue_name, node, loc, dead])
elif t == "pgq.global-watermark":
- tick_id = int(ev.ev_data)
- q = "select * from pgq_node.set_global_watermark(%s, %s)"
- dst_curs.execute(q, [self.pgq_queue_name, tick_id])
+ if st.process_global_wm:
+ tick_id = int(ev.ev_data)
+ q = "select * from pgq_node.set_global_watermark(%s, %s)"
+ dst_curs.execute(q, [self.pgq_queue_name, tick_id])
elif t == "pgq.tick-id":
tick_id = int(ev.ev_data)
if ev.ev_extra1 == self.pgq_queue_name:
raise Exception('tick-id event for own queue?')
- st = self._worker_state
if st.process_tick_event:
q = "select * from pgq_node.set_partition_watermark(%s, %s, %s)"
dst_curs.execute(q, [self.pgq_queue_name, ev.ev_extra1, tick_id])