unfinished londiste reorg, backup commit
authorMarko Kreen <markokr@gmail.com>
Fri, 7 Mar 2008 16:53:06 +0000 (16:53 +0000)
committerMarko Kreen <markokr@gmail.com>
Fri, 7 Mar 2008 16:53:06 +0000 (16:53 +0000)
42 files changed:
sql/londiste/Makefile
sql/londiste/expected/londiste_provider.out
sql/londiste/functions/londiste.find_column_types.sql
sql/londiste/functions/londiste.find_table_fkeys.sql
sql/londiste/functions/londiste.find_table_oid.sql
sql/londiste/functions/londiste.find_table_triggers.sql
sql/londiste/functions/londiste.handle_fkeys.sql [new file with mode: 0644]
sql/londiste/functions/londiste.handle_triggers.sql [new file with mode: 0644]
sql/londiste/functions/londiste.make_fqname.sql [new file with mode: 0644]
sql/londiste/functions/londiste.node_add_seq.sql [new file with mode: 0644]
sql/londiste/functions/londiste.node_add_table.sql [new file with mode: 0644]
sql/londiste/functions/londiste.node_get_seq_list.sql [new file with mode: 0644]
sql/londiste/functions/londiste.node_get_table_list.sql [new file with mode: 0644]
sql/londiste/functions/londiste.node_notify_change.sql [new file with mode: 0644]
sql/londiste/functions/londiste.node_remove_seq.sql [new file with mode: 0644]
sql/londiste/functions/londiste.node_remove_table.sql [new file with mode: 0644]
sql/londiste/functions/londiste.node_set_skip_truncate.sql [new file with mode: 0644]
sql/londiste/functions/londiste.node_set_table_state.sql [new file with mode: 0644]
sql/londiste/functions/londiste.provider_add_seq.sql [deleted file]
sql/londiste/functions/londiste.provider_add_table.sql [deleted file]
sql/londiste/functions/londiste.provider_create_trigger.sql [deleted file]
sql/londiste/functions/londiste.provider_get_seq_list.sql [deleted file]
sql/londiste/functions/londiste.provider_get_table_list.sql [deleted file]
sql/londiste/functions/londiste.provider_notify_change.sql [deleted file]
sql/londiste/functions/londiste.provider_refresh_trigger.sql [deleted file]
sql/londiste/functions/londiste.provider_remove_seq.sql [deleted file]
sql/londiste/functions/londiste.provider_remove_table.sql [deleted file]
sql/londiste/functions/londiste.quote_fqname.sql
sql/londiste/functions/londiste.subscriber_add_seq.sql [deleted file]
sql/londiste/functions/londiste.subscriber_add_table.sql [deleted file]
sql/londiste/functions/londiste.subscriber_fkeys_funcs.sql [deleted file]
sql/londiste/functions/londiste.subscriber_get_seq_list.sql [deleted file]
sql/londiste/functions/londiste.subscriber_get_table_list.sql [deleted file]
sql/londiste/functions/londiste.subscriber_remove_seq.sql [deleted file]
sql/londiste/functions/londiste.subscriber_remove_table.sql [deleted file]
sql/londiste/functions/londiste.subscriber_set_skip_truncate.sql [deleted file]
sql/londiste/functions/londiste.subscriber_set_table_state.sql [deleted file]
sql/londiste/functions/londiste.subscriber_trigger_funcs.sql [deleted file]
sql/londiste/sql/londiste_install.sql
sql/londiste/sql/londiste_provider.sql
sql/londiste/structure/grants.sql
sql/londiste/structure/tables.sql

index 49f05beffcfccece8a00a8da5fdf281225d487c5..6d0b00b485640aa5efef048db2e1761803519250 100644 (file)
@@ -2,22 +2,23 @@
 DATA_built = londiste.sql londiste.upgrade.sql
 DOCS = README.londiste
 
-FUNCS = $(wildcard functions/*.sql)
-SRCS = structure/tables.sql structure/grants.sql structure/types.sql $(FUNCS)
+SQLS = structure/tables.sql structure/grants.sql structure/functions.sql
+FUNCS = $(shell sed -n -e '/^\\/{s/\\i //;p}' $(SQLS))
+SRCS = $(SQLS) $(FUNCS)
 
 REGRESS = londiste_install londiste_provider londiste_subscriber londiste_fkeys
 # londiste_denytrigger
-REGRESS_OPTS = --load-language=plpythonu --load-language=plpgsql
+REGRESS_OPTS = --load-language=plpgsql
 
 include ../../config.mak
 
 include $(PGXS)
 
 londiste.sql: $(SRCS)
-       cat $(SRCS) > $@
+       $(CATSQL) $(SQLS) > $@
 
-londiste.upgrade.sql: $(FUNCS)
-       cat $(FUNCS) > $@
+londiste.upgrade.sql: $(SRCS)
+       $(CATSQL) structure/functions.sql > $@
 
 test: londiste.sql
        $(MAKE) installcheck || { less regression.diffs; exit 1; }
@@ -25,3 +26,17 @@ test: londiste.sql
 ack:
        cp results/* expected/
 
+NDOC = NaturalDocs
+NDOCARGS = -r -o html docs/html -p docs -i docs/sql
+CATSQL = ../../scripts/catsql.py
+
+dox: cleandox
+       mkdir -p docs/html
+       mkdir -p docs/sql
+       $(CATSQL) --ndoc structure/tables.sql structure/types.sql > docs/sql/schema.sql
+       $(CATSQL) --ndoc structure/functions.sql > docs/sql/functions.sql
+       $(NDOC) $(NDOCARGS)
+
+cleandox:
+       rm -rf docs/html docs/Data docs/sql
+
index 3ff2a201a9b8fc47cab30531728fadb581a676b2..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644 (file)
@@ -1,125 +0,0 @@
-set client_min_messages = 'warning';
-\set VERBOSITY 'terse'
---
--- tables
---
-create table testdata (
-    id serial primary key,
-    data text
-);
-create table testdata_nopk (
-    id serial,
-    data text
-);
-select londiste.provider_add_table('pqueue', 'public.testdata_nopk');
-ERROR:  need key column
-select londiste.provider_add_table('pqueue', 'public.testdata');
-ERROR:  no such event queue
-select pgq.create_queue('pqueue');
- create_queue 
---------------
-            1
-(1 row)
-
-select londiste.provider_add_table('pqueue', 'public.testdata');
- provider_add_table 
---------------------
-                  1
-(1 row)
-
-select londiste.provider_add_table('pqueue', 'public.testdata');
-ERROR:  duplicate key violates unique constraint "provider_table_pkey"
-select londiste.provider_refresh_trigger('pqueue', 'public.testdata');
- provider_refresh_trigger 
---------------------------
-                        1
-(1 row)
-
-select * from londiste.provider_get_table_list('pqueue');
-   table_name    | trigger_name  
------------------+---------------
- public.testdata | pqueue_logger
-(1 row)
-
-select londiste.provider_remove_table('pqueue', 'public.nonexist');
-ERROR:  no such table registered
-select londiste.provider_remove_table('pqueue', 'public.testdata');
- provider_remove_table 
------------------------
-                     1
-(1 row)
-
-select * from londiste.provider_get_table_list('pqueue');
- table_name | trigger_name 
-------------+--------------
-(0 rows)
-
---
--- seqs
---
-select * from londiste.provider_get_seq_list('pqueue');
- provider_get_seq_list 
------------------------
-(0 rows)
-
-select londiste.provider_add_seq('pqueue', 'public.no_seq');
-ERROR:  seq not found
-select londiste.provider_add_seq('pqueue', 'public.testdata_id_seq');
- provider_add_seq 
-------------------
-                0
-(1 row)
-
-select londiste.provider_add_seq('pqueue', 'public.testdata_id_seq');
-ERROR:  duplicate key violates unique constraint "provider_seq_pkey"
-select * from londiste.provider_get_seq_list('pqueue');
- provider_get_seq_list  
-------------------------
- public.testdata_id_seq
-(1 row)
-
-select londiste.provider_remove_seq('pqueue', 'public.testdata_id_seq');
- provider_remove_seq 
----------------------
-                   0
-(1 row)
-
-select londiste.provider_remove_seq('pqueue', 'public.testdata_id_seq');
-ERROR:  seq not attached
-select * from londiste.provider_get_seq_list('pqueue');
- provider_get_seq_list 
------------------------
-(0 rows)
-
---
--- linked queue
---
-select londiste.provider_add_table('pqueue', 'public.testdata');
- provider_add_table 
---------------------
-                  1
-(1 row)
-
-insert into londiste.link (source, dest) values ('mqueue', 'pqueue');
-select londiste.provider_add_table('pqueue', 'public.testdata');
-ERROR:  Linked queue, manipulation not allowed
-select londiste.provider_remove_table('pqueue', 'public.testdata');
-ERROR:  Linked queue, manipulation not allowed
-select londiste.provider_add_seq('pqueue', 'public.testdata_id_seq');
-ERROR:  Linked queue, cannot modify
-select londiste.provider_remove_seq('pqueue', 'public.testdata_seq');
-ERROR:  Linked queue, cannot modify
---
--- cleanup
---
-delete from londiste.link;
-drop table testdata;
-drop table testdata_nopk;
-delete from londiste.provider_seq;
-delete from londiste.provider_table;
-select pgq.drop_queue('pqueue');
- drop_queue 
-------------
-          1
-(1 row)
-
index 594bfdfecda593c4e7cb2a92bd3052f90e0dec00..e079b0f19297757645876f0febd13a39ddba052e 100644 (file)
@@ -1,5 +1,16 @@
 create or replace function londiste.find_column_types(tbl text)
 returns text as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.find_column_types(1)
+--
+--      Returns columnt type string for logtriga().
+--
+-- Parameters:
+--      tbl - fqname
+--
+-- Returns:
+--      String of 'kv'.
+-- ----------------------------------------------------------------------
 declare
     res      text;
     col      record;
index 43a9cd87e7a1bf3e1c5eec6e4ec508b4ee193a38..3e4ff9da083fbd11ebe46a0bda1a8e4a3c9394c2 100644 (file)
@@ -1,6 +1,20 @@
 
 create or replace function londiste.find_table_fkeys(i_table_name text)
-returns setof londiste.subscriber_pending_fkeys as $$
+returns setof londiste.pending_fkeys as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.find_table_fkeys(1)
+--
+--      Return all active fkeys.
+--
+-- Parameters:
+--      i_table_name    - fqname
+--
+-- Returns:
+--      from_table      - fqname
+--      to_table        - fqname
+--      fkey_name       - name
+--      fkey_def        - full def
+-- ----------------------------------------------------------------------
 declare
     fkey      record;
     tbl_oid   oid;
index 4d7da888d9c0c51f099dc923e017991f2bd7b4f6..2888575d264a1ac657fa3bdd65b38b874628a1fc 100644 (file)
@@ -1,49 +1,80 @@
-create or replace function londiste.find_rel_oid(tbl text, kind text)
+
+create or replace function londiste.find_rel_oid(i_fqname text, i_kind text)
 returns oid as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.find_rel_oid(2)
+--
+--      Find pg_class row oid.
+--
+-- Parameters:
+--      i_fqname    - fq object name
+--      i_kind      - relkind value
+--
+-- Returns:
+--      oid or exception of not found
+-- ----------------------------------------------------------------------
 declare
     res      oid;
     pos      integer;
     schema   text;
     name     text;
 begin
-    pos := position('.' in tbl);
+    pos := position('.' in i_fqname);
     if pos > 0 then
-        schema := substring(tbl for pos - 1);
-        name := substring(tbl from pos + 1);
+        schema := substring(i_fqname for pos - 1);
+        name := substring(i_fqname from pos + 1);
     else
         schema := 'public';
-        name := tbl;
+        name := i_fqname;
     end if;
     select c.oid into res
       from pg_namespace n, pg_class c
      where c.relnamespace = n.oid
-       and c.relkind = kind
+       and c.relkind = i_kind
        and n.nspname = schema and c.relname = name;
     if not found then
-        if kind = 'r' then
-            raise exception 'table not found';
-        elsif kind = 'S' then
-            raise exception 'seq not found';
-        else
-            raise exception 'weird relkind';
-        end if;
+        res := NULL;
     end if;
 
     return res;
 end;
 $$ language plpgsql strict stable;
 
+
 create or replace function londiste.find_table_oid(tbl text)
 returns oid as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.find_table_oid(1)
+--
+--      Find table oid based on fqname.
+--
+-- Parameters:
+--      tbl - fqname
+--
+-- Returns:
+--      oid
+-- ----------------------------------------------------------------------
 begin
     return londiste.find_rel_oid(tbl, 'r');
 end;
 $$ language plpgsql strict stable;
 
-create or replace function londiste.find_seq_oid(tbl text)
+
+create or replace function londiste.find_seq_oid(seq text)
 returns oid as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.find_seq_oid(1)
+--
+--      Find sequence oid based on fqname.
+--
+-- Parameters:
+--      seq - fqname
+--
+-- Returns:
+--      oid
+-- ----------------------------------------------------------------------
 begin
-    return londiste.find_rel_oid(tbl, 'S');
+    return londiste.find_rel_oid(seq, 'S');
 end;
 $$ language plpgsql strict stable;
 
index f43c41771504d351d201c5b2e48fca61ab0aa797..d75e5e9d85178398c96da73fd0c26120b9c1d824 100644 (file)
@@ -1,6 +1,19 @@
 
 create or replace function londiste.find_table_triggers(i_table_name text)
-returns setof londiste.subscriber_pending_triggers as $$
+returns setof londiste.pending_triggers as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.find_table_triggers(1)
+--
+--      Returns all existing triggers on table.
+--
+-- Parameters:
+--      i_table_name - table name
+--
+-- Returns:
+--      table_name      - fq table name
+--      trigger_name    - name
+--      trigger_def     - partial def as returned by pg_get_triggerdef()
+-- ----------------------------------------------------------------------
 declare
     tg        record;
 begin
@@ -17,3 +30,4 @@ begin
     return;
 end;
 $$ language plpgsql strict stable;
+
diff --git a/sql/londiste/functions/londiste.handle_fkeys.sql b/sql/londiste/functions/londiste.handle_fkeys.sql
new file mode 100644 (file)
index 0000000..a261117
--- /dev/null
@@ -0,0 +1,126 @@
+
+create or replace function londiste.get_table_pending_fkeys(i_table_name text) 
+returns setof londiste.pending_fkeys as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.get_table_pending_fkeys(1)
+--
+--      Return dropped fkeys for table.
+--
+-- Parameters:
+--      i_table_name - fqname
+--
+-- Returns:
+--      desc
+-- ----------------------------------------------------------------------
+declare
+    fkeys   record;
+begin
+    for fkeys in
+        select *
+        from londiste.pending_fkeys
+        where from_table = i_table_name or to_table = i_table_name
+        order by 1,2,3
+    loop
+        return next fkeys;
+    end loop;
+    return;
+end;
+$$ language plpgsql strict stable;
+
+
+create or replace function londiste.node_get_valid_pending_fkeys(i_set_name text)
+returns setof londiste.pending_fkeys as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.node_get_valid_pending_fkeys(1)
+--
+--      Returns dropped fkeys where both sides are in sync now.
+--
+-- Parameters:
+--      i_set_name - sets name
+--
+-- Returns:
+--      desc
+-- ----------------------------------------------------------------------
+declare
+    fkeys   record;
+begin
+    for fkeys in
+        select pf.*
+        from londiste.pending_fkeys pf
+             left join londiste.node_table st_from on (st_from.table_name = pf.from_table)
+             left join londiste.node_table st_to on (st_to.table_name = pf.to_table)
+        where (st_from.table_name is null or (st_from.merge_state = 'ok' and st_from.snapshot is null))
+          and (st_to.table_name is null or (st_to.merge_state = 'ok' and st_to.snapshot is null))
+          and (coalesce(st_from.queue_name = i_queue_name, false)
+               or coalesce(st_to.queue_name = i_queue_name, false))
+        order by 1, 2, 3
+    loop
+        return next fkeys;
+    end loop;
+    
+    return;
+end;
+$$ language plpgsql strict stable;
+
+
+create or replace function londiste.drop_table_fkey(i_from_table text, i_fkey_name text)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.drop_table_fkey(x)
+--
+--      Drop one fkey, save in pending table.
+-- ----------------------------------------------------------------------
+declare
+    fkey       record;
+begin        
+    select * into fkey
+    from londiste.find_table_fkeys(i_from_table) 
+    where fkey_name = i_fkey_name and from_table = i_from_table;
+    
+    if not found then
+        return 0;
+    end if;
+            
+    insert into londiste.pending_fkeys values (fkey.from_table, fkey.to_table, i_fkey_name, fkey.fkey_def);
+        
+    execute 'alter table only ' || londiste.quote_fqname(fkey.from_table)
+            || ' drop constraint ' || quote_ident(i_fkey_name);
+    
+    return 1;
+end;
+$$ language plpgsql strict;
+
+
+create or replace function londiste.restore_table_fkey(i_from_table text, i_fkey_name text)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.restore_table_fkey(2)
+--
+--      Restore dropped fkey.
+--
+-- Parameters:
+--      i_from_table - source table
+--      i_fkey_name  - fkey name
+--
+-- Returns:
+--      nothing
+-- ----------------------------------------------------------------------
+declare
+    fkey    record;
+begin
+    select * into fkey
+    from londiste.pending_fkeys 
+    where fkey_name = i_fkey_name and from_table = i_from_table;
+    
+    if not found then
+        return 0;
+    end if;
+
+    execute fkey.fkey_def;
+
+    delete from londiste.pending_fkeys where fkey_name = fkey.fkey_name;
+        
+    return 1;
+end;
+$$ language plpgsql strict;
+
diff --git a/sql/londiste/functions/londiste.handle_triggers.sql b/sql/londiste/functions/londiste.handle_triggers.sql
new file mode 100644 (file)
index 0000000..b244f78
--- /dev/null
@@ -0,0 +1,130 @@
+
+create or replace function londiste.get_pending_triggers(i_table_name text)
+returns setof londiste.pending_triggers as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.get_pending_triggers(1)
+--
+--      Returns dropped triggers for one table.
+--
+-- Parameters:
+--      i_table_name - fqname
+--
+-- Returns:
+--      list of triggers
+-- ----------------------------------------------------------------------
+declare
+    trigger    record;
+begin
+    for trigger in
+        select *
+        from londiste.pending_triggers
+        where table_name = i_table_name
+    loop
+        return next trigger;
+    end loop;
+    
+    return;
+end;
+$$ language plpgsql strict stable;
+
+
+create or replace function londiste.drop_table_trigger(i_table_name text, i_trigger_name text)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.drop_table_trigger(2)
+--
+--      Drop one trigger, saves it to pending table.
+-- ----------------------------------------------------------------------
+declare
+    trig_def record;
+begin
+    select * into trig_def
+    from londiste.find_table_triggers(i_table_name)
+    where trigger_name = i_trigger_name;
+    
+    if FOUND is not true then
+        return 0;
+    end if;
+    
+    insert into londiste.pending_triggers(table_name, trigger_name, trigger_def) 
+        values (i_table_name, i_trigger_name, trig_def.trigger_def);
+    
+    execute 'drop trigger ' || i_trigger_name || ' on ' || i_table_name;
+    
+    return 1;
+end;
+$$ language plpgsql;
+
+
+create or replace function londiste.drop_all_table_triggers(i_table_name text)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.drop_all_table_triggers(1)
+--
+--      Drop all triggers that exist.
+-- ----------------------------------------------------------------------
+declare
+    trigger record;
+begin
+    for trigger in
+        select trigger_name as name
+        from londiste.find_table_triggers(i_table_name)
+    loop
+        perform londiste.drop_table_trigger(i_table_name, trigger.name);
+    end loop;
+    
+    return 1;
+end;
+$$ language plpgsql;
+
+
+create or replace function londiste.restore_table_trigger(i_table_name text, i_trigger_name text)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.restore_table_trigger(2)
+--
+--      Restore one trigger.
+-- ----------------------------------------------------------------------
+declare
+    trig_def text;
+begin
+    select trigger_def into trig_def
+    from londiste.pending_triggers
+    where (table_name, trigger_name) = (i_table_name, i_trigger_name);
+    
+    if not found then
+        return 0;
+    end if;
+    
+    delete from londiste.pending_triggers 
+    where table_name = i_table_name and trigger_name = i_trigger_name;
+    
+    execute trig_def;
+
+    return 1;
+end;
+$$ language plpgsql;
+
+
+create or replace function londiste.restore_all_table_triggers(i_table_name text)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.restore_all_table_triggers(1)
+--
+--      Restore all dropped triggers.
+-- ----------------------------------------------------------------------
+declare
+    trigger record;
+begin
+    for trigger in
+        select trigger_name as name
+        from londiste.get_pending_triggers(i_table_name)
+    loop
+        perform londiste.restore_table_trigger(i_table_name, trigger.name);
+    end loop;
+    
+    return 1;
+end;
+$$ language plpgsql;
+
+
diff --git a/sql/londiste/functions/londiste.make_fqname.sql b/sql/londiste/functions/londiste.make_fqname.sql
new file mode 100644 (file)
index 0000000..0d4aa95
--- /dev/null
@@ -0,0 +1,27 @@
+
+create or replace function londiste.make_fqname(i_name text)
+returns text as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.make_fqname(1)
+--
+--      Make name to schema-qualified one.
+--
+--      First dot is taken as schema separator.
+--
+--      If schema is missing, 'public' is assumed.
+--
+-- Parameters:
+--      i_name  - object name.
+--
+-- Returns:
+--      Schema qualified name.
+-- ----------------------------------------------------------------------
+begin
+    if position('.' in i_name) > 0 then
+        return i_name;
+    else
+        return 'public.' || i_name;
+    end if;
+end;
+$$ language plpgsql strict immutable;
+
diff --git a/sql/londiste/functions/londiste.node_add_seq.sql b/sql/londiste/functions/londiste.node_add_seq.sql
new file mode 100644 (file)
index 0000000..4156f0b
--- /dev/null
@@ -0,0 +1,51 @@
+
+create or replace function londiste.node_add_seq(
+    in i_set_name text, in i_seq_name text,
+    out ret_code int4, out ret_text text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.node_add_seq(2)
+--
+--      Register sequence.
+--
+-- Parameters:
+--      i_set_name  - set name
+--      i_seq_name  - seq name
+--
+-- Returns:
+--      200 - OK
+--      400 - Not found
+-- ----------------------------------------------------------------------
+declare
+    fq_seq_name text;
+begin
+    fq_seq_name := londiste.make_fqname(i_seq_name);
+
+    perform 1 from pg_class
+        where oid = londiste.find_seq_oid(fq_seq_name);
+    if not found then
+        select 400, 'Sequence not found: ' || fq_seq_name into ret_code, ret_text;
+        return;
+    end if;
+
+    perform 1 from londiste.node_seq
+        where set_name = i_set_name and seq_name = fq_seq_name;
+    if found then
+        select 200, 'OK, seqence already added' into ret_code, ret_text;
+        return;
+    end if;
+
+    if pgq_set.is_root(i_set_name) then
+        insert into londiste.set_seq (set_name, seq_name)
+            values (i_set_name, fq_seq_name);
+        perform londiste.node_notify_change(i_set_name, 'add-seq', fq_seq_name);
+    end if;
+
+    insert into londiste.node_seq (set_name, seq_name)
+        values (i_set_name, fq_seq_name);
+
+    select 200, 'OK' into ret_code, ret_text;
+    return;
+end;
+$$ language plpgsql;
+
diff --git a/sql/londiste/functions/londiste.node_add_table.sql b/sql/londiste/functions/londiste.node_add_table.sql
new file mode 100644 (file)
index 0000000..39e097f
--- /dev/null
@@ -0,0 +1,44 @@
+create or replace function londiste.node_add_table(
+    in i_set_name       text,
+    in i_table_name     text,
+    out ret_code        int4,
+    out ret_desc        text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.node_add_table(x)
+--
+--      Register table on Londiste node.
+--
+-- Returns:
+--      200 - Ok
+--      400 - No such set
+-- ----------------------------------------------------------------------
+declare
+    col_types text;
+    fq_table_name text;
+begin
+    fq_table_name := londiste.make_fqname(i_table_name);
+    col_types := londiste.find_column_types(fq_table_name);
+    if position('k' in col_types) < 1 then
+        raise exception 'need key column';
+    end if;
+
+    perform 1 from pgq_set.set_info where set_name = i_set_name;
+    if not found then
+        select 400, 'No such set: ' || i_set_name into ret_code, ret_desc;
+        return;
+    end if;
+
+    perform 1 from londiste.node_table where set_name = i_set_name and table_name = fq_table_name;
+    if found then
+        select 200, 'OK, already added: ' || fq_table_name into ret_code, ret_desc;
+        return;
+    end if;
+
+    insert into londiste.node_table (set_name, table_name)
+        values (i_set_name, fq_table_name);
+    select 200, 'OK' into ret_code, ret_desc;
+    return;
+end;
+$$ language plpgsql strict;
+
diff --git a/sql/londiste/functions/londiste.node_get_seq_list.sql b/sql/londiste/functions/londiste.node_get_seq_list.sql
new file mode 100644 (file)
index 0000000..4a72041
--- /dev/null
@@ -0,0 +1,22 @@
+
+create or replace function londiste.provider_get_seq_list(i_set_name text)
+returns setof text as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.node_get_seq_list(x)
+--
+--      Returns registered seqs on this Londiste node.
+-- ----------------------------------------------------------------------
+declare
+    rec record;
+begin
+    for rec in
+        select seq_name from londiste.node_seq
+            where set_name = i_set_name
+            order by nr
+    loop
+        return next rec.seq_name;
+    end loop;
+    return;
+end;
+$$ language plpgsql strict;
+
diff --git a/sql/londiste/functions/londiste.node_get_table_list.sql b/sql/londiste/functions/londiste.node_get_table_list.sql
new file mode 100644 (file)
index 0000000..500d94b
--- /dev/null
@@ -0,0 +1,29 @@
+
+create or replace function londiste.node_get_table_list(
+    in i_set_name text,
+    out table_name text,
+    out merge_state text,
+    out custom_snapshot text,
+    out skip_truncate bool)
+returns setof record as $$ 
+-- ----------------------------------------------------------------------
+-- Function: londiste.node_get_table_list(1)
+--
+--      Return info about registered tables.
+--
+-- Parameters:
+--      i_set_name - set name
+-- ----------------------------------------------------------------------
+begin 
+    for table_name, merge_state, custom_snapshot, skip_truncate in 
+        select t.table_name, t.merge_state, t.custom_snapshot, t.skip_truncate
+            from londiste.node_table t
+            where t.set_name= i_set_name
+            order by t.nr
+    loop
+        return next;
+    end loop; 
+    return;
+end; 
+$$ language plpgsql strict stable;
+
diff --git a/sql/londiste/functions/londiste.node_notify_change.sql b/sql/londiste/functions/londiste.node_notify_change.sql
new file mode 100644 (file)
index 0000000..85df7d3
--- /dev/null
@@ -0,0 +1,19 @@
+
+create or replace function londiste.node_send_event(i_set_name text, i_ev_type text, i_ev_data text)
+returns integer as $$
+declare
+    que     text;
+begin
+    select s.queue_name into que
+        from pgq_set s
+        where s.set_name = i_set_name;
+    if not found then
+        raise exception 'Unknown set: %', i_set_name;
+    end if;
+
+    perform pgq.insert_event(que, i_ev_data, i_ev_data);
+
+    return 1;
+end;
+$$ language plpgsql;
+
diff --git a/sql/londiste/functions/londiste.node_remove_seq.sql b/sql/londiste/functions/londiste.node_remove_seq.sql
new file mode 100644 (file)
index 0000000..3906ca5
--- /dev/null
@@ -0,0 +1,20 @@
+
+create or replace function londiste.provider_remove_seq(
+    in i_set_name text, in i_seq_name text,
+    out ret_code int4, out ret_desc text)
+as $$
+begin
+    delete from londiste.node_seq
+        where set_name = i_set_name
+          and seq_name = i_seq_name;
+    if not found then
+        select 400, 'Not found: '||i_seq_name into ret_code, ret_desc;
+        return;
+    end if;
+
+    -- perform londiste.provider_notify_change(i_queue_name);
+    select 200, 'OK' into ret_code, ret_desc;
+    return;
+end;
+$$ language plpgsql strict;
+
diff --git a/sql/londiste/functions/londiste.node_remove_table.sql b/sql/londiste/functions/londiste.node_remove_table.sql
new file mode 100644 (file)
index 0000000..13a5c6a
--- /dev/null
@@ -0,0 +1,21 @@
+
+create or replace function londiste.node_remove_table(
+    in i_set_name text, in i_table_name text,
+    out ret_code int4, out ret_desc text)
+as $$
+begin
+    delete from londiste.node_table
+        where set_name = i_set_name
+          and table_name = i_table_name;
+    if not found then
+        select 400, 'Not found: '||i_table_name into ret_code, ret_desc;
+        return;
+    end if;
+
+    -- perform londiste.provider_notify_change(i_queue_name);
+    -- triggers
+    select 200, 'OK' into ret_code, ret_desc;
+    return;
+end;
+$$ language plpgsql strict;
+
diff --git a/sql/londiste/functions/londiste.node_set_skip_truncate.sql b/sql/londiste/functions/londiste.node_set_skip_truncate.sql
new file mode 100644 (file)
index 0000000..20aae7e
--- /dev/null
@@ -0,0 +1,24 @@
+
+create or replace function londiste.node_set_skip_truncate(
+    i_set_name  text,
+    i_table     text,
+    i_value     bool)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.node_set_skip_truncate(x)
+--
+--      Change skip_truncate flag for table.
+-- ----------------------------------------------------------------------
+begin
+    update londiste.node_table
+       set skip_truncate = i_value
+     where set_name = i_set_name
+       and table_name = i_table;
+    if not found then
+        raise exception 'table not found';
+    end if;
+
+    return 1;
+end;
+$$ language plpgsql;
+
diff --git a/sql/londiste/functions/londiste.node_set_table_state.sql b/sql/londiste/functions/londiste.node_set_table_state.sql
new file mode 100644 (file)
index 0000000..b0bb8f5
--- /dev/null
@@ -0,0 +1,40 @@
+
+create or replace function londiste.node_set_table_state(
+    i_set_name text,
+    i_table_name text,
+    i_snapshot text,
+    i_merge_state text)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.node_set_table_state(4)
+--
+--      Change table state.
+--
+-- Parameters:
+--      i_set_name      - set name
+--      i-table         - table name
+--      i_snapshot      - optional remote snapshot info
+--      i_merge_state   - merge state
+--
+-- Returns:
+--      nothing
+-- ----------------------------------------------------------------------
+begin
+    update londiste.node_table
+        set snapshot = i_snapshot,
+            merge_state = i_merge_state,
+            -- reset skip_snapshot when table is copied over
+            skip_truncate = case when i_merge_state = 'ok'
+                                 then null
+                                 else skip_truncate
+                            end
+      where set_name = i_set_name
+        and table_name = i_table_name;
+    if not found then
+        raise exception 'no such table';
+    end if;
+
+    return 1;
+end;
+$$ language plpgsql;
+
diff --git a/sql/londiste/functions/londiste.provider_add_seq.sql b/sql/londiste/functions/londiste.provider_add_seq.sql
deleted file mode 100644 (file)
index 6658ef6..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-
-create or replace function londiste.provider_add_seq(
-    i_queue_name text, i_seq_name text)
-returns integer as $$
-declare
-    link text;
-begin
-    -- check if linked queue
-    link := londiste.link_source(i_queue_name);
-    if link is not null then
-        raise exception 'Linked queue, cannot modify';
-    end if;
-
-    perform 1 from pg_class
-        where oid = londiste.find_seq_oid(i_seq_name);
-    if not found then
-        raise exception 'seq not found';
-    end if;
-
-    insert into londiste.provider_seq (queue_name, seq_name)
-        values (i_queue_name, i_seq_name);
-    perform londiste.provider_notify_change(i_queue_name);
-
-    return 0;
-end;
-$$ language plpgsql security definer;
-
diff --git a/sql/londiste/functions/londiste.provider_add_table.sql b/sql/londiste/functions/londiste.provider_add_table.sql
deleted file mode 100644 (file)
index 1e14c39..0000000
+++ /dev/null
@@ -1,48 +0,0 @@
-create or replace function londiste.provider_add_table(
-    i_queue_name    text,
-    i_table_name    text,
-    i_col_types     text
-) returns integer strict as $$
-declare
-    tgname text;
-    sql    text;
-begin
-    if londiste.link_source(i_queue_name) is not null then
-        raise exception 'Linked queue, manipulation not allowed';
-    end if;
-
-    if position('k' in i_col_types) < 1 then
-        raise exception 'need key column';
-    end if;
-    if position('.' in i_table_name) < 1 then
-        raise exception 'need fully-qualified table name';
-    end if;
-    select queue_name into tgname
-        from pgq.queue where queue_name = i_queue_name;
-    if not found then
-        raise exception 'no such event queue';
-    end if;
-
-    tgname := i_queue_name || '_logger';
-    tgname := replace(lower(tgname), '.', '_');
-    insert into londiste.provider_table
-        (queue_name, table_name, trigger_name)
-        values (i_queue_name, i_table_name, tgname);
-
-    perform londiste.provider_create_trigger(
-        i_queue_name, i_table_name, i_col_types);
-
-    return 1;
-end;
-$$ language plpgsql security definer;
-
-create or replace function londiste.provider_add_table(
-    i_queue_name text,
-    i_table_name text
-) returns integer as $$
-begin
-    return londiste.provider_add_table(i_queue_name, i_table_name,
-        londiste.find_column_types(i_table_name));
-end;
-$$ language plpgsql security definer;
-
diff --git a/sql/londiste/functions/londiste.provider_create_trigger.sql b/sql/londiste/functions/londiste.provider_create_trigger.sql
deleted file mode 100644 (file)
index fc7c9f4..0000000
+++ /dev/null
@@ -1,29 +0,0 @@
-
-create or replace function londiste.provider_create_trigger(
-    i_queue_name    text,
-    i_table_name    text,
-    i_col_types     text
-) returns integer strict as $$
-declare
-    tgname text;
-begin
-    select trigger_name into tgname
-        from londiste.provider_table
-        where queue_name = i_queue_name
-          and table_name = i_table_name;
-    if not found then
-        raise exception 'table not found';
-    end if;
-
-    execute 'create trigger ' || tgname
-        || ' after insert or update or delete on '
-        || i_table_name
-        || ' for each row execute procedure pgq.logtriga('
-        || quote_literal(i_queue_name) || ', '
-        || quote_literal(i_col_types) || ', '
-        || quote_literal(i_table_name) || ')';
-
-    return 1;
-end;
-$$ language plpgsql security definer;
-
diff --git a/sql/londiste/functions/londiste.provider_get_seq_list.sql b/sql/londiste/functions/londiste.provider_get_seq_list.sql
deleted file mode 100644 (file)
index 3c053fc..0000000
+++ /dev/null
@@ -1,17 +0,0 @@
-
-create or replace function londiste.provider_get_seq_list(i_queue_name text)
-returns setof text as $$
-declare
-    rec record;
-begin
-    for rec in
-        select seq_name from londiste.provider_seq
-            where queue_name = i_queue_name
-            order by nr
-    loop
-        return next rec.seq_name;
-    end loop;
-    return;
-end;
-$$ language plpgsql security definer;
-
diff --git a/sql/londiste/functions/londiste.provider_get_table_list.sql b/sql/londiste/functions/londiste.provider_get_table_list.sql
deleted file mode 100644 (file)
index 9627802..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-
-create or replace function londiste.provider_get_table_list(i_queue text)
-returns setof londiste.ret_provider_table_list as $$ 
-declare 
-    rec   londiste.ret_provider_table_list%rowtype; 
-begin 
-    for rec in 
-        select table_name, trigger_name
-            from londiste.provider_table
-            where queue_name = i_queue
-            order by nr
-    loop
-        return next rec;
-    end loop; 
-    return;
-end; 
-$$ language plpgsql security definer;
-
diff --git a/sql/londiste/functions/londiste.provider_notify_change.sql b/sql/londiste/functions/londiste.provider_notify_change.sql
deleted file mode 100644 (file)
index 2b6576e..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-
-create or replace function londiste.provider_notify_change(i_queue_name text)
-returns integer as $$
-declare
-    res      text;
-    tbl      record;
-begin
-    res := '';
-    for tbl in
-        select table_name from londiste.provider_table
-            where queue_name = i_queue_name
-            order by nr
-    loop
-        if res = '' then
-            res := tbl.table_name;
-        else
-            res := res || ',' || tbl.table_name;
-        end if;
-    end loop;
-    
-    perform pgq.insert_event(i_queue_name, 'T', res);
-
-    return 1;
-end;
-$$ language plpgsql security definer;
-
diff --git a/sql/londiste/functions/londiste.provider_refresh_trigger.sql b/sql/londiste/functions/londiste.provider_refresh_trigger.sql
deleted file mode 100644 (file)
index fe361c1..0000000
+++ /dev/null
@@ -1,44 +0,0 @@
-
-create or replace function londiste.provider_refresh_trigger(
-    i_queue_name    text,
-    i_table_name    text,
-    i_col_types     text
-) returns integer strict as $$
-declare
-    t_name   text;
-    tbl_oid  oid;
-begin
-    select trigger_name into t_name
-        from londiste.provider_table
-        where queue_name = i_queue_name
-          and table_name = i_table_name;
-    if not found then
-        raise exception 'table not found';
-    end if;
-
-    tbl_oid := londiste.find_table_oid(i_table_name);
-    perform 1 from pg_trigger
-        where tgrelid = tbl_oid
-          and tgname = t_name;
-    if found then
-        execute 'drop trigger ' || t_name || ' on ' || i_table_name;
-    end if;
-
-    perform londiste.provider_create_trigger(i_queue_name, i_table_name, i_col_types);
-
-    return 1;
-end;
-$$ language plpgsql security definer;
-
-create or replace function londiste.provider_refresh_trigger(
-    i_queue_name    text,
-    i_table_name    text
-) returns integer strict as $$
-begin
-    return londiste.provider_refresh_trigger(i_queue_name, i_table_name,
-                            londiste.find_column_types(i_table_name));
-end;
-$$ language plpgsql security definer;
-
-
-
diff --git a/sql/londiste/functions/londiste.provider_remove_seq.sql b/sql/londiste/functions/londiste.provider_remove_seq.sql
deleted file mode 100644 (file)
index 47754b8..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-
-create or replace function londiste.provider_remove_seq(
-    i_queue_name text, i_seq_name text)
-returns integer as $$
-declare
-    link text;
-begin
-    -- check if linked queue
-    link := londiste.link_source(i_queue_name);
-    if link is not null then
-        raise exception 'Linked queue, cannot modify';
-    end if;
-
-    delete from londiste.provider_seq
-        where queue_name = i_queue_name
-          and seq_name = i_seq_name;
-    if not found then
-        raise exception 'seq not attached';
-    end if;
-
-    perform londiste.provider_notify_change(i_queue_name);
-
-    return 0;
-end;
-$$ language plpgsql security definer;
-
diff --git a/sql/londiste/functions/londiste.provider_remove_table.sql b/sql/londiste/functions/londiste.provider_remove_table.sql
deleted file mode 100644 (file)
index b2148b7..0000000
+++ /dev/null
@@ -1,37 +0,0 @@
-
-create or replace function londiste.provider_remove_table(
-    i_queue_name   text,
-    i_table_name   text
-) returns integer as $$
-declare
-    tgname text;
-begin
-    if londiste.link_source(i_queue_name) is not null then
-        raise exception 'Linked queue, manipulation not allowed';
-    end if;
-
-    select trigger_name into tgname from londiste.provider_table
-        where queue_name = i_queue_name
-          and table_name = i_table_name;
-    if not found then
-        raise exception 'no such table registered';
-    end if;
-
-    begin
-        execute 'drop trigger ' || tgname || ' on ' || i_table_name;
-    exception
-        when undefined_table then
-            raise notice 'table % does not exist', i_table_name;
-        when undefined_object then
-            raise notice 'trigger % does not exist on table %', tgname, i_table_name;
-    end;
-
-    delete from londiste.provider_table
-        where queue_name = i_queue_name
-          and table_name = i_table_name;
-
-    return 1;
-end;
-$$ language plpgsql security definer;
-
-
index e14168d0e239fe5e462c72ba322d295fe8a38302..1c4cdc0094bc4ca9d865883364e506aa5a17dc01 100644 (file)
@@ -1,6 +1,21 @@
 
 create or replace function londiste.quote_fqname(i_name text)
 returns text as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.quote_fqname(1)
+--
+--      Quete fully-qualified object name for SQL.
+--
+--      First dot is taken as schema separator.
+--
+--      If schema is missing, 'public' is assumed.
+--
+-- Parameters:
+--      i_name  - fully qualified object name.
+--
+-- Returns:
+--      Quoted name.
+-- ----------------------------------------------------------------------
 declare
     res     text;
     pos     integer;
diff --git a/sql/londiste/functions/londiste.subscriber_add_seq.sql b/sql/londiste/functions/londiste.subscriber_add_seq.sql
deleted file mode 100644 (file)
index c144e47..0000000
+++ /dev/null
@@ -1,23 +0,0 @@
-
-create or replace function londiste.subscriber_add_seq(
-    i_queue_name text, i_seq_name text)
-returns integer as $$
-declare
-    link text;
-begin
-    insert into londiste.subscriber_seq (queue_name, seq_name)
-        values (i_queue_name, i_seq_name);
-
-    -- update linked queue if needed
-    link := londiste.link_dest(i_queue_name);
-    if link is not null then
-        insert into londiste.provider_seq
-            (queue_name, seq_name)
-        values (link, i_seq_name);
-        perform londiste.provider_notify_change(link);
-    end if;
-
-    return 0;
-end;
-$$ language plpgsql security definer;
-
diff --git a/sql/londiste/functions/londiste.subscriber_add_table.sql b/sql/londiste/functions/londiste.subscriber_add_table.sql
deleted file mode 100644 (file)
index d5a7331..0000000
+++ /dev/null
@@ -1,14 +0,0 @@
-
-create or replace function londiste.subscriber_add_table(
-    i_queue_name text, i_table text)
-returns integer as $$
-begin
-    insert into londiste.subscriber_table (queue_name, table_name)
-        values (i_queue_name, i_table);
-
-    -- linked queue is updated, when the table is copied
-
-    return 0;
-end;
-$$ language plpgsql security definer;
-
diff --git a/sql/londiste/functions/londiste.subscriber_fkeys_funcs.sql b/sql/londiste/functions/londiste.subscriber_fkeys_funcs.sql
deleted file mode 100644 (file)
index 3eda8d6..0000000
+++ /dev/null
@@ -1,88 +0,0 @@
-
-
-create or replace function londiste.subscriber_get_table_pending_fkeys(i_table_name text) 
-returns setof londiste.subscriber_pending_fkeys as $$
-declare
-    fkeys   record;
-begin
-    for fkeys in
-        select *
-        from londiste.subscriber_pending_fkeys
-        where from_table=i_table_name or to_table=i_table_name
-        order by 1,2,3
-    loop
-        return next fkeys;
-    end loop;
-    
-    return;
-end;
-$$ language plpgsql;
-
-
-create or replace function londiste.subscriber_get_queue_valid_pending_fkeys(i_queue_name text)
-returns setof londiste.subscriber_pending_fkeys as $$
-declare
-    fkeys   record;
-begin
-    for fkeys in
-        select pf.*
-        from londiste.subscriber_pending_fkeys pf
-             left join londiste.subscriber_table st_from on (st_from.table_name = pf.from_table)
-             left join londiste.subscriber_table st_to on (st_to.table_name = pf.to_table)
-        where (st_from.table_name is null or (st_from.merge_state = 'ok' and st_from.snapshot is null))
-          and (st_to.table_name is null or (st_to.merge_state = 'ok' and st_to.snapshot is null))
-          and (coalesce(st_from.queue_name = i_queue_name, false)
-               or coalesce(st_to.queue_name = i_queue_name, false))
-        order by 1, 2, 3
-    loop
-        return next fkeys;
-    end loop;
-    
-    return;
-end;
-$$ language plpgsql;
-
-
-create or replace function londiste.subscriber_drop_table_fkey(i_from_table text, i_fkey_name text)
-returns integer as $$
-declare
-    fkey       record;
-begin        
-    select * into fkey
-    from londiste.find_table_fkeys(i_from_table) 
-    where fkey_name = i_fkey_name and from_table = i_from_table;
-    
-    if not found then
-        return 0;
-    end if;
-            
-    insert into londiste.subscriber_pending_fkeys values (fkey.from_table, fkey.to_table, i_fkey_name, fkey.fkey_def);
-        
-    execute 'alter table only ' || londiste.quote_fqname(fkey.from_table)
-            || ' drop constraint ' || quote_ident(i_fkey_name);
-    
-    return 1;
-end;
-$$ language plpgsql;
-
-
-create or replace function londiste.subscriber_restore_table_fkey(i_from_table text, i_fkey_name text)
-returns integer as $$
-declare
-    fkey    record;
-begin
-    select * into fkey
-    from londiste.subscriber_pending_fkeys 
-    where fkey_name = i_fkey_name and from_table = i_from_table;
-    
-    if not found then
-        return 0;
-    end if;
-    
-    delete from londiste.subscriber_pending_fkeys where fkey_name = fkey.fkey_name;
-        
-    execute fkey.fkey_def;
-        
-    return 1;
-end;
-$$ language plpgsql;
diff --git a/sql/londiste/functions/londiste.subscriber_get_seq_list.sql b/sql/londiste/functions/londiste.subscriber_get_seq_list.sql
deleted file mode 100644 (file)
index 1d218f4..0000000
+++ /dev/null
@@ -1,17 +0,0 @@
-
-create or replace function londiste.subscriber_get_seq_list(i_queue_name text)
-returns setof text as $$
-declare
-    rec record;
-begin
-    for rec in
-        select seq_name from londiste.subscriber_seq
-            where queue_name = i_queue_name
-            order by nr
-    loop
-        return next rec.seq_name;
-    end loop;
-    return;
-end;
-$$ language plpgsql security definer;
-
diff --git a/sql/londiste/functions/londiste.subscriber_get_table_list.sql b/sql/londiste/functions/londiste.subscriber_get_table_list.sql
deleted file mode 100644 (file)
index 06f79e8..0000000
+++ /dev/null
@@ -1,35 +0,0 @@
-
-create or replace function londiste.subscriber_get_table_list(i_queue_name text)
-returns setof londiste.ret_subscriber_table as $$
-declare
-    rec londiste.ret_subscriber_table%rowtype;
-begin
-    for rec in
-        select table_name, merge_state, snapshot, trigger_name, skip_truncate
-          from londiste.subscriber_table
-         where queue_name = i_queue_name
-         order by nr
-    loop
-        return next rec;
-    end loop;
-    return;
-end;
-$$ language plpgsql security definer;
-
--- compat
-create or replace function londiste.get_table_state(i_queue text)
-returns setof londiste.subscriber_table as $$
-declare
-    rec londiste.subscriber_table%rowtype;
-begin
-    for rec in
-        select * from londiste.subscriber_table
-            where queue_name = i_queue
-            order by nr
-    loop
-        return next rec;
-    end loop;
-    return;
-end;
-$$ language plpgsql security definer;
-
diff --git a/sql/londiste/functions/londiste.subscriber_remove_seq.sql b/sql/londiste/functions/londiste.subscriber_remove_seq.sql
deleted file mode 100644 (file)
index f8715a4..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-
-create or replace function londiste.subscriber_remove_seq(
-    i_queue_name text, i_seq_name text)
-returns integer as $$
-declare
-    link text;
-begin
-    delete from londiste.subscriber_seq
-        where queue_name = i_queue_name
-          and seq_name = i_seq_name;
-    if not found then
-        raise exception 'no such seq?';
-    end if;
-
-    -- update linked queue if needed
-    link := londiste.link_dest(i_queue_name);
-    if link is not null then
-        delete from londiste.provider_seq
-         where queue_name = link
-           and seq_name = i_seq_name;
-        perform londiste.provider_notify_change(link);
-    end if;
-
-    return 0;
-end;
-$$ language plpgsql security definer;
-
diff --git a/sql/londiste/functions/londiste.subscriber_remove_table.sql b/sql/londiste/functions/londiste.subscriber_remove_table.sql
deleted file mode 100644 (file)
index 23f2124..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-
-create or replace function londiste.subscriber_remove_table(
-    i_queue_name text, i_table text)
-returns integer as $$
-declare
-    link  text;
-begin
-    delete from londiste.subscriber_table
-     where queue_name = i_queue_name
-       and table_name = i_table;
-    if not found then
-        raise exception 'no such table';
-    end if;
-
-    -- sync link
-    link := londiste.link_dest(i_queue_name);
-    if link is not null then
-        delete from londiste.provider_table
-            where queue_name = link
-              and table_name = i_table;
-        perform londiste.provider_notify_change(link);
-    end if;
-
-    return 0;
-end;
-$$ language plpgsql security definer;
-
diff --git a/sql/londiste/functions/londiste.subscriber_set_skip_truncate.sql b/sql/londiste/functions/londiste.subscriber_set_skip_truncate.sql
deleted file mode 100644 (file)
index cddebff..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-
-create or replace function londiste.subscriber_set_skip_truncate(
-    i_queue text,
-    i_table text,
-    i_value bool)
-returns integer as $$
-begin
-    update londiste.subscriber_table
-       set skip_truncate = i_value
-     where queue_name = i_queue
-       and table_name = i_table;
-    if not found then
-        raise exception 'table not found';
-    end if;
-
-    return 1;
-end;
-$$ language plpgsql security definer;
-
diff --git a/sql/londiste/functions/londiste.subscriber_set_table_state.sql b/sql/londiste/functions/londiste.subscriber_set_table_state.sql
deleted file mode 100644 (file)
index cf161f0..0000000
+++ /dev/null
@@ -1,63 +0,0 @@
-
-create or replace function londiste.subscriber_set_table_state(
-    i_queue_name text,
-    i_table_name text,
-    i_snapshot text,
-    i_merge_state text)
-returns integer as $$
-declare
-    link  text;
-    ok    integer;
-begin
-    update londiste.subscriber_table
-        set snapshot = i_snapshot,
-            merge_state = i_merge_state,
-            -- reset skip_snapshot when table is copied over
-            skip_truncate = case when i_merge_state = 'ok'
-                                 then null
-                                 else skip_truncate
-                            end
-      where queue_name = i_queue_name
-        and table_name = i_table_name;
-    if not found then
-        raise exception 'no such table';
-    end if;
-
-    -- sync link state also
-    link := londiste.link_dest(i_queue_name);
-    if link then
-        select * from londiste.provider_table
-            where queue_name = linkdst
-              and table_name = i_table_name;
-        if found then
-            if i_merge_state is null or i_merge_state <> 'ok' then
-                delete from londiste.provider_table
-                 where queue_name = link
-                   and table_name = i_table_name;
-                perform londiste.notify_change(link);
-            end if;
-        else
-            if i_merge_state = 'ok' then
-                insert into londiste.provider_table (queue_name, table_name)
-                    values (link, i_table_name);
-                perform londiste.notify_change(link);
-            end if;
-        end if;
-    end if;
-
-    return 1;
-end;
-$$ language plpgsql security definer;
-
-create or replace function londiste.set_table_state(
-    i_queue_name text,
-    i_table_name text,
-    i_snapshot text,
-    i_merge_state text)
-returns integer as $$
-begin
-    return londiste.subscriber_set_table_state(i_queue_name, i_table_name, i_snapshot, i_merge_state);
-end;
-$$ language plpgsql security definer;
-
-
diff --git a/sql/londiste/functions/londiste.subscriber_trigger_funcs.sql b/sql/londiste/functions/londiste.subscriber_trigger_funcs.sql
deleted file mode 100644 (file)
index 5930a0f..0000000
+++ /dev/null
@@ -1,99 +0,0 @@
-
-create or replace function londiste.subscriber_get_table_pending_triggers(i_table_name text)
-returns setof londiste.subscriber_pending_triggers as $$
-declare
-    trigger    record;
-begin
-    for trigger in
-        select *
-        from londiste.subscriber_pending_triggers
-        where table_name = i_table_name
-    loop
-        return next trigger;
-    end loop;
-    
-    return;
-end;
-$$ language plpgsql strict stable;
-
-
-create or replace function londiste.subscriber_drop_table_trigger(i_table_name text, i_trigger_name text)
-returns integer as $$
-declare
-    trig_def record;
-begin
-    select * into trig_def
-    from londiste.find_table_triggers(i_table_name)
-    where trigger_name = i_trigger_name;
-    
-    if FOUND is not true then
-        return 0;
-    end if;
-    
-    insert into londiste.subscriber_pending_triggers(table_name, trigger_name, trigger_def) 
-        values (i_table_name, i_trigger_name, trig_def.trigger_def);
-    
-    execute 'drop trigger ' || i_trigger_name || ' on ' || i_table_name;
-    
-    return 1;
-end;
-$$ language plpgsql;
-
-
-create or replace function londiste.subscriber_drop_all_table_triggers(i_table_name text)
-returns integer as $$
-declare
-    trigger record;
-begin
-    for trigger in
-        select trigger_name as name
-        from londiste.find_table_triggers(i_table_name)
-    loop
-        perform londiste.subscriber_drop_table_trigger(i_table_name, trigger.name);
-    end loop;
-    
-    return 1;
-end;
-$$ language plpgsql;
-
-
-create or replace function londiste.subscriber_restore_table_trigger(i_table_name text, i_trigger_name text)
-returns integer as $$
-declare
-    trig_def text;
-begin
-    select trigger_def into trig_def
-    from londiste.subscriber_pending_triggers
-    where (table_name, trigger_name) = (i_table_name, i_trigger_name);
-    
-    if not found then
-        return 0;
-    end if;
-    
-    delete from londiste.subscriber_pending_triggers 
-    where table_name = i_table_name and trigger_name = i_trigger_name;
-    
-    execute trig_def;
-
-    return 1;
-end;
-$$ language plpgsql;
-
-
-create or replace function londiste.subscriber_restore_all_table_triggers(i_table_name text)
-returns integer as $$
-declare
-    trigger record;
-begin
-    for trigger in
-        select trigger_name as name
-        from londiste.subscriber_get_table_pending_triggers(i_table_name)
-    loop
-        perform londiste.subscriber_restore_table_trigger(i_table_name, trigger.name);
-    end loop;
-    
-    return 1;
-end;
-$$ language plpgsql;
-
-
index 4637659f3b3e4cbc9751ccc9369c3ef53136e2df..9bd55963152f2802a267ef0f43eb69cb2a495f3a 100644 (file)
@@ -2,7 +2,7 @@
 set log_error_verbosity = 'terse';
 \i ../txid/txid.sql
 \i ../pgq/pgq.sql
-\i ../logtriga/logtriga.sql
+\i ../pgq_set/pgq_set.sql
 \i londiste.sql
 \set ECHO all
 
index 87a143302cf7e7c615588286668295a3c51c75bb..49b0cf55247c6594a97d0db3c96ea2cee3d7df0f 100644 (file)
@@ -14,47 +14,52 @@ create table testdata_nopk (
     data text
 );
 
-select londiste.provider_add_table('pqueue', 'public.testdata_nopk');
-select londiste.provider_add_table('pqueue', 'public.testdata');
+select current_database();
 
-select pgq.create_queue('pqueue');
-select londiste.provider_add_table('pqueue', 'public.testdata');
-select londiste.provider_add_table('pqueue', 'public.testdata');
+select * from pgq_set.add_member('aset', 'rnode', 'dbname=db', false);
+select * from pgq_set.create_node('aset', 'root', 'rnode', 'londiste_root', null::text, null::int8, null::text);
 
-select londiste.provider_refresh_trigger('pqueue', 'public.testdata');
+select * from londiste.node_add_table('aset', 'public.testdata_nopk');
+select * from londiste.node_add_table('aset', 'public.testdata');
 
-select * from londiste.provider_get_table_list('pqueue');
+select * from londiste.node_add_table('pqueue', 'public.testdata');
+select * from londiste.node_add_table('pqueue', 'public.testdata');
+select * from londiste.node_add_table('pset', 'public.testdata_nopk');
 
-select londiste.provider_remove_table('pqueue', 'public.nonexist');
-select londiste.provider_remove_table('pqueue', 'public.testdata');
+select londiste.node_refresh_trigger('pqueue', 'public.testdata');
 
-select * from londiste.provider_get_table_list('pqueue');
+select * from londiste.node_get_table_list('pqueue');
+
+select londiste.node_remove_table('pqueue', 'public.nonexist');
+select londiste.node_remove_table('pqueue', 'public.testdata');
+
+select * from londiste.node_get_table_list('pqueue');
 
 --
 -- seqs
 --
 
-select * from londiste.provider_get_seq_list('pqueue');
-select londiste.provider_add_seq('pqueue', 'public.no_seq');
-select londiste.provider_add_seq('pqueue', 'public.testdata_id_seq');
-select londiste.provider_add_seq('pqueue', 'public.testdata_id_seq');
-select * from londiste.provider_get_seq_list('pqueue');
-select londiste.provider_remove_seq('pqueue', 'public.testdata_id_seq');
-select londiste.provider_remove_seq('pqueue', 'public.testdata_id_seq');
-select * from londiste.provider_get_seq_list('pqueue');
+select * from londiste.node_get_seq_list('pqueue');
+select londiste.node_add_seq('pqueue', 'public.no_seq');
+select londiste.node_add_seq('pqueue', 'public.testdata_id_seq');
+select londiste.node_add_seq('pqueue', 'public.testdata_id_seq');
+select * from londiste.node_get_seq_list('pqueue');
+select londiste.node_remove_seq('pqueue', 'public.testdata_id_seq');
+select londiste.node_remove_seq('pqueue', 'public.testdata_id_seq');
+select * from londiste.node_get_seq_list('pqueue');
 
 --
 -- linked queue
 --
-select londiste.provider_add_table('pqueue', 'public.testdata');
+select londiste.node_add_table('pqueue', 'public.testdata');
 insert into londiste.link (source, dest) values ('mqueue', 'pqueue');
 
 
-select londiste.provider_add_table('pqueue', 'public.testdata');
-select londiste.provider_remove_table('pqueue', 'public.testdata');
+select londiste.node_add_table('pqueue', 'public.testdata');
+select londiste.node_remove_table('pqueue', 'public.testdata');
 
-select londiste.provider_add_seq('pqueue', 'public.testdata_id_seq');
-select londiste.provider_remove_seq('pqueue', 'public.testdata_seq');
+select londiste.node_add_seq('pqueue', 'public.testdata_id_seq');
+select londiste.node_remove_seq('pqueue', 'public.testdata_seq');
 
 --
 -- cleanup
@@ -63,7 +68,7 @@ select londiste.provider_remove_seq('pqueue', 'public.testdata_seq');
 delete from londiste.link;
 drop table testdata;
 drop table testdata_nopk;
-delete from londiste.provider_seq;
-delete from londiste.provider_table;
+delete from londiste.node_seq;
+delete from londiste.node_table;
 select pgq.drop_queue('pqueue');
 
index 8ca944384b3ce43c005a905783e962bffb482ec3..a68321980ef11729153bf90d51a7ff269bd84e3b 100644 (file)
@@ -1,7 +1,7 @@
 
 grant usage on schema londiste to public;
-grant select on londiste.provider_table to public;
-grant select on londiste.completed to public;
-grant select on londiste.link to public;
-grant select on londiste.subscriber_table to public;
+grant select on londiste.node_table to public;
+grant select on londiste.node_seq to public;
+grant select on londiste.pending_fkeys to public;
+grant select on londiste.pending_triggers to public;
 
index e03af5185455fcca1ba2f6600ec6d1b323eba8db..3b2e85335f1fea63bca08fdcba4f1692a2e7b411 100644 (file)
+-- ----------------------------------------------------------------------
+-- Section: Londiste internals
+--
+--      Londiste storage: tables/seqs/fkeys/triggers/events.
+--
+-- Londiste event types:
+--      I/U/D       - ev_data: table update in partial-sql format, ev_extra1: fq table name
+--      I:/U:/D:    - ev_data: table update in urlencoded format, ev_extra1: fq table name
+--      add-seq     - ev_data: seq name that was added on root
+--      del-seq     - ev_data: seq name that was removed on root
+--      add-tbl     - ev_data: table name that was added on root
+--      del-tbl     - ev_data: table name that was removed on root
+--      seq-values  - ev_data: urlencoded fqname:value pairs
+-- ----------------------------------------------------------------------
+create schema londiste;
+
 set default_with_oids = 'off';
 
-create schema londiste;
 
-create table londiste.provider_table (
+-- ----------------------------------------------------------------------
+-- Table: londiste.set_table
+--
+--      Tables available on root, meaning that events for only
+--      tables specified here can appear in queue.
+--
+-- Columns:
+--      nr          - just to have stable order
+--      set_name    - which set the table belongs to
+--      table_name  - fq table name
+-- ----------------------------------------------------------------------
+create table londiste.set_table (
     nr                  serial not null,
-    queue_name          text not null,
+    set_name            text not null,
     table_name          text not null,
-    trigger_name        text,
-    primary key (queue_name, table_name)
+    foreign key (set_name) references pgq_set.set_info,
+    primary key (set_name, table_name)
 );
 
-create table londiste.provider_seq (
+-- ----------------------------------------------------------------------
+-- Table: londiste.set_seq
+--
+--      Sequences available on root, meaning that events for only
+--      sequences specified here can appear in queue.
+--
+-- Columns:
+--      nr          - just to have stable order
+--      set_name    - which set the table belongs to
+--      seq_name    - fq seq name
+-- ----------------------------------------------------------------------
+create table londiste.set_seq (
     nr                  serial not null,
-    queue_name          text not null,
+    set_name            text not null,
     seq_name            text not null,
-    primary key (queue_name, seq_name)
-);
-
-create table londiste.completed (
-    consumer_id     text not null,
-    last_tick_id    bigint not null,
-
-    primary key (consumer_id)
+    foreign key (set_name) references pgq_set.set_info,
+    primary key (set_name, seq_name)
 );
 
-create table londiste.link (
-    source    text not null,
-    dest      text not null,
-    primary key (source),
-    unique (dest)
-);
 
-create table londiste.subscriber_table (
+-- ----------------------------------------------------------------------
+-- Table: londiste.node_table
+--
+--      Info about attached tables.
+--
+-- Columns:
+--      nr              - Dummy number for visual ordering
+--      set_name        - Set name
+--      table_name      - fully-qualified table name
+--      merge_state     - State for tables
+--      trigger_type    - trigger type
+--      trigger_name    - londiste trigger name
+--      copy_snapshot   - remote snapshot for COPY command
+--      custom_tg_args  - user-specified 
+--      skip_truncate   - if 'in-copy' should not do TRUNCATE
+--
+-- Tables merge states:
+--      master          - master: all in sync
+--      ok              - slave: all in sync
+--      in-copy         -
+--      catching-up     -
+--      wanna-sync:%    -
+--      do-sync:%       -
+--      unsynced        -
+--
+-- Trigger type:
+--      notrigger       - no trigger applied
+--      pgq.logtriga    - Partial SQL trigger with fixed column list
+--      pgq.sqltriga    - Partial SQL trigger with autodetection
+--      pgq.logutriga   - urlenc trigger with autodetection
+--      pgq.denytrigger - deny trigger
+-- ----------------------------------------------------------------------
+create table londiste.node_table (
     nr                  serial not null,
-    queue_name          text not null,
+    set_name            text not null,
     table_name          text not null,
-    snapshot            text,
     merge_state         text,
-    trigger_name        text,
-
+    custom_snapshot     text,
     skip_truncate       bool,
 
-    primary key (queue_name, table_name)
+    foreign key (set_name, table_name) references londiste.set_table,
+    primary key (set_name, table_name)
 );
 
-create table londiste.subscriber_seq (
+
+-- ----------------------------------------------------------------------
+-- Table: londiste.node_trigger
+--
+--      Node-specific triggers.  When node type changes,
+--      Londiste will make sure unnecessary triggers are
+--      dropped and new triggers created.
+--
+-- Columns:
+--      set_name        - set it belongs to
+--      table_name      - table name
+--      tg_type         - any / root / non-root
+--      tg_name         - name for the trigger
+--      tg_def          - full statement for trigger creation
+-- ----------------------------------------------------------------------
+create table londiste.node_trigger (
+    set_name            text not null,
+    table_name          text not null,
+    tg_name             text not null,
+    tg_type             text not null,
+    tg_def              text not null,
+    foreign key (set_name, table_name) references londiste.node_table,
+    primary key (set_name, table_name, tg_name)
+);
+
+-- ----------------------------------------------------------------------
+-- Table: londiste.node_seq
+--
+--      Info about attached sequences.
+--
+-- Columns:
+--      nr              - dummy number for ordering
+--      set_name        - which set it belongs to
+--      seq_name        - fully-qualified seq name
+-- ----------------------------------------------------------------------
+create table londiste.node_seq (
     nr                  serial not null,
-    queue_name          text not null,
+    set_name            text not null,
     seq_name            text not null,
-
-    primary key (queue_name, seq_name)
+    foreign key (set_name, seq_name) references londiste.set_seq,
+    primary key (set_name, seq_name)
 );
 
-create table londiste.subscriber_pending_fkeys (
+
+-- ----------------------------------------------------------------------
+-- Table: londiste.pending_fkeys
+--
+--      Details on dropped fkeys.  Global, not specific to any set.
+--
+-- Columns:
+--      from_table      - fully-qualified table name
+--      to_table        - fully-qualified table name
+--      fkey_name       - name of constraint
+--      fkey_def        - full fkey definition
+-- ----------------------------------------------------------------------
+create table londiste.pending_fkeys (
     from_table          text not null,
     to_table            text not null,
     fkey_name           text not null,
@@ -61,10 +164,22 @@ create table londiste.subscriber_pending_fkeys (
     primary key (from_table, fkey_name)
 );
 
-create table londiste.subscriber_pending_triggers (
+
+-- ----------------------------------------------------------------------
+-- Table: londiste.pending_triggers
+--
+--      Details on dropped triggers.  Global, not specific to any set.
+--
+-- Columns:
+--      table_name      - fully-qualified table name
+--      trigger_name    - trigger name
+--      trigger_def     - full trigger definition
+-- ----------------------------------------------------------------------
+create table londiste.pending_triggers (
     table_name          text not null,
     trigger_name        text not null,
     trigger_def         text not null,
     
     primary key (table_name, trigger_name)
 );
+