help = "add: create table/seq if not exist")
g.add_option("--create-only",
help = "add: create table/seq if not exist (seq,pkey,full,indexes,fkeys)")
+ g.add_option("--trigger-flags",
+ help="add: Set trigger flags (BAIUDLQ)")
g.add_option("--trigger-arg", action="append",
- help="add: Custom trigger arg")
+ help="add: Custom trigger arg (can be specified multiply times)")
g.add_option("--handler", action="append",
help="add: Custom handler for table")
g.add_option("--copy-condition", dest="copy_condition",
help="include all tables", default=False)
p.add_option("--create-only",
help="pkey,fkeys,indexes")
+ p.add_option("--trigger-flags",
+ help="Set trigger flags (BAIUDLQ)")
p.add_option("--trigger-arg", action="append",
help="Custom trigger arg")
p.add_option("--handler", action="append",
attrs['handlers'] = ":".join(hlist)
# actual table registration
+ tgflags = self.options.trigger_flags
tgargs = self.options.trigger_arg # None by default
+ if tgflags:
+ if not tgargs:
+ tgargs = []
+ tgargs.append('tgflags='+tgflags)
q = "select * from londiste.local_add_table(%s, %s, %s)"
self.exec_cmd(dst_curs, q, [self.set_name, tbl, tgargs])
--
-- Register table on Londiste node, with customizable trigger args.
--
+-- Parameters:
+-- i_queue_name - queue name
+-- i_table_name - table name
+-- i_trg_args - args to trigger, or tgflags=X for trigger creation flags
+--
+-- Trigger args:
+-- See documentation for pgq triggers.
+--
+-- Trigger creation flags (default: AIUDL):
+-- I - ON INSERT
+-- U - ON UPDATE
+-- D - ON DELETE
+-- Q - use pgq.sqltriga() as trigger function
+-- L - use pgq.logutriga() as trigger function
+-- B - BEFORE
+-- A - AFTER
+--
+-- Example:
+-- > londiste.local_add_table('q', 'tbl', array['tgflags=BI', 'SKIP', 'pkey=col1,col2'])
+--
-- Returns:
-- 200 - Ok
-- 301 - Warning, trigger exists that will fire before londiste one
fq_table_name text;
new_state text;
- logtrg_name text;
trunctrg_name text;
pgversion int;
logtrg_previous text;
- logtrg text;
+ lg_name text;
+ lg_func text;
+ lg_pos text;
+ lg_event text;
+ lg_args text;
tbl record;
i integer;
+ sql text;
+ arg text;
begin
fq_table_name := londiste.make_fqname(i_table_name);
col_types := londiste.find_column_types(fq_table_name);
end if;
-- create Ins/Upd/Del trigger if it does not exists already
- logtrg_name := '_londiste_' || i_queue_name;
+ lg_name := '_londiste_' || i_queue_name;
perform 1 from pg_catalog.pg_trigger
where tgrelid = londiste.find_table_oid(fq_table_name)
- and tgname = logtrg_name;
+ and tgname = lg_name;
if not found then
- logtrg := 'create trigger ' || quote_ident(logtrg_name)
- || ' after insert or update or delete on ' || londiste.quote_fqname(fq_table_name)
- || ' for each row execute procedure pgq.sqltriga(' || quote_literal(i_queue_name);
+ -- new trigger
+ lg_func := 'pgq.sqltriga';
+ lg_event := '';
+ lg_args := quote_literal(i_queue_name);
+ lg_pos := 'after';
+
+ -- parse extra args
if i_trg_args is not null then
for i in array_lower(i_trg_args, 1) .. array_upper(i_trg_args, 1) loop
- logtrg := logtrg || ', ' || quote_literal(i_trg_args[i]);
+ arg := i_trg_args[i];
+ if arg like 'tgflags=%' then
+ -- special flag handling
+ arg := upper(substr(arg, 9));
+ if position('B' in arg) > 0 then
+ lg_pos := 'before';
+ end if;
+ if position('A' in arg) > 0 then
+ lg_pos := 'after';
+ end if;
+ if position('Q' in arg) > 0 then
+ lg_func := 'pgq.sqltriga';
+ end if;
+ if position('L' in arg) > 0 then
+ lg_func := 'pgq.logutriga';
+ end if;
+ if position('I' in arg) > 0 then
+ lg_event := lg_event || ' or insert';
+ end if;
+ if position('U' in arg) > 0 then
+ lg_event := lg_event || ' or update';
+ end if;
+ if position('D' in arg) > 0 then
+ lg_event := lg_event || ' or delete';
+ end if;
+ if position('S' in arg) > 0 then
+ lg_args := lg_args || ', ' || quote_literal('SKIP');
+ end if;
+ else
+ -- ordinary arg
+ lg_args := lg_args || ', ' || quote_literal(arg);
+ end if;
end loop;
end if;
- logtrg := logtrg || ')';
- execute logtrg;
+
+ -- finalize event
+ lg_event := substr(lg_event, 4);
+ if lg_event = '' then
+ lg_event := 'insert or update or delete';
+ end if;
+
+ -- create trigger
+ sql := 'create trigger ' || quote_ident(lg_name)
+ || ' ' || lg_pos || ' ' || lg_event
+ || ' on ' || londiste.quote_fqname(fq_table_name)
+ || ' for each row execute procedure '
+ || lg_func || '(' || lg_args || ')';
+ execute sql;
end if;
-- create tRuncate trigger if it does not exists already
where tgrelid = londiste.find_table_oid(fq_table_name)
and tgname = trunctrg_name;
if not found then
- logtrg := 'create trigger ' || quote_ident(trunctrg_name)
+ sql := 'create trigger ' || quote_ident(trunctrg_name)
|| ' after truncate on ' || londiste.quote_fqname(fq_table_name)
|| ' for each statement execute procedure pgq.sqltriga(' || quote_literal(i_queue_name)
|| ')';
- execute logtrg;
+ execute sql;
end if;
end if;
from pg_class r, pg_trigger tg
where r.oid = londiste.find_table_oid(fq_table_name)
and not tg.tgisinternal
- and tg.tgname < logtrg_name::name
+ and tg.tgname < lg_name::name
-- per-row AFTER trigger
and (tg.tgtype & 3) = 1 -- bits: 0:ROW, 1:BEFORE
-- current londiste
from pg_class r, pg_trigger tg
where r.oid = londiste.find_table_oid(fq_table_name)
and not tg.tgisconstraint
- and tg.tgname < logtrg_name::name
+ and tg.tgname < lg_name::name
-- per-row AFTER trigger
and (tg.tgtype & 3) = 1 -- bits: 0:ROW, 1:BEFORE
-- current londiste