pgq_ext: support coop consumers
authorMarko Kreen <markokr@gmail.com>
Wed, 14 Dec 2011 20:13:11 +0000 (22:13 +0200)
committerMarko Kreen <markokr@gmail.com>
Wed, 14 Dec 2011 21:25:50 +0000 (23:25 +0200)
Add subconsumer_id to all tables and function params.

14 files changed:
sql/pgq_ext/Makefile
sql/pgq_ext/expected/test_pgq_ext.out
sql/pgq_ext/expected/test_upgrade.out [new file with mode: 0644]
sql/pgq_ext/functions/pgq_ext.get_last_tick.sql
sql/pgq_ext/functions/pgq_ext.is_batch_done.sql
sql/pgq_ext/functions/pgq_ext.is_event_done.sql
sql/pgq_ext/functions/pgq_ext.set_batch_done.sql
sql/pgq_ext/functions/pgq_ext.set_event_done.sql
sql/pgq_ext/functions/pgq_ext.set_last_tick.sql
sql/pgq_ext/functions/pgq_ext.upgrade_schema.sql [new file with mode: 0644]
sql/pgq_ext/sql/old_ext.sql [new file with mode: 0644]
sql/pgq_ext/sql/test_upgrade.sql [new file with mode: 0644]
sql/pgq_ext/structure/tables.sql
sql/pgq_ext/structure/upgrade.sql

index e3aca0d9c7bd2acd86d33739faf73e9442802827..f123bcb0df7dcb836350a0f62332d397021c7228 100644 (file)
@@ -4,7 +4,7 @@ DATA_built = pgq_ext.sql pgq_ext.upgrade.sql
 
 SRCS = $(wildcard functions/*.sql structure/*.sql)
 
-REGRESS = test_pgq_ext
+REGRESS = test_pgq_ext test_upgrade
 REGRESS_OPTS = --load-language=plpgsql
 
 CATSQL = ../../scripts/catsql.py
index 1f7bf26fee9a2264fbe413956768f5de9956176f..ad1ea437a88c30b606ffaee98003ffc460862742 100644 (file)
@@ -1,4 +1,9 @@
 \set ECHO off
+ upgrade_schema 
+----------------
+              0
+(1 row)
+
 --
 -- test batch tracking
 --
@@ -78,9 +83,9 @@ select pgq_ext.set_batch_done('c', 3);
 (1 row)
 
 select * from pgq_ext.completed_event order by 1,2;
- consumer_id | batch_id | event_id 
--------------+----------+----------
- c           |        3 |      101
+ consumer_id | subconsumer_id | batch_id | event_id 
+-------------+----------------+----------+----------
+ c           |                |        3 |      101
 (1 row)
 
 --
diff --git a/sql/pgq_ext/expected/test_upgrade.out b/sql/pgq_ext/expected/test_upgrade.out
new file mode 100644 (file)
index 0000000..254b3d3
--- /dev/null
@@ -0,0 +1,135 @@
+\set ECHO off
+ upgrade_schema 
+----------------
+              4
+(1 row)
+
+--
+-- test batch tracking
+--
+select pgq_ext.is_batch_done('c', 1);
+ is_batch_done 
+---------------
+ f
+(1 row)
+
+select pgq_ext.set_batch_done('c', 1);
+ set_batch_done 
+----------------
+ t
+(1 row)
+
+select pgq_ext.is_batch_done('c', 1);
+ is_batch_done 
+---------------
+ t
+(1 row)
+
+select pgq_ext.set_batch_done('c', 1);
+ set_batch_done 
+----------------
+ f
+(1 row)
+
+select pgq_ext.is_batch_done('c', 2);
+ is_batch_done 
+---------------
+ f
+(1 row)
+
+select pgq_ext.set_batch_done('c', 2);
+ set_batch_done 
+----------------
+ t
+(1 row)
+
+--
+-- test event tracking
+--
+select pgq_ext.is_batch_done('c', 3);
+ is_batch_done 
+---------------
+ f
+(1 row)
+
+select pgq_ext.is_event_done('c', 3, 101);
+ is_event_done 
+---------------
+ f
+(1 row)
+
+select pgq_ext.set_event_done('c', 3, 101);
+ set_event_done 
+----------------
+ t
+(1 row)
+
+select pgq_ext.is_event_done('c', 3, 101);
+ is_event_done 
+---------------
+ t
+(1 row)
+
+select pgq_ext.set_event_done('c', 3, 101);
+ set_event_done 
+----------------
+ f
+(1 row)
+
+select pgq_ext.set_batch_done('c', 3);
+ set_batch_done 
+----------------
+ t
+(1 row)
+
+select * from pgq_ext.completed_event order by 1,2;
+ consumer_id | batch_id | event_id | subconsumer_id 
+-------------+----------+----------+----------------
+ c           |        3 |      101 | 
+(1 row)
+
+--
+-- test tick tracking
+--
+select pgq_ext.get_last_tick('c');
+ get_last_tick 
+---------------
+              
+(1 row)
+
+select pgq_ext.set_last_tick('c', 1);
+ set_last_tick 
+---------------
+             1
+(1 row)
+
+select pgq_ext.get_last_tick('c');
+ get_last_tick 
+---------------
+             1
+(1 row)
+
+select pgq_ext.set_last_tick('c', 2);
+ set_last_tick 
+---------------
+             1
+(1 row)
+
+select pgq_ext.get_last_tick('c');
+ get_last_tick 
+---------------
+             2
+(1 row)
+
+select pgq_ext.set_last_tick('c', NULL);
+ set_last_tick 
+---------------
+             1
+(1 row)
+
+select pgq_ext.get_last_tick('c');
+ get_last_tick 
+---------------
+              
+(1 row)
+
index 35e2a8903efb3830c74cf65c9f83349a32d4e07e..ed6e7acf6b2d32734b5f40552bd181c5cf4a17ff 100644 (file)
@@ -1,13 +1,21 @@
 
-create or replace function pgq_ext.get_last_tick(a_consumer text)
+create or replace function pgq_ext.get_last_tick(a_consumer text, a_subconsumer text)
 returns int8 as $$
 declare
     res   int8;
 begin
     select last_tick_id into res
       from pgq_ext.completed_tick
-     where consumer_id = a_consumer;
+     where consumer_id = a_consumer
+       and subconsumer_id = a_subconsumer;
     return res;
 end;
 $$ language plpgsql security definer;
 
+create or replace function pgq_ext.get_last_tick(a_consumer text)
+returns int8 as $$
+begin
+    return pgq_ext.get_last_tick(a_consumer, '');
+end;
+$$ language plpgsql;
+
index 7b1ae624d795fd7d5122f0b0f55765cc376d3a5f..70bc3bde72864aa4dae0061605f6dab0132af5a4 100644 (file)
@@ -1,13 +1,16 @@
 
 create or replace function pgq_ext.is_batch_done(
-    a_consumer text, a_batch_id bigint)
+    a_consumer text,
+    a_subconsumer text,
+    a_batch_id bigint)
 returns boolean as $$
 declare
     res   boolean;
 begin
     select last_batch_id = a_batch_id
       into res from pgq_ext.completed_batch
-     where consumer_id = a_consumer;
+     where consumer_id = a_consumer
+       and subconsumer_id = a_subconsumer;
     if not found then
         return false;
     end if;
@@ -15,3 +18,12 @@ begin
 end;
 $$ language plpgsql security definer;
 
+create or replace function pgq_ext.is_batch_done(
+    a_consumer text,
+    a_batch_id bigint)
+returns boolean as $$
+begin
+    return pgq_ext.is_batch_done(a_consumer, '', a_batch_id);
+end;
+$$ language plpgsql;
+
index 3d1ad8d9d515a6f9387b48092c9fab154d666d4c..adbf08167539bc39c4068a3299995c40180f484f 100644 (file)
@@ -1,16 +1,29 @@
 
 create or replace function pgq_ext.is_event_done(
     a_consumer text,
-    a_batch_id bigint, a_event_id bigint)
+    a_subconsumer text,
+    a_batch_id bigint,
+    a_event_id bigint)
 returns boolean as $$
 declare
     res   bigint;
 begin
     perform 1 from pgq_ext.completed_event
      where consumer_id = a_consumer
+       and subconsumer_id = a_subconsumer
        and batch_id = a_batch_id
        and event_id = a_event_id;
     return found;
 end;
 $$ language plpgsql security definer;
 
+create or replace function pgq_ext.is_event_done(
+    a_consumer text,
+    a_batch_id bigint,
+    a_event_id bigint)
+returns boolean as $$
+begin
+    return pgq_ext.is_event_done(a_consumer, '', a_batch_id, a_event_id);
+end;
+$$ language plpgsql;
+
index 294ff76a70f23aadc88bebfdb80025c563e9e95b..0952c320e17bff9c01a8bb8d9347a10a8541e484 100644 (file)
@@ -1,19 +1,22 @@
 
 create or replace function pgq_ext.set_batch_done(
-    a_consumer text, a_batch_id bigint)
+    a_consumer text,
+    a_subconsumer text,
+    a_batch_id bigint)
 returns boolean as $$
 begin
-    if pgq_ext.is_batch_done(a_consumer, a_batch_id) then
+    if pgq_ext.is_batch_done(a_consumer, a_subconsumer, a_batch_id) then
         return false;
     end if;
 
     if a_batch_id > 0 then
         update pgq_ext.completed_batch
            set last_batch_id = a_batch_id
-         where consumer_id = a_consumer;
+         where consumer_id = a_consumer
+           and subconsumer_id = a_subconsumer;
         if not found then
-            insert into pgq_ext.completed_batch (consumer_id, last_batch_id)
-                values (a_consumer, a_batch_id);
+            insert into pgq_ext.completed_batch (consumer_id, subconsumer_id, last_batch_id)
+                values (a_consumer, a_subconsumer, a_batch_id);
         end if;
     end if;
 
@@ -21,3 +24,12 @@ begin
 end;
 $$ language plpgsql security definer;
 
+create or replace function pgq_ext.set_batch_done(
+    a_consumer text,
+    a_batch_id bigint)
+returns boolean as $$
+begin
+    return pgq_ext.set_batch_done(a_consumer, '', a_batch_id);
+end;
+$$ language plpgsql;
+
index fe3afcb3c67349ad91844b912c1ee93d37259130..9fb276e85aedeaf307c06a81b4f7794cc24ccd2d 100644 (file)
@@ -1,6 +1,9 @@
 
 create or replace function pgq_ext.set_event_done(
-    a_consumer text, a_batch_id bigint, a_event_id bigint)
+    a_consumer text,
+    a_subconsumer text,
+    a_batch_id bigint,
+    a_event_id bigint)
 returns boolean as $$
 declare
     old_batch bigint;
@@ -8,6 +11,7 @@ begin
     -- check if done
     perform 1 from pgq_ext.completed_event
      where consumer_id = a_consumer
+       and subconsumer_id = a_subconsumer
        and batch_id = a_batch_id
        and event_id = a_event_id;
     if found then
@@ -17,29 +21,43 @@ begin
     -- if batch changed, do cleanup
     select cur_batch_id into old_batch
         from pgq_ext.partial_batch
-        where consumer_id = a_consumer;
+        where consumer_id = a_consumer
+          and subconsumer_id = a_subconsumer;
     if not found then
         -- first time here
         insert into pgq_ext.partial_batch
-            (consumer_id, cur_batch_id)
-            values (a_consumer, a_batch_id);
+            (consumer_id, subconsumer_id, cur_batch_id)
+            values (a_consumer, a_subconsumer, a_batch_id);
     elsif old_batch <> a_batch_id then
         -- batch changed, that means old is finished on queue db
         -- thus the tagged events are not needed anymore
         delete from pgq_ext.completed_event
             where consumer_id = a_consumer
+              and subconsumer_id = a_subconsumer
               and batch_id = old_batch;
         -- remember current one
         update pgq_ext.partial_batch
             set cur_batch_id = a_batch_id
-            where consumer_id = a_consumer;
+            where consumer_id = a_consumer
+              and subconsumer_id = a_subconsumer;
     end if;
 
     -- tag as done
-    insert into pgq_ext.completed_event (consumer_id, batch_id, event_id)
-      values (a_consumer, a_batch_id, a_event_id);
+    insert into pgq_ext.completed_event
+        (consumer_id, subconsumer_id, batch_id, event_id)
+        values (a_consumer, a_subconsumer, a_batch_id, a_event_id);
 
     return true;
 end;
 $$ language plpgsql security definer;
 
+create or replace function pgq_ext.set_event_done(
+    a_consumer text,
+    a_batch_id bigint,
+    a_event_id bigint)
+returns boolean as $$
+begin
+    return pgq_ext.set_event_done(a_consumer, '', a_batch_id, a_event_id);
+end;
+$$ language plpgsql;
+
index 5e62c3137c9a6f300373ec5477d3816bb6c34ca8..c0ca059d07e4a6450b0f0cd772e0239474ce4b79 100644 (file)
@@ -1,17 +1,23 @@
 
-create or replace function pgq_ext.set_last_tick(a_consumer text, a_tick_id bigint)
+create or replace function pgq_ext.set_last_tick(
+    a_consumer text,
+    a_subconsumer text,
+    a_tick_id bigint)
 returns integer as $$
 begin
     if a_tick_id is null then
         delete from pgq_ext.completed_tick
-         where consumer_id = a_consumer;
+         where consumer_id = a_consumer
+           and subconsumer_id = a_subconsumer;
     else   
         update pgq_ext.completed_tick
            set last_tick_id = a_tick_id
-         where consumer_id = a_consumer;
+         where consumer_id = a_consumer
+           and subconsumer_id = a_subconsumer;
         if not found then
-            insert into pgq_ext.completed_tick (consumer_id, last_tick_id)
-                values (a_consumer, a_tick_id);
+            insert into pgq_ext.completed_tick
+                (consumer_id, subconsumer_id, last_tick_id)
+                values (a_consumer, a_subconsumer, a_tick_id);
         end if;
     end if;
 
@@ -19,3 +25,12 @@ begin
 end;
 $$ language plpgsql security definer;
 
+create or replace function pgq_ext.set_last_tick(
+    a_consumer text,
+    a_tick_id bigint)
+returns integer as $$
+begin
+    return pgq_ext.set_last_tick(a_consumer, '', a_tick_id);
+end;
+$$ language plpgsql;
+
diff --git a/sql/pgq_ext/functions/pgq_ext.upgrade_schema.sql b/sql/pgq_ext/functions/pgq_ext.upgrade_schema.sql
new file mode 100644 (file)
index 0000000..1fa9fe4
--- /dev/null
@@ -0,0 +1,92 @@
+
+create or replace function pgq_ext.upgrade_schema()
+returns int4 as $$
+-- updates table structure if necessary
+declare
+    cnt int4 = 0;
+begin
+    -- pgq_ext.completed_batch: subconsumer_id
+    perform 1 from information_schema.columns
+      where table_schema = 'pgq_ext'
+        and table_name = 'completed_batch'
+        and column_name = 'subconsumer_id';
+    if not found then
+        alter table pgq_ext.completed_batch
+            add column subconsumer_id text;
+        update pgq_ext.completed_batch
+            set subconsumer_id = '';
+        alter table pgq_ext.completed_batch
+            alter column subconsumer_id set not null;
+        alter table pgq_ext.completed_batch
+            drop constraint completed_batch_pkey;
+        alter table pgq_ext.completed_batch
+            add constraint completed_batch_pkey
+            primary key (consumer_id, subconsumer_id);
+        cnt := cnt + 1;
+    end if;
+
+    -- pgq_ext.completed_tick: subconsumer_id
+    perform 1 from information_schema.columns
+      where table_schema = 'pgq_ext'
+        and table_name = 'completed_tick'
+        and column_name = 'subconsumer_id';
+    if not found then
+        alter table pgq_ext.completed_tick
+            add column subconsumer_id text;
+        update pgq_ext.completed_tick
+            set subconsumer_id = '';
+        alter table pgq_ext.completed_tick
+            alter column subconsumer_id set not null;
+        alter table pgq_ext.completed_tick
+            drop constraint completed_tick_pkey;
+        alter table pgq_ext.completed_tick
+            add constraint completed_tick_pkey
+            primary key (consumer_id, subconsumer_id);
+        cnt := cnt + 1;
+    end if;
+
+    -- pgq_ext.partial_batch: subconsumer_id
+    perform 1 from information_schema.columns
+      where table_schema = 'pgq_ext'
+        and table_name = 'partial_batch'
+        and column_name = 'subconsumer_id';
+    if not found then
+        alter table pgq_ext.partial_batch
+            add column subconsumer_id text;
+        update pgq_ext.partial_batch
+            set subconsumer_id = '';
+        alter table pgq_ext.partial_batch
+            alter column subconsumer_id set not null;
+        alter table pgq_ext.partial_batch
+            drop constraint partial_batch_pkey;
+        alter table pgq_ext.partial_batch
+            add constraint partial_batch_pkey
+            primary key (consumer_id, subconsumer_id);
+        cnt := cnt + 1;
+    end if;
+
+    -- pgq_ext.completed_event: subconsumer_id
+    perform 1 from information_schema.columns
+      where table_schema = 'pgq_ext'
+        and table_name = 'completed_event'
+        and column_name = 'subconsumer_id';
+    if not found then
+        alter table pgq_ext.completed_event
+            add column subconsumer_id text;
+        update pgq_ext.completed_event
+            set subconsumer_id = '';
+        alter table pgq_ext.completed_event
+            alter column subconsumer_id set not null;
+        alter table pgq_ext.completed_event
+            drop constraint completed_event_pkey;
+        alter table pgq_ext.completed_event
+            add constraint completed_event_pkey
+            primary key (consumer_id, subconsumer_id, batch_id, event_id);
+        cnt := cnt + 1;
+    end if;
+
+    return cnt;
+end;
+$$ language plpgsql;
+
+
diff --git a/sql/pgq_ext/sql/old_ext.sql b/sql/pgq_ext/sql/old_ext.sql
new file mode 100644 (file)
index 0000000..e8368b3
--- /dev/null
@@ -0,0 +1,204 @@
+
+
+
+set client_min_messages = 'warning';
+set default_with_oids = 'off';
+
+create schema pgq_ext;
+grant usage on schema pgq_ext to public;
+
+
+--
+-- batch tracking
+--
+create table pgq_ext.completed_batch (
+    consumer_id   text not null,
+    last_batch_id bigint not null,
+
+    primary key (consumer_id)
+);
+
+
+--
+-- event tracking
+--
+create table pgq_ext.completed_event (
+    consumer_id   text not null,
+    batch_id      bigint not null,
+    event_id      bigint not null,
+
+    primary key (consumer_id, batch_id, event_id)
+);
+
+create table pgq_ext.partial_batch (
+    consumer_id   text not null,
+    cur_batch_id  bigint not null,
+
+    primary key (consumer_id)
+);
+
+--
+-- tick tracking for SerialConsumer()
+-- no access functions provided here
+--
+create table pgq_ext.completed_tick (
+    consumer_id   text not null,
+    last_tick_id  bigint not null,
+
+    primary key (consumer_id)
+);
+
+
+
+
+
+create or replace function pgq_ext.is_batch_done(
+    a_consumer text, a_batch_id bigint)
+returns boolean as $$
+declare
+    res   boolean;
+begin
+    select last_batch_id = a_batch_id
+      into res from pgq_ext.completed_batch
+     where consumer_id = a_consumer;
+    if not found then
+        return false;
+    end if;
+    return res;
+end;
+$$ language plpgsql security definer;
+
+create or replace function pgq_ext.set_batch_done(
+    a_consumer text, a_batch_id bigint)
+returns boolean as $$
+begin
+    if pgq_ext.is_batch_done(a_consumer, a_batch_id) then
+        return false;
+    end if;
+
+    if a_batch_id > 0 then
+        update pgq_ext.completed_batch
+           set last_batch_id = a_batch_id
+         where consumer_id = a_consumer;
+        if not found then
+            insert into pgq_ext.completed_batch (consumer_id, last_batch_id)
+                values (a_consumer, a_batch_id);
+        end if;
+    end if;
+
+    return true;
+end;
+$$ language plpgsql security definer;
+
+
+
+
+create or replace function pgq_ext.is_event_done(
+    a_consumer text,
+    a_batch_id bigint, a_event_id bigint)
+returns boolean as $$
+declare
+    res   bigint;
+begin
+    perform 1 from pgq_ext.completed_event
+     where consumer_id = a_consumer
+       and batch_id = a_batch_id
+       and event_id = a_event_id;
+    return found;
+end;
+$$ language plpgsql security definer;
+
+create or replace function pgq_ext.set_event_done(
+    a_consumer text, a_batch_id bigint, a_event_id bigint)
+returns boolean as $$
+declare
+    old_batch bigint;
+begin
+    -- check if done
+    perform 1 from pgq_ext.completed_event
+     where consumer_id = a_consumer
+       and batch_id = a_batch_id
+       and event_id = a_event_id;
+    if found then
+        return false;
+    end if;
+
+    -- if batch changed, do cleanup
+    select cur_batch_id into old_batch
+        from pgq_ext.partial_batch
+        where consumer_id = a_consumer;
+    if not found then
+        -- first time here
+        insert into pgq_ext.partial_batch
+            (consumer_id, cur_batch_id)
+            values (a_consumer, a_batch_id);
+    elsif old_batch <> a_batch_id then
+        -- batch changed, that means old is finished on queue db
+        -- thus the tagged events are not needed anymore
+        delete from pgq_ext.completed_event
+            where consumer_id = a_consumer
+              and batch_id = old_batch;
+        -- remember current one
+        update pgq_ext.partial_batch
+            set cur_batch_id = a_batch_id
+            where consumer_id = a_consumer;
+    end if;
+
+    -- tag as done
+    insert into pgq_ext.completed_event (consumer_id, batch_id, event_id)
+      values (a_consumer, a_batch_id, a_event_id);
+
+    return true;
+end;
+$$ language plpgsql security definer;
+
+
+
+
+create or replace function pgq_ext.get_last_tick(a_consumer text)
+returns int8 as $$
+declare
+    res   int8;
+begin
+    select last_tick_id into res
+      from pgq_ext.completed_tick
+     where consumer_id = a_consumer;
+    return res;
+end;
+$$ language plpgsql security definer;
+
+create or replace function pgq_ext.set_last_tick(a_consumer text, a_tick_id bigint)
+returns integer as $$
+begin
+    if a_tick_id is null then
+        delete from pgq_ext.completed_tick
+         where consumer_id = a_consumer;
+    else   
+        update pgq_ext.completed_tick
+           set last_tick_id = a_tick_id
+         where consumer_id = a_consumer;
+        if not found then
+            insert into pgq_ext.completed_tick (consumer_id, last_tick_id)
+                values (a_consumer, a_tick_id);
+        end if;
+    end if;
+
+    return 1;
+end;
+$$ language plpgsql security definer;
+
+
+
+
+create or replace function pgq_ext.version()
+returns text as $$
+begin
+    return '3.0.0.1';
+end;
+$$ language plpgsql;
+
+
+
+
+
+
diff --git a/sql/pgq_ext/sql/test_upgrade.sql b/sql/pgq_ext/sql/test_upgrade.sql
new file mode 100644 (file)
index 0000000..791a032
--- /dev/null
@@ -0,0 +1,45 @@
+
+\set ECHO off
+
+set log_error_verbosity = 'terse';
+set client_min_messages = 'fatal';
+create language plpgsql;
+set client_min_messages = 'warning';
+
+drop schema pgq_ext cascade;
+\i sql/old_ext.sql
+\i structure/upgrade.sql
+\set ECHO all
+
+--
+-- test batch tracking
+--
+select pgq_ext.is_batch_done('c', 1);
+select pgq_ext.set_batch_done('c', 1);
+select pgq_ext.is_batch_done('c', 1);
+select pgq_ext.set_batch_done('c', 1);
+select pgq_ext.is_batch_done('c', 2);
+select pgq_ext.set_batch_done('c', 2);
+
+--
+-- test event tracking
+--
+select pgq_ext.is_batch_done('c', 3);
+select pgq_ext.is_event_done('c', 3, 101);
+select pgq_ext.set_event_done('c', 3, 101);
+select pgq_ext.is_event_done('c', 3, 101);
+select pgq_ext.set_event_done('c', 3, 101);
+select pgq_ext.set_batch_done('c', 3);
+select * from pgq_ext.completed_event order by 1,2;
+
+--
+-- test tick tracking
+--
+select pgq_ext.get_last_tick('c');
+select pgq_ext.set_last_tick('c', 1);
+select pgq_ext.get_last_tick('c');
+select pgq_ext.set_last_tick('c', 2);
+select pgq_ext.get_last_tick('c');
+select pgq_ext.set_last_tick('c', NULL);
+select pgq_ext.get_last_tick('c');
+
index 377353ba0c9a8ef31c87a5ee66403c04324e1734..220f51e1fb848906c060f67cd787e84375b5e71b 100644 (file)
@@ -10,10 +10,11 @@ grant usage on schema pgq_ext to public;
 -- batch tracking
 --
 create table pgq_ext.completed_batch (
-    consumer_id   text not null,
-    last_batch_id bigint not null,
+    consumer_id     text not null,
+    subconsumer_id  text not null,
+    last_batch_id   bigint not null,
 
-    primary key (consumer_id)
+    primary key (consumer_id, subconsumer_id)
 );
 
 
@@ -21,18 +22,20 @@ create table pgq_ext.completed_batch (
 -- event tracking
 --
 create table pgq_ext.completed_event (
-    consumer_id   text not null,
-    batch_id      bigint not null,
-    event_id      bigint not null,
+    consumer_id     text not null,
+    subconsumer_id  text not null,
+    batch_id        bigint not null,
+    event_id        bigint not null,
 
-    primary key (consumer_id, batch_id, event_id)
+    primary key (consumer_id, subconsumer_id, batch_id, event_id)
 );
 
 create table pgq_ext.partial_batch (
-    consumer_id   text not null,
-    cur_batch_id  bigint not null,
+    consumer_id     text not null,
+    subconsumer_id  text not null,
+    cur_batch_id    bigint not null,
 
-    primary key (consumer_id)
+    primary key (consumer_id, subconsumer_id)
 );
 
 --
@@ -40,9 +43,10 @@ create table pgq_ext.partial_batch (
 -- no access functions provided here
 --
 create table pgq_ext.completed_tick (
-    consumer_id   text not null,
-    last_tick_id  bigint not null,
+    consumer_id     text not null,
+    subconsumer_id  text not null,
+    last_tick_id    bigint not null,
 
-    primary key (consumer_id)
+    primary key (consumer_id, subconsumer_id)
 );
 
index 8e6792a5a5e0a7a7d0b8ed86fdbba815d9319185..1e6370bd08dc12c26d577ea60d9cfa0748313a4a 100644 (file)
@@ -1,3 +1,7 @@
+\i functions/pgq_ext.upgrade_schema.sql
+
+select pgq_ext.upgrade_schema();
+
 \i functions/pgq_ext.is_batch_done.sql
 \i functions/pgq_ext.set_batch_done.sql