sql/pgq: add quick stats to get_{queue|consumer}_info
authorMarko Kreen <markokr@gmail.com>
Tue, 3 Nov 2009 12:30:27 +0000 (14:30 +0200)
committerMarko Kreen <markokr@gmail.com>
Tue, 3 Nov 2009 12:43:44 +0000 (14:43 +0200)
get_consumer_info().pending_events

  How batched events are availble for read.

get_queue_info().ev_per_sec

  Average event creation speed, based on recent 20 batches.

get_queue_info().ev_new

  How many events have appreased in queue since recent tick.
  Those are not yet in any batch so consumers cannot read them yet.

sql/pgq/functions/pgq.get_consumer_info.sql
sql/pgq/functions/pgq.get_queue_info.sql

index 6e0f92cd407e95405b0646ba4b297013def2f01b..76467a85cc5a39589c5e92727178c97402507304 100644 (file)
@@ -6,7 +6,8 @@ create or replace function pgq.get_consumer_info(
     out last_seen       interval,
     out last_tick       bigint,
     out current_batch   bigint,
-    out next_tick       bigint)
+    out next_tick       bigint,
+    out pending_events  bigint)
 returns setof record as $$
 -- ----------------------------------------------------------------------
 -- Function: pgq.get_consumer_info(0)
@@ -18,10 +19,10 @@ returns setof record as $$
 -- ----------------------------------------------------------------------
 begin
     for queue_name, consumer_name, lag, last_seen,
-        last_tick, current_batch, next_tick
+        last_tick, current_batch, next_tick, pending_events
     in
         select f.queue_name, f.consumer_name, f.lag, f.last_seen,
-               f.last_tick, f.current_batch, f.next_tick
+               f.last_tick, f.current_batch, f.next_tick, f.pending_events
             from pgq.get_consumer_info(null, null) f
     loop
         return next;
@@ -40,7 +41,8 @@ create or replace function pgq.get_consumer_info(
     out last_seen       interval,
     out last_tick       bigint,
     out current_batch   bigint,
-    out next_tick       bigint)
+    out next_tick       bigint,
+    out pending_events  bigint)
 returns setof record as $$
 -- ----------------------------------------------------------------------
 -- Function: pgq.get_consumer_info(1)
@@ -52,10 +54,10 @@ returns setof record as $$
 -- ----------------------------------------------------------------------
 begin
     for queue_name, consumer_name, lag, last_seen,
-        last_tick, current_batch, next_tick
+        last_tick, current_batch, next_tick, pending_events
     in
         select f.queue_name, f.consumer_name, f.lag, f.last_seen,
-               f.last_tick, f.current_batch, f.next_tick
+               f.last_tick, f.current_batch, f.next_tick, f.pending_events
             from pgq.get_consumer_info(i_queue_name, null) f
     loop
         return next;
@@ -75,7 +77,8 @@ create or replace function pgq.get_consumer_info(
     out last_seen       interval,
     out last_tick       bigint,
     out current_batch   bigint,
-    out next_tick       bigint)
+    out next_tick       bigint,
+    out pending_events  bigint)
 returns setof record as $$
 -- ----------------------------------------------------------------------
 -- Function: pgq.get_consumer_info(2)
@@ -97,15 +100,24 @@ returns setof record as $$
 -- ----------------------------------------------------------------------
 begin
     for queue_name, consumer_name, lag, last_seen,
-        last_tick, current_batch, next_tick
+        last_tick, current_batch, next_tick, pending_events
     in
         select q.queue_name, c.co_name,
                current_timestamp - t.tick_time,
                current_timestamp - s.sub_active,
-               s.sub_last_tick, s.sub_batch, s.sub_next_tick
-          from pgq.queue q, pgq.consumer c,
-               pgq.subscription s left join pgq.tick t
-               on (t.tick_queue = s.sub_queue and t.tick_id = s.sub_last_tick)
+               s.sub_last_tick, s.sub_batch, s.sub_next_tick,
+               top.tick_event_seq - t.tick_event_seq
+          from pgq.queue q
+               left join pgq.tick top
+                 on (top.tick_queue = q.queue_id
+                     and top.tick_id = (select tmp.tick_id from pgq.tick tmp
+                                         where tmp.tick_queue = q.queue_id
+                                         order by tmp.tick_queue desc, tmp.tick_id desc
+                                         limit 1)),
+               pgq.consumer c,
+               pgq.subscription s
+               left join pgq.tick t
+                 on (t.tick_queue = s.sub_queue and t.tick_id = s.sub_last_tick)
          where q.queue_id = s.sub_queue
            and c.co_id = s.sub_consumer
            and (i_queue_name is null or q.queue_name = i_queue_name)
index dcc34c2cc7fa0d7f9353941d51683f78565d36c0..d512b69538f4bc066c6ed3255cb7acfbb6311679 100644 (file)
@@ -9,7 +9,9 @@ create or replace function pgq.get_queue_info(
     out queue_ticker_max_count      integer,
     out queue_ticker_max_lag        interval,
     out queue_ticker_idle_period    interval,
-    out ticker_lag                  interval)
+    out ticker_lag                  interval,
+    out ev_per_sec                  float8,
+    out ev_new                bigint)
 returns setof record as $$
 -- ----------------------------------------------------------------------
 -- Function: pgq.get_queue_info(0)
@@ -23,12 +25,12 @@ begin
     for queue_name, queue_ntables, queue_cur_table, queue_rotation_period,
         queue_switch_time, queue_external_ticker, queue_ticker_paused,
         queue_ticker_max_count, queue_ticker_max_lag, queue_ticker_idle_period,
-        ticker_lag
+        ticker_lag, ev_per_sec, ev_new
     in select
         f.queue_name, f.queue_ntables, f.queue_cur_table, f.queue_rotation_period,
         f.queue_switch_time, f.queue_external_ticker, f.queue_ticker_paused,
         f.queue_ticker_max_count, f.queue_ticker_max_lag, f.queue_ticker_idle_period,
-        f.ticker_lag
+        f.ticker_lag, f.ev_per_sec, f.ev_new
         from pgq.get_queue_info(null) f
     loop
         return next;
@@ -49,7 +51,9 @@ create or replace function pgq.get_queue_info(
     out queue_ticker_max_count      integer,
     out queue_ticker_max_lag        interval,
     out queue_ticker_idle_period    interval,
-    out ticker_lag                  interval)
+    out ticker_lag                  interval,
+    out ev_per_sec                  float8,
+    out ev_new                bigint)
 returns setof record as $$
 -- ----------------------------------------------------------------------
 -- Function: pgq.get_queue_info(1)
@@ -63,7 +67,7 @@ begin
     for queue_name, queue_ntables, queue_cur_table, queue_rotation_period,
         queue_switch_time, queue_external_ticker, queue_ticker_paused,
         queue_ticker_max_count, queue_ticker_max_lag, queue_ticker_idle_period,
-        ticker_lag
+        ticker_lag, ev_per_sec, ev_new
     in select
         q.queue_name, q.queue_ntables, q.queue_cur_table,
         q.queue_rotation_period, q.queue_switch_time,
@@ -72,8 +76,25 @@ begin
         q.queue_ticker_idle_period,
         (select current_timestamp - tick_time
            from pgq.tick where tick_queue = queue_id
-          order by tick_queue desc, tick_id desc limit 1)
+          order by tick_queue desc, tick_id desc limit 1),
+        case when ht.tick_time < top.tick_time
+             then (top.tick_event_seq - ht.tick_event_seq) / extract(epoch from (top.tick_time - ht.tick_time))
+             else null end,
+        pgq.seq_getval(q.queue_event_seq) - top.tick_event_seq
         from pgq.queue q
+          left join pgq.tick top
+            on (top.tick_queue = q.queue_id
+                and top.tick_id = (select tmp.tick_id from pgq.tick tmp
+                                    where tmp.tick_queue = q.queue_id
+                                    order by tmp.tick_queue desc, tmp.tick_id desc
+                                    limit 1))
+          left join pgq.tick ht
+            on (ht.tick_queue = q.queue_id
+                and ht.tick_id = (select tmp2.tick_id from pgq.tick tmp2
+                                   where tmp2.tick_queue = q.queue_id
+                                     and tmp2.tick_id >= top.tick_id - 20
+                                   order by tmp2.tick_queue asc, tmp2.tick_id asc
+                                   limit 1))
         where (i_queue_name is null or q.queue_name = i_queue_name)
         order by q.queue_name
     loop