pgq_node.get_worker_state: new function
authorMarko Kreen <markokr@gmail.com>
Fri, 4 Feb 2011 14:44:30 +0000 (16:44 +0200)
committerMarko Kreen <markokr@gmail.com>
Fri, 4 Feb 2011 14:44:30 +0000 (16:44 +0200)
combines get_node_info & get_consumer_info

sql/pgq_node/functions/pgq_node.get_worker_state.sql [new file with mode: 0644]
sql/pgq_node/sql/pgq_node_test.sql
sql/pgq_node/structure/functions.sql

diff --git a/sql/pgq_node/functions/pgq_node.get_worker_state.sql b/sql/pgq_node/functions/pgq_node.get_worker_state.sql
new file mode 100644 (file)
index 0000000..13313e6
--- /dev/null
@@ -0,0 +1,117 @@
+
+create or replace function pgq_node.get_worker_state(
+    in i_queue_name text,
+
+    out ret_code int4,
+    out ret_note text,
+
+    out node_type text,
+    out node_name text,
+    out completed_tick bigint,
+    out provider_node text,
+    out provider_location text,
+    out paused boolean,
+    out uptodate boolean,
+    out cur_error text,
+
+    out worker_name text,
+    out global_watermark bigint,
+    out local_watermark bigint,
+    out local_queue_top bigint,
+    out combined_queue text,
+    out combined_type text
+) returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_node.get_worker_state(1)
+--
+--      Get info for consumer that maintains local node.
+--
+-- Parameters:
+--      i_queue_name  - cascaded queue name
+--
+-- Returns:
+--      node_type - local node type
+--      node_name - local node name
+--      completed_tick - last committed tick
+--      provider_node - provider node name
+--      provider_location - connect string to provider node
+--      paused - this node should not do any work
+--      uptodate - if consumer has loaded last changes
+--      cur_error - failure reason
+
+--      worker_name - consumer name that maintains this node
+--      global_watermark - queue's global watermark
+--      local_watermark - queue's local watermark, for this and below nodes
+--      local_queue_top - last tick in local queue
+--      combined_queue - queue name for target set
+--      combined_type - node type of target setA
+-- ----------------------------------------------------------------------
+begin
+    select n.node_type, n.node_name, n.worker_name, n.combined_queue
+      into node_type, node_name, worker_name, combined_queue
+      from pgq_node.node_info n
+     where n.queue_name = i_queue_name;
+    if not found then
+        select 404, 'Unknown queue: ' || i_queue_name
+          into ret_code, ret_note;
+        return;
+    end if;
+    select s.last_tick_id, s.provider_node, s.paused, s.uptodate, s.cur_error
+      into completed_tick, provider_node, paused, uptodate, cur_error
+      from pgq_node.local_state s
+     where s.queue_name = i_queue_name
+       and s.consumer_name = worker_name;
+    if not found then
+        select 404, 'Unknown consumer: ' || i_queue_name || '/' || worker_name
+          into ret_code, ret_note;
+        return;
+    end if;
+    select 100, 'Ok', p.node_location
+      into ret_code, ret_note, provider_location
+      from pgq_node.node_location p
+     where p.queue_name = i_queue_name
+      and p.node_name = provider_node;
+    if not found then
+        select 404, 'Unknown provider node: ' || i_queue_name || '/' || provider_node
+          into ret_code, ret_note;
+        return;
+    end if;
+
+    if combined_queue is not null then
+        select n.node_type into combined_type
+          from pgq_node.node_info n
+         where n.queue_name = combined_queue;
+        if not found then
+            select 404, 'Combinde queue node not found: ' || combined_queue
+              into ret_code, ret_note;
+            return;
+        end if;
+    end if;
+
+    if node_type in ('root', 'branch') then
+        select min(case when consumer_name = '.global_watermark' then null else last_tick end),
+               min(case when consumer_name = '.global_watermark' then last_tick else null end)
+          into local_watermark, global_watermark
+          from pgq.get_consumer_info(i_queue_name);
+        if local_watermark is null then
+            select t.tick_id into local_watermark
+              from pgq.tick t, pgq.queue q
+             where t.tick_queue = q.queue_id
+               and q.queue_name = i_queue_name
+             order by 1 desc
+             limit 1;
+        end if;
+
+        select tick_id from pgq.tick t, pgq.queue q
+         where q.queue_name = i_queue_name
+           and t.tick_queue = q.queue_id
+         order by t.tick_queue desc, t.tick_id desc
+         limit 1 into local_queue_top;
+    else
+        local_watermark := completed_tick;
+    end if;
+
+    return;
+end;
+$$ language plpgsql security definer;
+
index abe138a8b09571ae92fd32c5d9c8f702ea8d60f5..2999ad44d78880f2a2e166289ad350a662a67d9a 100644 (file)
@@ -27,6 +27,8 @@ select * from pgq_node.get_consumer_info('aqueue');
 select * from pgq_node.unregister_subscriber('aqueue', 'node3');
 select queue_name, consumer_name, last_tick from pgq.get_consumer_info();
 
+select * from pgq_node.get_worker_state('aqueue');
+
 update pgq.queue set queue_ticker_max_lag = '0', queue_ticker_idle_period = '0';
 select * from pgq.ticker('aqueue');
 select * from pgq.ticker('aqueue');
@@ -52,6 +54,9 @@ select * from pgq_node.get_node_info('aqueue');
 select * from pgq_node.get_node_info('bqueue');
 select * from pgq_node.get_node_info('cqueue');
 
+select * from pgq_node.get_worker_state('aqueue');
+select * from pgq_node.get_worker_state('bqueue');
+select * from pgq_node.get_worker_state('cqueue');
 
 select * from pgq_node.is_root_node('aqueue');
 select * from pgq_node.is_root_node('bqueue');
@@ -86,6 +91,13 @@ select * from pgq_node.demote_root('aqueue', 2, 'node3');
 select * from pgq_node.demote_root('aqueue', 3, 'node3');
 select * from pgq_node.demote_root('aqueue', 3, 'node3');
 
+-- leaf node
+select * from pgq_node.register_location('mqueue', 'node1', 'dbname=node1', false);
+select * from pgq_node.register_location('mqueue', 'node2', 'dbname=node2', false);
+select * from pgq_node.register_location('mqueue', 'node3', 'dbname=node3', false);
+select * from pgq_node.create_node('mqueue', 'leaf', 'node2', 'node2_worker', 'node1', 13, 'aqueue');
+select * from pgq_node.get_worker_state('mqueue');
+
 \q
 
 select * from pgq_node.subscribe_node('aqueue', 'node2');
index 732a76ec6ae6388fc208828e2663dbee6baa10dd..2ba5cc405b222227dab6b187917159962118bfb5 100644 (file)
@@ -26,6 +26,7 @@
 \i   functions/pgq_node.set_subscriber_watermark.sql
 
 -- Group: Subscriber side operations - worker
+\i   functions/pgq_node.get_worker_state.sql
 \i   functions/pgq_node.set_global_watermark.sql
 \i   functions/pgq_node.set_partition_watermark.sql