pgq.next_batch_custom(): extended next_batch
authorMarko Kreen <markokr@gmail.com>
Fri, 11 Sep 2009 11:08:51 +0000 (14:08 +0300)
committerMarko Kreen <markokr@gmail.com>
Fri, 11 Sep 2009 11:08:51 +0000 (14:08 +0300)
Supports create batch of several ticks and also forced lag.

sql/pgq/functions/pgq.find_tick_helper.sql [new file with mode: 0644]
sql/pgq/functions/pgq.next_batch.sql
sql/pgq/structure/func_internal.sql

diff --git a/sql/pgq/functions/pgq.find_tick_helper.sql b/sql/pgq/functions/pgq.find_tick_helper.sql
new file mode 100644 (file)
index 0000000..77afcb4
--- /dev/null
@@ -0,0 +1,72 @@
+create or replace function pgq.find_tick_helper(
+    in i_queue_id int4,
+    in i_prev_tick_id int8,
+    in i_prev_tick_time timestamptz,
+    in i_prev_tick_seq int8,
+    in i_min_count int8,
+    in i_min_interval interval,
+    out next_tick_id int8,
+    out next_tick_time timestamptz,
+    out next_tick_seq int8)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq.find_tick_helper(6)
+--
+--      Helper function for pgq.next_batch_custom() to do extended tick search.
+-- ----------------------------------------------------------------------
+declare
+    ok      boolean;
+    t       record;
+    cnt     int8;
+    ival    interval;
+begin
+    -- first, fetch last tick of the queue
+    select tick_id, tick_time, tick_seq into t
+        from pgq.tick
+        where tick_queue = i_queue_id
+          and tick_id > i_tick_id
+        order by tick_queue desc, tick_id desc
+        limit 1;
+    if not found then
+        return;
+    end if;
+    
+    -- check if it is reasonably ok
+    ok := true;
+    if i_min_count is not null then
+        cnt = t.tick_seq - i_prev_tick_seq;
+        if cnt < i_min_count then
+            return;
+        end if;
+        if cnt > i_min_count * 2 then
+            ok := false;
+        end if;
+    end if;
+    if i_min_interval is not null then
+        ival = t.tick_time - i_prev_tick_time;
+        if ival < i_min_interval then
+            return;
+        end if;
+        if ival > i_min_interval * 2 then
+            ok := false;
+        end if;
+    end if;
+
+    -- if last tick too far away, do large scan
+    if not ok then
+        select tick_id, tick_time, tick_seq into t
+            from pgq.tick
+            where tick_queue = i_queue_id
+              and tick_id > i_prev_tick_id
+              and (i_min_count is null or (tick_seq - i_prev_tick_seq) >= i_min_count)
+              and (i_min_interval is null or (tick_time - i_prev_tick_time) >= i_min_interval)
+            order by tick_queue asc, tick_id asc
+            limit 1;
+    end if;
+    next_tick_id := t.tick_id;
+    next_tick_time := t.tick_time;
+    next_tick_seq := t.tick_seq;
+    return;
+end;
+$$ language plpgsql stable;
+
index 5f9981b2d2bc9312b606ac7a7dfb097e9151e38e..dbbefad419fbec317d12a840168e7a454c824635 100644 (file)
@@ -35,6 +35,90 @@ as $$
 --      prev_tick_time      - Start tick time.
 --      prev_tick_event_seq - value from event id sequence at the time tick was issued.
 -- ----------------------------------------------------------------------
+begin
+    select f.batch_id, f.cur_tick_id, f.prev_tick_id,
+           f.cur_tick_time, f.prev_tick_time,
+           f.cur_tick_event_seq, f.prev_tick_event_seq
+        into batch_id, cur_tick_id, prev_tick_id, cur_tick_time, prev_tick_time,
+             cur_tick_event_seq, prev_tick_event_seq
+        from pgq.next_batch_custom(i_queue_name, i_consumer_name, NULL, NULL, NULL) f;
+    return;
+end;
+$$ language plpgsql;
+
+create or replace function pgq.next_batch(
+    in i_queue_name text,
+    in i_consumer_name text)
+returns int8 as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq.next_batch(2)
+--
+--      Old function that returns just batch_id.
+--
+-- Parameters:
+--      i_queue_name        - Name of the queue
+--      i_consumer_name     - Name of the consumer
+--
+-- Returns:
+--      Batch ID or NULL if there are no more events available.
+-- ----------------------------------------------------------------------
+declare
+    res int8;
+begin
+    select batch_id into res
+        from pgq.next_batch_info(i_queue_name, i_consumer_name);
+    return res;
+end;
+$$ language plpgsql;
+
+create or replace function pgq.next_batch_custom(
+    in i_queue_name text,
+    in i_consumer_name text,
+    in i_min_lag interval,
+    in i_min_count int4,
+    in i_min_interval interval,
+    out batch_id int8,
+    out cur_tick_id int8,
+    out prev_tick_id int8,
+    out cur_tick_time timestamptz,
+    out prev_tick_time timestamptz,
+    out cur_tick_event_seq int8,
+    out prev_tick_event_seq int8)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq.next_batch_custom(5)
+--
+--      Makes next block of events active.  Block size can be tuned
+--      with i_min_count, i_min_interval parameters.  Events age can
+--      be tuned with i_min_lag.
+--
+--      If it returns NULL, there is no events available in queue.
+--      Consumer should sleep then.
+--
+--      The values from event_id sequence may give hint how big the
+--      batch may be.  But they are inexact, they do not give exact size.
+--      Client *MUST NOT* use them to detect whether the batch contains any
+--      events at all - the values are unfit for that purpose.
+--
+-- Note:
+--      i_min_lag together with i_min_interval/i_min_count is inefficient.
+--
+-- Parameters:
+--      i_queue_name        - Name of the queue
+--      i_consumer_name     - Name of the consumer
+--      i_min_lag           - Consumer wants events older than that
+--      i_min_count         - Consumer wants batch to contain at least this many events
+--      i_min_interval      - Consumer wants batch to cover at least this much time
+--
+-- Returns:
+--      batch_id            - Batch ID or NULL if there are no more events available.
+--      cur_tick_id         - End tick id.
+--      cur_tick_time       - End tick time.
+--      cur_tick_event_seq  - Value from event id sequence at the time tick was issued.
+--      prev_tick_id        - Start tick id.
+--      prev_tick_time      - Start tick time.
+--      prev_tick_event_seq - value from event id sequence at the time tick was issued.
+-- ----------------------------------------------------------------------
 declare
     errmsg          text;
     queue_id        integer;
@@ -73,15 +157,34 @@ begin
         return;
     end if;
 
-    -- find next tick
-    select tick_id, tick_time, tick_event_seq
-        into cur_tick_id, cur_tick_time, cur_tick_event_seq
-        from pgq.tick
-        where tick_id > prev_tick_id
-          and tick_queue = queue_id
-        order by tick_queue asc, tick_id asc
-        limit 1;
-    if not found then
+    if i_min_interval is null and i_min_count is null then
+        -- find next tick
+        select tick_id, tick_time, tick_event_seq
+            into cur_tick_id, cur_tick_time, cur_tick_event_seq
+            from pgq.tick
+            where tick_id > prev_tick_id
+              and tick_queue = queue_id
+            order by tick_queue asc, tick_id asc
+            limit 1;
+    else
+        -- find custom tick
+        select next_tick_id, next_tick_time, next_tick_seq
+          into cur_tick_id, cur_tick_time, cur_tick_event_seq
+          from pgq.find_tick_helper(queue_id, prev_tick_id,
+                                    prev_tick_time, prev_tick_event_seq,
+                                    i_min_count, i_min_interval);
+    end if;
+
+    if i_min_lag is not null then
+        -- enforce min lag
+        if now() - cur_tick_time < i_min_lag then
+            cur_tick_id := NULL;
+            cur_tick_time := NULL;
+            cur_tick_event_seq := NULL;
+        end if;
+    end if;
+
+    if cur_tick_id is null then
         -- nothing to do
         prev_tick_id := null;
         prev_tick_time := null;
@@ -101,29 +204,3 @@ begin
 end;
 $$ language plpgsql security definer;
 
-
-create or replace function pgq.next_batch(
-    in i_queue_name text,
-    in i_consumer_name text)
-returns int8 as $$
--- ----------------------------------------------------------------------
--- Function: pgq.next_batch(2)
---
---      Old function that returns just batch_id.
---
--- Parameters:
---      i_queue_name        - Name of the queue
---      i_consumer_name     - Name of the consumer
---
--- Returns:
---      Batch ID or NULL if there are no more events available.
--- ----------------------------------------------------------------------
-declare
-    res int8;
-begin
-    select batch_id into res
-        from pgq.next_batch_info(i_queue_name, i_consumer_name);
-    return res;
-end;
-$$ language plpgsql;
-
index 1531cb5194f3a75ee178fdbb6a29c0b9b22a1921..e8ce09538c0a23d66f82d49788f48e5611f37334 100644 (file)
@@ -5,6 +5,7 @@
 \i functions/pgq.batch_event_sql.sql
 \i functions/pgq.batch_event_tables.sql
 \i functions/pgq.event_retry_raw.sql
+\i functions/pgq.find_tick_helper.sql
 
 -- \i functions/pgq.insert_event_raw.sql
 \i lowlevel/pgq_lowlevel.sql