From 39e85838be8ff8296a4a13354d5144ae16539dc8 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Wed, 5 Dec 2007 15:40:06 +0000 Subject: [PATCH] draft version of pgq_set sql code --- setup.py | 1 + sql/Makefile | 2 +- sql/pgq_set/Makefile | 53 +++++++++ sql/pgq_set/expected/pgq_set.out | 60 ++++++++++ sql/pgq_set/functions/pgq_set.add_member.sql | 50 ++++++++ sql/pgq_set/functions/pgq_set.create_node.sql | 91 ++++++++++++++ sql/pgq_set/functions/pgq_set.drop_node.sql | 53 +++++++++ .../functions/pgq_set.get_member_info.sql | 33 ++++++ .../functions/pgq_set.get_node_info.sql | 88 ++++++++++++++ .../functions/pgq_set.remove_member.sql | 34 ++++++ .../pgq_set.set_global_watermark.sql | 51 ++++++++ .../functions/pgq_set.set_node_uptodate.sql | 29 +++++ .../pgq_set.set_partition_watermark.sql | 47 ++++++++ .../pgq_set.set_subscriber_watermark.sql | 56 +++++++++ .../functions/pgq_set.subscribe_node.sql | 47 ++++++++ .../functions/pgq_set.unsubscribe_node.sql | 55 +++++++++ sql/pgq_set/sql/pgq_set.sql | 21 ++++ sql/pgq_set/structure/functions.sql | 22 ++++ sql/pgq_set/structure/pgq_set.sql | 112 ++++++++++++++++++ 19 files changed, 904 insertions(+), 1 deletion(-) create mode 100644 sql/pgq_set/Makefile create mode 100644 sql/pgq_set/expected/pgq_set.out create mode 100644 sql/pgq_set/functions/pgq_set.add_member.sql create mode 100644 sql/pgq_set/functions/pgq_set.create_node.sql create mode 100644 sql/pgq_set/functions/pgq_set.drop_node.sql create mode 100644 sql/pgq_set/functions/pgq_set.get_member_info.sql create mode 100644 sql/pgq_set/functions/pgq_set.get_node_info.sql create mode 100644 sql/pgq_set/functions/pgq_set.remove_member.sql create mode 100644 sql/pgq_set/functions/pgq_set.set_global_watermark.sql create mode 100644 sql/pgq_set/functions/pgq_set.set_node_uptodate.sql create mode 100644 sql/pgq_set/functions/pgq_set.set_partition_watermark.sql create mode 100644 sql/pgq_set/functions/pgq_set.set_subscriber_watermark.sql create mode 100644 sql/pgq_set/functions/pgq_set.subscribe_node.sql create mode 100644 sql/pgq_set/functions/pgq_set.unsubscribe_node.sql create mode 100644 sql/pgq_set/sql/pgq_set.sql create mode 100644 sql/pgq_set/structure/functions.sql create mode 100644 sql/pgq_set/structure/pgq_set.sql diff --git a/setup.py b/setup.py index 3eadc3da..fe8cedb3 100755 --- a/setup.py +++ b/setup.py @@ -18,6 +18,7 @@ share_dup_files = [ 'sql/pgq/pgq.sql', 'sql/londiste/londiste.sql', 'sql/pgq_ext/pgq_ext.sql', + 'sql/pgq_set/pgq_set.sql', 'sql/logtriga/logtriga.sql', ] if os.path.isfile('sql/txid/txid.sql'): diff --git a/sql/Makefile b/sql/Makefile index a6545e79..5e2d508d 100644 --- a/sql/Makefile +++ b/sql/Makefile @@ -1,7 +1,7 @@ include ../config.mak -SUBDIRS = logtriga londiste pgq pgq_ext txid +SUBDIRS = logtriga londiste pgq pgq_ext pgq_set txid all install clean distclean installcheck: for dir in $(SUBDIRS); do \ diff --git a/sql/pgq_set/Makefile b/sql/pgq_set/Makefile new file mode 100644 index 00000000..2776378c --- /dev/null +++ b/sql/pgq_set/Makefile @@ -0,0 +1,53 @@ + +DATA_built = pgq_set.sql pgq_set.upgrade.sql + +SRCS = $(wildcard structure/*.sql) $(wildcard functions/*.sql) + +REGRESS = pgq_set +REGRESS_OPTS = --load-language=plpgsql + +PGXS = $(shell pg_config --pgxs) +include $(PGXS) + +NDOC = NaturalDocs +NDOCARGS = -r -o html docs/html -p docs -i docs/sql +CATSQL = ../../scripts/catsql.py + +# +# combined SQL files +# + +pgq_set.sql: $(SRCS) + $(CATSQL) structure/pgq_set.sql structure/functions.sql > $@ + +pgq_set.upgrade.sql: $(SRCS) + $(CATSQL) structure/functions.sql > $@ + +# +# docs +# +dox: cleandox $(SRCS) + mkdir -p docs/html + mkdir -p docs/sql + $(CATSQL) --ndoc structure/pgq_set.sql > docs/sql/pgq_set.sql + $(CATSQL) --ndoc structure/functions.sql > docs/sql/functions.sql + $(NDOC) $(NDOCARGS) + +cleandox: + rm -rf docs/html docs/Data docs/sql + +clean: cleandox + +upload: dox + rsync -az --delete docs/html/* data1:public_html/pgq-set/ + +# +# regtest shortcuts +# + +test: pgq_set.sql + $(MAKE) installcheck || { less regression.diffs; exit 1; } + +ack: + cp results/*.out expected/ + diff --git a/sql/pgq_set/expected/pgq_set.out b/sql/pgq_set/expected/pgq_set.out new file mode 100644 index 00000000..cfb58c86 --- /dev/null +++ b/sql/pgq_set/expected/pgq_set.out @@ -0,0 +1,60 @@ +\set ECHO none +select * from pgq_set.add_member('aset', 'node1', 'dbname=node1', false); + ret_code | ret_note +----------+---------- + 200 | Ok +(1 row) + +select * from pgq_set.add_member('aset', 'node2', 'dbname=node2', false); + ret_code | ret_note +----------+---------- + 200 | Ok +(1 row) + +select * from pgq_set.add_member('aset', 'node3', 'dbname=node3', false); + ret_code | ret_note +----------+---------- + 200 | Ok +(1 row) + +select * from pgq_set.add_member('aset', 'node4', 'dbname=node4', false); + ret_code | ret_note +----------+---------- + 200 | Ok +(1 row) + +select * from pgq_set.get_member_info('aset'); + node_name | node_location | dead +-----------+---------------+------ + node1 | dbname=node1 | f + node2 | dbname=node2 | f + node3 | dbname=node3 | f + node4 | dbname=node4 | f +(4 rows) + +select * from pgq_set.remove_member('aset', 'node4'); + ret_code | ret_note +----------+---------- + 200 | Ok +(1 row) + +select * from pgq_set.get_member_info('aset'); + node_name | node_location | dead +-----------+---------------+------ + node1 | dbname=node1 | f + node2 | dbname=node2 | f + node3 | dbname=node3 | f +(3 rows) + +select * from pgq_set.create_node('aset', 'root', 'node1', null, null, null); + ret_code | ret_desc +----------+---------- + 200 | Ok +(1 row) + +select * from pgq_set.get_node_info('aset'); + node_type | node_name | queue_name | global_watermark | local_watermark | completed_tick | provider_node | paused | resync | up_to_date | combined_set | combined_type | combined_queue +-----------+-----------+------------+------------------+-----------------+----------------+---------------+--------+--------+------------+--------------+---------------+---------------- + root | node1 | aset | 1 | | | | f | f | f | | | +(1 row) + diff --git a/sql/pgq_set/functions/pgq_set.add_member.sql b/sql/pgq_set/functions/pgq_set.add_member.sql new file mode 100644 index 00000000..064b65e8 --- /dev/null +++ b/sql/pgq_set/functions/pgq_set.add_member.sql @@ -0,0 +1,50 @@ + +create or replace function pgq_set.add_member( + in i_set_name text, + in i_node_name text, + in i_node_location text, + in i_dead boolean, + out ret_code int4, + out ret_note text) +returns record as $$ +-- ---------------------------------------------------------------------- +-- Function: pgq_set.add_member(3) +-- +-- Add new set member. +-- +-- Parameters: +-- i_set_name - set name +-- i_node_name - node name +-- i_node_location - node connect string +-- i_dead - dead flag for node +-- +-- Returns: +-- ret_code - error code +-- ret_note - error description +-- +-- Return Codes: +-- 200 - Ok +-- 404 - No such set +-- ---------------------------------------------------------------------- +declare + o record; +begin + select node_location into o + from pgq_set.member_info + where set_name = i_set_name + and node_name = i_node_name; + if found then + update pgq_set.member_info + set node_location = i_node_location, + dead = i_dead + where set_name = i_set_name + and node_name = i_node_name; + else + insert into pgq_set.member_info (set_name, node_name, node_location, dead) + values (i_set_name, i_node_name, i_node_location, i_dead); + end if; + select 200, 'Ok' into ret_code, ret_note; + return; +end; +$$ language plpgsql security definer; + diff --git a/sql/pgq_set/functions/pgq_set.create_node.sql b/sql/pgq_set/functions/pgq_set.create_node.sql new file mode 100644 index 00000000..322dca1d --- /dev/null +++ b/sql/pgq_set/functions/pgq_set.create_node.sql @@ -0,0 +1,91 @@ + +create or replace function pgq_set.create_node( + in i_set_name text, + in i_node_type text, + in i_node_name text, + in i_provider_name text, + in i_global_watermark bigint, + in i_combined_set text, + out ret_code int4, + out ret_desc text) +returns record as $$ +-- ---------------------------------------------------------------------- +-- Function: pgq_set.create_node(6) +-- +-- Initialize node. +-- +-- Parameters: +-- i_set_name - set name +-- i_node_type - node type +-- i_node_name - node name +-- i_provider_name - provider node name for non-root nodes +-- i_global_watermark - global lowest tick_id +-- i_combined_set - merge-leaf: target set +-- +-- Returns: +-- desc +-- +-- Node Types: +-- root - master node +-- branch - subscriber node that can be provider to others +-- leaf - subscriber node that cannot be provider to others +-- combined-root - root node for combined set +-- combined-branch - failover node for combined set +-- merge-leaf - leaf node on partition set that will be merged into combined-root +-- ---------------------------------------------------------------------- +declare + _queue_name text; + _wm_consumer text; + _global_wm bigint; +begin + if i_node_type in ('root', 'combined-root') then + if coalesce(i_provider_name, i_global_watermark::text, + i_combined_set) is not null then + raise exception 'unexpected args for %', i_node_type; + end if; + + _queue_name := i_set_name; + _wm_consumer := i_set_name || '.watermark'; + perform pgq.create_queue(_queue_name); + perform pgq.register_consumer(_queue_name, _wm_consumer); + _global_wm := (select last_tick from pgq.get_consumer_info(_queue_name, _wm_consumer)); + elsif i_node_type in ('branch', 'combined-branch') then + if i_provider_name is null then + raise exception 'provider not set for %', i_node_type; + end if; + if i_global_watermark is null then + raise exception 'global_watermark not set for %', i_node_type; + end if; + if i_node_type = 'combined-branch' and i_combined_set is null then + raise exception 'combined-set not given for %', i_node_type; + end if; + _queue_name := i_set_name; + _wm_consumer := i_set_name || '.watermark'; + perform pgq.create_queue(_queue_name); + update pgq.queue set queue_external_ticker = true where queue_name = _queue_name; + if i_global_watermark > 1 then + perform pgq.ticker(_queue_name, i_global_watermark); + end if; + perform pgq.register_consumer_at(_queue_name, _wm_consumer, i_global_watermark); + _global_wm := i_global_watermark; + elsif i_node_type in ('leaf', 'merge-leaf') then + _queue_name := null; + _global_wm := i_global_watermark; + end if; + + insert into pgq_set.set_info + (set_name, node_type, node_name, queue_name, + provider_node, combined_set, global_watermark) + values (i_set_name, i_node_type, i_node_name, _queue_name, + i_provider_name, i_combined_set, _global_wm); + + if i_node_type not in ('root', 'combined-root') then + insert into pgq_set.completed_tick (set_name, tick_id) + values (i_set_name, _global_wm); + end if; + + select 200, 'Ok' into ret_code, ret_desc; + return; +end; +$$ language plpgsql security definer; + diff --git a/sql/pgq_set/functions/pgq_set.drop_node.sql b/sql/pgq_set/functions/pgq_set.drop_node.sql new file mode 100644 index 00000000..e32b8e37 --- /dev/null +++ b/sql/pgq_set/functions/pgq_set.drop_node.sql @@ -0,0 +1,53 @@ + +create or replace function pgq_set.drop_node( + in i_set_name text, + out ret_code int4, + out ret_note text) +returns record as $$ +-- ---------------------------------------------------------------------- +-- Function: pgq_set.drop_node(1) +-- +-- Drop node. +-- +-- Parameters: +-- i_set_name - set name +-- +-- Returns: +-- ret_code - error code +-- ret_note - error description +-- +-- Return Codes: +-- 200 - Ok +-- 404 - No such set +-- ---------------------------------------------------------------------- +declare + _queue_name text; + _wm_consumer text; + _global_wm bigint; + sub record; +begin + perform 1 from pgq_set.set_info + where set_name = i_set_name; + if not found then + select 404, 'No such set: ' || i_set_name into ret_code, ret_note; + return; + end if; + + perform pgq_set.unsubscribe_node(s.set_name, s.node_name) + from pgq_set.subscriber_info s + where set_name = i_set_name; + + delete from pgq_set.completed_tick + where set_name = i_set_name; + + delete from pgq_set.set_info + where set_name = i_set_name; + + delete from pgq_set.member_info + where set_name = i_set_name; + + select 200, 'Ok' into ret_code, ret_note; + return; +end; +$$ language plpgsql security definer; + diff --git a/sql/pgq_set/functions/pgq_set.get_member_info.sql b/sql/pgq_set/functions/pgq_set.get_member_info.sql new file mode 100644 index 00000000..64ba024a --- /dev/null +++ b/sql/pgq_set/functions/pgq_set.get_member_info.sql @@ -0,0 +1,33 @@ + +create or replace function pgq_set.get_member_info( + in i_set_name text, + + out node_name text, + out node_location text, + out dead boolean +) returns setof record as $$ +-- ---------------------------------------------------------------------- +-- Function: pgq_set.get_member_info(1) +-- +-- Get member list for the set. +-- +-- Parameters: +-- i_set_name - set name +-- +-- Returns: +-- node_name - node name +-- node_location - libpq connect string for the node +-- dead - whether the node should be considered dead +-- ---------------------------------------------------------------------- +begin + for node_name, node_location, dead in + select m.node_name, m.node_location, m.dead + from pgq_set.member_info m + where m.set_name = i_set_name + loop + return next; + end loop; + return; +end; +$$ language plpgsql security definer; + diff --git a/sql/pgq_set/functions/pgq_set.get_node_info.sql b/sql/pgq_set/functions/pgq_set.get_node_info.sql new file mode 100644 index 00000000..25ad463d --- /dev/null +++ b/sql/pgq_set/functions/pgq_set.get_node_info.sql @@ -0,0 +1,88 @@ + +create or replace function pgq_set.get_node_info( + in i_set_name text, + + out node_type text, + out node_name text, + out queue_name text, + out global_watermark bigint, + out local_watermark bigint, + out completed_tick bigint, + + out provider_node text, + out provider_location text, + out paused boolean, + out resync boolean, + out up_to_date boolean, + + out combined_set text, + out combined_type text, + out combined_queue text +) returns record as $$ +-- ---------------------------------------------------------------------- +-- Function: pgq_set.get_node_info(1) +-- +-- Get local node info for set. +-- +-- Parameters: +-- i_set_name - set name +-- +-- Returns: +-- node_type - local node type +-- node_name - local node name +-- queue_name - local queue name used for set +-- global_watermark - set's global watermark +-- local_watermark - set's local watermark, for this and below nodes +-- completed_tick - last committed set's tick +-- provider_node - provider node name +-- provider_location - connect string to provider node +-- paused - this node should not do any work +-- resync - re-register on provider queue (???) +-- up_to_date - if consumer has loaded last changes +-- combined_set - target set name for merge-leaf +-- combined_type - node type of target set +-- combined_queue - queue name for target set +-- ---------------------------------------------------------------------- +declare + sql text; +begin + select n.node_type, n.node_name, t.tick_id, n.queue_name, + c.set_name, c.node_type, c.queue_name, n.global_watermark, + n.provider_node, n.paused, n.resync, n.up_to_date, + p.node_location + into node_type, node_name, completed_tick, queue_name, + combined_set, combined_type, combined_queue, global_watermark, + provider_node, paused, resync, up_to_date, + provider_location + from pgq_set.set_info n + left join pgq_set.completed_tick t on (t.set_name = n.set_name) + left join pgq_set.set_info c on (c.set_name = n.combined_set) + left join pgq_set.member_info p on (p.set_name = n.set_name and p.node_name = n.provider_node) + where n.set_name = i_set_name; + + select min(u.tick_id) into local_watermark + from (select tick_id + from pgq_set.completed_tick + where set_name = i_set_name + union all + select local_watermark as tick_id + from pgq_set.subscriber_info + where set_name = i_set_name + union all + -- exclude watermark consumer + select last_tick as tick_id + from pgq.get_consumer_info(queue_name) + where consumer_name <> (i_set_name || '_watermark') + ) u; + if local_watermark is null and queue_name is not null then + select t.tick_id into local_watermark + from pgq.tick t, pgq.queue q + where t.queue_id = q.queue_id + and q.queue_name = queue_name + order by 1 desc + limit 1; + end if; + return; +end; +$$ language plpgsql security definer; + diff --git a/sql/pgq_set/functions/pgq_set.remove_member.sql b/sql/pgq_set/functions/pgq_set.remove_member.sql new file mode 100644 index 00000000..76d2f477 --- /dev/null +++ b/sql/pgq_set/functions/pgq_set.remove_member.sql @@ -0,0 +1,34 @@ + +create or replace function pgq_set.remove_member( + in i_set_name text, + in i_node_name text, + out ret_code int4, + out ret_note text) +returns record as $$ +-- ---------------------------------------------------------------------- +-- Function: pgq_set.remove_member(2) +-- +-- Add new set member. +-- +-- Parameters: +-- i_set_name - set name +-- i_node_name - node name +-- +-- Returns: +-- ret_code - error code +-- ret_note - error description +-- +-- Return Codes: +-- 200 - Ok +-- ---------------------------------------------------------------------- +declare + o record; +begin + delete from pgq_set.member_info + where set_name = i_set_name + and node_name = i_node_name; + select 200, 'Ok' into ret_code, ret_note; + return; +end; +$$ language plpgsql security definer; + diff --git a/sql/pgq_set/functions/pgq_set.set_global_watermark.sql b/sql/pgq_set/functions/pgq_set.set_global_watermark.sql new file mode 100644 index 00000000..445ca13b --- /dev/null +++ b/sql/pgq_set/functions/pgq_set.set_global_watermark.sql @@ -0,0 +1,51 @@ + +create or replace function pgq_set.set_global_watermark( + i_set_name text, + i_watermark bigint) +returns bigint as $$ +-- ---------------------------------------------------------------------- +-- Function: pgq_set.set_global_watermark(2) +-- +-- Move global watermark on branch/leaf. +-- +-- Parameters: +-- i_set_name - set name +-- i_watermark - global tick_id that is processed everywhere +-- +-- Returns: +-- nothing +-- ---------------------------------------------------------------------- +declare + this record; + wm_consumer text; +begin + select node_type, queue_name into this + from pgq_set.set_info + where set_name = i_set_name + for update; + if not found then + raise exception 'set % not found', i_set_name; + end if; + + update pgq_set.set_info + set global_watermark = i_watermark + where set_name = i_set_name; + if not found then + raise exception 'node % not subscribed to set %', i_node_name, i_set_name; + end if; + + -- move watermark on pgq + if this.queue_name is not null then + wm_consumer := i_set_name || '.watermark'; + perform pgq.register_consumer_at(this.queue_name, wm_consumer, i_watermark); + end if; + + if this.node_type in ('root', 'combined-root') then + perform pgq.insert_event(this.queue_name, 'global-watermark', i_watermark, + i_set_name, null, null, null); + end if; + return i_watermark; +end; +$$ language plpgsql security definer; + + diff --git a/sql/pgq_set/functions/pgq_set.set_node_uptodate.sql b/sql/pgq_set/functions/pgq_set.set_node_uptodate.sql new file mode 100644 index 00000000..ce99083a --- /dev/null +++ b/sql/pgq_set/functions/pgq_set.set_node_uptodate.sql @@ -0,0 +1,29 @@ + +create or replace function pgq_set.set_node_uptodate( + i_set_name text, + i_uptodate boolean) +returns int4 as $$ +-- ---------------------------------------------------------------------- +-- Function: pgq_set.set_node_uptodate(2) +-- +-- Set node uptodate flag. +-- +-- Parameters: +-- i_set_name - set name +-- i_uptodate - new flag state +-- +-- Returns: +-- nothing +-- ---------------------------------------------------------------------- +begin + update pgq_set.set_info + set up_to_date = i_uptodate + where set_name = i_set_name; + if not found then + raise exception 'no such set: %', i_set_name; + end if; + return 1; +end; +$$ language plpgsql security definer; + + diff --git a/sql/pgq_set/functions/pgq_set.set_partition_watermark.sql b/sql/pgq_set/functions/pgq_set.set_partition_watermark.sql new file mode 100644 index 00000000..e538c12f --- /dev/null +++ b/sql/pgq_set/functions/pgq_set.set_partition_watermark.sql @@ -0,0 +1,47 @@ + +create or replace function pgq_set.set_partition_watermark( + i_combined_set_name text, + i_part_set_name text, + i_watermark bigint) +returns bigint as $$ +-- ---------------------------------------------------------------------- +-- Function: pgq_set.set_partition_watermark(3) +-- +-- Move merge-leaf position on combined-branch. +-- +-- Parameters: +-- i_combined_set_name - local combined set name +-- i_part_set_name - local part set name (merge-leaf) +-- i_watermark - partition tick_id that came inside combined-root batch +-- +-- Returns: +-- nothing +-- ---------------------------------------------------------------------- +declare + cnode record; + pnode record; +begin + -- check if combined-branch exists + perform 1 from pgq_set.set_info c, pgq_set.set_info p + where p.set_name = i_part_set_name + and c.set_name = i_combined_set_name + and p.combined_set = c.set_name + and p.node_type = 'merge-leaf' + and c.node_type = 'combined-branch'; + if not found then + raise exception 'combined-branch/merge-leaf pair not found (%/%)', i_combined_set_name, i_part_set_name; + end if; + + + update pgq_set.completed_tick + set tick_id = i_watermark + where set_name = i_part_set_name; + if not found then + raise exception 'node % not subscribed to set %', i_node_name, i_set_name; + end if; + + return i_watermark; +end; +$$ language plpgsql security definer; + + diff --git a/sql/pgq_set/functions/pgq_set.set_subscriber_watermark.sql b/sql/pgq_set/functions/pgq_set.set_subscriber_watermark.sql new file mode 100644 index 00000000..3f4d85f9 --- /dev/null +++ b/sql/pgq_set/functions/pgq_set.set_subscriber_watermark.sql @@ -0,0 +1,56 @@ + +create or replace function pgq_set.set_subscriber_watermark( + i_set_name text, + i_node_name text, + i_watermark bigint) +returns bigint as $$ +-- ---------------------------------------------------------------------- +-- Function: pgq_set.set_subscriber_watermark(3) +-- +-- Notify provider about subscribers lowest watermark. On root +-- node, changes global_watermark and sends event about that +-- to the queue. +-- +-- Parameters: +-- i_set_name - set name +-- i_node_name - subscriber node name +-- i_watermark - tick_id +-- +-- Returns: +-- nothing +-- ---------------------------------------------------------------------- +declare + m record; + cur_wm bigint; +begin + select node_type, global_watermark, local_queue + into m + from pgq_set.local_node + where set_name = i_set_name + for update; + if not found then + raise exception 'no such set: %', i_set_name; + end if; + + update pgq_set.subscriber_info + set local_watermark = i_watermark + where set_name = i_set_name + and node_name = i_node_name; + if not found then + raise exception 'node % not subscribed to set %', i_node_name, i_set_name; + end if; + + if m.node_type in ('root', 'combined-root') then + cur_wm := pgq_set.get_local_watermark(i_set_name); + if cur_wm > m.global_watermark then + update pgq_set.local_node set global_watermark = cur_wm + where set_name = i_set_name; + perform pgq.insert_event(m.local_queue, 'global-watermark', cur_wm); + end if; + end if; + + return i_watermark; +end; +$$ language plpgsql security definer; + + diff --git a/sql/pgq_set/functions/pgq_set.subscribe_node.sql b/sql/pgq_set/functions/pgq_set.subscribe_node.sql new file mode 100644 index 00000000..15cc5397 --- /dev/null +++ b/sql/pgq_set/functions/pgq_set.subscribe_node.sql @@ -0,0 +1,47 @@ + +create or replace function pgq_set.subscribe_node( + in i_set_name text, + in i_remote_node_name text, + in i_remote_worker_name text, + out ret_code int4, + out ret_note text, + out global_watermark bigint) +returns record as $$ +-- ---------------------------------------------------------------------- +-- Function: pgq_set.subscribe_node(2) +-- +-- Subscribe remote node to local node. +-- +-- Parameters: +-- i_set_name - set name +-- i_remote_node_name - node name +-- i_remote_worker_name - remote process job_name +-- +-- Returns: +-- ret_code - error code +-- ret_note - description +-- global_watermark - minimal watermark, also subscription pos +-- ---------------------------------------------------------------------- +declare + n record; +begin + select s.node_type, s.global_watermark, s.queue_name into n + from pgq_set.set_info s where s.set_name = i_set_name; + global_watermark := n.global_watermark; + + if n.node_type in ('leaf', 'merge-leaf') then + select 401, 'Cannot subscribe to ' || n.node_type || ' node' + into ret_code, ret_note; + return; + end if; + + perform pgq.register_consumer_at(n.queue_name, i_remote_worker_name, n.global_watermark); + + insert into pgq_set.subscriber_info (set_name, node_name, local_watermark, worker_name) + values (i_set_name, i_remote_node_name, n.global_watermark, i_remote_worker_name); + + select 200, 'Ok' into ret_code, ret_note; + return; +end; +$$ language plpgsql security definer; + diff --git a/sql/pgq_set/functions/pgq_set.unsubscribe_node.sql b/sql/pgq_set/functions/pgq_set.unsubscribe_node.sql new file mode 100644 index 00000000..73ea4aee --- /dev/null +++ b/sql/pgq_set/functions/pgq_set.unsubscribe_node.sql @@ -0,0 +1,55 @@ + +create or replace function pgq_set.unsubscribe_node( + in i_set_name text, + in i_remote_node_name text, + out ret_code int4, + out ret_note text) +returns record as $$ +-- ---------------------------------------------------------------------- +-- Function: pgq_set.unsubscribe_node(2) +-- +-- Unsubscribe remote node from local node. +-- +-- Parameters: +-- i_set_name - set name +-- i_remote_node_name - node name +-- +-- Returns: +-- ret_code - error code +-- ret_note - description +-- ---------------------------------------------------------------------- +declare + s record; + n record; +begin + -- fetch node info + select queue_name, node_type into n from pgq_set.set_info + where set_name = i_set_name; + if not found then + select 404, 'No such set: '||i_set_name into ret_code, ret_note; + return; + end if; + + -- fetch subscription info + select node_name, worker_name into s from pgq_set.subscriber_info + where set_name = i_set_name and node_name = i_remote_node_name + for update; + if not found then + select 404, 'No such subscriber: '||i_remote_node_name into ret_code, ret_note; + return; + end if; + + -- unregister from queue + perform pgq.unregister_consumer(n.queue_name, s.worker_name); + + -- drop subscription + delete from pgq_set.subscriber_info + where set_name = i_set_name + and node_name = i_remote_node_name; + + -- done + select 200, 'Ok' into ret_code, ret_note; + return; +end; +$$ language plpgsql security definer; + diff --git a/sql/pgq_set/sql/pgq_set.sql b/sql/pgq_set/sql/pgq_set.sql new file mode 100644 index 00000000..5ef46662 --- /dev/null +++ b/sql/pgq_set/sql/pgq_set.sql @@ -0,0 +1,21 @@ + +\set ECHO none +\i ../txid/txid.sql +\i ../pgq/pgq.sql +\i ../londiste/londiste.sql +\i structure/pgq_set.sql +\i structure/functions.sql +\set ECHO all + +select * from pgq_set.add_member('aset', 'node1', 'dbname=node1', false); +select * from pgq_set.add_member('aset', 'node2', 'dbname=node2', false); +select * from pgq_set.add_member('aset', 'node3', 'dbname=node3', false); +select * from pgq_set.add_member('aset', 'node4', 'dbname=node4', false); +select * from pgq_set.get_member_info('aset'); + +select * from pgq_set.remove_member('aset', 'node4'); +select * from pgq_set.get_member_info('aset'); + +select * from pgq_set.create_node('aset', 'root', 'node1', null, null, null); + +select * from pgq_set.get_node_info('aset'); diff --git a/sql/pgq_set/structure/functions.sql b/sql/pgq_set/structure/functions.sql new file mode 100644 index 00000000..6cf0dac5 --- /dev/null +++ b/sql/pgq_set/structure/functions.sql @@ -0,0 +1,22 @@ + +-- Group: Global Node Map +\i functions/pgq_set.add_member.sql +\i functions/pgq_set.remove_member.sql +\i functions/pgq_set.get_member_info.sql + +-- Group: Node manipulation +\i functions/pgq_set.create_node.sql +\i functions/pgq_set.drop_node.sql +\i functions/pgq_set.subscribe_node.sql +\i functions/pgq_set.unsubscribe_node.sql +\i functions/pgq_set.set_node_uptodate.sql + +-- Group: Node Info +\i functions/pgq_set.get_node_info.sql + +-- Group: Watermark tracking +\i functions/pgq_set.set_subscriber_watermark.sql +\i functions/pgq_set.set_global_watermark.sql +\i functions/pgq_set.set_partition_watermark.sql + + diff --git a/sql/pgq_set/structure/pgq_set.sql b/sql/pgq_set/structure/pgq_set.sql new file mode 100644 index 00000000..25f85d5b --- /dev/null +++ b/sql/pgq_set/structure/pgq_set.sql @@ -0,0 +1,112 @@ + +create schema pgq_set; +grant usage on schema pgq_set to public; + +-- ---------------------------------------------------------------------- +-- Table: pgq_set.member_info +-- +-- Static table that just lists all members in set. +-- +-- Columns: +-- set_name - set name +-- node_name - node name +-- node_location - libpq connect string for connecting to node +-- online - whether the node is available +-- ---------------------------------------------------------------------- +create table pgq_set.member_info ( + set_name text not null, + node_name text not null, + node_location text not null, + dead boolean not null default false, + + primary key (set_name, node_name) +); + +-- ---------------------------------------------------------------------- +-- Table: pgq_set.local_node +-- +-- Local node info. +-- +-- Columns: +-- set_name - set name +-- node_type - local node type +-- node_name - local node name +-- queue_name - local queue name for set, NULL on leaf +-- provider_node - provider node name +-- combined_set - on 'merge-leaf' the target combined set name +-- global_watermark - set's global watermark, set by root node +-- paused - true if worker for this node should sleep +-- resync - true if worker for this node needs to re-register itself on provider queue +-- up_to_date - true if worker for this node has seen table changes +-- +-- Node types: +-- root - data + batches is generated here +-- branch - replicates full queue contents and maybe contains some tables +-- leaf - does not replicate queue +-- combined-root - data from several partitions is merged here +-- combined-branch - can take over the role of combined-root +-- merge-leaf - this node in part set is linked to combined-root/branch +-- ---------------------------------------------------------------------- +create table pgq_set.set_info ( + set_name text not null primary key, + node_type text not null, + node_name text not null, + queue_name text, + provider_node text, + combined_set text, + + global_watermark bigint not null, + + paused boolean not null default false, + resync boolean not null default false, + up_to_date boolean not null default false, + + foreign key (set_name, node_name) references pgq_set.member_info, + foreign key (set_name, provider_node) references pgq_set.member_info, + check (node_type in ('root', 'branch', 'leaf', 'combined-root', 'combined-branch', 'merge-leaf')), + check (case when node_type = 'root' then (queue_name is not null and provider_node is null and combined_set is null) + when node_type = 'branch' then (queue_name is not null and provider_node is not null and combined_set is null) + when node_type = 'leaf' then (queue_name is null and provider_node is not null and combined_set is null) + when node_type = 'combined-root' then (queue_name is not null and provider_node is null and combined_set is null) + when node_type = 'combined-branch' then (queue_name is not null and provider_node is not null and combined_set is null) + when node_type = 'merge-leaf' then (queue_name is null and provider_node is not null and combined_set is not null) + else false end) +); + +-- ---------------------------------------------------------------------- +-- Table: pgq_set.subscriber_info +-- +-- Contains subscribers for a set. +-- +-- Columns: +-- set_name - set's name +-- node_name - node name +-- worker_name - consumer_name for node +-- local_watermark - watermark for node and it's subscribers +-- ---------------------------------------------------------------------- +create table pgq_set.subscriber_info ( + set_name text not null, + node_name text not null, + worker_name text not null, + local_watermark bigint not null, + + primary key (set_name, node_name), + foreign key (set_name, node_name) references pgq_set.member_info +); + +-- ---------------------------------------------------------------------- +-- Table: pgq_set.completed_tick +-- +-- Contains completed tick_id from provider. +-- +-- Columns: +-- set_name - set's name +-- tick_id - last committed tick id +-- ---------------------------------------------------------------------- +create table pgq_set.completed_tick ( + set_name text not null primary key, + tick_id bigint not null, + + foreign key (set_name) references pgq_set.set_info +); + -- 2.39.5