'sql/pgq/pgq.sql',
'sql/londiste/londiste.sql',
'sql/pgq_ext/pgq_ext.sql',
+ 'sql/pgq_set/pgq_set.sql',
'sql/logtriga/logtriga.sql',
]
if os.path.isfile('sql/txid/txid.sql'):
include ../config.mak
-SUBDIRS = logtriga londiste pgq pgq_ext txid
+SUBDIRS = logtriga londiste pgq pgq_ext pgq_set txid
all install clean distclean installcheck:
for dir in $(SUBDIRS); do \
--- /dev/null
+
+DATA_built = pgq_set.sql pgq_set.upgrade.sql
+
+SRCS = $(wildcard structure/*.sql) $(wildcard functions/*.sql)
+
+REGRESS = pgq_set
+REGRESS_OPTS = --load-language=plpgsql
+
+PGXS = $(shell pg_config --pgxs)
+include $(PGXS)
+
+NDOC = NaturalDocs
+NDOCARGS = -r -o html docs/html -p docs -i docs/sql
+CATSQL = ../../scripts/catsql.py
+
+#
+# combined SQL files
+#
+
+pgq_set.sql: $(SRCS)
+ $(CATSQL) structure/pgq_set.sql structure/functions.sql > $@
+
+pgq_set.upgrade.sql: $(SRCS)
+ $(CATSQL) structure/functions.sql > $@
+
+#
+# docs
+#
+dox: cleandox $(SRCS)
+ mkdir -p docs/html
+ mkdir -p docs/sql
+ $(CATSQL) --ndoc structure/pgq_set.sql > docs/sql/pgq_set.sql
+ $(CATSQL) --ndoc structure/functions.sql > docs/sql/functions.sql
+ $(NDOC) $(NDOCARGS)
+
+cleandox:
+ rm -rf docs/html docs/Data docs/sql
+
+clean: cleandox
+
+upload: dox
+ rsync -az --delete docs/html/* data1:public_html/pgq-set/
+
+#
+# regtest shortcuts
+#
+
+test: pgq_set.sql
+ $(MAKE) installcheck || { less regression.diffs; exit 1; }
+
+ack:
+ cp results/*.out expected/
+
--- /dev/null
+\set ECHO none
+select * from pgq_set.add_member('aset', 'node1', 'dbname=node1', false);
+ ret_code | ret_note
+----------+----------
+ 200 | Ok
+(1 row)
+
+select * from pgq_set.add_member('aset', 'node2', 'dbname=node2', false);
+ ret_code | ret_note
+----------+----------
+ 200 | Ok
+(1 row)
+
+select * from pgq_set.add_member('aset', 'node3', 'dbname=node3', false);
+ ret_code | ret_note
+----------+----------
+ 200 | Ok
+(1 row)
+
+select * from pgq_set.add_member('aset', 'node4', 'dbname=node4', false);
+ ret_code | ret_note
+----------+----------
+ 200 | Ok
+(1 row)
+
+select * from pgq_set.get_member_info('aset');
+ node_name | node_location | dead
+-----------+---------------+------
+ node1 | dbname=node1 | f
+ node2 | dbname=node2 | f
+ node3 | dbname=node3 | f
+ node4 | dbname=node4 | f
+(4 rows)
+
+select * from pgq_set.remove_member('aset', 'node4');
+ ret_code | ret_note
+----------+----------
+ 200 | Ok
+(1 row)
+
+select * from pgq_set.get_member_info('aset');
+ node_name | node_location | dead
+-----------+---------------+------
+ node1 | dbname=node1 | f
+ node2 | dbname=node2 | f
+ node3 | dbname=node3 | f
+(3 rows)
+
+select * from pgq_set.create_node('aset', 'root', 'node1', null, null, null);
+ ret_code | ret_desc
+----------+----------
+ 200 | Ok
+(1 row)
+
+select * from pgq_set.get_node_info('aset');
+ node_type | node_name | queue_name | global_watermark | local_watermark | completed_tick | provider_node | paused | resync | up_to_date | combined_set | combined_type | combined_queue
+-----------+-----------+------------+------------------+-----------------+----------------+---------------+--------+--------+------------+--------------+---------------+----------------
+ root | node1 | aset | 1 | | | | f | f | f | | |
+(1 row)
+
--- /dev/null
+
+create or replace function pgq_set.add_member(
+ in i_set_name text,
+ in i_node_name text,
+ in i_node_location text,
+ in i_dead boolean,
+ out ret_code int4,
+ out ret_note text)
+returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.add_member(3)
+--
+-- Add new set member.
+--
+-- Parameters:
+-- i_set_name - set name
+-- i_node_name - node name
+-- i_node_location - node connect string
+-- i_dead - dead flag for node
+--
+-- Returns:
+-- ret_code - error code
+-- ret_note - error description
+--
+-- Return Codes:
+-- 200 - Ok
+-- 404 - No such set
+-- ----------------------------------------------------------------------
+declare
+ o record;
+begin
+ select node_location into o
+ from pgq_set.member_info
+ where set_name = i_set_name
+ and node_name = i_node_name;
+ if found then
+ update pgq_set.member_info
+ set node_location = i_node_location,
+ dead = i_dead
+ where set_name = i_set_name
+ and node_name = i_node_name;
+ else
+ insert into pgq_set.member_info (set_name, node_name, node_location, dead)
+ values (i_set_name, i_node_name, i_node_location, i_dead);
+ end if;
+ select 200, 'Ok' into ret_code, ret_note;
+ return;
+end;
+$$ language plpgsql security definer;
+
--- /dev/null
+
+create or replace function pgq_set.create_node(
+ in i_set_name text,
+ in i_node_type text,
+ in i_node_name text,
+ in i_provider_name text,
+ in i_global_watermark bigint,
+ in i_combined_set text,
+ out ret_code int4,
+ out ret_desc text)
+returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.create_node(6)
+--
+-- Initialize node.
+--
+-- Parameters:
+-- i_set_name - set name
+-- i_node_type - node type
+-- i_node_name - node name
+-- i_provider_name - provider node name for non-root nodes
+-- i_global_watermark - global lowest tick_id
+-- i_combined_set - merge-leaf: target set
+--
+-- Returns:
+-- desc
+--
+-- Node Types:
+-- root - master node
+-- branch - subscriber node that can be provider to others
+-- leaf - subscriber node that cannot be provider to others
+-- combined-root - root node for combined set
+-- combined-branch - failover node for combined set
+-- merge-leaf - leaf node on partition set that will be merged into combined-root
+-- ----------------------------------------------------------------------
+declare
+ _queue_name text;
+ _wm_consumer text;
+ _global_wm bigint;
+begin
+ if i_node_type in ('root', 'combined-root') then
+ if coalesce(i_provider_name, i_global_watermark::text,
+ i_combined_set) is not null then
+ raise exception 'unexpected args for %', i_node_type;
+ end if;
+
+ _queue_name := i_set_name;
+ _wm_consumer := i_set_name || '.watermark';
+ perform pgq.create_queue(_queue_name);
+ perform pgq.register_consumer(_queue_name, _wm_consumer);
+ _global_wm := (select last_tick from pgq.get_consumer_info(_queue_name, _wm_consumer));
+ elsif i_node_type in ('branch', 'combined-branch') then
+ if i_provider_name is null then
+ raise exception 'provider not set for %', i_node_type;
+ end if;
+ if i_global_watermark is null then
+ raise exception 'global_watermark not set for %', i_node_type;
+ end if;
+ if i_node_type = 'combined-branch' and i_combined_set is null then
+ raise exception 'combined-set not given for %', i_node_type;
+ end if;
+ _queue_name := i_set_name;
+ _wm_consumer := i_set_name || '.watermark';
+ perform pgq.create_queue(_queue_name);
+ update pgq.queue set queue_external_ticker = true where queue_name = _queue_name;
+ if i_global_watermark > 1 then
+ perform pgq.ticker(_queue_name, i_global_watermark);
+ end if;
+ perform pgq.register_consumer_at(_queue_name, _wm_consumer, i_global_watermark);
+ _global_wm := i_global_watermark;
+ elsif i_node_type in ('leaf', 'merge-leaf') then
+ _queue_name := null;
+ _global_wm := i_global_watermark;
+ end if;
+
+ insert into pgq_set.set_info
+ (set_name, node_type, node_name, queue_name,
+ provider_node, combined_set, global_watermark)
+ values (i_set_name, i_node_type, i_node_name, _queue_name,
+ i_provider_name, i_combined_set, _global_wm);
+
+ if i_node_type not in ('root', 'combined-root') then
+ insert into pgq_set.completed_tick (set_name, tick_id)
+ values (i_set_name, _global_wm);
+ end if;
+
+ select 200, 'Ok' into ret_code, ret_desc;
+ return;
+end;
+$$ language plpgsql security definer;
+
--- /dev/null
+
+create or replace function pgq_set.drop_node(
+ in i_set_name text,
+ out ret_code int4,
+ out ret_note text)
+returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.drop_node(1)
+--
+-- Drop node.
+--
+-- Parameters:
+-- i_set_name - set name
+--
+-- Returns:
+-- ret_code - error code
+-- ret_note - error description
+--
+-- Return Codes:
+-- 200 - Ok
+-- 404 - No such set
+-- ----------------------------------------------------------------------
+declare
+ _queue_name text;
+ _wm_consumer text;
+ _global_wm bigint;
+ sub record;
+begin
+ perform 1 from pgq_set.set_info
+ where set_name = i_set_name;
+ if not found then
+ select 404, 'No such set: ' || i_set_name into ret_code, ret_note;
+ return;
+ end if;
+
+ perform pgq_set.unsubscribe_node(s.set_name, s.node_name)
+ from pgq_set.subscriber_info s
+ where set_name = i_set_name;
+
+ delete from pgq_set.completed_tick
+ where set_name = i_set_name;
+
+ delete from pgq_set.set_info
+ where set_name = i_set_name;
+
+ delete from pgq_set.member_info
+ where set_name = i_set_name;
+
+ select 200, 'Ok' into ret_code, ret_note;
+ return;
+end;
+$$ language plpgsql security definer;
+
--- /dev/null
+
+create or replace function pgq_set.get_member_info(
+ in i_set_name text,
+
+ out node_name text,
+ out node_location text,
+ out dead boolean
+) returns setof record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.get_member_info(1)
+--
+-- Get member list for the set.
+--
+-- Parameters:
+-- i_set_name - set name
+--
+-- Returns:
+-- node_name - node name
+-- node_location - libpq connect string for the node
+-- dead - whether the node should be considered dead
+-- ----------------------------------------------------------------------
+begin
+ for node_name, node_location, dead in
+ select m.node_name, m.node_location, m.dead
+ from pgq_set.member_info m
+ where m.set_name = i_set_name
+ loop
+ return next;
+ end loop;
+ return;
+end;
+$$ language plpgsql security definer;
+
--- /dev/null
+
+create or replace function pgq_set.get_node_info(
+ in i_set_name text,
+
+ out node_type text,
+ out node_name text,
+ out queue_name text,
+ out global_watermark bigint,
+ out local_watermark bigint,
+ out completed_tick bigint,
+
+ out provider_node text,
+ out provider_location text,
+ out paused boolean,
+ out resync boolean,
+ out up_to_date boolean,
+
+ out combined_set text,
+ out combined_type text,
+ out combined_queue text
+) returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.get_node_info(1)
+--
+-- Get local node info for set.
+--
+-- Parameters:
+-- i_set_name - set name
+--
+-- Returns:
+-- node_type - local node type
+-- node_name - local node name
+-- queue_name - local queue name used for set
+-- global_watermark - set's global watermark
+-- local_watermark - set's local watermark, for this and below nodes
+-- completed_tick - last committed set's tick
+-- provider_node - provider node name
+-- provider_location - connect string to provider node
+-- paused - this node should not do any work
+-- resync - re-register on provider queue (???)
+-- up_to_date - if consumer has loaded last changes
+-- combined_set - target set name for merge-leaf
+-- combined_type - node type of target set
+-- combined_queue - queue name for target set
+-- ----------------------------------------------------------------------
+declare
+ sql text;
+begin
+ select n.node_type, n.node_name, t.tick_id, n.queue_name,
+ c.set_name, c.node_type, c.queue_name, n.global_watermark,
+ n.provider_node, n.paused, n.resync, n.up_to_date,
+ p.node_location
+ into node_type, node_name, completed_tick, queue_name,
+ combined_set, combined_type, combined_queue, global_watermark,
+ provider_node, paused, resync, up_to_date,
+ provider_location
+ from pgq_set.set_info n
+ left join pgq_set.completed_tick t on (t.set_name = n.set_name)
+ left join pgq_set.set_info c on (c.set_name = n.combined_set)
+ left join pgq_set.member_info p on (p.set_name = n.set_name and p.node_name = n.provider_node)
+ where n.set_name = i_set_name;
+
+ select min(u.tick_id) into local_watermark
+ from (select tick_id
+ from pgq_set.completed_tick
+ where set_name = i_set_name
+ union all
+ select local_watermark as tick_id
+ from pgq_set.subscriber_info
+ where set_name = i_set_name
+ union all
+ -- exclude watermark consumer
+ select last_tick as tick_id
+ from pgq.get_consumer_info(queue_name)
+ where consumer_name <> (i_set_name || '_watermark')
+ ) u;
+ if local_watermark is null and queue_name is not null then
+ select t.tick_id into local_watermark
+ from pgq.tick t, pgq.queue q
+ where t.queue_id = q.queue_id
+ and q.queue_name = queue_name
+ order by 1 desc
+ limit 1;
+ end if;
+ return;
+end;
+$$ language plpgsql security definer;
+
--- /dev/null
+
+create or replace function pgq_set.remove_member(
+ in i_set_name text,
+ in i_node_name text,
+ out ret_code int4,
+ out ret_note text)
+returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.remove_member(2)
+--
+-- Add new set member.
+--
+-- Parameters:
+-- i_set_name - set name
+-- i_node_name - node name
+--
+-- Returns:
+-- ret_code - error code
+-- ret_note - error description
+--
+-- Return Codes:
+-- 200 - Ok
+-- ----------------------------------------------------------------------
+declare
+ o record;
+begin
+ delete from pgq_set.member_info
+ where set_name = i_set_name
+ and node_name = i_node_name;
+ select 200, 'Ok' into ret_code, ret_note;
+ return;
+end;
+$$ language plpgsql security definer;
+
--- /dev/null
+
+create or replace function pgq_set.set_global_watermark(
+ i_set_name text,
+ i_watermark bigint)
+returns bigint as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.set_global_watermark(2)
+--
+-- Move global watermark on branch/leaf.
+--
+-- Parameters:
+-- i_set_name - set name
+-- i_watermark - global tick_id that is processed everywhere
+--
+-- Returns:
+-- nothing
+-- ----------------------------------------------------------------------
+declare
+ this record;
+ wm_consumer text;
+begin
+ select node_type, queue_name into this
+ from pgq_set.set_info
+ where set_name = i_set_name
+ for update;
+ if not found then
+ raise exception 'set % not found', i_set_name;
+ end if;
+
+ update pgq_set.set_info
+ set global_watermark = i_watermark
+ where set_name = i_set_name;
+ if not found then
+ raise exception 'node % not subscribed to set %', i_node_name, i_set_name;
+ end if;
+
+ -- move watermark on pgq
+ if this.queue_name is not null then
+ wm_consumer := i_set_name || '.watermark';
+ perform pgq.register_consumer_at(this.queue_name, wm_consumer, i_watermark);
+ end if;
+
+ if this.node_type in ('root', 'combined-root') then
+ perform pgq.insert_event(this.queue_name, 'global-watermark', i_watermark,
+ i_set_name, null, null, null);
+ end if;
+ return i_watermark;
+end;
+$$ language plpgsql security definer;
+
+
--- /dev/null
+
+create or replace function pgq_set.set_node_uptodate(
+ i_set_name text,
+ i_uptodate boolean)
+returns int4 as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.set_node_uptodate(2)
+--
+-- Set node uptodate flag.
+--
+-- Parameters:
+-- i_set_name - set name
+-- i_uptodate - new flag state
+--
+-- Returns:
+-- nothing
+-- ----------------------------------------------------------------------
+begin
+ update pgq_set.set_info
+ set up_to_date = i_uptodate
+ where set_name = i_set_name;
+ if not found then
+ raise exception 'no such set: %', i_set_name;
+ end if;
+ return 1;
+end;
+$$ language plpgsql security definer;
+
+
--- /dev/null
+
+create or replace function pgq_set.set_partition_watermark(
+ i_combined_set_name text,
+ i_part_set_name text,
+ i_watermark bigint)
+returns bigint as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.set_partition_watermark(3)
+--
+-- Move merge-leaf position on combined-branch.
+--
+-- Parameters:
+-- i_combined_set_name - local combined set name
+-- i_part_set_name - local part set name (merge-leaf)
+-- i_watermark - partition tick_id that came inside combined-root batch
+--
+-- Returns:
+-- nothing
+-- ----------------------------------------------------------------------
+declare
+ cnode record;
+ pnode record;
+begin
+ -- check if combined-branch exists
+ perform 1 from pgq_set.set_info c, pgq_set.set_info p
+ where p.set_name = i_part_set_name
+ and c.set_name = i_combined_set_name
+ and p.combined_set = c.set_name
+ and p.node_type = 'merge-leaf'
+ and c.node_type = 'combined-branch';
+ if not found then
+ raise exception 'combined-branch/merge-leaf pair not found (%/%)', i_combined_set_name, i_part_set_name;
+ end if;
+
+
+ update pgq_set.completed_tick
+ set tick_id = i_watermark
+ where set_name = i_part_set_name;
+ if not found then
+ raise exception 'node % not subscribed to set %', i_node_name, i_set_name;
+ end if;
+
+ return i_watermark;
+end;
+$$ language plpgsql security definer;
+
+
--- /dev/null
+
+create or replace function pgq_set.set_subscriber_watermark(
+ i_set_name text,
+ i_node_name text,
+ i_watermark bigint)
+returns bigint as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.set_subscriber_watermark(3)
+--
+-- Notify provider about subscribers lowest watermark. On root
+-- node, changes global_watermark and sends event about that
+-- to the queue.
+--
+-- Parameters:
+-- i_set_name - set name
+-- i_node_name - subscriber node name
+-- i_watermark - tick_id
+--
+-- Returns:
+-- nothing
+-- ----------------------------------------------------------------------
+declare
+ m record;
+ cur_wm bigint;
+begin
+ select node_type, global_watermark, local_queue
+ into m
+ from pgq_set.local_node
+ where set_name = i_set_name
+ for update;
+ if not found then
+ raise exception 'no such set: %', i_set_name;
+ end if;
+
+ update pgq_set.subscriber_info
+ set local_watermark = i_watermark
+ where set_name = i_set_name
+ and node_name = i_node_name;
+ if not found then
+ raise exception 'node % not subscribed to set %', i_node_name, i_set_name;
+ end if;
+
+ if m.node_type in ('root', 'combined-root') then
+ cur_wm := pgq_set.get_local_watermark(i_set_name);
+ if cur_wm > m.global_watermark then
+ update pgq_set.local_node set global_watermark = cur_wm
+ where set_name = i_set_name;
+ perform pgq.insert_event(m.local_queue, 'global-watermark', cur_wm);
+ end if;
+ end if;
+
+ return i_watermark;
+end;
+$$ language plpgsql security definer;
+
+
--- /dev/null
+
+create or replace function pgq_set.subscribe_node(
+ in i_set_name text,
+ in i_remote_node_name text,
+ in i_remote_worker_name text,
+ out ret_code int4,
+ out ret_note text,
+ out global_watermark bigint)
+returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.subscribe_node(2)
+--
+-- Subscribe remote node to local node.
+--
+-- Parameters:
+-- i_set_name - set name
+-- i_remote_node_name - node name
+-- i_remote_worker_name - remote process job_name
+--
+-- Returns:
+-- ret_code - error code
+-- ret_note - description
+-- global_watermark - minimal watermark, also subscription pos
+-- ----------------------------------------------------------------------
+declare
+ n record;
+begin
+ select s.node_type, s.global_watermark, s.queue_name into n
+ from pgq_set.set_info s where s.set_name = i_set_name;
+ global_watermark := n.global_watermark;
+
+ if n.node_type in ('leaf', 'merge-leaf') then
+ select 401, 'Cannot subscribe to ' || n.node_type || ' node'
+ into ret_code, ret_note;
+ return;
+ end if;
+
+ perform pgq.register_consumer_at(n.queue_name, i_remote_worker_name, n.global_watermark);
+
+ insert into pgq_set.subscriber_info (set_name, node_name, local_watermark, worker_name)
+ values (i_set_name, i_remote_node_name, n.global_watermark, i_remote_worker_name);
+
+ select 200, 'Ok' into ret_code, ret_note;
+ return;
+end;
+$$ language plpgsql security definer;
+
--- /dev/null
+
+create or replace function pgq_set.unsubscribe_node(
+ in i_set_name text,
+ in i_remote_node_name text,
+ out ret_code int4,
+ out ret_note text)
+returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_set.unsubscribe_node(2)
+--
+-- Unsubscribe remote node from local node.
+--
+-- Parameters:
+-- i_set_name - set name
+-- i_remote_node_name - node name
+--
+-- Returns:
+-- ret_code - error code
+-- ret_note - description
+-- ----------------------------------------------------------------------
+declare
+ s record;
+ n record;
+begin
+ -- fetch node info
+ select queue_name, node_type into n from pgq_set.set_info
+ where set_name = i_set_name;
+ if not found then
+ select 404, 'No such set: '||i_set_name into ret_code, ret_note;
+ return;
+ end if;
+
+ -- fetch subscription info
+ select node_name, worker_name into s from pgq_set.subscriber_info
+ where set_name = i_set_name and node_name = i_remote_node_name
+ for update;
+ if not found then
+ select 404, 'No such subscriber: '||i_remote_node_name into ret_code, ret_note;
+ return;
+ end if;
+
+ -- unregister from queue
+ perform pgq.unregister_consumer(n.queue_name, s.worker_name);
+
+ -- drop subscription
+ delete from pgq_set.subscriber_info
+ where set_name = i_set_name
+ and node_name = i_remote_node_name;
+
+ -- done
+ select 200, 'Ok' into ret_code, ret_note;
+ return;
+end;
+$$ language plpgsql security definer;
+
--- /dev/null
+
+\set ECHO none
+\i ../txid/txid.sql
+\i ../pgq/pgq.sql
+\i ../londiste/londiste.sql
+\i structure/pgq_set.sql
+\i structure/functions.sql
+\set ECHO all
+
+select * from pgq_set.add_member('aset', 'node1', 'dbname=node1', false);
+select * from pgq_set.add_member('aset', 'node2', 'dbname=node2', false);
+select * from pgq_set.add_member('aset', 'node3', 'dbname=node3', false);
+select * from pgq_set.add_member('aset', 'node4', 'dbname=node4', false);
+select * from pgq_set.get_member_info('aset');
+
+select * from pgq_set.remove_member('aset', 'node4');
+select * from pgq_set.get_member_info('aset');
+
+select * from pgq_set.create_node('aset', 'root', 'node1', null, null, null);
+
+select * from pgq_set.get_node_info('aset');
--- /dev/null
+
+-- Group: Global Node Map
+\i functions/pgq_set.add_member.sql
+\i functions/pgq_set.remove_member.sql
+\i functions/pgq_set.get_member_info.sql
+
+-- Group: Node manipulation
+\i functions/pgq_set.create_node.sql
+\i functions/pgq_set.drop_node.sql
+\i functions/pgq_set.subscribe_node.sql
+\i functions/pgq_set.unsubscribe_node.sql
+\i functions/pgq_set.set_node_uptodate.sql
+
+-- Group: Node Info
+\i functions/pgq_set.get_node_info.sql
+
+-- Group: Watermark tracking
+\i functions/pgq_set.set_subscriber_watermark.sql
+\i functions/pgq_set.set_global_watermark.sql
+\i functions/pgq_set.set_partition_watermark.sql
+
+
--- /dev/null
+
+create schema pgq_set;
+grant usage on schema pgq_set to public;
+
+-- ----------------------------------------------------------------------
+-- Table: pgq_set.member_info
+--
+-- Static table that just lists all members in set.
+--
+-- Columns:
+-- set_name - set name
+-- node_name - node name
+-- node_location - libpq connect string for connecting to node
+-- online - whether the node is available
+-- ----------------------------------------------------------------------
+create table pgq_set.member_info (
+ set_name text not null,
+ node_name text not null,
+ node_location text not null,
+ dead boolean not null default false,
+
+ primary key (set_name, node_name)
+);
+
+-- ----------------------------------------------------------------------
+-- Table: pgq_set.local_node
+--
+-- Local node info.
+--
+-- Columns:
+-- set_name - set name
+-- node_type - local node type
+-- node_name - local node name
+-- queue_name - local queue name for set, NULL on leaf
+-- provider_node - provider node name
+-- combined_set - on 'merge-leaf' the target combined set name
+-- global_watermark - set's global watermark, set by root node
+-- paused - true if worker for this node should sleep
+-- resync - true if worker for this node needs to re-register itself on provider queue
+-- up_to_date - true if worker for this node has seen table changes
+--
+-- Node types:
+-- root - data + batches is generated here
+-- branch - replicates full queue contents and maybe contains some tables
+-- leaf - does not replicate queue
+-- combined-root - data from several partitions is merged here
+-- combined-branch - can take over the role of combined-root
+-- merge-leaf - this node in part set is linked to combined-root/branch
+-- ----------------------------------------------------------------------
+create table pgq_set.set_info (
+ set_name text not null primary key,
+ node_type text not null,
+ node_name text not null,
+ queue_name text,
+ provider_node text,
+ combined_set text,
+
+ global_watermark bigint not null,
+
+ paused boolean not null default false,
+ resync boolean not null default false,
+ up_to_date boolean not null default false,
+
+ foreign key (set_name, node_name) references pgq_set.member_info,
+ foreign key (set_name, provider_node) references pgq_set.member_info,
+ check (node_type in ('root', 'branch', 'leaf', 'combined-root', 'combined-branch', 'merge-leaf')),
+ check (case when node_type = 'root' then (queue_name is not null and provider_node is null and combined_set is null)
+ when node_type = 'branch' then (queue_name is not null and provider_node is not null and combined_set is null)
+ when node_type = 'leaf' then (queue_name is null and provider_node is not null and combined_set is null)
+ when node_type = 'combined-root' then (queue_name is not null and provider_node is null and combined_set is null)
+ when node_type = 'combined-branch' then (queue_name is not null and provider_node is not null and combined_set is null)
+ when node_type = 'merge-leaf' then (queue_name is null and provider_node is not null and combined_set is not null)
+ else false end)
+);
+
+-- ----------------------------------------------------------------------
+-- Table: pgq_set.subscriber_info
+--
+-- Contains subscribers for a set.
+--
+-- Columns:
+-- set_name - set's name
+-- node_name - node name
+-- worker_name - consumer_name for node
+-- local_watermark - watermark for node and it's subscribers
+-- ----------------------------------------------------------------------
+create table pgq_set.subscriber_info (
+ set_name text not null,
+ node_name text not null,
+ worker_name text not null,
+ local_watermark bigint not null,
+
+ primary key (set_name, node_name),
+ foreign key (set_name, node_name) references pgq_set.member_info
+);
+
+-- ----------------------------------------------------------------------
+-- Table: pgq_set.completed_tick
+--
+-- Contains completed tick_id from provider.
+--
+-- Columns:
+-- set_name - set's name
+-- tick_id - last committed tick id
+-- ----------------------------------------------------------------------
+create table pgq_set.completed_tick (
+ set_name text not null primary key,
+ tick_id bigint not null,
+
+ foreign key (set_name) references pgq_set.set_info
+);
+