--- /dev/null
+create or replace function pgq.find_tick_helper(
+ in i_queue_id int4,
+ in i_prev_tick_id int8,
+ in i_prev_tick_time timestamptz,
+ in i_prev_tick_seq int8,
+ in i_min_count int8,
+ in i_min_interval interval,
+ out next_tick_id int8,
+ out next_tick_time timestamptz,
+ out next_tick_seq int8)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq.find_tick_helper(6)
+--
+-- Helper function for pgq.next_batch_custom() to do extended tick search.
+-- ----------------------------------------------------------------------
+declare
+ ok boolean;
+ t record;
+ cnt int8;
+ ival interval;
+begin
+ -- first, fetch last tick of the queue
+ select tick_id, tick_time, tick_seq into t
+ from pgq.tick
+ where tick_queue = i_queue_id
+ and tick_id > i_tick_id
+ order by tick_queue desc, tick_id desc
+ limit 1;
+ if not found then
+ return;
+ end if;
+
+ -- check if it is reasonably ok
+ ok := true;
+ if i_min_count is not null then
+ cnt = t.tick_seq - i_prev_tick_seq;
+ if cnt < i_min_count then
+ return;
+ end if;
+ if cnt > i_min_count * 2 then
+ ok := false;
+ end if;
+ end if;
+ if i_min_interval is not null then
+ ival = t.tick_time - i_prev_tick_time;
+ if ival < i_min_interval then
+ return;
+ end if;
+ if ival > i_min_interval * 2 then
+ ok := false;
+ end if;
+ end if;
+
+ -- if last tick too far away, do large scan
+ if not ok then
+ select tick_id, tick_time, tick_seq into t
+ from pgq.tick
+ where tick_queue = i_queue_id
+ and tick_id > i_prev_tick_id
+ and (i_min_count is null or (tick_seq - i_prev_tick_seq) >= i_min_count)
+ and (i_min_interval is null or (tick_time - i_prev_tick_time) >= i_min_interval)
+ order by tick_queue asc, tick_id asc
+ limit 1;
+ end if;
+ next_tick_id := t.tick_id;
+ next_tick_time := t.tick_time;
+ next_tick_seq := t.tick_seq;
+ return;
+end;
+$$ language plpgsql stable;
+
-- prev_tick_time - Start tick time.
-- prev_tick_event_seq - value from event id sequence at the time tick was issued.
-- ----------------------------------------------------------------------
+begin
+ select f.batch_id, f.cur_tick_id, f.prev_tick_id,
+ f.cur_tick_time, f.prev_tick_time,
+ f.cur_tick_event_seq, f.prev_tick_event_seq
+ into batch_id, cur_tick_id, prev_tick_id, cur_tick_time, prev_tick_time,
+ cur_tick_event_seq, prev_tick_event_seq
+ from pgq.next_batch_custom(i_queue_name, i_consumer_name, NULL, NULL, NULL) f;
+ return;
+end;
+$$ language plpgsql;
+
+create or replace function pgq.next_batch(
+ in i_queue_name text,
+ in i_consumer_name text)
+returns int8 as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq.next_batch(2)
+--
+-- Old function that returns just batch_id.
+--
+-- Parameters:
+-- i_queue_name - Name of the queue
+-- i_consumer_name - Name of the consumer
+--
+-- Returns:
+-- Batch ID or NULL if there are no more events available.
+-- ----------------------------------------------------------------------
+declare
+ res int8;
+begin
+ select batch_id into res
+ from pgq.next_batch_info(i_queue_name, i_consumer_name);
+ return res;
+end;
+$$ language plpgsql;
+
+create or replace function pgq.next_batch_custom(
+ in i_queue_name text,
+ in i_consumer_name text,
+ in i_min_lag interval,
+ in i_min_count int4,
+ in i_min_interval interval,
+ out batch_id int8,
+ out cur_tick_id int8,
+ out prev_tick_id int8,
+ out cur_tick_time timestamptz,
+ out prev_tick_time timestamptz,
+ out cur_tick_event_seq int8,
+ out prev_tick_event_seq int8)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq.next_batch_custom(5)
+--
+-- Makes next block of events active. Block size can be tuned
+-- with i_min_count, i_min_interval parameters. Events age can
+-- be tuned with i_min_lag.
+--
+-- If it returns NULL, there is no events available in queue.
+-- Consumer should sleep then.
+--
+-- The values from event_id sequence may give hint how big the
+-- batch may be. But they are inexact, they do not give exact size.
+-- Client *MUST NOT* use them to detect whether the batch contains any
+-- events at all - the values are unfit for that purpose.
+--
+-- Note:
+-- i_min_lag together with i_min_interval/i_min_count is inefficient.
+--
+-- Parameters:
+-- i_queue_name - Name of the queue
+-- i_consumer_name - Name of the consumer
+-- i_min_lag - Consumer wants events older than that
+-- i_min_count - Consumer wants batch to contain at least this many events
+-- i_min_interval - Consumer wants batch to cover at least this much time
+--
+-- Returns:
+-- batch_id - Batch ID or NULL if there are no more events available.
+-- cur_tick_id - End tick id.
+-- cur_tick_time - End tick time.
+-- cur_tick_event_seq - Value from event id sequence at the time tick was issued.
+-- prev_tick_id - Start tick id.
+-- prev_tick_time - Start tick time.
+-- prev_tick_event_seq - value from event id sequence at the time tick was issued.
+-- ----------------------------------------------------------------------
declare
errmsg text;
queue_id integer;
return;
end if;
- -- find next tick
- select tick_id, tick_time, tick_event_seq
- into cur_tick_id, cur_tick_time, cur_tick_event_seq
- from pgq.tick
- where tick_id > prev_tick_id
- and tick_queue = queue_id
- order by tick_queue asc, tick_id asc
- limit 1;
- if not found then
+ if i_min_interval is null and i_min_count is null then
+ -- find next tick
+ select tick_id, tick_time, tick_event_seq
+ into cur_tick_id, cur_tick_time, cur_tick_event_seq
+ from pgq.tick
+ where tick_id > prev_tick_id
+ and tick_queue = queue_id
+ order by tick_queue asc, tick_id asc
+ limit 1;
+ else
+ -- find custom tick
+ select next_tick_id, next_tick_time, next_tick_seq
+ into cur_tick_id, cur_tick_time, cur_tick_event_seq
+ from pgq.find_tick_helper(queue_id, prev_tick_id,
+ prev_tick_time, prev_tick_event_seq,
+ i_min_count, i_min_interval);
+ end if;
+
+ if i_min_lag is not null then
+ -- enforce min lag
+ if now() - cur_tick_time < i_min_lag then
+ cur_tick_id := NULL;
+ cur_tick_time := NULL;
+ cur_tick_event_seq := NULL;
+ end if;
+ end if;
+
+ if cur_tick_id is null then
-- nothing to do
prev_tick_id := null;
prev_tick_time := null;
end;
$$ language plpgsql security definer;
-
-create or replace function pgq.next_batch(
- in i_queue_name text,
- in i_consumer_name text)
-returns int8 as $$
--- ----------------------------------------------------------------------
--- Function: pgq.next_batch(2)
---
--- Old function that returns just batch_id.
---
--- Parameters:
--- i_queue_name - Name of the queue
--- i_consumer_name - Name of the consumer
---
--- Returns:
--- Batch ID or NULL if there are no more events available.
--- ----------------------------------------------------------------------
-declare
- res int8;
-begin
- select batch_id into res
- from pgq.next_batch_info(i_queue_name, i_consumer_name);
- return res;
-end;
-$$ language plpgsql;
-