sql/pgq_coop: Cooperative consuming
authorMarko Kreen <markokr@gmail.com>
Wed, 8 Apr 2009 12:40:29 +0000 (15:40 +0300)
committerMarko Kreen <markokr@gmail.com>
Wed, 8 Apr 2009 16:42:31 +0000 (19:42 +0300)
This module allows several consumers (subconsumers)
to share events between them so each subconsumers
sees only part of .

Implementation uses design described here:

 http://lists.pgfoundry.org/pipermail/skytools-users/2008-July/000653.html
 http://lists.pgfoundry.org/pipermail/skytools-users/2008-July/000671.html

Except instead <group>.<consumer> terminology <consumer>.<subconsumer>
is used.  Having 'consumer' mean different things seemed confusing.

Implemented by Asko Tiidumaa

doc/Makefile
sql/Makefile
sql/pgq_coop/Makefile [new file with mode: 0644]
sql/pgq_coop/expected/pgq_coop_test.out [new file with mode: 0644]
sql/pgq_coop/functions/pgq_coop.finish_batch.sql [new file with mode: 0644]
sql/pgq_coop/functions/pgq_coop.next_batch.sql [new file with mode: 0644]
sql/pgq_coop/functions/pgq_coop.register_subconsumer.sql [new file with mode: 0644]
sql/pgq_coop/functions/pgq_coop.unregister_subconsumer.sql [new file with mode: 0644]
sql/pgq_coop/sql/pgq_coop_test.sql [new file with mode: 0644]
sql/pgq_coop/structure/functions.sql [new file with mode: 0644]
sql/pgq_coop/structure/schema.sql [new file with mode: 0644]

index aa599b6841197aa6c28e19d3280b406bcd5808f5..029bbe5f41b3c3ad1d75badf3e97b4c1df71af67 100644 (file)
@@ -70,6 +70,8 @@ apiupload: apidoc
        rsync -rtlz api/* $(web)/api
        cd ../sql/pgq && rm -rf docs/html && $(MAKE) dox
        rsync -rtlz ../sql/pgq/docs/html/* $(web)/pgq/
+       cd ../sql/pgq_coop && rm -rf docs/html && $(MAKE) dox
+       rsync -rtlz ../sql/pgq_coop/docs/html/* $(web)/pgq_coop/
        cd ../sql/pgq_node && rm -rf docs/html && $(MAKE) dox
        rsync -rtlz ../sql/pgq_node/docs/html/* $(web)/pgq_node/
        cd ../sql/londiste && rm -rf docs/html && $(MAKE) dox
index ed74b6461f45c15f4b3dfa12f2aa0d5f8185fba6..d7241e8d28ed98c0929993901a1c0df5d6e26a52 100644 (file)
@@ -1,7 +1,7 @@
 
 include ../config.mak
 
-SUBDIRS = londiste pgq pgq_ext pgq_node txid
+SUBDIRS = londiste pgq pgq_coop pgq_ext pgq_node txid
 
 all install clean distclean installcheck:
        for dir in $(SUBDIRS); do \
diff --git a/sql/pgq_coop/Makefile b/sql/pgq_coop/Makefile
new file mode 100644 (file)
index 0000000..5602d20
--- /dev/null
@@ -0,0 +1,56 @@
+
+DATA_built = pgq_coop.sql pgq_coop.upgrade.sql
+
+SQL_FULL = structure/schema.sql structure/functions.sql
+SQL_UPGRADE = structure/functions.sql
+
+FUNCS = $(shell sed -n -e '/^\\/{s/\\i //;p}' structure/functions.sql)
+SRCS = $(SQL_FULL) $(FUNCS)
+
+REGRESS = pgq_coop_test
+REGRESS_OPTS = --load-language=plpgsql
+
+include ../../config.mak
+include $(PGXS)
+
+NDOC = NaturalDocs
+NDOCARGS = -r -o html docs/html -p docs -i docs/sql
+CATSQL = ../../scripts/catsql.py
+
+#
+# combined SQL files
+#
+
+pgq_coop.sql: $(SRCS)
+       $(CATSQL) $(SQL_FULL) > $@
+
+pgq_coop.upgrade.sql: $(SRCS)
+       $(CATSQL) $(SQL_UPGRADE) > $@
+
+#
+# docs
+#
+dox: cleandox $(SRCS)
+       mkdir -p docs/html
+       mkdir -p docs/sql
+       $(CATSQL) structure/functions.sql > docs/sql/functions.sql
+       $(NDOC) $(NDOCARGS)
+
+cleandox:
+       rm -rf docs/html docs/Data docs/sql
+
+clean: cleandox
+
+upload: dox
+       rsync -az --delete docs/html/* data1:public_html/pgq-set/
+
+#
+# regtest shortcuts
+#
+
+test: pgq_coop.sql
+       $(MAKE) installcheck || { less regression.diffs; exit 1; }
+
+ack:
+       cp results/*.out expected/
+
diff --git a/sql/pgq_coop/expected/pgq_coop_test.out b/sql/pgq_coop/expected/pgq_coop_test.out
new file mode 100644 (file)
index 0000000..09fef29
--- /dev/null
@@ -0,0 +1,138 @@
+\set ECHO none
+select pgq.create_queue('testqueue');
+ create_queue 
+--------------
+            1
+(1 row)
+
+update pgq.queue set queue_ticker_max_count = 1 where queue_name = 'testqueue';
+-- register
+select pgq.register_consumer('testqueue', 'maincons');
+ register_consumer 
+-------------------
+                 1
+(1 row)
+
+select pgq_coop.register_subconsumer('testqueue', 'maincons', 'subcons1');
+ register_subconsumer 
+----------------------
+                    1
+(1 row)
+
+select pgq_coop.register_subconsumer('testqueue', 'maincons', 'subcons1');
+ register_subconsumer 
+----------------------
+                    0
+(1 row)
+
+select pgq_coop.register_subconsumer('testqueue', 'maincons', 'subcons2');
+ register_subconsumer 
+----------------------
+                    1
+(1 row)
+
+-- process events
+select pgq_coop.next_batch('testqueue', 'maincons', 'subcons1');
+ next_batch 
+------------
+           
+(1 row)
+
+select pgq.insert_event('testqueue', 'ev0', 'data');
+ insert_event 
+--------------
+            1
+(1 row)
+
+select pgq.insert_event('testqueue', 'ev1', 'data');
+ insert_event 
+--------------
+            2
+(1 row)
+
+select pgq.insert_event('testqueue', 'ev2', 'data');
+ insert_event 
+--------------
+            3
+(1 row)
+
+select pgq.ticker();
+ ticker 
+--------
+      1
+(1 row)
+
+select pgq_coop.next_batch('testqueue', 'maincons', 'subcons1');
+ next_batch 
+------------
+          1
+(1 row)
+
+select pgq_coop.next_batch('testqueue', 'maincons', 'subcons1');
+ next_batch 
+------------
+          1
+(1 row)
+
+select pgq_coop.next_batch('testqueue', 'maincons', 'subcons2');
+ next_batch 
+------------
+           
+(1 row)
+
+select pgq.insert_event('testqueue', 'ev3', 'data');
+ insert_event 
+--------------
+            4
+(1 row)
+
+select pgq.insert_event('testqueue', 'ev4', 'data');
+ insert_event 
+--------------
+            5
+(1 row)
+
+select pgq.insert_event('testqueue', 'ev5', 'data');
+ insert_event 
+--------------
+            6
+(1 row)
+
+select pgq.ticker();
+ ticker 
+--------
+      1
+(1 row)
+
+select pgq_coop.next_batch('testqueue', 'maincons', 'subcons2');
+ next_batch 
+------------
+          2
+(1 row)
+
+select pgq_coop.finish_batch(2);
+ finish_batch 
+--------------
+            1
+(1 row)
+
+select pgq_coop.unregister_subconsumer('testqueue', 'maincons', 'subcons1', 0);
+ERROR:  subconsumer has active batch
+select pgq_coop.unregister_subconsumer('testqueue', 'maincons', 'subcons1', 1);
+ unregister_subconsumer 
+------------------------
+                      1
+(1 row)
+
+select pgq_coop.unregister_subconsumer('testqueue', 'maincons', 'subcons1', 0);
+ unregister_subconsumer 
+------------------------
+                      0
+(1 row)
+
+select pgq_coop.unregister_subconsumer('testqueue', 'maincons', 'subcons2', 0);
+ unregister_subconsumer 
+------------------------
+                      1
+(1 row)
+
diff --git a/sql/pgq_coop/functions/pgq_coop.finish_batch.sql b/sql/pgq_coop/functions/pgq_coop.finish_batch.sql
new file mode 100644 (file)
index 0000000..bfc7cb5
--- /dev/null
@@ -0,0 +1,32 @@
+create or replace function pgq_coop.finish_batch(
+    i_batch_id bigint)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_coop.finish_batch(1)
+--
+--     Closes a batch.
+--
+-- Parameters:
+--     i_batch_id      - id of the batch to be closed
+--
+-- Returns:
+--     1 if success (batch was found), 0 otherwise
+-- ----------------------------------------------------------------------
+begin
+    -- we are dealing with subconsumer, so nullify all tick info
+    -- tick columns for master consumer contain adequate data
+    update pgq.subscription
+       set sub_active = now(),
+           sub_last_tick = null,
+           sub_next_tick = null,
+           sub_batch = null
+     where sub_batch = i_batch_id;
+    if not found then
+        raise warning 'coop_finish_batch: batch % not found', i_batch_id;
+        return 0;
+    else
+        return 1;
+    end if;
+end;
+$$ language plpgsql security definer;
+
diff --git a/sql/pgq_coop/functions/pgq_coop.next_batch.sql b/sql/pgq_coop/functions/pgq_coop.next_batch.sql
new file mode 100644 (file)
index 0000000..dc8ca00
--- /dev/null
@@ -0,0 +1,84 @@
+create or replace function pgq_coop.next_batch(
+    i_queue_name text,
+    i_consumer_name text,
+    i_subconsumer_name text)
+returns bigint as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_coop.next_batch(3)
+--
+--     Makes next block of events active
+--
+--     NULL means nothing to work with, for a moment
+--
+-- Parameters:
+--     i_queue_name            - Name of the queue
+--     i_consumer_name         - Name of the consumer
+--     i_subconsumer_name      - Name of the subconsumer
+-- ----------------------------------------------------------------------
+declare
+    _queue_id integer;
+    _consumer_id integer;
+    _subcon_id integer;
+    _batch_id bigint;
+    _prev_tick bigint;
+    _cur_tick bigint;
+begin
+    -- fetch master consumer details, lock the row
+    select q.queue_id, c.co_id
+        into _queue_id, _consumer_id
+        from pgq.queue q, pgq.consumer c, pgq.subscription s
+        where q.queue_name = i_queue_name
+          and c.co_name = i_consumer_name
+          and s.sub_queue = q.queue_id
+          and s.sub_consumer = c.co_id
+        for update of s;
+    if not found then
+        raise exception 'main consumer not found';
+    end if;
+
+    -- fetch subconsumer details
+    select s.sub_batch, sc.co_id
+        into _batch_id, _subcon_id
+        from pgq.subscription s, pgq.consumer sc
+        where sub_queue = _queue_id
+          and sub_consumer = sc.co_id
+          and sc.co_name = i_consumer_name || '.' || i_subconsumer_name;
+    if not found then
+        raise exception 'subconsumer not found';
+    end if;
+
+    -- is there a batch already active
+    if _batch_id is not null then
+        return _batch_id;
+    end if;
+
+    -- get a new batch for the main consumer
+    select batch_id, cur_tick_id, prev_tick_id
+        into _batch_id, _cur_tick, _prev_tick
+        from pgq.next_batch_info(i_queue_name, i_consumer_name);
+    if _batch_id is null then
+        return null;
+    end if;
+
+    -- close batch for main consumer
+    update pgq.subscription
+       set sub_batch = null,
+           sub_active = now(),
+           sub_last_tick = sub_next_tick,
+           sub_next_tick = null
+     where sub_queue = _queue_id
+       and sub_consumer = _consumer_id;
+
+    -- copy state into subconsumer row
+    update pgq.subscription
+        set sub_batch = _batch_id,
+            sub_last_tick = _prev_tick,
+            sub_next_tick = _cur_tick,
+            sub_active = now()
+        where sub_queue = _queue_id
+          and sub_consumer = _subcon_id;
+
+    return _batch_id;
+end;
+$$ language plpgsql security definer;
+
diff --git a/sql/pgq_coop/functions/pgq_coop.register_subconsumer.sql b/sql/pgq_coop/functions/pgq_coop.register_subconsumer.sql
new file mode 100644 (file)
index 0000000..422e2a5
--- /dev/null
@@ -0,0 +1,62 @@
+create or replace function pgq_coop.register_subconsumer(
+    i_queue_name text,
+    i_consumer_name text,
+    i_subconsumer_name text)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_coop.register_subconsumer(3)
+--
+--     Subscribe a subconsumer on a queue.
+--
+--      Subconsumer will be registered as another consumer on queue,
+--      whose name will be i_consumer_name and i_subconsumer_name
+--      combined.
+--
+-- Returns:
+--     0 - if already registered
+--     1 - if this is a new registration
+-- ----------------------------------------------------------------------
+declare
+    _subcon_name text; -- consumer + subconsumer
+    _queue_id integer;
+    _consumer_id integer;
+    _subcon_id integer;
+    _consumer_sub_id integer;
+    _subcon_result integer;
+    r record;
+begin
+    _subcon_name := i_consumer_name || '.' || i_subconsumer_name;
+    -- er... shouldn't we lock the subscription table?
+
+    -- just go and register the subconsumer as a regular consumer
+    _subcon_result := pgq.register_consumer(i_queue_name, _subcon_name);
+
+    -- if it is a new registration
+    if _subcon_result = 1 then
+        select q.queue_id, mainc.co_id as main_consumer_id,
+               s.sub_id as main_consumer_sub_id,
+               subc.co_id as sub_consumer_id
+            into r
+            from pgq.queue q, pgq.subscription s, pgq.consumer mainc, pgq.consumer subc
+            where mainc.co_name = i_consumer_name
+              and subc.co_name = _subcon_name
+              and q.queue_name = i_queue_name
+              and s.sub_queue = q.queue_id
+              and s.sub_consumer = mainc.co_id;
+        if not found then
+            raise exception 'main consumer not found';
+        end if;
+
+        -- duplicate the sub_id of consumer to the subconsumer
+        update pgq.subscription s
+            set sub_id = r.main_consumer_sub_id,
+                sub_last_tick = null,
+                sub_next_tick = null
+            where sub_queue = r.queue_id
+              and sub_consumer = r.sub_consumer_id;
+    end if;
+
+    return _subcon_result;
+end;
+$$ language plpgsql security definer;
+
diff --git a/sql/pgq_coop/functions/pgq_coop.unregister_subconsumer.sql b/sql/pgq_coop/functions/pgq_coop.unregister_subconsumer.sql
new file mode 100644 (file)
index 0000000..3e95f07
--- /dev/null
@@ -0,0 +1,53 @@
+create or replace function pgq_coop.unregister_subconsumer(
+    i_queue_name text,
+    i_consumer_name text,
+    i_subconsumer_name text,
+    i_batch_handling integer)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_coop.unregister_subconsumer(4)
+--
+--      Unregisters subconsumer from the queue.
+--
+--      If consumer has active batch, then behviour depends on
+--      i_batch_handling parameter.
+--
+-- Values for i_batch_handling:
+--      0 - Fail with an exception.
+--      1 - Close the batch, ignoring the events.
+--
+-- Returns:
+--     nothing?
+-- ----------------------------------------------------------------------
+declare
+    _current_batch bigint;
+    _queue_id integer;
+    _subcon_id integer;
+begin
+    select q.queue_id, c.co_id, sub_batch
+        into _queue_id, _subcon_id, _current_batch
+        from pgq.queue q, pgq.consumer c, pgq.subscription s
+        where c.co_name = i_consumer_name || '.' || i_subconsumer_name
+          and q.queue_name = i_queue_name
+          and s.sub_queue = q.queue_id
+          and s.sub_consumer = c.co_id;
+    if not found then
+        return 0;
+    end if;
+
+    if _current_batch is not null then
+        if i_batch_handling = 1 then
+            -- ignore active batch
+        else
+            raise exception 'subconsumer has active batch';
+        end if;
+    end if;
+
+    delete from pgq.subscription
+        where sub_queue = _queue_id
+          and sub_consumer = _subcon_id;
+
+    return 1;
+end;
+$$ language plpgsql security definer;
+
diff --git a/sql/pgq_coop/sql/pgq_coop_test.sql b/sql/pgq_coop/sql/pgq_coop_test.sql
new file mode 100644 (file)
index 0000000..8323caf
--- /dev/null
@@ -0,0 +1,42 @@
+
+\set ECHO none
+\i ../pgq/pgq.sql
+\i structure/schema.sql
+\i structure/functions.sql
+\set ECHO all
+
+select pgq.create_queue('testqueue');
+update pgq.queue set queue_ticker_max_count = 1 where queue_name = 'testqueue';
+
+-- register
+select pgq.register_consumer('testqueue', 'maincons');
+select pgq_coop.register_subconsumer('testqueue', 'maincons', 'subcons1');
+select pgq_coop.register_subconsumer('testqueue', 'maincons', 'subcons1');
+select pgq_coop.register_subconsumer('testqueue', 'maincons', 'subcons2');
+
+-- process events
+select pgq_coop.next_batch('testqueue', 'maincons', 'subcons1');
+select pgq.insert_event('testqueue', 'ev0', 'data');
+select pgq.insert_event('testqueue', 'ev1', 'data');
+select pgq.insert_event('testqueue', 'ev2', 'data');
+select pgq.ticker();
+
+select pgq_coop.next_batch('testqueue', 'maincons', 'subcons1');
+select pgq_coop.next_batch('testqueue', 'maincons', 'subcons1');
+
+select pgq_coop.next_batch('testqueue', 'maincons', 'subcons2');
+
+select pgq.insert_event('testqueue', 'ev3', 'data');
+select pgq.insert_event('testqueue', 'ev4', 'data');
+select pgq.insert_event('testqueue', 'ev5', 'data');
+select pgq.ticker();
+select pgq_coop.next_batch('testqueue', 'maincons', 'subcons2');
+
+select pgq_coop.finish_batch(2);
+
+
+select pgq_coop.unregister_subconsumer('testqueue', 'maincons', 'subcons1', 0);
+select pgq_coop.unregister_subconsumer('testqueue', 'maincons', 'subcons1', 1);
+select pgq_coop.unregister_subconsumer('testqueue', 'maincons', 'subcons1', 0);
+select pgq_coop.unregister_subconsumer('testqueue', 'maincons', 'subcons2', 0);
+
diff --git a/sql/pgq_coop/structure/functions.sql b/sql/pgq_coop/structure/functions.sql
new file mode 100644 (file)
index 0000000..e23c818
--- /dev/null
@@ -0,0 +1,10 @@
+-- Section: Functions
+
+-- Group: Subconsumer registration
+\i functions/pgq_coop.register_subconsumer.sql
+\i functions/pgq_coop.unregister_subconsumer.sql
+
+-- Group: Event processing
+\i functions/pgq_coop.next_batch.sql
+\i functions/pgq_coop.finish_batch.sql
+
diff --git a/sql/pgq_coop/structure/schema.sql b/sql/pgq_coop/structure/schema.sql
new file mode 100644 (file)
index 0000000..c62db54
--- /dev/null
@@ -0,0 +1,3 @@
+
+create schema pgq_coop;
+