x_sub_id integer;
_sub_id_cnt integer;
begin
- select sub_id into x_sub_id
- from pgq.subscription, pgq.consumer, pgq.queue
- where sub_queue = queue_id
- and sub_consumer = co_id
- and queue_name = x_queue_name
- and co_name = x_consumer_name;
+ select s.sub_id into x_sub_id
+ from pgq.subscription s, pgq.consumer c, pgq.queue q
+ where s.sub_queue = q.queue_id
+ and s.sub_consumer = c.co_id
+ and q.queue_name = x_queue_name
+ and c.co_name = x_consumer_name
+ for update of s;
if not found then
- raise exception 'consumer not registered on queue';
+ return 0;
end if;
- -- subconsumer-aware
+ -- consumer + subconsumer count
select count(*) into _sub_id_cnt
from pgq.subscription
where sub_id = x_sub_id;
- if _sub_id_cnt > 1 then
- raise exception 'unregistered subconsumers detected';
- end if;
+ -- retry events
delete from pgq.retry_queue
where ev_owner = x_sub_id;
+ -- this will drop subconsumers too
delete from pgq.subscription
where sub_id = x_sub_id;
- return 1;
+ return _sub_id_cnt;
end;
$$ language plpgsql security definer;