pgq_node.set_global_watermark: ignore missing ticks
authorMarko Kreen <markokr@gmail.com>
Mon, 8 Jun 2009 07:25:59 +0000 (10:25 +0300)
committerMarko Kreen <markokr@gmail.com>
Mon, 8 Jun 2009 07:25:59 +0000 (10:25 +0300)
set_global_watermark() can be called with non-existing tick-ids
if worker is processing old batches.

sql/pgq_node/functions/pgq_node.set_global_watermark.sql

index b422a86b91be90e341401f65fa5cefb09e75cb2b..00a9bab5d0a14ed7b8388dd6724c3ee3af5dff9c 100644 (file)
@@ -20,12 +20,14 @@ declare
     _wm         bigint;
     wm_consumer text;
 begin
+    wm_consumer = '.global_watermark';
+
     select node_type, queue_name, worker_name into this
         from pgq_node.node_info
         where queue_name = i_queue_name
         for update;
     if not found then
-        select 200, 'Queue' || i_queue_name || ' not found'
+        select 404, 'Queue' || i_queue_name || ' not found'
           into ret_code, ret_note;
         return;
     end if;
@@ -47,15 +49,10 @@ begin
                 into ret_code, ret_note;
             return;
         end if;
-    end if;
 
-    -- move watermark on pgq
-    if this.node_type in ('root', 'branch') then
-        wm_consumer = '.global_watermark';
+        -- move watermark
         perform pgq.register_consumer_at(i_queue_name, wm_consumer, _wm);
-    end if;
 
-    if this.node_type = 'root' then
         -- send event downstream
         perform pgq.insert_event(i_queue_name, 'pgq.global-watermark', _wm::text,
                                  i_queue_name, null, null, null);
@@ -64,6 +61,26 @@ begin
             set last_tick_id = _wm
             where queue_name = i_queue_name
                 and consumer_name = this.worker_name;
+    elsif this.node_type = 'branch' then
+        -- tick can be missing if we are processing
+        -- old batches that set watermark outside
+        -- current range
+        perform 1 from pgq.tick t, pgq.queue q
+          where q.queue_name = i_queue_name
+            and t.tick_queue = q.queue_id
+            and t.tick_id = _wm;
+        if not found then
+            select 200, 'Skipping global watermark update to ' || _wm
+                into ret_code, ret_note;
+            return;
+        end if;
+
+        -- move watermark
+        perform pgq.register_consumer_at(i_queue_name, wm_consumer, _wm);
+    else
+        select 100, 'Ignoring global watermark in leaf'
+            into ret_code, ret_note;
+        return;
     end if;
 
     select 200, 'Global watermark set to ' || _wm