rsync -rtlz api/* $(web)/api
cd ../sql/pgq && rm -rf docs/html && $(MAKE) dox
rsync -rtlz ../sql/pgq/docs/html/* $(web)/pgq/
+ cd ../sql/pgq_coop && rm -rf docs/html && $(MAKE) dox
+ rsync -rtlz ../sql/pgq_coop/docs/html/* $(web)/pgq_coop/
cd ../sql/pgq_node && rm -rf docs/html && $(MAKE) dox
rsync -rtlz ../sql/pgq_node/docs/html/* $(web)/pgq_node/
cd ../sql/londiste && rm -rf docs/html && $(MAKE) dox
include ../config.mak
-SUBDIRS = londiste pgq pgq_ext pgq_node txid
+SUBDIRS = londiste pgq pgq_coop pgq_ext pgq_node txid
all install clean distclean installcheck:
for dir in $(SUBDIRS); do \
--- /dev/null
+
+DATA_built = pgq_coop.sql pgq_coop.upgrade.sql
+
+SQL_FULL = structure/schema.sql structure/functions.sql
+SQL_UPGRADE = structure/functions.sql
+
+FUNCS = $(shell sed -n -e '/^\\/{s/\\i //;p}' structure/functions.sql)
+SRCS = $(SQL_FULL) $(FUNCS)
+
+REGRESS = pgq_coop_test
+REGRESS_OPTS = --load-language=plpgsql
+
+include ../../config.mak
+include $(PGXS)
+
+NDOC = NaturalDocs
+NDOCARGS = -r -o html docs/html -p docs -i docs/sql
+CATSQL = ../../scripts/catsql.py
+
+#
+# combined SQL files
+#
+
+pgq_coop.sql: $(SRCS)
+ $(CATSQL) $(SQL_FULL) > $@
+
+pgq_coop.upgrade.sql: $(SRCS)
+ $(CATSQL) $(SQL_UPGRADE) > $@
+
+#
+# docs
+#
+dox: cleandox $(SRCS)
+ mkdir -p docs/html
+ mkdir -p docs/sql
+ $(CATSQL) structure/functions.sql > docs/sql/functions.sql
+ $(NDOC) $(NDOCARGS)
+
+cleandox:
+ rm -rf docs/html docs/Data docs/sql
+
+clean: cleandox
+
+upload: dox
+ rsync -az --delete docs/html/* data1:public_html/pgq-set/
+
+#
+# regtest shortcuts
+#
+
+test: pgq_coop.sql
+ $(MAKE) installcheck || { less regression.diffs; exit 1; }
+
+ack:
+ cp results/*.out expected/
+
--- /dev/null
+\set ECHO none
+select pgq.create_queue('testqueue');
+ create_queue
+--------------
+ 1
+(1 row)
+
+update pgq.queue set queue_ticker_max_count = 1 where queue_name = 'testqueue';
+-- register
+select pgq.register_consumer('testqueue', 'maincons');
+ register_consumer
+-------------------
+ 1
+(1 row)
+
+select pgq_coop.register_subconsumer('testqueue', 'maincons', 'subcons1');
+ register_subconsumer
+----------------------
+ 1
+(1 row)
+
+select pgq_coop.register_subconsumer('testqueue', 'maincons', 'subcons1');
+ register_subconsumer
+----------------------
+ 0
+(1 row)
+
+select pgq_coop.register_subconsumer('testqueue', 'maincons', 'subcons2');
+ register_subconsumer
+----------------------
+ 1
+(1 row)
+
+-- process events
+select pgq_coop.next_batch('testqueue', 'maincons', 'subcons1');
+ next_batch
+------------
+
+(1 row)
+
+select pgq.insert_event('testqueue', 'ev0', 'data');
+ insert_event
+--------------
+ 1
+(1 row)
+
+select pgq.insert_event('testqueue', 'ev1', 'data');
+ insert_event
+--------------
+ 2
+(1 row)
+
+select pgq.insert_event('testqueue', 'ev2', 'data');
+ insert_event
+--------------
+ 3
+(1 row)
+
+select pgq.ticker();
+ ticker
+--------
+ 1
+(1 row)
+
+select pgq_coop.next_batch('testqueue', 'maincons', 'subcons1');
+ next_batch
+------------
+ 1
+(1 row)
+
+select pgq_coop.next_batch('testqueue', 'maincons', 'subcons1');
+ next_batch
+------------
+ 1
+(1 row)
+
+select pgq_coop.next_batch('testqueue', 'maincons', 'subcons2');
+ next_batch
+------------
+
+(1 row)
+
+select pgq.insert_event('testqueue', 'ev3', 'data');
+ insert_event
+--------------
+ 4
+(1 row)
+
+select pgq.insert_event('testqueue', 'ev4', 'data');
+ insert_event
+--------------
+ 5
+(1 row)
+
+select pgq.insert_event('testqueue', 'ev5', 'data');
+ insert_event
+--------------
+ 6
+(1 row)
+
+select pgq.ticker();
+ ticker
+--------
+ 1
+(1 row)
+
+select pgq_coop.next_batch('testqueue', 'maincons', 'subcons2');
+ next_batch
+------------
+ 2
+(1 row)
+
+select pgq_coop.finish_batch(2);
+ finish_batch
+--------------
+ 1
+(1 row)
+
+select pgq_coop.unregister_subconsumer('testqueue', 'maincons', 'subcons1', 0);
+ERROR: subconsumer has active batch
+select pgq_coop.unregister_subconsumer('testqueue', 'maincons', 'subcons1', 1);
+ unregister_subconsumer
+------------------------
+ 1
+(1 row)
+
+select pgq_coop.unregister_subconsumer('testqueue', 'maincons', 'subcons1', 0);
+ unregister_subconsumer
+------------------------
+ 0
+(1 row)
+
+select pgq_coop.unregister_subconsumer('testqueue', 'maincons', 'subcons2', 0);
+ unregister_subconsumer
+------------------------
+ 1
+(1 row)
+
--- /dev/null
+create or replace function pgq_coop.finish_batch(
+ i_batch_id bigint)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_coop.finish_batch(1)
+--
+-- Closes a batch.
+--
+-- Parameters:
+-- i_batch_id - id of the batch to be closed
+--
+-- Returns:
+-- 1 if success (batch was found), 0 otherwise
+-- ----------------------------------------------------------------------
+begin
+ -- we are dealing with subconsumer, so nullify all tick info
+ -- tick columns for master consumer contain adequate data
+ update pgq.subscription
+ set sub_active = now(),
+ sub_last_tick = null,
+ sub_next_tick = null,
+ sub_batch = null
+ where sub_batch = i_batch_id;
+ if not found then
+ raise warning 'coop_finish_batch: batch % not found', i_batch_id;
+ return 0;
+ else
+ return 1;
+ end if;
+end;
+$$ language plpgsql security definer;
+
--- /dev/null
+create or replace function pgq_coop.next_batch(
+ i_queue_name text,
+ i_consumer_name text,
+ i_subconsumer_name text)
+returns bigint as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_coop.next_batch(3)
+--
+-- Makes next block of events active
+--
+-- NULL means nothing to work with, for a moment
+--
+-- Parameters:
+-- i_queue_name - Name of the queue
+-- i_consumer_name - Name of the consumer
+-- i_subconsumer_name - Name of the subconsumer
+-- ----------------------------------------------------------------------
+declare
+ _queue_id integer;
+ _consumer_id integer;
+ _subcon_id integer;
+ _batch_id bigint;
+ _prev_tick bigint;
+ _cur_tick bigint;
+begin
+ -- fetch master consumer details, lock the row
+ select q.queue_id, c.co_id
+ into _queue_id, _consumer_id
+ from pgq.queue q, pgq.consumer c, pgq.subscription s
+ where q.queue_name = i_queue_name
+ and c.co_name = i_consumer_name
+ and s.sub_queue = q.queue_id
+ and s.sub_consumer = c.co_id
+ for update of s;
+ if not found then
+ raise exception 'main consumer not found';
+ end if;
+
+ -- fetch subconsumer details
+ select s.sub_batch, sc.co_id
+ into _batch_id, _subcon_id
+ from pgq.subscription s, pgq.consumer sc
+ where sub_queue = _queue_id
+ and sub_consumer = sc.co_id
+ and sc.co_name = i_consumer_name || '.' || i_subconsumer_name;
+ if not found then
+ raise exception 'subconsumer not found';
+ end if;
+
+ -- is there a batch already active
+ if _batch_id is not null then
+ return _batch_id;
+ end if;
+
+ -- get a new batch for the main consumer
+ select batch_id, cur_tick_id, prev_tick_id
+ into _batch_id, _cur_tick, _prev_tick
+ from pgq.next_batch_info(i_queue_name, i_consumer_name);
+ if _batch_id is null then
+ return null;
+ end if;
+
+ -- close batch for main consumer
+ update pgq.subscription
+ set sub_batch = null,
+ sub_active = now(),
+ sub_last_tick = sub_next_tick,
+ sub_next_tick = null
+ where sub_queue = _queue_id
+ and sub_consumer = _consumer_id;
+
+ -- copy state into subconsumer row
+ update pgq.subscription
+ set sub_batch = _batch_id,
+ sub_last_tick = _prev_tick,
+ sub_next_tick = _cur_tick,
+ sub_active = now()
+ where sub_queue = _queue_id
+ and sub_consumer = _subcon_id;
+
+ return _batch_id;
+end;
+$$ language plpgsql security definer;
+
--- /dev/null
+create or replace function pgq_coop.register_subconsumer(
+ i_queue_name text,
+ i_consumer_name text,
+ i_subconsumer_name text)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_coop.register_subconsumer(3)
+--
+-- Subscribe a subconsumer on a queue.
+--
+-- Subconsumer will be registered as another consumer on queue,
+-- whose name will be i_consumer_name and i_subconsumer_name
+-- combined.
+--
+-- Returns:
+-- 0 - if already registered
+-- 1 - if this is a new registration
+-- ----------------------------------------------------------------------
+declare
+ _subcon_name text; -- consumer + subconsumer
+ _queue_id integer;
+ _consumer_id integer;
+ _subcon_id integer;
+ _consumer_sub_id integer;
+ _subcon_result integer;
+ r record;
+begin
+ _subcon_name := i_consumer_name || '.' || i_subconsumer_name;
+ -- er... shouldn't we lock the subscription table?
+
+ -- just go and register the subconsumer as a regular consumer
+ _subcon_result := pgq.register_consumer(i_queue_name, _subcon_name);
+
+ -- if it is a new registration
+ if _subcon_result = 1 then
+ select q.queue_id, mainc.co_id as main_consumer_id,
+ s.sub_id as main_consumer_sub_id,
+ subc.co_id as sub_consumer_id
+ into r
+ from pgq.queue q, pgq.subscription s, pgq.consumer mainc, pgq.consumer subc
+ where mainc.co_name = i_consumer_name
+ and subc.co_name = _subcon_name
+ and q.queue_name = i_queue_name
+ and s.sub_queue = q.queue_id
+ and s.sub_consumer = mainc.co_id;
+ if not found then
+ raise exception 'main consumer not found';
+ end if;
+
+ -- duplicate the sub_id of consumer to the subconsumer
+ update pgq.subscription s
+ set sub_id = r.main_consumer_sub_id,
+ sub_last_tick = null,
+ sub_next_tick = null
+ where sub_queue = r.queue_id
+ and sub_consumer = r.sub_consumer_id;
+ end if;
+
+ return _subcon_result;
+end;
+$$ language plpgsql security definer;
+
--- /dev/null
+create or replace function pgq_coop.unregister_subconsumer(
+ i_queue_name text,
+ i_consumer_name text,
+ i_subconsumer_name text,
+ i_batch_handling integer)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_coop.unregister_subconsumer(4)
+--
+-- Unregisters subconsumer from the queue.
+--
+-- If consumer has active batch, then behviour depends on
+-- i_batch_handling parameter.
+--
+-- Values for i_batch_handling:
+-- 0 - Fail with an exception.
+-- 1 - Close the batch, ignoring the events.
+--
+-- Returns:
+-- nothing?
+-- ----------------------------------------------------------------------
+declare
+ _current_batch bigint;
+ _queue_id integer;
+ _subcon_id integer;
+begin
+ select q.queue_id, c.co_id, sub_batch
+ into _queue_id, _subcon_id, _current_batch
+ from pgq.queue q, pgq.consumer c, pgq.subscription s
+ where c.co_name = i_consumer_name || '.' || i_subconsumer_name
+ and q.queue_name = i_queue_name
+ and s.sub_queue = q.queue_id
+ and s.sub_consumer = c.co_id;
+ if not found then
+ return 0;
+ end if;
+
+ if _current_batch is not null then
+ if i_batch_handling = 1 then
+ -- ignore active batch
+ else
+ raise exception 'subconsumer has active batch';
+ end if;
+ end if;
+
+ delete from pgq.subscription
+ where sub_queue = _queue_id
+ and sub_consumer = _subcon_id;
+
+ return 1;
+end;
+$$ language plpgsql security definer;
+
--- /dev/null
+
+\set ECHO none
+\i ../pgq/pgq.sql
+\i structure/schema.sql
+\i structure/functions.sql
+\set ECHO all
+
+select pgq.create_queue('testqueue');
+update pgq.queue set queue_ticker_max_count = 1 where queue_name = 'testqueue';
+
+-- register
+select pgq.register_consumer('testqueue', 'maincons');
+select pgq_coop.register_subconsumer('testqueue', 'maincons', 'subcons1');
+select pgq_coop.register_subconsumer('testqueue', 'maincons', 'subcons1');
+select pgq_coop.register_subconsumer('testqueue', 'maincons', 'subcons2');
+
+-- process events
+select pgq_coop.next_batch('testqueue', 'maincons', 'subcons1');
+select pgq.insert_event('testqueue', 'ev0', 'data');
+select pgq.insert_event('testqueue', 'ev1', 'data');
+select pgq.insert_event('testqueue', 'ev2', 'data');
+select pgq.ticker();
+
+select pgq_coop.next_batch('testqueue', 'maincons', 'subcons1');
+select pgq_coop.next_batch('testqueue', 'maincons', 'subcons1');
+
+select pgq_coop.next_batch('testqueue', 'maincons', 'subcons2');
+
+select pgq.insert_event('testqueue', 'ev3', 'data');
+select pgq.insert_event('testqueue', 'ev4', 'data');
+select pgq.insert_event('testqueue', 'ev5', 'data');
+select pgq.ticker();
+select pgq_coop.next_batch('testqueue', 'maincons', 'subcons2');
+
+select pgq_coop.finish_batch(2);
+
+
+select pgq_coop.unregister_subconsumer('testqueue', 'maincons', 'subcons1', 0);
+select pgq_coop.unregister_subconsumer('testqueue', 'maincons', 'subcons1', 1);
+select pgq_coop.unregister_subconsumer('testqueue', 'maincons', 'subcons1', 0);
+select pgq_coop.unregister_subconsumer('testqueue', 'maincons', 'subcons2', 0);
+
--- /dev/null
+-- Section: Functions
+
+-- Group: Subconsumer registration
+\i functions/pgq_coop.register_subconsumer.sql
+\i functions/pgq_coop.unregister_subconsumer.sql
+
+-- Group: Event processing
+\i functions/pgq_coop.next_batch.sql
+\i functions/pgq_coop.finish_batch.sql
+
--- /dev/null
+
+create schema pgq_coop;
+