draft version of pgq_set sql code
authorMarko Kreen <markokr@gmail.com>
Wed, 5 Dec 2007 15:40:06 +0000 (15:40 +0000)
committerMarko Kreen <markokr@gmail.com>
Wed, 5 Dec 2007 15:40:06 +0000 (15:40 +0000)
19 files changed:
setup.py
sql/Makefile
sql/pgq_set/Makefile [new file with mode: 0644]
sql/pgq_set/expected/pgq_set.out [new file with mode: 0644]
sql/pgq_set/functions/pgq_set.add_member.sql [new file with mode: 0644]
sql/pgq_set/functions/pgq_set.create_node.sql [new file with mode: 0644]
sql/pgq_set/functions/pgq_set.drop_node.sql [new file with mode: 0644]
sql/pgq_set/functions/pgq_set.get_member_info.sql [new file with mode: 0644]
sql/pgq_set/functions/pgq_set.get_node_info.sql [new file with mode: 0644]
sql/pgq_set/functions/pgq_set.remove_member.sql [new file with mode: 0644]
sql/pgq_set/functions/pgq_set.set_global_watermark.sql [new file with mode: 0644]
sql/pgq_set/functions/pgq_set.set_node_uptodate.sql [new file with mode: 0644]
sql/pgq_set/functions/pgq_set.set_partition_watermark.sql [new file with mode: 0644]
sql/pgq_set/functions/pgq_set.set_subscriber_watermark.sql [new file with mode: 0644]
sql/pgq_set/functions/pgq_set.subscribe_node.sql [new file with mode: 0644]
sql/pgq_set/functions/pgq_set.unsubscribe_node.sql [new file with mode: 0644]
sql/pgq_set/sql/pgq_set.sql [new file with mode: 0644]
sql/pgq_set/structure/functions.sql [new file with mode: 0644]
sql/pgq_set/structure/pgq_set.sql [new file with mode: 0644]

index 3eadc3dae95ab84ed8bc99cae1fbacaad8b0b8ff..fe8cedb3e41075cd9c3729924aa8711179ac1669 100755 (executable)
--- a/setup.py
+++ b/setup.py
@@ -18,6 +18,7 @@ share_dup_files = [
    'sql/pgq/pgq.sql',
    'sql/londiste/londiste.sql',
    'sql/pgq_ext/pgq_ext.sql',
+   'sql/pgq_set/pgq_set.sql',
    'sql/logtriga/logtriga.sql',
 ]
 if os.path.isfile('sql/txid/txid.sql'):
index a6545e79349fe26a487b8812df260794a8f0dc8d..5e2d508d346d387edcf618b1e5f3b47fc25961d6 100644 (file)
@@ -1,7 +1,7 @@
 
 include ../config.mak
 
-SUBDIRS = logtriga londiste pgq pgq_ext txid
+SUBDIRS = logtriga londiste pgq pgq_ext pgq_set txid
 
 all install clean distclean installcheck:
        for dir in $(SUBDIRS); do \
diff --git a/sql/pgq_set/Makefile b/sql/pgq_set/Makefile
new file mode 100644 (file)
index 0000000..2776378
--- /dev/null
@@ -0,0 +1,53 @@
+
+DATA_built = pgq_set.sql pgq_set.upgrade.sql
+
+SRCS = $(wildcard structure/*.sql) $(wildcard functions/*.sql)
+
+REGRESS = pgq_set
+REGRESS_OPTS = --load-language=plpgsql
+
+PGXS = $(shell pg_config --pgxs)
+include $(PGXS)
+
+NDOC = NaturalDocs
+NDOCARGS = -r -o html docs/html -p docs -i docs/sql
+CATSQL = ../../scripts/catsql.py
+
+#
+# combined SQL files
+#
+
+pgq_set.sql: $(SRCS)
+       $(CATSQL) structure/pgq_set.sql structure/functions.sql > $@
+
+pgq_set.upgrade.sql: $(SRCS)
+       $(CATSQL) structure/functions.sql > $@
+
+#
+# docs
+#
+dox: cleandox $(SRCS)
+       mkdir -p docs/html
+       mkdir -p docs/sql
+       $(CATSQL) --ndoc structure/pgq_set.sql > docs/sql/pgq_set.sql
+       $(CATSQL) --ndoc structure/functions.sql > docs/sql/functions.sql
+       $(NDOC) $(NDOCARGS)
+
+cleandox:
+       rm -rf docs/html docs/Data docs/sql
+
+clean: cleandox
+
+upload: dox
+       rsync -az --delete docs/html/* data1:public_html/pgq-set/
+
+#
+# regtest shortcuts
+#
+
+test: pgq_set.sql
+       $(MAKE) installcheck || { less regression.diffs; exit 1; }
+
+ack:
+       cp results/*.out expected/
+
diff --git a/sql/pgq_set/expected/pgq_set.out b/sql/pgq_set/expected/pgq_set.out
new file mode 100644 (file)
index 0000000..cfb58c8
--- /dev/null
@@ -0,0 +1,60 @@
+\set ECHO none
+select * from pgq_set.add_member('aset', 'node1', 'dbname=node1', false);
+ ret_code | ret_note 
+----------+----------
+      200 | Ok
+(1 row)
+
+select * from pgq_set.add_member('aset', 'node2', 'dbname=node2', false);
+ ret_code | ret_note 
+----------+----------
+      200 | Ok
+(1 row)
+
+select * from pgq_set.add_member('aset', 'node3', 'dbname=node3', false);
+ ret_code | ret_note 
+----------+----------
+      200 | Ok
+(1 row)
+
+select * from pgq_set.add_member('aset', 'node4', 'dbname=node4', false);
+ ret_code | ret_note 
+----------+----------
+      200 | Ok
+(1 row)
+
+select * from pgq_set.get_member_info('aset');
+ node_name | node_location | dead 
+-----------+---------------+------
+ node1     | dbname=node1  | f
+ node2     | dbname=node2  | f
+ node3     | dbname=node3  | f
+ node4     | dbname=node4  | f
+(4 rows)
+
+select * from pgq_set.remove_member('aset', 'node4');
+ ret_code | ret_note 
+----------+----------
+      200 | Ok
+(1 row)
+
+select * from pgq_set.get_member_info('aset');
+ node_name | node_location | dead 
+-----------+---------------+------
+ node1     | dbname=node1  | f
+ node2     | dbname=node2  | f
+ node3     | dbname=node3  | f
+(3 rows)
+
+select * from pgq_set.create_node('aset', 'root', 'node1', null, null, null);
+ ret_code | ret_desc 
+----------+----------
+      200 | Ok
+(1 row)
+
+select * from pgq_set.get_node_info('aset');
+ node_type | node_name | queue_name | global_watermark | local_watermark | completed_tick | provider_node | paused | resync | up_to_date | combined_set | combined_type | combined_queue 
+-----------+-----------+------------+------------------+-----------------+----------------+---------------+--------+--------+------------+--------------+---------------+----------------
+ root      | node1     | aset       |                1 |                 |                |               | f      | f      | f          |              |               | 
+(1 row)
+
diff --git a/sql/pgq_set/functions/pgq_set.add_member.sql b/sql/pgq_set/functions/pgq_set.add_member.sql
new file mode 100644 (file)
index 0000000..064b65e
--- /dev/null
@@ -0,0 +1,50 @@
+
+create or replace function pgq_set.add_member(
+    in i_set_name text,
+    in i_node_name text,
+    in i_node_location text,
+    in i_dead boolean,
+    out ret_code int4,
+    out ret_note text)
+returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.add_member(3)
+--
+--      Add new set member.
+--
+-- Parameters:
+--      i_set_name - set name
+--      i_node_name - node name
+--      i_node_location - node connect string
+--      i_dead - dead flag for node
+--
+-- Returns:
+--      ret_code - error code
+--      ret_note - error description
+--
+-- Return Codes:
+--      200 - Ok
+--      404 - No such set
+-- ----------------------------------------------------------------------
+declare
+    o  record;
+begin
+    select node_location into o
+      from pgq_set.member_info
+     where set_name = i_set_name
+       and node_name = i_node_name;
+    if found then
+        update pgq_set.member_info
+           set node_location = i_node_location,
+               dead = i_dead
+         where set_name = i_set_name
+           and node_name = i_node_name;
+    else
+        insert into pgq_set.member_info (set_name, node_name, node_location, dead)
+        values (i_set_name, i_node_name, i_node_location, i_dead);
+    end if;
+    select 200, 'Ok' into ret_code, ret_note;
+    return;
+end;
+$$ language plpgsql security definer;
+
diff --git a/sql/pgq_set/functions/pgq_set.create_node.sql b/sql/pgq_set/functions/pgq_set.create_node.sql
new file mode 100644 (file)
index 0000000..322dca1
--- /dev/null
@@ -0,0 +1,91 @@
+
+create or replace function pgq_set.create_node(
+    in i_set_name text,
+    in i_node_type text,
+    in i_node_name text,
+    in i_provider_name text,
+    in i_global_watermark bigint,
+    in i_combined_set text,
+    out ret_code int4,
+    out ret_desc text)
+returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.create_node(6)
+--
+--      Initialize node.
+--
+-- Parameters:
+--      i_set_name - set name
+--      i_node_type - node type
+--      i_node_name - node name
+--      i_provider_name - provider node name for non-root nodes
+--      i_global_watermark - global lowest tick_id
+--      i_combined_set - merge-leaf: target set
+--
+-- Returns:
+--      desc
+--
+-- Node Types:
+--      root - master node
+--      branch - subscriber node that can be provider to others
+--      leaf - subscriber node that cannot be provider to others
+--      combined-root - root node for combined set
+--      combined-branch - failover node for combined set
+--      merge-leaf - leaf node on partition set that will be merged into combined-root
+-- ----------------------------------------------------------------------
+declare
+    _queue_name text;
+    _wm_consumer text;
+    _global_wm bigint;
+begin
+    if i_node_type in ('root', 'combined-root') then
+        if coalesce(i_provider_name, i_global_watermark::text,
+                    i_combined_set) is not null then
+            raise exception 'unexpected args for %', i_node_type;
+        end if;
+
+        _queue_name := i_set_name;
+        _wm_consumer := i_set_name || '.watermark';
+        perform pgq.create_queue(_queue_name);
+        perform pgq.register_consumer(_queue_name, _wm_consumer);
+        _global_wm := (select last_tick from pgq.get_consumer_info(_queue_name, _wm_consumer));
+    elsif i_node_type in ('branch', 'combined-branch') then
+        if i_provider_name is null then
+            raise exception 'provider not set for %', i_node_type;
+        end if;
+        if i_global_watermark is null then
+            raise exception 'global_watermark not set for %', i_node_type;
+        end if;
+        if i_node_type = 'combined-branch' and i_combined_set is null then
+            raise exception 'combined-set not given for %', i_node_type;
+        end if;
+        _queue_name := i_set_name;
+        _wm_consumer := i_set_name || '.watermark';
+        perform pgq.create_queue(_queue_name);
+        update pgq.queue set queue_external_ticker = true where queue_name = _queue_name;
+        if i_global_watermark > 1 then
+            perform pgq.ticker(_queue_name, i_global_watermark);
+        end if;
+        perform pgq.register_consumer_at(_queue_name, _wm_consumer, i_global_watermark);
+        _global_wm := i_global_watermark;
+    elsif i_node_type in ('leaf', 'merge-leaf') then
+        _queue_name := null;
+        _global_wm := i_global_watermark;
+    end if;
+
+    insert into pgq_set.set_info
+      (set_name, node_type, node_name, queue_name,
+       provider_node, combined_set, global_watermark)
+    values (i_set_name, i_node_type, i_node_name, _queue_name,
+       i_provider_name, i_combined_set, _global_wm);
+
+    if i_node_type not in ('root', 'combined-root') then
+        insert into pgq_set.completed_tick (set_name, tick_id)
+            values (i_set_name, _global_wm);
+    end if;
+
+    select 200, 'Ok' into ret_code, ret_desc;
+    return;
+end;
+$$ language plpgsql security definer;
+
diff --git a/sql/pgq_set/functions/pgq_set.drop_node.sql b/sql/pgq_set/functions/pgq_set.drop_node.sql
new file mode 100644 (file)
index 0000000..e32b8e3
--- /dev/null
@@ -0,0 +1,53 @@
+
+create or replace function pgq_set.drop_node(
+    in i_set_name text,
+    out ret_code int4,
+    out ret_note text)
+returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.drop_node(1)
+--
+--      Drop node.
+--
+-- Parameters:
+--      i_set_name - set name
+--
+-- Returns:
+--      ret_code - error code
+--      ret_note - error description
+--
+-- Return Codes:
+--      200 - Ok
+--      404 - No such set
+-- ----------------------------------------------------------------------
+declare
+    _queue_name text;
+    _wm_consumer text;
+    _global_wm bigint;
+    sub record;
+begin
+    perform 1 from pgq_set.set_info
+      where set_name = i_set_name;
+    if not found then
+        select 404, 'No such set: ' || i_set_name into ret_code, ret_note;
+        return;
+    end if;
+
+    perform pgq_set.unsubscribe_node(s.set_name, s.node_name)
+       from pgq_set.subscriber_info s
+      where set_name = i_set_name;
+
+    delete from pgq_set.completed_tick
+     where set_name = i_set_name;
+
+    delete from pgq_set.set_info
+     where set_name = i_set_name;
+
+    delete from pgq_set.member_info
+     where set_name = i_set_name;
+
+    select 200, 'Ok' into ret_code, ret_note;
+    return;
+end;
+$$ language plpgsql security definer;
+
diff --git a/sql/pgq_set/functions/pgq_set.get_member_info.sql b/sql/pgq_set/functions/pgq_set.get_member_info.sql
new file mode 100644 (file)
index 0000000..64ba024
--- /dev/null
@@ -0,0 +1,33 @@
+
+create or replace function pgq_set.get_member_info(
+    in i_set_name text,
+
+    out node_name text,
+    out node_location text,
+    out dead boolean
+) returns setof record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.get_member_info(1)
+--
+--      Get member list for the set.
+--
+-- Parameters:
+--      i_set_name  - set name
+--
+-- Returns:
+--      node_name       - node name
+--      node_location   - libpq connect string for the node
+--      dead            - whether the node should be considered dead
+-- ----------------------------------------------------------------------
+begin
+    for node_name, node_location, dead in
+        select m.node_name, m.node_location, m.dead
+          from pgq_set.member_info m
+         where m.set_name = i_set_name
+    loop
+        return next;
+    end loop;
+    return;
+end;
+$$ language plpgsql security definer;
+
diff --git a/sql/pgq_set/functions/pgq_set.get_node_info.sql b/sql/pgq_set/functions/pgq_set.get_node_info.sql
new file mode 100644 (file)
index 0000000..25ad463
--- /dev/null
@@ -0,0 +1,88 @@
+
+create or replace function pgq_set.get_node_info(
+    in i_set_name text,
+
+    out node_type text,
+    out node_name text,
+    out queue_name text,
+    out global_watermark bigint,
+    out local_watermark bigint,
+    out completed_tick bigint,
+
+    out provider_node text,
+    out provider_location text,
+    out paused boolean,
+    out resync boolean,
+    out up_to_date boolean,
+
+    out combined_set text,
+    out combined_type text,
+    out combined_queue text
+) returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.get_node_info(1)
+--
+--      Get local node info for set.
+--
+-- Parameters:
+--      i_set_name  - set name
+--
+-- Returns:
+--      node_type - local node type
+--      node_name - local node name
+--      queue_name - local queue name used for set
+--      global_watermark - set's global watermark
+--      local_watermark - set's local watermark, for this and below nodes
+--      completed_tick - last committed set's tick
+--      provider_node - provider node name
+--      provider_location - connect string to provider node
+--      paused - this node should not do any work
+--      resync - re-register on provider queue (???)
+--      up_to_date - if consumer has loaded last changes
+--      combined_set - target set name for merge-leaf
+--      combined_type - node type of target set
+--      combined_queue - queue name for target set
+-- ----------------------------------------------------------------------
+declare
+    sql text;
+begin
+    select n.node_type, n.node_name, t.tick_id, n.queue_name,
+           c.set_name, c.node_type, c.queue_name, n.global_watermark,
+           n.provider_node, n.paused, n.resync, n.up_to_date,
+           p.node_location
+      into node_type, node_name, completed_tick, queue_name,
+           combined_set, combined_type, combined_queue, global_watermark,
+           provider_node, paused, resync, up_to_date,
+           provider_location
+      from pgq_set.set_info n
+           left join pgq_set.completed_tick t on (t.set_name = n.set_name)
+           left join pgq_set.set_info c on (c.set_name = n.combined_set)
+           left join pgq_set.member_info p on (p.set_name = n.set_name and p.node_name = n.provider_node)
+      where n.set_name = i_set_name;
+
+    select min(u.tick_id) into local_watermark
+      from (select tick_id
+              from pgq_set.completed_tick
+             where set_name = i_set_name
+            union all
+            select local_watermark as tick_id
+              from pgq_set.subscriber_info
+             where set_name = i_set_name
+            union all
+            -- exclude watermark consumer
+            select last_tick as tick_id
+              from pgq.get_consumer_info(queue_name)
+             where consumer_name <> (i_set_name || '_watermark')
+            ) u;
+    if local_watermark is null and queue_name is not null then
+        select t.tick_id into local_watermark
+          from pgq.tick t, pgq.queue q
+         where t.queue_id = q.queue_id
+           and q.queue_name = queue_name
+         order by 1 desc
+         limit 1;
+    end if;
+    return;
+end;
+$$ language plpgsql security definer;
+
diff --git a/sql/pgq_set/functions/pgq_set.remove_member.sql b/sql/pgq_set/functions/pgq_set.remove_member.sql
new file mode 100644 (file)
index 0000000..76d2f47
--- /dev/null
@@ -0,0 +1,34 @@
+
+create or replace function pgq_set.remove_member(
+    in i_set_name text,
+    in i_node_name text,
+    out ret_code int4,
+    out ret_note text)
+returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.remove_member(2)
+--
+--      Add new set member.
+--
+-- Parameters:
+--      i_set_name - set name
+--      i_node_name - node name
+--
+-- Returns:
+--      ret_code - error code
+--      ret_note - error description
+--
+-- Return Codes:
+--      200 - Ok
+-- ----------------------------------------------------------------------
+declare
+    o  record;
+begin
+    delete from pgq_set.member_info
+     where set_name = i_set_name
+       and node_name = i_node_name;
+    select 200, 'Ok' into ret_code, ret_note;
+    return;
+end;
+$$ language plpgsql security definer;
+
diff --git a/sql/pgq_set/functions/pgq_set.set_global_watermark.sql b/sql/pgq_set/functions/pgq_set.set_global_watermark.sql
new file mode 100644 (file)
index 0000000..445ca13
--- /dev/null
@@ -0,0 +1,51 @@
+
+create or replace function pgq_set.set_global_watermark(
+    i_set_name text,
+    i_watermark bigint)
+returns bigint as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.set_global_watermark(2)
+--
+--      Move global watermark on branch/leaf.
+--
+-- Parameters:
+--      i_set_name     - set name
+--      i_watermark    - global tick_id that is processed everywhere
+--
+-- Returns:
+--      nothing
+-- ----------------------------------------------------------------------
+declare
+    this        record;
+    wm_consumer text;
+begin
+    select node_type, queue_name into this
+        from pgq_set.set_info
+        where set_name = i_set_name
+        for update;
+    if not found then
+        raise exception 'set % not found', i_set_name;
+    end if;
+
+    update pgq_set.set_info
+       set global_watermark = i_watermark
+     where set_name = i_set_name;
+    if not found then
+        raise exception 'node % not subscribed to set %', i_node_name, i_set_name;
+    end if;
+
+    -- move watermark on pgq
+    if this.queue_name is not null then
+        wm_consumer := i_set_name || '.watermark';
+        perform pgq.register_consumer_at(this.queue_name, wm_consumer, i_watermark);
+    end if;
+
+    if this.node_type in ('root', 'combined-root') then
+        perform pgq.insert_event(this.queue_name, 'global-watermark', i_watermark,
+                                 i_set_name, null, null, null);
+    end if;
+    return i_watermark;
+end;
+$$ language plpgsql security definer;
+
+
diff --git a/sql/pgq_set/functions/pgq_set.set_node_uptodate.sql b/sql/pgq_set/functions/pgq_set.set_node_uptodate.sql
new file mode 100644 (file)
index 0000000..ce99083
--- /dev/null
@@ -0,0 +1,29 @@
+
+create or replace function pgq_set.set_node_uptodate(
+    i_set_name text,
+    i_uptodate boolean)
+returns int4 as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.set_node_uptodate(2)
+--
+--      Set node uptodate flag.
+--
+-- Parameters:
+--      i_set_name - set name
+--      i_uptodate - new flag state
+--
+-- Returns:
+--      nothing
+-- ----------------------------------------------------------------------
+begin
+    update pgq_set.set_info
+       set up_to_date = i_uptodate
+     where set_name = i_set_name;
+    if not found then
+        raise exception 'no such set: %', i_set_name;
+    end if;
+    return 1;
+end;
+$$ language plpgsql security definer;
+
+
diff --git a/sql/pgq_set/functions/pgq_set.set_partition_watermark.sql b/sql/pgq_set/functions/pgq_set.set_partition_watermark.sql
new file mode 100644 (file)
index 0000000..e538c12
--- /dev/null
@@ -0,0 +1,47 @@
+
+create or replace function pgq_set.set_partition_watermark(
+    i_combined_set_name text,
+    i_part_set_name text,
+    i_watermark bigint)
+returns bigint as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.set_partition_watermark(3)
+--
+--      Move merge-leaf position on combined-branch.
+--
+-- Parameters:
+--      i_combined_set_name - local combined set name
+--      i_part_set_name     - local part set name (merge-leaf)
+--      i_watermark         - partition tick_id that came inside combined-root batch
+--
+-- Returns:
+--      nothing
+-- ----------------------------------------------------------------------
+declare
+    cnode   record;
+    pnode   record;
+begin
+    -- check if combined-branch exists
+    perform 1 from pgq_set.set_info c, pgq_set.set_info p
+        where p.set_name = i_part_set_name
+          and c.set_name = i_combined_set_name
+          and p.combined_set = c.set_name
+          and p.node_type = 'merge-leaf'
+          and c.node_type = 'combined-branch';
+    if not found then
+        raise exception 'combined-branch/merge-leaf pair not found (%/%)', i_combined_set_name, i_part_set_name;
+    end if;
+
+
+    update pgq_set.completed_tick
+       set tick_id = i_watermark
+     where set_name = i_part_set_name;
+    if not found then
+        raise exception 'node % not subscribed to set %', i_node_name, i_set_name;
+    end if;
+
+    return i_watermark;
+end;
+$$ language plpgsql security definer;
+
+
diff --git a/sql/pgq_set/functions/pgq_set.set_subscriber_watermark.sql b/sql/pgq_set/functions/pgq_set.set_subscriber_watermark.sql
new file mode 100644 (file)
index 0000000..3f4d85f
--- /dev/null
@@ -0,0 +1,56 @@
+
+create or replace function pgq_set.set_subscriber_watermark(
+    i_set_name text,
+    i_node_name text,
+    i_watermark bigint)
+returns bigint as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.set_subscriber_watermark(3)
+--
+--      Notify provider about subscribers lowest watermark.  On root
+--      node, changes global_watermark and sends event about that
+--      to the queue.
+--
+-- Parameters:
+--      i_set_name - set name
+--      i_node_name - subscriber node name
+--      i_watermark - tick_id
+--
+-- Returns:
+--      nothing
+-- ----------------------------------------------------------------------
+declare
+    m       record;
+    cur_wm  bigint;
+begin
+    select node_type, global_watermark, local_queue
+      into m
+      from pgq_set.local_node
+     where set_name = i_set_name
+       for update;
+    if not found then
+        raise exception 'no such set: %', i_set_name;
+    end if;
+
+    update pgq_set.subscriber_info
+       set local_watermark = i_watermark
+     where set_name = i_set_name
+       and node_name = i_node_name;
+    if not found then
+        raise exception 'node % not subscribed to set %', i_node_name, i_set_name;
+    end if;
+
+    if m.node_type in ('root', 'combined-root') then
+        cur_wm := pgq_set.get_local_watermark(i_set_name);
+        if cur_wm > m.global_watermark then
+            update pgq_set.local_node set global_watermark = cur_wm
+                where set_name = i_set_name;
+            perform pgq.insert_event(m.local_queue, 'global-watermark', cur_wm);
+        end if;
+    end if;
+
+    return i_watermark;
+end;
+$$ language plpgsql security definer;
+
+
diff --git a/sql/pgq_set/functions/pgq_set.subscribe_node.sql b/sql/pgq_set/functions/pgq_set.subscribe_node.sql
new file mode 100644 (file)
index 0000000..15cc539
--- /dev/null
@@ -0,0 +1,47 @@
+
+create or replace function pgq_set.subscribe_node(
+    in i_set_name text,
+    in i_remote_node_name text,
+    in i_remote_worker_name text,
+    out ret_code int4,
+    out ret_note text,
+    out global_watermark bigint)
+returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.subscribe_node(2)
+--
+--      Subscribe remote node to local node.
+--
+-- Parameters:
+--      i_set_name - set name
+--      i_remote_node_name - node name
+--      i_remote_worker_name - remote process job_name
+--
+-- Returns:
+--      ret_code - error code
+--      ret_note - description
+--      global_watermark - minimal watermark, also subscription pos
+-- ----------------------------------------------------------------------
+declare
+    n record;
+begin
+    select s.node_type, s.global_watermark, s.queue_name into n
+      from pgq_set.set_info s where s.set_name = i_set_name;
+    global_watermark := n.global_watermark;
+
+    if n.node_type in ('leaf', 'merge-leaf') then
+        select 401, 'Cannot subscribe to ' || n.node_type || ' node'
+          into ret_code, ret_note;
+        return;
+    end if;
+
+    perform pgq.register_consumer_at(n.queue_name, i_remote_worker_name, n.global_watermark);
+
+    insert into pgq_set.subscriber_info (set_name, node_name, local_watermark, worker_name)
+    values (i_set_name, i_remote_node_name, n.global_watermark, i_remote_worker_name);
+
+    select 200, 'Ok' into ret_code, ret_note;
+    return;
+end;
+$$ language plpgsql security definer;
+
diff --git a/sql/pgq_set/functions/pgq_set.unsubscribe_node.sql b/sql/pgq_set/functions/pgq_set.unsubscribe_node.sql
new file mode 100644 (file)
index 0000000..73ea4ae
--- /dev/null
@@ -0,0 +1,55 @@
+
+create or replace function pgq_set.unsubscribe_node(
+    in i_set_name text,
+    in i_remote_node_name text,
+    out ret_code int4,
+    out ret_note text)
+returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.unsubscribe_node(2)
+--
+--      Unsubscribe remote node from local node.
+--
+-- Parameters:
+--      i_set_name - set name
+--      i_remote_node_name - node name
+--
+-- Returns:
+--      ret_code - error code
+--      ret_note - description
+-- ----------------------------------------------------------------------
+declare
+    s record;
+    n record;
+begin
+    -- fetch node info
+    select queue_name, node_type into n from pgq_set.set_info
+     where set_name = i_set_name;
+    if not found then
+        select 404, 'No such set: '||i_set_name into ret_code, ret_note;
+        return;
+    end if;
+
+    -- fetch subscription info
+    select node_name, worker_name into s from pgq_set.subscriber_info
+     where set_name = i_set_name and node_name = i_remote_node_name
+       for update;
+    if not found then
+        select 404, 'No such subscriber: '||i_remote_node_name into ret_code, ret_note;
+        return;
+    end if;
+
+    -- unregister from queue
+    perform pgq.unregister_consumer(n.queue_name, s.worker_name);
+
+    -- drop subscription
+    delete from pgq_set.subscriber_info
+     where set_name = i_set_name
+       and node_name = i_remote_node_name;
+
+    -- done
+    select 200, 'Ok' into ret_code, ret_note;
+    return;
+end;
+$$ language plpgsql security definer;
+
diff --git a/sql/pgq_set/sql/pgq_set.sql b/sql/pgq_set/sql/pgq_set.sql
new file mode 100644 (file)
index 0000000..5ef4666
--- /dev/null
@@ -0,0 +1,21 @@
+
+\set ECHO none
+\i ../txid/txid.sql
+\i ../pgq/pgq.sql
+\i ../londiste/londiste.sql
+\i structure/pgq_set.sql
+\i structure/functions.sql
+\set ECHO all
+
+select * from pgq_set.add_member('aset', 'node1', 'dbname=node1', false);
+select * from pgq_set.add_member('aset', 'node2', 'dbname=node2', false);
+select * from pgq_set.add_member('aset', 'node3', 'dbname=node3', false);
+select * from pgq_set.add_member('aset', 'node4', 'dbname=node4', false);
+select * from pgq_set.get_member_info('aset');
+
+select * from pgq_set.remove_member('aset', 'node4');
+select * from pgq_set.get_member_info('aset');
+
+select * from pgq_set.create_node('aset', 'root', 'node1', null, null, null);
+
+select * from pgq_set.get_node_info('aset');
diff --git a/sql/pgq_set/structure/functions.sql b/sql/pgq_set/structure/functions.sql
new file mode 100644 (file)
index 0000000..6cf0dac
--- /dev/null
@@ -0,0 +1,22 @@
+
+-- Group: Global Node Map
+\i functions/pgq_set.add_member.sql
+\i functions/pgq_set.remove_member.sql
+\i functions/pgq_set.get_member_info.sql
+
+-- Group: Node manipulation
+\i functions/pgq_set.create_node.sql
+\i functions/pgq_set.drop_node.sql
+\i functions/pgq_set.subscribe_node.sql
+\i functions/pgq_set.unsubscribe_node.sql
+\i functions/pgq_set.set_node_uptodate.sql
+
+-- Group: Node Info
+\i functions/pgq_set.get_node_info.sql
+
+-- Group: Watermark tracking
+\i functions/pgq_set.set_subscriber_watermark.sql
+\i functions/pgq_set.set_global_watermark.sql
+\i functions/pgq_set.set_partition_watermark.sql
+
+
diff --git a/sql/pgq_set/structure/pgq_set.sql b/sql/pgq_set/structure/pgq_set.sql
new file mode 100644 (file)
index 0000000..25f85d5
--- /dev/null
@@ -0,0 +1,112 @@
+
+create schema pgq_set;
+grant usage on schema pgq_set to public;
+
+-- ----------------------------------------------------------------------
+-- Table: pgq_set.member_info
+--
+--      Static table that just lists all members in set.
+--
+-- Columns:
+--      set_name        - set name
+--      node_name       - node name
+--      node_location   - libpq connect string for connecting to node
+--      online          - whether the node is available
+-- ----------------------------------------------------------------------
+create table pgq_set.member_info (
+    set_name        text not null,
+    node_name       text not null,
+    node_location   text not null,
+    dead            boolean not null default false,
+
+    primary key (set_name, node_name)
+);
+
+-- ----------------------------------------------------------------------
+-- Table: pgq_set.local_node
+--
+--      Local node info.
+--
+-- Columns:
+--      set_name            - set name
+--      node_type           - local node type
+--      node_name           - local node name
+--      queue_name          - local queue name for set, NULL on leaf
+--      provider_node       - provider node name
+--      combined_set        - on 'merge-leaf' the target combined set name
+--      global_watermark    - set's global watermark, set by root node
+--      paused              - true if worker for this node should sleep
+--      resync              - true if worker for this node needs to re-register itself on provider queue
+--      up_to_date          - true if worker for this node has seen table changes
+--
+-- Node types:
+--      root            - data + batches is generated here
+--      branch          - replicates full queue contents and maybe contains some tables
+--      leaf            - does not replicate queue
+--      combined-root   - data from several partitions is merged here
+--      combined-branch - can take over the role of combined-root
+--      merge-leaf      - this node in part set is linked to combined-root/branch
+-- ----------------------------------------------------------------------
+create table pgq_set.set_info (
+    set_name        text not null primary key,
+    node_type       text not null,
+    node_name       text not null,
+    queue_name      text,
+    provider_node   text,
+    combined_set    text,
+
+    global_watermark bigint not null,
+
+    paused          boolean not null default false,
+    resync          boolean not null default false,
+    up_to_date      boolean not null default false,
+
+    foreign key (set_name, node_name) references pgq_set.member_info,
+    foreign key (set_name, provider_node) references pgq_set.member_info,
+    check (node_type in ('root', 'branch', 'leaf', 'combined-root', 'combined-branch', 'merge-leaf')),
+    check (case when node_type = 'root'              then (queue_name is not null and provider_node is null     and combined_set is null)
+                when node_type = 'branch'            then (queue_name is not null and provider_node is not null and combined_set is null)
+                when node_type = 'leaf'              then (queue_name is null     and provider_node is not null and combined_set is null)
+                when node_type = 'combined-root'     then (queue_name is not null and provider_node is null     and combined_set is null)
+                when node_type = 'combined-branch'   then (queue_name is not null and provider_node is not null and combined_set is null)
+                when node_type = 'merge-leaf'        then (queue_name is null     and provider_node is not null and combined_set is not null)
+                else false end)
+);
+
+-- ----------------------------------------------------------------------
+-- Table: pgq_set.subscriber_info
+--
+--      Contains subscribers for a set.
+--
+-- Columns:
+--      set_name        - set's name
+--      node_name       - node name
+--      worker_name     - consumer_name for node
+--      local_watermark - watermark for node and it's subscribers
+-- ----------------------------------------------------------------------
+create table pgq_set.subscriber_info (
+    set_name        text not null,
+    node_name       text not null,
+    worker_name     text not null,
+    local_watermark bigint not null,
+
+    primary key (set_name, node_name),
+    foreign key (set_name, node_name) references pgq_set.member_info
+);
+
+-- ----------------------------------------------------------------------
+-- Table: pgq_set.completed_tick
+--
+--      Contains completed tick_id from provider.
+--
+-- Columns:
+--      set_name - set's name
+--      tick_id  - last committed tick id
+-- ----------------------------------------------------------------------
+create table pgq_set.completed_tick (
+    set_name        text not null primary key,
+    tick_id         bigint not null,
+
+    foreign key (set_name) references pgq_set.set_info
+);
+