_node record;
_tbloid oid;
_extra_args text;
- _skip_prefix text := 'zzzkip';
+ _skip_prefix text := 'zzz_';
_skip_trg_count integer;
_skip_trg_name text;
+ _merge_all boolean := false;
+ _skip_truncate boolean := false;
+ _no_triggers boolean := false;
+ _skip boolean := false;
+ _queue_name text;
begin
_extra_args := '';
fq_table_name := londiste.make_fqname(i_table_name);
raise exception 'lost table: %', fq_table_name;
end if;
- -- skip triggers on leaf node
+ -- new trigger
+ lg_name := '_londiste_' || i_queue_name;
+ lg_func := 'pgq.logutriga';
+ lg_event := '';
+ lg_args := quote_literal(i_queue_name);
+ lg_pos := 'after';
+
+ -- parse extra args
+ if array_lower(i_trg_args, 1) is not null then
+ for i in array_lower(i_trg_args, 1) .. array_upper(i_trg_args, 1) loop
+ arg := i_trg_args[i];
+ if arg like 'tgflags=%' then
+ -- special flag handling
+ arg := upper(substr(arg, 9));
+ if position('B' in arg) > 0 then
+ lg_pos := 'before';
+ end if;
+ if position('A' in arg) > 0 then
+ lg_pos := 'after';
+ end if;
+ if position('Q' in arg) > 0 then
+ lg_func := 'pgq.sqltriga';
+ end if;
+ if position('L' in arg) > 0 then
+ lg_func := 'pgq.logutriga';
+ end if;
+ if position('I' in arg) > 0 then
+ lg_event := lg_event || ' or insert';
+ end if;
+ if position('U' in arg) > 0 then
+ lg_event := lg_event || ' or update';
+ end if;
+ if position('D' in arg) > 0 then
+ lg_event := lg_event || ' or delete';
+ end if;
+ if position('S' in arg) > 0 then
+ _skip := true;
+ end if;
+ elsif arg = 'expect_sync' then
+ -- already handled
+ elsif arg = 'skip_truncate' then
+ _skip_truncate := true;
+ elsif arg = 'no_triggers' then
+ _no_triggers := true;
+ elsif arg = 'merge_all' then
+ _merge_all = true;
+ elsif lower(arg) = 'skip' then
+ _skip := true;
+ else
+ -- ordinary arg
+ lg_args := lg_args || ', ' || quote_literal(arg);
+ end if;
+ end loop;
+ end if;
+
+ -- merge all table sources on leaf
if _node.node_type = 'leaf' then
+ for _queue_name in
+ select t2.queue_name
+ from londiste.table_info t
+ join pgq_node.node_info n on (n.queue_name = t.queue_name)
+ left join pgq_node.node_info n2 on (n2.combined_queue = n.combined_queue or
+ (n2.combined_queue is null and n.combined_queue is null))
+ left join londiste.table_info t2 on (t2.table_name = t.table_name and t2.queue_name = n2.queue_name)
+ where t.queue_name = i_queue_name
+ and t.table_name = fq_table_name
+ and t2.local = false
+ and t2.queue_name != i_queue_name -- skip self
+ loop
+ if not _merge_all then
+ select 405, 'multiple source tables '|| fq_table_name
+ || ' found, use merge_all'
+ into ret_code, ret_note;
+ return;
+ end if;
+
+ update londiste.table_info
+ set local = true,
+ merge_state = new_state
+ where queue_name = _queue_name and table_name = fq_table_name;
+ if not found then
+ raise exception 'lost table: %', fq_table_name;
+ end if;
+ end loop;
+
-- on weird leafs the trigger funcs may not exist
perform 1 from pg_proc p join pg_namespace n on (n.oid = p.pronamespace)
where n.nspname = 'pgq' and p.proname in ('logutriga', 'sqltriga');
_extra_args := ', ' || quote_literal('deny');
end if;
+ -- if skip param given
+ if _skip then
+ -- get count and name of existing skip triggers
+ select count(*), min(t.tgname)
+ into _skip_trg_count, _skip_trg_name
+ from pg_catalog.pg_trigger t
+ where t.tgrelid = londiste.find_table_oid(fq_table_name)
+ and position(E'\\000skip\\000' in lower(tgargs::text)) > 0;
+ -- if no previous skip triggers, prefix name and add SKIP to args
+ if _skip_trg_count = 0 then
+ lg_name := _skip_prefix || lg_name;
+ lg_args := lg_args || ', ' || quote_literal('SKIP');
+ -- if one previous skip trigger, check it's prefix and
+ -- do not use SKIP on current trigger
+ elsif _skip_trg_count = 1 then
+ -- if not prefixed then rename
+ if position(_skip_prefix in _skip_trg_name) != 1 then
+ sql := 'alter trigger ' || _skip_trg_name
+ || ' on ' || londiste.quote_fqname(fq_table_name)
+ || ' rename to ' || _skip_prefix || _skip_trg_name;
+ execute sql;
+ end if;
+ else
+ select 405, 'Multiple SKIP triggers in table: ' || fq_table_name
+ into ret_code, ret_note;
+ return;
+ end if;
+ end if;
+
-- create Ins/Upd/Del trigger if it does not exists already
- lg_name := '_londiste_' || i_queue_name;
perform 1 from pg_catalog.pg_trigger
where tgrelid = londiste.find_table_oid(fq_table_name)
and tgname = lg_name;
if not found then
- -- new trigger
- lg_func := 'pgq.logutriga';
- lg_event := '';
- lg_args := quote_literal(i_queue_name);
- lg_pos := 'after';
+ if _no_triggers then
+ select 200, 'Table added with no triggers: ' || fq_table_name
+ into ret_code, ret_note;
+ return;
+ end if;
- -- parse extra args
- if array_lower(i_trg_args, 1) is not null then
- for i in array_lower(i_trg_args, 1) .. array_upper(i_trg_args, 1) loop
- arg := i_trg_args[i];
- if arg like 'tgflags=%' then
- -- special flag handling
- arg := upper(substr(arg, 9));
- if position('B' in arg) > 0 then
- lg_pos := 'before';
- end if;
- if position('A' in arg) > 0 then
- lg_pos := 'after';
- end if;
- if position('Q' in arg) > 0 then
- lg_func := 'pgq.sqltriga';
- end if;
- if position('L' in arg) > 0 then
- lg_func := 'pgq.logutriga';
- end if;
- if position('I' in arg) > 0 then
- lg_event := lg_event || ' or insert';
- end if;
- if position('U' in arg) > 0 then
- lg_event := lg_event || ' or update';
- end if;
- if position('D' in arg) > 0 then
- lg_event := lg_event || ' or delete';
- end if;
- if position('S' in arg) > 0 then
- -- get count and name of existing skip triggers
- select count(*), min(t.tgname)
- into _skip_trg_count, _skip_trg_name
- from pg_catalog.pg_trigger t
- where t.tgrelid = londiste.find_table_oid(fq_table_name)
- and position(E'\\000skip\\000' in lower(tgargs::text)) > 0;
- -- if no previous skip triggers, prefix name and add SKIP to args
- if _skip_trg_count = 0 then
- lg_name := _skip_prefix || lg_name;
- lg_args := lg_args || ', ' || quote_literal('SKIP');
- -- if one previous skip trigger, check it's prefix and
- -- do not use SKIP on current trigger
- elsif _skip_trg_count = 1 then
- -- if not prefixed then rename
- if position(_skip_prefix in _skip_trg_name) != 1 then
- sql := 'alter trigger ' || _skip_trg_name
- || ' on ' || londiste.quote_fqname(fq_table_name)
- || ' rename to ' || _skip_prefix || _skip_trg_name;
- execute sql;
- end if;
- else
- select 405, 'Multiple SKIP triggers in table: ' || fq_table_name
- into ret_code, ret_note;
- return;
- end if;
- end if;
- elsif arg = 'expect_sync' then
- -- already handled
- elsif arg = 'skip_truncate' then
- perform 1 from londiste.local_set_table_attrs(i_queue_name, fq_table_name, 'skip_truncate=1');
- elsif arg = 'no_triggers' then
- select 200, 'Table added with no triggers: ' || fq_table_name into ret_code, ret_note;
- return;
- else
- -- ordinary arg
- lg_args := lg_args || ', ' || quote_literal(arg);
- end if;
- end loop;
+ if _skip_truncate then
+ perform 1
+ from londiste.local_set_table_attrs(i_queue_name,
+ fq_table_name,
+ 'skip_truncate=1');
end if;
-- finalize event