New change-handler command which can be used for changing table_attrs (python handler...
authorPetr Jelinek <git@pjmodos.net>
Sun, 9 Dec 2012 16:17:38 +0000 (17:17 +0100)
committerPetr Jelinek <git@pjmodos.net>
Mon, 17 Dec 2012 17:29:35 +0000 (18:29 +0100)
python/londiste.py
python/londiste/setup.py
sql/londiste/expected/londiste_provider.out
sql/londiste/functions/londiste.create_trigger.sql [new file with mode: 0644]
sql/londiste/functions/londiste.local_add_table.sql
sql/londiste/functions/londiste.local_change_handler.sql [new file with mode: 0644]
sql/londiste/sql/londiste_provider.sql
sql/londiste/structure/functions.sql

index 536d83cd95ec25f74f62e564bb394c0f1bc88277..fb230f235de83d68459fd870637a68d9a1ad42d3 100755 (executable)
@@ -24,6 +24,7 @@ Replication Daemon:
 Replication Administration:
   add-table TBL ...     add table to queue
   remove-table TBL ...  remove table from queue
+  change-handler TBL    change handler for the table
   add-seq SEQ ...       add sequence to provider
   remove-seq SEQ ...    remove sequence from provider
   tables                show all tables on provider
@@ -48,7 +49,7 @@ cmd_handlers = (
     (('create-root', 'create-branch', 'create-leaf', 'members', 'tag-dead', 'tag-alive',
       'change-provider', 'rename-node', 'status', 'node-status', 'pause', 'resume', 'node-info',
       'drop-node', 'takeover', 'resurrect'), londiste.LondisteSetup),
-    (('add-table', 'remove-table', 'add-seq', 'remove-seq', 'tables', 'seqs',
+    (('add-table', 'remove-table', 'change-handler', 'add-seq', 'remove-seq', 'tables', 'seqs',
       'missing', 'resync', 'wait-sync', 'wait-root', 'wait-provider',
       'check', 'fkeys', 'execute'), londiste.LondisteSetup),
     (('show-handlers',), londiste.LondisteSetup),
index 0ddbb852adf18ca8da83e26b71964157e3870dcb..0df9c9db4755cb307c5cad4d3b16541767701371 100644 (file)
@@ -211,35 +211,18 @@ class LondisteSetup(CascadeAdmin):
                     newname = dest_table
                 s.create(dst_curs, create_flags, log = self.log, new_table_name = newname)
 
-        tgargs = []
-        if self.options.trigger_arg:
-            tgargs = self.options.trigger_arg
-        tgflags = self.options.trigger_flags
-        if tgflags:
-            tgargs.append('tgflags='+tgflags)
-        if self.options.no_triggers:
-            tgargs.append('no_triggers')
-        if self.options.merge_all:
-            tgargs.append('merge_all')
-        if self.options.no_merge:
-            tgargs.append('no_merge')
+        tgargs = self.build_tgargs()
 
         attrs = {}
+
         if self.options.handler:
-            hstr = londiste.handler.create_handler_string(
-                            self.options.handler, self.options.handler_arg)
-            p = londiste.handler.build_handler(tbl, hstr, self.options.dest_table)
-            attrs['handler'] = hstr
-            p.add(tgargs)
+            attrs['handler'] = self.build_handler(tbl, tgargs, self.options.dest_table)
 
         if self.options.find_copy_node:
             attrs['copy_node'] = '?'
         elif self.options.copy_node:
             attrs['copy_node'] = self.options.copy_node
 
-        if self.options.expect_sync:
-            tgargs.append('expect_sync')
-
         if not self.options.expect_sync:
             if self.options.skip_truncate:
                 attrs['skip_truncate'] = 1
@@ -257,6 +240,33 @@ class LondisteSetup(CascadeAdmin):
         self.exec_cmd(dst_curs, q, args)
         dst_db.commit()
 
+    def build_tgargs(self):
+        """Build trigger args"""
+        tgargs = []
+        if self.options.trigger_arg:
+            tgargs = self.options.trigger_arg
+        tgflags = self.options.trigger_flags
+        if tgflags:
+            tgargs.append('tgflags='+tgflags)
+        if self.options.no_triggers:
+            tgargs.append('no_triggers')
+        if self.options.merge_all:
+            tgargs.append('merge_all')
+        if self.options.no_merge:
+            tgargs.append('no_merge')
+        if self.options.expect_sync:
+            tgargs.append('expect_sync')
+
+        return tgargs
+
+    def build_handler(self, tbl, tgargs, dest_table=None):
+        """Build handler and push int into tgargs"""
+        hstr = londiste.handler.create_handler_string(
+                        self.options.handler, self.options.handler_arg)
+        p = londiste.handler.build_handler(tbl, hstr, dest_table)
+        p.add(tgargs)
+        return hstr
+
     def handler_needs_table(self):
         if self.options.handler:
             hstr = londiste.handler.create_handler_string(
@@ -296,6 +306,51 @@ class LondisteSetup(CascadeAdmin):
         q = "select * from londiste.local_remove_table(%s, %s)"
         self.exec_cmd_many(db, q, [self.set_name], args)
 
+    def cmd_change_handler(self, tbl):
+        """Change handler (table_attrs) of the replicated table"""
+
+        self.load_local_info()
+
+        tbl = skytools.fq_name(tbl)
+
+        db = self.get_database('db')
+        curs = db.cursor()
+        q = "select table_attrs, dest_table "\
+            " from londiste.get_table_list(%s) "\
+            " where table_name = %s and local"
+        curs.execute(q, [self.set_name, tbl])
+        if curs.rowcount == 0:
+            self.log.error("Table %s not found on this node" % tbl)
+            sys.exit(1)
+
+        attrs, dest_table = curs.fetchone()
+        attrs = skytools.db_urldecode(attrs or '')
+        old_handler = attrs.get('handler')
+
+        tgargs = self.build_tgargs()
+        if self.options.handler:
+            new_handler = self.build_handler(tbl, tgargs, dest_table)
+        else:
+            new_handler = None
+
+        if old_handler == new_handler:
+            self.log.info("Handler is already set to desired value, nothing done")
+            sys.exit(0)
+
+        if new_handler:
+            attrs['handler'] = new_handler
+        elif 'handler' in attrs:
+            del attrs['handler']
+
+        args = [self.set_name, tbl, tgargs, None]
+        if attrs:
+            args[3] = skytools.db_urlencode(attrs)
+
+        q = "select * from londiste.local_change_handler(%s, %s, %s, %s)"
+        self.exec_cmd(curs, q, args)
+        db.commit()
+
+
     def cmd_add_seq(self, *args):
         """Attach seqs(s) to local node."""
         dst_db = self.get_database('db')
index a07a620876efdf03789adf739a62bf80b57a2bd9..102ae83763eb658825fbf8a59ae9aa373e6672c0 100644 (file)
@@ -149,3 +149,39 @@ select tgname from pg_trigger where tgrelid = 'public.trg_test'::regclass order
  _londiste_aset_truncate
 (2 rows)
 
+-- handler test
+create table hdlr_test (
+    id int4 primary key,
+    txt text
+);
+select * from londiste.local_add_table('aset', 'public.hdlr_test');
+ ret_code |           ret_note            
+----------+-------------------------------
+      200 | Table added: public.hdlr_test
+(1 row)
+
+insert into hdlr_test values (1, 'data');
+select * from londiste.local_change_handler('aset', 'public.hdlr_test', array['ev_extra4=''test='' || txt'], 'handler=foobar');
+ ret_code |                  ret_note                   
+----------+---------------------------------------------
+      200 | Handler changed for table: public.hdlr_test
+(1 row)
+
+insert into hdlr_test values (2, 'data2');
+select * from londiste.local_change_handler('aset', 'public.hdlr_test', array[]::text[], '');
+ ret_code |                  ret_note                   
+----------+---------------------------------------------
+      200 | Handler changed for table: public.hdlr_test
+(1 row)
+
+insert into hdlr_test values (3, 'data3');
+truncate hdlr_test;
+select ev_id, ev_type, ev_data, ev_extra1, ev_extra4 from pgq.event_template where ev_extra1 = 'public.hdlr_test';
+ ev_id | ev_type |    ev_data     |    ev_extra1     | ev_extra4  
+-------+---------+----------------+------------------+------------
+     8 | I:id    | id=1&txt=data  | public.hdlr_test | 
+     9 | I:id    | id=2&txt=data2 | public.hdlr_test | test=data2
+    10 | I:id    | id=3&txt=data3 | public.hdlr_test | 
+    11 | R       |                | public.hdlr_test | 
+(4 rows)
+
diff --git a/sql/londiste/functions/londiste.create_trigger.sql b/sql/londiste/functions/londiste.create_trigger.sql
new file mode 100644 (file)
index 0000000..c888ced
--- /dev/null
@@ -0,0 +1,245 @@
+create or replace function londiste.create_trigger(
+    in i_queue_name     text,
+    in i_table_name     text,
+    in i_trg_args       text[],
+    in i_dest_table     text,
+    in i_node_type      text,
+    out ret_code        int4,
+    out ret_note        text,
+    out trigger_name    text)
+as $$
+------------------------------------------------------------------------
+-- Function: londiste.create_trigger(5)
+-- 
+--     Create or replace londiste trigger(s)
+-- 
+-- Parameters:
+--      i_queue_name - queue name
+--      i_table_name - table name
+--      i_trg_args   - args to trigger
+--      i_dest_table - actual name of destination table (NULL if same as src)
+--      i_node_type  - l3 node type
+-- 
+-- Trigger args:
+--      See documentation for pgq triggers.
+-- 
+-- Trigger creation flags (default: AIUDL):
+--      I - ON INSERT
+--      U - ON UPDATE
+--      D - ON DELETE
+--      Q - use pgq.sqltriga() as trigger function
+--      L - use pgq.logutriga() as trigger function
+--      B - BEFORE
+--      A - AFTER
+--      S - SKIP
+-- 
+-- Returns:
+--      200 - Ok
+--      201 - Trigger not created
+--      405 - Multiple SKIP triggers
+-- 
+------------------------------------------------------------------------
+declare
+    trigger_name text;
+    lg_func text;
+    lg_pos text;
+    lg_event text;
+    lg_args text[];
+    _old_tgargs bytea;
+    _new_tgargs bytea;
+    trunctrg_name text;
+    pgversion int;
+    sql text;
+    arg text;
+    i integer;
+    _extra_args text[] := '{}';
+    _skip_prefix text := 'zzz_';
+    _skip_trg_count integer;
+    _skip_trg_name text;
+    -- given tgflags array
+    _tgflags char[];
+    -- ordinary argument array
+    _args text[];
+    -- array with all tgflags values
+    _check_flags char[] := array['B','A','Q','L','I','U','D','S'];
+    -- argument flags
+    _skip boolean := false;
+    _no_triggers boolean := false;
+    _got_extra1 boolean := false;
+begin
+    if array_lower(i_trg_args, 1) is not null then
+        for i in array_lower(i_trg_args, 1) .. array_upper(i_trg_args, 1) loop
+            arg := i_trg_args[i];
+            if arg like 'tgflags=%' then
+                -- special flag handling
+                arg := upper(substr(arg, 9));
+                for j in array_lower(_check_flags, 1) .. array_upper(_check_flags, 1) loop
+                    if position(_check_flags[j] in arg) > 0 then
+                        _tgflags := array_append(_tgflags, _check_flags[j]);
+                    end if;
+                end loop;
+            elsif arg = 'no_triggers' then
+                _no_triggers := true;
+            elsif lower(arg) = 'skip' then
+                _skip := true;
+            elsif arg = 'virtual_table' then
+                _no_triggers := true;   -- do not create triggers
+            else
+                if arg like 'ev_extra1=%' then
+                    _got_extra1 := true;
+                end if;
+                -- ordinary arg
+                _args = array_append(_args, quote_literal(arg));
+            end if;
+        end loop;
+    end if;
+
+    if i_dest_table <> i_table_name and not _got_extra1 then
+        -- if renamed table, enforce trigger to put
+        -- global table name into extra1
+        arg := 'ev_extra1=' || quote_literal(i_table_name);
+        _args := array_append(_args, quote_literal(arg));
+    end if;
+    
+    trigger_name := '_londiste_' || i_queue_name;
+    lg_func := 'pgq.logutriga';
+    lg_event := '';
+    lg_args := array[i_queue_name];
+    lg_pos := 'after';
+
+    if array_lower(_args, 1) is not null then
+        lg_args := lg_args || _args;
+    end if;
+
+    if 'B' = any(_tgflags) then
+        lg_pos := 'before';
+    end if;
+    if 'A' = any(_tgflags)  then
+        lg_pos := 'after';
+    end if;
+    if 'Q' = any(_tgflags) then
+        lg_func := 'pgq.sqltriga';
+    end if;
+    if 'L' = any(_tgflags) then
+        lg_func := 'pgq.logutriga';
+    end if;
+    if 'I' = any(_tgflags) then
+        lg_event := lg_event || ' or insert';
+    end if;
+    if 'U' = any(_tgflags) then
+        lg_event := lg_event || ' or update';
+    end if;
+    if 'D' = any(_tgflags) then
+        lg_event := lg_event || ' or delete';
+    end if;
+    if 'S' = any(_tgflags) then
+        _skip := true;
+    end if;
+
+    if i_node_type = 'leaf' then
+        -- on weird leafs the trigger funcs may not exist
+        perform 1 from pg_proc p join pg_namespace n on (n.oid = p.pronamespace)
+            where n.nspname = 'pgq' and p.proname in ('logutriga', 'sqltriga');
+        if not found then
+            select 201, 'Trigger not created' into ret_code, ret_note;
+            return;
+        end if;
+        -- on regular leaf, install deny trigger
+        _extra_args := array_append(_extra_args, quote_literal('deny'));
+    end if;
+
+    -- if skip param given, rename previous skip triggers and prefix current
+    if _skip then
+        -- get count and name of existing skip triggers
+        select count(*), min(t.tgname)
+        into _skip_trg_count, _skip_trg_name
+        from pg_catalog.pg_trigger t
+        where t.tgrelid = londiste.find_table_oid(i_dest_table)
+            and position(E'\\000skip\\000' in lower(tgargs::text)) > 0;
+        -- if no previous skip triggers, prefix name and add SKIP to args
+        if _skip_trg_count = 0 then
+            trigger_name := _skip_prefix || trigger_name;
+            lg_args := array_append(lg_args, quote_literal('SKIP'));
+        -- if one previous skip trigger, check it's prefix and
+        -- do not use SKIP on current trigger
+        elsif _skip_trg_count = 1 then
+            -- if not prefixed then rename
+            if position(_skip_prefix in _skip_trg_name) != 1 then
+                sql := 'alter trigger ' || _skip_trg_name
+                    || ' on ' || londiste.quote_fqname(i_dest_table)
+                    || ' rename to ' || _skip_prefix || _skip_trg_name;
+                execute sql;
+            end if;
+        else
+            select 405, 'Multiple SKIP triggers'
+            into ret_code, ret_note;
+            return;
+        end if;
+    end if;
+
+    -- create Ins/Upd/Del trigger if it does not exists already
+    select t.tgargs 
+        from pg_catalog.pg_trigger t
+        where t.tgrelid = londiste.find_table_oid(i_dest_table)
+            and t.tgname = trigger_name
+        into _old_tgargs;
+
+    if found then
+        _new_tgargs := lg_args[1];
+        for i in 2 .. array_upper(lg_args, 1) loop
+            _new_tgargs := _new_tgargs || E'\\000'::bytea || decode(lg_args[i], 'escape');
+        end loop;
+
+        if _old_tgargs IS DISTINCT FROM _new_tgargs then
+            sql := 'drop trigger if exists ' || quote_ident(trigger_name)
+                || ' on ' || londiste.quote_fqname(i_dest_table);
+            execute sql;
+        end if;
+    end if;
+
+    if not found or _old_tgargs IS DISTINCT FROM _new_tgargs then
+        if _no_triggers then
+            select 201, 'Trigger not created'
+            into ret_code, ret_note;
+            return;
+        end if;
+
+        -- finalize event
+        lg_event := substr(lg_event, 4);
+        if lg_event = '' then
+            lg_event := 'insert or update or delete';
+        end if;
+
+        -- create trigger
+        lg_args := lg_args || _extra_args;
+        sql := 'create trigger ' || quote_ident(trigger_name)
+            || ' ' || lg_pos || ' ' || lg_event
+            || ' on ' || londiste.quote_fqname(i_dest_table)
+            || ' for each row execute procedure '
+            || lg_func || '(' || array_to_string(lg_args, ', ') || ')';
+        execute sql;
+    end if;
+
+    -- create truncate trigger if it does not exists already
+    show server_version_num into pgversion;
+    if pgversion >= 80400 then
+        trunctrg_name  := '_londiste_' || i_queue_name || '_truncate';
+        perform 1 from pg_catalog.pg_trigger
+          where tgrelid = londiste.find_table_oid(i_dest_table)
+            and tgname = trunctrg_name;
+        if not found then
+            _extra_args := i_queue_name || _extra_args;
+            sql := 'create trigger ' || quote_ident(trunctrg_name)
+                || ' after truncate on ' || londiste.quote_fqname(i_dest_table)
+                || ' for each statement execute procedure pgq.sqltriga('
+                || array_to_string(_extra_args, ', ') || ')';
+            execute sql;
+        end if;
+    end if;
+
+    select 200, 'OK'
+    into ret_code, ret_note;
+    return;
+end;
+$$ language plpgsql;
+
index 78d72422b44c49b45c95cb478ba845e721f2b7b2..ea8f1edffc5eeebbf3e296822bf670485720847a 100644 (file)
@@ -56,19 +56,12 @@ declare
     col_types text;
     fq_table_name text;
     new_state text;
-    trunctrg_name text;
     pgversion int;
     logtrg_previous text;
-    lg_name text;
-    lg_func text;
-    lg_pos text;
-    lg_event text;
-    lg_args text;
-    _extra_args text;
+    trigger_name text;
     tbl record;
     i integer;
     j integer;
-    sql text;
     arg text;
     _node record;
     _tbloid oid;
@@ -76,66 +69,35 @@ declare
     _combined_table text;
     _table_attrs text := i_table_attrs;
     -- skip trigger
-    _skip_prefix text := 'zzz_';
-    _skip_trg_count integer;
-    _skip_trg_name text;
     -- check local tables from all sources
     _queue_name text;
     _local boolean;
-    -- array with all tgflags values
-    _check_flags char[] := array['B','A','Q','L','I','U','D','S'];
-    -- given tgflags array
-    _tgflags char[];
-    -- ordinary argument array
-    _args text[];
     -- argument flags
     _expect_sync boolean := false;
     _merge_all boolean := false;
     _no_merge boolean := false;
-    _no_triggers boolean := false;
-    _skip boolean := false;
     _virtual_table boolean := false;
     _dest_table text;
-    _got_extra1 boolean := false;
     _table_name2 text;
     _desc text;
 begin
 
-    -------- i_trg_args ARGUMENTS PARSING
+    -------- i_trg_args ARGUMENTS PARSING (TODO: use different input param for passing extra options that have nothing to do with trigger)
 
     if array_lower(i_trg_args, 1) is not null then
         for i in array_lower(i_trg_args, 1) .. array_upper(i_trg_args, 1) loop
             arg := i_trg_args[i];
-            if arg like 'tgflags=%' then
-                -- special flag handling
-                arg := upper(substr(arg, 9));
-                for j in array_lower(_check_flags, 1) .. array_upper(_check_flags, 1) loop
-                    if position(_check_flags[j] in arg) > 0 then
-                        _tgflags := array_append(_tgflags, _check_flags[j]);
-                    end if;
-                end loop;
-            elsif arg = 'expect_sync' then
+            if arg = 'expect_sync' then
                 _expect_sync := true;
             elsif arg = 'skip_truncate' then
                 _table_attrs := coalesce(_table_attrs || '&skip_truncate=1', 'skip_truncate=1');
-            elsif arg = 'no_triggers' then
-                _no_triggers := true;
             elsif arg = 'merge_all' then
                 _merge_all = true;
             elsif arg = 'no_merge' then
                 _no_merge = true;
-            elsif lower(arg) = 'skip' then
-                _skip := true;
             elsif arg = 'virtual_table' then
                 _virtual_table := true;
                 _expect_sync := true;   -- do not copy
-                _no_triggers := true;   -- do not create triggers
-            else
-                if arg like 'ev_extra1=%' then
-                    _got_extra1 := true;
-                end if;
-                -- ordinary arg
-                _args = array_append(_args, quote_literal(arg));
             end if;
         end loop;
     end if;
@@ -149,13 +111,6 @@ begin
     fq_table_name := londiste.make_fqname(i_table_name);
     _dest_table := londiste.make_fqname(coalesce(i_dest_table, i_table_name));
 
-    if _dest_table <> fq_table_name and not _got_extra1 then
-        -- if renamed table, enforce trigger to put
-        -- global table name into extra1
-        arg := 'ev_extra1=' || quote_literal(fq_table_name);
-        _args := array_append(_args, quote_literal(arg));
-    end if;
-
     if _dest_table = fq_table_name then
         _desc := fq_table_name;
     else
@@ -316,128 +271,20 @@ begin
         end if;
     end if;
 
-    -------- TRIGGER LOGIC
-
-    -- new trigger
-    _extra_args := '';
-    lg_name := '_londiste_' || i_queue_name;
-    lg_func := 'pgq.logutriga';
-    lg_event := '';
-    lg_args := quote_literal(i_queue_name);
-    lg_pos := 'after';
-
-    if array_lower(_args, 1) is not null then
-        lg_args := lg_args || ', ' || array_to_string(_args, ', ');
-    end if;
-
-    if 'B' = any(_tgflags) then
-        lg_pos := 'before';
-    end if;
-    if 'A' = any(_tgflags)  then
-        lg_pos := 'after';
-    end if;
-    if 'Q' = any(_tgflags) then
-        lg_func := 'pgq.sqltriga';
-    end if;
-    if 'L' = any(_tgflags) then
-        lg_func := 'pgq.logutriga';
-    end if;
-    if 'I' = any(_tgflags) then
-        lg_event := lg_event || ' or insert';
-    end if;
-    if 'U' = any(_tgflags) then
-        lg_event := lg_event || ' or update';
-    end if;
-    if 'D' = any(_tgflags) then
-        lg_event := lg_event || ' or delete';
-    end if;
-    if 'S' = any(_tgflags) then
-        _skip := true;
-    end if;
-
-    if _node.node_type = 'leaf' then
-        -- on weird leafs the trigger funcs may not exist
-        perform 1 from pg_proc p join pg_namespace n on (n.oid = p.pronamespace)
-            where n.nspname = 'pgq' and p.proname in ('logutriga', 'sqltriga');
-        if not found then
-            select 200, 'Table added with no triggers: ' || _desc into ret_code, ret_note;
-            return;
-        end if;
-        -- on regular leaf, install deny trigger
-        _extra_args := ', ' || quote_literal('deny');
-    end if;
-
-    -- if skip param given, rename previous skip triggers and prefix current
-    if _skip then
-        -- get count and name of existing skip triggers
-        select count(*), min(t.tgname)
-        into _skip_trg_count, _skip_trg_name
-        from pg_catalog.pg_trigger t
-        where t.tgrelid = londiste.find_table_oid(_dest_table)
-            and position(E'\\000skip\\000' in lower(tgargs::text)) > 0;
-        -- if no previous skip triggers, prefix name and add SKIP to args
-        if _skip_trg_count = 0 then
-            lg_name := _skip_prefix || lg_name;
-            lg_args := lg_args || ', ' || quote_literal('SKIP');
-        -- if one previous skip trigger, check it's prefix and
-        -- do not use SKIP on current trigger
-        elsif _skip_trg_count = 1 then
-            -- if not prefixed then rename
-            if position(_skip_prefix in _skip_trg_name) != 1 then
-                sql := 'alter trigger ' || _skip_trg_name
-                    || ' on ' || londiste.quote_fqname(_dest_table)
-                    || ' rename to ' || _skip_prefix || _skip_trg_name;
-                execute sql;
-            end if;
-        else
-            select 405, 'Multiple SKIP triggers in table: ' || _desc
-            into ret_code, ret_note;
-            return;
-        end if;
-    end if;
-
-    -- create Ins/Upd/Del trigger if it does not exists already
-    perform 1 from pg_catalog.pg_trigger
-        where tgrelid = londiste.find_table_oid(_dest_table)
-            and tgname = lg_name;
-    if not found then
+    -- create trigger
+    select f.ret_code, f.ret_note, f.trigger_name
+        from londiste.create_trigger(i_queue_name, fq_table_name, i_trg_args, _dest_table, _node.node_type) f
+        into ret_code, ret_note, trigger_name;
 
-        if _no_triggers then
-            select 200, 'Table added with no triggers: ' || _desc
+    if ret_code > 299 then
+        ret_note := 'Trigger creation failed for table ' || _desc || ': ' || ret_note;
+        return;
+    elsif ret_code = 201 then
+        select 200, 'Table added with no triggers: ' || _desc
             into ret_code, ret_note;
-            return;
-        end if;
-
-        -- finalize event
-        lg_event := substr(lg_event, 4);
-        if lg_event = '' then
-            lg_event := 'insert or update or delete';
-        end if;
-
-        -- create trigger
-        sql := 'create trigger ' || quote_ident(lg_name)
-            || ' ' || lg_pos || ' ' || lg_event
-            || ' on ' || londiste.quote_fqname(_dest_table)
-            || ' for each row execute procedure '
-            || lg_func || '(' || lg_args || _extra_args || ')';
-        execute sql;
+        return;
     end if;
 
-    -- create truncate trigger if it does not exists already
-    show server_version_num into pgversion;
-    if pgversion >= 80400 then
-        trunctrg_name  := '_londiste_' || i_queue_name || '_truncate';
-        perform 1 from pg_catalog.pg_trigger
-          where tgrelid = londiste.find_table_oid(_dest_table)
-            and tgname = trunctrg_name;
-        if not found then
-            sql := 'create trigger ' || quote_ident(trunctrg_name)
-                || ' after truncate on ' || londiste.quote_fqname(_dest_table)
-                || ' for each statement execute procedure pgq.sqltriga(' || quote_literal(i_queue_name)
-                || _extra_args || ')';
-            execute sql;
-        end if;
-    end if;
 
     -- Check that no trigger exists on the target table that will get fired
     -- before londiste one (this could have londiste replicate data
@@ -446,12 +293,13 @@ begin
     -- Don't report all the trigger names, 8.3 does not have array_accum
     -- available
 
+    show server_version_num into pgversion;
     if pgversion >= 90000 then
         select tg.tgname into logtrg_previous
         from pg_class r join pg_trigger tg on (tg.tgrelid = r.oid)
         where r.oid = londiste.find_table_oid(_dest_table)
           and not tg.tgisinternal
-          and tg.tgname < lg_name::name
+          and tg.tgname < trigger_name::name
           -- per-row AFTER trigger
           and (tg.tgtype & 3) = 1   -- bits: 0:ROW, 1:BEFORE
           -- current londiste
@@ -465,7 +313,7 @@ begin
         from pg_class r join pg_trigger tg on (tg.tgrelid = r.oid)
         where r.oid = londiste.find_table_oid(_dest_table)
           and not tg.tgisconstraint
-          and tg.tgname < lg_name::name
+          and tg.tgname < trigger_name::name
           -- per-row AFTER trigger
           and (tg.tgtype & 3) = 1   -- bits: 0:ROW, 1:BEFORE
           -- current londiste
diff --git a/sql/londiste/functions/londiste.local_change_handler.sql b/sql/londiste/functions/londiste.local_change_handler.sql
new file mode 100644 (file)
index 0000000..bdffd2d
--- /dev/null
@@ -0,0 +1,86 @@
+create or replace function londiste.local_change_handler(
+    in i_queue_name     text,
+    in i_table_name     text,
+    in i_trg_args       text[],
+    in i_table_attrs    text,    
+    out ret_code        int4,
+    out ret_note        text)
+as $$
+----------------------------------------------------------------------------------------------------
+-- Function: londiste.local_change_handler(4)
+-- 
+--     Change handler and rebuild trigger if needed
+--
+-- Parameters:
+--      i_queue_name  - set name
+--      i_table_name  - table name
+--      i_trg_args    - args to trigger
+--      i_table_attrs - args to python handler
+-- 
+-- Returns:
+--      200 - OK
+--      400 - No such set
+--      404 - Table not found
+-- 
+----------------------------------------------------------------------------------------------------
+declare
+    _dest_table text;
+    _desc text;
+    _node record;
+begin
+    -- get node info
+    select * from pgq_node.get_node_info(i_queue_name) into _node;
+    if not found or _node.ret_code >= 400 then
+        select 400, 'No such set: ' || i_queue_name into ret_code, ret_note;
+        return;
+    end if;
+    
+    -- get destination table name for use in trigger creation
+    select coalesce(ti.dest_table, i_table_name)
+        from londiste.table_info ti
+        where queue_name = i_queue_name
+        and table_name = i_table_name
+        and local
+        into _dest_table;
+
+    if not found then
+        select 404, 'no such local table: ' || i_table_name
+            into ret_code, ret_note;
+    end if;        
+
+    -- update table_attrs with new handler info
+    select f.ret_code, f.ret_note
+        from londiste.local_set_table_attrs(i_queue_name, i_table_name, i_table_attrs) f
+        into ret_code, ret_note;
+
+    if ret_code <> 200 then
+        return;
+    end if;
+
+    -- replace the trigger if needed
+    select f.ret_code, f.ret_note
+        from londiste.create_trigger(i_queue_name, i_table_name, i_trg_args, _dest_table, _node.node_type) f
+        into ret_code, ret_note;
+
+    if _dest_table = i_table_name then
+        _desc := i_table_name;
+    else
+        _desc := i_table_name || '(' || _dest_table || ')';
+    end if;
+        
+    if ret_code > 299 then
+        ret_note := 'Trigger creation failed for table ' || _desc || ': ' || ret_note;
+        return;
+    elsif ret_code = 201 then
+        select 200, 'Table handler updated with no triggers: ' || _desc
+            into ret_code, ret_note;
+        return;
+    end if;
+
+    select 200, 'Handler changed for table: ' || _desc
+        into ret_code, ret_note;
+
+    return;
+end;
+$$ language plpgsql;
+
index 12149e331e91bc89efa9e8e81ec3f34999c6b51b..cac89c1ac8f4ea99e8e9c154b4978ab8da7467df 100644 (file)
@@ -51,3 +51,21 @@ select tgname from pg_trigger where tgrelid = 'public.trg_test'::regclass order
 delete from londiste.table_info where table_name = 'public.trg_test';
 select tgname from pg_trigger where tgrelid = 'public.trg_test'::regclass order by 1;
 
+-- handler test
+create table hdlr_test (
+    id int4 primary key,
+    txt text
+);
+
+select * from londiste.local_add_table('aset', 'public.hdlr_test');
+insert into hdlr_test values (1, 'data');
+
+select * from londiste.local_change_handler('aset', 'public.hdlr_test', array['ev_extra4=''test='' || txt'], 'handler=foobar');
+insert into hdlr_test values (2, 'data2');
+
+select * from londiste.local_change_handler('aset', 'public.hdlr_test', array[]::text[], '');
+insert into hdlr_test values (3, 'data3');
+truncate hdlr_test;
+
+select ev_id, ev_type, ev_data, ev_extra1, ev_extra4 from pgq.event_template where ev_extra1 = 'public.hdlr_test';
+
index 4c8322a5dce25da4d07e74bdc19a45e719d6b432..560185ae136ba1bbb3af4d6835ba505e4ba9ddb1 100644 (file)
@@ -11,7 +11,9 @@ select londiste.upgrade_schema();
 
 -- Group: Local object registration (setup tool)
 \i functions/londiste.local_add_seq.sql
+\i functions/londiste.create_trigger.sql
 \i functions/londiste.local_add_table.sql
+\i functions/londiste.local_change_handler.sql
 \i functions/londiste.local_remove_seq.sql
 \i functions/londiste.local_remove_table.sql