sql = ev.data
# fixme: curs?
+ pgver = dst_curs.connection.server_version
+ if pgver >= 80300:
+ curs.execute("set local session_replication_role = 'local'")
q = "select * from londiste.execute_start(%s, %s, %s, false)"
res = self.exec_cmd(dst_curs, q, [self.queue_name, fname, sql], commit = False)
ret = res[0]['ret_code']
dst_curs.execute(stmt)
q = "select * from londiste.execute_finish(%s, %s)"
self.exec_cmd(dst_curs, q, [self.queue_name, fname], commit = False)
+ if pgver >= 80300:
+ curs.execute("set local session_replication_role = 'replica'")
def apply_sql(self, sql, dst_curs):
# set replica role for EXECUTE transaction
if db.server_version >= 80300:
- curs.execute("set local session_replication_role = 'replica'")
+ curs.execute("set local session_replication_role = 'local'")
for fn in files:
fname = os.path.basename(fn)
logtrg_name text;
b_queue_name bytea;
begin
+ -- skip if no triggers found on that table
+ perform 1 from pg_catalog.pg_trigger where tgrelid = londiste.find_table_oid(i_table_name);
+ if not found then
+ return;
+ end if;
-- cast to bytea
b_queue_name := replace(i_queue_name, E'\\', E'\\\\')::bytea;
for logtrg_name in
select tgname from pg_catalog.pg_trigger
where tgrelid = londiste.find_table_oid(i_table_name)
- and tgfoid in ('pgq.sqltriga'::regproc::oid, 'pgq.logutriga'::regproc::oid)
+ and londiste.is_replica_func(tgfoid)
and substring(tgargs for (position(E'\\000'::bytea in tgargs) - 1)) = b_queue_name
loop
execute 'drop trigger ' || quote_ident(logtrg_name)
perform pgq.insert_event(i_queue_name, 'EXECUTE', sql, i_file_name, null, null, null);
end if;
- -- try educated guess of previous state
- if is_root then
- SET LOCAL session_replication_role = 'origin';
- else
- SET LOCAL session_replication_role = 'replica';
- end if;
-
select 200, 'Execute finished: ' || i_file_name into ret_code, ret_note;
return;
end;
insert into londiste.applied_execute (queue_name, execute_file, execute_sql)
values (i_queue_name, i_file_name, i_sql);
- SET LOCAL session_replication_role = 'local';
-
select 200, 'Executing: ' || i_file_name into ret_code, ret_note;
return;
end;
--- /dev/null
+
+create or replace function londiste.is_replica_func(func_oid oid)
+returns boolean as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.is_replica_func(1)
+--
+-- Returns true if function is a PgQ-based replication functions.
+-- This also means it takes queue name as first argument.
+-- ----------------------------------------------------------------------
+select count(1) > 0
+ from pg_proc f join pg_namespace n on (n.oid = f.pronamespace)
+ where f.oid = $1 and n.nspname = 'pgq' and f.proname in ('sqltriga', 'logutriga');
+$$ language sql strict stable;
+
if pgversion >= 90000 then
select tg.tgname into logtrg_previous
- from pg_class r, pg_trigger tg
+ from pg_class r join pg_trigger tg on (tg.tgrelid = r.oid)
where r.oid = londiste.find_table_oid(fq_table_name)
and not tg.tgisinternal
and tg.tgname < lg_name::name
-- per-row AFTER trigger
and (tg.tgtype & 3) = 1 -- bits: 0:ROW, 1:BEFORE
-- current londiste
- and tg.tgfoid not in ('pgq.sqltriga'::regproc::oid, 'pgq.logutriga'::regproc::oid)
+ and not londiste.is_replica_func(tg.tgfoid)
-- old londiste
and substring(tg.tgname from 1 for 10) != '_londiste_'
and substring(tg.tgname from char_length(tg.tgname) - 6) != '_logger'
order by 1 limit 1;
else
select tg.tgname into logtrg_previous
- from pg_class r, pg_trigger tg
+ from pg_class r join pg_trigger tg on (tg.tgrelid = r.oid)
where r.oid = londiste.find_table_oid(fq_table_name)
and not tg.tgisconstraint
and tg.tgname < lg_name::name
-- per-row AFTER trigger
and (tg.tgtype & 3) = 1 -- bits: 0:ROW, 1:BEFORE
-- current londiste
- and tg.tgfoid not in ('pgq.sqltriga'::regproc::oid, 'pgq.logutriga'::regproc::oid)
+ and not londiste.is_replica_func(t.tgfoid)
-- old londiste
and substring(tg.tgname from 1 for 10) != '_londiste_'
and substring(tg.tgname from char_length(tg.tgname) - 6) != '_logger'
\i functions/londiste.split_fqname.sql
\i functions/londiste.table_info_trigger.sql
\i functions/londiste.drop_table_triggers.sql
+\i functions/londiste.is_replica_func.sql
-- current_batch - Current batch ID, if one is active or NULL
-- next_tick - If batch is active, then its final tick.
-- ----------------------------------------------------------------------
+declare
+ _pending_events bigint;
+ _queue_id bigint;
begin
for queue_name, consumer_name, lag, last_seen,
- last_tick, current_batch, next_tick, pending_events
+ last_tick, current_batch, next_tick, _pending_events, _queue_id
in
select q.queue_name, c.co_name,
current_timestamp - t.tick_time,
current_timestamp - s.sub_active,
s.sub_last_tick, s.sub_batch, s.sub_next_tick,
- top.tick_event_seq - t.tick_event_seq
- from pgq.queue q
- left join pgq.tick top
- on (top.tick_queue = q.queue_id
- and top.tick_id = (select tmp.tick_id from pgq.tick tmp
- where tmp.tick_queue = q.queue_id
- order by tmp.tick_queue desc, tmp.tick_id desc
- limit 1)),
+ t.tick_event_seq, q.queue_id
+ from pgq.queue q,
pgq.consumer c,
pgq.subscription s
left join pgq.tick t
and (i_consumer_name is null or c.co_name = i_consumer_name)
order by 1,2
loop
+ select t.tick_event_seq - _pending_events
+ into pending_events
+ from pgq.tick t
+ where t.tick_queue = _queue_id
+ order by t.tick_queue desc, t.tick_id desc
+ limit 1;
return next;
end loop;
return;
-- Returns:
-- One pgq.ret_queue_info record.
-- ----------------------------------------------------------------------
+declare
+ _ticker_lag interval;
+ _top_tick_id bigint;
+ _ht_tick_id bigint;
+ _top_tick_time timestamptz;
+ _top_tick_event_seq bigint;
+ _ht_tick_time timestamptz;
+ _ht_tick_event_seq bigint;
+ _queue_id integer;
+ _queue_event_seq text;
begin
for queue_name, queue_ntables, queue_cur_table, queue_rotation_period,
queue_switch_time, queue_external_ticker, queue_ticker_paused,
queue_ticker_max_count, queue_ticker_max_lag, queue_ticker_idle_period,
- ticker_lag, ev_per_sec, ev_new
+ _queue_id, _queue_event_seq
in select
q.queue_name, q.queue_ntables, q.queue_cur_table,
q.queue_rotation_period, q.queue_switch_time,
q.queue_external_ticker, q.queue_ticker_paused,
q.queue_ticker_max_count, q.queue_ticker_max_lag,
q.queue_ticker_idle_period,
- (select current_timestamp - tick_time
- from pgq.tick where tick_queue = queue_id
- order by tick_queue desc, tick_id desc limit 1),
- case when ht.tick_time < top.tick_time
- then (top.tick_event_seq - ht.tick_event_seq) / extract(epoch from (top.tick_time - ht.tick_time))
- else null end,
- pgq.seq_getval(q.queue_event_seq) - top.tick_event_seq
+ q.queue_id, q.queue_event_seq
from pgq.queue q
- left join pgq.tick top
- on (top.tick_queue = q.queue_id
- and top.tick_id = (select tmp.tick_id from pgq.tick tmp
- where tmp.tick_queue = q.queue_id
- order by tmp.tick_queue desc, tmp.tick_id desc
- limit 1))
- left join pgq.tick ht
- on (ht.tick_queue = q.queue_id
- and ht.tick_id = (select tmp2.tick_id from pgq.tick tmp2
- where tmp2.tick_queue = q.queue_id
- and tmp2.tick_id >= top.tick_id - 20
- order by tmp2.tick_queue asc, tmp2.tick_id asc
- limit 1))
where (i_queue_name is null or q.queue_name = i_queue_name)
order by q.queue_name
loop
+ -- most recent tick
+ select (current_timestamp - t.tick_time),
+ tick_id, t.tick_time, t.tick_event_seq
+ into ticker_lag, _top_tick_id, _top_tick_time, _top_tick_event_seq
+ from pgq.tick t
+ where t.tick_queue = _queue_id
+ order by t.tick_queue desc, t.tick_id desc
+ limit 1;
+ -- slightly older tick
+ select ht.tick_id, ht.tick_time, ht.tick_event_seq
+ into _ht_tick_id, _ht_tick_time, _ht_tick_event_seq
+ from pgq.tick ht
+ where ht.tick_queue = _queue_id
+ and ht.tick_id >= _top_tick_id - 20
+ order by ht.tick_queue asc, ht.tick_id asc
+ limit 1;
+ if _ht_tick_time < _top_tick_time then
+ ev_per_sec = (_top_tick_event_seq - _ht_tick_event_seq) / extract(epoch from (_top_tick_time - _ht_tick_time));
+ else
+ ev_per_sec = null;
+ end if;
+ ev_new = pgq.seq_getval(_queue_event_seq) - _top_tick_event_seq;
return next;
end loop;
return;
(1 row)
select * from pgq_node.set_consumer_uptodate('bqueue', 'random_consumer2', true);
- ret_code | ret_note
-----------+--------------------------
- 200 | Consumer uptodate = true
+ ret_code | ret_note
+----------+-----------------------
+ 200 | Consumer uptodate = 1
(1 row)
select * from pgq_node.change_consumer_provider('bqueue', 'random_consumer2', 'node3');
declare
sql text;
begin
- select 100, 'Ok', n.node_type, n.node_name, g.last_tick,
+ select 100, 'Ok', n.node_type, n.node_name,
c.node_type, c.queue_name, w.provider_node, l.node_location,
n.worker_name, w.paused, w.uptodate, w.last_tick_id
- into ret_code, ret_note, node_type, node_name, global_watermark,
+ into ret_code, ret_note, node_type, node_name,
combined_type, combined_queue, provider_node, provider_location,
worker_name, worker_paused, worker_uptodate, worker_last_tick
from pgq_node.node_info n
left join pgq_node.node_info c on (c.queue_name = n.combined_queue)
left join pgq_node.local_state w on (w.queue_name = n.queue_name and w.consumer_name = n.worker_name)
left join pgq_node.node_location l on (l.queue_name = w.queue_name and l.node_name = w.provider_node)
- left join pgq.get_consumer_info(i_queue_name, '.global_watermark') g on (g.queue_name = n.queue_name)
where n.queue_name = i_queue_name;
if not found then
select 404, 'Unknown queue: ' || i_queue_name into ret_code, ret_note;
end if;
if node_type in ('root', 'branch') then
- select min(last_tick) into local_watermark
- from pgq.get_consumer_info(i_queue_name)
- where consumer_name <> '.global_watermark';
+ select min(case when consumer_name = '.global_watermark' then null else last_tick end),
+ min(case when consumer_name = '.global_watermark' then last_tick else null end)
+ into local_watermark, global_watermark
+ from pgq.get_consumer_info(i_queue_name);
if local_watermark is null then
select t.tick_id into local_watermark
from pgq.tick t, pgq.queue q
-- worker_name - consumer that maintains remote node
-- local_watermark - lowest tick_id on subscriber
-- ----------------------------------------------------------------------
+declare
+ _watermark_name text;
begin
- for node_name, worker_name, node_watermark in
- select s.subscriber_node, s.worker_name,
- (select last_tick from pgq.get_consumer_info(i_queue_name, s.watermark_name)) as wm_pos
+ for node_name, worker_name, _watermark_name in
+ select s.subscriber_node, s.worker_name, s.watermark_name
from pgq_node.subscriber_info s
where s.queue_name = i_queue_name
order by 1
loop
+ select last_tick into node_watermark
+ from pgq.get_consumer_info(i_queue_name, _watermark_name);
return next;
end loop;
return;
where queue_name = i_queue_name
and consumer_name = i_consumer_name;
if found then
- select 200, 'Consumer uptodate = '||i_uptodate into ret_code, ret_note;
+ select 200, 'Consumer uptodate = ' || i_uptodate::int4::text
+ into ret_code, ret_note;
else
select 404, 'Consumer not known: '
|| i_queue_name || '/' || i_consumer_name