cascade: fix watermark publishing
author Marko Kreen <markokr@gmail.com>
Sat, 2 May 2009 11:47:39 +0000 (14:47 +0300)
committer Marko Kreen <markokr@gmail.com>
Sat, 2 May 2009 11:47:39 +0000 (14:47 +0300)
* CascadedWorker:
  - fix time comparison in local wm handling
  - publish wm also for leaf nodes
  - fields node_type, node_name, local_watermark were missing in WorkerState
* pgq_node.get_node_info: on leaf node set local watermark to last consumer tick
* pgq_node.set_global_watermark: don't depend on exact code from pgq_node.get_node_info()

Trying to special-case leaf nodes does not seem to be a good idea, because
then provider nodes would also need to know the subscriber's node type.  So stop doing it.

python/pgq/cascade/worker.py
sql/pgq_node/functions/pgq_node.get_node_info.sql
sql/pgq_node/functions/pgq_node.set_global_watermark.sql

index b16f53d90439077ed5b04aabdc5b1187b123e9f6..20fbbf2688277cc95f9396cafb59e6765d438f67 100644 (file)
@@ -20,7 +20,7 @@ class WorkerState:
     process_batch = 0       # handled in CascadedConsumer
     copy_events = 0         # ok
     global_wm_event = 0     # ok
-    local_wm_publish = 0    # ok
+    local_wm_publish = 1    # ok
 
     process_events = 0      # ok
     send_tick_event = 0     # ok
@@ -31,17 +31,20 @@ class WorkerState:
     create_tick = 0         # ok
     filtered_copy = 0       # ok
     def __init__(self, queue_name, nst):
+        self.node_type = nst['node_type']
+        self.node_name = nst['node_name']
+        self.local_watermark = nst['local_watermark']
         ntype = nst['node_type']
         ctype = nst['combined_type']
         if ntype == 'root':
             self.global_wm_event = 1
+            self.local_wm_publish = 0
         elif ntype == 'branch':
             self.target_queue = queue_name
             self.process_batch = 1
             self.process_events = 1
             self.copy_events = 1
             self.process_tick_event = 1
-            self.local_wm_publish = 1
             self.keep_event_ids = 1
             self.create_tick = 1
         elif ntype == 'leaf' and not ctype:
@@ -141,9 +144,10 @@ class CascadedWorker(CascadedConsumer):
         if not self.main_worker:
             return
         t = time.time()
-        if t - self.local_wm_publish_time >= self.local_wm_publish_period:
+        if t - self.local_wm_publish_time < self.local_wm_publish_period:
             return
 
+        self.log.debug("Publishing local watermark: %d" % st.local_watermark)
         st = self._worker_state
         src_curs = src_db.cursor()
         q = "select * from pgq_node.set_subscriber_watermark(%s, %s, %s)"
@@ -255,6 +259,7 @@ class CascadedWorker(CascadedConsumer):
         if t - self.global_wm_publish_time < self.global_wm_publish_period:
             return
 
+        self.log.debug("Publishing global watermark")
         dst_curs = dst_db.cursor()
         q = "select * from pgq_node.set_global_watermark(%s, NULL)"
         dst_curs.execute(q, [self.pgq_queue_name])
index 209787629804d3e61aa6149e859a9a7681a293ca..f64670a4b00b7dedacaff2c23fb73cd67d4db75e 100644 (file)
@@ -73,6 +73,8 @@ begin
              order by 1 desc
              limit 1;
         end if;
+    else
+        local_watermark := worker_last_tick;
     end if;
     return;
 end;
index a664830dbdd1843989f663b324dbc4725cd691a9..1f27177383863d738de0f77304232935ff1ddebc 100644 (file)
@@ -36,7 +36,7 @@ begin
             select f.ret_code, f.ret_note, f.local_watermark
                 into ret_code, ret_note, _wm
                 from pgq_node.get_node_info(i_queue_name) f;
-            if ret_code <> 200 then
+            if ret_code >= 300 then
                 return;
             end if;
             if _wm is null then