move londiste & pgq_node maint into pgq.maint_operations
authorMarko Kreen <markokr@gmail.com>
Tue, 12 Oct 2010 11:58:29 +0000 (14:58 +0300)
committerMarko Kreen <markokr@gmail.com>
Tue, 12 Oct 2010 11:58:29 +0000 (14:58 +0300)
sql/pgq/functions/pgq.maint_operations.sql
sql/pgq_node/expected/pgq_node_test.out
sql/pgq_node/functions/pgq_node.maint_watermark.sql [new file with mode: 0644]
sql/pgq_node/sql/pgq_node_test.sql
sql/pgq_node/structure/functions.sql

index d03b5bd65ac833842524817a54eab1f056da8093..f3b84420fc032d1b5052c2fb2c89148187dfbb01 100644 (file)
@@ -67,6 +67,45 @@ begin
         return next;
     end loop;
 
+    --
+    -- pgq_node & londiste
+    --
+    -- although they belong to queue_extra_maint, they are
+    -- common enough so its more effective to handle them here.
+    --
+
+    perform 1 from pg_proc p, pg_namespace n
+      where p.pronamespace = n.oid
+        and n.nspname = 'pgq_node'
+        and p.proname = 'maint_watermark';
+    if found then
+        func_name := 'pgq_node.maint_watermark';
+        for func_arg in
+            select n.queue_name
+              from pgq_node n
+              where n.node_type = 'root'
+        loop
+            return next;
+        end loop;
+    end if;
+
+    perform 1 from pg_proc p, pg_namespace n
+      where p.pronamespace = n.oid
+        and n.nspname = 'londiste'
+        and p.proname = 'root_check_seqs';
+    if found then
+        func_name := 'londiste.root_check_seqs';
+        for func_arg in
+            select distinct s.queue_name
+              from londiste.seq_info s, pgq_node n
+              where s.local
+                and n.node_type = 'root'
+                and n.queue_name = s.queue_name
+        loop
+            return next;
+        end loop;
+    end if;
+
     return;
 end;
 $$ language plpgsql;
index e7c74d8c7dd689530eb994a6f66841940a7f3a3e..47195a2a05d3f05ec5fe03cc3e6088a35bc002bd 100644 (file)
@@ -84,6 +84,18 @@ select * from pgq_node.register_subscriber('aqueue', 'node3', 'node3_worker', nu
       200 | Subscriber registered: node3 |                1
 (1 row)
 
+select * from pgq_node.maint_watermark('aqueue');
+ maint_watermark 
+-----------------
+               0
+(1 row)
+
+select * from pgq_node.maint_watermark('aqueue-x');
+ maint_watermark 
+-----------------
+               0
+(1 row)
+
 select * from pgq_node.get_consumer_info('aqueue');
  consumer_name | provider_node | last_tick_id | paused | uptodate | cur_error 
 ---------------+---------------+--------------+--------+----------+-----------
diff --git a/sql/pgq_node/functions/pgq_node.maint_watermark.sql b/sql/pgq_node/functions/pgq_node.maint_watermark.sql
new file mode 100644 (file)
index 0000000..cdf0c0c
--- /dev/null
@@ -0,0 +1,31 @@
+
+create or replace function pgq_node.maint_watermark(i_queue_name text)
+returns int4 as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_node.maint_watermark(1)
+--
+--      Move global watermark on root node.
+--
+-- Returns:
+--      0 - tells pgqd to call just once
+-- ----------------------------------------------------------------------
+declare
+    _lag interval;
+begin
+    perform 1 from pgq_node.node_info
+      where queue_name = i_queue_name
+        and node_type = 'root'
+      for update;
+    if not found then
+        return 0;
+    end if;
+
+    select lag into _lag from pgq.get_consumer_info(i_queue_name, '.global_watermark');
+    if _lag >= '5 minutes'::interval then
+        perform pgq_node.set_global_watermark(i_queue_name, NULL);
+    end if;
+
+    return 0;
+end;
+$$ language plpgsql;
+
index 3fc0b557fef3065b9c5d90e966a87f4ae5f8535e..abe138a8b09571ae92fd32c5d9c8f702ea8d60f5 100644 (file)
@@ -20,6 +20,9 @@ select * from pgq_node.create_node('aqueue', 'root', 'node1', 'node1_worker', nu
 select * from pgq_node.register_subscriber('aqueue', 'node2', 'node2_worker', null);
 select * from pgq_node.register_subscriber('aqueue', 'node3', 'node3_worker', null);
 
+select * from pgq_node.maint_watermark('aqueue');
+select * from pgq_node.maint_watermark('aqueue-x');
+
 select * from pgq_node.get_consumer_info('aqueue');
 select * from pgq_node.unregister_subscriber('aqueue', 'node3');
 select queue_name, consumer_name, last_tick from pgq.get_consumer_info();
index 9c77a4fb1d16fec01eec53b650282b9a04edf46a..74df04c3763e6c4732ba86617b9003e7053e7957 100644 (file)
@@ -39,3 +39,6 @@
 \i   functions/pgq_node.set_consumer_completed.sql
 \i   functions/pgq_node.set_consumer_error.sql
 
+-- Group: Maintenance operations
+\i functions/pgq_node.maint_watermark.sql
+