SWord('idle_period', EQ(Value(w_qargs2, name = 'ticker_idle_period'))),
SWord('max_count', EQ(Value(w_qargs2, name = 'ticker_max_count'))),
SWord('max_lag', EQ(Value(w_qargs2, name = 'ticker_max_lag'))),
- SWord('paused', EQ(Value(w_qargs2, name = 'external_ticker'))))
+ SWord('paused', EQ(Value(w_qargs2, name = 'ticker_paused'))))
w_qargs2.set_real(WList(
w_done,
"queue_ticker_max_count as max_cnt",
"queue_ticker_max_lag as max_lag",
"queue_ticker_idle_period as idle_period",
- "queue_external_ticker as paused",
+ "queue_ticker_paused as paused",
"ticker_lag",
]
pfx = "select " + ",".join(fields)
+ queue_ticker_max_count * 2 + 1000) as tmp
into q from pgq.queue
where queue_name = i_queue_name
- and not queue_external_ticker;
+ and not queue_external_ticker
+ and not queue_ticker_paused;
--if not found then
-- raise notice 'queue not found or ticks not allowed';
out queue_rotation_period interval,
out queue_switch_time timestamptz,
out queue_external_ticker boolean,
+ out queue_ticker_paused boolean,
out queue_ticker_max_count integer,
out queue_ticker_max_lag interval,
out queue_ticker_idle_period interval,
-- ----------------------------------------------------------------------
begin
for queue_name, queue_ntables, queue_cur_table, queue_rotation_period,
- queue_switch_time, queue_external_ticker, queue_ticker_max_count,
- queue_ticker_max_lag, queue_ticker_idle_period, ticker_lag
+ queue_switch_time, queue_external_ticker, queue_ticker_paused,
+ queue_ticker_max_count, queue_ticker_max_lag, queue_ticker_idle_period,
+ ticker_lag
in select
f.queue_name, f.queue_ntables, f.queue_cur_table, f.queue_rotation_period,
- f.queue_switch_time, f.queue_external_ticker, f.queue_ticker_max_count,
- f.queue_ticker_max_lag, f.queue_ticker_idle_period, f.ticker_lag
+ f.queue_switch_time, f.queue_external_ticker, f.queue_ticker_paused,
+ f.queue_ticker_max_count, f.queue_ticker_max_lag, f.queue_ticker_idle_period,
+ f.ticker_lag
from pgq.get_queue_info(null) f
loop
return next;
out queue_rotation_period interval,
out queue_switch_time timestamptz,
out queue_external_ticker boolean,
+ out queue_ticker_paused boolean,
out queue_ticker_max_count integer,
out queue_ticker_max_lag interval,
out queue_ticker_idle_period interval,
-- ----------------------------------------------------------------------
begin
for queue_name, queue_ntables, queue_cur_table, queue_rotation_period,
- queue_switch_time, queue_external_ticker, queue_ticker_max_count,
- queue_ticker_max_lag, queue_ticker_idle_period, ticker_lag
+ queue_switch_time, queue_external_ticker, queue_ticker_paused,
+ queue_ticker_max_count, queue_ticker_max_lag, queue_ticker_idle_period,
+ ticker_lag
in select
q.queue_name, q.queue_ntables, q.queue_cur_table,
q.queue_rotation_period, q.queue_switch_time,
- q.queue_external_ticker,
+ q.queue_external_ticker, q.queue_ticker_paused,
q.queue_ticker_max_count, q.queue_ticker_max_lag,
q.queue_ticker_idle_period,
(select current_timestamp - tick_time
'queue_ticker_max_count',
'queue_ticker_max_lag',
'queue_ticker_idle_period',
+ 'queue_ticker_paused',
'queue_rotation_period',
'queue_external_ticker')
then
select queue_id, i_tick_id, i_orig_timestamp, i_event_seq
from pgq.queue
where queue_name = i_queue_name
- and queue_external_ticker;
+ and queue_external_ticker
+ and not queue_ticker_paused;
if not found then
- raise exception 'queue not found or external ticker disabled: %', i_queue_name;
+ raise exception 'queue not found or ticker disabled: %', i_queue_name;
end if;
-- make sure seqs stay current
select queue_id, queue_tick_seq, queue_external_ticker,
queue_ticker_max_count, queue_ticker_max_lag,
queue_ticker_idle_period, queue_event_seq,
- pgq.seq_getval(queue_event_seq) as event_seq
+ pgq.seq_getval(queue_event_seq) as event_seq,
+ queue_ticker_paused
into q
from pgq.queue where queue_name = i_queue_name;
if not found then
raise exception 'This queue has external tick source.';
end if;
+ if q.queue_ticker_paused then
+ raise exception 'Ticker has been paused for this queue';
+ end if;
+
-- load state from last tick
select now() - tick_time as lag,
q.event_seq - tick_event_seq as new_events,
-- ----------------------------------------------------------------------
-- Function: pgq.ticker(0)
--
--- Creates ticks for all queues which dont have external ticker.
+-- Creates ticks for all unpaused queues which dont have external ticker.
--
-- Returns:
-- Number of queues that were processed.
for q in
select queue_name from pgq.queue
where not queue_external_ticker
+ and not queue_ticker_paused
order by queue_name
loop
if pgq.ticker(q.queue_name) > 0 then
-- queue_switch_step2 - tx after rotation was committed
-- queue_switch_time - time when switch happened
-- queue_external_ticker - ticks come from some external sources
+-- queue_ticker_paused - ticker is paused
-- queue_disable_insert - disallow pgq.insert_event()
-- queue_ticker_max_count - batch should not contain more events
-- queue_ticker_max_lag - events should not age more
queue_external_ticker boolean not null default false,
queue_disable_insert boolean not null default false,
+ queue_ticker_paused boolean not null default false,
queue_ticker_max_count integer not null default 500,
queue_ticker_max_lag interval not null default '3 seconds',
#define W_SOCK 1
#define W_TIME 2
+typedef void (*libev_cb)(int sock, short flags, void *arg);
+
struct PgSocket {
struct event ev;
void *handler_arg;
};
-typedef void (*libev_cb)(int sock, short flags, void *arg);
-
static void send_event(struct PgSocket *db, enum PgEvent ev)
{
db->handler_func(db, db->handler_arg, ev, NULL);