Rewrite sql/londiste for cascading.
authorMarko Kreen <markokr@gmail.com>
Fri, 13 Feb 2009 10:49:38 +0000 (12:49 +0200)
committerMarko Kreen <markokr@gmail.com>
Fri, 13 Feb 2009 13:20:25 +0000 (15:20 +0200)
Noteworthy changes:
- EXECUTE
- Parallel copy from partitions into single table.
- Sequences are now pushed, instead of pulled.
  Root node will periodically send values with reasonable
  safety-room into queue.

52 files changed:
sql/londiste/Makefile
sql/londiste/README.londiste [deleted file]
sql/londiste/expected/londiste_denytrigger.out [deleted file]
sql/londiste/expected/londiste_execute.out [new file with mode: 0644]
sql/londiste/expected/londiste_fkeys.out
sql/londiste/expected/londiste_merge.out [new file with mode: 0644]
sql/londiste/expected/londiste_provider.out
sql/londiste/expected/londiste_seqs.out [new file with mode: 0644]
sql/londiste/expected/londiste_subscriber.out
sql/londiste/functions/londiste.execute_finish.sql [new file with mode: 0644]
sql/londiste/functions/londiste.execute_start.sql [new file with mode: 0644]
sql/londiste/functions/londiste.find_table_triggers.sql [deleted file]
sql/londiste/functions/londiste.get_seq_list.sql [new file with mode: 0644]
sql/londiste/functions/londiste.get_table_list.sql [new file with mode: 0644]
sql/londiste/functions/londiste.global_add_table.sql [moved from sql/londiste/functions/londiste.set_add_table.sql with 63% similarity]
sql/londiste/functions/londiste.global_remove_seq.sql [new file with mode: 0644]
sql/londiste/functions/londiste.global_remove_table.sql [moved from sql/londiste/functions/londiste.set_remove_table.sql with 66% similarity]
sql/londiste/functions/londiste.global_update_seq.sql [new file with mode: 0644]
sql/londiste/functions/londiste.handle_fkeys.sql
sql/londiste/functions/londiste.handle_triggers.sql [deleted file]
sql/londiste/functions/londiste.local_add_seq.sql [new file with mode: 0644]
sql/londiste/functions/londiste.local_add_table.sql [new file with mode: 0644]
sql/londiste/functions/londiste.local_remove_seq.sql [new file with mode: 0644]
sql/londiste/functions/londiste.local_remove_table.sql [new file with mode: 0644]
sql/londiste/functions/londiste.local_set_skip_truncate.sql [new file with mode: 0644]
sql/londiste/functions/londiste.local_set_table_state.sql [moved from sql/londiste/functions/londiste.node_set_table_state.sql with 71% similarity]
sql/londiste/functions/londiste.local_set_table_struct.sql [new file with mode: 0644]
sql/londiste/functions/londiste.node_add_seq.sql [deleted file]
sql/londiste/functions/londiste.node_add_table.sql [deleted file]
sql/londiste/functions/londiste.node_disable_triggers.sql [deleted file]
sql/londiste/functions/londiste.node_get_seq_list.sql [deleted file]
sql/londiste/functions/londiste.node_get_table_list.sql [deleted file]
sql/londiste/functions/londiste.node_prepare_triggers.sql [deleted file]
sql/londiste/functions/londiste.node_refresh_triggers.sql [deleted file]
sql/londiste/functions/londiste.node_remove_seq.sql [deleted file]
sql/londiste/functions/londiste.node_remove_table.sql [deleted file]
sql/londiste/functions/londiste.node_set_skip_truncate.sql [deleted file]
sql/londiste/functions/londiste.root_check_seqs.sql [new file with mode: 0644]
sql/londiste/functions/londiste.root_notify_change.sql
sql/londiste/functions/londiste.set_get_table_list.sql [deleted file]
sql/londiste/sql/londiste_denytrigger.sql [deleted file]
sql/londiste/sql/londiste_execute.sql [new file with mode: 0644]
sql/londiste/sql/londiste_fkeys.sql
sql/londiste/sql/londiste_install.sql
sql/londiste/sql/londiste_merge.sql [new file with mode: 0644]
sql/londiste/sql/londiste_provider.sql
sql/londiste/sql/londiste_seqs.sql [new file with mode: 0644]
sql/londiste/sql/londiste_subscriber.sql
sql/londiste/structure/functions.sql
sql/londiste/structure/grants.sql
sql/londiste/structure/tables.old.sql [new file with mode: 0644]
sql/londiste/structure/tables.sql

index 6d0b00b485640aa5efef048db2e1761803519250..72c32f8bff4deb2b59cd3c4a0b6d9bce315aee2e 100644 (file)
@@ -1,12 +1,12 @@
 
 DATA_built = londiste.sql londiste.upgrade.sql
-DOCS = README.londiste
 
 SQLS = structure/tables.sql structure/grants.sql structure/functions.sql
 FUNCS = $(shell sed -n -e '/^\\/{s/\\i //;p}' $(SQLS))
 SRCS = $(SQLS) $(FUNCS)
 
-REGRESS = londiste_install londiste_provider londiste_subscriber londiste_fkeys
+REGRESS = londiste_install londiste_provider londiste_subscriber \
+         londiste_fkeys londiste_execute londiste_seqs londiste_merge
 # londiste_denytrigger
 REGRESS_OPTS = --load-language=plpgsql
 
diff --git a/sql/londiste/README.londiste b/sql/londiste/README.londiste
deleted file mode 100644 (file)
index 5104f4f..0000000
+++ /dev/null
@@ -1,29 +0,0 @@
-
-londiste database backend
---------------------------
-
-Provider side:
---------------
-
-londiste.provider_table
-londiste.provider_seq
-
-
-Subscriber side
----------------
-
-table londiste.completed
-table londiste.subscriber_table
-table londiste.subscriber_seq
-
-
-Open issues
-------------
-
-- notify behaviour
-- should notify-s given to db for processing?
-- link init functions
-- switchover
-- are set_last_tick()/get_last_tick() functions needed anymore?
-- typecheck for add_table()?
-
diff --git a/sql/londiste/expected/londiste_denytrigger.out b/sql/londiste/expected/londiste_denytrigger.out
deleted file mode 100644 (file)
index 4fe2f40..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-create table denytest ( val integer);
-insert into denytest values (1);
-create trigger xdeny after insert or update or delete
-on denytest for each row execute procedure londiste.deny_trigger();
-insert into denytest values (2);
-ERROR:  ('Changes no allowed on this table',)
-update denytest set val = 2;
-ERROR:  ('Changes no allowed on this table',)
-delete from denytest;
-ERROR:  ('Changes no allowed on this table',)
-select londiste.disable_deny_trigger(true);
- disable_deny_trigger 
-----------------------
- t
-(1 row)
-
-update denytest set val = 2;
-select londiste.disable_deny_trigger(true);
- disable_deny_trigger 
-----------------------
- t
-(1 row)
-
-update denytest set val = 2;
-select londiste.disable_deny_trigger(false);
- disable_deny_trigger 
-----------------------
- f
-(1 row)
-
-update denytest set val = 2;
-ERROR:  ('Changes no allowed on this table',)
-select londiste.disable_deny_trigger(false);
- disable_deny_trigger 
-----------------------
- f
-(1 row)
-
-update denytest set val = 2;
-ERROR:  ('Changes no allowed on this table',)
diff --git a/sql/londiste/expected/londiste_execute.out b/sql/londiste/expected/londiste_execute.out
new file mode 100644 (file)
index 0000000..c1e9745
--- /dev/null
@@ -0,0 +1,67 @@
+set log_error_verbosity = 'terse';
+select * from londiste.execute_start('branch_set', 'DDL-A.sql', 'drop all', false);
+ ret_code |       ret_note       
+----------+----------------------
+      200 | Executing: DDL-A.sql
+(1 row)
+
+select * from londiste.execute_start('branch_set', 'DDL-A.sql', 'drop all', false);
+ ret_code |              ret_note              
+----------+------------------------------------
+      301 | EXECUTE(DDL-A.sql) already applied
+(1 row)
+
+select * from londiste.execute_finish('branch_set', 'DDL-A.sql');
+ ret_code |          ret_note           
+----------+-----------------------------
+      200 | Execute finished: DDL-A.sql
+(1 row)
+
+select * from londiste.execute_finish('branch_set', 'DDL-A.sql');
+ ret_code |          ret_note           
+----------+-----------------------------
+      200 | Execute finished: DDL-A.sql
+(1 row)
+
+select * from londiste.execute_finish('branch_set', 'DDL-XXX.sql');
+ ret_code |                 ret_note                  
+----------+-------------------------------------------
+      404 | execute_file called without execute_start
+(1 row)
+
+select * from londiste.execute_start('branch_set', 'DDL-B.sql', 'drop all', true);
+ ret_code |             ret_note              
+----------+-----------------------------------
+      401 | Node is not root node: branch_set
+(1 row)
+
+select * from londiste.execute_start('branch_set', 'DDL-B.sql', 'drop all', true);
+ ret_code |             ret_note              
+----------+-----------------------------------
+      401 | Node is not root node: branch_set
+(1 row)
+
+select * from londiste.execute_start('aset', 'DDL-root.sql', 'drop all', true);
+ ret_code |        ret_note         
+----------+-------------------------
+      200 | Executing: DDL-root.sql
+(1 row)
+
+select * from londiste.execute_start('aset', 'DDL-root.sql', 'drop all', true);
+ ret_code |               ret_note                
+----------+---------------------------------------
+      301 | EXECUTE(DDL-root.sql) already applied
+(1 row)
+
+select * from londiste.execute_finish('aset', 'DDL-root.sql');
+ ret_code |            ret_note            
+----------+--------------------------------
+      200 | Execute finished: DDL-root.sql
+(1 row)
+
+select * from londiste.execute_finish('aset', 'DDL-root.sql');
+ ret_code |            ret_note            
+----------+--------------------------------
+      200 | Execute finished: DDL-root.sql
+(1 row)
+
index ed220cc827e68c23414bfb37fec3946aadafeac2..34055e0f31d8fb1165eb8ea83831778bd78a18ef 100644 (file)
@@ -16,37 +16,37 @@ create table ref_3 (
     val text
 );
 NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "ref_3_pkey" for table "ref_3"
-select * from londiste.set_add_table('branch_set', 'public.ref_1');
+select * from londiste.global_add_table('branch_set', 'public.ref_1');
  ret_code | ret_note 
 ----------+----------
       200 | OK
 (1 row)
 
-select * from londiste.set_add_table('branch_set', 'public.ref_2');
+select * from londiste.global_add_table('branch_set', 'public.ref_2');
  ret_code | ret_note 
 ----------+----------
       200 | OK
 (1 row)
 
-select * from londiste.set_add_table('branch_set', 'public.ref_3');
+select * from londiste.global_add_table('branch_set', 'public.ref_3');
  ret_code | ret_note 
 ----------+----------
       200 | OK
 (1 row)
 
-select * from londiste.node_add_table('branch_set', 'public.ref_1');
+select * from londiste.local_add_table('branch_set', 'public.ref_1');
  ret_code |         ret_note          
 ----------+---------------------------
       200 | Table added: public.ref_1
 (1 row)
 
-select * from londiste.node_add_table('branch_set', 'public.ref_2');
+select * from londiste.local_add_table('branch_set', 'public.ref_2');
  ret_code |         ret_note          
 ----------+---------------------------
       200 | Table added: public.ref_2
 (1 row)
 
-select * from londiste.node_add_table('branch_set', 'public.ref_3');
+select * from londiste.local_add_table('branch_set', 'public.ref_3');
  ret_code |         ret_note          
 ----------+---------------------------
       200 | Table added: public.ref_3
@@ -76,7 +76,7 @@ select * from londiste.get_table_pending_fkeys('public.ref_2');
 ------------+----------+-----------+----------
 (0 rows)
 
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
+select * from londiste.get_valid_pending_fkeys('branch_set');
  from_table | to_table | fkey_name | fkey_def 
 ------------+----------+-----------+----------
 (0 rows)
@@ -135,42 +135,42 @@ select * from londiste.get_table_pending_fkeys('public.ref_2');
  public.ref_3 | public.ref_2 | ref_3_ref2_fkey | alter table only public.ref_3 add constraint ref_3_ref2_fkey FOREIGN KEY (ref2) REFERENCES ref_2(id)
 (2 rows)
 
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
+select * from londiste.get_valid_pending_fkeys('branch_set');
  from_table | to_table | fkey_name | fkey_def 
 ------------+----------+-----------+----------
 (0 rows)
 
 -- toggle sync
-select * from londiste.node_set_table_state('branch_set', 'public.ref_1', null, 'ok');
node_set_table_state 
-----------------------
-                    1
+select * from londiste.local_set_table_state('branch_set', 'public.ref_1', null, 'ok');
local_set_table_state 
+-----------------------
+                     1
 (1 row)
 
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
+select * from londiste.get_valid_pending_fkeys('branch_set');
  from_table | to_table | fkey_name | fkey_def 
 ------------+----------+-----------+----------
 (0 rows)
 
-select * from londiste.node_set_table_state('branch_set', 'public.ref_2', null, 'ok');
node_set_table_state 
-----------------------
-                    1
+select * from londiste.local_set_table_state('branch_set', 'public.ref_2', null, 'ok');
local_set_table_state 
+-----------------------
+                     1
 (1 row)
 
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
+select * from londiste.get_valid_pending_fkeys('branch_set');
   from_table  |   to_table   |   fkey_name    |                                              fkey_def                                              
 --------------+--------------+----------------+----------------------------------------------------------------------------------------------------
  public.ref_2 | public.ref_1 | ref_2_ref_fkey | alter table only public.ref_2 add constraint ref_2_ref_fkey FOREIGN KEY (ref) REFERENCES ref_1(id)
 (1 row)
 
-select * from londiste.node_set_table_state('branch_set', 'public.ref_3', null, 'ok');
node_set_table_state 
-----------------------
-                    1
+select * from londiste.local_set_table_state('branch_set', 'public.ref_3', null, 'ok');
local_set_table_state 
+-----------------------
+                     1
 (1 row)
 
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
+select * from londiste.get_valid_pending_fkeys('branch_set');
   from_table  |   to_table   |    fkey_name    |                                               fkey_def                                               
 --------------+--------------+-----------------+------------------------------------------------------------------------------------------------------
  public.ref_2 | public.ref_1 | ref_2_ref_fkey  | alter table only public.ref_2 add constraint ref_2_ref_fkey FOREIGN KEY (ref) REFERENCES ref_1(id)
@@ -196,7 +196,7 @@ select * from londiste.get_table_pending_fkeys('public.ref_2');
 ------------+----------+-----------+----------
 (0 rows)
 
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
+select * from londiste.get_valid_pending_fkeys('branch_set');
  from_table | to_table | fkey_name | fkey_def 
 ------------+----------+-----------+----------
 (0 rows)
diff --git a/sql/londiste/expected/londiste_merge.out b/sql/londiste/expected/londiste_merge.out
new file mode 100644 (file)
index 0000000..f7b5b61
--- /dev/null
@@ -0,0 +1,213 @@
+set client_min_messages = 'warning';
+\set VERBOSITY 'terse'
+--
+-- tables
+--
+create table tblmerge (
+    id int4 primary key,
+    data text
+);
+select * from pgq_node.register_location('combined_set', 'croot', 'dbname=db', false);
+ ret_code |      ret_note       
+----------+---------------------
+      200 | Location registered
+(1 row)
+
+select * from pgq_node.create_node('combined_set', 'root', 'croot', 'londiste_croot', null, null, null);
+ ret_code |                          ret_note                           
+----------+-------------------------------------------------------------
+      200 | Node "croot" initialized for queue "croot" with type "root"
+(1 row)
+
+select * from pgq_node.register_location('part1_set', 'p1root', 'dbname=db', false);
+ ret_code |      ret_note       
+----------+---------------------
+      200 | Location registered
+(1 row)
+
+select * from pgq_node.register_location('part1_set', 'p1merge', 'dbname=db2', false);
+ ret_code |      ret_note       
+----------+---------------------
+      200 | Location registered
+(1 row)
+
+select * from pgq_node.create_node('part1_set', 'leaf', 'p1merge', 'londiste_p1merge', 'p1root', 100, 'combined_set');
+ ret_code |                            ret_note                             
+----------+-----------------------------------------------------------------
+      200 | Node "p1merge" initialized for queue "p1merge" with type "leaf"
+(1 row)
+
+select * from pgq_node.register_location('part2_set', 'p2root', 'dbname=db', false);
+ ret_code |      ret_note       
+----------+---------------------
+      200 | Location registered
+(1 row)
+
+select * from pgq_node.register_location('part2_set', 'p2merge', 'dbname=db2', false);
+ ret_code |      ret_note       
+----------+---------------------
+      200 | Location registered
+(1 row)
+
+select * from pgq_node.create_node('part2_set', 'leaf', 'p2merge', 'londiste_p2merge', 'p2root', 100, 'combined_set');
+ ret_code |                            ret_note                             
+----------+-----------------------------------------------------------------
+      200 | Node "p2merge" initialized for queue "p2merge" with type "leaf"
+(1 row)
+
+select * from londiste.local_add_table('combined_set', 'tblmerge');
+ ret_code |           ret_note           
+----------+------------------------------
+      200 | Table added: public.tblmerge
+(1 row)
+
+select * from londiste.global_add_table('part1_set', 'tblmerge');
+ ret_code | ret_note 
+----------+----------
+      200 | OK
+(1 row)
+
+select * from londiste.local_add_table('part1_set', 'tblmerge');
+ ret_code |           ret_note           
+----------+------------------------------
+      200 | Table added: public.tblmerge
+(1 row)
+
+select * from londiste.global_add_table('part2_set', 'tblmerge');
+ ret_code | ret_note 
+----------+----------
+      200 | OK
+(1 row)
+
+select * from londiste.local_add_table('part2_set', 'tblmerge');
+ ret_code |           ret_note           
+----------+------------------------------
+      200 | Table added: public.tblmerge
+(1 row)
+
+select * from londiste.get_table_list('part1_set');
+   table_name    | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role 
+-----------------+-------+-------------+-----------------+---------------+-------------+-----------
+ public.tblmerge | t     |             |                 |               |             | 
+(1 row)
+
+select * from londiste.get_table_list('part2_set');
+   table_name    | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role 
+-----------------+-------+-------------+-----------------+---------------+-------------+-----------
+ public.tblmerge | t     |             |                 |               |             | 
+(1 row)
+
+select * from londiste.get_table_list('combined_set');
+   table_name    | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role 
+-----------------+-------+-------------+-----------------+---------------+-------------+-----------
+ public.tblmerge | t     | ok          |                 |               |             | 
+(1 row)
+
+select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'in-copy');
+ local_set_table_state 
+-----------------------
+                     1
+(1 row)
+
+select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'in-copy');
+ local_set_table_state 
+-----------------------
+                     1
+(1 row)
+
+select * from londiste.get_table_list('part1_set');
+   table_name    | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role 
+-----------------+-------+-------------+-----------------+---------------+-------------+-----------
+ public.tblmerge | t     | in-copy     |                 |               |             | lead
+(1 row)
+
+select * from londiste.get_table_list('part2_set');
+   table_name    | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role 
+-----------------+-------+-------------+-----------------+---------------+-------------+-----------
+ public.tblmerge | t     | in-copy     |                 |               |             | wait-copy
+(1 row)
+
+select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', 'create index;');
+ ret_code |      ret_note       
+----------+---------------------
+      200 | Table struct stored
+(1 row)
+
+select * from londiste.get_table_list('part1_set');
+   table_name    | local | merge_state | custom_snapshot | skip_truncate |  dropped_ddl  | copy_role 
+-----------------+-------+-------------+-----------------+---------------+---------------+-----------
+ public.tblmerge | t     | in-copy     |                 |               | create index; | lead
+(1 row)
+
+select * from londiste.get_table_list('part2_set');
+   table_name    | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl |  copy_role  
+-----------------+-------+-------------+-----------------+---------------+-------------+-------------
+ public.tblmerge | t     | in-copy     |                 |               |             | wait-replay
+(1 row)
+
+select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'in-copy');
+ local_set_table_state 
+-----------------------
+                     1
+(1 row)
+
+select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up');
+ local_set_table_state 
+-----------------------
+                     1
+(1 row)
+
+select * from londiste.get_table_list('part1_set');
+   table_name    | local | merge_state | custom_snapshot | skip_truncate |  dropped_ddl  | copy_role 
+-----------------+-------+-------------+-----------------+---------------+---------------+-----------
+ public.tblmerge | t     | in-copy     |                 |               | create index; | 
+(1 row)
+
+select * from londiste.get_table_list('part2_set');
+   table_name    | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl |  copy_role  
+-----------------+-------+-------------+-----------------+---------------+-------------+-------------
+ public.tblmerge | t     | catching-up |                 |               |             | wait-replay
+(1 row)
+
+select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', null);
+ ret_code |      ret_note       
+----------+---------------------
+      200 | Table struct stored
+(1 row)
+
+select * from londiste.get_table_list('part1_set');
+   table_name    | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role 
+-----------------+-------+-------------+-----------------+---------------+-------------+-----------
+ public.tblmerge | t     | in-copy     |                 |               |             | 
+(1 row)
+
+select * from londiste.get_table_list('part2_set');
+   table_name    | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl |  copy_role  
+-----------------+-------+-------------+-----------------+---------------+-------------+-------------
+ public.tblmerge | t     | catching-up |                 |               |             | wait-replay
+(1 row)
+
+select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'catching-up');
+ local_set_table_state 
+-----------------------
+                     1
+(1 row)
+
+select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up');
+ local_set_table_state 
+-----------------------
+                     1
+(1 row)
+
+select * from londiste.get_table_list('part1_set');
+   table_name    | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role 
+-----------------+-------+-------------+-----------------+---------------+-------------+-----------
+ public.tblmerge | t     | catching-up |                 |               |             | 
+(1 row)
+
+select * from londiste.get_table_list('part2_set');
+   table_name    | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role 
+-----------------+-------+-------------+-----------------+---------------+-------------+-----------
+ public.tblmerge | t     | catching-up |                 |               |             | 
+(1 row)
+
index bfa0d4a4716ef81314f9d229aae8016e5d03fe1f..4d26339efca908e37811f94e3f470151fb98db05 100644 (file)
@@ -17,59 +17,59 @@ select current_database();
  regression
 (1 row)
 
-select * from pgq_set.add_member('aset', 'rnode', 'dbname=db', false);
- ret_code | ret_note 
-----------+----------
-      200 | Ok
+select * from pgq_node.register_location('aset', 'rnode', 'dbname=db', false);
+ ret_code |      ret_note       
+----------+---------------------
+      200 | Location registered
 (1 row)
 
-select * from pgq_set.create_node('aset', 'root', 'rnode', 'londiste_root', null::text, null::int8, null::text);
- ret_code | ret_note 
-----------+----------
-      200 | Ok
+select * from pgq_node.create_node('aset', 'root', 'rnode', 'londiste_root', null::text, null::int8, null::text);
+ ret_code |                          ret_note                           
+----------+-------------------------------------------------------------
+      200 | Node "rnode" initialized for queue "rnode" with type "root"
 (1 row)
 
-select * from londiste.node_add_table('aset', 'public.testdata_nopk');
+select * from londiste.local_add_table('aset', 'public.testdata_nopk');
  ret_code |                      ret_note                      
 ----------+----------------------------------------------------
       400 | Primary key missing on table: public.testdata_nopk
 (1 row)
 
-select * from londiste.node_add_table('aset', 'public.testdata');
+select * from londiste.local_add_table('aset', 'public.testdata');
  ret_code |           ret_note           
 ----------+------------------------------
       200 | Table added: public.testdata
 (1 row)
 
 insert into testdata (data) values ('test-data');
-select * from londiste.node_get_table_list('aset');
-   table_name    | merge_state | custom_snapshot | skip_truncat
------------------+-------------+-----------------+---------------
- public.testdata | ok          |                 | 
+select * from londiste.get_table_list('aset');
+   table_name    | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_rol
+-----------------+-------+-------------+-----------------+---------------+-------------+-----------
+ public.testdata | t     | ok          |                 |               |             | 
 (1 row)
 
-select * from londiste.node_remove_table('aset', 'public.testdata');
+select * from londiste.local_remove_table('aset', 'public.testdata');
  ret_code |            ret_note            
 ----------+--------------------------------
       200 | Table removed: public.testdata
 (1 row)
 
-select * from londiste.node_remove_table('aset', 'public.testdata');
- ret_code |          ret_note          
-----------+----------------------------
-      400 | Not found: public.testdata
+select * from londiste.local_remove_table('aset', 'public.testdata');
+ ret_code |             ret_note             
+----------+----------------------------------
+      400 | Table not found: public.testdata
 (1 row)
 
-select * from londiste.node_get_table_list('aset');
- table_name | merge_state | custom_snapshot | skip_truncat
-------------+-------------+-----------------+---------------
+select * from londiste.get_table_list('aset');
+ table_name | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_rol
+------------+-------+-------------+-----------------+---------------+-------------+-----------
 (0 rows)
 
 select ev_id, ev_type, ev_data, ev_extra1 from pgq.event_template;
- ev_id |   ev_type    |              ev_data               |    ev_extra1    
--------+--------------+------------------------------------+-----------------
-     1 | add-table    | public.testdata                    | 
-     2 | I            | (id,data) values ('1','test-data') | public.testdata
-     3 | remove-table | public.testdata                    | 
+ ev_id |        ev_type        |              ev_data               |    ev_extra1    
+-------+-----------------------+------------------------------------+-----------------
+     1 | londiste.add-table    | public.testdata                    | 
+     2 | I                     | (id,data) values ('1','test-data') | public.testdata
+     3 | londiste.remove-table | public.testdata                    | 
 (3 rows)
 
diff --git a/sql/londiste/expected/londiste_seqs.out b/sql/londiste/expected/londiste_seqs.out
new file mode 100644 (file)
index 0000000..2902282
--- /dev/null
@@ -0,0 +1,127 @@
+set client_min_messages = 'warning';
+\set VERBOSITY 'terse'
+--
+-- sequences
+--
+create sequence masterseq;
+create sequence slaveseq;
+select * from pgq_node.register_location('seqroot', 'rnode', 'dbname=db', false);
+ ret_code |      ret_note       
+----------+---------------------
+      200 | Location registered
+(1 row)
+
+select * from pgq_node.create_node('seqroot', 'root', 'rnode', 'londiste_root', null::text, null::int8, null::text);
+ ret_code |                          ret_note                           
+----------+-------------------------------------------------------------
+      200 | Node "rnode" initialized for queue "rnode" with type "root"
+(1 row)
+
+select * from londiste.local_add_seq('seqroot', 'masterseq');
+ ret_code |    ret_note    
+----------+----------------
+      200 | Sequence added
+(1 row)
+
+select * from londiste.local_add_seq('seqroot', 'masterseq');
+ ret_code |                 ret_note                 
+----------+------------------------------------------
+      201 | Sequence already added: public.masterseq
+(1 row)
+
+select * from londiste.root_check_seqs('seqroot');
+ ret_code |     ret_note      
+----------+-------------------
+      200 | Sequences updated
+(1 row)
+
+select * from londiste.local_remove_seq('seqroot', 'masterseq');
+ ret_code |              ret_note              
+----------+------------------------------------
+      200 | Sequence removed: public.masterseq
+(1 row)
+
+select * from londiste.local_remove_seq('seqroot', 'masterseq');
+ ret_code |               ret_note               
+----------+--------------------------------------
+      400 | Sequence not found: public.masterseq
+(1 row)
+
+select * from londiste.get_seq_list('seqroot');
+ seq_name | last_value | local 
+----------+------------+-------
+(0 rows)
+
+select ev_id, ev_type, ev_data, ev_extra1 from pgq.event_template;
+ ev_id |        ev_type        |              ev_data               |    ev_extra1     
+-------+-----------------------+------------------------------------+------------------
+     1 | londiste.add-table    | public.testdata                    | 
+     2 | I                     | (id,data) values ('1','test-data') | public.testdata
+     3 | londiste.remove-table | public.testdata                    | 
+     4 | EXECUTE               | drop all                           | DDL-root.sql
+     5 | EXECUTE               | drop all                           | DDL-root.sql
+     1 | londiste.update-seq   | 30001                              | public.masterseq
+     2 | londiste.remove-seq   | public.masterseq                   | 
+(7 rows)
+
+-- subscriber
+select * from pgq_node.register_location('seqbranch', 'subnode', 'dbname=db', false);
+ ret_code |      ret_note       
+----------+---------------------
+      200 | Location registered
+(1 row)
+
+select * from pgq_node.register_location('seqbranch', 'rootnode', 'dbname=db', false);
+ ret_code |      ret_note       
+----------+---------------------
+      200 | Location registered
+(1 row)
+
+select * from pgq_node.create_node('seqbranch', 'branch', 'subnode', 'londiste_branch', 'rootnode', 1, null::text);
+ ret_code |                             ret_note                              
+----------+-------------------------------------------------------------------
+      200 | Node "subnode" initialized for queue "subnode" with type "branch"
+(1 row)
+
+select * from londiste.local_add_seq('seqbranch', 'masterseq');
+ ret_code |              ret_note              
+----------+------------------------------------
+      404 | Unknown sequence: public.masterseq
+(1 row)
+
+select * from londiste.global_update_seq('seqbranch', 'masterseq', 5);
+ ret_code |     ret_note     
+----------+------------------
+      200 | Sequence updated
+(1 row)
+
+select * from londiste.local_add_seq('seqbranch', 'masterseq');
+ ret_code |    ret_note    
+----------+----------------
+      200 | Sequence added
+(1 row)
+
+select * from londiste.root_check_seqs('seqbranch');
+ ret_code |    ret_note     
+----------+-----------------
+      402 | Not a root node
+(1 row)
+
+select * from londiste.get_seq_list('seqbranch');
+     seq_name     | last_value | local 
+------------------+------------+-------
+ public.masterseq |          5 | t
+(1 row)
+
+select * from londiste.local_remove_seq('seqbranch', 'masterseq');
+ ret_code |              ret_note              
+----------+------------------------------------
+      200 | Sequence removed: public.masterseq
+(1 row)
+
+select * from londiste.local_remove_seq('seqbranch', 'masterseq');
+ ret_code |               ret_note               
+----------+--------------------------------------
+      404 | Sequence not found: public.masterseq
+(1 row)
+
index feb73397c33a7516704ce086e5b560f140ce5368..3d42403e2fa1238cc97af0f5c83badcdc5c6730b 100644 (file)
@@ -13,62 +13,76 @@ select current_database();
  regression
 (1 row)
 
-select * from pgq_set.add_member('branch_set', 'snode', 'dbname=db', false);
- ret_code | ret_note 
-----------+----------
-      200 | Ok
+select * from pgq_node.register_location('branch_set', 'snode', 'dbname=db', false);
+ ret_code |      ret_note       
+----------+---------------------
+      200 | Location registered
 (1 row)
 
-select * from pgq_set.add_member('branch_set', 'pnode', 'dbname=db2', false);
- ret_code | ret_note 
-----------+----------
-      200 | Ok
+select * from pgq_node.register_location('branch_set', 'pnode', 'dbname=db2', false);
+ ret_code |      ret_note       
+----------+---------------------
+      200 | Location registered
 (1 row)
 
-select * from pgq_set.create_node('branch_set', 'branch', 'snode', 'londiste_branch', 'pnode', 100, null::text);
- ret_code | ret_note 
-----------+----------
-      200 | Ok
+select * from pgq_node.create_node('branch_set', 'branch', 'snode', 'londiste_branch', 'pnode', 100, null::text);
+ ret_code |                           ret_note                            
+----------+---------------------------------------------------------------
+      200 | Node "snode" initialized for queue "snode" with type "branch"
 (1 row)
 
-select * from londiste.node_add_table('branch_set', 'public.slavedata');
- ret_code |                   ret_note                    
-----------+-----------------------------------------------
-      400 | Table not registered in set: public.slavedata
+select * from londiste.local_add_table('branch_set', 'public.slavedata');
+ ret_code |                    ret_note                    
+----------+------------------------------------------------
+      404 | Table not available on queue: public.slavedata
 (1 row)
 
-select * from londiste.set_add_table('branch_set', 'public.slavedata');
+select * from londiste.global_add_table('branch_set', 'public.slavedata');
  ret_code | ret_note 
 ----------+----------
       200 | OK
 (1 row)
 
-select * from londiste.node_add_table('branch_set', 'public.slavedata');
+select * from londiste.local_add_table('branch_set', 'public.slavedata');
  ret_code |           ret_note            
 ----------+-------------------------------
       200 | Table added: public.slavedata
 (1 row)
 
-select * from londiste.node_get_table_list('branch_set');
-    table_name    | merge_state | custom_snapshot | skip_truncate 
-------------------+-------------+-----------------+---------------
- public.slavedata |             |                 | 
+select * from londiste.global_add_table('branch_set', 'public.tmp');
+ ret_code | ret_note 
+----------+----------
+      200 | OK
+(1 row)
+
+select * from londiste.get_table_list('branch_set');
+    table_name    | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role 
+------------------+-------+-------------+-----------------+---------------+-------------+-----------
+ public.slavedata | t     |             |                 |               |             | 
+ public.tmp       | f     |             |                 |               |             | 
+(2 rows)
+
+select * from londiste.global_remove_table('branch_set', 'public.tmp');
+ ret_code | ret_note 
+----------+----------
+      200 | OK
 (1 row)
 
-select * from londiste.node_remove_table('branch_set', 'public.slavedata');
+select * from londiste.local_remove_table('branch_set', 'public.slavedata');
  ret_code |            ret_note             
 ----------+---------------------------------
       200 | Table removed: public.slavedata
 (1 row)
 
-select * from londiste.node_remove_table('branch_set', 'public.slavedata');
- ret_code |          ret_note           
-----------+-----------------------------
-      400 | Not found: public.slavedata
+select * from londiste.local_remove_table('branch_set', 'public.slavedata');
+ ret_code |                    ret_note                    
+----------+------------------------------------------------
+      400 | Table not registered locally: public.slavedata
 (1 row)
 
-select * from londiste.node_get_table_list('branch_set');
- table_name | merge_state | custom_snapshot | skip_truncate 
-------------+-------------+-----------------+---------------
-(0 rows)
+select * from londiste.get_table_list('branch_set');
+    table_name    | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role 
+------------------+-------+-------------+-----------------+---------------+-------------+-----------
+ public.slavedata | f     |             |                 |               |             | 
+(1 row)
 
diff --git a/sql/londiste/functions/londiste.execute_finish.sql b/sql/londiste/functions/londiste.execute_finish.sql
new file mode 100644 (file)
index 0000000..3edf835
--- /dev/null
@@ -0,0 +1,51 @@
+create or replace function londiste.execute_finish(
+    in i_queue_name     text,
+    in i_file_name      text,
+    out ret_code        int4,
+    out ret_note        text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.execute_finish(2)
+--
+--      Finish execution of DDL.  Should be called at the
+--      end of the transaction that does the SQL execution.
+--
+-- Called-by:
+--      Londiste setup tool on root, replay on branches/leafs.
+--
+-- Returns:
+--      200 - Proceed.
+--      404 - Current entry not found, execute_start() was not called?
+-- ----------------------------------------------------------------------
+declare
+    is_root boolean;
+    sql text;
+begin
+    is_root := pgq_node.is_root_node(i_queue_name);
+
+    select execute_sql into sql
+        from londiste.applied_execute
+        where queue_name = i_queue_name
+            and execute_file = i_file_name;
+    if not found then
+        select 404, 'execute_file called without execute_start'
+            into ret_code, ret_note;
+        return;
+    end if;
+
+    if is_root then
+        perform pgq.insert_event(i_queue_name, 'EXECUTE', sql, i_file_name, null, null, null);
+    end if;
+
+    -- try educated guess of previous state
+    if is_root then
+        SET LOCAL session_replication_role = 'origin';
+    else
+        SET LOCAL session_replication_role = 'replica';
+    end if;
+
+    select 200, 'Execute finished: ' || i_file_name into ret_code, ret_note;
+    return;
+end;
+$$ language plpgsql strict;
+
diff --git a/sql/londiste/functions/londiste.execute_start.sql b/sql/londiste/functions/londiste.execute_start.sql
new file mode 100644 (file)
index 0000000..bc2874f
--- /dev/null
@@ -0,0 +1,62 @@
+create or replace function londiste.execute_start(
+    in i_queue_name     text,
+    in i_file_name      text,
+    in i_sql            text,
+    in i_expect_root    boolean,
+    out ret_code        int4,
+    out ret_note        text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.execute_start(4)
+--
+--      Start execution of DDL.  Should be called at the
+--      start of the transaction that does the SQL execution.
+--
+-- Called-by:
+--      Londiste setup tool on root, replay on branches/leafs.
+--
+-- Parameters:
+--      i_queue_name    - cascaded queue name
+--      i_file_name     - Unique ID for SQL
+--      i_sql           - Actual script (informative, not used here)
+--      i_expect_root   - Is this on root?  Setup tool sets this to avoid
+--                        execution on branches.
+--
+-- Returns:
+--      200 - Proceed.
+--      301 - Already applied
+--      401 - Not root.
+--      404 - No such queue
+-- ----------------------------------------------------------------------
+declare
+    is_root boolean;
+begin
+    is_root := pgq_node.is_root_node(i_queue_name);
+    if i_expect_root then
+        if not is_root then
+            select 401, 'Node is not root node: ' || i_queue_name
+                into ret_code, ret_note;
+            return;
+        end if;
+    end if;
+
+    perform 1 from londiste.applied_execute
+        where queue_name = i_queue_name
+            and execute_file = i_file_name;
+    if found then
+        select 301, 'EXECUTE(' || i_file_name || ') already applied'
+            into ret_code, ret_note;
+        return;
+    end if;
+
+    -- this also lock against potetial parallel execute
+    insert into londiste.applied_execute (queue_name, execute_file, execute_sql)
+        values (i_queue_name, i_file_name, i_sql);
+
+    SET LOCAL session_replication_role = 'local';
+
+    select 200, 'Executing: ' || i_file_name into ret_code, ret_note;
+    return;
+end;
+$$ language plpgsql strict;
+
diff --git a/sql/londiste/functions/londiste.find_table_triggers.sql b/sql/londiste/functions/londiste.find_table_triggers.sql
deleted file mode 100644 (file)
index d75e5e9..0000000
+++ /dev/null
@@ -1,33 +0,0 @@
-
-create or replace function londiste.find_table_triggers(i_table_name text)
-returns setof londiste.pending_triggers as $$
--- ----------------------------------------------------------------------
--- Function: londiste.find_table_triggers(1)
---
---      Returns all existing triggers on table.
---
--- Parameters:
---      i_table_name - table name
---
--- Returns:
---      table_name      - fq table name
---      trigger_name    - name
---      trigger_def     - partial def as returned by pg_get_triggerdef()
--- ----------------------------------------------------------------------
-declare
-    tg        record;
-begin
-    for tg in
-        select n.nspname || '.' || c.relname as table_name, t.tgname::text as name, pg_get_triggerdef(t.oid) as def 
-        from pg_trigger t, pg_class c, pg_namespace n
-        where n.oid = c.relnamespace and c.oid = t.tgrelid
-            and t.tgrelid = londiste.find_table_oid(i_table_name)
-            and not t.tgisconstraint
-    loop
-        return next tg;
-    end loop;
-    
-    return;
-end;
-$$ language plpgsql strict stable;
-
diff --git a/sql/londiste/functions/londiste.get_seq_list.sql b/sql/londiste/functions/londiste.get_seq_list.sql
new file mode 100644 (file)
index 0000000..fa4c425
--- /dev/null
@@ -0,0 +1,31 @@
+
+create or replace function londiste.get_seq_list(
+    in i_queue_name text,
+    out seq_name text,
+    out last_value int8,
+    out local boolean)
+returns setof record as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.get_seq_list(1)
+--
+--      Returns registered seqs on this Londiste node.
+--
+-- Result fiels:
+--      seq_name    - fully qualified name of sequence
+--      last_value  - last globally published value
+--      local       - is locally registered
+-- ----------------------------------------------------------------------
+declare
+    rec record;
+begin
+    for seq_name, last_value, local in
+        select s.seq_name, s.last_value, s.local from londiste.seq_info s
+            where s.queue_name = i_queue_name
+            order by s.nr, s.seq_name
+    loop
+        return next;
+    end loop;
+    return;
+end;
+$$ language plpgsql strict;
+
diff --git a/sql/londiste/functions/londiste.get_table_list.sql b/sql/londiste/functions/londiste.get_table_list.sql
new file mode 100644 (file)
index 0000000..0f77e1c
--- /dev/null
@@ -0,0 +1,113 @@
+
+create or replace function londiste.get_table_list(
+    in i_queue_name text,
+    out table_name text,
+    out local boolean,
+    out merge_state text,
+    out custom_snapshot text,
+    out skip_truncate bool,
+    out dropped_ddl text,
+    out copy_role text)
+returns setof record as $$ 
+-- ----------------------------------------------------------------------
+-- Function: londiste.get_table_list(1)
+--
+--      Return info about registered tables.
+--
+-- Parameters:
+--      i_queue_name - cascaded queue name
+--
+-- Returns:
+--      table_name      - fully-quelified table name
+--      local           - does events needs to be applied to local table
+--      merge_state     - show phase of initial copy
+--      custom_snapshot - remote snapshot of COPY transaction
+--      skip_truncate   - don't truncate table on copy
+--      dropped_ddl     - partition combining: temp place to put DDL
+--      copy_role       - partition combining: how to handle copy
+--
+-- copy_role = lead:
+--      on copy start, drop indexes and store in dropped_ddl
+--      on copy finish wait, until copy_role turns to NULL
+--      if dropped_ddl not NULL, restore them
+--      tag as catching-up
+-- copy_role = wait-copy:
+--      on copy start wait, until role changes (to wait-replay)
+-- copy_role = wait-replay:
+--      on copy finish, tag as 'catching-up'
+--      wait until copy_role is NULL, then proceed
+--
+declare
+    q_part1     text;
+    q_target    text;
+    n_parts     int4;
+    n_done      int4;
+begin
+    -- get first part queue, if exists
+    select n.combined_queue into q_target
+        from pgq_node.node_info n
+        where n.queue_name = i_queue_name;
+    if q_target is not null then
+        select n.queue_name into q_part1
+            from pgq_node.node_info n
+            where n.combined_queue = q_target
+            order by n.queue_name
+            limit 1;
+        select count(*) into n_parts
+            from pgq_node.node_info n
+            where n.combined_queue = q_target;
+    end if;
+
+    for table_name, local, merge_state, custom_snapshot, skip_truncate, dropped_ddl in 
+        select t.table_name, t.local, t.merge_state, t.custom_snapshot, t.skip_truncate, t.dropped_ddl
+            from londiste.table_info t
+            where t.queue_name = i_queue_name
+            order by t.nr, t.table_name
+    loop
+        -- if the table is in middle of copy from multiple partitions,
+        -- the copy processes need coordination
+        copy_role := null;
+        if q_part1 is not null then
+            select count(*) into n_done
+                from londiste.table_info t, pgq_node.node_info n
+                where n.combined_queue = q_target
+                    and t.queue_name = n.queue_name
+                    and t.table_name = table_name
+                    and (t.merge_state is not null
+                         and t.merge_state <> 'in-copy');
+            if i_queue_name = q_part1 then
+                -- lead
+                if merge_state = 'in-copy' then
+                    -- show copy_role only if need to wait for others
+                    if n_done < n_parts - 1 then
+                        copy_role := 'lead';
+                    end if;
+                end if;
+            else
+                -- follow
+                if merge_state = 'in-copy' then
+                    -- has lead already dropped ddl?
+                    perform 1 from londiste.table_info t
+                        where t.queue_name = q_part1
+                            and t.table_name = table_name
+                            and t.dropped_ddl is not null;
+                    if found then
+                        copy_role := 'wait-replay';
+                    else
+                        copy_role := 'wait-copy';
+                    end if;
+                elsif merge_state = 'catching-up' then
+                    -- show copy_role only if need to wait for lead
+                    if n_done < n_parts then
+                        copy_role := 'wait-replay';
+                    end if;
+                end if;
+            end if;
+        end if;
+
+        return next;
+    end loop; 
+    return;
+end; 
+$$ language plpgsql strict stable;
+
similarity index 63%
rename from sql/londiste/functions/londiste.set_add_table.sql
rename to sql/londiste/functions/londiste.global_add_table.sql
index 8a6461c83b9e456c55d7564d568c7c5fb16c03a8..40b22d62cab2bbad20659bac6a35d3d3d63ed6d2 100644 (file)
@@ -1,12 +1,12 @@
 
-create or replace function londiste.set_add_table(
-    in i_set_name       text,
+create or replace function londiste.global_add_table(
+    in i_queue_name     text,
     in i_table_name     text,
     out ret_code        int4,
     out ret_note        text)
 as $$
 -- ----------------------------------------------------------------------
--- Function: londiste.node_add_table(x)
+-- Function: londiste.global_add_table(2)
 --
 --      Register table on Londiste set.
 --
@@ -14,7 +14,7 @@ as $$
 --      in queue and nodes can attach to it.
 --
 -- Called by:
---      on root - londiste.node_add_table()
+--      on root - londiste.local_add_table()
 --      elsewhere - londiste consumer when receives new table event
 --
 -- Returns:
@@ -22,25 +22,24 @@ as $$
 --      400 - No such set
 -- ----------------------------------------------------------------------
 declare
-    col_types text;
     fq_table_name text;
 begin
     fq_table_name := londiste.make_fqname(i_table_name);
 
-    perform 1 from pgq_set.set_info where set_name = i_set_name;
+    perform 1 from pgq_node.node_info where queue_name = i_queue_name;
     if not found then
-        select 400, 'No such set: ' || i_set_name into ret_code, ret_note;
+        select 400, 'No such set: ' || i_queue_name into ret_code, ret_note;
         return;
     end if;
 
-    perform 1 from londiste.set_table where set_name = i_set_name and table_name = fq_table_name;
+    perform 1 from londiste.table_info where queue_name = i_queue_name and table_name = fq_table_name;
     if found then
         select 200, 'OK, already added: ' || fq_table_name into ret_code, ret_note;
         return;
     end if;
 
-    insert into londiste.set_table (set_name, table_name)
-        values (i_set_name, fq_table_name);
+    insert into londiste.table_info (queue_name, table_name)
+        values (i_queue_name, fq_table_name);
     select 200, 'OK' into ret_code, ret_note;
     return;
 end;
diff --git a/sql/londiste/functions/londiste.global_remove_seq.sql b/sql/londiste/functions/londiste.global_remove_seq.sql
new file mode 100644 (file)
index 0000000..1b9b497
--- /dev/null
@@ -0,0 +1,37 @@
+
+create or replace function londiste.global_remove_seq(
+    in i_queue_name text, in i_seq_name text,
+    out ret_code int4, out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.global_remove_seq(2)
+--
+--      Removes sequence registration in set.
+--
+-- Called by:
+--      - On root by londiste.local_remove_seq()
+--      - Elsewhere by consumer receiving seq remove event
+--
+-- Returns:
+--      200 - OK
+--      400 - not found
+-- ----------------------------------------------------------------------
+declare
+    fq_name text;
+begin
+    fq_name := londiste.make_fqname(i_seq_name);
+    delete from londiste.seq_info
+        where queue_name = i_queue_name
+          and seq_name = fq_name;
+    if not found then
+        select 400, 'Sequence not found: '||fq_name into ret_code, ret_note;
+        return;
+    end if;
+    if pgq_node.is_root_node(i_queue_name) then
+        perform londiste.root_notify_change(i_queue_name, 'londiste.remove-seq', fq_name);
+    end if;
+    select 200, 'Sequence removed: '||fq_name into ret_code, ret_note;
+    return;
+end;
+$$ language plpgsql strict;
+
similarity index 66%
rename from sql/londiste/functions/londiste.set_remove_table.sql
rename to sql/londiste/functions/londiste.global_remove_table.sql
index 683ea837ee799c33d843c5f05417fe98e3f26e0f..dd473b405e21b0811dc2aea826156045ef2cfa83 100644 (file)
@@ -1,17 +1,17 @@
 
-create or replace function londiste.set_remove_table(
-    in i_set_name text, in i_table_name text,
+create or replace function londiste.global_remove_table(
+    in i_queue_name text, in i_table_name text,
     out ret_code int4, out ret_note text)
 as $$
 -- ----------------------------------------------------------------------
--- Function: londiste.set_remove_table(2)
+-- Function: londiste.global_remove_table(2)
 --
 --      Removes tables registration in set.
 --
 --      Means that nodes cannot attach to this table anymore.
 --
 -- Called by:
---      - On root by londiste.node_remove_table()
+--      - On root by londiste.local_remove_table()
 --      - Elsewhere by consumer receiving table remove event
 --
 -- Returns:
@@ -22,11 +22,11 @@ declare
     fq_table_name text;
 begin
     fq_table_name := londiste.make_fqname(i_table_name);
-    if not pgq_set.is_root(i_set_name) then
-        perform londiste.node_remove_table(i_set_name, fq_table_name);
+    if not pgq_node.is_root_node(i_queue_name) then
+        perform londiste.local_remove_table(i_queue_name, fq_table_name);
     end if;
-    delete from londiste.set_table
-        where set_name = i_set_name
+    delete from londiste.table_info
+        where queue_name = i_queue_name
           and table_name = fq_table_name;
     if not found then
         select 400, 'Not found: '||fq_table_name into ret_code, ret_note;
diff --git a/sql/londiste/functions/londiste.global_update_seq.sql b/sql/londiste/functions/londiste.global_update_seq.sql
new file mode 100644 (file)
index 0000000..60e65c8
--- /dev/null
@@ -0,0 +1,57 @@
+
+create or replace function londiste.global_update_seq(
+    in i_queue_name text, in i_seq_name text, in i_value int8,
+    out ret_code int4, out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.global_update_seq(3)
+--
+--      Update seq.
+--
+-- Parameters:
+--      i_queue_name  - set name
+--      i_seq_name  - seq name
+--      i_value     - new published value
+--
+-- Returns:
+--      200 - OK
+-- ----------------------------------------------------------------------
+declare
+    n record;
+    fqname text;
+    seq record;
+begin
+    select node_type, node_name into n
+        from pgq_node.node_info
+        where queue_name = i_queue_name;
+    if not found then
+        select 404, 'Set not found: ' || i_queue_name into ret_code, ret_note;
+        return;
+    end if;
+    if n.node_type = 'root' then
+        select 402, 'Must not run on root node' into ret_code, ret_note;
+        return;
+    end if;
+
+    fqname := londiste.make_fqname(i_seq_name);
+    select last_value, local from londiste.seq_info
+        into seq
+        where queue_name = i_queue_name and seq_name = fqname
+        for update;
+    if not found then
+        insert into londiste.seq_info
+            (queue_name, seq_name, last_value)
+        values (i_queue_name, fqname, i_value);
+    else
+        update londiste.seq_info
+            set last_value = i_value
+            where queue_name = i_queue_name and seq_name = fqname;
+        if seq.local then
+            perform pgq.seq_setval(fqname, i_value);
+        end if;
+    end if;
+    select 200, 'Sequence updated' into ret_code, ret_note;
+    return;
+end;
+$$ language plpgsql;
+
index cdfecfba92ac225b9ad2361bf38b48b0fcab285a..37be4ab5e64733dd1adb92dc79019bb9fa4183ca 100644 (file)
@@ -28,15 +28,15 @@ end;
 $$ language plpgsql strict stable;
 
 
-create or replace function londiste.node_get_valid_pending_fkeys(i_set_name text)
+create or replace function londiste.get_valid_pending_fkeys(i_queue_name text)
 returns setof londiste.pending_fkeys as $$
 -- ----------------------------------------------------------------------
--- Function: londiste.node_get_valid_pending_fkeys(1)
+-- Function: londiste.get_valid_pending_fkeys(1)
 --
 --      Returns dropped fkeys where both sides are in sync now.
 --
 -- Parameters:
---      i_set_name - sets name
+--      i_queue_name - cascaded queue name
 --
 -- Returns:
 --      desc
@@ -47,12 +47,12 @@ begin
     for fkeys in
         select pf.*
         from londiste.pending_fkeys pf
-             left join londiste.node_table st_from on (st_from.table_name = pf.from_table)
-             left join londiste.node_table st_to on (st_to.table_name = pf.to_table)
+             left join londiste.table_info st_from on (st_from.table_name = pf.from_table)
+             left join londiste.table_info st_to on (st_to.table_name = pf.to_table)
         where (st_from.table_name is null or (st_from.merge_state = 'ok' and st_from.custom_snapshot is null))
           and (st_to.table_name is null or (st_to.merge_state = 'ok' and st_to.custom_snapshot is null))
-          and (coalesce(st_from.set_name = i_set_name, false)
-               or coalesce(st_to.set_name = i_set_name, false))
+          and (coalesce(st_from.queue_name = i_queue_name, false)
+               or coalesce(st_to.queue_name = i_queue_name, false))
         order by 1, 2, 3
     loop
         return next fkeys;
@@ -66,7 +66,7 @@ $$ language plpgsql strict stable;
 create or replace function londiste.drop_table_fkey(i_from_table text, i_fkey_name text)
 returns integer as $$
 -- ----------------------------------------------------------------------
--- Function: londiste.drop_table_fkey(x)
+-- Function: londiste.drop_table_fkey(2)
 --
 --      Drop one fkey, save in pending table.
 -- ----------------------------------------------------------------------
diff --git a/sql/londiste/functions/londiste.handle_triggers.sql b/sql/londiste/functions/londiste.handle_triggers.sql
deleted file mode 100644 (file)
index b244f78..0000000
+++ /dev/null
@@ -1,130 +0,0 @@
-
-create or replace function londiste.get_pending_triggers(i_table_name text)
-returns setof londiste.pending_triggers as $$
--- ----------------------------------------------------------------------
--- Function: londiste.get_pending_triggers(1)
---
---      Returns dropped triggers for one table.
---
--- Parameters:
---      i_table_name - fqname
---
--- Returns:
---      list of triggers
--- ----------------------------------------------------------------------
-declare
-    trigger    record;
-begin
-    for trigger in
-        select *
-        from londiste.pending_triggers
-        where table_name = i_table_name
-    loop
-        return next trigger;
-    end loop;
-    
-    return;
-end;
-$$ language plpgsql strict stable;
-
-
-create or replace function londiste.drop_table_trigger(i_table_name text, i_trigger_name text)
-returns integer as $$
--- ----------------------------------------------------------------------
--- Function: londiste.drop_table_trigger(2)
---
---      Drop one trigger, saves it to pending table.
--- ----------------------------------------------------------------------
-declare
-    trig_def record;
-begin
-    select * into trig_def
-    from londiste.find_table_triggers(i_table_name)
-    where trigger_name = i_trigger_name;
-    
-    if FOUND is not true then
-        return 0;
-    end if;
-    
-    insert into londiste.pending_triggers(table_name, trigger_name, trigger_def) 
-        values (i_table_name, i_trigger_name, trig_def.trigger_def);
-    
-    execute 'drop trigger ' || i_trigger_name || ' on ' || i_table_name;
-    
-    return 1;
-end;
-$$ language plpgsql;
-
-
-create or replace function londiste.drop_all_table_triggers(i_table_name text)
-returns integer as $$
--- ----------------------------------------------------------------------
--- Function: londiste.drop_all_table_triggers(1)
---
---      Drop all triggers that exist.
--- ----------------------------------------------------------------------
-declare
-    trigger record;
-begin
-    for trigger in
-        select trigger_name as name
-        from londiste.find_table_triggers(i_table_name)
-    loop
-        perform londiste.drop_table_trigger(i_table_name, trigger.name);
-    end loop;
-    
-    return 1;
-end;
-$$ language plpgsql;
-
-
-create or replace function londiste.restore_table_trigger(i_table_name text, i_trigger_name text)
-returns integer as $$
--- ----------------------------------------------------------------------
--- Function: londiste.restore_table_trigger(2)
---
---      Restore one trigger.
--- ----------------------------------------------------------------------
-declare
-    trig_def text;
-begin
-    select trigger_def into trig_def
-    from londiste.pending_triggers
-    where (table_name, trigger_name) = (i_table_name, i_trigger_name);
-    
-    if not found then
-        return 0;
-    end if;
-    
-    delete from londiste.pending_triggers 
-    where table_name = i_table_name and trigger_name = i_trigger_name;
-    
-    execute trig_def;
-
-    return 1;
-end;
-$$ language plpgsql;
-
-
-create or replace function londiste.restore_all_table_triggers(i_table_name text)
-returns integer as $$
--- ----------------------------------------------------------------------
--- Function: londiste.restore_all_table_triggers(1)
---
---      Restore all dropped triggers.
--- ----------------------------------------------------------------------
-declare
-    trigger record;
-begin
-    for trigger in
-        select trigger_name as name
-        from londiste.get_pending_triggers(i_table_name)
-    loop
-        perform londiste.restore_table_trigger(i_table_name, trigger.name);
-    end loop;
-    
-    return 1;
-end;
-$$ language plpgsql;
-
-
diff --git a/sql/londiste/functions/londiste.local_add_seq.sql b/sql/londiste/functions/londiste.local_add_seq.sql
new file mode 100644 (file)
index 0000000..d26515b
--- /dev/null
@@ -0,0 +1,77 @@
+
+create or replace function londiste.local_add_seq(
+    in i_queue_name text, in i_seq_name text,
+    out ret_code int4, out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.local_add_seq(2)
+--
+--      Register sequence.
+--
+-- Parameters:
+--      i_queue_name    - cascaded queue name
+--      i_seq_name      - seq name
+--
+-- Returns:
+--      200 - OK
+--      400 - Not found
+-- ----------------------------------------------------------------------
+declare
+    fq_seq_name text;
+    lastval int8;
+    seq record;
+begin
+    fq_seq_name := londiste.make_fqname(i_seq_name);
+
+    perform 1 from pg_class
+        where oid = londiste.find_seq_oid(fq_seq_name);
+    if not found then
+        select 400, 'Sequence not found: ' || fq_seq_name into ret_code, ret_note;
+        return;
+    end if;
+
+    if pgq_node.is_root_node(i_queue_name) then
+        select local, last_value into seq
+            from londiste.seq_info
+            where queue_name = i_queue_name
+                and seq_name = fq_seq_name
+            for update;
+        if found and seq.local then
+            select 201, 'Sequence already added: ' || fq_seq_name
+                into ret_code, ret_note;
+            return;
+        end if;
+        if not seq.local then
+            update londiste.seq_info set local = true
+                where queue_name = i_queue_name and seq_name = fq_seq_name;
+        else
+            insert into londiste.seq_info (queue_name, seq_name, local, last_value)
+                values (i_queue_name, fq_seq_name, true, 0);
+        end if;
+        perform * from londiste.root_check_seqs(i_queue_name);
+    else
+        select local, last_value into seq
+            from londiste.seq_info
+            where queue_name = i_queue_name
+                and seq_name = fq_seq_name
+            for update;
+        if not found then
+            select 404, 'Unknown sequence: ' || fq_seq_name
+                into ret_code, ret_note;
+            return;
+        end if;
+        if seq.local then
+            select 201, 'Sequence already added: ' || fq_seq_name
+                into ret_code, ret_note;
+            return;
+        end if;
+        update londiste.seq_info set local = true
+            where queue_name = i_queue_name and seq_name = fq_seq_name;
+        perform pgq.seq_setval(fq_seq_name, seq.last_value);
+    end if;
+
+    select 200, 'Sequence added' into ret_code, ret_note;
+    return;
+end;
+$$ language plpgsql;
+
diff --git a/sql/londiste/functions/londiste.local_add_table.sql b/sql/londiste/functions/londiste.local_add_table.sql
new file mode 100644 (file)
index 0000000..8e20ba0
--- /dev/null
@@ -0,0 +1,97 @@
+create or replace function londiste.local_add_table(
+    in i_queue_name     text,
+    in i_table_name     text,
+    out ret_code        int4,
+    out ret_note        text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.local_add_table(2)
+--
+--      Register table on Londiste node.
+--
+-- Returns:
+--      200 - Ok
+--      400 - No such set
+-- ----------------------------------------------------------------------
+declare
+    col_types text;
+    fq_table_name text;
+    new_state text;
+
+    logtrg_name text;
+    logtrg text;
+    tbl record;
+begin
+    fq_table_name := londiste.make_fqname(i_table_name);
+    col_types := londiste.find_column_types(fq_table_name);
+    if position('k' in col_types) < 1 then
+        select 400, 'Primary key missing on table: ' || fq_table_name into ret_code, ret_note;
+        return;
+    end if;
+
+    perform 1 from pgq_node.node_info where queue_name = i_queue_name;
+    if not found then
+        select 400, 'No such set: ' || i_queue_name into ret_code, ret_note;
+        return;
+    end if;
+
+    select merge_state, local into tbl
+        from londiste.table_info
+        where queue_name = i_queue_name and table_name = fq_table_name;
+    if not found then
+        -- add to set on root
+        if pgq_node.is_root_node(i_queue_name) then
+            select f.ret_code, f.ret_note into ret_code, ret_note
+                from londiste.global_add_table(i_queue_name, i_table_name) f;
+            if ret_code <> 200 then
+                return;
+            end if;
+        else
+            select 404, 'Table not available on queue: ' || fq_table_name
+                into ret_code, ret_note;
+            return;
+        end if;
+
+        -- reload info
+        select merge_state, local into tbl
+            from londiste.table_info
+            where queue_name = i_queue_name and table_name = fq_table_name;
+    end if;
+
+    if tbl.local then
+        select 200, 'Table already added: ' || fq_table_name into ret_code, ret_note;
+        return;
+    end if;
+
+    if pgq_node.is_root_node(i_queue_name) then
+        new_state := 'ok';
+        perform londiste.root_notify_change(i_queue_name, 'londiste.add-table', fq_table_name);
+    else
+        new_state := NULL;
+    end if;
+
+    update londiste.table_info
+        set local = true,
+            merge_state = new_state
+        where queue_name = i_queue_name and table_name = fq_table_name;
+    if not found then
+        raise exception 'lost table: %', fq_table_name;
+    end if;
+
+    -- create trigger if it does not exists already
+    logtrg_name := i_queue_name || '_logtrigger';
+    perform 1 from pg_catalog.pg_trigger
+        where tgrelid = londiste.find_table_oid(fq_table_name)
+            and tgname = logtrg_name;
+    if not found then
+        logtrg := 'create trigger ' || quote_ident(logtrg_name)
+            || ' after insert or update or delete on ' || londiste.quote_fqname(fq_table_name)
+            || ' for each row execute procedure pgq.sqltriga(' || quote_literal(i_queue_name) || ')';
+        execute logtrg;
+    end if;
+
+    select 200, 'Table added: ' || fq_table_name into ret_code, ret_note;
+    return;
+end;
+$$ language plpgsql strict;
+
diff --git a/sql/londiste/functions/londiste.local_remove_seq.sql b/sql/londiste/functions/londiste.local_remove_seq.sql
new file mode 100644 (file)
index 0000000..691a124
--- /dev/null
@@ -0,0 +1,43 @@
+
+create or replace function londiste.local_remove_seq(
+    in i_queue_name text, in i_seq_name text,
+    out ret_code int4, out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.local_remove_seq(2)
+--
+--      Remove sequence.
+--
+-- Parameters:
+--      i_queue_name      - set name
+--      i_seq_name      - sequence name
+--
+-- Returns:
+--      200 - OK
+--      404 - Sequence not found
+-- ----------------------------------------------------------------------
+declare
+    fqname text;
+begin
+    fqname := londiste.make_fqname(i_seq_name);
+    if pgq_node.is_root_node(i_queue_name) then
+        select f.ret_code, f.ret_note
+            into ret_code, ret_note
+            from londiste.global_remove_seq(i_queue_name, fqname) f;
+        return;
+    end if;
+    update londiste.seq_info
+        set local = false
+        where queue_name = i_queue_name
+          and seq_name = fqname
+          and local;
+    if not found then
+        select 404, 'Sequence not found: '||fqname into ret_code, ret_note;
+        return;
+    end if;
+
+    select 200, 'Sequence removed: '||fqname into ret_code, ret_note;
+    return;
+end;
+$$ language plpgsql strict;
+
diff --git a/sql/londiste/functions/londiste.local_remove_table.sql b/sql/londiste/functions/londiste.local_remove_table.sql
new file mode 100644 (file)
index 0000000..6ba9993
--- /dev/null
@@ -0,0 +1,67 @@
+
+create or replace function londiste.local_remove_table(
+    in i_queue_name text, in i_table_name text,
+    out ret_code int4, out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.local_remove_table(2)
+--
+--      Remove table.
+--
+-- Parameters:
+--      i_queue_name      - set name
+--      i_table_name      - table name
+--
+-- Returns:
+--      200 - OK
+--      404 - Table not found
+-- ----------------------------------------------------------------------
+declare
+    fq_table_name   text;
+    logtrg_name     text;
+    tbl             record;
+begin
+    fq_table_name := londiste.make_fqname(i_table_name);
+
+    select local into tbl
+        from londiste.table_info
+        where queue_name = i_queue_name
+          and table_name = fq_table_name;
+    if not found then
+        select 400, 'Table not found: ' || fq_table_name into ret_code, ret_note;
+        return;
+    end if;
+
+    if tbl.local then
+        -- drop trigger if exists
+        logtrg_name := i_queue_name || '_logtrigger';
+        execute 'drop trigger if exists ' || quote_ident(logtrg_name)
+                || ' on ' || londiste.quote_fqname(fq_table_name);
+        -- reset data
+        update londiste.table_info
+            set local = false,
+                custom_snapshot = null,
+
+                ---- should we keep those?
+                -- skip_truncate = null,
+                -- dropped_ddl = null,
+                merge_state = null
+            where queue_name = i_queue_name
+                and table_name = fq_table_name;
+    else
+        if not pgq_node.is_root_node(i_queue_name) then
+            select 400, 'Table not registered locally: ' || fq_table_name into ret_code, ret_note;
+            return;
+        end if;
+    end if;
+
+    if pgq_node.is_root_node(i_queue_name) then
+        perform londiste.global_remove_table(i_queue_name, fq_table_name);
+        perform londiste.root_notify_change(i_queue_name, 'londiste.remove-table', fq_table_name);
+    end if;
+
+    select 200, 'Table removed: ' || fq_table_name into ret_code, ret_note;
+    return;
+end;
+$$ language plpgsql strict;
+
diff --git a/sql/londiste/functions/londiste.local_set_skip_truncate.sql b/sql/londiste/functions/londiste.local_set_skip_truncate.sql
new file mode 100644 (file)
index 0000000..2e8677d
--- /dev/null
@@ -0,0 +1,29 @@
+
+create or replace function londiste.local_set_skip_truncate(
+    in i_queue_name text,
+    in i_table      text,
+    in i_value      bool,
+    out ret_code    int4,
+    out ret_note    text)
+returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.local_set_skip_truncate(3)
+--
+--      Change skip_truncate flag for table.
+-- ----------------------------------------------------------------------
+begin
+    update londiste.table_info
+       set skip_truncate = i_value
+     where queue_name = i_queue_name
+       and table_name = i_table;
+    if found then
+        select 200, 'skip_truncate=' || i_value::text
+            into ret_code, ret_note;
+    else
+        select 404, 'table not found: ' || i_table
+            into ret_code, ret_note;
+    end if;
+    return;
+end;
+$$ language plpgsql;
+
similarity index 71%
rename from sql/londiste/functions/londiste.node_set_table_state.sql
rename to sql/londiste/functions/londiste.local_set_table_state.sql
index 28bee73c27dc44ecb1c6a681883284cca3ea92da..a7e2dc5232b7e1d3c388869a4e6b9a52d10a2918 100644 (file)
@@ -1,18 +1,18 @@
 
-create or replace function londiste.node_set_table_state(
-    i_set_name text,
+create or replace function londiste.local_set_table_state(
+    i_queue_name text,
     i_table_name text,
     i_snapshot text,
     i_merge_state text)
 returns integer as $$
 -- ----------------------------------------------------------------------
--- Function: londiste.node_set_table_state(4)
+-- Function: londiste.local_set_table_state(4)
 --
 --      Change table state.
 --
 -- Parameters:
---      i_set_name      - set name
---      i-table         - table name
+--      i_queue_name    - cascaded queue name
+--      i_table         - table name
 --      i_snapshot      - optional remote snapshot info
 --      i_merge_state   - merge state
 --
@@ -20,7 +20,7 @@ returns integer as $$
 --      nothing
 -- ----------------------------------------------------------------------
 begin
-    update londiste.node_table
+    update londiste.table_info
         set custom_snapshot = i_snapshot,
             merge_state = i_merge_state,
             -- reset skip_snapshot when table is copied over
@@ -28,8 +28,9 @@ begin
                                  then null
                                  else skip_truncate
                             end
-      where set_name = i_set_name
-        and table_name = i_table_name;
+      where queue_name = i_queue_name
+        and table_name = i_table_name
+        and local;
     if not found then
         raise exception 'no such table';
     end if;
diff --git a/sql/londiste/functions/londiste.local_set_table_struct.sql b/sql/londiste/functions/londiste.local_set_table_struct.sql
new file mode 100644 (file)
index 0000000..9a6b1cc
--- /dev/null
@@ -0,0 +1,36 @@
+
+create or replace function londiste.local_set_table_struct(
+    in i_queue_name text,
+    in i_table_name text,
+    in i_dropped_ddl text,
+    out ret_code int4,
+    out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.local_set_table_struct(3)
+--
+--      Store dropped table struct temporarily.
+--
+-- Parameters:
+--      i_queue_name    - cascaded queue name
+--      i_table         - table name
+--      i_dropped_ddl   - merge state
+-- ----------------------------------------------------------------------
+begin
+    update londiste.table_info
+        set dropped_ddl = i_dropped_ddl
+      where queue_name = i_queue_name
+        and table_name = i_table_name
+        and local;
+    if found then
+        select 200, 'Table struct stored'
+            into ret_code, ret_note;
+    else
+        select 404, 'no such local table: '||i_table_name
+            into ret_code, ret_note;
+
+    end if;
+    return;
+end;
+$$ language plpgsql;
+
diff --git a/sql/londiste/functions/londiste.node_add_seq.sql b/sql/londiste/functions/londiste.node_add_seq.sql
deleted file mode 100644 (file)
index 4156f0b..0000000
+++ /dev/null
@@ -1,51 +0,0 @@
-
-create or replace function londiste.node_add_seq(
-    in i_set_name text, in i_seq_name text,
-    out ret_code int4, out ret_text text)
-as $$
--- ----------------------------------------------------------------------
--- Function: londiste.node_add_seq(2)
---
---      Register sequence.
---
--- Parameters:
---      i_set_name  - set name
---      i_seq_name  - seq name
---
--- Returns:
---      200 - OK
---      400 - Not found
--- ----------------------------------------------------------------------
-declare
-    fq_seq_name text;
-begin
-    fq_seq_name := londiste.make_fqname(i_seq_name);
-
-    perform 1 from pg_class
-        where oid = londiste.find_seq_oid(fq_seq_name);
-    if not found then
-        select 400, 'Sequence not found: ' || fq_seq_name into ret_code, ret_text;
-        return;
-    end if;
-
-    perform 1 from londiste.node_seq
-        where set_name = i_set_name and seq_name = fq_seq_name;
-    if found then
-        select 200, 'OK, seqence already added' into ret_code, ret_text;
-        return;
-    end if;
-
-    if pgq_set.is_root(i_set_name) then
-        insert into londiste.set_seq (set_name, seq_name)
-            values (i_set_name, fq_seq_name);
-        perform londiste.node_notify_change(i_set_name, 'add-seq', fq_seq_name);
-    end if;
-
-    insert into londiste.node_seq (set_name, seq_name)
-        values (i_set_name, fq_seq_name);
-
-    select 200, 'OK' into ret_code, ret_text;
-    return;
-end;
-$$ language plpgsql;
-
diff --git a/sql/londiste/functions/londiste.node_add_table.sql b/sql/londiste/functions/londiste.node_add_table.sql
deleted file mode 100644 (file)
index 0612516..0000000
+++ /dev/null
@@ -1,82 +0,0 @@
-create or replace function londiste.node_add_table(
-    in i_set_name       text,
-    in i_table_name     text,
-    out ret_code        int4,
-    out ret_note        text)
-as $$
--- ----------------------------------------------------------------------
--- Function: londiste.node_add_table(2)
---
---      Register table on Londiste node.
---
--- Returns:
---      200 - Ok
---      400 - No such set
--- ----------------------------------------------------------------------
-declare
-    col_types text;
-    fq_table_name text;
-    new_state text;
-begin
-    fq_table_name := londiste.make_fqname(i_table_name);
-    col_types := londiste.find_column_types(fq_table_name);
-    if position('k' in col_types) < 1 then
-        select 400, 'Primary key missing on table: ' || fq_table_name into ret_code, ret_note;
-        return;
-    end if;
-
-    perform 1 from pgq_set.set_info where set_name = i_set_name;
-    if not found then
-        select 400, 'No such set: ' || i_set_name into ret_code, ret_note;
-        return;
-    end if;
-
-    perform 1 from londiste.node_table where set_name = i_set_name and table_name = fq_table_name;
-    if found then
-        select 200, 'Table already added: ' || fq_table_name into ret_code, ret_note;
-        return;
-    end if;
-
-    if pgq_set.is_root(i_set_name) then
-        select * into ret_code, ret_note
-            from londiste.set_add_table(i_set_name, fq_table_name);
-        if ret_code <> 200 then
-            return;
-        end if;
-        new_state := 'ok';
-        perform londiste.root_notify_change(i_set_name, 'add-table', fq_table_name);
-    else
-        perform 1 from londiste.set_table where set_name = i_set_name and table_name = fq_table_name;
-        if not found then
-            select 400, 'Table not registered in set: ' || fq_table_name into ret_code, ret_note;
-            return;
-        end if;
-        new_state := NULL;
-    end if;
-
-    insert into londiste.node_table (set_name, table_name, merge_state)
-        values (i_set_name, fq_table_name, new_state);
-
-    for ret_code, ret_note in
-        select f.ret_code, f.ret_note
-        from londiste.node_prepare_triggers(i_set_name, fq_table_name) f
-    loop
-        if ret_code > 299 then
-            return;
-        end if;
-    end loop;
-
-    for ret_code, ret_note in
-        select f.ret_code, f.ret_note
-        from londiste.node_refresh_triggers(i_set_name, fq_table_name) f
-    loop
-        if ret_code > 299 then
-            return;
-        end if;
-    end loop;
-
-    select 200, 'Table added: ' || fq_table_name into ret_code, ret_note;
-    return;
-end;
-$$ language plpgsql strict;
-
diff --git a/sql/londiste/functions/londiste.node_disable_triggers.sql b/sql/londiste/functions/londiste.node_disable_triggers.sql
deleted file mode 100644 (file)
index f86dc86..0000000
+++ /dev/null
@@ -1,77 +0,0 @@
-
-create or replace function londiste.node_disable_triggers(
-    in i_set_name   text,
-    in i_table_name text,
-    out ret_code    int4,
-    out ret_note    text)
-returns setof record strict as $$
--- ----------------------------------------------------------------------
--- Function: londiste.node_disable_triggers(2)
---
---      Drop all registered triggers from particular table.
--- ----------------------------------------------------------------------
-declare
-    tbl_oid oid;
-    fq_table_name text;
-    tg record;
-    is_active int4;
-begin
-    fq_table_name := londiste.make_fqname(i_table_name);
-    perform 1 from pgq_set.set_info where set_name = i_set_name;
-    if not found then
-        select 400, 'Unknown set: ' || i_set_name;
-        return next;
-        return;
-    end if;
-    tbl_oid := londiste.find_table_oid(fq_table_name);
-    for tg in
-        select tg_name, tg_type, tg_def from londiste.node_trigger
-         where set_name = i_set_name and table_name = fq_table_name
-         order by tg_name
-    loop
-        -- check if active
-        perform 1 from pg_catalog.pg_trigger
-         where tgrelid = tbl_oid
-           and tgname = tg.tg_name;
-        if found then
-            execute 'drop trigger ' || quote_ident(tg.tg_name)
-                || ' on ' || londiste.quote_fqname(fq_table_name);
-            select 200, 'Dropped trigger ' || tg.tg_name
-                || ' from table ' || fq_table_name
-                into ret_code, ret_note;
-                return next;
-        end if;
-    end loop;
-    return;
-end;
-$$ language plpgsql security definer;
-
-create or replace function londiste.node_disable_triggers(
-    in i_set_name   text,
-    out ret_code    int4,
-    out ret_note    text)
-returns setof record strict as $$
--- ----------------------------------------------------------------------
--- Function: londiste.node_disable_triggers(1)
---
---      Drop all registered triggers from set tables.
--- ----------------------------------------------------------------------
-declare
-    t record;
-begin
-    for t in
-        select table_name from londiste.node_table
-         where set_name = i_set_name
-         order by nr
-    loop
-        for ret_code, ret_note in
-            select f.ret_code, f.ret_note
-                from londiste.node_disable_triggers(i_set_name, t.table_name) f
-        loop
-            return next;
-        end loop;
-    end loop;
-    return;
-end;
-$$ language plpgsql security definer;
-
diff --git a/sql/londiste/functions/londiste.node_get_seq_list.sql b/sql/londiste/functions/londiste.node_get_seq_list.sql
deleted file mode 100644 (file)
index 4a72041..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-
-create or replace function londiste.provider_get_seq_list(i_set_name text)
-returns setof text as $$
--- ----------------------------------------------------------------------
--- Function: londiste.node_get_seq_list(x)
---
---      Returns registered seqs on this Londiste node.
--- ----------------------------------------------------------------------
-declare
-    rec record;
-begin
-    for rec in
-        select seq_name from londiste.node_seq
-            where set_name = i_set_name
-            order by nr
-    loop
-        return next rec.seq_name;
-    end loop;
-    return;
-end;
-$$ language plpgsql strict;
-
diff --git a/sql/londiste/functions/londiste.node_get_table_list.sql b/sql/londiste/functions/londiste.node_get_table_list.sql
deleted file mode 100644 (file)
index 500d94b..0000000
+++ /dev/null
@@ -1,29 +0,0 @@
-
-create or replace function londiste.node_get_table_list(
-    in i_set_name text,
-    out table_name text,
-    out merge_state text,
-    out custom_snapshot text,
-    out skip_truncate bool)
-returns setof record as $$ 
--- ----------------------------------------------------------------------
--- Function: londiste.node_get_table_list(1)
---
---      Return info about registered tables.
---
--- Parameters:
---      i_set_name - set name
--- ----------------------------------------------------------------------
-begin 
-    for table_name, merge_state, custom_snapshot, skip_truncate in 
-        select t.table_name, t.merge_state, t.custom_snapshot, t.skip_truncate
-            from londiste.node_table t
-            where t.set_name= i_set_name
-            order by t.nr
-    loop
-        return next;
-    end loop; 
-    return;
-end; 
-$$ language plpgsql strict stable;
-
diff --git a/sql/londiste/functions/londiste.node_prepare_triggers.sql b/sql/londiste/functions/londiste.node_prepare_triggers.sql
deleted file mode 100644 (file)
index 6a82f1d..0000000
+++ /dev/null
@@ -1,52 +0,0 @@
-
-create or replace function londiste.node_prepare_triggers(
-    in i_set_name   text,
-    in i_table_name text,
-    out ret_code    int4,
-    out ret_note    text)
-returns setof record strict as $$
--- ----------------------------------------------------------------------
--- Function: londiste.node_prepare_triggers(2)
---
---      Regsiter Londiste trigger for table.
--- ----------------------------------------------------------------------
-declare
-    t_name   text;
-    logtrg   text;
-    denytrg  text;
-    logtrg_name text;
-    denytrg_name text;
-    qname    text;
-    fq_table_name text;
-begin
-    fq_table_name := londiste.make_fqname(i_table_name);
-    select queue_name into qname from pgq_set.set_info where set_name = i_set_name;
-    if not found then
-        select 400, 'Set not found: ' || i_set_name into ret_code, ret_note;
-        return next;
-        return;
-    end if;
-    if qname is not null then
-        logtrg_name := i_set_name || '_logtrigger';
-        logtrg := 'create trigger ' || quote_ident(logtrg_name)
-            || ' after insert or update or delete on ' || londiste.quote_fqname(fq_table_name)
-            || ' for each row execute procedure pgq.sqltriga(' || quote_literal(qname) || ')';
-        insert into londiste.node_trigger (set_name, table_name, tg_name, tg_type, tg_def)
-        values (i_set_name, fq_table_name, logtrg_name, 'root', logtrg);
-        select 200, logtrg into ret_code, ret_note;
-        return next;
-    end if;
-
-    denytrg_name := i_set_name || '_denytrigger';
-    denytrg := 'create trigger ' || quote_ident(denytrg_name)
-        || ' after insert or update or delete on ' || londiste.quote_fqname(fq_table_name)
-        || ' for each row execute procedure pgq.denytriga(' || quote_literal(i_set_name) || ')';
-    insert into londiste.node_trigger (set_name, table_name, tg_name, tg_type, tg_def)
-    values (i_set_name, fq_table_name, denytrg_name, 'non-root', denytrg);
-    select 200, denytrg into ret_code, ret_note;
-    return next;
-
-    return;
-end;
-$$ language plpgsql;
-
diff --git a/sql/londiste/functions/londiste.node_refresh_triggers.sql b/sql/londiste/functions/londiste.node_refresh_triggers.sql
deleted file mode 100644 (file)
index 68d3f98..0000000
+++ /dev/null
@@ -1,102 +0,0 @@
-
-create or replace function londiste.node_refresh_triggers(
-    in i_set_name   text,
-    in i_table_name text,
-    out ret_code    int4,
-    out ret_note    text)
-returns setof record strict as $$
--- ----------------------------------------------------------------------
--- Function: londiste.node_refresh_triggers(2)
---
---      Sync actual trigger state with registered triggers.
--- ----------------------------------------------------------------------
-declare
-    tbl_oid oid;
-    fq_table_name text;
-    tg record;
-    is_root bool;
-    is_active int4;
-begin
-    fq_table_name := londiste.make_fqname(i_table_name);
-    perform 1 from pgq_set.set_info where set_name = i_set_name;
-    if not found then
-        select 400, 'Unknown set: ' || i_set_name;
-        return next;
-        return;
-    end if;
-    is_root := pgq_set.is_root(i_set_name);
-    tbl_oid := londiste.find_table_oid(fq_table_name);
-    for tg in
-        select tg_name, tg_type, tg_def from londiste.node_trigger
-         where set_name = i_set_name and table_name = fq_table_name
-         order by tg_name
-    loop
-        if tg.tg_type not in ('root', 'non-root') then
-            select 400, 'trigger ' || tg.tg_name
-                || ' on table ' || fq_table_name
-                || ' had unsupported type: ' || tg.tg_type
-                into ret_code, ret_note;
-            return next;
-        else
-            -- check if active
-            select count(1) into is_active
-              from pg_catalog.pg_trigger
-             where tgrelid = tbl_oid
-               and tgname = tg.tg_name;
-
-            -- create or drop if needed
-            if (tg.tg_type = 'root') = is_root then
-                -- trigger must be active
-                if is_active = 0 then
-                    execute tg.tg_def;
-                    select 200, 'Created trigger ' || tg.tg_name
-                        || ' on table ' || fq_table_name
-                        into ret_code, ret_note;
-                    return next;
-                end if;
-            else
-                -- trigger must be dropped
-                if is_active = 1 then
-                    execute 'drop trigger ' || quote_ident(tg.tg_name)
-                        || ' on ' || londiste.quote_fqname(fq_table_name);
-                    select 200, 'Dropped trigger ' || tg.tg_name
-                        || ' from table ' || fq_table_name
-                        into ret_code, ret_note;
-                    return next;
-                end if;
-            end if;
-        end if;
-    end loop;
-    return;
-end;
-$$ language plpgsql security definer;
-
-create or replace function londiste.node_refresh_triggers(
-    in i_set_name   text,
-    out ret_code    int4,
-    out ret_note    text)
-returns setof record strict as $$
--- ----------------------------------------------------------------------
--- Function: londiste.node_refresh_triggers(2)
---
---      Sync actual trigger state with registered triggers for all tables.
--- ----------------------------------------------------------------------
-declare
-    t record;
-begin
-    for t in
-        select table_name from londiste.node_table
-         where set_name = i_set_name
-         order by nr
-    loop
-        for ret_code, ret_note in
-            select f.ret_code, f.ret_note
-                from londiste.node_refresh_triggers(i_set_name, t.table_name) f
-        loop
-            return next;
-        end loop;
-    end loop;
-    return;
-end;
-$$ language plpgsql security definer;
-
diff --git a/sql/londiste/functions/londiste.node_remove_seq.sql b/sql/londiste/functions/londiste.node_remove_seq.sql
deleted file mode 100644 (file)
index ae14254..0000000
+++ /dev/null
@@ -1,20 +0,0 @@
-
-create or replace function londiste.provider_remove_seq(
-    in i_set_name text, in i_seq_name text,
-    out ret_code int4, out ret_note text)
-as $$
-begin
-    delete from londiste.node_seq
-        where set_name = i_set_name
-          and seq_name = i_seq_name;
-    if not found then
-        select 400, 'Not found: '||i_seq_name into ret_code, ret_note;
-        return;
-    end if;
-
-    -- perform londiste.provider_notify_change(i_queue_name);
-    select 200, 'OK' into ret_code, ret_note;
-    return;
-end;
-$$ language plpgsql strict;
-
diff --git a/sql/londiste/functions/londiste.node_remove_table.sql b/sql/londiste/functions/londiste.node_remove_table.sql
deleted file mode 100644 (file)
index 9e4ff63..0000000
+++ /dev/null
@@ -1,38 +0,0 @@
-
-create or replace function londiste.node_remove_table(
-    in i_set_name text, in i_table_name text,
-    out ret_code int4, out ret_note text)
-as $$
-declare
-    fq_table_name text;
-begin
-    fq_table_name := londiste.make_fqname(i_table_name);
-
-    for ret_code, ret_note in
-        select f.ret_code, f.ret_note from londiste.node_disable_triggers(i_set_name, fq_table_name) f
-    loop
-        if ret_code > 299 then
-            return;
-        end if;
-    end loop;
-    delete from londiste.node_trigger
-        where set_name = i_set_name
-          and table_name = fq_table_name;
-    delete from londiste.node_table
-        where set_name = i_set_name
-          and table_name = fq_table_name;
-    if not found then
-        select 400, 'Not found: ' || fq_table_name into ret_code, ret_note;
-        return;
-    end if;
-
-    if pgq_set.is_root(i_set_name) then
-        perform londiste.set_remove_table(i_set_name, fq_table_name);
-        perform londiste.root_notify_change(i_set_name, 'remove-table', fq_table_name);
-    end if;
-
-    select 200, 'Table removed: ' || fq_table_name into ret_code, ret_note;
-    return;
-end;
-$$ language plpgsql strict;
-
diff --git a/sql/londiste/functions/londiste.node_set_skip_truncate.sql b/sql/londiste/functions/londiste.node_set_skip_truncate.sql
deleted file mode 100644 (file)
index 20aae7e..0000000
+++ /dev/null
@@ -1,24 +0,0 @@
-
-create or replace function londiste.node_set_skip_truncate(
-    i_set_name  text,
-    i_table     text,
-    i_value     bool)
-returns integer as $$
--- ----------------------------------------------------------------------
--- Function: londiste.node_set_skip_truncate(x)
---
---      Change skip_truncate flag for table.
--- ----------------------------------------------------------------------
-begin
-    update londiste.node_table
-       set skip_truncate = i_value
-     where set_name = i_set_name
-       and table_name = i_table;
-    if not found then
-        raise exception 'table not found';
-    end if;
-
-    return 1;
-end;
-$$ language plpgsql;
-
diff --git a/sql/londiste/functions/londiste.root_check_seqs.sql b/sql/londiste/functions/londiste.root_check_seqs.sql
new file mode 100644 (file)
index 0000000..81be6aa
--- /dev/null
@@ -0,0 +1,80 @@
+
+create or replace function londiste.root_check_seqs(
+    in i_queue_name text, in i_buffer int8,
+    out ret_code int4, out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.root_check_seqs(1)
+--
+--      Check sequences, and publish values if needed.
+--
+-- Parameters:
+--      i_queue_name    - set name
+--      i_buffer        - safety room
+--
+-- Returns:
+--      200 - OK
+--      402 - Not a root node
+--      404 - Queue not found
+-- ----------------------------------------------------------------------
+declare
+    n record;
+    seq record;
+    real_value int8;
+    pub_value int8;
+    real_buffer int8;
+begin
+    if i_buffer is null or i_buffer < 10 then
+        real_buffer := 10000;
+    else
+        real_buffer := i_buffer;
+    end if;
+
+    select node_type, node_name into n
+        from pgq_node.node_info
+        where queue_name = i_queue_name;
+    if not found then
+        select 404, 'Queue not found: ' || i_queue_name into ret_code, ret_note;
+        return;
+    end if;
+    if n.node_type <> 'root' then
+        select 402, 'Not a root node' into ret_code, ret_note;
+        return;
+    end if;
+
+    for seq in
+        select seq_name, last_value,
+               londiste.quote_fqname(seq_name) as fqname
+            from londiste.seq_info
+            where queue_name = i_queue_name
+                and local
+            order by nr
+    loop
+        execute 'select last_value from ' || seq.fqname into real_value;
+        if real_value + real_buffer >= seq.last_value then
+            pub_value := real_value + real_buffer * 3;
+            perform pgq.insert_event(i_queue_name, 'londiste.update-seq',
+                        pub_value::text, seq.seq_name, null, null, null);
+            update londiste.seq_info set last_value = pub_value
+                where queue_name = i_queue_name
+                    and seq_name = seq.seq_name;
+        end if;
+    end loop;
+
+    select 200, 'Sequences updated' into ret_code, ret_note;
+    return;
+end;
+$$ language plpgsql;
+
+create or replace function londiste.root_check_seqs(
+    in i_queue_name text,
+    out ret_code int4, out ret_note text)
+as $$
+begin
+    select f.ret_code, f.ret_note
+        into ret_code, ret_note
+        from londiste.root_check_seqs(i_queue_name, 10000) f;
+    return;
+end;
+$$ language plpgsql;
+
index 03183c1641a61b98e191cef8dc27ac3f471564bb..b711280e85122ab8e990edae89718e23520d4012 100644 (file)
@@ -1,5 +1,5 @@
 
-create or replace function londiste.root_notify_change(i_set_name text, i_ev_type text, i_ev_data text)
+create or replace function londiste.root_notify_change(i_queue_name text, i_ev_type text, i_ev_data text)
 returns integer as $$
 -- ----------------------------------------------------------------------
 -- Function: londiste.root_notify_change(3)
@@ -10,17 +10,11 @@ declare
     que     text;
     ntype   text;
 begin
-    select s.queue_name, s.node_type into que, ntype
-        from pgq_set.set_info s
-        where s.set_name = i_set_name;
-    if not found then
-        raise exception 'Unknown set: %', i_set_name;
-    end if;
-    if ntype <> 'root' then
+
+    if not coalesce(pgq_node.is_root_node(i_queue_name), false) then
         raise exception 'only root node can send events';
     end if;
-
-    perform pgq.insert_event(que, i_ev_type, i_ev_data);
+    perform pgq.insert_event(i_queue_name, i_ev_type, i_ev_data);
 
     return 1;
 end;
diff --git a/sql/londiste/functions/londiste.set_get_table_list.sql b/sql/londiste/functions/londiste.set_get_table_list.sql
deleted file mode 100644 (file)
index 6082706..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-
-create or replace function londiste.set_get_table_list(
-    in i_set_name       text,
-    out table_name      text,
-    out is_local        bool)
-returns setof record as $$
--- ----------------------------------------------------------------------
--- Function: londiste.set_get_table_list(1)
---
---      Show tables registered for set.
---
---      This means its available from root, events for it appear
---      in queue and nodes can attach to it.
---
--- Called by:
---      Admin tools.
--- ----------------------------------------------------------------------
-begin
-    for table_name, is_local in
-        select t.table_name, n.table_name is not null
-          from londiste.set_table t left join londiste.node_table n
-               on (t.set_name = n.set_name and t.table_name = n.table_name)
-         where t.set_name = i_set_name
-    loop
-        return next;
-    end loop;
-    return;
-end;
-$$ language plpgsql strict security definer;
-
diff --git a/sql/londiste/sql/londiste_denytrigger.sql b/sql/londiste/sql/londiste_denytrigger.sql
deleted file mode 100644 (file)
index dad81ff..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-
-create table denytest ( val integer);
-insert into denytest values (1);
-create trigger xdeny after insert or update or delete
-on denytest for each row execute procedure londiste.deny_trigger();
-
-insert into denytest values (2);
-update denytest set val = 2;
-delete from denytest;
-
-select londiste.disable_deny_trigger(true);
-update denytest set val = 2;
-select londiste.disable_deny_trigger(true);
-update denytest set val = 2;
-select londiste.disable_deny_trigger(false);
-update denytest set val = 2;
-select londiste.disable_deny_trigger(false);
-update denytest set val = 2;
-
diff --git a/sql/londiste/sql/londiste_execute.sql b/sql/londiste/sql/londiste_execute.sql
new file mode 100644 (file)
index 0000000..85afa18
--- /dev/null
@@ -0,0 +1,20 @@
+
+set log_error_verbosity = 'terse';
+
+select * from londiste.execute_start('branch_set', 'DDL-A.sql', 'drop all', false);
+select * from londiste.execute_start('branch_set', 'DDL-A.sql', 'drop all', false);
+
+select * from londiste.execute_finish('branch_set', 'DDL-A.sql');
+select * from londiste.execute_finish('branch_set', 'DDL-A.sql');
+select * from londiste.execute_finish('branch_set', 'DDL-XXX.sql');
+
+select * from londiste.execute_start('branch_set', 'DDL-B.sql', 'drop all', true);
+select * from londiste.execute_start('branch_set', 'DDL-B.sql', 'drop all', true);
+
+
+
+select * from londiste.execute_start('aset', 'DDL-root.sql', 'drop all', true);
+select * from londiste.execute_start('aset', 'DDL-root.sql', 'drop all', true);
+select * from londiste.execute_finish('aset', 'DDL-root.sql');
+select * from londiste.execute_finish('aset', 'DDL-root.sql');
+
index 7e9887907f7dff585409bd168f174ebf44a9ca22..78616664a011378147ebf30315989a3cc55956f3 100644 (file)
@@ -18,13 +18,13 @@ create table ref_3 (
     val text
 );
 
-select * from londiste.set_add_table('branch_set', 'public.ref_1');
-select * from londiste.set_add_table('branch_set', 'public.ref_2');
-select * from londiste.set_add_table('branch_set', 'public.ref_3');
+select * from londiste.global_add_table('branch_set', 'public.ref_1');
+select * from londiste.global_add_table('branch_set', 'public.ref_2');
+select * from londiste.global_add_table('branch_set', 'public.ref_3');
 
-select * from londiste.node_add_table('branch_set', 'public.ref_1');
-select * from londiste.node_add_table('branch_set', 'public.ref_2');
-select * from londiste.node_add_table('branch_set', 'public.ref_3');
+select * from londiste.local_add_table('branch_set', 'public.ref_1');
+select * from londiste.local_add_table('branch_set', 'public.ref_2');
+select * from londiste.local_add_table('branch_set', 'public.ref_3');
 
 select * from londiste.find_table_fkeys('public.ref_1');
 select * from londiste.find_table_fkeys('public.ref_2');
@@ -32,7 +32,7 @@ select * from londiste.find_table_fkeys('public.ref_3');
 
 select * from londiste.get_table_pending_fkeys('public.ref_2');
 
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
+select * from londiste.get_valid_pending_fkeys('branch_set');
 
 -- drop fkeys
 
@@ -52,15 +52,15 @@ select * from londiste.find_table_fkeys('public.ref_3');
 
 -- look state
 select * from londiste.get_table_pending_fkeys('public.ref_2');
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
+select * from londiste.get_valid_pending_fkeys('branch_set');
 
 -- toggle sync
-select * from londiste.node_set_table_state('branch_set', 'public.ref_1', null, 'ok');
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
-select * from londiste.node_set_table_state('branch_set', 'public.ref_2', null, 'ok');
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
-select * from londiste.node_set_table_state('branch_set', 'public.ref_3', null, 'ok');
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
+select * from londiste.local_set_table_state('branch_set', 'public.ref_1', null, 'ok');
+select * from londiste.get_valid_pending_fkeys('branch_set');
+select * from londiste.local_set_table_state('branch_set', 'public.ref_2', null, 'ok');
+select * from londiste.get_valid_pending_fkeys('branch_set');
+select * from londiste.local_set_table_state('branch_set', 'public.ref_3', null, 'ok');
+select * from londiste.get_valid_pending_fkeys('branch_set');
 
 -- restore
 select * from londiste.restore_table_fkey('public.ref_2', 'ref_2_ref_fkey');
@@ -68,7 +68,7 @@ select * from londiste.restore_table_fkey('public.ref_3', 'ref_3_ref2_fkey');
 
 -- look state
 select * from londiste.get_table_pending_fkeys('public.ref_2');
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
+select * from londiste.get_valid_pending_fkeys('branch_set');
 select * from londiste.find_table_fkeys('public.ref_1');
 select * from londiste.find_table_fkeys('public.ref_2');
 select * from londiste.find_table_fkeys('public.ref_3');
index efb4f23e798282889dccd175251625bb4fb9c720..c2fcecfe8d8562d17c825e6a33aaecb7b819a152 100644 (file)
@@ -1,10 +1,13 @@
 \set ECHO off
 set log_error_verbosity = 'terse';
+
 \i ../txid/txid.sql
 \i ../pgq/pgq.sql
-\i ../pgq_set/pgq_set.sql
---\i londiste.sql
+\i ../pgq_node/pgq_node.sql
+
+-- install directly from source files
 \i structure/tables.sql
 \i structure/functions.sql
+
 \set ECHO all
 
diff --git a/sql/londiste/sql/londiste_merge.sql b/sql/londiste/sql/londiste_merge.sql
new file mode 100644 (file)
index 0000000..c69e43d
--- /dev/null
@@ -0,0 +1,70 @@
+
+set client_min_messages = 'warning';
+\set VERBOSITY 'terse'
+
+--
+-- tables
+--
+create table tblmerge (
+    id int4 primary key,
+    data text
+);
+
+select * from pgq_node.register_location('combined_set', 'croot', 'dbname=db', false);
+select * from pgq_node.create_node('combined_set', 'root', 'croot', 'londiste_croot', null, null, null);
+
+select * from pgq_node.register_location('part1_set', 'p1root', 'dbname=db', false);
+select * from pgq_node.register_location('part1_set', 'p1merge', 'dbname=db2', false);
+select * from pgq_node.create_node('part1_set', 'leaf', 'p1merge', 'londiste_p1merge', 'p1root', 100, 'combined_set');
+
+select * from pgq_node.register_location('part2_set', 'p2root', 'dbname=db', false);
+select * from pgq_node.register_location('part2_set', 'p2merge', 'dbname=db2', false);
+select * from pgq_node.create_node('part2_set', 'leaf', 'p2merge', 'londiste_p2merge', 'p2root', 100, 'combined_set');
+
+
+
+select * from londiste.local_add_table('combined_set', 'tblmerge');
+
+select * from londiste.global_add_table('part1_set', 'tblmerge');
+select * from londiste.local_add_table('part1_set', 'tblmerge');
+
+select * from londiste.global_add_table('part2_set', 'tblmerge');
+select * from londiste.local_add_table('part2_set', 'tblmerge');
+
+select * from londiste.get_table_list('part1_set');
+select * from londiste.get_table_list('part2_set');
+select * from londiste.get_table_list('combined_set');
+
+select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'in-copy');
+select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'in-copy');
+select * from londiste.get_table_list('part1_set');
+select * from londiste.get_table_list('part2_set');
+
+select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', 'create index;');
+select * from londiste.get_table_list('part1_set');
+select * from londiste.get_table_list('part2_set');
+
+select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'in-copy');
+select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up');
+select * from londiste.get_table_list('part1_set');
+select * from londiste.get_table_list('part2_set');
+
+select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', null);
+select * from londiste.get_table_list('part1_set');
+select * from londiste.get_table_list('part2_set');
+
+select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'catching-up');
+select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up');
+select * from londiste.get_table_list('part1_set');
+select * from londiste.get_table_list('part2_set');
+
+
+
+
+
+
+
+
+
+
+
index b86223f44a734169f0d8afdc572e0bfba6dfadbd..a929667b45768dc2b0b36ba37bbbc6edfc051602 100644 (file)
@@ -16,16 +16,16 @@ create table testdata_nopk (
 
 select current_database();
 
-select * from pgq_set.add_member('aset', 'rnode', 'dbname=db', false);
-select * from pgq_set.create_node('aset', 'root', 'rnode', 'londiste_root', null::text, null::int8, null::text);
+select * from pgq_node.register_location('aset', 'rnode', 'dbname=db', false);
+select * from pgq_node.create_node('aset', 'root', 'rnode', 'londiste_root', null::text, null::int8, null::text);
 
-select * from londiste.node_add_table('aset', 'public.testdata_nopk');
-select * from londiste.node_add_table('aset', 'public.testdata');
+select * from londiste.local_add_table('aset', 'public.testdata_nopk');
+select * from londiste.local_add_table('aset', 'public.testdata');
 insert into testdata (data) values ('test-data');
-select * from londiste.node_get_table_list('aset');
-select * from londiste.node_remove_table('aset', 'public.testdata');
-select * from londiste.node_remove_table('aset', 'public.testdata');
-select * from londiste.node_get_table_list('aset');
+select * from londiste.get_table_list('aset');
+select * from londiste.local_remove_table('aset', 'public.testdata');
+select * from londiste.local_remove_table('aset', 'public.testdata');
+select * from londiste.get_table_list('aset');
 
 select ev_id, ev_type, ev_data, ev_extra1 from pgq.event_template;
 
diff --git a/sql/londiste/sql/londiste_seqs.sql b/sql/londiste/sql/londiste_seqs.sql
new file mode 100644 (file)
index 0000000..e0038a5
--- /dev/null
@@ -0,0 +1,39 @@
+
+set client_min_messages = 'warning';
+\set VERBOSITY 'terse'
+
+--
+-- sequences
+--
+
+create sequence masterseq;
+create sequence slaveseq;
+
+
+select * from pgq_node.register_location('seqroot', 'rnode', 'dbname=db', false);
+select * from pgq_node.create_node('seqroot', 'root', 'rnode', 'londiste_root', null::text, null::int8, null::text);
+
+select * from londiste.local_add_seq('seqroot', 'masterseq');
+select * from londiste.local_add_seq('seqroot', 'masterseq');
+select * from londiste.root_check_seqs('seqroot');
+select * from londiste.local_remove_seq('seqroot', 'masterseq');
+select * from londiste.local_remove_seq('seqroot', 'masterseq');
+
+select * from londiste.get_seq_list('seqroot');
+
+select ev_id, ev_type, ev_data, ev_extra1 from pgq.event_template;
+
+-- subscriber
+select * from pgq_node.register_location('seqbranch', 'subnode', 'dbname=db', false);
+select * from pgq_node.register_location('seqbranch', 'rootnode', 'dbname=db', false);
+select * from pgq_node.create_node('seqbranch', 'branch', 'subnode', 'londiste_branch', 'rootnode', 1, null::text);
+
+select * from londiste.local_add_seq('seqbranch', 'masterseq');
+select * from londiste.global_update_seq('seqbranch', 'masterseq', 5);
+select * from londiste.local_add_seq('seqbranch', 'masterseq');
+select * from londiste.root_check_seqs('seqbranch');
+select * from londiste.get_seq_list('seqbranch');
+select * from londiste.local_remove_seq('seqbranch', 'masterseq');
+select * from londiste.local_remove_seq('seqbranch', 'masterseq');
+
+
index 5486c1b3871ce2d39cad06641a90055aab0bf7c4..3cef618a0e71053134daa684875707556b58de21 100644 (file)
@@ -12,15 +12,17 @@ create table slavedata (
 
 select current_database();
 
-select * from pgq_set.add_member('branch_set', 'snode', 'dbname=db', false);
-select * from pgq_set.add_member('branch_set', 'pnode', 'dbname=db2', false);
-select * from pgq_set.create_node('branch_set', 'branch', 'snode', 'londiste_branch', 'pnode', 100, null::text);
+select * from pgq_node.register_location('branch_set', 'snode', 'dbname=db', false);
+select * from pgq_node.register_location('branch_set', 'pnode', 'dbname=db2', false);
+select * from pgq_node.create_node('branch_set', 'branch', 'snode', 'londiste_branch', 'pnode', 100, null::text);
 
-select * from londiste.node_add_table('branch_set', 'public.slavedata');
-select * from londiste.set_add_table('branch_set', 'public.slavedata');
-select * from londiste.node_add_table('branch_set', 'public.slavedata');
-select * from londiste.node_get_table_list('branch_set');
-select * from londiste.node_remove_table('branch_set', 'public.slavedata');
-select * from londiste.node_remove_table('branch_set', 'public.slavedata');
-select * from londiste.node_get_table_list('branch_set');
+select * from londiste.local_add_table('branch_set', 'public.slavedata');
+select * from londiste.global_add_table('branch_set', 'public.slavedata');
+select * from londiste.local_add_table('branch_set', 'public.slavedata');
+select * from londiste.global_add_table('branch_set', 'public.tmp');
+select * from londiste.get_table_list('branch_set');
+select * from londiste.global_remove_table('branch_set', 'public.tmp');
+select * from londiste.local_remove_table('branch_set', 'public.slavedata');
+select * from londiste.local_remove_table('branch_set', 'public.slavedata');
+select * from londiste.get_table_list('branch_set');
 
index 0bb20e20c1404626628db4f20633f4121f163098..62ac186e774fab636dda7e4e2344b3be6d7eab43 100644 (file)
@@ -1,37 +1,39 @@
 -- Section: Londiste functions
 
--- Group: Main operations
-\i functions/londiste.node_add_seq.sql
-\i functions/londiste.node_add_table.sql
-\i functions/londiste.node_get_seq_list.sql
-\i functions/londiste.node_get_table_list.sql
-\i functions/londiste.node_remove_seq.sql
-\i functions/londiste.node_remove_table.sql
-\i functions/londiste.node_set_table_state.sql
-
--- Group: Set object registrations
-\i functions/londiste.set_add_table.sql
-\i functions/londiste.set_remove_table.sql
-\i functions/londiste.set_get_table_list.sql
+-- Group: Information
+\i functions/londiste.get_seq_list.sql
+\i functions/londiste.get_table_list.sql
+
+-- Group: Local object registration (setup tool)
+\i functions/londiste.local_add_seq.sql
+\i functions/londiste.local_add_table.sql
+\i functions/londiste.local_remove_seq.sql
+\i functions/londiste.local_remove_table.sql
+
+-- Group: Global object registrations (internal)
+\i functions/londiste.global_add_table.sql
+\i functions/londiste.global_remove_table.sql
+\i functions/londiste.global_update_seq.sql
+\i functions/londiste.global_remove_seq.sql
 
 -- Group: FKey handling
 \i functions/londiste.handle_fkeys.sql
 
--- Group: Trigger handling
-\i functions/londiste.handle_triggers.sql
+-- Group: Execute handling
+\i functions/londiste.execute_start.sql
+\i functions/londiste.execute_finish.sql
 
 -- Group: Internal functions
-\i functions/londiste.node_set_skip_truncate.sql
-\i functions/londiste.node_prepare_triggers.sql
-\i functions/londiste.node_refresh_triggers.sql
-\i functions/londiste.node_disable_triggers.sql
+\i functions/londiste.root_check_seqs.sql
 \i functions/londiste.root_notify_change.sql
+\i functions/londiste.local_set_table_state.sql
+\i functions/londiste.local_set_skip_truncate.sql
+\i functions/londiste.local_set_table_struct.sql
 
 -- Group: Utility functions
 \i functions/londiste.find_column_types.sql
 \i functions/londiste.find_table_fkeys.sql
 \i functions/londiste.find_table_oid.sql
-\i functions/londiste.find_table_triggers.sql
 \i functions/londiste.quote_fqname.sql
 \i functions/londiste.make_fqname.sql
 
index a68321980ef11729153bf90d51a7ff269bd84e3b..50df9a7f7ae7ce7006292550ee26879d555b7a39 100644 (file)
@@ -1,7 +1,7 @@
 
 grant usage on schema londiste to public;
-grant select on londiste.node_table to public;
-grant select on londiste.node_seq to public;
+grant select on londiste.table_info to public;
+grant select on londiste.seq_info to public;
 grant select on londiste.pending_fkeys to public;
-grant select on londiste.pending_triggers to public;
+grant select on londiste.applied_execute to public;
 
diff --git a/sql/londiste/structure/tables.old.sql b/sql/londiste/structure/tables.old.sql
new file mode 100644 (file)
index 0000000..10b5354
--- /dev/null
@@ -0,0 +1,147 @@
+-- ----------------------------------------------------------------------
+-- Section: Londiste internals
+--
+--      Londiste storage: tables/seqs/fkeys/triggers/events.
+--
+-- Londiste event types:
+--      I/U/D                   - ev_data: table update in partial-sql format, ev_extra1: fq table name
+--      I:/U:/D: <pk>           - ev_data: table update in urlencoded format, ev_extra1: fq table name
+--      londiste.add-table      - ev_data: table name that was added on root
+--      londiste.remove-table   - ev_data: table name that was removed on root
+--      londiste.update-seq     - ev_data: new seq value from root, ev_extra1: seq name
+--      lodniste.remove-seq     - ev_data: seq name that was removed on root
+-- ----------------------------------------------------------------------
+create schema londiste;
+
+set default_with_oids = 'off';
+
+
+-- ----------------------------------------------------------------------
+-- Table: londiste.set_table
+--
+--      Tables available on root, meaning that events for only
+--      tables specified here can appear in queue.
+--
+-- Columns:
+--      nr          - just to have stable order
+--      set_name    - which set the table belongs to
+--      table_name  - fq table name
+-- ----------------------------------------------------------------------
+create table londiste.set_table (
+    nr                  serial not null,
+    set_name            text not null,
+    table_name          text not null,
+    foreign key (set_name) references pgq_node.node_info (queue_name),
+    primary key (set_name, table_name)
+);
+
+-- ----------------------------------------------------------------------
+-- Table: londiste.set_seq
+--
+--      Sequences available on root, meaning that events for only
+--      sequences specified here can appear in queue.
+--
+-- Columns:
+--      nr          - just to have stable order
+--      set_name    - which set the table belongs to
+--      seq_name    - fq seq name
+--      local       - there is actual seq on local node
+--      last_value  - last published value from root
+-- ----------------------------------------------------------------------
+create table londiste.seq_state (
+    nr                  serial not null,
+    set_name            text not null,
+    seq_name            text not null,
+    local               boolean not null default false,
+    last_value          int8 not null,
+    foreign key (set_name) references pgq_node.node_info (queue_name),
+    primary key (set_name, seq_name)
+);
+
+
+-- ----------------------------------------------------------------------
+-- Table: londiste.node_table
+--
+--      Info about attached tables.
+--
+-- Columns:
+--      nr              - Dummy number for visual ordering
+--      set_name        - Set name
+--      table_name      - fully-qualified table name
+--      merge_state     - State for tables
+--      trigger_type    - trigger type
+--      trigger_name    - londiste trigger name
+--      copy_snapshot   - remote snapshot for COPY command
+--      custom_tg_args  - user-specified 
+--      skip_truncate   - if 'in-copy' should not do TRUNCATE
+--
+-- Tables merge states:
+--      master          - master: all in sync
+--      ok              - slave: all in sync
+--      in-copy         -
+--      catching-up     -
+--      wanna-sync:%    -
+--      do-sync:%       -
+--      unsynced        -
+--
+-- Trigger type:
+--      notrigger       - no trigger applied
+--      pgq.logtriga    - Partial SQL trigger with fixed column list
+--      pgq.sqltriga    - Partial SQL trigger with autodetection
+--      pgq.logutriga   - urlenc trigger with autodetection
+--      pgq.denytrigger - deny trigger
+-- ----------------------------------------------------------------------
+create table londiste.node_table (
+    nr                  serial not null,
+    set_name            text not null,
+    table_name          text not null,
+    merge_state         text,
+    custom_snapshot     text,
+    skip_truncate       bool,
+
+    foreign key (set_name, table_name) references londiste.set_table,
+    primary key (set_name, table_name)
+);
+
+
+-- ----------------------------------------------------------------------
+-- Table: londiste.applied_execute
+--
+--      Info about EXECUTE commands that are ran.
+--
+-- Columns:
+--      set_name        - which set it belongs to
+--      execute_file    - filename / unique id
+--      execute_time    - the time execute happened
+--      execute_sql     - contains SQL for EXECUTE event (informative)
+-- ----------------------------------------------------------------------
+create table londiste.applied_execute (
+    set_name            text not null,
+    execute_file        text not null,
+    execute_time        timestamptz not null default now(),
+    execute_sql         text not null,
+    primary key (set_name, execute_file)
+);
+
+
+-- ----------------------------------------------------------------------
+-- Table: londiste.pending_fkeys
+--
+--      Details on dropped fkeys.  Global, not specific to any set.
+--
+-- Columns:
+--      from_table      - fully-qualified table name
+--      to_table        - fully-qualified table name
+--      fkey_name       - name of constraint
+--      fkey_def        - full fkey definition
+-- ----------------------------------------------------------------------
+create table londiste.pending_fkeys (
+    from_table          text not null,
+    to_table            text not null,
+    fkey_name           text not null,
+    fkey_def            text not null,
+    
+    primary key (from_table, fkey_name)
+);
+
+
index b511b58b7069cf0266073a3d04b24fadf4b28244..79dd90dfce80215341c5aa04e86a43b4ae96e52d 100644 (file)
@@ -4,13 +4,58 @@
 --      Londiste storage: tables/seqs/fkeys/triggers/events.
 --
 -- Londiste event types:
---      I/U/D       - ev_data: table update in partial-sql format, ev_extra1: fq table name
---      I:/U:/D:    - ev_data: table update in urlencoded format, ev_extra1: fq table name
---      add-seq     - ev_data: seq name that was added on root
---      del-seq     - ev_data: seq name that was removed on root
---      add-tbl     - ev_data: table name that was added on root
---      del-tbl     - ev_data: table name that was removed on root
---      seq-values  - ev_data: urlencoded fqname:value pairs
+--      I/U/D                   - partial SQL event from pgq.sqltriga()
+--      I:/U:/D: <pk>           - urlencoded event from pgq.logutriga()
+--      EXECUTE                 - SQL script execution
+--      TRUNCATE                - table truncation
+--      londiste.add-table      - global table addition
+--      londiste.remove-table   - global table removal
+--      londiste.update-seq     - sequence update
+--      londiste.remove-seq     - global sequence removal
+--
+-- pgq.sqltriga() event:
+--      ev_type     - I/U/D which means insert/update/delete
+--      ev_data     - partial SQL
+--      ev_extra1   - table name
+--
+--      Insert: ev_type = "I", ev_data = "(col1, col2) values (2, 'foo')", ev_extra1 = "public.tblname"
+--
+--      Update: ev_type = "U", ev_data = "col2 = null where col1 = 2", ev_extra1 = "public.tblname"
+--
+--      Delete: ev_type = "D", ev_data = "col1 = 2", ev_extra1 = "public.tblname"
+--
+-- pgq.logutriga() event:
+--      ev_type     - I:/U:/D: plus comma separated list of pkey columns
+--      ev_data     - urlencoded row columns
+--      ev_extra1   - table name
+--
+--      Insert: ev_type = "I:col1", ev_data = ""
+--
+-- Truncate trigger event:
+--      ev_type     - TRUNCATE
+--      ev_extra1   - table name
+--
+-- Execute SQL event:
+--      ev_type     - EXECUTE
+--      ev_data     - SQL script
+--      ev_extra1   - Script ID
+--
+-- Global table addition:
+--      ev_type     - londiste.add-table
+--      ev_data     - table name
+--
+-- Global table removal:
+--      ev_type     - londiste.remove-table
+--      ev_data     - table name
+--
+-- Global sequence update:
+--      ev_type     - londiste.update-seq
+--      ev_data     - seq value
+--      ev_extra1   - seq name
+--5)
+-- Global sequence removal:
+--      ev_type     - londiste.remove-seq
+--      ev_data     - seq name
 -- ----------------------------------------------------------------------
 create schema londiste;
 
@@ -18,131 +63,85 @@ set default_with_oids = 'off';
 
 
 -- ----------------------------------------------------------------------
--- Table: londiste.set_table
+-- Table: londiste.table_info
 --
---      Tables available on root, meaning that events for only
---      tables specified here can appear in queue.
+--      Info about registered tables.
 --
 -- Columns:
---      nr          - just to have stable order
---      set_name    - which set the table belongs to
---      table_name  - fq table name
--- ----------------------------------------------------------------------
-create table londiste.set_table (
-    nr                  serial not null,
-    set_name            text not null,
-    table_name          text not null,
-    foreign key (set_name) references pgq_set.set_info,
-    primary key (set_name, table_name)
-);
-
--- ----------------------------------------------------------------------
--- Table: londiste.set_seq
---
---      Sequences available on root, meaning that events for only
---      sequences specified here can appear in queue.
---
--- Columns:
---      nr          - just to have stable order
---      set_name    - which set the table belongs to
---      seq_name    - fq seq name
--- ----------------------------------------------------------------------
-create table londiste.set_seq (
-    nr                  serial not null,
-    set_name            text not null,
-    seq_name            text not null,
-    foreign key (set_name) references pgq_set.set_info,
-    primary key (set_name, seq_name)
-);
-
-
--- ----------------------------------------------------------------------
--- Table: londiste.node_table
---
---      Info about attached tables.
---
--- Columns:
---      nr              - Dummy number for visual ordering
---      set_name        - Set name
+--      nr              - number for visual ordering
+--      queue_name      - Cascaded queue name
 --      table_name      - fully-qualified table name
+--      local           - Is used locally
 --      merge_state     - State for tables
---      trigger_type    - trigger type
---      trigger_name    - londiste trigger name
---      copy_snapshot   - remote snapshot for COPY command
---      custom_tg_args  - user-specified 
+--      custom_snapshot - remote snapshot for COPY command
 --      skip_truncate   - if 'in-copy' should not do TRUNCATE
+--      dropped_ddl     - temp place to store ddl
 --
 -- Tables merge states:
---      master          - master: all in sync
---      ok              - slave: all in sync
---      in-copy         -
---      catching-up     -
---      wanna-sync:%    -
---      do-sync:%       -
---      unsynced        -
---
--- Trigger type:
---      notrigger       - no trigger applied
---      pgq.logtriga    - Partial SQL trigger with fixed column list
---      pgq.sqltriga    - Partial SQL trigger with autodetection
---      pgq.logutriga   - urlenc trigger with autodetection
---      pgq.denytrigger - deny trigger
--- ----------------------------------------------------------------------
-create table londiste.node_table (
+--      NULL            - copy has not yet happened
+--      in-copy         - ongoing bulk copy
+--      catching-up     - copy process applies events that happened during copy
+--      wanna-sync:%    - copy process caught up, wants to hand table over to replay
+--      do-sync:%       - replay process is ready to accept the table
+--      ok              - in sync, replay applies events
+-- ----------------------------------------------------------------------
+create table londiste.table_info (
     nr                  serial not null,
-    set_name            text not null,
+    queue_name          text not null,
     table_name          text not null,
+    local               boolean not null default false,
     merge_state         text,
     custom_snapshot     text,
     skip_truncate       bool,
+    dropped_ddl         text,
 
-    foreign key (set_name, table_name) references londiste.set_table,
-    primary key (set_name, table_name)
+    primary key (queue_name, table_name),
+    foreign key (queue_name) references pgq_node.node_info (queue_name),
+    check (dropped_ddl is null or merge_state = 'in-copy')
 );
 
 
 -- ----------------------------------------------------------------------
--- Table: londiste.node_trigger
+-- Table: londiste.seq_info
 --
---      Node-specific triggers.  When node type changes,
---      Londiste will make sure unnecessary triggers are
---      dropped and new triggers created.
+--      Sequences available on this queue.
 --
 -- Columns:
---      set_name        - set it belongs to
---      table_name      - table name
---      tg_type         - any / root / non-root / unknown?
---      tg_name         - name for the trigger
---      tg_def          - full statement for trigger creation
+--      nr          - number for visual ordering
+--      queue_name  - cascaded queue name
+--      seq_name    - fully-qualified seq name
+--      local       - there is actual seq on local node
+--      last_value  - last published value from root
 -- ----------------------------------------------------------------------
-create table londiste.node_trigger (
-    set_name            text not null,
-    table_name          text not null,
-    tg_name             text not null,
-    tg_type             text not null,
-    tg_def              text not null,
-    foreign key (set_name, table_name) references londiste.node_table,
-    primary key (set_name, table_name, tg_name),
-    check (tg_type in ('root', 'non-root'))
-    -- check (tg_type in ('always', 'origin', 'replica', 'disabled'))
+create table londiste.seq_info (
+    nr                  serial not null,
+    queue_name          text not null,
+    seq_name            text not null,
+    local               boolean not null default false,
+    last_value          int8 not null,
+
+    primary key (queue_name, seq_name),
+    foreign key (queue_name) references pgq_node.node_info (queue_name)
 );
 
+
 -- ----------------------------------------------------------------------
--- Table: londiste.node_seq
+-- Table: londiste.applied_execute
 --
---      Info about attached sequences.
+--      Info about EXECUTE commands that are ran.
 --
 -- Columns:
---      nr              - dummy number for ordering
---      set_name        - which set it belongs to
---      seq_name        - fully-qualified seq name
--- ----------------------------------------------------------------------
-create table londiste.node_seq (
-    nr                  serial not null,
-    set_name            text not null,
-    seq_name            text not null,
-    foreign key (set_name, seq_name) references londiste.set_seq,
-    primary key (set_name, seq_name)
+--      queue_name      - cascaded queue name
+--      execute_file    - filename / unique id
+--      execute_time    - the time execute happened
+--      execute_sql     - contains SQL for EXECUTE event (informative)
+-- ----------------------------------------------------------------------
+create table londiste.applied_execute (
+    queue_name          text not null,
+    execute_file        text not null,
+    execute_time        timestamptz not null default now(),
+    execute_sql         text not null,
+    primary key (queue_name, execute_file)
 );
 
 
@@ -167,21 +166,3 @@ create table londiste.pending_fkeys (
 );
 
 
--- ----------------------------------------------------------------------
--- Table: londiste.pending_triggers
---
---      Details on dropped triggers.  Global, not specific to any set.
---
--- Columns:
---      table_name      - fully-qualified table name
---      trigger_name    - trigger name
---      trigger_def     - full trigger definition
--- ----------------------------------------------------------------------
-create table londiste.pending_triggers (
-    table_name          text not null,
-    trigger_name        text not null,
-    trigger_def         text not null,
-    
-    primary key (table_name, trigger_name)
-);
-