londiste.get_table_list: make sure all state combinations work
authorMarko Kreen <markokr@gmail.com>
Sun, 18 Sep 2011 20:24:36 +0000 (23:24 +0300)
committerMarko Kreen <markokr@gmail.com>
Sun, 18 Sep 2011 20:24:36 +0000 (23:24 +0300)
Although we may not support all combinations from UI side,
it does not guarantee that some combination cannot happen,
so try to give reasonable roles in any case.

New logic:

- if table has ddl, it's lead
- in case of no dll and at least one complete copy, let the copy be follower:
  that means it will do copy into table without touching ddl.

sql/londiste/expected/londiste_merge.out
sql/londiste/functions/londiste.get_table_list.sql
sql/londiste/sql/londiste_merge.sql

index 238d343d8719274f1628618a4b37f849c76482a7..27bca55db39d05af20ee93eb52f826cea10d353e 100644 (file)
@@ -55,6 +55,24 @@ select * from pgq_node.create_node('part2_set', 'leaf', 'p2merge', 'londiste_p2m
       200 | Node "p2merge" initialized for queue "part2_set" with type "leaf"
 (1 row)
 
+select * from pgq_node.register_location('part3_set', 'p3root', 'dbname=db', false);
+ ret_code |      ret_note       
+----------+---------------------
+      200 | Location registered
+(1 row)
+
+select * from pgq_node.register_location('part3_set', 'p3merge', 'dbname=db3', false);
+ ret_code |      ret_note       
+----------+---------------------
+      200 | Location registered
+(1 row)
+
+select * from pgq_node.create_node('part3_set', 'leaf', 'p3merge', 'londiste_p3merge', 'p3root', 100, 'combined_set');
+ ret_code |                             ret_note                              
+----------+-------------------------------------------------------------------
+      200 | Node "p3merge" initialized for queue "part3_set" with type "leaf"
+(1 row)
+
 select * from londiste.local_add_table('combined_set', 'tblmerge');
  ret_code |           ret_note           
 ----------+------------------------------
@@ -139,12 +157,6 @@ select * from londiste.get_table_list('part2_set');
  public.tblmerge | t     | in-copy     |                 |             |             | wait-replay |        1
 (1 row)
 
-select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'in-copy');
- ret_code |                   ret_note                   
-----------+----------------------------------------------
-      200 | Table public.tblmerge state set to 'in-copy'
-(1 row)
-
 select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up');
  ret_code |                     ret_note                     
 ----------+--------------------------------------------------
@@ -163,16 +175,16 @@ select * from londiste.get_table_list('part2_set');
  public.tblmerge | t     | catching-up |                 |             |             | wait-replay |        0
 (1 row)
 
-select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', null);
- ret_code |      ret_note       
-----------+---------------------
-      200 | Table struct stored
+select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'catching-up');
+ ret_code |                     ret_note                     
+----------+--------------------------------------------------
+      200 | Table public.tblmerge state set to 'catching-up'
 (1 row)
 
 select * from londiste.get_table_list('part1_set');
-   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos 
------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.tblmerge | t     | in-copy     |                 |             |             | lead      |        0
+   table_name    | local | merge_state | custom_snapshot | table_attrs |  dropped_ddl  | copy_role | copy_pos 
+-----------------+-------+-------------+-----------------+-------------+---------------+-----------+----------
+ public.tblmerge | t     | catching-up |                 |             | create index; |           |        0
 (1 row)
 
 select * from londiste.get_table_list('part2_set');
@@ -181,16 +193,10 @@ select * from londiste.get_table_list('part2_set');
  public.tblmerge | t     | catching-up |                 |             |             | wait-replay |        0
 (1 row)
 
-select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'catching-up');
- ret_code |                     ret_note                     
-----------+--------------------------------------------------
-      200 | Table public.tblmerge state set to 'catching-up'
-(1 row)
-
-select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up');
- ret_code |                     ret_note                     
-----------+--------------------------------------------------
-      200 | Table public.tblmerge state set to 'catching-up'
+select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', null);
+ ret_code |      ret_note       
+----------+---------------------
+      200 | Table struct stored
 (1 row)
 
 select * from londiste.get_table_list('part1_set');
@@ -205,3 +211,50 @@ select * from londiste.get_table_list('part2_set');
  public.tblmerge | t     | catching-up |                 |             |             |           |        0
 (1 row)
 
+--
+-- Test all combinations on 3-table merge
+--
+select * from londiste.global_add_table('part3_set', 'tblmerge');
+ ret_code |       ret_note        
+----------+-----------------------
+      200 | Table added: tblmerge
+(1 row)
+
+\set ECHO off
+select * from testmatrix();
+     p1s      |     p2s      |     p3s      |     p1r     |     p2r     |     p3r     
+--------------+--------------+--------------+-------------+-------------+-------------
+ !catching-up | catching-up  | catching-up  | NULL        | wait-replay | wait-replay
+ !catching-up | catching-up  | in-copy      | wait-replay | wait-replay | wait-replay
+ !catching-up | in-copy      | catching-up  | wait-replay | wait-replay | wait-replay
+ !catching-up | in-copy      | in-copy      | wait-replay | wait-replay | wait-replay
+ !in-copy     | catching-up  | catching-up  | lead        | wait-replay | wait-replay
+ !in-copy     | catching-up  | in-copy      | lead        | wait-replay | wait-replay
+ !in-copy     | in-copy      | catching-up  | lead        | wait-replay | wait-replay
+ !in-copy     | in-copy      | in-copy      | lead        | wait-replay | wait-replay
+ catching-up  | !catching-up | catching-up  | wait-replay | NULL        | wait-replay
+ catching-up  | !catching-up | in-copy      | wait-replay | wait-replay | wait-replay
+ catching-up  | !in-copy     | catching-up  | wait-replay | lead        | wait-replay
+ catching-up  | !in-copy     | in-copy      | wait-replay | lead        | wait-replay
+ catching-up  | catching-up  | !catching-up | wait-replay | wait-replay | NULL
+ catching-up  | catching-up  | !in-copy     | wait-replay | wait-replay | lead
+ catching-up  | catching-up  | catching-up  | NULL        | NULL        | NULL
+ catching-up  | catching-up  | in-copy      | NULL        | NULL        | wait-replay
+ catching-up  | in-copy      | !catching-up | wait-replay | wait-replay | wait-replay
+ catching-up  | in-copy      | !in-copy     | wait-replay | wait-replay | lead
+ catching-up  | in-copy      | catching-up  | NULL        | wait-replay | NULL
+ catching-up  | in-copy      | in-copy      | NULL        | wait-replay | wait-replay
+ in-copy      | !catching-up | catching-up  | wait-replay | wait-replay | wait-replay
+ in-copy      | !catching-up | in-copy      | wait-replay | wait-replay | wait-replay
+ in-copy      | !in-copy     | catching-up  | wait-replay | lead        | wait-replay
+ in-copy      | !in-copy     | in-copy      | wait-replay | lead        | wait-replay
+ in-copy      | catching-up  | !catching-up | wait-replay | wait-replay | wait-replay
+ in-copy      | catching-up  | !in-copy     | wait-replay | wait-replay | lead
+ in-copy      | catching-up  | catching-up  | wait-replay | NULL        | NULL
+ in-copy      | catching-up  | in-copy      | wait-replay | NULL        | wait-replay
+ in-copy      | in-copy      | !catching-up | wait-replay | wait-replay | wait-replay
+ in-copy      | in-copy      | !in-copy     | wait-replay | wait-replay | lead
+ in-copy      | in-copy      | catching-up  | wait-replay | wait-replay | NULL
+ in-copy      | in-copy      | in-copy      | lead        | wait-copy   | wait-copy
+(32 rows)
+
index b998ed2261a2269087f423acfcadc604ac42ca08..37a02ad858d31687f62b268b2eeef60daa4d7353 100644 (file)
@@ -32,9 +32,8 @@ returns setof record as $$
 --
 -- copy_role = lead:
 --      on copy start, drop indexes and store in dropped_ddl
---      on copy finish wait, until copy_role turns to NULL
---      if dropped_ddl not NULL, restore them
---      tag as catching-up
+--      on copy finish change state to catching-up, then wait until copy_role turns to NULL
+--      catching-up: if dropped_ddl not NULL, restore them
 -- copy_role = wait-copy:
 --      on copy start wait, until role changes (to wait-replay)
 -- copy_role = wait-replay:
@@ -43,14 +42,18 @@ returns setof record as $$
 --
 declare
     q_part1     text;
+    q_part_ddl  text;
     n_parts     int4;
     n_done      int4;
-    var_table_name text;
+    v_table_name text;
     n_combined_queue text;
 begin
-    for var_table_name, local, merge_state, custom_snapshot, table_attrs, dropped_ddl, q_part1, n_parts, n_done, n_combined_queue, copy_pos in
+    for v_table_name, local, merge_state, custom_snapshot, table_attrs, dropped_ddl,
+        q_part1, q_part_ddl, n_parts, n_done, n_combined_queue, copy_pos
+    in
         select t.table_name, t.local, t.merge_state, t.custom_snapshot, t.table_attrs, t.dropped_ddl,
                min(case when t2.local then t2.queue_name else null end) as _queue1,
+               min(case when t2.local and t2.dropped_ddl is not null then t2.queue_name else null end) as _queue1ddl,
                count(case when t2.local then t2.table_name else null end) as _total,
                count(case when t2.local then nullif(t2.merge_state, 'in-copy') else null end) as _done,
                min(n.combined_queue) as _combined_queue,
@@ -69,37 +72,52 @@ begin
         -- the copy processes need coordination
         copy_role := null;
 
+        -- be more robust against late joiners
+        q_part1 := coalesce(q_part_ddl, q_part1);
+
         if q_part1 is not null then
             if i_queue_name = q_part1 then
                 -- lead
-                if merge_state in ('in-copy', 'catching-up') then
+                if merge_state = 'in-copy' then
+                    if dropped_ddl is null and n_done > 0 then
+                        -- seems late addition, let it copy with indexes
+                        copy_role := 'wait-replay';
+                    elsif n_done < n_parts then
+                        -- show copy_role only if need to drop ddl or already did drop ddl
+                        copy_role := 'lead';
+                    end if;
+
+                    -- make sure it cannot be made to wait
+                    copy_pos := 0;
+                end if;
+                if merge_state = 'catching-up' and dropped_ddl is not null then
                     -- show copy_role only if need to wait for others
                     if n_done < n_parts then
-                        copy_role := 'lead';
+                        copy_role := 'wait-replay';
                     end if;
                 end if;
             else
                 -- follow
                 if merge_state = 'in-copy' then
-                    -- has lead already dropped ddl?
-                    perform 1 from londiste.table_info t
-                        where t.queue_name = q_part1
-                            and t.table_name = var_table_name
-                            and t.dropped_ddl is not null;
-                    if found then
+                    if q_part_ddl is not null then
+                        -- can copy, wait in replay until lead has applied ddl
+                        copy_role := 'wait-replay';
+                    elsif n_done > 0 then
+                        -- ddl is not dropped, others are active, copy without touching ddl
                         copy_role := 'wait-replay';
                     else
+                        -- wait for lead to drop ddl
                         copy_role := 'wait-copy';
                     end if;
                 elsif merge_state = 'catching-up' then
                     -- show copy_role only if need to wait for lead
-                    if n_done < n_parts then
+                    if q_part_ddl is not null then
                         copy_role := 'wait-replay';
                     end if;
                 end if;
             end if;
         end if;
-        table_name:=var_table_name;
+        table_name := v_table_name;
         return next;
     end loop;
     return;
index d226a585cbb3ba775149c6bfd4e86f92bd7dfb10..dcfe78712590b12d0537608670b55ed15c4ec629 100644 (file)
@@ -21,6 +21,10 @@ select * from pgq_node.register_location('part2_set', 'p2root', 'dbname=db', fal
 select * from pgq_node.register_location('part2_set', 'p2merge', 'dbname=db2', false);
 select * from pgq_node.create_node('part2_set', 'leaf', 'p2merge', 'londiste_p2merge', 'p2root', 100, 'combined_set');
 
+select * from pgq_node.register_location('part3_set', 'p3root', 'dbname=db', false);
+select * from pgq_node.register_location('part3_set', 'p3merge', 'dbname=db3', false);
+select * from pgq_node.create_node('part3_set', 'leaf', 'p3merge', 'londiste_p3merge', 'p3root', 100, 'combined_set');
+
 
 
 select * from londiste.local_add_table('combined_set', 'tblmerge');
@@ -43,26 +47,96 @@ select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', 'c
 select * from londiste.get_table_list('part1_set');
 select * from londiste.get_table_list('part2_set');
 
-select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'in-copy');
 select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up');
 select * from londiste.get_table_list('part1_set');
 select * from londiste.get_table_list('part2_set');
 
-select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', null);
+select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'catching-up');
 select * from londiste.get_table_list('part1_set');
 select * from londiste.get_table_list('part2_set');
 
-select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'catching-up');
-select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up');
+select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', null);
 select * from londiste.get_table_list('part1_set');
 select * from londiste.get_table_list('part2_set');
 
+--
+-- Test all combinations on 3-table merge
+--
 
-
-
-
-
-
+select * from londiste.global_add_table('part3_set', 'tblmerge');
+
+\set ECHO off
+
+create table states ( state text );
+insert into states values ('in-copy');
+insert into states values ('!in-copy');
+insert into states values ('catching-up');
+insert into states values ('!catching-up');
+
+create or replace function testmerge(
+    in p1state text, in p2state text, in p3state text,
+    out p1res text, out p2res text, out p3res text)
+as $$
+declare
+    p1ddl text;
+    p2ddl text;
+    p3ddl text;
+    tbl text = 'public.tblmerge';
+begin
+    if position('!' in p1state) > 0 then
+        p1state := replace(p1state, '!', '');
+        p1ddl = 'x';
+    end if;
+    if position('!' in p2state) > 0 then
+        p2state := replace(p2state, '!', '');
+        p2ddl = 'x';
+    end if;
+    if position('!' in p3state) > 0 then
+        p3state := replace(p3state, '!', '');
+        p3ddl = 'x';
+    end if;
+
+    update londiste.table_info
+       set merge_state = p1state, dropped_ddl = p1ddl, local = true
+       where table_name = tbl and queue_name = 'part1_set';
+    update londiste.table_info
+       set merge_state = p2state, dropped_ddl = p2ddl, local = true
+       where table_name = tbl and queue_name = 'part2_set';
+    update londiste.table_info
+       set merge_state = p3state, dropped_ddl = p3ddl, local = true
+       where table_name = tbl and queue_name = 'part3_set';
+
+    select coalesce(copy_role, 'NULL') from londiste.get_table_list('part1_set')
+        where table_name = tbl into p1res;
+    select coalesce(copy_role, 'NULL') from londiste.get_table_list('part2_set')
+        where table_name = tbl into p2res;
+    select coalesce(copy_role, 'NULL') from londiste.get_table_list('part3_set')
+        where table_name = tbl into p3res;
+    return;
+end;
+$$ language plpgsql;
+
+create function testmatrix(
+    out p1s text, out p2s text, out p3s text,
+    out p1r text, out p2r text, out p3r text)
+returns setof record as $$
+begin
+    for p1s, p2s, p3s in
+        select p1.state, p2.state, p3.state
+        from states p1, states p2, states p3
+        where position('!' in p1.state) + position('!' in p2.state) + position('!' in p3.state) < 2
+        order by 1,2,3
+    loop
+        select * from testmerge(p1s, p2s, p3s) into p1r, p2r, p3r;
+        return next;
+    end loop;
+    return;
+end;
+$$ language plpgsql;
+
+\set ECHO all
+
+select * from testmatrix();