DATA_built = londiste.sql londiste.upgrade.sql
-DOCS = README.londiste
SQLS = structure/tables.sql structure/grants.sql structure/functions.sql
FUNCS = $(shell sed -n -e '/^\\/{s/\\i //;p}' $(SQLS))
SRCS = $(SQLS) $(FUNCS)
-REGRESS = londiste_install londiste_provider londiste_subscriber londiste_fkeys
+REGRESS = londiste_install londiste_provider londiste_subscriber \
+ londiste_fkeys londiste_execute londiste_seqs londiste_merge
# londiste_denytrigger
REGRESS_OPTS = --load-language=plpgsql
+++ /dev/null
-
-londiste database backend
---------------------------
-
-Provider side:
---------------
-
-londiste.provider_table
-londiste.provider_seq
-
-
-Subscriber side
----------------
-
-table londiste.completed
-table londiste.subscriber_table
-table londiste.subscriber_seq
-
-
-Open issues
-------------
-
-- notify behaviour
-- should notify-s given to db for processing?
-- link init functions
-- switchover
-- are set_last_tick()/get_last_tick() functions needed anymore?
-- typecheck for add_table()?
-
+++ /dev/null
-create table denytest ( val integer);
-insert into denytest values (1);
-create trigger xdeny after insert or update or delete
-on denytest for each row execute procedure londiste.deny_trigger();
-insert into denytest values (2);
-ERROR: ('Changes no allowed on this table',)
-update denytest set val = 2;
-ERROR: ('Changes no allowed on this table',)
-delete from denytest;
-ERROR: ('Changes no allowed on this table',)
-select londiste.disable_deny_trigger(true);
- disable_deny_trigger
-----------------------
- t
-(1 row)
-
-update denytest set val = 2;
-select londiste.disable_deny_trigger(true);
- disable_deny_trigger
-----------------------
- t
-(1 row)
-
-update denytest set val = 2;
-select londiste.disable_deny_trigger(false);
- disable_deny_trigger
-----------------------
- f
-(1 row)
-
-update denytest set val = 2;
-ERROR: ('Changes no allowed on this table',)
-select londiste.disable_deny_trigger(false);
- disable_deny_trigger
-----------------------
- f
-(1 row)
-
-update denytest set val = 2;
-ERROR: ('Changes no allowed on this table',)
--- /dev/null
+set log_error_verbosity = 'terse';
+select * from londiste.execute_start('branch_set', 'DDL-A.sql', 'drop all', false);
+ ret_code | ret_note
+----------+----------------------
+ 200 | Executing: DDL-A.sql
+(1 row)
+
+select * from londiste.execute_start('branch_set', 'DDL-A.sql', 'drop all', false);
+ ret_code | ret_note
+----------+------------------------------------
+ 301 | EXECUTE(DDL-A.sql) already applied
+(1 row)
+
+select * from londiste.execute_finish('branch_set', 'DDL-A.sql');
+ ret_code | ret_note
+----------+-----------------------------
+ 200 | Execute finished: DDL-A.sql
+(1 row)
+
+select * from londiste.execute_finish('branch_set', 'DDL-A.sql');
+ ret_code | ret_note
+----------+-----------------------------
+ 200 | Execute finished: DDL-A.sql
+(1 row)
+
+select * from londiste.execute_finish('branch_set', 'DDL-XXX.sql');
+ ret_code | ret_note
+----------+-------------------------------------------
+ 404 | execute_file called without execute_start
+(1 row)
+
+select * from londiste.execute_start('branch_set', 'DDL-B.sql', 'drop all', true);
+ ret_code | ret_note
+----------+-----------------------------------
+ 401 | Node is not root node: branch_set
+(1 row)
+
+select * from londiste.execute_start('branch_set', 'DDL-B.sql', 'drop all', true);
+ ret_code | ret_note
+----------+-----------------------------------
+ 401 | Node is not root node: branch_set
+(1 row)
+
+select * from londiste.execute_start('aset', 'DDL-root.sql', 'drop all', true);
+ ret_code | ret_note
+----------+-------------------------
+ 200 | Executing: DDL-root.sql
+(1 row)
+
+select * from londiste.execute_start('aset', 'DDL-root.sql', 'drop all', true);
+ ret_code | ret_note
+----------+---------------------------------------
+ 301 | EXECUTE(DDL-root.sql) already applied
+(1 row)
+
+select * from londiste.execute_finish('aset', 'DDL-root.sql');
+ ret_code | ret_note
+----------+--------------------------------
+ 200 | Execute finished: DDL-root.sql
+(1 row)
+
+select * from londiste.execute_finish('aset', 'DDL-root.sql');
+ ret_code | ret_note
+----------+--------------------------------
+ 200 | Execute finished: DDL-root.sql
+(1 row)
+
val text
);
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "ref_3_pkey" for table "ref_3"
-select * from londiste.set_add_table('branch_set', 'public.ref_1');
+select * from londiste.global_add_table('branch_set', 'public.ref_1');
ret_code | ret_note
----------+----------
200 | OK
(1 row)
-select * from londiste.set_add_table('branch_set', 'public.ref_2');
+select * from londiste.global_add_table('branch_set', 'public.ref_2');
ret_code | ret_note
----------+----------
200 | OK
(1 row)
-select * from londiste.set_add_table('branch_set', 'public.ref_3');
+select * from londiste.global_add_table('branch_set', 'public.ref_3');
ret_code | ret_note
----------+----------
200 | OK
(1 row)
-select * from londiste.node_add_table('branch_set', 'public.ref_1');
+select * from londiste.local_add_table('branch_set', 'public.ref_1');
ret_code | ret_note
----------+---------------------------
200 | Table added: public.ref_1
(1 row)
-select * from londiste.node_add_table('branch_set', 'public.ref_2');
+select * from londiste.local_add_table('branch_set', 'public.ref_2');
ret_code | ret_note
----------+---------------------------
200 | Table added: public.ref_2
(1 row)
-select * from londiste.node_add_table('branch_set', 'public.ref_3');
+select * from londiste.local_add_table('branch_set', 'public.ref_3');
ret_code | ret_note
----------+---------------------------
200 | Table added: public.ref_3
------------+----------+-----------+----------
(0 rows)
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
+select * from londiste.get_valid_pending_fkeys('branch_set');
from_table | to_table | fkey_name | fkey_def
------------+----------+-----------+----------
(0 rows)
public.ref_3 | public.ref_2 | ref_3_ref2_fkey | alter table only public.ref_3 add constraint ref_3_ref2_fkey FOREIGN KEY (ref2) REFERENCES ref_2(id)
(2 rows)
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
+select * from londiste.get_valid_pending_fkeys('branch_set');
from_table | to_table | fkey_name | fkey_def
------------+----------+-----------+----------
(0 rows)
-- toggle sync
-select * from londiste.node_set_table_state('branch_set', 'public.ref_1', null, 'ok');
- node_set_table_state
-----------------------
- 1
+select * from londiste.local_set_table_state('branch_set', 'public.ref_1', null, 'ok');
+ local_set_table_state
+-----------------------
+ 1
(1 row)
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
+select * from londiste.get_valid_pending_fkeys('branch_set');
from_table | to_table | fkey_name | fkey_def
------------+----------+-----------+----------
(0 rows)
-select * from londiste.node_set_table_state('branch_set', 'public.ref_2', null, 'ok');
- node_set_table_state
-----------------------
- 1
+select * from londiste.local_set_table_state('branch_set', 'public.ref_2', null, 'ok');
+ local_set_table_state
+-----------------------
+ 1
(1 row)
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
+select * from londiste.get_valid_pending_fkeys('branch_set');
from_table | to_table | fkey_name | fkey_def
--------------+--------------+----------------+----------------------------------------------------------------------------------------------------
public.ref_2 | public.ref_1 | ref_2_ref_fkey | alter table only public.ref_2 add constraint ref_2_ref_fkey FOREIGN KEY (ref) REFERENCES ref_1(id)
(1 row)
-select * from londiste.node_set_table_state('branch_set', 'public.ref_3', null, 'ok');
- node_set_table_state
-----------------------
- 1
+select * from londiste.local_set_table_state('branch_set', 'public.ref_3', null, 'ok');
+ local_set_table_state
+-----------------------
+ 1
(1 row)
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
+select * from londiste.get_valid_pending_fkeys('branch_set');
from_table | to_table | fkey_name | fkey_def
--------------+--------------+-----------------+------------------------------------------------------------------------------------------------------
public.ref_2 | public.ref_1 | ref_2_ref_fkey | alter table only public.ref_2 add constraint ref_2_ref_fkey FOREIGN KEY (ref) REFERENCES ref_1(id)
------------+----------+-----------+----------
(0 rows)
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
+select * from londiste.get_valid_pending_fkeys('branch_set');
from_table | to_table | fkey_name | fkey_def
------------+----------+-----------+----------
(0 rows)
--- /dev/null
+set client_min_messages = 'warning';
+\set VERBOSITY 'terse'
+--
+-- tables
+--
+create table tblmerge (
+ id int4 primary key,
+ data text
+);
+select * from pgq_node.register_location('combined_set', 'croot', 'dbname=db', false);
+ ret_code | ret_note
+----------+---------------------
+ 200 | Location registered
+(1 row)
+
+select * from pgq_node.create_node('combined_set', 'root', 'croot', 'londiste_croot', null, null, null);
+ ret_code | ret_note
+----------+-------------------------------------------------------------
+ 200 | Node "croot" initialized for queue "croot" with type "root"
+(1 row)
+
+select * from pgq_node.register_location('part1_set', 'p1root', 'dbname=db', false);
+ ret_code | ret_note
+----------+---------------------
+ 200 | Location registered
+(1 row)
+
+select * from pgq_node.register_location('part1_set', 'p1merge', 'dbname=db2', false);
+ ret_code | ret_note
+----------+---------------------
+ 200 | Location registered
+(1 row)
+
+select * from pgq_node.create_node('part1_set', 'leaf', 'p1merge', 'londiste_p1merge', 'p1root', 100, 'combined_set');
+ ret_code | ret_note
+----------+-----------------------------------------------------------------
+ 200 | Node "p1merge" initialized for queue "p1merge" with type "leaf"
+(1 row)
+
+select * from pgq_node.register_location('part2_set', 'p2root', 'dbname=db', false);
+ ret_code | ret_note
+----------+---------------------
+ 200 | Location registered
+(1 row)
+
+select * from pgq_node.register_location('part2_set', 'p2merge', 'dbname=db2', false);
+ ret_code | ret_note
+----------+---------------------
+ 200 | Location registered
+(1 row)
+
+select * from pgq_node.create_node('part2_set', 'leaf', 'p2merge', 'londiste_p2merge', 'p2root', 100, 'combined_set');
+ ret_code | ret_note
+----------+-----------------------------------------------------------------
+ 200 | Node "p2merge" initialized for queue "p2merge" with type "leaf"
+(1 row)
+
+select * from londiste.local_add_table('combined_set', 'tblmerge');
+ ret_code | ret_note
+----------+------------------------------
+ 200 | Table added: public.tblmerge
+(1 row)
+
+select * from londiste.global_add_table('part1_set', 'tblmerge');
+ ret_code | ret_note
+----------+----------
+ 200 | OK
+(1 row)
+
+select * from londiste.local_add_table('part1_set', 'tblmerge');
+ ret_code | ret_note
+----------+------------------------------
+ 200 | Table added: public.tblmerge
+(1 row)
+
+select * from londiste.global_add_table('part2_set', 'tblmerge');
+ ret_code | ret_note
+----------+----------
+ 200 | OK
+(1 row)
+
+select * from londiste.local_add_table('part2_set', 'tblmerge');
+ ret_code | ret_note
+----------+------------------------------
+ 200 | Table added: public.tblmerge
+(1 row)
+
+select * from londiste.get_table_list('part1_set');
+ table_name | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role
+-----------------+-------+-------------+-----------------+---------------+-------------+-----------
+ public.tblmerge | t | | | | |
+(1 row)
+
+select * from londiste.get_table_list('part2_set');
+ table_name | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role
+-----------------+-------+-------------+-----------------+---------------+-------------+-----------
+ public.tblmerge | t | | | | |
+(1 row)
+
+select * from londiste.get_table_list('combined_set');
+ table_name | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role
+-----------------+-------+-------------+-----------------+---------------+-------------+-----------
+ public.tblmerge | t | ok | | | |
+(1 row)
+
+select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'in-copy');
+ local_set_table_state
+-----------------------
+ 1
+(1 row)
+
+select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'in-copy');
+ local_set_table_state
+-----------------------
+ 1
+(1 row)
+
+select * from londiste.get_table_list('part1_set');
+ table_name | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role
+-----------------+-------+-------------+-----------------+---------------+-------------+-----------
+ public.tblmerge | t | in-copy | | | | lead
+(1 row)
+
+select * from londiste.get_table_list('part2_set');
+ table_name | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role
+-----------------+-------+-------------+-----------------+---------------+-------------+-----------
+ public.tblmerge | t | in-copy | | | | wait-copy
+(1 row)
+
+select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', 'create index;');
+ ret_code | ret_note
+----------+---------------------
+ 200 | Table struct stored
+(1 row)
+
+select * from londiste.get_table_list('part1_set');
+ table_name | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role
+-----------------+-------+-------------+-----------------+---------------+---------------+-----------
+ public.tblmerge | t | in-copy | | | create index; | lead
+(1 row)
+
+select * from londiste.get_table_list('part2_set');
+ table_name | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role
+-----------------+-------+-------------+-----------------+---------------+-------------+-------------
+ public.tblmerge | t | in-copy | | | | wait-replay
+(1 row)
+
+select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'in-copy');
+ local_set_table_state
+-----------------------
+ 1
+(1 row)
+
+select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up');
+ local_set_table_state
+-----------------------
+ 1
+(1 row)
+
+select * from londiste.get_table_list('part1_set');
+ table_name | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role
+-----------------+-------+-------------+-----------------+---------------+---------------+-----------
+ public.tblmerge | t | in-copy | | | create index; |
+(1 row)
+
+select * from londiste.get_table_list('part2_set');
+ table_name | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role
+-----------------+-------+-------------+-----------------+---------------+-------------+-------------
+ public.tblmerge | t | catching-up | | | | wait-replay
+(1 row)
+
+select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', null);
+ ret_code | ret_note
+----------+---------------------
+ 200 | Table struct stored
+(1 row)
+
+select * from londiste.get_table_list('part1_set');
+ table_name | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role
+-----------------+-------+-------------+-----------------+---------------+-------------+-----------
+ public.tblmerge | t | in-copy | | | |
+(1 row)
+
+select * from londiste.get_table_list('part2_set');
+ table_name | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role
+-----------------+-------+-------------+-----------------+---------------+-------------+-------------
+ public.tblmerge | t | catching-up | | | | wait-replay
+(1 row)
+
+select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'catching-up');
+ local_set_table_state
+-----------------------
+ 1
+(1 row)
+
+select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up');
+ local_set_table_state
+-----------------------
+ 1
+(1 row)
+
+select * from londiste.get_table_list('part1_set');
+ table_name | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role
+-----------------+-------+-------------+-----------------+---------------+-------------+-----------
+ public.tblmerge | t | catching-up | | | |
+(1 row)
+
+select * from londiste.get_table_list('part2_set');
+ table_name | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role
+-----------------+-------+-------------+-----------------+---------------+-------------+-----------
+ public.tblmerge | t | catching-up | | | |
+(1 row)
+
regression
(1 row)
-select * from pgq_set.add_member('aset', 'rnode', 'dbname=db', false);
- ret_code | ret_note
-----------+----------
- 200 | Ok
+select * from pgq_node.register_location('aset', 'rnode', 'dbname=db', false);
+ ret_code | ret_note
+----------+---------------------
+ 200 | Location registered
(1 row)
-select * from pgq_set.create_node('aset', 'root', 'rnode', 'londiste_root', null::text, null::int8, null::text);
- ret_code | ret_note
-----------+----------
- 200 | Ok
+select * from pgq_node.create_node('aset', 'root', 'rnode', 'londiste_root', null::text, null::int8, null::text);
+ ret_code | ret_note
+----------+-------------------------------------------------------------
+ 200 | Node "rnode" initialized for queue "rnode" with type "root"
(1 row)
-select * from londiste.node_add_table('aset', 'public.testdata_nopk');
+select * from londiste.local_add_table('aset', 'public.testdata_nopk');
ret_code | ret_note
----------+----------------------------------------------------
400 | Primary key missing on table: public.testdata_nopk
(1 row)
-select * from londiste.node_add_table('aset', 'public.testdata');
+select * from londiste.local_add_table('aset', 'public.testdata');
ret_code | ret_note
----------+------------------------------
200 | Table added: public.testdata
(1 row)
insert into testdata (data) values ('test-data');
-select * from londiste.node_get_table_list('aset');
- table_name | merge_state | custom_snapshot | skip_truncate
------------------+-------------+-----------------+---------------
- public.testdata | ok | |
+select * from londiste.get_table_list('aset');
+ table_name | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role
+-----------------+-------+-------------+-----------------+---------------+-------------+-----------
+ public.testdata | t | ok | | | |
(1 row)
-select * from londiste.node_remove_table('aset', 'public.testdata');
+select * from londiste.local_remove_table('aset', 'public.testdata');
ret_code | ret_note
----------+--------------------------------
200 | Table removed: public.testdata
(1 row)
-select * from londiste.node_remove_table('aset', 'public.testdata');
- ret_code | ret_note
-----------+----------------------------
- 400 | Not found: public.testdata
+select * from londiste.local_remove_table('aset', 'public.testdata');
+ ret_code | ret_note
+----------+----------------------------------
+ 400 | Table not found: public.testdata
(1 row)
-select * from londiste.node_get_table_list('aset');
- table_name | merge_state | custom_snapshot | skip_truncate
-------------+-------------+-----------------+---------------
+select * from londiste.get_table_list('aset');
+ table_name | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role
+------------+-------+-------------+-----------------+---------------+-------------+-----------
(0 rows)
select ev_id, ev_type, ev_data, ev_extra1 from pgq.event_template;
- ev_id | ev_type | ev_data | ev_extra1
--------+--------------+------------------------------------+-----------------
- 1 | add-table | public.testdata |
- 2 | I | (id,data) values ('1','test-data') | public.testdata
- 3 | remove-table | public.testdata |
+ ev_id | ev_type | ev_data | ev_extra1
+-------+-----------------------+------------------------------------+-----------------
+ 1 | londiste.add-table | public.testdata |
+ 2 | I | (id,data) values ('1','test-data') | public.testdata
+ 3 | londiste.remove-table | public.testdata |
(3 rows)
--- /dev/null
+set client_min_messages = 'warning';
+\set VERBOSITY 'terse'
+--
+-- sequences
+--
+create sequence masterseq;
+create sequence slaveseq;
+select * from pgq_node.register_location('seqroot', 'rnode', 'dbname=db', false);
+ ret_code | ret_note
+----------+---------------------
+ 200 | Location registered
+(1 row)
+
+select * from pgq_node.create_node('seqroot', 'root', 'rnode', 'londiste_root', null::text, null::int8, null::text);
+ ret_code | ret_note
+----------+-------------------------------------------------------------
+ 200 | Node "rnode" initialized for queue "rnode" with type "root"
+(1 row)
+
+select * from londiste.local_add_seq('seqroot', 'masterseq');
+ ret_code | ret_note
+----------+----------------
+ 200 | Sequence added
+(1 row)
+
+select * from londiste.local_add_seq('seqroot', 'masterseq');
+ ret_code | ret_note
+----------+------------------------------------------
+ 201 | Sequence already added: public.masterseq
+(1 row)
+
+select * from londiste.root_check_seqs('seqroot');
+ ret_code | ret_note
+----------+-------------------
+ 200 | Sequences updated
+(1 row)
+
+select * from londiste.local_remove_seq('seqroot', 'masterseq');
+ ret_code | ret_note
+----------+------------------------------------
+ 200 | Sequence removed: public.masterseq
+(1 row)
+
+select * from londiste.local_remove_seq('seqroot', 'masterseq');
+ ret_code | ret_note
+----------+--------------------------------------
+ 400 | Sequence not found: public.masterseq
+(1 row)
+
+select * from londiste.get_seq_list('seqroot');
+ seq_name | last_value | local
+----------+------------+-------
+(0 rows)
+
+select ev_id, ev_type, ev_data, ev_extra1 from pgq.event_template;
+ ev_id | ev_type | ev_data | ev_extra1
+-------+-----------------------+------------------------------------+------------------
+ 1 | londiste.add-table | public.testdata |
+ 2 | I | (id,data) values ('1','test-data') | public.testdata
+ 3 | londiste.remove-table | public.testdata |
+ 4 | EXECUTE | drop all | DDL-root.sql
+ 5 | EXECUTE | drop all | DDL-root.sql
+ 1 | londiste.update-seq | 30001 | public.masterseq
+ 2 | londiste.remove-seq | public.masterseq |
+(7 rows)
+
+-- subscriber
+select * from pgq_node.register_location('seqbranch', 'subnode', 'dbname=db', false);
+ ret_code | ret_note
+----------+---------------------
+ 200 | Location registered
+(1 row)
+
+select * from pgq_node.register_location('seqbranch', 'rootnode', 'dbname=db', false);
+ ret_code | ret_note
+----------+---------------------
+ 200 | Location registered
+(1 row)
+
+select * from pgq_node.create_node('seqbranch', 'branch', 'subnode', 'londiste_branch', 'rootnode', 1, null::text);
+ ret_code | ret_note
+----------+-------------------------------------------------------------------
+ 200 | Node "subnode" initialized for queue "subnode" with type "branch"
+(1 row)
+
+select * from londiste.local_add_seq('seqbranch', 'masterseq');
+ ret_code | ret_note
+----------+------------------------------------
+ 404 | Unknown sequence: public.masterseq
+(1 row)
+
+select * from londiste.global_update_seq('seqbranch', 'masterseq', 5);
+ ret_code | ret_note
+----------+------------------
+ 200 | Sequence updated
+(1 row)
+
+select * from londiste.local_add_seq('seqbranch', 'masterseq');
+ ret_code | ret_note
+----------+----------------
+ 200 | Sequence added
+(1 row)
+
+select * from londiste.root_check_seqs('seqbranch');
+ ret_code | ret_note
+----------+-----------------
+ 402 | Not a root node
+(1 row)
+
+select * from londiste.get_seq_list('seqbranch');
+ seq_name | last_value | local
+------------------+------------+-------
+ public.masterseq | 5 | t
+(1 row)
+
+select * from londiste.local_remove_seq('seqbranch', 'masterseq');
+ ret_code | ret_note
+----------+------------------------------------
+ 200 | Sequence removed: public.masterseq
+(1 row)
+
+select * from londiste.local_remove_seq('seqbranch', 'masterseq');
+ ret_code | ret_note
+----------+--------------------------------------
+ 404 | Sequence not found: public.masterseq
+(1 row)
+
regression
(1 row)
-select * from pgq_set.add_member('branch_set', 'snode', 'dbname=db', false);
- ret_code | ret_note
-----------+----------
- 200 | Ok
+select * from pgq_node.register_location('branch_set', 'snode', 'dbname=db', false);
+ ret_code | ret_note
+----------+---------------------
+ 200 | Location registered
(1 row)
-select * from pgq_set.add_member('branch_set', 'pnode', 'dbname=db2', false);
- ret_code | ret_note
-----------+----------
- 200 | Ok
+select * from pgq_node.register_location('branch_set', 'pnode', 'dbname=db2', false);
+ ret_code | ret_note
+----------+---------------------
+ 200 | Location registered
(1 row)
-select * from pgq_set.create_node('branch_set', 'branch', 'snode', 'londiste_branch', 'pnode', 100, null::text);
- ret_code | ret_note
-----------+----------
- 200 | Ok
+select * from pgq_node.create_node('branch_set', 'branch', 'snode', 'londiste_branch', 'pnode', 100, null::text);
+ ret_code | ret_note
+----------+---------------------------------------------------------------
+ 200 | Node "snode" initialized for queue "snode" with type "branch"
(1 row)
-select * from londiste.node_add_table('branch_set', 'public.slavedata');
- ret_code | ret_note
-----------+-----------------------------------------------
- 400 | Table not registered in set: public.slavedata
+select * from londiste.local_add_table('branch_set', 'public.slavedata');
+ ret_code | ret_note
+----------+------------------------------------------------
+ 404 | Table not available on queue: public.slavedata
(1 row)
-select * from londiste.set_add_table('branch_set', 'public.slavedata');
+select * from londiste.global_add_table('branch_set', 'public.slavedata');
ret_code | ret_note
----------+----------
200 | OK
(1 row)
-select * from londiste.node_add_table('branch_set', 'public.slavedata');
+select * from londiste.local_add_table('branch_set', 'public.slavedata');
ret_code | ret_note
----------+-------------------------------
200 | Table added: public.slavedata
(1 row)
-select * from londiste.node_get_table_list('branch_set');
- table_name | merge_state | custom_snapshot | skip_truncate
-------------------+-------------+-----------------+---------------
- public.slavedata | | |
+select * from londiste.global_add_table('branch_set', 'public.tmp');
+ ret_code | ret_note
+----------+----------
+ 200 | OK
+(1 row)
+
+select * from londiste.get_table_list('branch_set');
+ table_name | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role
+------------------+-------+-------------+-----------------+---------------+-------------+-----------
+ public.slavedata | t | | | | |
+ public.tmp | f | | | | |
+(2 rows)
+
+select * from londiste.global_remove_table('branch_set', 'public.tmp');
+ ret_code | ret_note
+----------+----------
+ 200 | OK
(1 row)
-select * from londiste.node_remove_table('branch_set', 'public.slavedata');
+select * from londiste.local_remove_table('branch_set', 'public.slavedata');
ret_code | ret_note
----------+---------------------------------
200 | Table removed: public.slavedata
(1 row)
-select * from londiste.node_remove_table('branch_set', 'public.slavedata');
- ret_code | ret_note
-----------+-----------------------------
- 400 | Not found: public.slavedata
+select * from londiste.local_remove_table('branch_set', 'public.slavedata');
+ ret_code | ret_note
+----------+------------------------------------------------
+ 400 | Table not registered locally: public.slavedata
(1 row)
-select * from londiste.node_get_table_list('branch_set');
- table_name | merge_state | custom_snapshot | skip_truncate
-------------+-------------+-----------------+---------------
-(0 rows)
+select * from londiste.get_table_list('branch_set');
+ table_name | local | merge_state | custom_snapshot | skip_truncate | dropped_ddl | copy_role
+------------------+-------+-------------+-----------------+---------------+-------------+-----------
+ public.slavedata | f | | | | |
+(1 row)
--- /dev/null
+create or replace function londiste.execute_finish(
+ in i_queue_name text,
+ in i_file_name text,
+ out ret_code int4,
+ out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.execute_finish(2)
+--
+-- Finish execution of DDL. Should be called at the
+-- end of the transaction that does the SQL execution.
+--
+-- Called-by:
+-- Londiste setup tool on root, replay on branches/leafs.
+--
+-- Returns:
+-- 200 - Proceed.
+-- 404 - Current entry not found, execute_start() was not called?
+-- ----------------------------------------------------------------------
+declare
+ is_root boolean;
+ sql text;
+begin
+ is_root := pgq_node.is_root_node(i_queue_name);
+
+ select execute_sql into sql
+ from londiste.applied_execute
+ where queue_name = i_queue_name
+ and execute_file = i_file_name;
+ if not found then
+ select 404, 'execute_file called without execute_start'
+ into ret_code, ret_note;
+ return;
+ end if;
+
+ if is_root then
+ perform pgq.insert_event(i_queue_name, 'EXECUTE', sql, i_file_name, null, null, null);
+ end if;
+
+ -- try educated guess of previous state
+ if is_root then
+ SET LOCAL session_replication_role = 'origin';
+ else
+ SET LOCAL session_replication_role = 'replica';
+ end if;
+
+ select 200, 'Execute finished: ' || i_file_name into ret_code, ret_note;
+ return;
+end;
+$$ language plpgsql strict;
+
--- /dev/null
+create or replace function londiste.execute_start(
+ in i_queue_name text,
+ in i_file_name text,
+ in i_sql text,
+ in i_expect_root boolean,
+ out ret_code int4,
+ out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.execute_start(4)
+--
+-- Start execution of DDL. Should be called at the
+-- start of the transaction that does the SQL execution.
+--
+-- Called-by:
+-- Londiste setup tool on root, replay on branches/leafs.
+--
+-- Parameters:
+-- i_queue_name - cascaded queue name
+-- i_file_name - Unique ID for SQL
+-- i_sql - Actual script (informative, not used here)
+-- i_expect_root - Is this on root? Setup tool sets this to avoid
+-- execution on branches.
+--
+-- Returns:
+-- 200 - Proceed.
+-- 301 - Already applied
+-- 401 - Not root.
+-- 404 - No such queue
+-- ----------------------------------------------------------------------
+declare
+ is_root boolean;
+begin
+ is_root := pgq_node.is_root_node(i_queue_name);
+ if i_expect_root then
+ if not is_root then
+ select 401, 'Node is not root node: ' || i_queue_name
+ into ret_code, ret_note;
+ return;
+ end if;
+ end if;
+
+ perform 1 from londiste.applied_execute
+ where queue_name = i_queue_name
+ and execute_file = i_file_name;
+ if found then
+ select 301, 'EXECUTE(' || i_file_name || ') already applied'
+ into ret_code, ret_note;
+ return;
+ end if;
+
+ -- this also lock against potetial parallel execute
+ insert into londiste.applied_execute (queue_name, execute_file, execute_sql)
+ values (i_queue_name, i_file_name, i_sql);
+
+ SET LOCAL session_replication_role = 'local';
+
+ select 200, 'Executing: ' || i_file_name into ret_code, ret_note;
+ return;
+end;
+$$ language plpgsql strict;
+
+++ /dev/null
-
-create or replace function londiste.find_table_triggers(i_table_name text)
-returns setof londiste.pending_triggers as $$
--- ----------------------------------------------------------------------
--- Function: londiste.find_table_triggers(1)
---
--- Returns all existing triggers on table.
---
--- Parameters:
--- i_table_name - table name
---
--- Returns:
--- table_name - fq table name
--- trigger_name - name
--- trigger_def - partial def as returned by pg_get_triggerdef()
--- ----------------------------------------------------------------------
-declare
- tg record;
-begin
- for tg in
- select n.nspname || '.' || c.relname as table_name, t.tgname::text as name, pg_get_triggerdef(t.oid) as def
- from pg_trigger t, pg_class c, pg_namespace n
- where n.oid = c.relnamespace and c.oid = t.tgrelid
- and t.tgrelid = londiste.find_table_oid(i_table_name)
- and not t.tgisconstraint
- loop
- return next tg;
- end loop;
-
- return;
-end;
-$$ language plpgsql strict stable;
-
--- /dev/null
+
+create or replace function londiste.get_seq_list(
+ in i_queue_name text,
+ out seq_name text,
+ out last_value int8,
+ out local boolean)
+returns setof record as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.get_seq_list(1)
+--
+-- Returns registered seqs on this Londiste node.
+--
+-- Result fiels:
+-- seq_name - fully qualified name of sequence
+-- last_value - last globally published value
+-- local - is locally registered
+-- ----------------------------------------------------------------------
+declare
+ rec record;
+begin
+ for seq_name, last_value, local in
+ select s.seq_name, s.last_value, s.local from londiste.seq_info s
+ where s.queue_name = i_queue_name
+ order by s.nr, s.seq_name
+ loop
+ return next;
+ end loop;
+ return;
+end;
+$$ language plpgsql strict;
+
--- /dev/null
+
+create or replace function londiste.get_table_list(
+ in i_queue_name text,
+ out table_name text,
+ out local boolean,
+ out merge_state text,
+ out custom_snapshot text,
+ out skip_truncate bool,
+ out dropped_ddl text,
+ out copy_role text)
+returns setof record as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.get_table_list(1)
+--
+-- Return info about registered tables.
+--
+-- Parameters:
+-- i_queue_name - cascaded queue name
+--
+-- Returns:
+-- table_name - fully-quelified table name
+-- local - does events needs to be applied to local table
+-- merge_state - show phase of initial copy
+-- custom_snapshot - remote snapshot of COPY transaction
+-- skip_truncate - don't truncate table on copy
+-- dropped_ddl - partition combining: temp place to put DDL
+-- copy_role - partition combining: how to handle copy
+--
+-- copy_role = lead:
+-- on copy start, drop indexes and store in dropped_ddl
+-- on copy finish wait, until copy_role turns to NULL
+-- if dropped_ddl not NULL, restore them
+-- tag as catching-up
+-- copy_role = wait-copy:
+-- on copy start wait, until role changes (to wait-replay)
+-- copy_role = wait-replay:
+-- on copy finish, tag as 'catching-up'
+-- wait until copy_role is NULL, then proceed
+--
+declare
+ q_part1 text;
+ q_target text;
+ n_parts int4;
+ n_done int4;
+begin
+ -- get first part queue, if exists
+ select n.combined_queue into q_target
+ from pgq_node.node_info n
+ where n.queue_name = i_queue_name;
+ if q_target is not null then
+ select n.queue_name into q_part1
+ from pgq_node.node_info n
+ where n.combined_queue = q_target
+ order by n.queue_name
+ limit 1;
+ select count(*) into n_parts
+ from pgq_node.node_info n
+ where n.combined_queue = q_target;
+ end if;
+
+ for table_name, local, merge_state, custom_snapshot, skip_truncate, dropped_ddl in
+ select t.table_name, t.local, t.merge_state, t.custom_snapshot, t.skip_truncate, t.dropped_ddl
+ from londiste.table_info t
+ where t.queue_name = i_queue_name
+ order by t.nr, t.table_name
+ loop
+ -- if the table is in middle of copy from multiple partitions,
+ -- the copy processes need coordination
+ copy_role := null;
+ if q_part1 is not null then
+ select count(*) into n_done
+ from londiste.table_info t, pgq_node.node_info n
+ where n.combined_queue = q_target
+ and t.queue_name = n.queue_name
+ and t.table_name = table_name
+ and (t.merge_state is not null
+ and t.merge_state <> 'in-copy');
+ if i_queue_name = q_part1 then
+ -- lead
+ if merge_state = 'in-copy' then
+ -- show copy_role only if need to wait for others
+ if n_done < n_parts - 1 then
+ copy_role := 'lead';
+ end if;
+ end if;
+ else
+ -- follow
+ if merge_state = 'in-copy' then
+ -- has lead already dropped ddl?
+ perform 1 from londiste.table_info t
+ where t.queue_name = q_part1
+ and t.table_name = table_name
+ and t.dropped_ddl is not null;
+ if found then
+ copy_role := 'wait-replay';
+ else
+ copy_role := 'wait-copy';
+ end if;
+ elsif merge_state = 'catching-up' then
+ -- show copy_role only if need to wait for lead
+ if n_done < n_parts then
+ copy_role := 'wait-replay';
+ end if;
+ end if;
+ end if;
+ end if;
+
+ return next;
+ end loop;
+ return;
+end;
+$$ language plpgsql strict stable;
+
-create or replace function londiste.set_add_table(
- in i_set_name text,
+create or replace function londiste.global_add_table(
+ in i_queue_name text,
in i_table_name text,
out ret_code int4,
out ret_note text)
as $$
-- ----------------------------------------------------------------------
--- Function: londiste.node_add_table(x)
+-- Function: londiste.global_add_table(2)
--
-- Register table on Londiste set.
--
-- in queue and nodes can attach to it.
--
-- Called by:
--- on root - londiste.node_add_table()
+-- on root - londiste.local_add_table()
-- elsewhere - londiste consumer when receives new table event
--
-- Returns:
-- 400 - No such set
-- ----------------------------------------------------------------------
declare
- col_types text;
fq_table_name text;
begin
fq_table_name := londiste.make_fqname(i_table_name);
- perform 1 from pgq_set.set_info where set_name = i_set_name;
+ perform 1 from pgq_node.node_info where queue_name = i_queue_name;
if not found then
- select 400, 'No such set: ' || i_set_name into ret_code, ret_note;
+ select 400, 'No such set: ' || i_queue_name into ret_code, ret_note;
return;
end if;
- perform 1 from londiste.set_table where set_name = i_set_name and table_name = fq_table_name;
+ perform 1 from londiste.table_info where queue_name = i_queue_name and table_name = fq_table_name;
if found then
select 200, 'OK, already added: ' || fq_table_name into ret_code, ret_note;
return;
end if;
- insert into londiste.set_table (set_name, table_name)
- values (i_set_name, fq_table_name);
+ insert into londiste.table_info (queue_name, table_name)
+ values (i_queue_name, fq_table_name);
select 200, 'OK' into ret_code, ret_note;
return;
end;
--- /dev/null
+
+create or replace function londiste.global_remove_seq(
+ in i_queue_name text, in i_seq_name text,
+ out ret_code int4, out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.global_remove_seq(2)
+--
+-- Removes sequence registration in set.
+--
+-- Called by:
+-- - On root by londiste.local_remove_seq()
+-- - Elsewhere by consumer receiving seq remove event
+--
+-- Returns:
+-- 200 - OK
+-- 400 - not found
+-- ----------------------------------------------------------------------
+declare
+ fq_name text;
+begin
+ fq_name := londiste.make_fqname(i_seq_name);
+ delete from londiste.seq_info
+ where queue_name = i_queue_name
+ and seq_name = fq_name;
+ if not found then
+ select 400, 'Sequence not found: '||fq_name into ret_code, ret_note;
+ return;
+ end if;
+ if pgq_node.is_root_node(i_queue_name) then
+ perform londiste.root_notify_change(i_queue_name, 'londiste.remove-seq', fq_name);
+ end if;
+ select 200, 'Sequence removed: '||fq_name into ret_code, ret_note;
+ return;
+end;
+$$ language plpgsql strict;
+
-create or replace function londiste.set_remove_table(
- in i_set_name text, in i_table_name text,
+create or replace function londiste.global_remove_table(
+ in i_queue_name text, in i_table_name text,
out ret_code int4, out ret_note text)
as $$
-- ----------------------------------------------------------------------
--- Function: londiste.set_remove_table(2)
+-- Function: londiste.global_remove_table(2)
--
-- Removes tables registration in set.
--
-- Means that nodes cannot attach to this table anymore.
--
-- Called by:
--- - On root by londiste.node_remove_table()
+-- - On root by londiste.local_remove_table()
-- - Elsewhere by consumer receiving table remove event
--
-- Returns:
fq_table_name text;
begin
fq_table_name := londiste.make_fqname(i_table_name);
- if not pgq_set.is_root(i_set_name) then
- perform londiste.node_remove_table(i_set_name, fq_table_name);
+ if not pgq_node.is_root_node(i_queue_name) then
+ perform londiste.local_remove_table(i_queue_name, fq_table_name);
end if;
- delete from londiste.set_table
- where set_name = i_set_name
+ delete from londiste.table_info
+ where queue_name = i_queue_name
and table_name = fq_table_name;
if not found then
select 400, 'Not found: '||fq_table_name into ret_code, ret_note;
--- /dev/null
+
+create or replace function londiste.global_update_seq(
+ in i_queue_name text, in i_seq_name text, in i_value int8,
+ out ret_code int4, out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.global_update_seq(3)
+--
+-- Update seq.
+--
+-- Parameters:
+-- i_queue_name - set name
+-- i_seq_name - seq name
+-- i_value - new published value
+--
+-- Returns:
+-- 200 - OK
+-- ----------------------------------------------------------------------
+declare
+ n record;
+ fqname text;
+ seq record;
+begin
+ select node_type, node_name into n
+ from pgq_node.node_info
+ where queue_name = i_queue_name;
+ if not found then
+ select 404, 'Set not found: ' || i_queue_name into ret_code, ret_note;
+ return;
+ end if;
+ if n.node_type = 'root' then
+ select 402, 'Must not run on root node' into ret_code, ret_note;
+ return;
+ end if;
+
+ fqname := londiste.make_fqname(i_seq_name);
+ select last_value, local from londiste.seq_info
+ into seq
+ where queue_name = i_queue_name and seq_name = fqname
+ for update;
+ if not found then
+ insert into londiste.seq_info
+ (queue_name, seq_name, last_value)
+ values (i_queue_name, fqname, i_value);
+ else
+ update londiste.seq_info
+ set last_value = i_value
+ where queue_name = i_queue_name and seq_name = fqname;
+ if seq.local then
+ perform pgq.seq_setval(fqname, i_value);
+ end if;
+ end if;
+ select 200, 'Sequence updated' into ret_code, ret_note;
+ return;
+end;
+$$ language plpgsql;
+
$$ language plpgsql strict stable;
-create or replace function londiste.node_get_valid_pending_fkeys(i_set_name text)
+create or replace function londiste.get_valid_pending_fkeys(i_queue_name text)
returns setof londiste.pending_fkeys as $$
-- ----------------------------------------------------------------------
--- Function: londiste.node_get_valid_pending_fkeys(1)
+-- Function: londiste.get_valid_pending_fkeys(1)
--
-- Returns dropped fkeys where both sides are in sync now.
--
-- Parameters:
--- i_set_name - sets name
+-- i_queue_name - cascaded queue name
--
-- Returns:
-- desc
for fkeys in
select pf.*
from londiste.pending_fkeys pf
- left join londiste.node_table st_from on (st_from.table_name = pf.from_table)
- left join londiste.node_table st_to on (st_to.table_name = pf.to_table)
+ left join londiste.table_info st_from on (st_from.table_name = pf.from_table)
+ left join londiste.table_info st_to on (st_to.table_name = pf.to_table)
where (st_from.table_name is null or (st_from.merge_state = 'ok' and st_from.custom_snapshot is null))
and (st_to.table_name is null or (st_to.merge_state = 'ok' and st_to.custom_snapshot is null))
- and (coalesce(st_from.set_name = i_set_name, false)
- or coalesce(st_to.set_name = i_set_name, false))
+ and (coalesce(st_from.queue_name = i_queue_name, false)
+ or coalesce(st_to.queue_name = i_queue_name, false))
order by 1, 2, 3
loop
return next fkeys;
create or replace function londiste.drop_table_fkey(i_from_table text, i_fkey_name text)
returns integer as $$
-- ----------------------------------------------------------------------
--- Function: londiste.drop_table_fkey(x)
+-- Function: londiste.drop_table_fkey(2)
--
-- Drop one fkey, save in pending table.
-- ----------------------------------------------------------------------
+++ /dev/null
-
-create or replace function londiste.get_pending_triggers(i_table_name text)
-returns setof londiste.pending_triggers as $$
--- ----------------------------------------------------------------------
--- Function: londiste.get_pending_triggers(1)
---
--- Returns dropped triggers for one table.
---
--- Parameters:
--- i_table_name - fqname
---
--- Returns:
--- list of triggers
--- ----------------------------------------------------------------------
-declare
- trigger record;
-begin
- for trigger in
- select *
- from londiste.pending_triggers
- where table_name = i_table_name
- loop
- return next trigger;
- end loop;
-
- return;
-end;
-$$ language plpgsql strict stable;
-
-
-create or replace function londiste.drop_table_trigger(i_table_name text, i_trigger_name text)
-returns integer as $$
--- ----------------------------------------------------------------------
--- Function: londiste.drop_table_trigger(2)
---
--- Drop one trigger, saves it to pending table.
--- ----------------------------------------------------------------------
-declare
- trig_def record;
-begin
- select * into trig_def
- from londiste.find_table_triggers(i_table_name)
- where trigger_name = i_trigger_name;
-
- if FOUND is not true then
- return 0;
- end if;
-
- insert into londiste.pending_triggers(table_name, trigger_name, trigger_def)
- values (i_table_name, i_trigger_name, trig_def.trigger_def);
-
- execute 'drop trigger ' || i_trigger_name || ' on ' || i_table_name;
-
- return 1;
-end;
-$$ language plpgsql;
-
-
-create or replace function londiste.drop_all_table_triggers(i_table_name text)
-returns integer as $$
--- ----------------------------------------------------------------------
--- Function: londiste.drop_all_table_triggers(1)
---
--- Drop all triggers that exist.
--- ----------------------------------------------------------------------
-declare
- trigger record;
-begin
- for trigger in
- select trigger_name as name
- from londiste.find_table_triggers(i_table_name)
- loop
- perform londiste.drop_table_trigger(i_table_name, trigger.name);
- end loop;
-
- return 1;
-end;
-$$ language plpgsql;
-
-
-create or replace function londiste.restore_table_trigger(i_table_name text, i_trigger_name text)
-returns integer as $$
--- ----------------------------------------------------------------------
--- Function: londiste.restore_table_trigger(2)
---
--- Restore one trigger.
--- ----------------------------------------------------------------------
-declare
- trig_def text;
-begin
- select trigger_def into trig_def
- from londiste.pending_triggers
- where (table_name, trigger_name) = (i_table_name, i_trigger_name);
-
- if not found then
- return 0;
- end if;
-
- delete from londiste.pending_triggers
- where table_name = i_table_name and trigger_name = i_trigger_name;
-
- execute trig_def;
-
- return 1;
-end;
-$$ language plpgsql;
-
-
-create or replace function londiste.restore_all_table_triggers(i_table_name text)
-returns integer as $$
--- ----------------------------------------------------------------------
--- Function: londiste.restore_all_table_triggers(1)
---
--- Restore all dropped triggers.
--- ----------------------------------------------------------------------
-declare
- trigger record;
-begin
- for trigger in
- select trigger_name as name
- from londiste.get_pending_triggers(i_table_name)
- loop
- perform londiste.restore_table_trigger(i_table_name, trigger.name);
- end loop;
-
- return 1;
-end;
-$$ language plpgsql;
-
-
--- /dev/null
+
+create or replace function londiste.local_add_seq(
+ in i_queue_name text, in i_seq_name text,
+ out ret_code int4, out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.local_add_seq(2)
+--
+-- Register sequence.
+--
+-- Parameters:
+-- i_queue_name - cascaded queue name
+-- i_seq_name - seq name
+--
+-- Returns:
+-- 200 - OK
+-- 400 - Not found
+-- ----------------------------------------------------------------------
+declare
+ fq_seq_name text;
+ lastval int8;
+ seq record;
+begin
+ fq_seq_name := londiste.make_fqname(i_seq_name);
+
+ perform 1 from pg_class
+ where oid = londiste.find_seq_oid(fq_seq_name);
+ if not found then
+ select 400, 'Sequence not found: ' || fq_seq_name into ret_code, ret_note;
+ return;
+ end if;
+
+ if pgq_node.is_root_node(i_queue_name) then
+ select local, last_value into seq
+ from londiste.seq_info
+ where queue_name = i_queue_name
+ and seq_name = fq_seq_name
+ for update;
+ if found and seq.local then
+ select 201, 'Sequence already added: ' || fq_seq_name
+ into ret_code, ret_note;
+ return;
+ end if;
+ if not seq.local then
+ update londiste.seq_info set local = true
+ where queue_name = i_queue_name and seq_name = fq_seq_name;
+ else
+ insert into londiste.seq_info (queue_name, seq_name, local, last_value)
+ values (i_queue_name, fq_seq_name, true, 0);
+ end if;
+ perform * from londiste.root_check_seqs(i_queue_name);
+ else
+ select local, last_value into seq
+ from londiste.seq_info
+ where queue_name = i_queue_name
+ and seq_name = fq_seq_name
+ for update;
+ if not found then
+ select 404, 'Unknown sequence: ' || fq_seq_name
+ into ret_code, ret_note;
+ return;
+ end if;
+ if seq.local then
+ select 201, 'Sequence already added: ' || fq_seq_name
+ into ret_code, ret_note;
+ return;
+ end if;
+ update londiste.seq_info set local = true
+ where queue_name = i_queue_name and seq_name = fq_seq_name;
+ perform pgq.seq_setval(fq_seq_name, seq.last_value);
+ end if;
+
+ select 200, 'Sequence added' into ret_code, ret_note;
+ return;
+end;
+$$ language plpgsql;
+
--- /dev/null
+create or replace function londiste.local_add_table(
+ in i_queue_name text,
+ in i_table_name text,
+ out ret_code int4,
+ out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.local_add_table(2)
+--
+-- Register table on Londiste node.
+--
+-- Returns:
+-- 200 - Ok
+-- 400 - No such set
+-- ----------------------------------------------------------------------
+declare
+ col_types text;
+ fq_table_name text;
+ new_state text;
+
+ logtrg_name text;
+ logtrg text;
+ tbl record;
+begin
+ fq_table_name := londiste.make_fqname(i_table_name);
+ col_types := londiste.find_column_types(fq_table_name);
+ if position('k' in col_types) < 1 then
+ select 400, 'Primary key missing on table: ' || fq_table_name into ret_code, ret_note;
+ return;
+ end if;
+
+ perform 1 from pgq_node.node_info where queue_name = i_queue_name;
+ if not found then
+ select 400, 'No such set: ' || i_queue_name into ret_code, ret_note;
+ return;
+ end if;
+
+ select merge_state, local into tbl
+ from londiste.table_info
+ where queue_name = i_queue_name and table_name = fq_table_name;
+ if not found then
+ -- add to set on root
+ if pgq_node.is_root_node(i_queue_name) then
+ select f.ret_code, f.ret_note into ret_code, ret_note
+ from londiste.global_add_table(i_queue_name, i_table_name) f;
+ if ret_code <> 200 then
+ return;
+ end if;
+ else
+ select 404, 'Table not available on queue: ' || fq_table_name
+ into ret_code, ret_note;
+ return;
+ end if;
+
+ -- reload info
+ select merge_state, local into tbl
+ from londiste.table_info
+ where queue_name = i_queue_name and table_name = fq_table_name;
+ end if;
+
+ if tbl.local then
+ select 200, 'Table already added: ' || fq_table_name into ret_code, ret_note;
+ return;
+ end if;
+
+ if pgq_node.is_root_node(i_queue_name) then
+ new_state := 'ok';
+ perform londiste.root_notify_change(i_queue_name, 'londiste.add-table', fq_table_name);
+ else
+ new_state := NULL;
+ end if;
+
+ update londiste.table_info
+ set local = true,
+ merge_state = new_state
+ where queue_name = i_queue_name and table_name = fq_table_name;
+ if not found then
+ raise exception 'lost table: %', fq_table_name;
+ end if;
+
+ -- create trigger if it does not exists already
+ logtrg_name := i_queue_name || '_logtrigger';
+ perform 1 from pg_catalog.pg_trigger
+ where tgrelid = londiste.find_table_oid(fq_table_name)
+ and tgname = logtrg_name;
+ if not found then
+ logtrg := 'create trigger ' || quote_ident(logtrg_name)
+ || ' after insert or update or delete on ' || londiste.quote_fqname(fq_table_name)
+ || ' for each row execute procedure pgq.sqltriga(' || quote_literal(i_queue_name) || ')';
+ execute logtrg;
+ end if;
+
+ select 200, 'Table added: ' || fq_table_name into ret_code, ret_note;
+ return;
+end;
+$$ language plpgsql strict;
+
--- /dev/null
+
+create or replace function londiste.local_remove_seq(
+ in i_queue_name text, in i_seq_name text,
+ out ret_code int4, out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.local_remove_seq(2)
+--
+-- Remove sequence.
+--
+-- Parameters:
+-- i_queue_name - set name
+-- i_seq_name - sequence name
+--
+-- Returns:
+-- 200 - OK
+-- 404 - Sequence not found
+-- ----------------------------------------------------------------------
+declare
+ fqname text;
+begin
+ fqname := londiste.make_fqname(i_seq_name);
+ if pgq_node.is_root_node(i_queue_name) then
+ select f.ret_code, f.ret_note
+ into ret_code, ret_note
+ from londiste.global_remove_seq(i_queue_name, fqname) f;
+ return;
+ end if;
+ update londiste.seq_info
+ set local = false
+ where queue_name = i_queue_name
+ and seq_name = fqname
+ and local;
+ if not found then
+ select 404, 'Sequence not found: '||fqname into ret_code, ret_note;
+ return;
+ end if;
+
+ select 200, 'Sequence removed: '||fqname into ret_code, ret_note;
+ return;
+end;
+$$ language plpgsql strict;
+
--- /dev/null
+
+create or replace function londiste.local_remove_table(
+ in i_queue_name text, in i_table_name text,
+ out ret_code int4, out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.local_remove_table(2)
+--
+-- Remove table.
+--
+-- Parameters:
+-- i_queue_name - set name
+-- i_table_name - table name
+--
+-- Returns:
+-- 200 - OK
+-- 404 - Table not found
+-- ----------------------------------------------------------------------
+declare
+ fq_table_name text;
+ logtrg_name text;
+ tbl record;
+begin
+ fq_table_name := londiste.make_fqname(i_table_name);
+
+ select local into tbl
+ from londiste.table_info
+ where queue_name = i_queue_name
+ and table_name = fq_table_name;
+ if not found then
+ select 400, 'Table not found: ' || fq_table_name into ret_code, ret_note;
+ return;
+ end if;
+
+ if tbl.local then
+ -- drop trigger if exists
+ logtrg_name := i_queue_name || '_logtrigger';
+ execute 'drop trigger if exists ' || quote_ident(logtrg_name)
+ || ' on ' || londiste.quote_fqname(fq_table_name);
+ -- reset data
+ update londiste.table_info
+ set local = false,
+ custom_snapshot = null,
+
+ ---- should we keep those?
+ -- skip_truncate = null,
+ -- dropped_ddl = null,
+ merge_state = null
+ where queue_name = i_queue_name
+ and table_name = fq_table_name;
+ else
+ if not pgq_node.is_root_node(i_queue_name) then
+ select 400, 'Table not registered locally: ' || fq_table_name into ret_code, ret_note;
+ return;
+ end if;
+ end if;
+
+ if pgq_node.is_root_node(i_queue_name) then
+ perform londiste.global_remove_table(i_queue_name, fq_table_name);
+ perform londiste.root_notify_change(i_queue_name, 'londiste.remove-table', fq_table_name);
+ end if;
+
+ select 200, 'Table removed: ' || fq_table_name into ret_code, ret_note;
+ return;
+end;
+$$ language plpgsql strict;
+
--- /dev/null
+
+create or replace function londiste.local_set_skip_truncate(
+ in i_queue_name text,
+ in i_table text,
+ in i_value bool,
+ out ret_code int4,
+ out ret_note text)
+returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.local_set_skip_truncate(3)
+--
+-- Change skip_truncate flag for table.
+-- ----------------------------------------------------------------------
+begin
+ update londiste.table_info
+ set skip_truncate = i_value
+ where queue_name = i_queue_name
+ and table_name = i_table;
+ if found then
+ select 200, 'skip_truncate=' || i_value::text
+ into ret_code, ret_note;
+ else
+ select 404, 'table not found: ' || i_table
+ into ret_code, ret_note;
+ end if;
+ return;
+end;
+$$ language plpgsql;
+
-create or replace function londiste.node_set_table_state(
- i_set_name text,
+create or replace function londiste.local_set_table_state(
+ i_queue_name text,
i_table_name text,
i_snapshot text,
i_merge_state text)
returns integer as $$
-- ----------------------------------------------------------------------
--- Function: londiste.node_set_table_state(4)
+-- Function: londiste.local_set_table_state(4)
--
-- Change table state.
--
-- Parameters:
--- i_set_name - set name
--- i-table - table name
+-- i_queue_name - cascaded queue name
+-- i_table - table name
-- i_snapshot - optional remote snapshot info
-- i_merge_state - merge state
--
-- nothing
-- ----------------------------------------------------------------------
begin
- update londiste.node_table
+ update londiste.table_info
set custom_snapshot = i_snapshot,
merge_state = i_merge_state,
-- reset skip_snapshot when table is copied over
then null
else skip_truncate
end
- where set_name = i_set_name
- and table_name = i_table_name;
+ where queue_name = i_queue_name
+ and table_name = i_table_name
+ and local;
if not found then
raise exception 'no such table';
end if;
--- /dev/null
+
+create or replace function londiste.local_set_table_struct(
+ in i_queue_name text,
+ in i_table_name text,
+ in i_dropped_ddl text,
+ out ret_code int4,
+ out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.local_set_table_struct(3)
+--
+-- Store dropped table struct temporarily.
+--
+-- Parameters:
+-- i_queue_name - cascaded queue name
+-- i_table - table name
+-- i_dropped_ddl - merge state
+-- ----------------------------------------------------------------------
+begin
+ update londiste.table_info
+ set dropped_ddl = i_dropped_ddl
+ where queue_name = i_queue_name
+ and table_name = i_table_name
+ and local;
+ if found then
+ select 200, 'Table struct stored'
+ into ret_code, ret_note;
+ else
+ select 404, 'no such local table: '||i_table_name
+ into ret_code, ret_note;
+
+ end if;
+ return;
+end;
+$$ language plpgsql;
+
+++ /dev/null
-
-create or replace function londiste.node_add_seq(
- in i_set_name text, in i_seq_name text,
- out ret_code int4, out ret_text text)
-as $$
--- ----------------------------------------------------------------------
--- Function: londiste.node_add_seq(2)
---
--- Register sequence.
---
--- Parameters:
--- i_set_name - set name
--- i_seq_name - seq name
---
--- Returns:
--- 200 - OK
--- 400 - Not found
--- ----------------------------------------------------------------------
-declare
- fq_seq_name text;
-begin
- fq_seq_name := londiste.make_fqname(i_seq_name);
-
- perform 1 from pg_class
- where oid = londiste.find_seq_oid(fq_seq_name);
- if not found then
- select 400, 'Sequence not found: ' || fq_seq_name into ret_code, ret_text;
- return;
- end if;
-
- perform 1 from londiste.node_seq
- where set_name = i_set_name and seq_name = fq_seq_name;
- if found then
- select 200, 'OK, seqence already added' into ret_code, ret_text;
- return;
- end if;
-
- if pgq_set.is_root(i_set_name) then
- insert into londiste.set_seq (set_name, seq_name)
- values (i_set_name, fq_seq_name);
- perform londiste.node_notify_change(i_set_name, 'add-seq', fq_seq_name);
- end if;
-
- insert into londiste.node_seq (set_name, seq_name)
- values (i_set_name, fq_seq_name);
-
- select 200, 'OK' into ret_code, ret_text;
- return;
-end;
-$$ language plpgsql;
-
+++ /dev/null
-create or replace function londiste.node_add_table(
- in i_set_name text,
- in i_table_name text,
- out ret_code int4,
- out ret_note text)
-as $$
--- ----------------------------------------------------------------------
--- Function: londiste.node_add_table(2)
---
--- Register table on Londiste node.
---
--- Returns:
--- 200 - Ok
--- 400 - No such set
--- ----------------------------------------------------------------------
-declare
- col_types text;
- fq_table_name text;
- new_state text;
-begin
- fq_table_name := londiste.make_fqname(i_table_name);
- col_types := londiste.find_column_types(fq_table_name);
- if position('k' in col_types) < 1 then
- select 400, 'Primary key missing on table: ' || fq_table_name into ret_code, ret_note;
- return;
- end if;
-
- perform 1 from pgq_set.set_info where set_name = i_set_name;
- if not found then
- select 400, 'No such set: ' || i_set_name into ret_code, ret_note;
- return;
- end if;
-
- perform 1 from londiste.node_table where set_name = i_set_name and table_name = fq_table_name;
- if found then
- select 200, 'Table already added: ' || fq_table_name into ret_code, ret_note;
- return;
- end if;
-
- if pgq_set.is_root(i_set_name) then
- select * into ret_code, ret_note
- from londiste.set_add_table(i_set_name, fq_table_name);
- if ret_code <> 200 then
- return;
- end if;
- new_state := 'ok';
- perform londiste.root_notify_change(i_set_name, 'add-table', fq_table_name);
- else
- perform 1 from londiste.set_table where set_name = i_set_name and table_name = fq_table_name;
- if not found then
- select 400, 'Table not registered in set: ' || fq_table_name into ret_code, ret_note;
- return;
- end if;
- new_state := NULL;
- end if;
-
- insert into londiste.node_table (set_name, table_name, merge_state)
- values (i_set_name, fq_table_name, new_state);
-
- for ret_code, ret_note in
- select f.ret_code, f.ret_note
- from londiste.node_prepare_triggers(i_set_name, fq_table_name) f
- loop
- if ret_code > 299 then
- return;
- end if;
- end loop;
-
- for ret_code, ret_note in
- select f.ret_code, f.ret_note
- from londiste.node_refresh_triggers(i_set_name, fq_table_name) f
- loop
- if ret_code > 299 then
- return;
- end if;
- end loop;
-
- select 200, 'Table added: ' || fq_table_name into ret_code, ret_note;
- return;
-end;
-$$ language plpgsql strict;
-
+++ /dev/null
-
-create or replace function londiste.node_disable_triggers(
- in i_set_name text,
- in i_table_name text,
- out ret_code int4,
- out ret_note text)
-returns setof record strict as $$
--- ----------------------------------------------------------------------
--- Function: londiste.node_disable_triggers(2)
---
--- Drop all registered triggers from particular table.
--- ----------------------------------------------------------------------
-declare
- tbl_oid oid;
- fq_table_name text;
- tg record;
- is_active int4;
-begin
- fq_table_name := londiste.make_fqname(i_table_name);
- perform 1 from pgq_set.set_info where set_name = i_set_name;
- if not found then
- select 400, 'Unknown set: ' || i_set_name;
- return next;
- return;
- end if;
- tbl_oid := londiste.find_table_oid(fq_table_name);
- for tg in
- select tg_name, tg_type, tg_def from londiste.node_trigger
- where set_name = i_set_name and table_name = fq_table_name
- order by tg_name
- loop
- -- check if active
- perform 1 from pg_catalog.pg_trigger
- where tgrelid = tbl_oid
- and tgname = tg.tg_name;
- if found then
- execute 'drop trigger ' || quote_ident(tg.tg_name)
- || ' on ' || londiste.quote_fqname(fq_table_name);
- select 200, 'Dropped trigger ' || tg.tg_name
- || ' from table ' || fq_table_name
- into ret_code, ret_note;
- return next;
- end if;
- end loop;
- return;
-end;
-$$ language plpgsql security definer;
-
-create or replace function londiste.node_disable_triggers(
- in i_set_name text,
- out ret_code int4,
- out ret_note text)
-returns setof record strict as $$
--- ----------------------------------------------------------------------
--- Function: londiste.node_disable_triggers(1)
---
--- Drop all registered triggers from set tables.
--- ----------------------------------------------------------------------
-declare
- t record;
-begin
- for t in
- select table_name from londiste.node_table
- where set_name = i_set_name
- order by nr
- loop
- for ret_code, ret_note in
- select f.ret_code, f.ret_note
- from londiste.node_disable_triggers(i_set_name, t.table_name) f
- loop
- return next;
- end loop;
- end loop;
- return;
-end;
-$$ language plpgsql security definer;
-
+++ /dev/null
-
-create or replace function londiste.provider_get_seq_list(i_set_name text)
-returns setof text as $$
--- ----------------------------------------------------------------------
--- Function: londiste.node_get_seq_list(x)
---
--- Returns registered seqs on this Londiste node.
--- ----------------------------------------------------------------------
-declare
- rec record;
-begin
- for rec in
- select seq_name from londiste.node_seq
- where set_name = i_set_name
- order by nr
- loop
- return next rec.seq_name;
- end loop;
- return;
-end;
-$$ language plpgsql strict;
-
+++ /dev/null
-
-create or replace function londiste.node_get_table_list(
- in i_set_name text,
- out table_name text,
- out merge_state text,
- out custom_snapshot text,
- out skip_truncate bool)
-returns setof record as $$
--- ----------------------------------------------------------------------
--- Function: londiste.node_get_table_list(1)
---
--- Return info about registered tables.
---
--- Parameters:
--- i_set_name - set name
--- ----------------------------------------------------------------------
-begin
- for table_name, merge_state, custom_snapshot, skip_truncate in
- select t.table_name, t.merge_state, t.custom_snapshot, t.skip_truncate
- from londiste.node_table t
- where t.set_name= i_set_name
- order by t.nr
- loop
- return next;
- end loop;
- return;
-end;
-$$ language plpgsql strict stable;
-
+++ /dev/null
-
-create or replace function londiste.node_prepare_triggers(
- in i_set_name text,
- in i_table_name text,
- out ret_code int4,
- out ret_note text)
-returns setof record strict as $$
--- ----------------------------------------------------------------------
--- Function: londiste.node_prepare_triggers(2)
---
--- Regsiter Londiste trigger for table.
--- ----------------------------------------------------------------------
-declare
- t_name text;
- logtrg text;
- denytrg text;
- logtrg_name text;
- denytrg_name text;
- qname text;
- fq_table_name text;
-begin
- fq_table_name := londiste.make_fqname(i_table_name);
- select queue_name into qname from pgq_set.set_info where set_name = i_set_name;
- if not found then
- select 400, 'Set not found: ' || i_set_name into ret_code, ret_note;
- return next;
- return;
- end if;
- if qname is not null then
- logtrg_name := i_set_name || '_logtrigger';
- logtrg := 'create trigger ' || quote_ident(logtrg_name)
- || ' after insert or update or delete on ' || londiste.quote_fqname(fq_table_name)
- || ' for each row execute procedure pgq.sqltriga(' || quote_literal(qname) || ')';
- insert into londiste.node_trigger (set_name, table_name, tg_name, tg_type, tg_def)
- values (i_set_name, fq_table_name, logtrg_name, 'root', logtrg);
- select 200, logtrg into ret_code, ret_note;
- return next;
- end if;
-
- denytrg_name := i_set_name || '_denytrigger';
- denytrg := 'create trigger ' || quote_ident(denytrg_name)
- || ' after insert or update or delete on ' || londiste.quote_fqname(fq_table_name)
- || ' for each row execute procedure pgq.denytriga(' || quote_literal(i_set_name) || ')';
- insert into londiste.node_trigger (set_name, table_name, tg_name, tg_type, tg_def)
- values (i_set_name, fq_table_name, denytrg_name, 'non-root', denytrg);
- select 200, denytrg into ret_code, ret_note;
- return next;
-
- return;
-end;
-$$ language plpgsql;
-
+++ /dev/null
-
-create or replace function londiste.node_refresh_triggers(
- in i_set_name text,
- in i_table_name text,
- out ret_code int4,
- out ret_note text)
-returns setof record strict as $$
--- ----------------------------------------------------------------------
--- Function: londiste.node_refresh_triggers(2)
---
--- Sync actual trigger state with registered triggers.
--- ----------------------------------------------------------------------
-declare
- tbl_oid oid;
- fq_table_name text;
- tg record;
- is_root bool;
- is_active int4;
-begin
- fq_table_name := londiste.make_fqname(i_table_name);
- perform 1 from pgq_set.set_info where set_name = i_set_name;
- if not found then
- select 400, 'Unknown set: ' || i_set_name;
- return next;
- return;
- end if;
- is_root := pgq_set.is_root(i_set_name);
- tbl_oid := londiste.find_table_oid(fq_table_name);
- for tg in
- select tg_name, tg_type, tg_def from londiste.node_trigger
- where set_name = i_set_name and table_name = fq_table_name
- order by tg_name
- loop
- if tg.tg_type not in ('root', 'non-root') then
- select 400, 'trigger ' || tg.tg_name
- || ' on table ' || fq_table_name
- || ' had unsupported type: ' || tg.tg_type
- into ret_code, ret_note;
- return next;
- else
- -- check if active
- select count(1) into is_active
- from pg_catalog.pg_trigger
- where tgrelid = tbl_oid
- and tgname = tg.tg_name;
-
- -- create or drop if needed
- if (tg.tg_type = 'root') = is_root then
- -- trigger must be active
- if is_active = 0 then
- execute tg.tg_def;
- select 200, 'Created trigger ' || tg.tg_name
- || ' on table ' || fq_table_name
- into ret_code, ret_note;
- return next;
- end if;
- else
- -- trigger must be dropped
- if is_active = 1 then
- execute 'drop trigger ' || quote_ident(tg.tg_name)
- || ' on ' || londiste.quote_fqname(fq_table_name);
- select 200, 'Dropped trigger ' || tg.tg_name
- || ' from table ' || fq_table_name
- into ret_code, ret_note;
- return next;
- end if;
- end if;
- end if;
- end loop;
- return;
-end;
-$$ language plpgsql security definer;
-
-create or replace function londiste.node_refresh_triggers(
- in i_set_name text,
- out ret_code int4,
- out ret_note text)
-returns setof record strict as $$
--- ----------------------------------------------------------------------
--- Function: londiste.node_refresh_triggers(2)
---
--- Sync actual trigger state with registered triggers for all tables.
--- ----------------------------------------------------------------------
-declare
- t record;
-begin
- for t in
- select table_name from londiste.node_table
- where set_name = i_set_name
- order by nr
- loop
- for ret_code, ret_note in
- select f.ret_code, f.ret_note
- from londiste.node_refresh_triggers(i_set_name, t.table_name) f
- loop
- return next;
- end loop;
- end loop;
- return;
-end;
-$$ language plpgsql security definer;
-
+++ /dev/null
-
-create or replace function londiste.provider_remove_seq(
- in i_set_name text, in i_seq_name text,
- out ret_code int4, out ret_note text)
-as $$
-begin
- delete from londiste.node_seq
- where set_name = i_set_name
- and seq_name = i_seq_name;
- if not found then
- select 400, 'Not found: '||i_seq_name into ret_code, ret_note;
- return;
- end if;
-
- -- perform londiste.provider_notify_change(i_queue_name);
- select 200, 'OK' into ret_code, ret_note;
- return;
-end;
-$$ language plpgsql strict;
-
+++ /dev/null
-
-create or replace function londiste.node_remove_table(
- in i_set_name text, in i_table_name text,
- out ret_code int4, out ret_note text)
-as $$
-declare
- fq_table_name text;
-begin
- fq_table_name := londiste.make_fqname(i_table_name);
-
- for ret_code, ret_note in
- select f.ret_code, f.ret_note from londiste.node_disable_triggers(i_set_name, fq_table_name) f
- loop
- if ret_code > 299 then
- return;
- end if;
- end loop;
- delete from londiste.node_trigger
- where set_name = i_set_name
- and table_name = fq_table_name;
- delete from londiste.node_table
- where set_name = i_set_name
- and table_name = fq_table_name;
- if not found then
- select 400, 'Not found: ' || fq_table_name into ret_code, ret_note;
- return;
- end if;
-
- if pgq_set.is_root(i_set_name) then
- perform londiste.set_remove_table(i_set_name, fq_table_name);
- perform londiste.root_notify_change(i_set_name, 'remove-table', fq_table_name);
- end if;
-
- select 200, 'Table removed: ' || fq_table_name into ret_code, ret_note;
- return;
-end;
-$$ language plpgsql strict;
-
+++ /dev/null
-
-create or replace function londiste.node_set_skip_truncate(
- i_set_name text,
- i_table text,
- i_value bool)
-returns integer as $$
--- ----------------------------------------------------------------------
--- Function: londiste.node_set_skip_truncate(x)
---
--- Change skip_truncate flag for table.
--- ----------------------------------------------------------------------
-begin
- update londiste.node_table
- set skip_truncate = i_value
- where set_name = i_set_name
- and table_name = i_table;
- if not found then
- raise exception 'table not found';
- end if;
-
- return 1;
-end;
-$$ language plpgsql;
-
--- /dev/null
+
+create or replace function londiste.root_check_seqs(
+ in i_queue_name text, in i_buffer int8,
+ out ret_code int4, out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.root_check_seqs(1)
+--
+-- Check sequences, and publish values if needed.
+--
+-- Parameters:
+-- i_queue_name - set name
+-- i_buffer - safety room
+--
+-- Returns:
+-- 200 - OK
+-- 402 - Not a root node
+-- 404 - Queue not found
+-- ----------------------------------------------------------------------
+declare
+ n record;
+ seq record;
+ real_value int8;
+ pub_value int8;
+ real_buffer int8;
+begin
+ if i_buffer is null or i_buffer < 10 then
+ real_buffer := 10000;
+ else
+ real_buffer := i_buffer;
+ end if;
+
+ select node_type, node_name into n
+ from pgq_node.node_info
+ where queue_name = i_queue_name;
+ if not found then
+ select 404, 'Queue not found: ' || i_queue_name into ret_code, ret_note;
+ return;
+ end if;
+ if n.node_type <> 'root' then
+ select 402, 'Not a root node' into ret_code, ret_note;
+ return;
+ end if;
+
+ for seq in
+ select seq_name, last_value,
+ londiste.quote_fqname(seq_name) as fqname
+ from londiste.seq_info
+ where queue_name = i_queue_name
+ and local
+ order by nr
+ loop
+ execute 'select last_value from ' || seq.fqname into real_value;
+ if real_value + real_buffer >= seq.last_value then
+ pub_value := real_value + real_buffer * 3;
+ perform pgq.insert_event(i_queue_name, 'londiste.update-seq',
+ pub_value::text, seq.seq_name, null, null, null);
+ update londiste.seq_info set last_value = pub_value
+ where queue_name = i_queue_name
+ and seq_name = seq.seq_name;
+ end if;
+ end loop;
+
+ select 200, 'Sequences updated' into ret_code, ret_note;
+ return;
+end;
+$$ language plpgsql;
+
+create or replace function londiste.root_check_seqs(
+ in i_queue_name text,
+ out ret_code int4, out ret_note text)
+as $$
+begin
+ select f.ret_code, f.ret_note
+ into ret_code, ret_note
+ from londiste.root_check_seqs(i_queue_name, 10000) f;
+ return;
+end;
+$$ language plpgsql;
+
-create or replace function londiste.root_notify_change(i_set_name text, i_ev_type text, i_ev_data text)
+create or replace function londiste.root_notify_change(i_queue_name text, i_ev_type text, i_ev_data text)
returns integer as $$
-- ----------------------------------------------------------------------
-- Function: londiste.root_notify_change(3)
que text;
ntype text;
begin
- select s.queue_name, s.node_type into que, ntype
- from pgq_set.set_info s
- where s.set_name = i_set_name;
- if not found then
- raise exception 'Unknown set: %', i_set_name;
- end if;
- if ntype <> 'root' then
+
+ if not coalesce(pgq_node.is_root_node(i_queue_name), false) then
raise exception 'only root node can send events';
end if;
-
- perform pgq.insert_event(que, i_ev_type, i_ev_data);
+ perform pgq.insert_event(i_queue_name, i_ev_type, i_ev_data);
return 1;
end;
+++ /dev/null
-
-create or replace function londiste.set_get_table_list(
- in i_set_name text,
- out table_name text,
- out is_local bool)
-returns setof record as $$
--- ----------------------------------------------------------------------
--- Function: londiste.set_get_table_list(1)
---
--- Show tables registered for set.
---
--- This means its available from root, events for it appear
--- in queue and nodes can attach to it.
---
--- Called by:
--- Admin tools.
--- ----------------------------------------------------------------------
-begin
- for table_name, is_local in
- select t.table_name, n.table_name is not null
- from londiste.set_table t left join londiste.node_table n
- on (t.set_name = n.set_name and t.table_name = n.table_name)
- where t.set_name = i_set_name
- loop
- return next;
- end loop;
- return;
-end;
-$$ language plpgsql strict security definer;
-
+++ /dev/null
-
-create table denytest ( val integer);
-insert into denytest values (1);
-create trigger xdeny after insert or update or delete
-on denytest for each row execute procedure londiste.deny_trigger();
-
-insert into denytest values (2);
-update denytest set val = 2;
-delete from denytest;
-
-select londiste.disable_deny_trigger(true);
-update denytest set val = 2;
-select londiste.disable_deny_trigger(true);
-update denytest set val = 2;
-select londiste.disable_deny_trigger(false);
-update denytest set val = 2;
-select londiste.disable_deny_trigger(false);
-update denytest set val = 2;
-
--- /dev/null
+
+set log_error_verbosity = 'terse';
+
+select * from londiste.execute_start('branch_set', 'DDL-A.sql', 'drop all', false);
+select * from londiste.execute_start('branch_set', 'DDL-A.sql', 'drop all', false);
+
+select * from londiste.execute_finish('branch_set', 'DDL-A.sql');
+select * from londiste.execute_finish('branch_set', 'DDL-A.sql');
+select * from londiste.execute_finish('branch_set', 'DDL-XXX.sql');
+
+select * from londiste.execute_start('branch_set', 'DDL-B.sql', 'drop all', true);
+select * from londiste.execute_start('branch_set', 'DDL-B.sql', 'drop all', true);
+
+
+
+select * from londiste.execute_start('aset', 'DDL-root.sql', 'drop all', true);
+select * from londiste.execute_start('aset', 'DDL-root.sql', 'drop all', true);
+select * from londiste.execute_finish('aset', 'DDL-root.sql');
+select * from londiste.execute_finish('aset', 'DDL-root.sql');
+
val text
);
-select * from londiste.set_add_table('branch_set', 'public.ref_1');
-select * from londiste.set_add_table('branch_set', 'public.ref_2');
-select * from londiste.set_add_table('branch_set', 'public.ref_3');
+select * from londiste.global_add_table('branch_set', 'public.ref_1');
+select * from londiste.global_add_table('branch_set', 'public.ref_2');
+select * from londiste.global_add_table('branch_set', 'public.ref_3');
-select * from londiste.node_add_table('branch_set', 'public.ref_1');
-select * from londiste.node_add_table('branch_set', 'public.ref_2');
-select * from londiste.node_add_table('branch_set', 'public.ref_3');
+select * from londiste.local_add_table('branch_set', 'public.ref_1');
+select * from londiste.local_add_table('branch_set', 'public.ref_2');
+select * from londiste.local_add_table('branch_set', 'public.ref_3');
select * from londiste.find_table_fkeys('public.ref_1');
select * from londiste.find_table_fkeys('public.ref_2');
select * from londiste.get_table_pending_fkeys('public.ref_2');
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
+select * from londiste.get_valid_pending_fkeys('branch_set');
-- drop fkeys
-- look state
select * from londiste.get_table_pending_fkeys('public.ref_2');
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
+select * from londiste.get_valid_pending_fkeys('branch_set');
-- toggle sync
-select * from londiste.node_set_table_state('branch_set', 'public.ref_1', null, 'ok');
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
-select * from londiste.node_set_table_state('branch_set', 'public.ref_2', null, 'ok');
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
-select * from londiste.node_set_table_state('branch_set', 'public.ref_3', null, 'ok');
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
+select * from londiste.local_set_table_state('branch_set', 'public.ref_1', null, 'ok');
+select * from londiste.get_valid_pending_fkeys('branch_set');
+select * from londiste.local_set_table_state('branch_set', 'public.ref_2', null, 'ok');
+select * from londiste.get_valid_pending_fkeys('branch_set');
+select * from londiste.local_set_table_state('branch_set', 'public.ref_3', null, 'ok');
+select * from londiste.get_valid_pending_fkeys('branch_set');
-- restore
select * from londiste.restore_table_fkey('public.ref_2', 'ref_2_ref_fkey');
-- look state
select * from londiste.get_table_pending_fkeys('public.ref_2');
-select * from londiste.node_get_valid_pending_fkeys('branch_set');
+select * from londiste.get_valid_pending_fkeys('branch_set');
select * from londiste.find_table_fkeys('public.ref_1');
select * from londiste.find_table_fkeys('public.ref_2');
select * from londiste.find_table_fkeys('public.ref_3');
\set ECHO off
set log_error_verbosity = 'terse';
+
\i ../txid/txid.sql
\i ../pgq/pgq.sql
-\i ../pgq_set/pgq_set.sql
---\i londiste.sql
+\i ../pgq_node/pgq_node.sql
+
+-- install directly from source files
\i structure/tables.sql
\i structure/functions.sql
+
\set ECHO all
--- /dev/null
+
+set client_min_messages = 'warning';
+\set VERBOSITY 'terse'
+
+--
+-- tables
+--
+create table tblmerge (
+ id int4 primary key,
+ data text
+);
+
+select * from pgq_node.register_location('combined_set', 'croot', 'dbname=db', false);
+select * from pgq_node.create_node('combined_set', 'root', 'croot', 'londiste_croot', null, null, null);
+
+select * from pgq_node.register_location('part1_set', 'p1root', 'dbname=db', false);
+select * from pgq_node.register_location('part1_set', 'p1merge', 'dbname=db2', false);
+select * from pgq_node.create_node('part1_set', 'leaf', 'p1merge', 'londiste_p1merge', 'p1root', 100, 'combined_set');
+
+select * from pgq_node.register_location('part2_set', 'p2root', 'dbname=db', false);
+select * from pgq_node.register_location('part2_set', 'p2merge', 'dbname=db2', false);
+select * from pgq_node.create_node('part2_set', 'leaf', 'p2merge', 'londiste_p2merge', 'p2root', 100, 'combined_set');
+
+
+
+select * from londiste.local_add_table('combined_set', 'tblmerge');
+
+select * from londiste.global_add_table('part1_set', 'tblmerge');
+select * from londiste.local_add_table('part1_set', 'tblmerge');
+
+select * from londiste.global_add_table('part2_set', 'tblmerge');
+select * from londiste.local_add_table('part2_set', 'tblmerge');
+
+select * from londiste.get_table_list('part1_set');
+select * from londiste.get_table_list('part2_set');
+select * from londiste.get_table_list('combined_set');
+
+select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'in-copy');
+select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'in-copy');
+select * from londiste.get_table_list('part1_set');
+select * from londiste.get_table_list('part2_set');
+
+select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', 'create index;');
+select * from londiste.get_table_list('part1_set');
+select * from londiste.get_table_list('part2_set');
+
+select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'in-copy');
+select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up');
+select * from londiste.get_table_list('part1_set');
+select * from londiste.get_table_list('part2_set');
+
+select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', null);
+select * from londiste.get_table_list('part1_set');
+select * from londiste.get_table_list('part2_set');
+
+select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'catching-up');
+select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up');
+select * from londiste.get_table_list('part1_set');
+select * from londiste.get_table_list('part2_set');
+
+
+
+
+
+
+
+
+
+
+
select current_database();
-select * from pgq_set.add_member('aset', 'rnode', 'dbname=db', false);
-select * from pgq_set.create_node('aset', 'root', 'rnode', 'londiste_root', null::text, null::int8, null::text);
+select * from pgq_node.register_location('aset', 'rnode', 'dbname=db', false);
+select * from pgq_node.create_node('aset', 'root', 'rnode', 'londiste_root', null::text, null::int8, null::text);
-select * from londiste.node_add_table('aset', 'public.testdata_nopk');
-select * from londiste.node_add_table('aset', 'public.testdata');
+select * from londiste.local_add_table('aset', 'public.testdata_nopk');
+select * from londiste.local_add_table('aset', 'public.testdata');
insert into testdata (data) values ('test-data');
-select * from londiste.node_get_table_list('aset');
-select * from londiste.node_remove_table('aset', 'public.testdata');
-select * from londiste.node_remove_table('aset', 'public.testdata');
-select * from londiste.node_get_table_list('aset');
+select * from londiste.get_table_list('aset');
+select * from londiste.local_remove_table('aset', 'public.testdata');
+select * from londiste.local_remove_table('aset', 'public.testdata');
+select * from londiste.get_table_list('aset');
select ev_id, ev_type, ev_data, ev_extra1 from pgq.event_template;
--- /dev/null
+
+set client_min_messages = 'warning';
+\set VERBOSITY 'terse'
+
+--
+-- sequences
+--
+
+create sequence masterseq;
+create sequence slaveseq;
+
+
+select * from pgq_node.register_location('seqroot', 'rnode', 'dbname=db', false);
+select * from pgq_node.create_node('seqroot', 'root', 'rnode', 'londiste_root', null::text, null::int8, null::text);
+
+select * from londiste.local_add_seq('seqroot', 'masterseq');
+select * from londiste.local_add_seq('seqroot', 'masterseq');
+select * from londiste.root_check_seqs('seqroot');
+select * from londiste.local_remove_seq('seqroot', 'masterseq');
+select * from londiste.local_remove_seq('seqroot', 'masterseq');
+
+select * from londiste.get_seq_list('seqroot');
+
+select ev_id, ev_type, ev_data, ev_extra1 from pgq.event_template;
+
+-- subscriber
+select * from pgq_node.register_location('seqbranch', 'subnode', 'dbname=db', false);
+select * from pgq_node.register_location('seqbranch', 'rootnode', 'dbname=db', false);
+select * from pgq_node.create_node('seqbranch', 'branch', 'subnode', 'londiste_branch', 'rootnode', 1, null::text);
+
+select * from londiste.local_add_seq('seqbranch', 'masterseq');
+select * from londiste.global_update_seq('seqbranch', 'masterseq', 5);
+select * from londiste.local_add_seq('seqbranch', 'masterseq');
+select * from londiste.root_check_seqs('seqbranch');
+select * from londiste.get_seq_list('seqbranch');
+select * from londiste.local_remove_seq('seqbranch', 'masterseq');
+select * from londiste.local_remove_seq('seqbranch', 'masterseq');
+
+
select current_database();
-select * from pgq_set.add_member('branch_set', 'snode', 'dbname=db', false);
-select * from pgq_set.add_member('branch_set', 'pnode', 'dbname=db2', false);
-select * from pgq_set.create_node('branch_set', 'branch', 'snode', 'londiste_branch', 'pnode', 100, null::text);
+select * from pgq_node.register_location('branch_set', 'snode', 'dbname=db', false);
+select * from pgq_node.register_location('branch_set', 'pnode', 'dbname=db2', false);
+select * from pgq_node.create_node('branch_set', 'branch', 'snode', 'londiste_branch', 'pnode', 100, null::text);
-select * from londiste.node_add_table('branch_set', 'public.slavedata');
-select * from londiste.set_add_table('branch_set', 'public.slavedata');
-select * from londiste.node_add_table('branch_set', 'public.slavedata');
-select * from londiste.node_get_table_list('branch_set');
-select * from londiste.node_remove_table('branch_set', 'public.slavedata');
-select * from londiste.node_remove_table('branch_set', 'public.slavedata');
-select * from londiste.node_get_table_list('branch_set');
+select * from londiste.local_add_table('branch_set', 'public.slavedata');
+select * from londiste.global_add_table('branch_set', 'public.slavedata');
+select * from londiste.local_add_table('branch_set', 'public.slavedata');
+select * from londiste.global_add_table('branch_set', 'public.tmp');
+select * from londiste.get_table_list('branch_set');
+select * from londiste.global_remove_table('branch_set', 'public.tmp');
+select * from londiste.local_remove_table('branch_set', 'public.slavedata');
+select * from londiste.local_remove_table('branch_set', 'public.slavedata');
+select * from londiste.get_table_list('branch_set');
-- Section: Londiste functions
--- Group: Main operations
-\i functions/londiste.node_add_seq.sql
-\i functions/londiste.node_add_table.sql
-\i functions/londiste.node_get_seq_list.sql
-\i functions/londiste.node_get_table_list.sql
-\i functions/londiste.node_remove_seq.sql
-\i functions/londiste.node_remove_table.sql
-\i functions/londiste.node_set_table_state.sql
-
--- Group: Set object registrations
-\i functions/londiste.set_add_table.sql
-\i functions/londiste.set_remove_table.sql
-\i functions/londiste.set_get_table_list.sql
+-- Group: Information
+\i functions/londiste.get_seq_list.sql
+\i functions/londiste.get_table_list.sql
+
+-- Group: Local object registration (setup tool)
+\i functions/londiste.local_add_seq.sql
+\i functions/londiste.local_add_table.sql
+\i functions/londiste.local_remove_seq.sql
+\i functions/londiste.local_remove_table.sql
+
+-- Group: Global object registrations (internal)
+\i functions/londiste.global_add_table.sql
+\i functions/londiste.global_remove_table.sql
+\i functions/londiste.global_update_seq.sql
+\i functions/londiste.global_remove_seq.sql
-- Group: FKey handling
\i functions/londiste.handle_fkeys.sql
--- Group: Trigger handling
-\i functions/londiste.handle_triggers.sql
+-- Group: Execute handling
+\i functions/londiste.execute_start.sql
+\i functions/londiste.execute_finish.sql
-- Group: Internal functions
-\i functions/londiste.node_set_skip_truncate.sql
-\i functions/londiste.node_prepare_triggers.sql
-\i functions/londiste.node_refresh_triggers.sql
-\i functions/londiste.node_disable_triggers.sql
+\i functions/londiste.root_check_seqs.sql
\i functions/londiste.root_notify_change.sql
+\i functions/londiste.local_set_table_state.sql
+\i functions/londiste.local_set_skip_truncate.sql
+\i functions/londiste.local_set_table_struct.sql
-- Group: Utility functions
\i functions/londiste.find_column_types.sql
\i functions/londiste.find_table_fkeys.sql
\i functions/londiste.find_table_oid.sql
-\i functions/londiste.find_table_triggers.sql
\i functions/londiste.quote_fqname.sql
\i functions/londiste.make_fqname.sql
grant usage on schema londiste to public;
-grant select on londiste.node_table to public;
-grant select on londiste.node_seq to public;
+grant select on londiste.table_info to public;
+grant select on londiste.seq_info to public;
grant select on londiste.pending_fkeys to public;
-grant select on londiste.pending_triggers to public;
+grant select on londiste.applied_execute to public;
--- /dev/null
+-- ----------------------------------------------------------------------
+-- Section: Londiste internals
+--
+-- Londiste storage: tables/seqs/fkeys/triggers/events.
+--
+-- Londiste event types:
+-- I/U/D - ev_data: table update in partial-sql format, ev_extra1: fq table name
+-- I:/U:/D: <pk> - ev_data: table update in urlencoded format, ev_extra1: fq table name
+-- londiste.add-table - ev_data: table name that was added on root
+-- londiste.remove-table - ev_data: table name that was removed on root
+-- londiste.update-seq - ev_data: new seq value from root, ev_extra1: seq name
+-- lodniste.remove-seq - ev_data: seq name that was removed on root
+-- ----------------------------------------------------------------------
+create schema londiste;
+
+set default_with_oids = 'off';
+
+
+-- ----------------------------------------------------------------------
+-- Table: londiste.set_table
+--
+-- Tables available on root, meaning that events for only
+-- tables specified here can appear in queue.
+--
+-- Columns:
+-- nr - just to have stable order
+-- set_name - which set the table belongs to
+-- table_name - fq table name
+-- ----------------------------------------------------------------------
+create table londiste.set_table (
+ nr serial not null,
+ set_name text not null,
+ table_name text not null,
+ foreign key (set_name) references pgq_node.node_info (queue_name),
+ primary key (set_name, table_name)
+);
+
+-- ----------------------------------------------------------------------
+-- Table: londiste.set_seq
+--
+-- Sequences available on root, meaning that events for only
+-- sequences specified here can appear in queue.
+--
+-- Columns:
+-- nr - just to have stable order
+-- set_name - which set the table belongs to
+-- seq_name - fq seq name
+-- local - there is actual seq on local node
+-- last_value - last published value from root
+-- ----------------------------------------------------------------------
+create table londiste.seq_state (
+ nr serial not null,
+ set_name text not null,
+ seq_name text not null,
+ local boolean not null default false,
+ last_value int8 not null,
+ foreign key (set_name) references pgq_node.node_info (queue_name),
+ primary key (set_name, seq_name)
+);
+
+
+-- ----------------------------------------------------------------------
+-- Table: londiste.node_table
+--
+-- Info about attached tables.
+--
+-- Columns:
+-- nr - Dummy number for visual ordering
+-- set_name - Set name
+-- table_name - fully-qualified table name
+-- merge_state - State for tables
+-- trigger_type - trigger type
+-- trigger_name - londiste trigger name
+-- copy_snapshot - remote snapshot for COPY command
+-- custom_tg_args - user-specified
+-- skip_truncate - if 'in-copy' should not do TRUNCATE
+--
+-- Tables merge states:
+-- master - master: all in sync
+-- ok - slave: all in sync
+-- in-copy -
+-- catching-up -
+-- wanna-sync:% -
+-- do-sync:% -
+-- unsynced -
+--
+-- Trigger type:
+-- notrigger - no trigger applied
+-- pgq.logtriga - Partial SQL trigger with fixed column list
+-- pgq.sqltriga - Partial SQL trigger with autodetection
+-- pgq.logutriga - urlenc trigger with autodetection
+-- pgq.denytrigger - deny trigger
+-- ----------------------------------------------------------------------
+create table londiste.node_table (
+ nr serial not null,
+ set_name text not null,
+ table_name text not null,
+ merge_state text,
+ custom_snapshot text,
+ skip_truncate bool,
+
+ foreign key (set_name, table_name) references londiste.set_table,
+ primary key (set_name, table_name)
+);
+
+
+-- ----------------------------------------------------------------------
+-- Table: londiste.applied_execute
+--
+-- Info about EXECUTE commands that are ran.
+--
+-- Columns:
+-- set_name - which set it belongs to
+-- execute_file - filename / unique id
+-- execute_time - the time execute happened
+-- execute_sql - contains SQL for EXECUTE event (informative)
+-- ----------------------------------------------------------------------
+create table londiste.applied_execute (
+ set_name text not null,
+ execute_file text not null,
+ execute_time timestamptz not null default now(),
+ execute_sql text not null,
+ primary key (set_name, execute_file)
+);
+
+
+-- ----------------------------------------------------------------------
+-- Table: londiste.pending_fkeys
+--
+-- Details on dropped fkeys. Global, not specific to any set.
+--
+-- Columns:
+-- from_table - fully-qualified table name
+-- to_table - fully-qualified table name
+-- fkey_name - name of constraint
+-- fkey_def - full fkey definition
+-- ----------------------------------------------------------------------
+create table londiste.pending_fkeys (
+ from_table text not null,
+ to_table text not null,
+ fkey_name text not null,
+ fkey_def text not null,
+
+ primary key (from_table, fkey_name)
+);
+
+
-- Londiste storage: tables/seqs/fkeys/triggers/events.
--
-- Londiste event types:
--- I/U/D - ev_data: table update in partial-sql format, ev_extra1: fq table name
--- I:/U:/D: - ev_data: table update in urlencoded format, ev_extra1: fq table name
--- add-seq - ev_data: seq name that was added on root
--- del-seq - ev_data: seq name that was removed on root
--- add-tbl - ev_data: table name that was added on root
--- del-tbl - ev_data: table name that was removed on root
--- seq-values - ev_data: urlencoded fqname:value pairs
+-- I/U/D - partial SQL event from pgq.sqltriga()
+-- I:/U:/D: <pk> - urlencoded event from pgq.logutriga()
+-- EXECUTE - SQL script execution
+-- TRUNCATE - table truncation
+-- londiste.add-table - global table addition
+-- londiste.remove-table - global table removal
+-- londiste.update-seq - sequence update
+-- londiste.remove-seq - global sequence removal
+--
+-- pgq.sqltriga() event:
+-- ev_type - I/U/D which means insert/update/delete
+-- ev_data - partial SQL
+-- ev_extra1 - table name
+--
+-- Insert: ev_type = "I", ev_data = "(col1, col2) values (2, 'foo')", ev_extra1 = "public.tblname"
+--
+-- Update: ev_type = "U", ev_data = "col2 = null where col1 = 2", ev_extra1 = "public.tblname"
+--
+-- Delete: ev_type = "D", ev_data = "col1 = 2", ev_extra1 = "public.tblname"
+--
+-- pgq.logutriga() event:
+-- ev_type - I:/U:/D: plus comma separated list of pkey columns
+-- ev_data - urlencoded row columns
+-- ev_extra1 - table name
+--
+-- Insert: ev_type = "I:col1", ev_data = ""
+--
+-- Truncate trigger event:
+-- ev_type - TRUNCATE
+-- ev_extra1 - table name
+--
+-- Execute SQL event:
+-- ev_type - EXECUTE
+-- ev_data - SQL script
+-- ev_extra1 - Script ID
+--
+-- Global table addition:
+-- ev_type - londiste.add-table
+-- ev_data - table name
+--
+-- Global table removal:
+-- ev_type - londiste.remove-table
+-- ev_data - table name
+--
+-- Global sequence update:
+-- ev_type - londiste.update-seq
+-- ev_data - seq value
+-- ev_extra1 - seq name
+--5)
+-- Global sequence removal:
+-- ev_type - londiste.remove-seq
+-- ev_data - seq name
-- ----------------------------------------------------------------------
create schema londiste;
-- ----------------------------------------------------------------------
--- Table: londiste.set_table
+-- Table: londiste.table_info
--
--- Tables available on root, meaning that events for only
--- tables specified here can appear in queue.
+-- Info about registered tables.
--
-- Columns:
--- nr - just to have stable order
--- set_name - which set the table belongs to
--- table_name - fq table name
--- ----------------------------------------------------------------------
-create table londiste.set_table (
- nr serial not null,
- set_name text not null,
- table_name text not null,
- foreign key (set_name) references pgq_set.set_info,
- primary key (set_name, table_name)
-);
-
--- ----------------------------------------------------------------------
--- Table: londiste.set_seq
---
--- Sequences available on root, meaning that events for only
--- sequences specified here can appear in queue.
---
--- Columns:
--- nr - just to have stable order
--- set_name - which set the table belongs to
--- seq_name - fq seq name
--- ----------------------------------------------------------------------
-create table londiste.set_seq (
- nr serial not null,
- set_name text not null,
- seq_name text not null,
- foreign key (set_name) references pgq_set.set_info,
- primary key (set_name, seq_name)
-);
-
-
--- ----------------------------------------------------------------------
--- Table: londiste.node_table
---
--- Info about attached tables.
---
--- Columns:
--- nr - Dummy number for visual ordering
--- set_name - Set name
+-- nr - number for visual ordering
+-- queue_name - Cascaded queue name
-- table_name - fully-qualified table name
+-- local - Is used locally
-- merge_state - State for tables
--- trigger_type - trigger type
--- trigger_name - londiste trigger name
--- copy_snapshot - remote snapshot for COPY command
--- custom_tg_args - user-specified
+-- custom_snapshot - remote snapshot for COPY command
-- skip_truncate - if 'in-copy' should not do TRUNCATE
+-- dropped_ddl - temp place to store ddl
--
-- Tables merge states:
--- master - master: all in sync
--- ok - slave: all in sync
--- in-copy -
--- catching-up -
--- wanna-sync:% -
--- do-sync:% -
--- unsynced -
---
--- Trigger type:
--- notrigger - no trigger applied
--- pgq.logtriga - Partial SQL trigger with fixed column list
--- pgq.sqltriga - Partial SQL trigger with autodetection
--- pgq.logutriga - urlenc trigger with autodetection
--- pgq.denytrigger - deny trigger
--- ----------------------------------------------------------------------
-create table londiste.node_table (
+-- NULL - copy has not yet happened
+-- in-copy - ongoing bulk copy
+-- catching-up - copy process applies events that happened during copy
+-- wanna-sync:% - copy process caught up, wants to hand table over to replay
+-- do-sync:% - replay process is ready to accept the table
+-- ok - in sync, replay applies events
+-- ----------------------------------------------------------------------
+create table londiste.table_info (
nr serial not null,
- set_name text not null,
+ queue_name text not null,
table_name text not null,
+ local boolean not null default false,
merge_state text,
custom_snapshot text,
skip_truncate bool,
+ dropped_ddl text,
- foreign key (set_name, table_name) references londiste.set_table,
- primary key (set_name, table_name)
+ primary key (queue_name, table_name),
+ foreign key (queue_name) references pgq_node.node_info (queue_name),
+ check (dropped_ddl is null or merge_state = 'in-copy')
);
-- ----------------------------------------------------------------------
--- Table: londiste.node_trigger
+-- Table: londiste.seq_info
--
--- Node-specific triggers. When node type changes,
--- Londiste will make sure unnecessary triggers are
--- dropped and new triggers created.
+-- Sequences available on this queue.
--
-- Columns:
--- set_name - set it belongs to
--- table_name - table name
--- tg_type - any / root / non-root / unknown?
--- tg_name - name for the trigger
--- tg_def - full statement for trigger creation
+-- nr - number for visual ordering
+-- queue_name - cascaded queue name
+-- seq_name - fully-qualified seq name
+-- local - there is actual seq on local node
+-- last_value - last published value from root
-- ----------------------------------------------------------------------
-create table londiste.node_trigger (
- set_name text not null,
- table_name text not null,
- tg_name text not null,
- tg_type text not null,
- tg_def text not null,
- foreign key (set_name, table_name) references londiste.node_table,
- primary key (set_name, table_name, tg_name),
- check (tg_type in ('root', 'non-root'))
- -- check (tg_type in ('always', 'origin', 'replica', 'disabled'))
+create table londiste.seq_info (
+ nr serial not null,
+ queue_name text not null,
+ seq_name text not null,
+ local boolean not null default false,
+ last_value int8 not null,
+
+ primary key (queue_name, seq_name),
+ foreign key (queue_name) references pgq_node.node_info (queue_name)
);
+
-- ----------------------------------------------------------------------
--- Table: londiste.node_seq
+-- Table: londiste.applied_execute
--
--- Info about attached sequences.
+-- Info about EXECUTE commands that are ran.
--
-- Columns:
--- nr - dummy number for ordering
--- set_name - which set it belongs to
--- seq_name - fully-qualified seq name
--- ----------------------------------------------------------------------
-create table londiste.node_seq (
- nr serial not null,
- set_name text not null,
- seq_name text not null,
- foreign key (set_name, seq_name) references londiste.set_seq,
- primary key (set_name, seq_name)
+-- queue_name - cascaded queue name
+-- execute_file - filename / unique id
+-- execute_time - the time execute happened
+-- execute_sql - contains SQL for EXECUTE event (informative)
+-- ----------------------------------------------------------------------
+create table londiste.applied_execute (
+ queue_name text not null,
+ execute_file text not null,
+ execute_time timestamptz not null default now(),
+ execute_sql text not null,
+ primary key (queue_name, execute_file)
);
);
--- ----------------------------------------------------------------------
--- Table: londiste.pending_triggers
---
--- Details on dropped triggers. Global, not specific to any set.
---
--- Columns:
--- table_name - fully-qualified table name
--- trigger_name - trigger name
--- trigger_def - full trigger definition
--- ----------------------------------------------------------------------
-create table londiste.pending_triggers (
- table_name text not null,
- trigger_name text not null,
- trigger_def text not null,
-
- primary key (table_name, trigger_name)
-);
-