DATA_built = londiste.sql londiste.upgrade.sql
DOCS = README.londiste
-FUNCS = $(wildcard functions/*.sql)
-SRCS = structure/tables.sql structure/grants.sql structure/types.sql $(FUNCS)
+SQLS = structure/tables.sql structure/grants.sql structure/functions.sql
+FUNCS = $(shell sed -n -e '/^\\/{s/\\i //;p}' $(SQLS))
+SRCS = $(SQLS) $(FUNCS)
REGRESS = londiste_install londiste_provider londiste_subscriber londiste_fkeys
# londiste_denytrigger
-REGRESS_OPTS = --load-language=plpythonu --load-language=plpgsql
+REGRESS_OPTS = --load-language=plpgsql
include ../../config.mak
include $(PGXS)
londiste.sql: $(SRCS)
- cat $(SRCS) > $@
+ $(CATSQL) $(SQLS) > $@
-londiste.upgrade.sql: $(FUNCS)
- cat $(FUNCS) > $@
+londiste.upgrade.sql: $(SRCS)
+ $(CATSQL) structure/functions.sql > $@
test: londiste.sql
$(MAKE) installcheck || { less regression.diffs; exit 1; }
ack:
cp results/* expected/
+NDOC = NaturalDocs
+NDOCARGS = -r -o html docs/html -p docs -i docs/sql
+CATSQL = ../../scripts/catsql.py
+
+dox: cleandox
+ mkdir -p docs/html
+ mkdir -p docs/sql
+ $(CATSQL) --ndoc structure/tables.sql structure/types.sql > docs/sql/schema.sql
+ $(CATSQL) --ndoc structure/functions.sql > docs/sql/functions.sql
+ $(NDOC) $(NDOCARGS)
+
+cleandox:
+ rm -rf docs/html docs/Data docs/sql
+
-set client_min_messages = 'warning';
-\set VERBOSITY 'terse'
---
--- tables
---
-create table testdata (
- id serial primary key,
- data text
-);
-create table testdata_nopk (
- id serial,
- data text
-);
-select londiste.provider_add_table('pqueue', 'public.testdata_nopk');
-ERROR: need key column
-select londiste.provider_add_table('pqueue', 'public.testdata');
-ERROR: no such event queue
-select pgq.create_queue('pqueue');
- create_queue
---------------
- 1
-(1 row)
-
-select londiste.provider_add_table('pqueue', 'public.testdata');
- provider_add_table
---------------------
- 1
-(1 row)
-
-select londiste.provider_add_table('pqueue', 'public.testdata');
-ERROR: duplicate key violates unique constraint "provider_table_pkey"
-select londiste.provider_refresh_trigger('pqueue', 'public.testdata');
- provider_refresh_trigger
---------------------------
- 1
-(1 row)
-
-select * from londiste.provider_get_table_list('pqueue');
- table_name | trigger_name
------------------+---------------
- public.testdata | pqueue_logger
-(1 row)
-
-select londiste.provider_remove_table('pqueue', 'public.nonexist');
-ERROR: no such table registered
-select londiste.provider_remove_table('pqueue', 'public.testdata');
- provider_remove_table
------------------------
- 1
-(1 row)
-
-select * from londiste.provider_get_table_list('pqueue');
- table_name | trigger_name
-------------+--------------
-(0 rows)
-
---
--- seqs
---
-select * from londiste.provider_get_seq_list('pqueue');
- provider_get_seq_list
------------------------
-(0 rows)
-
-select londiste.provider_add_seq('pqueue', 'public.no_seq');
-ERROR: seq not found
-select londiste.provider_add_seq('pqueue', 'public.testdata_id_seq');
- provider_add_seq
-------------------
- 0
-(1 row)
-
-select londiste.provider_add_seq('pqueue', 'public.testdata_id_seq');
-ERROR: duplicate key violates unique constraint "provider_seq_pkey"
-select * from londiste.provider_get_seq_list('pqueue');
- provider_get_seq_list
-------------------------
- public.testdata_id_seq
-(1 row)
-
-select londiste.provider_remove_seq('pqueue', 'public.testdata_id_seq');
- provider_remove_seq
----------------------
- 0
-(1 row)
-
-select londiste.provider_remove_seq('pqueue', 'public.testdata_id_seq');
-ERROR: seq not attached
-select * from londiste.provider_get_seq_list('pqueue');
- provider_get_seq_list
------------------------
-(0 rows)
-
---
--- linked queue
---
-select londiste.provider_add_table('pqueue', 'public.testdata');
- provider_add_table
---------------------
- 1
-(1 row)
-
-insert into londiste.link (source, dest) values ('mqueue', 'pqueue');
-select londiste.provider_add_table('pqueue', 'public.testdata');
-ERROR: Linked queue, manipulation not allowed
-select londiste.provider_remove_table('pqueue', 'public.testdata');
-ERROR: Linked queue, manipulation not allowed
-select londiste.provider_add_seq('pqueue', 'public.testdata_id_seq');
-ERROR: Linked queue, cannot modify
-select londiste.provider_remove_seq('pqueue', 'public.testdata_seq');
-ERROR: Linked queue, cannot modify
---
--- cleanup
---
-delete from londiste.link;
-drop table testdata;
-drop table testdata_nopk;
-delete from londiste.provider_seq;
-delete from londiste.provider_table;
-select pgq.drop_queue('pqueue');
- drop_queue
-------------
- 1
-(1 row)
-
create or replace function londiste.find_column_types(tbl text)
returns text as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.find_column_types(1)
+--
+-- Returns columnt type string for logtriga().
+--
+-- Parameters:
+-- tbl - fqname
+--
+-- Returns:
+-- String of 'kv'.
+-- ----------------------------------------------------------------------
declare
res text;
col record;
create or replace function londiste.find_table_fkeys(i_table_name text)
-returns setof londiste.subscriber_pending_fkeys as $$
+returns setof londiste.pending_fkeys as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.find_table_fkeys(1)
+--
+-- Return all active fkeys.
+--
+-- Parameters:
+-- i_table_name - fqname
+--
+-- Returns:
+-- from_table - fqname
+-- to_table - fqname
+-- fkey_name - name
+-- fkey_def - full def
+-- ----------------------------------------------------------------------
declare
fkey record;
tbl_oid oid;
-create or replace function londiste.find_rel_oid(tbl text, kind text)
+
+create or replace function londiste.find_rel_oid(i_fqname text, i_kind text)
returns oid as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.find_rel_oid(2)
+--
+-- Find pg_class row oid.
+--
+-- Parameters:
+-- i_fqname - fq object name
+-- i_kind - relkind value
+--
+-- Returns:
+-- oid or exception of not found
+-- ----------------------------------------------------------------------
declare
res oid;
pos integer;
schema text;
name text;
begin
- pos := position('.' in tbl);
+ pos := position('.' in i_fqname);
if pos > 0 then
- schema := substring(tbl for pos - 1);
- name := substring(tbl from pos + 1);
+ schema := substring(i_fqname for pos - 1);
+ name := substring(i_fqname from pos + 1);
else
schema := 'public';
- name := tbl;
+ name := i_fqname;
end if;
select c.oid into res
from pg_namespace n, pg_class c
where c.relnamespace = n.oid
- and c.relkind = kind
+ and c.relkind = i_kind
and n.nspname = schema and c.relname = name;
if not found then
- if kind = 'r' then
- raise exception 'table not found';
- elsif kind = 'S' then
- raise exception 'seq not found';
- else
- raise exception 'weird relkind';
- end if;
+ res := NULL;
end if;
return res;
end;
$$ language plpgsql strict stable;
+
create or replace function londiste.find_table_oid(tbl text)
returns oid as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.find_table_oid(1)
+--
+-- Find table oid based on fqname.
+--
+-- Parameters:
+-- tbl - fqname
+--
+-- Returns:
+-- oid
+-- ----------------------------------------------------------------------
begin
return londiste.find_rel_oid(tbl, 'r');
end;
$$ language plpgsql strict stable;
-create or replace function londiste.find_seq_oid(tbl text)
+
+create or replace function londiste.find_seq_oid(seq text)
returns oid as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.find_seq_oid(1)
+--
+-- Find sequence oid based on fqname.
+--
+-- Parameters:
+-- seq - fqname
+--
+-- Returns:
+-- oid
+-- ----------------------------------------------------------------------
begin
- return londiste.find_rel_oid(tbl, 'S');
+ return londiste.find_rel_oid(seq, 'S');
end;
$$ language plpgsql strict stable;
create or replace function londiste.find_table_triggers(i_table_name text)
-returns setof londiste.subscriber_pending_triggers as $$
+returns setof londiste.pending_triggers as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.find_table_triggers(1)
+--
+-- Returns all existing triggers on table.
+--
+-- Parameters:
+-- i_table_name - table name
+--
+-- Returns:
+-- table_name - fq table name
+-- trigger_name - name
+-- trigger_def - partial def as returned by pg_get_triggerdef()
+-- ----------------------------------------------------------------------
declare
tg record;
begin
return;
end;
$$ language plpgsql strict stable;
+
--- /dev/null
+
+create or replace function londiste.get_table_pending_fkeys(i_table_name text)
+returns setof londiste.pending_fkeys as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.get_table_pending_fkeys(1)
+--
+-- Return dropped fkeys for table.
+--
+-- Parameters:
+-- i_table_name - fqname
+--
+-- Returns:
+-- desc
+-- ----------------------------------------------------------------------
+declare
+ fkeys record;
+begin
+ for fkeys in
+ select *
+ from londiste.pending_fkeys
+ where from_table = i_table_name or to_table = i_table_name
+ order by 1,2,3
+ loop
+ return next fkeys;
+ end loop;
+ return;
+end;
+$$ language plpgsql strict stable;
+
+
+create or replace function londiste.node_get_valid_pending_fkeys(i_set_name text)
+returns setof londiste.pending_fkeys as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.node_get_valid_pending_fkeys(1)
+--
+-- Returns dropped fkeys where both sides are in sync now.
+--
+-- Parameters:
+-- i_set_name - sets name
+--
+-- Returns:
+-- desc
+-- ----------------------------------------------------------------------
+declare
+ fkeys record;
+begin
+ for fkeys in
+ select pf.*
+ from londiste.pending_fkeys pf
+ left join londiste.node_table st_from on (st_from.table_name = pf.from_table)
+ left join londiste.node_table st_to on (st_to.table_name = pf.to_table)
+ where (st_from.table_name is null or (st_from.merge_state = 'ok' and st_from.snapshot is null))
+ and (st_to.table_name is null or (st_to.merge_state = 'ok' and st_to.snapshot is null))
+ and (coalesce(st_from.queue_name = i_queue_name, false)
+ or coalesce(st_to.queue_name = i_queue_name, false))
+ order by 1, 2, 3
+ loop
+ return next fkeys;
+ end loop;
+
+ return;
+end;
+$$ language plpgsql strict stable;
+
+
+create or replace function londiste.drop_table_fkey(i_from_table text, i_fkey_name text)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.drop_table_fkey(x)
+--
+-- Drop one fkey, save in pending table.
+-- ----------------------------------------------------------------------
+declare
+ fkey record;
+begin
+ select * into fkey
+ from londiste.find_table_fkeys(i_from_table)
+ where fkey_name = i_fkey_name and from_table = i_from_table;
+
+ if not found then
+ return 0;
+ end if;
+
+ insert into londiste.pending_fkeys values (fkey.from_table, fkey.to_table, i_fkey_name, fkey.fkey_def);
+
+ execute 'alter table only ' || londiste.quote_fqname(fkey.from_table)
+ || ' drop constraint ' || quote_ident(i_fkey_name);
+
+ return 1;
+end;
+$$ language plpgsql strict;
+
+
+create or replace function londiste.restore_table_fkey(i_from_table text, i_fkey_name text)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.restore_table_fkey(2)
+--
+-- Restore dropped fkey.
+--
+-- Parameters:
+-- i_from_table - source table
+-- i_fkey_name - fkey name
+--
+-- Returns:
+-- nothing
+-- ----------------------------------------------------------------------
+declare
+ fkey record;
+begin
+ select * into fkey
+ from londiste.pending_fkeys
+ where fkey_name = i_fkey_name and from_table = i_from_table;
+
+ if not found then
+ return 0;
+ end if;
+
+ execute fkey.fkey_def;
+
+ delete from londiste.pending_fkeys where fkey_name = fkey.fkey_name;
+
+ return 1;
+end;
+$$ language plpgsql strict;
+
--- /dev/null
+
+create or replace function londiste.get_pending_triggers(i_table_name text)
+returns setof londiste.pending_triggers as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.get_pending_triggers(1)
+--
+-- Returns dropped triggers for one table.
+--
+-- Parameters:
+-- i_table_name - fqname
+--
+-- Returns:
+-- list of triggers
+-- ----------------------------------------------------------------------
+declare
+ trigger record;
+begin
+ for trigger in
+ select *
+ from londiste.pending_triggers
+ where table_name = i_table_name
+ loop
+ return next trigger;
+ end loop;
+
+ return;
+end;
+$$ language plpgsql strict stable;
+
+
+create or replace function londiste.drop_table_trigger(i_table_name text, i_trigger_name text)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.drop_table_trigger(2)
+--
+-- Drop one trigger, saves it to pending table.
+-- ----------------------------------------------------------------------
+declare
+ trig_def record;
+begin
+ select * into trig_def
+ from londiste.find_table_triggers(i_table_name)
+ where trigger_name = i_trigger_name;
+
+ if FOUND is not true then
+ return 0;
+ end if;
+
+ insert into londiste.pending_triggers(table_name, trigger_name, trigger_def)
+ values (i_table_name, i_trigger_name, trig_def.trigger_def);
+
+ execute 'drop trigger ' || i_trigger_name || ' on ' || i_table_name;
+
+ return 1;
+end;
+$$ language plpgsql;
+
+
+create or replace function londiste.drop_all_table_triggers(i_table_name text)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.drop_all_table_triggers(1)
+--
+-- Drop all triggers that exist.
+-- ----------------------------------------------------------------------
+declare
+ trigger record;
+begin
+ for trigger in
+ select trigger_name as name
+ from londiste.find_table_triggers(i_table_name)
+ loop
+ perform londiste.drop_table_trigger(i_table_name, trigger.name);
+ end loop;
+
+ return 1;
+end;
+$$ language plpgsql;
+
+
+create or replace function londiste.restore_table_trigger(i_table_name text, i_trigger_name text)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.restore_table_trigger(2)
+--
+-- Restore one trigger.
+-- ----------------------------------------------------------------------
+declare
+ trig_def text;
+begin
+ select trigger_def into trig_def
+ from londiste.pending_triggers
+ where (table_name, trigger_name) = (i_table_name, i_trigger_name);
+
+ if not found then
+ return 0;
+ end if;
+
+ delete from londiste.pending_triggers
+ where table_name = i_table_name and trigger_name = i_trigger_name;
+
+ execute trig_def;
+
+ return 1;
+end;
+$$ language plpgsql;
+
+
+create or replace function londiste.restore_all_table_triggers(i_table_name text)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.restore_all_table_triggers(1)
+--
+-- Restore all dropped triggers.
+-- ----------------------------------------------------------------------
+declare
+ trigger record;
+begin
+ for trigger in
+ select trigger_name as name
+ from londiste.get_pending_triggers(i_table_name)
+ loop
+ perform londiste.restore_table_trigger(i_table_name, trigger.name);
+ end loop;
+
+ return 1;
+end;
+$$ language plpgsql;
+
+
--- /dev/null
+
+create or replace function londiste.make_fqname(i_name text)
+returns text as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.make_fqname(1)
+--
+-- Make name to schema-qualified one.
+--
+-- First dot is taken as schema separator.
+--
+-- If schema is missing, 'public' is assumed.
+--
+-- Parameters:
+-- i_name - object name.
+--
+-- Returns:
+-- Schema qualified name.
+-- ----------------------------------------------------------------------
+begin
+ if position('.' in i_name) > 0 then
+ return i_name;
+ else
+ return 'public.' || i_name;
+ end if;
+end;
+$$ language plpgsql strict immutable;
+
--- /dev/null
+
+create or replace function londiste.node_add_seq(
+ in i_set_name text, in i_seq_name text,
+ out ret_code int4, out ret_text text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.node_add_seq(2)
+--
+-- Register sequence.
+--
+-- Parameters:
+-- i_set_name - set name
+-- i_seq_name - seq name
+--
+-- Returns:
+-- 200 - OK
+-- 400 - Not found
+-- ----------------------------------------------------------------------
+declare
+ fq_seq_name text;
+begin
+ fq_seq_name := londiste.make_fqname(i_seq_name);
+
+ perform 1 from pg_class
+ where oid = londiste.find_seq_oid(fq_seq_name);
+ if not found then
+ select 400, 'Sequence not found: ' || fq_seq_name into ret_code, ret_text;
+ return;
+ end if;
+
+ perform 1 from londiste.node_seq
+ where set_name = i_set_name and seq_name = fq_seq_name;
+ if found then
+ select 200, 'OK, seqence already added' into ret_code, ret_text;
+ return;
+ end if;
+
+ if pgq_set.is_root(i_set_name) then
+ insert into londiste.set_seq (set_name, seq_name)
+ values (i_set_name, fq_seq_name);
+ perform londiste.node_notify_change(i_set_name, 'add-seq', fq_seq_name);
+ end if;
+
+ insert into londiste.node_seq (set_name, seq_name)
+ values (i_set_name, fq_seq_name);
+
+ select 200, 'OK' into ret_code, ret_text;
+ return;
+end;
+$$ language plpgsql;
+
--- /dev/null
+create or replace function londiste.node_add_table(
+ in i_set_name text,
+ in i_table_name text,
+ out ret_code int4,
+ out ret_desc text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.node_add_table(x)
+--
+-- Register table on Londiste node.
+--
+-- Returns:
+-- 200 - Ok
+-- 400 - No such set
+-- ----------------------------------------------------------------------
+declare
+ col_types text;
+ fq_table_name text;
+begin
+ fq_table_name := londiste.make_fqname(i_table_name);
+ col_types := londiste.find_column_types(fq_table_name);
+ if position('k' in col_types) < 1 then
+ raise exception 'need key column';
+ end if;
+
+ perform 1 from pgq_set.set_info where set_name = i_set_name;
+ if not found then
+ select 400, 'No such set: ' || i_set_name into ret_code, ret_desc;
+ return;
+ end if;
+
+ perform 1 from londiste.node_table where set_name = i_set_name and table_name = fq_table_name;
+ if found then
+ select 200, 'OK, already added: ' || fq_table_name into ret_code, ret_desc;
+ return;
+ end if;
+
+ insert into londiste.node_table (set_name, table_name)
+ values (i_set_name, fq_table_name);
+ select 200, 'OK' into ret_code, ret_desc;
+ return;
+end;
+$$ language plpgsql strict;
+
--- /dev/null
+
+create or replace function londiste.provider_get_seq_list(i_set_name text)
+returns setof text as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.node_get_seq_list(x)
+--
+-- Returns registered seqs on this Londiste node.
+-- ----------------------------------------------------------------------
+declare
+ rec record;
+begin
+ for rec in
+ select seq_name from londiste.node_seq
+ where set_name = i_set_name
+ order by nr
+ loop
+ return next rec.seq_name;
+ end loop;
+ return;
+end;
+$$ language plpgsql strict;
+
--- /dev/null
+
+create or replace function londiste.node_get_table_list(
+ in i_set_name text,
+ out table_name text,
+ out merge_state text,
+ out custom_snapshot text,
+ out skip_truncate bool)
+returns setof record as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.node_get_table_list(1)
+--
+-- Return info about registered tables.
+--
+-- Parameters:
+-- i_set_name - set name
+-- ----------------------------------------------------------------------
+begin
+ for table_name, merge_state, custom_snapshot, skip_truncate in
+ select t.table_name, t.merge_state, t.custom_snapshot, t.skip_truncate
+ from londiste.node_table t
+ where t.set_name= i_set_name
+ order by t.nr
+ loop
+ return next;
+ end loop;
+ return;
+end;
+$$ language plpgsql strict stable;
+
--- /dev/null
+
+create or replace function londiste.node_send_event(i_set_name text, i_ev_type text, i_ev_data text)
+returns integer as $$
+declare
+ que text;
+begin
+ select s.queue_name into que
+ from pgq_set s
+ where s.set_name = i_set_name;
+ if not found then
+ raise exception 'Unknown set: %', i_set_name;
+ end if;
+
+ perform pgq.insert_event(que, i_ev_data, i_ev_data);
+
+ return 1;
+end;
+$$ language plpgsql;
+
--- /dev/null
+
+create or replace function londiste.provider_remove_seq(
+ in i_set_name text, in i_seq_name text,
+ out ret_code int4, out ret_desc text)
+as $$
+begin
+ delete from londiste.node_seq
+ where set_name = i_set_name
+ and seq_name = i_seq_name;
+ if not found then
+ select 400, 'Not found: '||i_seq_name into ret_code, ret_desc;
+ return;
+ end if;
+
+ -- perform londiste.provider_notify_change(i_queue_name);
+ select 200, 'OK' into ret_code, ret_desc;
+ return;
+end;
+$$ language plpgsql strict;
+
--- /dev/null
+
+create or replace function londiste.node_remove_table(
+ in i_set_name text, in i_table_name text,
+ out ret_code int4, out ret_desc text)
+as $$
+begin
+ delete from londiste.node_table
+ where set_name = i_set_name
+ and table_name = i_table_name;
+ if not found then
+ select 400, 'Not found: '||i_table_name into ret_code, ret_desc;
+ return;
+ end if;
+
+ -- perform londiste.provider_notify_change(i_queue_name);
+ -- triggers
+ select 200, 'OK' into ret_code, ret_desc;
+ return;
+end;
+$$ language plpgsql strict;
+
--- /dev/null
+
+create or replace function londiste.node_set_skip_truncate(
+ i_set_name text,
+ i_table text,
+ i_value bool)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.node_set_skip_truncate(x)
+--
+-- Change skip_truncate flag for table.
+-- ----------------------------------------------------------------------
+begin
+ update londiste.node_table
+ set skip_truncate = i_value
+ where set_name = i_set_name
+ and table_name = i_table;
+ if not found then
+ raise exception 'table not found';
+ end if;
+
+ return 1;
+end;
+$$ language plpgsql;
+
--- /dev/null
+
+create or replace function londiste.node_set_table_state(
+ i_set_name text,
+ i_table_name text,
+ i_snapshot text,
+ i_merge_state text)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.node_set_table_state(4)
+--
+-- Change table state.
+--
+-- Parameters:
+-- i_set_name - set name
+-- i-table - table name
+-- i_snapshot - optional remote snapshot info
+-- i_merge_state - merge state
+--
+-- Returns:
+-- nothing
+-- ----------------------------------------------------------------------
+begin
+ update londiste.node_table
+ set snapshot = i_snapshot,
+ merge_state = i_merge_state,
+ -- reset skip_snapshot when table is copied over
+ skip_truncate = case when i_merge_state = 'ok'
+ then null
+ else skip_truncate
+ end
+ where set_name = i_set_name
+ and table_name = i_table_name;
+ if not found then
+ raise exception 'no such table';
+ end if;
+
+ return 1;
+end;
+$$ language plpgsql;
+
+++ /dev/null
-
-create or replace function londiste.provider_add_seq(
- i_queue_name text, i_seq_name text)
-returns integer as $$
-declare
- link text;
-begin
- -- check if linked queue
- link := londiste.link_source(i_queue_name);
- if link is not null then
- raise exception 'Linked queue, cannot modify';
- end if;
-
- perform 1 from pg_class
- where oid = londiste.find_seq_oid(i_seq_name);
- if not found then
- raise exception 'seq not found';
- end if;
-
- insert into londiste.provider_seq (queue_name, seq_name)
- values (i_queue_name, i_seq_name);
- perform londiste.provider_notify_change(i_queue_name);
-
- return 0;
-end;
-$$ language plpgsql security definer;
-
+++ /dev/null
-create or replace function londiste.provider_add_table(
- i_queue_name text,
- i_table_name text,
- i_col_types text
-) returns integer strict as $$
-declare
- tgname text;
- sql text;
-begin
- if londiste.link_source(i_queue_name) is not null then
- raise exception 'Linked queue, manipulation not allowed';
- end if;
-
- if position('k' in i_col_types) < 1 then
- raise exception 'need key column';
- end if;
- if position('.' in i_table_name) < 1 then
- raise exception 'need fully-qualified table name';
- end if;
- select queue_name into tgname
- from pgq.queue where queue_name = i_queue_name;
- if not found then
- raise exception 'no such event queue';
- end if;
-
- tgname := i_queue_name || '_logger';
- tgname := replace(lower(tgname), '.', '_');
- insert into londiste.provider_table
- (queue_name, table_name, trigger_name)
- values (i_queue_name, i_table_name, tgname);
-
- perform londiste.provider_create_trigger(
- i_queue_name, i_table_name, i_col_types);
-
- return 1;
-end;
-$$ language plpgsql security definer;
-
-create or replace function londiste.provider_add_table(
- i_queue_name text,
- i_table_name text
-) returns integer as $$
-begin
- return londiste.provider_add_table(i_queue_name, i_table_name,
- londiste.find_column_types(i_table_name));
-end;
-$$ language plpgsql security definer;
-
+++ /dev/null
-
-create or replace function londiste.provider_create_trigger(
- i_queue_name text,
- i_table_name text,
- i_col_types text
-) returns integer strict as $$
-declare
- tgname text;
-begin
- select trigger_name into tgname
- from londiste.provider_table
- where queue_name = i_queue_name
- and table_name = i_table_name;
- if not found then
- raise exception 'table not found';
- end if;
-
- execute 'create trigger ' || tgname
- || ' after insert or update or delete on '
- || i_table_name
- || ' for each row execute procedure pgq.logtriga('
- || quote_literal(i_queue_name) || ', '
- || quote_literal(i_col_types) || ', '
- || quote_literal(i_table_name) || ')';
-
- return 1;
-end;
-$$ language plpgsql security definer;
-
+++ /dev/null
-
-create or replace function londiste.provider_get_seq_list(i_queue_name text)
-returns setof text as $$
-declare
- rec record;
-begin
- for rec in
- select seq_name from londiste.provider_seq
- where queue_name = i_queue_name
- order by nr
- loop
- return next rec.seq_name;
- end loop;
- return;
-end;
-$$ language plpgsql security definer;
-
+++ /dev/null
-
-create or replace function londiste.provider_get_table_list(i_queue text)
-returns setof londiste.ret_provider_table_list as $$
-declare
- rec londiste.ret_provider_table_list%rowtype;
-begin
- for rec in
- select table_name, trigger_name
- from londiste.provider_table
- where queue_name = i_queue
- order by nr
- loop
- return next rec;
- end loop;
- return;
-end;
-$$ language plpgsql security definer;
-
+++ /dev/null
-
-create or replace function londiste.provider_notify_change(i_queue_name text)
-returns integer as $$
-declare
- res text;
- tbl record;
-begin
- res := '';
- for tbl in
- select table_name from londiste.provider_table
- where queue_name = i_queue_name
- order by nr
- loop
- if res = '' then
- res := tbl.table_name;
- else
- res := res || ',' || tbl.table_name;
- end if;
- end loop;
-
- perform pgq.insert_event(i_queue_name, 'T', res);
-
- return 1;
-end;
-$$ language plpgsql security definer;
-
+++ /dev/null
-
-create or replace function londiste.provider_refresh_trigger(
- i_queue_name text,
- i_table_name text,
- i_col_types text
-) returns integer strict as $$
-declare
- t_name text;
- tbl_oid oid;
-begin
- select trigger_name into t_name
- from londiste.provider_table
- where queue_name = i_queue_name
- and table_name = i_table_name;
- if not found then
- raise exception 'table not found';
- end if;
-
- tbl_oid := londiste.find_table_oid(i_table_name);
- perform 1 from pg_trigger
- where tgrelid = tbl_oid
- and tgname = t_name;
- if found then
- execute 'drop trigger ' || t_name || ' on ' || i_table_name;
- end if;
-
- perform londiste.provider_create_trigger(i_queue_name, i_table_name, i_col_types);
-
- return 1;
-end;
-$$ language plpgsql security definer;
-
-create or replace function londiste.provider_refresh_trigger(
- i_queue_name text,
- i_table_name text
-) returns integer strict as $$
-begin
- return londiste.provider_refresh_trigger(i_queue_name, i_table_name,
- londiste.find_column_types(i_table_name));
-end;
-$$ language plpgsql security definer;
-
-
-
+++ /dev/null
-
-create or replace function londiste.provider_remove_seq(
- i_queue_name text, i_seq_name text)
-returns integer as $$
-declare
- link text;
-begin
- -- check if linked queue
- link := londiste.link_source(i_queue_name);
- if link is not null then
- raise exception 'Linked queue, cannot modify';
- end if;
-
- delete from londiste.provider_seq
- where queue_name = i_queue_name
- and seq_name = i_seq_name;
- if not found then
- raise exception 'seq not attached';
- end if;
-
- perform londiste.provider_notify_change(i_queue_name);
-
- return 0;
-end;
-$$ language plpgsql security definer;
-
+++ /dev/null
-
-create or replace function londiste.provider_remove_table(
- i_queue_name text,
- i_table_name text
-) returns integer as $$
-declare
- tgname text;
-begin
- if londiste.link_source(i_queue_name) is not null then
- raise exception 'Linked queue, manipulation not allowed';
- end if;
-
- select trigger_name into tgname from londiste.provider_table
- where queue_name = i_queue_name
- and table_name = i_table_name;
- if not found then
- raise exception 'no such table registered';
- end if;
-
- begin
- execute 'drop trigger ' || tgname || ' on ' || i_table_name;
- exception
- when undefined_table then
- raise notice 'table % does not exist', i_table_name;
- when undefined_object then
- raise notice 'trigger % does not exist on table %', tgname, i_table_name;
- end;
-
- delete from londiste.provider_table
- where queue_name = i_queue_name
- and table_name = i_table_name;
-
- return 1;
-end;
-$$ language plpgsql security definer;
-
-
create or replace function londiste.quote_fqname(i_name text)
returns text as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.quote_fqname(1)
+--
+-- Quete fully-qualified object name for SQL.
+--
+-- First dot is taken as schema separator.
+--
+-- If schema is missing, 'public' is assumed.
+--
+-- Parameters:
+-- i_name - fully qualified object name.
+--
+-- Returns:
+-- Quoted name.
+-- ----------------------------------------------------------------------
declare
res text;
pos integer;
+++ /dev/null
-
-create or replace function londiste.subscriber_add_seq(
- i_queue_name text, i_seq_name text)
-returns integer as $$
-declare
- link text;
-begin
- insert into londiste.subscriber_seq (queue_name, seq_name)
- values (i_queue_name, i_seq_name);
-
- -- update linked queue if needed
- link := londiste.link_dest(i_queue_name);
- if link is not null then
- insert into londiste.provider_seq
- (queue_name, seq_name)
- values (link, i_seq_name);
- perform londiste.provider_notify_change(link);
- end if;
-
- return 0;
-end;
-$$ language plpgsql security definer;
-
+++ /dev/null
-
-create or replace function londiste.subscriber_add_table(
- i_queue_name text, i_table text)
-returns integer as $$
-begin
- insert into londiste.subscriber_table (queue_name, table_name)
- values (i_queue_name, i_table);
-
- -- linked queue is updated, when the table is copied
-
- return 0;
-end;
-$$ language plpgsql security definer;
-
+++ /dev/null
-
-
-create or replace function londiste.subscriber_get_table_pending_fkeys(i_table_name text)
-returns setof londiste.subscriber_pending_fkeys as $$
-declare
- fkeys record;
-begin
- for fkeys in
- select *
- from londiste.subscriber_pending_fkeys
- where from_table=i_table_name or to_table=i_table_name
- order by 1,2,3
- loop
- return next fkeys;
- end loop;
-
- return;
-end;
-$$ language plpgsql;
-
-
-create or replace function londiste.subscriber_get_queue_valid_pending_fkeys(i_queue_name text)
-returns setof londiste.subscriber_pending_fkeys as $$
-declare
- fkeys record;
-begin
- for fkeys in
- select pf.*
- from londiste.subscriber_pending_fkeys pf
- left join londiste.subscriber_table st_from on (st_from.table_name = pf.from_table)
- left join londiste.subscriber_table st_to on (st_to.table_name = pf.to_table)
- where (st_from.table_name is null or (st_from.merge_state = 'ok' and st_from.snapshot is null))
- and (st_to.table_name is null or (st_to.merge_state = 'ok' and st_to.snapshot is null))
- and (coalesce(st_from.queue_name = i_queue_name, false)
- or coalesce(st_to.queue_name = i_queue_name, false))
- order by 1, 2, 3
- loop
- return next fkeys;
- end loop;
-
- return;
-end;
-$$ language plpgsql;
-
-
-create or replace function londiste.subscriber_drop_table_fkey(i_from_table text, i_fkey_name text)
-returns integer as $$
-declare
- fkey record;
-begin
- select * into fkey
- from londiste.find_table_fkeys(i_from_table)
- where fkey_name = i_fkey_name and from_table = i_from_table;
-
- if not found then
- return 0;
- end if;
-
- insert into londiste.subscriber_pending_fkeys values (fkey.from_table, fkey.to_table, i_fkey_name, fkey.fkey_def);
-
- execute 'alter table only ' || londiste.quote_fqname(fkey.from_table)
- || ' drop constraint ' || quote_ident(i_fkey_name);
-
- return 1;
-end;
-$$ language plpgsql;
-
-
-create or replace function londiste.subscriber_restore_table_fkey(i_from_table text, i_fkey_name text)
-returns integer as $$
-declare
- fkey record;
-begin
- select * into fkey
- from londiste.subscriber_pending_fkeys
- where fkey_name = i_fkey_name and from_table = i_from_table;
-
- if not found then
- return 0;
- end if;
-
- delete from londiste.subscriber_pending_fkeys where fkey_name = fkey.fkey_name;
-
- execute fkey.fkey_def;
-
- return 1;
-end;
-$$ language plpgsql;
+++ /dev/null
-
-create or replace function londiste.subscriber_get_seq_list(i_queue_name text)
-returns setof text as $$
-declare
- rec record;
-begin
- for rec in
- select seq_name from londiste.subscriber_seq
- where queue_name = i_queue_name
- order by nr
- loop
- return next rec.seq_name;
- end loop;
- return;
-end;
-$$ language plpgsql security definer;
-
+++ /dev/null
-
-create or replace function londiste.subscriber_get_table_list(i_queue_name text)
-returns setof londiste.ret_subscriber_table as $$
-declare
- rec londiste.ret_subscriber_table%rowtype;
-begin
- for rec in
- select table_name, merge_state, snapshot, trigger_name, skip_truncate
- from londiste.subscriber_table
- where queue_name = i_queue_name
- order by nr
- loop
- return next rec;
- end loop;
- return;
-end;
-$$ language plpgsql security definer;
-
--- compat
-create or replace function londiste.get_table_state(i_queue text)
-returns setof londiste.subscriber_table as $$
-declare
- rec londiste.subscriber_table%rowtype;
-begin
- for rec in
- select * from londiste.subscriber_table
- where queue_name = i_queue
- order by nr
- loop
- return next rec;
- end loop;
- return;
-end;
-$$ language plpgsql security definer;
-
+++ /dev/null
-
-create or replace function londiste.subscriber_remove_seq(
- i_queue_name text, i_seq_name text)
-returns integer as $$
-declare
- link text;
-begin
- delete from londiste.subscriber_seq
- where queue_name = i_queue_name
- and seq_name = i_seq_name;
- if not found then
- raise exception 'no such seq?';
- end if;
-
- -- update linked queue if needed
- link := londiste.link_dest(i_queue_name);
- if link is not null then
- delete from londiste.provider_seq
- where queue_name = link
- and seq_name = i_seq_name;
- perform londiste.provider_notify_change(link);
- end if;
-
- return 0;
-end;
-$$ language plpgsql security definer;
-
+++ /dev/null
-
-create or replace function londiste.subscriber_remove_table(
- i_queue_name text, i_table text)
-returns integer as $$
-declare
- link text;
-begin
- delete from londiste.subscriber_table
- where queue_name = i_queue_name
- and table_name = i_table;
- if not found then
- raise exception 'no such table';
- end if;
-
- -- sync link
- link := londiste.link_dest(i_queue_name);
- if link is not null then
- delete from londiste.provider_table
- where queue_name = link
- and table_name = i_table;
- perform londiste.provider_notify_change(link);
- end if;
-
- return 0;
-end;
-$$ language plpgsql security definer;
-
+++ /dev/null
-
-create or replace function londiste.subscriber_set_skip_truncate(
- i_queue text,
- i_table text,
- i_value bool)
-returns integer as $$
-begin
- update londiste.subscriber_table
- set skip_truncate = i_value
- where queue_name = i_queue
- and table_name = i_table;
- if not found then
- raise exception 'table not found';
- end if;
-
- return 1;
-end;
-$$ language plpgsql security definer;
-
+++ /dev/null
-
-create or replace function londiste.subscriber_set_table_state(
- i_queue_name text,
- i_table_name text,
- i_snapshot text,
- i_merge_state text)
-returns integer as $$
-declare
- link text;
- ok integer;
-begin
- update londiste.subscriber_table
- set snapshot = i_snapshot,
- merge_state = i_merge_state,
- -- reset skip_snapshot when table is copied over
- skip_truncate = case when i_merge_state = 'ok'
- then null
- else skip_truncate
- end
- where queue_name = i_queue_name
- and table_name = i_table_name;
- if not found then
- raise exception 'no such table';
- end if;
-
- -- sync link state also
- link := londiste.link_dest(i_queue_name);
- if link then
- select * from londiste.provider_table
- where queue_name = linkdst
- and table_name = i_table_name;
- if found then
- if i_merge_state is null or i_merge_state <> 'ok' then
- delete from londiste.provider_table
- where queue_name = link
- and table_name = i_table_name;
- perform londiste.notify_change(link);
- end if;
- else
- if i_merge_state = 'ok' then
- insert into londiste.provider_table (queue_name, table_name)
- values (link, i_table_name);
- perform londiste.notify_change(link);
- end if;
- end if;
- end if;
-
- return 1;
-end;
-$$ language plpgsql security definer;
-
-create or replace function londiste.set_table_state(
- i_queue_name text,
- i_table_name text,
- i_snapshot text,
- i_merge_state text)
-returns integer as $$
-begin
- return londiste.subscriber_set_table_state(i_queue_name, i_table_name, i_snapshot, i_merge_state);
-end;
-$$ language plpgsql security definer;
-
-
+++ /dev/null
-
-create or replace function londiste.subscriber_get_table_pending_triggers(i_table_name text)
-returns setof londiste.subscriber_pending_triggers as $$
-declare
- trigger record;
-begin
- for trigger in
- select *
- from londiste.subscriber_pending_triggers
- where table_name = i_table_name
- loop
- return next trigger;
- end loop;
-
- return;
-end;
-$$ language plpgsql strict stable;
-
-
-create or replace function londiste.subscriber_drop_table_trigger(i_table_name text, i_trigger_name text)
-returns integer as $$
-declare
- trig_def record;
-begin
- select * into trig_def
- from londiste.find_table_triggers(i_table_name)
- where trigger_name = i_trigger_name;
-
- if FOUND is not true then
- return 0;
- end if;
-
- insert into londiste.subscriber_pending_triggers(table_name, trigger_name, trigger_def)
- values (i_table_name, i_trigger_name, trig_def.trigger_def);
-
- execute 'drop trigger ' || i_trigger_name || ' on ' || i_table_name;
-
- return 1;
-end;
-$$ language plpgsql;
-
-
-create or replace function londiste.subscriber_drop_all_table_triggers(i_table_name text)
-returns integer as $$
-declare
- trigger record;
-begin
- for trigger in
- select trigger_name as name
- from londiste.find_table_triggers(i_table_name)
- loop
- perform londiste.subscriber_drop_table_trigger(i_table_name, trigger.name);
- end loop;
-
- return 1;
-end;
-$$ language plpgsql;
-
-
-create or replace function londiste.subscriber_restore_table_trigger(i_table_name text, i_trigger_name text)
-returns integer as $$
-declare
- trig_def text;
-begin
- select trigger_def into trig_def
- from londiste.subscriber_pending_triggers
- where (table_name, trigger_name) = (i_table_name, i_trigger_name);
-
- if not found then
- return 0;
- end if;
-
- delete from londiste.subscriber_pending_triggers
- where table_name = i_table_name and trigger_name = i_trigger_name;
-
- execute trig_def;
-
- return 1;
-end;
-$$ language plpgsql;
-
-
-create or replace function londiste.subscriber_restore_all_table_triggers(i_table_name text)
-returns integer as $$
-declare
- trigger record;
-begin
- for trigger in
- select trigger_name as name
- from londiste.subscriber_get_table_pending_triggers(i_table_name)
- loop
- perform londiste.subscriber_restore_table_trigger(i_table_name, trigger.name);
- end loop;
-
- return 1;
-end;
-$$ language plpgsql;
-
-
set log_error_verbosity = 'terse';
\i ../txid/txid.sql
\i ../pgq/pgq.sql
-\i ../logtriga/logtriga.sql
+\i ../pgq_set/pgq_set.sql
\i londiste.sql
\set ECHO all
data text
);
-select londiste.provider_add_table('pqueue', 'public.testdata_nopk');
-select londiste.provider_add_table('pqueue', 'public.testdata');
+select current_database();
-select pgq.create_queue('pqueue');
-select londiste.provider_add_table('pqueue', 'public.testdata');
-select londiste.provider_add_table('pqueue', 'public.testdata');
+select * from pgq_set.add_member('aset', 'rnode', 'dbname=db', false);
+select * from pgq_set.create_node('aset', 'root', 'rnode', 'londiste_root', null::text, null::int8, null::text);
-select londiste.provider_refresh_trigger('pqueue', 'public.testdata');
+select * from londiste.node_add_table('aset', 'public.testdata_nopk');
+select * from londiste.node_add_table('aset', 'public.testdata');
-select * from londiste.provider_get_table_list('pqueue');
+select * from londiste.node_add_table('pqueue', 'public.testdata');
+select * from londiste.node_add_table('pqueue', 'public.testdata');
+select * from londiste.node_add_table('pset', 'public.testdata_nopk');
-select londiste.provider_remove_table('pqueue', 'public.nonexist');
-select londiste.provider_remove_table('pqueue', 'public.testdata');
+select londiste.node_refresh_trigger('pqueue', 'public.testdata');
-select * from londiste.provider_get_table_list('pqueue');
+select * from londiste.node_get_table_list('pqueue');
+
+select londiste.node_remove_table('pqueue', 'public.nonexist');
+select londiste.node_remove_table('pqueue', 'public.testdata');
+
+select * from londiste.node_get_table_list('pqueue');
--
-- seqs
--
-select * from londiste.provider_get_seq_list('pqueue');
-select londiste.provider_add_seq('pqueue', 'public.no_seq');
-select londiste.provider_add_seq('pqueue', 'public.testdata_id_seq');
-select londiste.provider_add_seq('pqueue', 'public.testdata_id_seq');
-select * from londiste.provider_get_seq_list('pqueue');
-select londiste.provider_remove_seq('pqueue', 'public.testdata_id_seq');
-select londiste.provider_remove_seq('pqueue', 'public.testdata_id_seq');
-select * from londiste.provider_get_seq_list('pqueue');
+select * from londiste.node_get_seq_list('pqueue');
+select londiste.node_add_seq('pqueue', 'public.no_seq');
+select londiste.node_add_seq('pqueue', 'public.testdata_id_seq');
+select londiste.node_add_seq('pqueue', 'public.testdata_id_seq');
+select * from londiste.node_get_seq_list('pqueue');
+select londiste.node_remove_seq('pqueue', 'public.testdata_id_seq');
+select londiste.node_remove_seq('pqueue', 'public.testdata_id_seq');
+select * from londiste.node_get_seq_list('pqueue');
--
-- linked queue
--
-select londiste.provider_add_table('pqueue', 'public.testdata');
+select londiste.node_add_table('pqueue', 'public.testdata');
insert into londiste.link (source, dest) values ('mqueue', 'pqueue');
-select londiste.provider_add_table('pqueue', 'public.testdata');
-select londiste.provider_remove_table('pqueue', 'public.testdata');
+select londiste.node_add_table('pqueue', 'public.testdata');
+select londiste.node_remove_table('pqueue', 'public.testdata');
-select londiste.provider_add_seq('pqueue', 'public.testdata_id_seq');
-select londiste.provider_remove_seq('pqueue', 'public.testdata_seq');
+select londiste.node_add_seq('pqueue', 'public.testdata_id_seq');
+select londiste.node_remove_seq('pqueue', 'public.testdata_seq');
--
-- cleanup
delete from londiste.link;
drop table testdata;
drop table testdata_nopk;
-delete from londiste.provider_seq;
-delete from londiste.provider_table;
+delete from londiste.node_seq;
+delete from londiste.node_table;
select pgq.drop_queue('pqueue');
grant usage on schema londiste to public;
-grant select on londiste.provider_table to public;
-grant select on londiste.completed to public;
-grant select on londiste.link to public;
-grant select on londiste.subscriber_table to public;
+grant select on londiste.node_table to public;
+grant select on londiste.node_seq to public;
+grant select on londiste.pending_fkeys to public;
+grant select on londiste.pending_triggers to public;
+-- ----------------------------------------------------------------------
+-- Section: Londiste internals
+--
+-- Londiste storage: tables/seqs/fkeys/triggers/events.
+--
+-- Londiste event types:
+-- I/U/D - ev_data: table update in partial-sql format, ev_extra1: fq table name
+-- I:/U:/D: - ev_data: table update in urlencoded format, ev_extra1: fq table name
+-- add-seq - ev_data: seq name that was added on root
+-- del-seq - ev_data: seq name that was removed on root
+-- add-tbl - ev_data: table name that was added on root
+-- del-tbl - ev_data: table name that was removed on root
+-- seq-values - ev_data: urlencoded fqname:value pairs
+-- ----------------------------------------------------------------------
+create schema londiste;
+
set default_with_oids = 'off';
-create schema londiste;
-create table londiste.provider_table (
+-- ----------------------------------------------------------------------
+-- Table: londiste.set_table
+--
+-- Tables available on root, meaning that events for only
+-- tables specified here can appear in queue.
+--
+-- Columns:
+-- nr - just to have stable order
+-- set_name - which set the table belongs to
+-- table_name - fq table name
+-- ----------------------------------------------------------------------
+create table londiste.set_table (
nr serial not null,
- queue_name text not null,
+ set_name text not null,
table_name text not null,
- trigger_name text,
- primary key (queue_name, table_name)
+ foreign key (set_name) references pgq_set.set_info,
+ primary key (set_name, table_name)
);
-create table londiste.provider_seq (
+-- ----------------------------------------------------------------------
+-- Table: londiste.set_seq
+--
+-- Sequences available on root, meaning that events for only
+-- sequences specified here can appear in queue.
+--
+-- Columns:
+-- nr - just to have stable order
+-- set_name - which set the table belongs to
+-- seq_name - fq seq name
+-- ----------------------------------------------------------------------
+create table londiste.set_seq (
nr serial not null,
- queue_name text not null,
+ set_name text not null,
seq_name text not null,
- primary key (queue_name, seq_name)
-);
-
-create table londiste.completed (
- consumer_id text not null,
- last_tick_id bigint not null,
-
- primary key (consumer_id)
+ foreign key (set_name) references pgq_set.set_info,
+ primary key (set_name, seq_name)
);
-create table londiste.link (
- source text not null,
- dest text not null,
- primary key (source),
- unique (dest)
-);
-create table londiste.subscriber_table (
+-- ----------------------------------------------------------------------
+-- Table: londiste.node_table
+--
+-- Info about attached tables.
+--
+-- Columns:
+-- nr - Dummy number for visual ordering
+-- set_name - Set name
+-- table_name - fully-qualified table name
+-- merge_state - State for tables
+-- trigger_type - trigger type
+-- trigger_name - londiste trigger name
+-- copy_snapshot - remote snapshot for COPY command
+-- custom_tg_args - user-specified
+-- skip_truncate - if 'in-copy' should not do TRUNCATE
+--
+-- Tables merge states:
+-- master - master: all in sync
+-- ok - slave: all in sync
+-- in-copy -
+-- catching-up -
+-- wanna-sync:% -
+-- do-sync:% -
+-- unsynced -
+--
+-- Trigger type:
+-- notrigger - no trigger applied
+-- pgq.logtriga - Partial SQL trigger with fixed column list
+-- pgq.sqltriga - Partial SQL trigger with autodetection
+-- pgq.logutriga - urlenc trigger with autodetection
+-- pgq.denytrigger - deny trigger
+-- ----------------------------------------------------------------------
+create table londiste.node_table (
nr serial not null,
- queue_name text not null,
+ set_name text not null,
table_name text not null,
- snapshot text,
merge_state text,
- trigger_name text,
-
+ custom_snapshot text,
skip_truncate bool,
- primary key (queue_name, table_name)
+ foreign key (set_name, table_name) references londiste.set_table,
+ primary key (set_name, table_name)
);
-create table londiste.subscriber_seq (
+
+-- ----------------------------------------------------------------------
+-- Table: londiste.node_trigger
+--
+-- Node-specific triggers. When node type changes,
+-- Londiste will make sure unnecessary triggers are
+-- dropped and new triggers created.
+--
+-- Columns:
+-- set_name - set it belongs to
+-- table_name - table name
+-- tg_type - any / root / non-root
+-- tg_name - name for the trigger
+-- tg_def - full statement for trigger creation
+-- ----------------------------------------------------------------------
+create table londiste.node_trigger (
+ set_name text not null,
+ table_name text not null,
+ tg_name text not null,
+ tg_type text not null,
+ tg_def text not null,
+ foreign key (set_name, table_name) references londiste.node_table,
+ primary key (set_name, table_name, tg_name)
+);
+
+-- ----------------------------------------------------------------------
+-- Table: londiste.node_seq
+--
+-- Info about attached sequences.
+--
+-- Columns:
+-- nr - dummy number for ordering
+-- set_name - which set it belongs to
+-- seq_name - fully-qualified seq name
+-- ----------------------------------------------------------------------
+create table londiste.node_seq (
nr serial not null,
- queue_name text not null,
+ set_name text not null,
seq_name text not null,
-
- primary key (queue_name, seq_name)
+ foreign key (set_name, seq_name) references londiste.set_seq,
+ primary key (set_name, seq_name)
);
-create table londiste.subscriber_pending_fkeys (
+
+-- ----------------------------------------------------------------------
+-- Table: londiste.pending_fkeys
+--
+-- Details on dropped fkeys. Global, not specific to any set.
+--
+-- Columns:
+-- from_table - fully-qualified table name
+-- to_table - fully-qualified table name
+-- fkey_name - name of constraint
+-- fkey_def - full fkey definition
+-- ----------------------------------------------------------------------
+create table londiste.pending_fkeys (
from_table text not null,
to_table text not null,
fkey_name text not null,
primary key (from_table, fkey_name)
);
-create table londiste.subscriber_pending_triggers (
+
+-- ----------------------------------------------------------------------
+-- Table: londiste.pending_triggers
+--
+-- Details on dropped triggers. Global, not specific to any set.
+--
+-- Columns:
+-- table_name - fully-qualified table name
+-- trigger_name - trigger name
+-- trigger_def - full trigger definition
+-- ----------------------------------------------------------------------
+create table londiste.pending_triggers (
table_name text not null,
trigger_name text not null,
trigger_def text not null,
primary key (table_name, trigger_name)
);
+