out last_seen interval,
out last_tick bigint,
out current_batch bigint,
- out next_tick bigint)
+ out next_tick bigint,
+ out pending_events bigint)
returns setof record as $$
-- ----------------------------------------------------------------------
-- Function: pgq.get_consumer_info(0)
-- ----------------------------------------------------------------------
begin
for queue_name, consumer_name, lag, last_seen,
- last_tick, current_batch, next_tick
+ last_tick, current_batch, next_tick, pending_events
in
select f.queue_name, f.consumer_name, f.lag, f.last_seen,
- f.last_tick, f.current_batch, f.next_tick
+ f.last_tick, f.current_batch, f.next_tick, f.pending_events
from pgq.get_consumer_info(null, null) f
loop
return next;
out last_seen interval,
out last_tick bigint,
out current_batch bigint,
- out next_tick bigint)
+ out next_tick bigint,
+ out pending_events bigint)
returns setof record as $$
-- ----------------------------------------------------------------------
-- Function: pgq.get_consumer_info(1)
-- ----------------------------------------------------------------------
begin
for queue_name, consumer_name, lag, last_seen,
- last_tick, current_batch, next_tick
+ last_tick, current_batch, next_tick, pending_events
in
select f.queue_name, f.consumer_name, f.lag, f.last_seen,
- f.last_tick, f.current_batch, f.next_tick
+ f.last_tick, f.current_batch, f.next_tick, f.pending_events
from pgq.get_consumer_info(i_queue_name, null) f
loop
return next;
out last_seen interval,
out last_tick bigint,
out current_batch bigint,
- out next_tick bigint)
+ out next_tick bigint,
+ out pending_events bigint)
returns setof record as $$
-- ----------------------------------------------------------------------
-- Function: pgq.get_consumer_info(2)
-- ----------------------------------------------------------------------
begin
for queue_name, consumer_name, lag, last_seen,
- last_tick, current_batch, next_tick
+ last_tick, current_batch, next_tick, pending_events
in
select q.queue_name, c.co_name,
current_timestamp - t.tick_time,
current_timestamp - s.sub_active,
- s.sub_last_tick, s.sub_batch, s.sub_next_tick
- from pgq.queue q, pgq.consumer c,
- pgq.subscription s left join pgq.tick t
- on (t.tick_queue = s.sub_queue and t.tick_id = s.sub_last_tick)
+ s.sub_last_tick, s.sub_batch, s.sub_next_tick,
+ top.tick_event_seq - t.tick_event_seq
+ from pgq.queue q
+ left join pgq.tick top
+ on (top.tick_queue = q.queue_id
+ and top.tick_id = (select tmp.tick_id from pgq.tick tmp
+ where tmp.tick_queue = q.queue_id
+ order by tmp.tick_queue desc, tmp.tick_id desc
+ limit 1)),
+ pgq.consumer c,
+ pgq.subscription s
+ left join pgq.tick t
+ on (t.tick_queue = s.sub_queue and t.tick_id = s.sub_last_tick)
where q.queue_id = s.sub_queue
and c.co_id = s.sub_consumer
and (i_queue_name is null or q.queue_name = i_queue_name)
out queue_ticker_max_count integer,
out queue_ticker_max_lag interval,
out queue_ticker_idle_period interval,
- out ticker_lag interval)
+ out ticker_lag interval,
+ out ev_per_sec float8,
+ out ev_new bigint)
returns setof record as $$
-- ----------------------------------------------------------------------
-- Function: pgq.get_queue_info(0)
for queue_name, queue_ntables, queue_cur_table, queue_rotation_period,
queue_switch_time, queue_external_ticker, queue_ticker_paused,
queue_ticker_max_count, queue_ticker_max_lag, queue_ticker_idle_period,
- ticker_lag
+ ticker_lag, ev_per_sec, ev_new
in select
f.queue_name, f.queue_ntables, f.queue_cur_table, f.queue_rotation_period,
f.queue_switch_time, f.queue_external_ticker, f.queue_ticker_paused,
f.queue_ticker_max_count, f.queue_ticker_max_lag, f.queue_ticker_idle_period,
- f.ticker_lag
+ f.ticker_lag, f.ev_per_sec, f.ev_new
from pgq.get_queue_info(null) f
loop
return next;
out queue_ticker_max_count integer,
out queue_ticker_max_lag interval,
out queue_ticker_idle_period interval,
- out ticker_lag interval)
+ out ticker_lag interval,
+ out ev_per_sec float8,
+ out ev_new bigint)
returns setof record as $$
-- ----------------------------------------------------------------------
-- Function: pgq.get_queue_info(1)
for queue_name, queue_ntables, queue_cur_table, queue_rotation_period,
queue_switch_time, queue_external_ticker, queue_ticker_paused,
queue_ticker_max_count, queue_ticker_max_lag, queue_ticker_idle_period,
- ticker_lag
+ ticker_lag, ev_per_sec, ev_new
in select
q.queue_name, q.queue_ntables, q.queue_cur_table,
q.queue_rotation_period, q.queue_switch_time,
q.queue_ticker_idle_period,
(select current_timestamp - tick_time
from pgq.tick where tick_queue = queue_id
- order by tick_queue desc, tick_id desc limit 1)
+ order by tick_queue desc, tick_id desc limit 1),
+ case when ht.tick_time < top.tick_time
+ then (top.tick_event_seq - ht.tick_event_seq) / extract(epoch from (top.tick_time - ht.tick_time))
+ else null end,
+ pgq.seq_getval(q.queue_event_seq) - top.tick_event_seq
from pgq.queue q
+ left join pgq.tick top
+ on (top.tick_queue = q.queue_id
+ and top.tick_id = (select tmp.tick_id from pgq.tick tmp
+ where tmp.tick_queue = q.queue_id
+ order by tmp.tick_queue desc, tmp.tick_id desc
+ limit 1))
+ left join pgq.tick ht
+ on (ht.tick_queue = q.queue_id
+ and ht.tick_id = (select tmp2.tick_id from pgq.tick tmp2
+ where tmp2.tick_queue = q.queue_id
+ and tmp2.tick_id >= top.tick_id - 20
+ order by tmp2.tick_queue asc, tmp2.tick_id asc
+ limit 1))
where (i_queue_name is null or q.queue_name = i_queue_name)
order by q.queue_name
loop