From 0801a025297e412cc142680798bdd01d2d887f88 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Sat, 2 May 2009 14:47:39 +0300 Subject: [PATCH] cascade: fix watermark publishing * CascadedWorker: - fix time comparision in local wm handling - publish wm also for leaf nodes - fields node_type, node_name, local_watermark were missing in WorkerState * pgq_node.get_node_info: on leaf node set local watermark to last consumer tick * pgq_node.set_global_watermark: don't depend on exact code from pgq_node.get_node_info() Trying to special-case leaf nodes does not seem to be good idea, because then provider nodes also need to know subscriber node type. So stop doing it. --- python/pgq/cascade/worker.py | 11 ++++++++--- sql/pgq_node/functions/pgq_node.get_node_info.sql | 2 ++ .../functions/pgq_node.set_global_watermark.sql | 2 +- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/python/pgq/cascade/worker.py b/python/pgq/cascade/worker.py index b16f53d9..20fbbf26 100644 --- a/python/pgq/cascade/worker.py +++ b/python/pgq/cascade/worker.py @@ -20,7 +20,7 @@ class WorkerState: process_batch = 0 # handled in CascadedConsumer copy_events = 0 # ok global_wm_event = 0 # ok - local_wm_publish = 0 # ok + local_wm_publish = 1 # ok process_events = 0 # ok send_tick_event = 0 # ok @@ -31,17 +31,20 @@ class WorkerState: create_tick = 0 # ok filtered_copy = 0 # ok def __init__(self, queue_name, nst): + self.node_type = nst['node_type'] + self.node_name = nst['node_name'] + self.local_watermark = nst['local_watermark'] ntype = nst['node_type'] ctype = nst['combined_type'] if ntype == 'root': self.global_wm_event = 1 + self.local_wm_publish = 0 elif ntype == 'branch': self.target_queue = queue_name self.process_batch = 1 self.process_events = 1 self.copy_events = 1 self.process_tick_event = 1 - self.local_wm_publish = 1 self.keep_event_ids = 1 self.create_tick = 1 elif ntype == 'leaf' and not ctype: @@ -141,9 +144,10 @@ class CascadedWorker(CascadedConsumer): if not self.main_worker: return t = time.time() - if t - self.local_wm_publish_time >= self.local_wm_publish_period: + if t - self.local_wm_publish_time < self.local_wm_publish_period: return + self.log.debug("Publishing local watermark: %d" % st.local_watermark) st = self._worker_state src_curs = src_db.cursor() q = "select * from pgq_node.set_subscriber_watermark(%s, %s, %s)" @@ -255,6 +259,7 @@ class CascadedWorker(CascadedConsumer): if t - self.global_wm_publish_time < self.global_wm_publish_period: return + self.log.debug("Publishing global watermark") dst_curs = dst_db.cursor() q = "select * from pgq_node.set_global_watermark(%s, NULL)" dst_curs.execute(q, [self.pgq_queue_name]) diff --git a/sql/pgq_node/functions/pgq_node.get_node_info.sql b/sql/pgq_node/functions/pgq_node.get_node_info.sql index 20978762..f64670a4 100644 --- a/sql/pgq_node/functions/pgq_node.get_node_info.sql +++ b/sql/pgq_node/functions/pgq_node.get_node_info.sql @@ -73,6 +73,8 @@ begin order by 1 desc limit 1; end if; + else + local_watermark := worker_last_tick; end if; return; end; diff --git a/sql/pgq_node/functions/pgq_node.set_global_watermark.sql b/sql/pgq_node/functions/pgq_node.set_global_watermark.sql index a664830d..1f271773 100644 --- a/sql/pgq_node/functions/pgq_node.set_global_watermark.sql +++ b/sql/pgq_node/functions/pgq_node.set_global_watermark.sql @@ -36,7 +36,7 @@ begin select f.ret_code, f.ret_note, f.local_watermark into ret_code, ret_note, _wm from pgq_node.get_node_info(i_queue_name) f; - if ret_code <> 200 then + if ret_code >= 300 then return; end if; if _wm is null then -- 2.39.5