From d64fffbb3a0acc314ff208bbc75ea17ba7f7a2f2 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Thu, 27 Oct 2011 23:39:15 +0300 Subject: [PATCH] pgq.Consumer: support .consumer_filter with lazy_fetch --- python/pgq/consumer.py | 9 +-- sql/pgq/expected/pgq_core.out | 8 +++ sql/pgq/functions/pgq.get_batch_cursor.sql | 64 ++++++++++++++++++++-- sql/pgq/sql/pgq_core.sql | 3 + 4 files changed, 76 insertions(+), 8 deletions(-) diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py index a21f14d1..76470b72 100644 --- a/python/pgq/consumer.py +++ b/python/pgq/consumer.py @@ -39,7 +39,7 @@ class _BatchWalker(object): - one for loop over events - len() after that """ - def __init__(self, curs, batch_id, queue_name, fetch_size = 300): + def __init__(self, curs, batch_id, queue_name, fetch_size = 300, consumer_filter = None): self.queue_name = queue_name self.fetch_size = fetch_size self.sql_cursor = "batch_walker" @@ -48,14 +48,15 @@ class _BatchWalker(object): self.status_map = {} self.batch_id = batch_id self.fetch_status = 0 # 0-not started, 1-in-progress, 2-done + self.consumer_filter = consumer_filter def __iter__(self): if self.fetch_status: raise Exception("BatchWalker: double fetch? (%d)" % self.fetch_status) self.fetch_status = 1 - q = "select * from pgq.get_batch_cursor(%s, %s, %s)" - self.curs.execute(q, [self.batch_id, self.sql_cursor, self.fetch_size]) + q = "select * from pgq.get_batch_cursor(%s, %s, %s, %s)" + self.curs.execute(q, [self.batch_id, self.sql_cursor, self.fetch_size, self.consumer_filter]) # this will return first batch of rows q = "fetch %d from %s" % (self.fetch_size, self.sql_cursor) @@ -306,7 +307,7 @@ class Consumer(skytools.DBScript): """Fetch all events for this batch.""" if self.pgq_lazy_fetch: - return _BatchWalker(curs, batch_id, self.queue_name, self.pgq_lazy_fetch) + return _BatchWalker(curs, batch_id, self.queue_name, self.pgq_lazy_fetch, self.consumer_filter) else: return self._load_batch_events_old(curs, batch_id) diff --git a/sql/pgq/expected/pgq_core.out b/sql/pgq/expected/pgq_core.out index 48c02bd8..a0c760fa 100644 --- a/sql/pgq/expected/pgq_core.out +++ b/sql/pgq/expected/pgq_core.out @@ -225,6 +225,14 @@ select ev_id,ev_retry,ev_type,ev_data,ev_extra1,ev_extra2,ev_extra3,ev_extra4 2 | | r2 | data | extra1 | extra2 | extra3 | extra4 (2 rows) +close acurs; +select ev_id,ev_retry,ev_type,ev_data,ev_extra1,ev_extra2,ev_extra3,ev_extra4 + from pgq.get_batch_cursor(3, 'acurs', 2, 'ev_id = 1'); + ev_id | ev_retry | ev_type | ev_data | ev_extra1 | ev_extra2 | ev_extra3 | ev_extra4 +-------+----------+---------+---------+-----------+-----------+-----------+----------- + 1 | | r1 | data | | | | +(1 row) + close acurs; end; select pgq.event_retry(3, 2, 0); diff --git a/sql/pgq/functions/pgq.get_batch_cursor.sql b/sql/pgq/functions/pgq.get_batch_cursor.sql index 8cc7d1fd..9ecb4b02 100644 --- a/sql/pgq/functions/pgq.get_batch_cursor.sql +++ b/sql/pgq/functions/pgq.get_batch_cursor.sql @@ -3,6 +3,7 @@ create or replace function pgq.get_batch_cursor( in i_batch_id bigint, in i_cursor_name text, in i_quick_limit int4, + in i_extra_where text, out ev_id bigint, out ev_time timestamptz, @@ -16,7 +17,7 @@ create or replace function pgq.get_batch_cursor( out ev_extra4 text) returns setof record as $$ -- ---------------------------------------------------------------------- --- Function: pgq.get_batch_cursor(3) +-- Function: pgq.get_batch_cursor(4) -- -- Get events in batch using a cursor. -- @@ -24,19 +25,32 @@ returns setof record as $$ -- i_batch_id - ID of active batch. -- i_cursor_name - Name for new cursor -- i_quick_limit - Number of events to return immediately +-- i_extra_where - optional where clause to filter events -- -- Returns: -- List of events. -- ---------------------------------------------------------------------- declare _cname text; + _sql text; begin + if i_batch_id is null or i_cursor_name is null or i_quick_limit is null then + return; + end if; + _cname := quote_ident(i_cursor_name); + _sql := pgq.batch_event_sql(i_batch_id); + + -- apply extra where + if i_extra_where is not null then + _sql := replace(_sql, ' order by 1', ''); + _sql := 'select * from (' || _sql + || ') _evs where ' || i_extra_where + || ' order by 1'; + end if; -- create cursor - execute 'declare ' || _cname - || ' no scroll cursor for ' - || pgq.batch_event_sql(i_batch_id); + execute 'declare ' || _cname || ' no scroll cursor for ' || _sql; -- if no events wanted, don't bother with execute if i_quick_limit <= 0 then @@ -53,5 +67,47 @@ begin return; end; +$$ language plpgsql; -- no perms needed + +create or replace function pgq.get_batch_cursor( + in i_batch_id bigint, + in i_cursor_name text, + in i_quick_limit int4, + + out ev_id bigint, + out ev_time timestamptz, + out ev_txid bigint, + out ev_retry int4, + out ev_type text, + out ev_data text, + out ev_extra1 text, + out ev_extra2 text, + out ev_extra3 text, + out ev_extra4 text) +returns setof record as $$ +-- ---------------------------------------------------------------------- +-- Function: pgq.get_batch_cursor(3) +-- +-- Get events in batch using a cursor. +-- +-- Parameters: +-- i_batch_id - ID of active batch. +-- i_cursor_name - Name for new cursor +-- i_quick_limit - Number of events to return immediately +-- +-- Returns: +-- List of events. +-- ---------------------------------------------------------------------- +begin + for ev_id, ev_time, ev_txid, ev_retry, ev_type, ev_data, + ev_extra1, ev_extra2, ev_extra3, ev_extra4 + in + select * from pgq.get_batch_cursor(i_batch_id, + i_cursor_name, i_quick_limit, null) + loop + return next; + end loop; + return; +end; $$ language plpgsql strict; -- no perms needed diff --git a/sql/pgq/sql/pgq_core.sql b/sql/pgq/sql/pgq_core.sql index 53375030..66cdb56e 100644 --- a/sql/pgq/sql/pgq_core.sql +++ b/sql/pgq/sql/pgq_core.sql @@ -57,6 +57,9 @@ close acurs; select ev_id,ev_retry,ev_type,ev_data,ev_extra1,ev_extra2,ev_extra3,ev_extra4 from pgq.get_batch_cursor(3, 'acurs', 2); close acurs; +select ev_id,ev_retry,ev_type,ev_data,ev_extra1,ev_extra2,ev_extra3,ev_extra4 + from pgq.get_batch_cursor(3, 'acurs', 2, 'ev_id = 1'); +close acurs; end; select pgq.event_retry(3, 2, 0); -- 2.39.5