- one for loop over events
- len() after that
"""
- def __init__(self, curs, batch_id, queue_name, fetch_size = 300):
+ def __init__(self, curs, batch_id, queue_name, fetch_size = 300, consumer_filter = None):
self.queue_name = queue_name
self.fetch_size = fetch_size
self.sql_cursor = "batch_walker"
self.status_map = {}
self.batch_id = batch_id
self.fetch_status = 0 # 0-not started, 1-in-progress, 2-done
+ self.consumer_filter = consumer_filter
def __iter__(self):
if self.fetch_status:
raise Exception("BatchWalker: double fetch? (%d)" % self.fetch_status)
self.fetch_status = 1
- q = "select * from pgq.get_batch_cursor(%s, %s, %s)"
- self.curs.execute(q, [self.batch_id, self.sql_cursor, self.fetch_size])
+ q = "select * from pgq.get_batch_cursor(%s, %s, %s, %s)"
+ self.curs.execute(q, [self.batch_id, self.sql_cursor, self.fetch_size, self.consumer_filter])
# this will return first batch of rows
q = "fetch %d from %s" % (self.fetch_size, self.sql_cursor)
"""Fetch all events for this batch."""
if self.pgq_lazy_fetch:
- return _BatchWalker(curs, batch_id, self.queue_name, self.pgq_lazy_fetch)
+ return _BatchWalker(curs, batch_id, self.queue_name, self.pgq_lazy_fetch, self.consumer_filter)
else:
return self._load_batch_events_old(curs, batch_id)
2 | | r2 | data | extra1 | extra2 | extra3 | extra4
(2 rows)
+close acurs;
+select ev_id,ev_retry,ev_type,ev_data,ev_extra1,ev_extra2,ev_extra3,ev_extra4
+ from pgq.get_batch_cursor(3, 'acurs', 2, 'ev_id = 1');
+ ev_id | ev_retry | ev_type | ev_data | ev_extra1 | ev_extra2 | ev_extra3 | ev_extra4
+-------+----------+---------+---------+-----------+-----------+-----------+-----------
+ 1 | | r1 | data | | | |
+(1 row)
+
close acurs;
end;
select pgq.event_retry(3, 2, 0);
in i_batch_id bigint,
in i_cursor_name text,
in i_quick_limit int4,
+ in i_extra_where text,
out ev_id bigint,
out ev_time timestamptz,
out ev_extra4 text)
returns setof record as $$
-- ----------------------------------------------------------------------
--- Function: pgq.get_batch_cursor(3)
+-- Function: pgq.get_batch_cursor(4)
--
-- Get events in batch using a cursor.
--
-- i_batch_id - ID of active batch.
-- i_cursor_name - Name for new cursor
-- i_quick_limit - Number of events to return immediately
+-- i_extra_where - optional where clause to filter events
--
-- Returns:
-- List of events.
-- ----------------------------------------------------------------------
declare
_cname text;
+ _sql text;
begin
+ if i_batch_id is null or i_cursor_name is null or i_quick_limit is null then
+ return;
+ end if;
+
_cname := quote_ident(i_cursor_name);
+ _sql := pgq.batch_event_sql(i_batch_id);
+
+ -- apply extra where
+ if i_extra_where is not null then
+ _sql := replace(_sql, ' order by 1', '');
+ _sql := 'select * from (' || _sql
+ || ') _evs where ' || i_extra_where
+ || ' order by 1';
+ end if;
-- create cursor
- execute 'declare ' || _cname
- || ' no scroll cursor for '
- || pgq.batch_event_sql(i_batch_id);
+ execute 'declare ' || _cname || ' no scroll cursor for ' || _sql;
-- if no events wanted, don't bother with execute
if i_quick_limit <= 0 then
return;
end;
+$$ language plpgsql; -- no perms needed
+
+create or replace function pgq.get_batch_cursor(
+ in i_batch_id bigint,
+ in i_cursor_name text,
+ in i_quick_limit int4,
+
+ out ev_id bigint,
+ out ev_time timestamptz,
+ out ev_txid bigint,
+ out ev_retry int4,
+ out ev_type text,
+ out ev_data text,
+ out ev_extra1 text,
+ out ev_extra2 text,
+ out ev_extra3 text,
+ out ev_extra4 text)
+returns setof record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq.get_batch_cursor(3)
+--
+-- Get events in batch using a cursor.
+--
+-- Parameters:
+-- i_batch_id - ID of active batch.
+-- i_cursor_name - Name for new cursor
+-- i_quick_limit - Number of events to return immediately
+--
+-- Returns:
+-- List of events.
+-- ----------------------------------------------------------------------
+begin
+ for ev_id, ev_time, ev_txid, ev_retry, ev_type, ev_data,
+ ev_extra1, ev_extra2, ev_extra3, ev_extra4
+ in
+ select * from pgq.get_batch_cursor(i_batch_id,
+ i_cursor_name, i_quick_limit, null)
+ loop
+ return next;
+ end loop;
+ return;
+end;
$$ language plpgsql strict; -- no perms needed
select ev_id,ev_retry,ev_type,ev_data,ev_extra1,ev_extra2,ev_extra3,ev_extra4
from pgq.get_batch_cursor(3, 'acurs', 2);
close acurs;
+select ev_id,ev_retry,ev_type,ev_data,ev_extra1,ev_extra2,ev_extra3,ev_extra4
+ from pgq.get_batch_cursor(3, 'acurs', 2, 'ev_id = 1');
+close acurs;
end;
select pgq.event_retry(3, 2, 0);