Separated queue pause from external ticker.
authorMartin Pihlak <martin.pihlak@gmail.com>
Wed, 8 Apr 2009 15:37:45 +0000 (18:37 +0300)
committerMartin Pihlak <martin.pihlak@gmail.com>
Wed, 8 Apr 2009 15:37:45 +0000 (18:37 +0300)
python/newadm.py
sql/pgq/functions/pgq.force_tick.sql
sql/pgq/functions/pgq.get_queue_info.sql
sql/pgq/functions/pgq.set_queue_config.sql
sql/pgq/functions/pgq.ticker.sql
sql/pgq/structure/tables.sql
sql/ticker/connection.c

index bbe91a9d8a47cd911d35ef6253e8797ea6f9f096..01f1e00eeb170bfcd6e7d6f06722ada64ee85a3e 100755 (executable)
@@ -282,7 +282,7 @@ w_qargs = WList(
     SWord('idle_period', EQ(Value(w_qargs2, name = 'ticker_idle_period'))),
     SWord('max_count', EQ(Value(w_qargs2, name = 'ticker_max_count'))),
     SWord('max_lag', EQ(Value(w_qargs2, name = 'ticker_max_lag'))),
-    SWord('paused', EQ(Value(w_qargs2, name = 'external_ticker'))))
+    SWord('paused', EQ(Value(w_qargs2, name = 'ticker_paused'))))
 
 w_qargs2.set_real(WList(
     w_done,
@@ -716,7 +716,7 @@ class AdminConsole:
             "queue_ticker_max_count as max_cnt",
             "queue_ticker_max_lag as max_lag",
             "queue_ticker_idle_period as idle_period",
-            "queue_external_ticker as paused",
+            "queue_ticker_paused as paused",
             "ticker_lag",
         ]
         pfx = "select " + ",".join(fields)
index c1550d9a9746eccc8503f18f1f98353f42cce444..6cd1ecc9a37df43a09b079a7e51d8bb81cfee72d 100644 (file)
@@ -30,7 +30,8 @@ begin
                                    + queue_ticker_max_count * 2 + 1000) as tmp
       into q from pgq.queue
      where queue_name = i_queue_name
-       and not queue_external_ticker;
+       and not queue_external_ticker
+       and not queue_ticker_paused;
 
     --if not found then
     --    raise notice 'queue not found or ticks not allowed';
index 14906770915ebefe7d53a3f12362672050d9dede..dcc34c2cc7fa0d7f9353941d51683f78565d36c0 100644 (file)
@@ -5,6 +5,7 @@ create or replace function pgq.get_queue_info(
     out queue_rotation_period       interval,
     out queue_switch_time           timestamptz,
     out queue_external_ticker       boolean,
+    out queue_ticker_paused         boolean,
     out queue_ticker_max_count      integer,
     out queue_ticker_max_lag        interval,
     out queue_ticker_idle_period    interval,
@@ -20,12 +21,14 @@ returns setof record as $$
 -- ----------------------------------------------------------------------
 begin
     for queue_name, queue_ntables, queue_cur_table, queue_rotation_period,
-        queue_switch_time, queue_external_ticker, queue_ticker_max_count,
-        queue_ticker_max_lag, queue_ticker_idle_period, ticker_lag
+        queue_switch_time, queue_external_ticker, queue_ticker_paused,
+        queue_ticker_max_count, queue_ticker_max_lag, queue_ticker_idle_period,
+        ticker_lag
     in select
         f.queue_name, f.queue_ntables, f.queue_cur_table, f.queue_rotation_period,
-        f.queue_switch_time, f.queue_external_ticker, f.queue_ticker_max_count,
-        f.queue_ticker_max_lag, f.queue_ticker_idle_period, f.ticker_lag
+        f.queue_switch_time, f.queue_external_ticker, f.queue_ticker_paused,
+        f.queue_ticker_max_count, f.queue_ticker_max_lag, f.queue_ticker_idle_period,
+        f.ticker_lag
         from pgq.get_queue_info(null) f
     loop
         return next;
@@ -42,6 +45,7 @@ create or replace function pgq.get_queue_info(
     out queue_rotation_period       interval,
     out queue_switch_time           timestamptz,
     out queue_external_ticker       boolean,
+    out queue_ticker_paused         boolean,
     out queue_ticker_max_count      integer,
     out queue_ticker_max_lag        interval,
     out queue_ticker_idle_period    interval,
@@ -57,12 +61,13 @@ returns setof record as $$
 -- ----------------------------------------------------------------------
 begin
     for queue_name, queue_ntables, queue_cur_table, queue_rotation_period,
-        queue_switch_time, queue_external_ticker, queue_ticker_max_count,
-        queue_ticker_max_lag, queue_ticker_idle_period, ticker_lag
+        queue_switch_time, queue_external_ticker, queue_ticker_paused,
+        queue_ticker_max_count, queue_ticker_max_lag, queue_ticker_idle_period,
+        ticker_lag
     in select
         q.queue_name, q.queue_ntables, q.queue_cur_table,
         q.queue_rotation_period, q.queue_switch_time,
-        q.queue_external_ticker,
+        q.queue_external_ticker, q.queue_ticker_paused,
         q.queue_ticker_max_count, q.queue_ticker_max_lag,
         q.queue_ticker_idle_period,
         (select current_timestamp - tick_time
index dce20f897097606688f3d5b9f5dbfddf8ac2b936..7a14f00567c3ed4f1282a2e5928ccd2a4ff0eb78 100644 (file)
@@ -38,6 +38,7 @@ begin
         'queue_ticker_max_count',
         'queue_ticker_max_lag',
         'queue_ticker_idle_period',
+        'queue_ticker_paused',
         'queue_rotation_period',
         'queue_external_ticker')
     then
index ebfc907c397a3aceb4ae93c593d86b30788ea657..01391e77837c63f4b7f08fa97c95b7237c1d56da 100644 (file)
@@ -17,9 +17,10 @@ begin
     select queue_id, i_tick_id, i_orig_timestamp, i_event_seq
         from pgq.queue
         where queue_name = i_queue_name
-          and queue_external_ticker;
+          and queue_external_ticker
+          and not queue_ticker_paused;
     if not found then
-        raise exception 'queue not found or external ticker disabled: %', i_queue_name;
+        raise exception 'queue not found or ticker disabled: %', i_queue_name;
     end if;
 
     -- make sure seqs stay current
@@ -57,7 +58,8 @@ begin
     select queue_id, queue_tick_seq, queue_external_ticker,
             queue_ticker_max_count, queue_ticker_max_lag,
             queue_ticker_idle_period, queue_event_seq,
-            pgq.seq_getval(queue_event_seq) as event_seq
+            pgq.seq_getval(queue_event_seq) as event_seq,
+            queue_ticker_paused
         into q
         from pgq.queue where queue_name = i_queue_name;
     if not found then
@@ -68,6 +70,10 @@ begin
         raise exception 'This queue has external tick source.';
     end if;
 
+    if q.queue_ticker_paused then
+        raise exception 'Ticker has been paused for this queue';
+    end if;
+
     -- load state from last tick
     select now() - tick_time as lag,
            q.event_seq - tick_event_seq as new_events,
@@ -120,7 +126,7 @@ create or replace function pgq.ticker() returns bigint as $$
 -- ----------------------------------------------------------------------
 -- Function: pgq.ticker(0)
 --
---     Creates ticks for all queues which dont have external ticker.
+--     Creates ticks for all unpaused queues which dont have external ticker.
 --
 -- Returns:
 --     Number of queues that were processed.
@@ -133,6 +139,7 @@ begin
     for q in
         select queue_name from pgq.queue
             where not queue_external_ticker
+                  and not queue_ticker_paused
             order by queue_name
     loop
         if pgq.ticker(q.queue_name) > 0 then
index 6e88c7c5145d026cd15d78c76f7655017700de56..f1a2db3c1512597212e2e90483cc9cb11edef79f 100644 (file)
@@ -59,6 +59,7 @@ create table pgq.consumer (
 --      queue_switch_step2          - tx after rotation was committed
 --      queue_switch_time           - time when switch happened
 --      queue_external_ticker       - ticks come from some external sources
+--      queue_ticker_paused         - ticker is paused
 --      queue_disable_insert        - disallow pgq.insert_event()
 --      queue_ticker_max_count      - batch should not contain more events
 --      queue_ticker_max_lag        - events should not age more
@@ -81,6 +82,7 @@ create table pgq.queue (
 
         queue_external_ticker       boolean     not null default false,
         queue_disable_insert        boolean     not null default false,
+        queue_ticker_paused         boolean     not null default false,
 
         queue_ticker_max_count      integer     not null default 500,
         queue_ticker_max_lag        interval    not null default '3 seconds',
index bdd3a4d783899db1ba99439ec63acdf02c610017..20a5c2177ac640dbcc20fb0f2fba6528915cca73 100644 (file)
@@ -12,6 +12,8 @@
 #define W_SOCK 1
 #define W_TIME 2
 
+typedef void (*libev_cb)(int sock, short flags, void *arg);
+
 struct PgSocket {
        struct event ev;
 
@@ -22,8 +24,6 @@ struct PgSocket {
        void *handler_arg;
 };
 
-typedef void (*libev_cb)(int sock, short flags, void *arg);
-
 static void send_event(struct PgSocket *db, enum PgEvent ev)
 {
        db->handler_func(db, db->handler_arg, ev, NULL);