_wm bigint;
wm_consumer text;
begin
+ wm_consumer = '.global_watermark';
+
select node_type, queue_name, worker_name into this
from pgq_node.node_info
where queue_name = i_queue_name
for update;
if not found then
- select 200, 'Queue' || i_queue_name || ' not found'
+ select 404, 'Queue' || i_queue_name || ' not found'
into ret_code, ret_note;
return;
end if;
into ret_code, ret_note;
return;
end if;
- end if;
- -- move watermark on pgq
- if this.node_type in ('root', 'branch') then
- wm_consumer = '.global_watermark';
+ -- move watermark
perform pgq.register_consumer_at(i_queue_name, wm_consumer, _wm);
- end if;
- if this.node_type = 'root' then
-- send event downstream
perform pgq.insert_event(i_queue_name, 'pgq.global-watermark', _wm::text,
i_queue_name, null, null, null);
set last_tick_id = _wm
where queue_name = i_queue_name
and consumer_name = this.worker_name;
+ elsif this.node_type = 'branch' then
+ -- tick can be missing if we are processing
+ -- old batches that set watermark outside
+ -- current range
+ perform 1 from pgq.tick t, pgq.queue q
+ where q.queue_name = i_queue_name
+ and t.tick_queue = q.queue_id
+ and t.tick_id = _wm;
+ if not found then
+ select 200, 'Skipping global watermark update to ' || _wm
+ into ret_code, ret_note;
+ return;
+ end if;
+
+ -- move watermark
+ perform pgq.register_consumer_at(i_queue_name, wm_consumer, _wm);
+ else
+ select 100, 'Ignoring global watermark in leaf'
+ into ret_code, ret_note;
+ return;
end if;
select 200, 'Global watermark set to ' || _wm