pgq: add fields for get_consumer_info
authorMarko Kreen <markokr@gmail.com>
Mon, 16 Apr 2007 12:39:29 +0000 (12:39 +0000)
committerMarko Kreen <markokr@gmail.com>
Mon, 16 Apr 2007 12:39:29 +0000 (12:39 +0000)
sql/pgq/expected/pgq_core.out
sql/pgq/functions/pgq.get_consumer_info.sql
sql/pgq/sql/pgq_core.sql
sql/pgq/structure/types.sql

index 1c8d03433a25a11ab97f8cbd912b80b7b7293a8e..733e97cbca99145227cff6e7bf7e1adee3fdf219 100644 (file)
@@ -86,16 +86,23 @@ select queue_name, consumer_name, prev_tick_id, tick_id, lag from pgq.get_batch_
  myqueue    | consumer      |            1 |       2 | @ 0.00 secs
 (1 row)
 
-select queue_name from pgq.get_queue_info() order by 1;
- queue_name 
-------------
- myqueue
-(1 row)
-
-select queue_name, consumer_name from pgq.get_consumer_info() order by 1, 2;
- queue_name | consumer_name 
-------------+---------------
- myqueue    | consumer
+select queue_name, queue_ntables, queue_cur_table, queue_rotation_period,
+       queue_switch_time <= now() as switch_time_exists,
+       queue_external_ticker, queue_ticker_max_count, queue_ticker_max_lag,
+       queue_ticker_idle_period, ticker_lag < '2 hours' as ticker_lag_exists
+  from pgq.get_queue_info() order by 1;
+ queue_name | queue_ntables | queue_cur_table | queue_rotation_period | switch_time_exists | queue_external_ticker | queue_ticker_max_count | queue_ticker_max_lag | queue_ticker_idle_period | ticker_lag_exists 
+------------+---------------+-----------------+-----------------------+--------------------+-----------------------+------------------------+----------------------+--------------------------+-------------------
+ myqueue    |             3 |               0 | @ 2 hours             | t                  | f                     |                    500 | @ 3 secs             | @ 1 min                  | t
+(1 row)
+
+select queue_name, consumer_name, lag < '30 seconds' as lag_exists,
+       last_seen < '30 seconds' as last_seen_exists,
+       last_tick, current_batch, next_tick
+  from pgq.get_consumer_info() order by 1, 2;
+ queue_name | consumer_name | lag_exists | last_seen_exists | last_tick | current_batch | next_tick 
+------------+---------------+------------+------------------+-----------+---------------+-----------
+ myqueue    | consumer      | t          | t                |         1 |             1 |         2
 (1 row)
 
 select pgq.finish_batch(1);
index 3444421df83d9e17452b4266c0ce1d20558c3557..27dd444d6b3cc679f549578bc2dac65db372ff2c 100644 (file)
@@ -5,13 +5,10 @@ returns setof pgq.ret_consumer_info as $$
 -- ----------------------------------------------------------------------
 -- Function: pgq.get_consumer_info(0)
 --
---      Desc
---
--- Parameters:
---      arg - desc
+--      Returns info about all consumers on all queues.
 --
 -- Returns:
---      desc
+--      See pgq.get_consumer_info(2)
 -- ----------------------------------------------------------------------
 declare
     ret  pgq.ret_consumer_info%rowtype;
@@ -36,13 +33,13 @@ returns setof pgq.ret_consumer_info as $$
 -- ----------------------------------------------------------------------
 -- Function: pgq.get_consumer_info(1)
 --
---      Desc
+--      Returns info about consumers on one particular queue.
 --
 -- Parameters:
---      arg - desc
+--      x_queue_name    - Queue name
 --
 -- Returns:
---      desc
+--      See pgq.get_consumer_info(2)
 -- ----------------------------------------------------------------------
 declare
     ret  pgq.ret_consumer_info%rowtype;
@@ -82,7 +79,13 @@ returns setof pgq.ret_consumer_info as $$
 --      x_consumer_name     - name of a consumer
 --
 -- Returns:
---      info
+--      queue_name          - Queue name
+--      consumer_name       - Consumer name
+--      lag                 - How old are events the consumer is processing
+--      last_seen           - When the consumer seen by pgq
+--      last_tick           - Tick ID of last processed tick
+--      current_batch       - Current batch ID, if one is active or NULL
+--      next_tick           - If batch is active, then its final tick.
 -- ----------------------------------------------------------------------
 declare
     ret  pgq.ret_consumer_info%rowtype;
@@ -90,7 +93,10 @@ begin
     for ret in 
         select queue_name, co_name,
                current_timestamp - tick_time as lag,
-               current_timestamp - sub_active as last_seen
+               current_timestamp - sub_active as last_seen,
+               sub_last_tick as last_tick,
+               sub_batch as current_batch,
+               sub_next_tick as next_tick
           from pgq.subscription, pgq.tick, pgq.queue, pgq.consumer
          where tick_id = sub_last_tick
            and queue_id = sub_queue
index b25e20d55545411f6e301065401072e1e22dde1b..82515d60df449c1094d2a275acb7f7eb141609c3 100644 (file)
@@ -16,8 +16,16 @@ select pgq.next_batch('myqueue', 'consumer');
 select pgq.next_batch('myqueue', 'consumer');
 
 select queue_name, consumer_name, prev_tick_id, tick_id, lag from pgq.get_batch_info(1);
-select queue_name from pgq.get_queue_info() order by 1;
-select queue_name, consumer_name from pgq.get_consumer_info() order by 1, 2;
+
+select queue_name, queue_ntables, queue_cur_table, queue_rotation_period,
+       queue_switch_time <= now() as switch_time_exists,
+       queue_external_ticker, queue_ticker_max_count, queue_ticker_max_lag,
+       queue_ticker_idle_period, ticker_lag < '2 hours' as ticker_lag_exists
+  from pgq.get_queue_info() order by 1;
+select queue_name, consumer_name, lag < '30 seconds' as lag_exists,
+       last_seen < '30 seconds' as last_seen_exists,
+       last_tick, current_batch, next_tick
+  from pgq.get_consumer_info() order by 1, 2;
 
 select pgq.finish_batch(1);
 select pgq.finish_batch(1);
index c89ce50005444803480b42f08f0477e11085096c..75a32c12660547fd08603d7e3ff176104bece516 100644 (file)
@@ -1,47 +1,50 @@
 
 create type pgq.ret_queue_info as (
-    queue_name text,
-    queue_ntables integer,
-    queue_cur_table integer,
-    queue_rotation_period interval,
-    queue_switch_time timestamptz,
-    queue_external_ticker boolean,
-    queue_ticker_max_count integer,
-    queue_ticker_max_lag interval,
-    queue_ticker_idle_period interval,
-    ticker_lag interval
+    queue_name                  text,
+    queue_ntables               integer,
+    queue_cur_table             integer,
+    queue_rotation_period       interval,
+    queue_switch_time           timestamptz,
+    queue_external_ticker       boolean,
+    queue_ticker_max_count      integer,
+    queue_ticker_max_lag        interval,
+    queue_ticker_idle_period    interval,
+    ticker_lag                  interval
 );
 
 create type pgq.ret_consumer_info as (
-    queue_name text,
-    consumer_name text,
-    lag interval,
-    last_seen interval
+    queue_name      text,
+    consumer_name   text,
+    lag             interval,
+    last_seen       interval,
+    last_tick       bigint,
+    current_batch   bigint,
+    next_tick       bigint
 );
 
 create type pgq.ret_batch_info as (
-    queue_name text,
-    consumer_name text,
-    batch_start timestamptz,
-    batch_end timestamptz,
-    prev_tick_id bigint,
-    tick_id bigint,
-    lag interval
+    queue_name      text,
+    consumer_name   text,
+    batch_start     timestamptz,
+    batch_end       timestamptz,
+    prev_tick_id    bigint,
+    tick_id         bigint,
+    lag             interval
 );
 
 
 create type pgq.ret_batch_event as (
-       ev_id               bigint,
-        ev_time             timestamptz,
-
-        ev_txid             bigint,
-        ev_retry            int4,
-
-        ev_type             text,
-        ev_data             text,
-        ev_extra1           text,
-        ev_extra2           text,
-        ev_extra3           text,
-        ev_extra4           text
+    ev_id          bigint,
+    ev_time         timestamptz,
+
+    ev_txid         bigint,
+    ev_retry        int4,
+
+    ev_type         text,
+    ev_data         text,
+    ev_extra1       text,
+    ev_extra2       text,
+    ev_extra3       text,
+    ev_extra4       text
 );