OCM-1524: merge_all switch for non-queue merge leaf
authorEgon Valdmees <egon.valdmees@skype.net>
Tue, 12 Apr 2011 11:00:14 +0000 (14:00 +0300)
committerMarko Kreen <markokr@gmail.com>
Fri, 15 Apr 2011 10:03:17 +0000 (13:03 +0300)
python/londiste.py
python/londiste/setup.py
sql/londiste/functions/londiste.local_add_table.sql
tests/merge/regen.sh
tests/noqueue_merge/regen.sh

index b260dc009b7fbb5d55cb6252a07e3fed7d6d4627..5ec6532346cbe014694a4eafa94a2585b1e32947 100755 (executable)
@@ -125,9 +125,12 @@ class Londiste(skytools.DBScript):
         g.add_option("--handler", action="store",
                 help="add: Custom handler for table")
         g.add_option("--handler-arg", action="append",
-                    help="add: Argument to custom handler")
+                help="add: Argument to custom handler")
         g.add_option("--copy-condition", dest="copy_condition",
                 help = "add: set WHERE expression for copy")
+        g.add_option("--merge-all", action="store_true",
+                help="merge tables from all source queues", default=False)
+
         p.add_option_group(g)
 
         return p
index 053debf21e3d790eaeb74714976eb90a3b3405c7..4f2e3d9dc46991559e5f61a54407f7a7a748c772 100644 (file)
@@ -65,6 +65,8 @@ class LondisteSetup(CascadeAdmin):
                 help="add: Custom handler for table")
         p.add_option("--handler-arg", action="append",
                     help="add: Argument to custom handler")
+        p.add_option("--merge-all", action="store_true",
+                    help="merge tables from all source queues", default=False)
         return p
 
     def extra_init(self, node_type, node_db, provider_db):
@@ -165,6 +167,8 @@ class LondisteSetup(CascadeAdmin):
             tgargs.append('tgflags='+tgflags)
         if self.options.no_triggers:
             tgargs.append('no_triggers')
+        if self.options.merge_all:
+            tgargs.append('merge_all')
 
         attrs = {}
         if self.options.handler:
index 5fa508d0765348b9b24e8355058c04f240c3f768..d6e4f3faaec282a0e352b7209cfaacf936758c42 100644 (file)
@@ -61,9 +61,14 @@ declare
     _node record;
     _tbloid oid;
     _extra_args text;
-    _skip_prefix text := 'zzzkip';
+    _skip_prefix text := 'zzz_';
     _skip_trg_count integer;
     _skip_trg_name text;
+    _merge_all boolean := false;
+    _skip_truncate boolean := false;
+    _no_triggers boolean := false;
+    _skip boolean := false;
+    _queue_name text;
 begin
     _extra_args := '';
     fq_table_name := londiste.make_fqname(i_table_name);
@@ -144,8 +149,91 @@ begin
         raise exception 'lost table: %', fq_table_name;
     end if;
 
-    -- skip triggers on leaf node
+    -- new trigger
+    lg_name := '_londiste_' || i_queue_name;
+    lg_func := 'pgq.logutriga';
+    lg_event := '';
+    lg_args := quote_literal(i_queue_name);
+    lg_pos := 'after';
+
+    -- parse extra args
+    if array_lower(i_trg_args, 1) is not null then
+        for i in array_lower(i_trg_args, 1) .. array_upper(i_trg_args, 1) loop
+            arg := i_trg_args[i];
+            if arg like 'tgflags=%' then
+                -- special flag handling
+                arg := upper(substr(arg, 9));
+                if position('B' in arg) > 0 then
+                    lg_pos := 'before';
+                end if;
+                if position('A' in arg) > 0 then
+                    lg_pos := 'after';
+                end if;
+                if position('Q' in arg) > 0 then
+                    lg_func := 'pgq.sqltriga';
+                end if;
+                if position('L' in arg) > 0 then
+                    lg_func := 'pgq.logutriga';
+                end if;
+                if position('I' in arg) > 0 then
+                    lg_event := lg_event || ' or insert';
+                end if;
+                if position('U' in arg) > 0 then
+                    lg_event := lg_event || ' or update';
+                end if;
+                if position('D' in arg) > 0 then
+                    lg_event := lg_event || ' or delete';
+                end if;
+                if position('S' in arg) > 0 then
+                    _skip := true;
+                end if;
+            elsif arg = 'expect_sync' then
+                -- already handled
+            elsif arg = 'skip_truncate' then
+                _skip_truncate := true;
+            elsif arg = 'no_triggers' then
+                _no_triggers := true;
+            elsif arg = 'merge_all' then
+                _merge_all = true;
+            elsif lower(arg) = 'skip' then
+                _skip := true;
+            else
+                -- ordinary arg
+                lg_args := lg_args || ', ' || quote_literal(arg);
+            end if;
+        end loop;
+    end if;
+
+    -- merge all table sources on leaf
     if _node.node_type = 'leaf' then
+        for _queue_name in
+            select t2.queue_name
+            from londiste.table_info t
+            join pgq_node.node_info n on (n.queue_name = t.queue_name)
+            left join pgq_node.node_info n2 on (n2.combined_queue = n.combined_queue or
+                    (n2.combined_queue is null and n.combined_queue is null))
+            left join londiste.table_info t2 on (t2.table_name = t.table_name and t2.queue_name = n2.queue_name)
+            where t.queue_name = i_queue_name
+              and t.table_name = fq_table_name
+              and t2.local = false
+              and t2.queue_name != i_queue_name -- skip self
+        loop
+           if not _merge_all then
+               select 405, 'multiple source tables '|| fq_table_name
+                        || ' found, use merge_all'
+               into ret_code, ret_note;
+               return;
+           end if;
+
+           update londiste.table_info
+                set local = true,
+                    merge_state = new_state
+                where queue_name = _queue_name and table_name = fq_table_name;
+            if not found then
+                raise exception 'lost table: %', fq_table_name;
+            end if;
+        end loop;
+
         -- on weird leafs the trigger funcs may not exist
         perform 1 from pg_proc p join pg_namespace n on (n.oid = p.pronamespace)
             where n.nspname = 'pgq' and p.proname in ('logutriga', 'sqltriga');
@@ -157,85 +245,51 @@ begin
         _extra_args := ', ' || quote_literal('deny');
     end if;
 
+    -- if skip param given
+    if _skip then
+        -- get count and name of existing skip triggers
+        select count(*), min(t.tgname)
+        into _skip_trg_count, _skip_trg_name
+        from pg_catalog.pg_trigger t
+        where t.tgrelid = londiste.find_table_oid(fq_table_name)
+            and position(E'\\000skip\\000' in lower(tgargs::text)) > 0;
+        -- if no previous skip triggers, prefix name and add SKIP to args
+        if _skip_trg_count = 0 then
+            lg_name := _skip_prefix || lg_name;
+            lg_args := lg_args || ', ' || quote_literal('SKIP');
+        -- if one previous skip trigger, check it's prefix and
+        -- do not use SKIP on current trigger
+        elsif _skip_trg_count = 1 then
+            -- if not prefixed then rename
+            if position(_skip_prefix in _skip_trg_name) != 1 then
+                sql := 'alter trigger ' || _skip_trg_name
+                    || ' on ' || londiste.quote_fqname(fq_table_name)
+                    || ' rename to ' || _skip_prefix || _skip_trg_name;
+                execute sql;
+            end if;
+        else
+            select 405, 'Multiple SKIP triggers in table: ' || fq_table_name
+            into ret_code, ret_note;
+            return;
+        end if;
+    end if;
+
     -- create Ins/Upd/Del trigger if it does not exists already
-    lg_name := '_londiste_' || i_queue_name;
     perform 1 from pg_catalog.pg_trigger
         where tgrelid = londiste.find_table_oid(fq_table_name)
             and tgname = lg_name;
     if not found then
-        -- new trigger
-        lg_func := 'pgq.logutriga';
-        lg_event := '';
-        lg_args := quote_literal(i_queue_name);
-        lg_pos := 'after';
+        if _no_triggers then
+            select 200, 'Table added with no triggers: ' || fq_table_name
+            into ret_code, ret_note;
+            return;
+        end if;
 
-        -- parse extra args
-        if array_lower(i_trg_args, 1) is not null then
-            for i in array_lower(i_trg_args, 1) .. array_upper(i_trg_args, 1) loop
-                arg := i_trg_args[i];
-                if arg like 'tgflags=%' then
-                    -- special flag handling
-                    arg := upper(substr(arg, 9));
-                    if position('B' in arg) > 0 then
-                        lg_pos := 'before';
-                    end if;
-                    if position('A' in arg) > 0 then
-                        lg_pos := 'after';
-                    end if;
-                    if position('Q' in arg) > 0 then
-                        lg_func := 'pgq.sqltriga';
-                    end if;
-                    if position('L' in arg) > 0 then
-                        lg_func := 'pgq.logutriga';
-                    end if;
-                    if position('I' in arg) > 0 then
-                        lg_event := lg_event || ' or insert';
-                    end if;
-                    if position('U' in arg) > 0 then
-                        lg_event := lg_event || ' or update';
-                    end if;
-                    if position('D' in arg) > 0 then
-                        lg_event := lg_event || ' or delete';
-                    end if;
-                    if position('S' in arg) > 0 then
-                        -- get count and name of existing skip triggers
-                        select count(*), min(t.tgname)
-                        into _skip_trg_count, _skip_trg_name
-                        from pg_catalog.pg_trigger t
-                        where t.tgrelid = londiste.find_table_oid(fq_table_name)
-                            and position(E'\\000skip\\000' in lower(tgargs::text)) > 0;
-                        -- if no previous skip triggers, prefix name and add SKIP to args
-                        if _skip_trg_count = 0 then
-                            lg_name := _skip_prefix || lg_name;
-                            lg_args := lg_args || ', ' || quote_literal('SKIP');
-                        -- if one previous skip trigger, check it's prefix and
-                        -- do not use SKIP on current trigger
-                        elsif _skip_trg_count = 1 then
-                            -- if not prefixed then rename
-                            if position(_skip_prefix in _skip_trg_name) != 1 then
-                                sql := 'alter trigger ' || _skip_trg_name
-                                    || ' on ' || londiste.quote_fqname(fq_table_name)
-                                    || ' rename to ' || _skip_prefix || _skip_trg_name;
-                                execute sql;
-                            end if;
-                        else
-                            select 405, 'Multiple SKIP triggers in table: ' || fq_table_name
-                            into ret_code, ret_note;
-                            return;
-                        end if;
-                    end if;
-                elsif arg = 'expect_sync' then
-                    -- already handled
-                elsif arg = 'skip_truncate' then
-                    perform 1 from londiste.local_set_table_attrs(i_queue_name, fq_table_name, 'skip_truncate=1');
-                elsif arg = 'no_triggers' then
-                    select 200, 'Table added with no triggers: ' || fq_table_name into ret_code, ret_note;
-                    return;
-                else
-                    -- ordinary arg
-                    lg_args := lg_args || ', ' || quote_literal(arg);
-                end if;
-            end loop;
+        if _skip_truncate then
+            perform 1
+            from londiste.local_set_table_attrs(i_queue_name,
+                                                fq_table_name,
+                                                'skip_truncate=1');
         end if;
 
         -- finalize event
index 0ec1d7acc6884e1113609c995104defa034fa3de..822282e5c5069f943be5d4d9aca2e8452cf5fc3c 100755 (executable)
@@ -139,11 +139,12 @@ done
 msg "Create table and register it in merge nodes"
 run_sql full1 "create table mydata (id int4 primary key, data text)"
 run londiste3 $v conf/londiste_full1.ini add-table mydata
-for db in full1; do
-  for src in $part_list; do
-    run londiste3 $v conf/londiste_${src}_${db}.ini add-table mydata
-  done
-done
+run londiste3 $v conf/londiste_part1_full1.ini add-table mydata --merge-all
+#for db in full1; do
+#  for src in $part_list; do
+#    run londiste3 $v conf/londiste_${src}_${db}.ini add-table mydata
+#  done
+#done
 
 msg "Wait until table is in sync on combined-root"
 cnt=0
index acbd4df038fc2322f413a0892bcf7b3ab3aa4169..d67bc43c030af62ec07f09aa84cc372292d21590 100755 (executable)
@@ -2,7 +2,7 @@
 
 . ../testlib.sh
 
-title "NoMerge"
+title "NoQueue Merge"
 
 part_list="part1 part2 part3 part4"
 full_list="full1 full2"