pgq_node.set_consumer_error(): to store pending error condition
authorMarko Kreen <markokr@gmail.com>
Mon, 8 Jun 2009 08:03:49 +0000 (11:03 +0300)
committerMarko Kreen <markokr@gmail.com>
Mon, 8 Jun 2009 08:03:49 +0000 (11:03 +0300)
this may help admin tools which can show also pending error
of why processing last batch failed.

sql/pgq_node/functions/pgq_node.get_consumer_info.sql
sql/pgq_node/functions/pgq_node.get_consumer_state.sql
sql/pgq_node/functions/pgq_node.set_consumer_completed.sql
sql/pgq_node/functions/pgq_node.set_consumer_error.sql [new file with mode: 0644]
sql/pgq_node/sql/pgq_node_test.sql
sql/pgq_node/structure/functions.sql
sql/pgq_node/structure/tables.sql

index 326dfaa39d1146457828ee2bf658fdc9a789cc61..4c4b07b4e54cbbb89522328f80bc027ba5ec6a3f 100644 (file)
@@ -6,7 +6,8 @@ create or replace function pgq_node.get_consumer_info(
     out provider_node text,
     out last_tick_id int8,
     out paused boolean,
-    out uptodate boolean)
+    out uptodate boolean,
+    out cur_error text)
 returns setof record as $$
 -- ----------------------------------------------------------------------
 -- Function: pgq_node.get_consumer_info(1)
@@ -22,11 +23,12 @@ returns setof record as $$
 --      last_tick_id    - last committed tick
 --      paused          - if consumer is paused
 --      uptodate        - if consumer is uptodate
+--      cur_error       - failure reason
 -- ----------------------------------------------------------------------
 begin
-    for consumer_name, provider_node, last_tick_id, paused, uptodate in
+    for consumer_name, provider_node, last_tick_id, paused, uptodate, cur_error in
         select s.consumer_name, s.provider_node, s.last_tick_id,
-               s.paused, s.uptodate
+               s.paused, s.uptodate, s.cur_error
             from pgq_node.local_state s
             where s.queue_name = i_queue_name
             order by 1
index 389d985a6159d6b45549c0da657b58e498e96ec2..a8fd48bbc6573e5d5529a7d4a8d0a03dd5171fd9 100644 (file)
@@ -12,7 +12,8 @@ create or replace function pgq_node.get_consumer_state(
     out provider_node text,
     out provider_location text,
     out paused boolean,
-    out uptodate boolean
+    out uptodate boolean,
+    out cur_error text
 ) returns record as $$
 -- ----------------------------------------------------------------------
 -- Function: pgq_node.get_consumer_state(2)
@@ -31,12 +32,13 @@ create or replace function pgq_node.get_consumer_state(
 --      provider_location - connect string to provider node
 --      paused - this node should not do any work
 --      uptodate - if consumer has loaded last changes
+--      cur_error - failure reason
 -- ----------------------------------------------------------------------
 begin
     select 100, 'Ok', n.node_type, n.node_name, s.last_tick_id,
-           s.provider_node, p.node_location, s.paused, s.uptodate
+           s.provider_node, p.node_location, s.paused, s.uptodate, s.cur_error
       into ret_code, ret_note, node_type, node_name, completed_tick,
-           provider_node, provider_location, paused, uptodate
+           provider_node, provider_location, paused, uptodate, cur_error
       from pgq_node.node_info n, pgq_node.local_state s, pgq_node.node_location p
      where n.queue_name = i_queue_name
        and s.queue_name = n.queue_name
index 6bcbf5358a20d251a42f96aeaaa4bc5fd83dce6b..0c1bcc46c74e3196f3731ffb944c0dfcb7e3b52e 100644 (file)
@@ -19,7 +19,8 @@ as $$
 -- ----------------------------------------------------------------------
 begin
     update pgq_node.local_state
-       set last_tick_id = i_tick_id
+       set last_tick_id = i_tick_id,
+           cur_error = NULL
      where queue_name = i_queue_name
        and consumer_name = i_consumer_name;
     if found then
diff --git a/sql/pgq_node/functions/pgq_node.set_consumer_error.sql b/sql/pgq_node/functions/pgq_node.set_consumer_error.sql
new file mode 100644 (file)
index 0000000..6d60078
--- /dev/null
@@ -0,0 +1,31 @@
+
+create or replace function pgq_node.set_consumer_error(
+    in i_queue_name text,
+    in i_consumer_name text,
+    in i_error_msg text,
+    out ret_code int4,
+    out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_node.set_consumer_error(3)
+--
+--      If batch processing fails, consumer can store it's last error in db.
+-- ----------------------------------------------------------------------
+begin
+    update pgq_node.local_state
+       set cur_error = i_error_msg
+     where queue_name = i_queue_name
+       and consumer_name = i_consumer_name;
+    if found then
+        select 100, 'Consumer ' || i_consumer_name || ' error = ' || i_error_msg
+            into ret_code, ret_note;
+    else
+        select 404, 'Consumer not known: '
+               || i_queue_name || '/' || i_consumer_name
+          into ret_code, ret_note;
+    end if;
+    return;
+end;
+$$ language plpgsql security definer;
+
+
index 4ddbec7126b1878dfcff36a149b7e6b01cfbe00a..3fc0b557fef3065b9c5d90e966a87f4ae5f8535e 100644 (file)
@@ -57,7 +57,12 @@ select * from pgq_node.is_root_node('cqueue');
 select * from pgq_node.get_consumer_state('bqueue', 'random_consumer');
 select * from pgq_node.get_consumer_state('bqueue', 'random_consumer2');
 
+select * from pgq_node.set_consumer_error('bqueue', 'random_consumer2', 'failure');
+select * from pgq_node.get_consumer_state('bqueue', 'random_consumer2');
+
 select * from pgq_node.set_consumer_completed('bqueue', 'random_consumer2', 2);
+select * from pgq_node.get_consumer_state('bqueue', 'random_consumer2');
+
 select * from pgq_node.set_consumer_paused('bqueue', 'random_consumer2', true);
 select * from pgq_node.set_consumer_uptodate('bqueue', 'random_consumer2', true);
 
index 1914d625c18a4ceab93004317cd0a81c66f80d9a..702f13a2eb2670e09b6039b9cf8c24d82997a2d3 100644 (file)
@@ -36,4 +36,5 @@
 \i   functions/pgq_node.set_consumer_uptodate.sql
 \i   functions/pgq_node.set_consumer_paused.sql
 \i   functions/pgq_node.set_consumer_completed.sql
+\i   functions/pgq_node.set_consumer_error.sql
 
index 2fb4adcd5828291638a7cf9fba94779c3e26a4ef..73abc774ffa6efe978d374d43bb34690247ff8f1 100644 (file)
@@ -82,6 +82,7 @@ create table pgq_node.node_info (
 --      consumer_name   - cascaded consumer name
 --      provider_node   - node name the consumer reads from
 --      last_tick_id    - last committed tick id on this node
+--      cur_error       - reason why current batch failed
 --      paused          - whether consumer should wait
 --      uptodate        - if consumer has seen new state
 -- ----------------------------------------------------------------------
@@ -90,6 +91,7 @@ create table pgq_node.local_state (
     consumer_name   text not null,
     provider_node   text not null,
     last_tick_id    bigint not null,
+    cur_error       text,
 
     paused          boolean not null default false,
     uptodate        boolean not null default false,