From 42935e8107c3f4eb787b1cb7d55298e3c1b4de41 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Mon, 8 Jun 2009 10:25:59 +0300 Subject: [PATCH] pgq_node.set_global_watermark: ignore missing ticks set_global_watermark() can be called with non-existing tick-ids if worker is processing old batches. --- .../pgq_node.set_global_watermark.sql | 31 ++++++++++++++----- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/sql/pgq_node/functions/pgq_node.set_global_watermark.sql b/sql/pgq_node/functions/pgq_node.set_global_watermark.sql index b422a86b..00a9bab5 100644 --- a/sql/pgq_node/functions/pgq_node.set_global_watermark.sql +++ b/sql/pgq_node/functions/pgq_node.set_global_watermark.sql @@ -20,12 +20,14 @@ declare _wm bigint; wm_consumer text; begin + wm_consumer = '.global_watermark'; + select node_type, queue_name, worker_name into this from pgq_node.node_info where queue_name = i_queue_name for update; if not found then - select 200, 'Queue' || i_queue_name || ' not found' + select 404, 'Queue' || i_queue_name || ' not found' into ret_code, ret_note; return; end if; @@ -47,15 +49,10 @@ begin into ret_code, ret_note; return; end if; - end if; - -- move watermark on pgq - if this.node_type in ('root', 'branch') then - wm_consumer = '.global_watermark'; + -- move watermark perform pgq.register_consumer_at(i_queue_name, wm_consumer, _wm); - end if; - if this.node_type = 'root' then -- send event downstream perform pgq.insert_event(i_queue_name, 'pgq.global-watermark', _wm::text, i_queue_name, null, null, null); @@ -64,6 +61,26 @@ begin set last_tick_id = _wm where queue_name = i_queue_name and consumer_name = this.worker_name; + elsif this.node_type = 'branch' then + -- tick can be missing if we are processing + -- old batches that set watermark outside + -- current range + perform 1 from pgq.tick t, pgq.queue q + where q.queue_name = i_queue_name + and t.tick_queue = q.queue_id + and t.tick_id = _wm; + if not found then + select 200, 'Skipping global watermark update to ' || _wm + into ret_code, ret_note; + return; + end if; + + -- move watermark + perform pgq.register_consumer_at(i_queue_name, wm_consumer, _wm); + else + select 100, 'Ignoring global watermark in leaf' + into ret_code, ret_note; + return; end if; select 200, 'Global watermark set to ' || _wm -- 2.39.5