Add subconsumer_id to all tables and function params.
SRCS = $(wildcard functions/*.sql structure/*.sql)
-REGRESS = test_pgq_ext
+REGRESS = test_pgq_ext test_upgrade
REGRESS_OPTS = --load-language=plpgsql
CATSQL = ../../scripts/catsql.py
\set ECHO off
+ upgrade_schema
+----------------
+ 0
+(1 row)
+
--
-- test batch tracking
--
(1 row)
select * from pgq_ext.completed_event order by 1,2;
- consumer_id | batch_id | event_id
--------------+----------+----------
- c | 3 | 101
+ consumer_id | subconsumer_id | batch_id | event_id
+-------------+----------------+----------+----------
+ c | | 3 | 101
(1 row)
--
--- /dev/null
+\set ECHO off
+ upgrade_schema
+----------------
+ 4
+(1 row)
+
+--
+-- test batch tracking
+--
+select pgq_ext.is_batch_done('c', 1);
+ is_batch_done
+---------------
+ f
+(1 row)
+
+select pgq_ext.set_batch_done('c', 1);
+ set_batch_done
+----------------
+ t
+(1 row)
+
+select pgq_ext.is_batch_done('c', 1);
+ is_batch_done
+---------------
+ t
+(1 row)
+
+select pgq_ext.set_batch_done('c', 1);
+ set_batch_done
+----------------
+ f
+(1 row)
+
+select pgq_ext.is_batch_done('c', 2);
+ is_batch_done
+---------------
+ f
+(1 row)
+
+select pgq_ext.set_batch_done('c', 2);
+ set_batch_done
+----------------
+ t
+(1 row)
+
+--
+-- test event tracking
+--
+select pgq_ext.is_batch_done('c', 3);
+ is_batch_done
+---------------
+ f
+(1 row)
+
+select pgq_ext.is_event_done('c', 3, 101);
+ is_event_done
+---------------
+ f
+(1 row)
+
+select pgq_ext.set_event_done('c', 3, 101);
+ set_event_done
+----------------
+ t
+(1 row)
+
+select pgq_ext.is_event_done('c', 3, 101);
+ is_event_done
+---------------
+ t
+(1 row)
+
+select pgq_ext.set_event_done('c', 3, 101);
+ set_event_done
+----------------
+ f
+(1 row)
+
+select pgq_ext.set_batch_done('c', 3);
+ set_batch_done
+----------------
+ t
+(1 row)
+
+select * from pgq_ext.completed_event order by 1,2;
+ consumer_id | batch_id | event_id | subconsumer_id
+-------------+----------+----------+----------------
+ c | 3 | 101 |
+(1 row)
+
+--
+-- test tick tracking
+--
+select pgq_ext.get_last_tick('c');
+ get_last_tick
+---------------
+
+(1 row)
+
+select pgq_ext.set_last_tick('c', 1);
+ set_last_tick
+---------------
+ 1
+(1 row)
+
+select pgq_ext.get_last_tick('c');
+ get_last_tick
+---------------
+ 1
+(1 row)
+
+select pgq_ext.set_last_tick('c', 2);
+ set_last_tick
+---------------
+ 1
+(1 row)
+
+select pgq_ext.get_last_tick('c');
+ get_last_tick
+---------------
+ 2
+(1 row)
+
+select pgq_ext.set_last_tick('c', NULL);
+ set_last_tick
+---------------
+ 1
+(1 row)
+
+select pgq_ext.get_last_tick('c');
+ get_last_tick
+---------------
+
+(1 row)
+
-create or replace function pgq_ext.get_last_tick(a_consumer text)
+create or replace function pgq_ext.get_last_tick(a_consumer text, a_subconsumer text)
returns int8 as $$
declare
res int8;
begin
select last_tick_id into res
from pgq_ext.completed_tick
- where consumer_id = a_consumer;
+ where consumer_id = a_consumer
+ and subconsumer_id = a_subconsumer;
return res;
end;
$$ language plpgsql security definer;
+create or replace function pgq_ext.get_last_tick(a_consumer text)
+returns int8 as $$
+begin
+ return pgq_ext.get_last_tick(a_consumer, '');
+end;
+$$ language plpgsql;
+
create or replace function pgq_ext.is_batch_done(
- a_consumer text, a_batch_id bigint)
+ a_consumer text,
+ a_subconsumer text,
+ a_batch_id bigint)
returns boolean as $$
declare
res boolean;
begin
select last_batch_id = a_batch_id
into res from pgq_ext.completed_batch
- where consumer_id = a_consumer;
+ where consumer_id = a_consumer
+ and subconsumer_id = a_subconsumer;
if not found then
return false;
end if;
end;
$$ language plpgsql security definer;
+create or replace function pgq_ext.is_batch_done(
+ a_consumer text,
+ a_batch_id bigint)
+returns boolean as $$
+begin
+ return pgq_ext.is_batch_done(a_consumer, '', a_batch_id);
+end;
+$$ language plpgsql;
+
create or replace function pgq_ext.is_event_done(
a_consumer text,
- a_batch_id bigint, a_event_id bigint)
+ a_subconsumer text,
+ a_batch_id bigint,
+ a_event_id bigint)
returns boolean as $$
declare
res bigint;
begin
perform 1 from pgq_ext.completed_event
where consumer_id = a_consumer
+ and subconsumer_id = a_subconsumer
and batch_id = a_batch_id
and event_id = a_event_id;
return found;
end;
$$ language plpgsql security definer;
+create or replace function pgq_ext.is_event_done(
+ a_consumer text,
+ a_batch_id bigint,
+ a_event_id bigint)
+returns boolean as $$
+begin
+ return pgq_ext.is_event_done(a_consumer, '', a_batch_id, a_event_id);
+end;
+$$ language plpgsql;
+
create or replace function pgq_ext.set_batch_done(
- a_consumer text, a_batch_id bigint)
+ a_consumer text,
+ a_subconsumer text,
+ a_batch_id bigint)
returns boolean as $$
begin
- if pgq_ext.is_batch_done(a_consumer, a_batch_id) then
+ if pgq_ext.is_batch_done(a_consumer, a_subconsumer, a_batch_id) then
return false;
end if;
if a_batch_id > 0 then
update pgq_ext.completed_batch
set last_batch_id = a_batch_id
- where consumer_id = a_consumer;
+ where consumer_id = a_consumer
+ and subconsumer_id = a_subconsumer;
if not found then
- insert into pgq_ext.completed_batch (consumer_id, last_batch_id)
- values (a_consumer, a_batch_id);
+ insert into pgq_ext.completed_batch (consumer_id, subconsumer_id, last_batch_id)
+ values (a_consumer, a_subconsumer, a_batch_id);
end if;
end if;
end;
$$ language plpgsql security definer;
+create or replace function pgq_ext.set_batch_done(
+ a_consumer text,
+ a_batch_id bigint)
+returns boolean as $$
+begin
+ return pgq_ext.set_batch_done(a_consumer, '', a_batch_id);
+end;
+$$ language plpgsql;
+
create or replace function pgq_ext.set_event_done(
- a_consumer text, a_batch_id bigint, a_event_id bigint)
+ a_consumer text,
+ a_subconsumer text,
+ a_batch_id bigint,
+ a_event_id bigint)
returns boolean as $$
declare
old_batch bigint;
-- check if done
perform 1 from pgq_ext.completed_event
where consumer_id = a_consumer
+ and subconsumer_id = a_subconsumer
and batch_id = a_batch_id
and event_id = a_event_id;
if found then
-- if batch changed, do cleanup
select cur_batch_id into old_batch
from pgq_ext.partial_batch
- where consumer_id = a_consumer;
+ where consumer_id = a_consumer
+ and subconsumer_id = a_subconsumer;
if not found then
-- first time here
insert into pgq_ext.partial_batch
- (consumer_id, cur_batch_id)
- values (a_consumer, a_batch_id);
+ (consumer_id, subconsumer_id, cur_batch_id)
+ values (a_consumer, a_subconsumer, a_batch_id);
elsif old_batch <> a_batch_id then
-- batch changed, that means old is finished on queue db
-- thus the tagged events are not needed anymore
delete from pgq_ext.completed_event
where consumer_id = a_consumer
+ and subconsumer_id = a_subconsumer
and batch_id = old_batch;
-- remember current one
update pgq_ext.partial_batch
set cur_batch_id = a_batch_id
- where consumer_id = a_consumer;
+ where consumer_id = a_consumer
+ and subconsumer_id = a_subconsumer;
end if;
-- tag as done
- insert into pgq_ext.completed_event (consumer_id, batch_id, event_id)
- values (a_consumer, a_batch_id, a_event_id);
+ insert into pgq_ext.completed_event
+ (consumer_id, subconsumer_id, batch_id, event_id)
+ values (a_consumer, a_subconsumer, a_batch_id, a_event_id);
return true;
end;
$$ language plpgsql security definer;
+create or replace function pgq_ext.set_event_done(
+ a_consumer text,
+ a_batch_id bigint,
+ a_event_id bigint)
+returns boolean as $$
+begin
+ return pgq_ext.set_event_done(a_consumer, '', a_batch_id, a_event_id);
+end;
+$$ language plpgsql;
+
-create or replace function pgq_ext.set_last_tick(a_consumer text, a_tick_id bigint)
+create or replace function pgq_ext.set_last_tick(
+ a_consumer text,
+ a_subconsumer text,
+ a_tick_id bigint)
returns integer as $$
begin
if a_tick_id is null then
delete from pgq_ext.completed_tick
- where consumer_id = a_consumer;
+ where consumer_id = a_consumer
+ and subconsumer_id = a_subconsumer;
else
update pgq_ext.completed_tick
set last_tick_id = a_tick_id
- where consumer_id = a_consumer;
+ where consumer_id = a_consumer
+ and subconsumer_id = a_subconsumer;
if not found then
- insert into pgq_ext.completed_tick (consumer_id, last_tick_id)
- values (a_consumer, a_tick_id);
+ insert into pgq_ext.completed_tick
+ (consumer_id, subconsumer_id, last_tick_id)
+ values (a_consumer, a_subconsumer, a_tick_id);
end if;
end if;
end;
$$ language plpgsql security definer;
+create or replace function pgq_ext.set_last_tick(
+ a_consumer text,
+ a_tick_id bigint)
+returns integer as $$
+begin
+ return pgq_ext.set_last_tick(a_consumer, '', a_tick_id);
+end;
+$$ language plpgsql;
+
--- /dev/null
+
+create or replace function pgq_ext.upgrade_schema()
+returns int4 as $$
+-- updates table structure if necessary
+declare
+ cnt int4 = 0;
+begin
+ -- pgq_ext.completed_batch: subconsumer_id
+ perform 1 from information_schema.columns
+ where table_schema = 'pgq_ext'
+ and table_name = 'completed_batch'
+ and column_name = 'subconsumer_id';
+ if not found then
+ alter table pgq_ext.completed_batch
+ add column subconsumer_id text;
+ update pgq_ext.completed_batch
+ set subconsumer_id = '';
+ alter table pgq_ext.completed_batch
+ alter column subconsumer_id set not null;
+ alter table pgq_ext.completed_batch
+ drop constraint completed_batch_pkey;
+ alter table pgq_ext.completed_batch
+ add constraint completed_batch_pkey
+ primary key (consumer_id, subconsumer_id);
+ cnt := cnt + 1;
+ end if;
+
+ -- pgq_ext.completed_tick: subconsumer_id
+ perform 1 from information_schema.columns
+ where table_schema = 'pgq_ext'
+ and table_name = 'completed_tick'
+ and column_name = 'subconsumer_id';
+ if not found then
+ alter table pgq_ext.completed_tick
+ add column subconsumer_id text;
+ update pgq_ext.completed_tick
+ set subconsumer_id = '';
+ alter table pgq_ext.completed_tick
+ alter column subconsumer_id set not null;
+ alter table pgq_ext.completed_tick
+ drop constraint completed_tick_pkey;
+ alter table pgq_ext.completed_tick
+ add constraint completed_tick_pkey
+ primary key (consumer_id, subconsumer_id);
+ cnt := cnt + 1;
+ end if;
+
+ -- pgq_ext.partial_batch: subconsumer_id
+ perform 1 from information_schema.columns
+ where table_schema = 'pgq_ext'
+ and table_name = 'partial_batch'
+ and column_name = 'subconsumer_id';
+ if not found then
+ alter table pgq_ext.partial_batch
+ add column subconsumer_id text;
+ update pgq_ext.partial_batch
+ set subconsumer_id = '';
+ alter table pgq_ext.partial_batch
+ alter column subconsumer_id set not null;
+ alter table pgq_ext.partial_batch
+ drop constraint partial_batch_pkey;
+ alter table pgq_ext.partial_batch
+ add constraint partial_batch_pkey
+ primary key (consumer_id, subconsumer_id);
+ cnt := cnt + 1;
+ end if;
+
+ -- pgq_ext.completed_event: subconsumer_id
+ perform 1 from information_schema.columns
+ where table_schema = 'pgq_ext'
+ and table_name = 'completed_event'
+ and column_name = 'subconsumer_id';
+ if not found then
+ alter table pgq_ext.completed_event
+ add column subconsumer_id text;
+ update pgq_ext.completed_event
+ set subconsumer_id = '';
+ alter table pgq_ext.completed_event
+ alter column subconsumer_id set not null;
+ alter table pgq_ext.completed_event
+ drop constraint completed_event_pkey;
+ alter table pgq_ext.completed_event
+ add constraint completed_event_pkey
+ primary key (consumer_id, subconsumer_id, batch_id, event_id);
+ cnt := cnt + 1;
+ end if;
+
+ return cnt;
+end;
+$$ language plpgsql;
+
+
--- /dev/null
+
+
+
+set client_min_messages = 'warning';
+set default_with_oids = 'off';
+
+create schema pgq_ext;
+grant usage on schema pgq_ext to public;
+
+
+--
+-- batch tracking
+--
+create table pgq_ext.completed_batch (
+ consumer_id text not null,
+ last_batch_id bigint not null,
+
+ primary key (consumer_id)
+);
+
+
+--
+-- event tracking
+--
+create table pgq_ext.completed_event (
+ consumer_id text not null,
+ batch_id bigint not null,
+ event_id bigint not null,
+
+ primary key (consumer_id, batch_id, event_id)
+);
+
+create table pgq_ext.partial_batch (
+ consumer_id text not null,
+ cur_batch_id bigint not null,
+
+ primary key (consumer_id)
+);
+
+--
+-- tick tracking for SerialConsumer()
+-- no access functions provided here
+--
+create table pgq_ext.completed_tick (
+ consumer_id text not null,
+ last_tick_id bigint not null,
+
+ primary key (consumer_id)
+);
+
+
+
+
+
+create or replace function pgq_ext.is_batch_done(
+ a_consumer text, a_batch_id bigint)
+returns boolean as $$
+declare
+ res boolean;
+begin
+ select last_batch_id = a_batch_id
+ into res from pgq_ext.completed_batch
+ where consumer_id = a_consumer;
+ if not found then
+ return false;
+ end if;
+ return res;
+end;
+$$ language plpgsql security definer;
+
+create or replace function pgq_ext.set_batch_done(
+ a_consumer text, a_batch_id bigint)
+returns boolean as $$
+begin
+ if pgq_ext.is_batch_done(a_consumer, a_batch_id) then
+ return false;
+ end if;
+
+ if a_batch_id > 0 then
+ update pgq_ext.completed_batch
+ set last_batch_id = a_batch_id
+ where consumer_id = a_consumer;
+ if not found then
+ insert into pgq_ext.completed_batch (consumer_id, last_batch_id)
+ values (a_consumer, a_batch_id);
+ end if;
+ end if;
+
+ return true;
+end;
+$$ language plpgsql security definer;
+
+
+
+
+create or replace function pgq_ext.is_event_done(
+ a_consumer text,
+ a_batch_id bigint, a_event_id bigint)
+returns boolean as $$
+declare
+ res bigint;
+begin
+ perform 1 from pgq_ext.completed_event
+ where consumer_id = a_consumer
+ and batch_id = a_batch_id
+ and event_id = a_event_id;
+ return found;
+end;
+$$ language plpgsql security definer;
+
+create or replace function pgq_ext.set_event_done(
+ a_consumer text, a_batch_id bigint, a_event_id bigint)
+returns boolean as $$
+declare
+ old_batch bigint;
+begin
+ -- check if done
+ perform 1 from pgq_ext.completed_event
+ where consumer_id = a_consumer
+ and batch_id = a_batch_id
+ and event_id = a_event_id;
+ if found then
+ return false;
+ end if;
+
+ -- if batch changed, do cleanup
+ select cur_batch_id into old_batch
+ from pgq_ext.partial_batch
+ where consumer_id = a_consumer;
+ if not found then
+ -- first time here
+ insert into pgq_ext.partial_batch
+ (consumer_id, cur_batch_id)
+ values (a_consumer, a_batch_id);
+ elsif old_batch <> a_batch_id then
+ -- batch changed, that means old is finished on queue db
+ -- thus the tagged events are not needed anymore
+ delete from pgq_ext.completed_event
+ where consumer_id = a_consumer
+ and batch_id = old_batch;
+ -- remember current one
+ update pgq_ext.partial_batch
+ set cur_batch_id = a_batch_id
+ where consumer_id = a_consumer;
+ end if;
+
+ -- tag as done
+ insert into pgq_ext.completed_event (consumer_id, batch_id, event_id)
+ values (a_consumer, a_batch_id, a_event_id);
+
+ return true;
+end;
+$$ language plpgsql security definer;
+
+
+
+
+create or replace function pgq_ext.get_last_tick(a_consumer text)
+returns int8 as $$
+declare
+ res int8;
+begin
+ select last_tick_id into res
+ from pgq_ext.completed_tick
+ where consumer_id = a_consumer;
+ return res;
+end;
+$$ language plpgsql security definer;
+
+create or replace function pgq_ext.set_last_tick(a_consumer text, a_tick_id bigint)
+returns integer as $$
+begin
+ if a_tick_id is null then
+ delete from pgq_ext.completed_tick
+ where consumer_id = a_consumer;
+ else
+ update pgq_ext.completed_tick
+ set last_tick_id = a_tick_id
+ where consumer_id = a_consumer;
+ if not found then
+ insert into pgq_ext.completed_tick (consumer_id, last_tick_id)
+ values (a_consumer, a_tick_id);
+ end if;
+ end if;
+
+ return 1;
+end;
+$$ language plpgsql security definer;
+
+
+
+
+create or replace function pgq_ext.version()
+returns text as $$
+begin
+ return '3.0.0.1';
+end;
+$$ language plpgsql;
+
+
+
+
+
+
--- /dev/null
+
+\set ECHO off
+
+set log_error_verbosity = 'terse';
+set client_min_messages = 'fatal';
+create language plpgsql;
+set client_min_messages = 'warning';
+
+drop schema pgq_ext cascade;
+\i sql/old_ext.sql
+\i structure/upgrade.sql
+\set ECHO all
+
+--
+-- test batch tracking
+--
+select pgq_ext.is_batch_done('c', 1);
+select pgq_ext.set_batch_done('c', 1);
+select pgq_ext.is_batch_done('c', 1);
+select pgq_ext.set_batch_done('c', 1);
+select pgq_ext.is_batch_done('c', 2);
+select pgq_ext.set_batch_done('c', 2);
+
+--
+-- test event tracking
+--
+select pgq_ext.is_batch_done('c', 3);
+select pgq_ext.is_event_done('c', 3, 101);
+select pgq_ext.set_event_done('c', 3, 101);
+select pgq_ext.is_event_done('c', 3, 101);
+select pgq_ext.set_event_done('c', 3, 101);
+select pgq_ext.set_batch_done('c', 3);
+select * from pgq_ext.completed_event order by 1,2;
+
+--
+-- test tick tracking
+--
+select pgq_ext.get_last_tick('c');
+select pgq_ext.set_last_tick('c', 1);
+select pgq_ext.get_last_tick('c');
+select pgq_ext.set_last_tick('c', 2);
+select pgq_ext.get_last_tick('c');
+select pgq_ext.set_last_tick('c', NULL);
+select pgq_ext.get_last_tick('c');
+
-- batch tracking
--
create table pgq_ext.completed_batch (
- consumer_id text not null,
- last_batch_id bigint not null,
+ consumer_id text not null,
+ subconsumer_id text not null,
+ last_batch_id bigint not null,
- primary key (consumer_id)
+ primary key (consumer_id, subconsumer_id)
);
-- event tracking
--
create table pgq_ext.completed_event (
- consumer_id text not null,
- batch_id bigint not null,
- event_id bigint not null,
+ consumer_id text not null,
+ subconsumer_id text not null,
+ batch_id bigint not null,
+ event_id bigint not null,
- primary key (consumer_id, batch_id, event_id)
+ primary key (consumer_id, subconsumer_id, batch_id, event_id)
);
create table pgq_ext.partial_batch (
- consumer_id text not null,
- cur_batch_id bigint not null,
+ consumer_id text not null,
+ subconsumer_id text not null,
+ cur_batch_id bigint not null,
- primary key (consumer_id)
+ primary key (consumer_id, subconsumer_id)
);
--
-- no access functions provided here
--
create table pgq_ext.completed_tick (
- consumer_id text not null,
- last_tick_id bigint not null,
+ consumer_id text not null,
+ subconsumer_id text not null,
+ last_tick_id bigint not null,
- primary key (consumer_id)
+ primary key (consumer_id, subconsumer_id)
);
+\i functions/pgq_ext.upgrade_schema.sql
+
+select pgq_ext.upgrade_schema();
+
\i functions/pgq_ext.is_batch_done.sql
\i functions/pgq_ext.set_batch_done.sql