pgq.Consumer: support .consumer_filter with lazy_fetch
authorMarko Kreen <markokr@gmail.com>
Thu, 27 Oct 2011 20:39:15 +0000 (23:39 +0300)
committerMarko Kreen <markokr@gmail.com>
Thu, 27 Oct 2011 20:39:15 +0000 (23:39 +0300)
python/pgq/consumer.py
sql/pgq/expected/pgq_core.out
sql/pgq/functions/pgq.get_batch_cursor.sql
sql/pgq/sql/pgq_core.sql

index a21f14d1b34095141de133f9801710672d29eb0a..76470b724eb149624d45cecc38f85e690d9ea4bf 100644 (file)
@@ -39,7 +39,7 @@ class _BatchWalker(object):
      - one for loop over events
      - len() after that
     """
-    def __init__(self, curs, batch_id, queue_name, fetch_size = 300):
+    def __init__(self, curs, batch_id, queue_name, fetch_size = 300, consumer_filter = None):
         self.queue_name = queue_name
         self.fetch_size = fetch_size
         self.sql_cursor = "batch_walker"
@@ -48,14 +48,15 @@ class _BatchWalker(object):
         self.status_map = {}
         self.batch_id = batch_id
         self.fetch_status = 0 # 0-not started, 1-in-progress, 2-done
+        self.consumer_filter = consumer_filter
 
     def __iter__(self):
         if self.fetch_status:
             raise Exception("BatchWalker: double fetch? (%d)" % self.fetch_status)
         self.fetch_status = 1
 
-        q = "select * from pgq.get_batch_cursor(%s, %s, %s)"
-        self.curs.execute(q, [self.batch_id, self.sql_cursor, self.fetch_size])
+        q = "select * from pgq.get_batch_cursor(%s, %s, %s, %s)"
+        self.curs.execute(q, [self.batch_id, self.sql_cursor, self.fetch_size, self.consumer_filter])
         # this will return first batch of rows
 
         q = "fetch %d from %s" % (self.fetch_size, self.sql_cursor)
@@ -306,7 +307,7 @@ class Consumer(skytools.DBScript):
         """Fetch all events for this batch."""
 
         if self.pgq_lazy_fetch:
-            return _BatchWalker(curs, batch_id, self.queue_name, self.pgq_lazy_fetch)
+            return _BatchWalker(curs, batch_id, self.queue_name, self.pgq_lazy_fetch, self.consumer_filter)
         else:
             return self._load_batch_events_old(curs, batch_id)
 
index 48c02bd87683ebcfd69e0b600dc062db461aa02a..a0c760fae272d79f79fd446e9c697c160a6ac0f0 100644 (file)
@@ -225,6 +225,14 @@ select ev_id,ev_retry,ev_type,ev_data,ev_extra1,ev_extra2,ev_extra3,ev_extra4
      2 |          | r2      | data    | extra1    | extra2    | extra3    | extra4
 (2 rows)
 
+close acurs;
+select ev_id,ev_retry,ev_type,ev_data,ev_extra1,ev_extra2,ev_extra3,ev_extra4
+    from pgq.get_batch_cursor(3, 'acurs', 2, 'ev_id = 1');
+ ev_id | ev_retry | ev_type | ev_data | ev_extra1 | ev_extra2 | ev_extra3 | ev_extra4 
+-------+----------+---------+---------+-----------+-----------+-----------+-----------
+     1 |          | r1      | data    |           |           |           | 
+(1 row)
+
 close acurs;
 end;
 select pgq.event_retry(3, 2, 0);
index 8cc7d1fdcbf09a03c32dbec31f09fa5806582b00..9ecb4b025e3cbb4f4961f1eb6177e51f9762cfaa 100644 (file)
@@ -3,6 +3,7 @@ create or replace function pgq.get_batch_cursor(
     in i_batch_id       bigint,
     in i_cursor_name    text,
     in i_quick_limit    int4,
+    in i_extra_where    text,
 
     out ev_id       bigint,
     out ev_time     timestamptz,
@@ -16,7 +17,7 @@ create or replace function pgq.get_batch_cursor(
     out ev_extra4   text)
 returns setof record as $$
 -- ----------------------------------------------------------------------
--- Function: pgq.get_batch_cursor(3)
+-- Function: pgq.get_batch_cursor(4)
 --
 --      Get events in batch using a cursor.
 --
@@ -24,19 +25,32 @@ returns setof record as $$
 --      i_batch_id      - ID of active batch.
 --      i_cursor_name   - Name for new cursor
 --      i_quick_limit   - Number of events to return immediately
+--      i_extra_where   - optional where clause to filter events
 --
 -- Returns:
 --      List of events.
 -- ----------------------------------------------------------------------
 declare
     _cname  text;
+    _sql    text;
 begin
+    if i_batch_id is null or i_cursor_name is null or i_quick_limit is null then
+        return;
+    end if;
+
     _cname := quote_ident(i_cursor_name);
+    _sql := pgq.batch_event_sql(i_batch_id);
+
+    -- apply extra where
+    if i_extra_where is not null then
+        _sql := replace(_sql, ' order by 1', '');
+        _sql := 'select * from (' || _sql
+            || ') _evs where ' || i_extra_where
+            || ' order by 1';
+    end if;
 
     -- create cursor
-    execute 'declare ' || _cname
-        || ' no scroll cursor for '
-        || pgq.batch_event_sql(i_batch_id);
+    execute 'declare ' || _cname || ' no scroll cursor for ' || _sql;
 
     -- if no events wanted, don't bother with execute
     if i_quick_limit <= 0 then
@@ -53,5 +67,47 @@ begin
 
     return;
 end;
+$$ language plpgsql; -- no perms needed
+
+create or replace function pgq.get_batch_cursor(
+    in i_batch_id       bigint,
+    in i_cursor_name    text,
+    in i_quick_limit    int4,
+
+    out ev_id       bigint,
+    out ev_time     timestamptz,
+    out ev_txid     bigint,
+    out ev_retry    int4,
+    out ev_type     text,
+    out ev_data     text,
+    out ev_extra1   text,
+    out ev_extra2   text,
+    out ev_extra3   text,
+    out ev_extra4   text)
+returns setof record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq.get_batch_cursor(3)
+--
+--      Get events in batch using a cursor.
+--
+-- Parameters:
+--      i_batch_id      - ID of active batch.
+--      i_cursor_name   - Name for new cursor
+--      i_quick_limit   - Number of events to return immediately
+--
+-- Returns:
+--      List of events.
+-- ----------------------------------------------------------------------
+begin
+    for ev_id, ev_time, ev_txid, ev_retry, ev_type, ev_data,
+        ev_extra1, ev_extra2, ev_extra3, ev_extra4
+    in
+        select * from pgq.get_batch_cursor(i_batch_id,
+            i_cursor_name, i_quick_limit, null)
+    loop
+        return next;
+    end loop;
+    return;
+end;
 $$ language plpgsql strict; -- no perms needed
 
index 533750305bb8d93a7b4ef5e136ed724e604eb2b1..66cdb56e58cd11139113453318eb2c56d9d9d354 100644 (file)
@@ -57,6 +57,9 @@ close acurs;
 select ev_id,ev_retry,ev_type,ev_data,ev_extra1,ev_extra2,ev_extra3,ev_extra4
     from pgq.get_batch_cursor(3, 'acurs', 2);
 close acurs;
+select ev_id,ev_retry,ev_type,ev_data,ev_extra1,ev_extra2,ev_extra3,ev_extra4
+    from pgq.get_batch_cursor(3, 'acurs', 2, 'ev_id = 1');
+close acurs;
 end;
 
 select pgq.event_retry(3, 2, 0);