CascadedWorker: call pgq_node.set_global_watermark() only on branch
authorMarko Kreen <markokr@gmail.com>
Wed, 26 Jan 2011 14:54:26 +0000 (16:54 +0200)
committerMarko Kreen <markokr@gmail.com>
Wed, 26 Jan 2011 14:54:26 +0000 (16:54 +0200)
python/pgq/cascade/worker.py

index beb13dd59699483ea40f34b4dc7923f4e9356999..711f3fc8db049815c5dfc83925d4588a9fd55617 100644 (file)
@@ -30,6 +30,8 @@ class WorkerState:
     keep_event_ids = 0      # ok
     create_tick = 0         # ok
     filtered_copy = 0       # ok
+    process_global_wm = 0   # ok
+
     def __init__(self, queue_name, nst):
         self.node_type = nst['node_type']
         self.node_name = nst['node_name']
@@ -47,6 +49,7 @@ class WorkerState:
             self.process_tick_event = 1
             self.keep_event_ids = 1
             self.create_tick = 1
+            self.process_global_wm = 1
         elif ntype == 'leaf' and not ctype:
             self.process_batch = 1
             self.process_events = 1
@@ -228,6 +231,7 @@ class CascadedWorker(CascadedConsumer):
             raise Exception("bad event in queue: "+str(ev))
 
         self.log.info("got cascade event: %s" % t)
+        st = self._worker_state
         if t == "pgq.location-info":
             node = ev.ev_data
             loc = ev.ev_extra2
@@ -235,14 +239,14 @@ class CascadedWorker(CascadedConsumer):
             q = "select * from pgq_node.register_location(%s, %s, %s, %s)"
             dst_curs.execute(q, [self.pgq_queue_name, node, loc, dead])
         elif t == "pgq.global-watermark":
-            tick_id = int(ev.ev_data)
-            q = "select * from pgq_node.set_global_watermark(%s, %s)"
-            dst_curs.execute(q, [self.pgq_queue_name, tick_id])
+            if st.process_global_wm:
+                tick_id = int(ev.ev_data)
+                q = "select * from pgq_node.set_global_watermark(%s, %s)"
+                dst_curs.execute(q, [self.pgq_queue_name, tick_id])
         elif t == "pgq.tick-id":
             tick_id = int(ev.ev_data)
             if ev.ev_extra1 == self.pgq_queue_name:
                 raise Exception('tick-id event for own queue?')
-            st = self._worker_state
             if st.process_tick_event:
                 q = "select * from pgq_node.set_partition_watermark(%s, %s, %s)"
                 dst_curs.execute(q, [self.pgq_queue_name, ev.ev_extra1, tick_id])