out provider_node text,
out last_tick_id int8,
out paused boolean,
- out uptodate boolean)
+ out uptodate boolean,
+ out cur_error text)
returns setof record as $$
-- ----------------------------------------------------------------------
-- Function: pgq_node.get_consumer_info(1)
-- last_tick_id - last committed tick
-- paused - if consumer is paused
-- uptodate - if consumer is uptodate
+-- cur_error - failure reason
-- ----------------------------------------------------------------------
begin
- for consumer_name, provider_node, last_tick_id, paused, uptodate in
+ for consumer_name, provider_node, last_tick_id, paused, uptodate, cur_error in
select s.consumer_name, s.provider_node, s.last_tick_id,
- s.paused, s.uptodate
+ s.paused, s.uptodate, s.cur_error
from pgq_node.local_state s
where s.queue_name = i_queue_name
order by 1
out provider_node text,
out provider_location text,
out paused boolean,
- out uptodate boolean
+ out uptodate boolean,
+ out cur_error text
) returns record as $$
-- ----------------------------------------------------------------------
-- Function: pgq_node.get_consumer_state(2)
-- provider_location - connect string to provider node
-- paused - this node should not do any work
-- uptodate - if consumer has loaded last changes
+-- cur_error - failure reason
-- ----------------------------------------------------------------------
begin
select 100, 'Ok', n.node_type, n.node_name, s.last_tick_id,
- s.provider_node, p.node_location, s.paused, s.uptodate
+ s.provider_node, p.node_location, s.paused, s.uptodate, s.cur_error
into ret_code, ret_note, node_type, node_name, completed_tick,
- provider_node, provider_location, paused, uptodate
+ provider_node, provider_location, paused, uptodate, cur_error
from pgq_node.node_info n, pgq_node.local_state s, pgq_node.node_location p
where n.queue_name = i_queue_name
and s.queue_name = n.queue_name
-- ----------------------------------------------------------------------
begin
update pgq_node.local_state
- set last_tick_id = i_tick_id
+ set last_tick_id = i_tick_id,
+ cur_error = NULL
where queue_name = i_queue_name
and consumer_name = i_consumer_name;
if found then
--- /dev/null
+
+create or replace function pgq_node.set_consumer_error(
+ in i_queue_name text,
+ in i_consumer_name text,
+ in i_error_msg text,
+ out ret_code int4,
+ out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_node.set_consumer_error(3)
+--
+-- If batch processing fails, consumer can store it's last error in db.
+-- ----------------------------------------------------------------------
+begin
+ update pgq_node.local_state
+ set cur_error = i_error_msg
+ where queue_name = i_queue_name
+ and consumer_name = i_consumer_name;
+ if found then
+ select 100, 'Consumer ' || i_consumer_name || ' error = ' || i_error_msg
+ into ret_code, ret_note;
+ else
+ select 404, 'Consumer not known: '
+ || i_queue_name || '/' || i_consumer_name
+ into ret_code, ret_note;
+ end if;
+ return;
+end;
+$$ language plpgsql security definer;
+
+
select * from pgq_node.get_consumer_state('bqueue', 'random_consumer');
select * from pgq_node.get_consumer_state('bqueue', 'random_consumer2');
+select * from pgq_node.set_consumer_error('bqueue', 'random_consumer2', 'failure');
+select * from pgq_node.get_consumer_state('bqueue', 'random_consumer2');
+
select * from pgq_node.set_consumer_completed('bqueue', 'random_consumer2', 2);
+select * from pgq_node.get_consumer_state('bqueue', 'random_consumer2');
+
select * from pgq_node.set_consumer_paused('bqueue', 'random_consumer2', true);
select * from pgq_node.set_consumer_uptodate('bqueue', 'random_consumer2', true);
\i functions/pgq_node.set_consumer_uptodate.sql
\i functions/pgq_node.set_consumer_paused.sql
\i functions/pgq_node.set_consumer_completed.sql
+\i functions/pgq_node.set_consumer_error.sql
-- consumer_name - cascaded consumer name
-- provider_node - node name the consumer reads from
-- last_tick_id - last committed tick id on this node
+-- cur_error - reason why current batch failed
-- paused - whether consumer should wait
-- uptodate - if consumer has seen new state
-- ----------------------------------------------------------------------
consumer_name text not null,
provider_node text not null,
last_tick_id bigint not null,
+ cur_error text,
paused boolean not null default false,
uptodate boolean not null default false,