From 6a58e74aa18aa5a43761418b29eefd7e3dfa026f Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Tue, 18 Sep 2012 14:37:25 +0300 Subject: [PATCH] pgq_coop.next_batch: check and create subconsumers after locking main consumer Otherwise, there is race with other consumers who might remove just-checked subconsumer. --- .../functions/pgq_coop.next_batch.sql | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/sql/pgq_coop/functions/pgq_coop.next_batch.sql b/sql/pgq_coop/functions/pgq_coop.next_batch.sql index a3cf6f1d..7863acfe 100644 --- a/sql/pgq_coop/functions/pgq_coop.next_batch.sql +++ b/sql/pgq_coop/functions/pgq_coop.next_batch.sql @@ -133,16 +133,6 @@ declare _sub_id integer; _dead record; begin - -- automatically register subconsumers - perform 1 from pgq.subscription s, pgq.consumer c, pgq.queue q - where q.queue_name = i_queue_name - and s.sub_queue = q.queue_id - and s.sub_consumer = c.co_id - and c.co_name = i_consumer_name || '.' || i_subconsumer_name; - if not found then - perform pgq_coop.register_subconsumer(i_queue_name, i_consumer_name, i_subconsumer_name); - end if; - -- fetch master consumer details, lock the row select q.queue_id, c.co_id, s.sub_next_tick into _queue_id, _consumer_id, _cur_tick @@ -159,6 +149,16 @@ begin raise exception 'main consumer has batch open?'; end if; + -- automatically register subconsumers + perform 1 from pgq.subscription s, pgq.consumer c, pgq.queue q + where q.queue_name = i_queue_name + and s.sub_queue = q.queue_id + and s.sub_consumer = c.co_id + and c.co_name = i_consumer_name || '.' || i_subconsumer_name; + if not found then + perform pgq_coop.register_subconsumer(i_queue_name, i_consumer_name, i_subconsumer_name); + end if; + -- fetch subconsumer details select s.sub_batch, sc.co_id, s.sub_id into _batch_id, _subcon_id, _sub_id -- 2.39.5