- added pgq.set_queue_config function
authorMartin Pihlak <martin.pihlak@gmail.com>
Fri, 20 Mar 2009 12:09:44 +0000 (14:09 +0200)
committerMartin Pihlak <martin.pihlak@gmail.com>
Fri, 20 Mar 2009 12:50:50 +0000 (14:50 +0200)
- almost usable ALTER QUEUE
- simple feedback for non-show commands

python/newadm.py
sql/pgq/functions/pgq.set_queue_config.sql [new file with mode: 0644]
sql/pgq/structure/func_public.sql

index 5f8bf989ef2e7c7370e4525737e4429bbaa6bfb7..3dee23f1db67bf884f5248e5fab25921119e1c76 100755 (executable)
@@ -7,7 +7,8 @@
     install pgq | londiste;
     create queue <qname>;
     drop queue <qname>;
-    show queue *;
+    show queue <qname | *>;
+    alter queue <qname | *> set param = , ...;
 
 Following commands expect default queue:
 
@@ -17,10 +18,6 @@ Following commands expect default queue:
     show queue <qname>;
     show batch <batch_id>;
     show batch <consumer>;
-
-Only syntax:
-
-    alter queue <qname> set param = , ...;
 """
 
 # unimplemented:
@@ -267,15 +264,20 @@ w_install = WList(
 w_qargs2 = Proxy()
 
 w_qargs = WList(
-    SWord('idle_period', EQ(Value(w_qargs2, name = 'idle_period'))),
-    SWord('max_count', EQ(Value(w_qargs2, name = 'max_count'))),
-    SWord('max_lag', EQ(Value(w_qargs2, name = 'max_lag'))))
+    SWord('idle_period', EQ(Value(w_qargs2, name = 'ticker_idle_period'))),
+    SWord('max_count', EQ(Value(w_qargs2, name = 'ticker_max_count'))),
+    SWord('max_lag', EQ(Value(w_qargs2, name = 'ticker_max_lag'))),
+    SWord('paused', EQ(Value(w_qargs2, name = 'external_ticker'))))
 
 w_qargs2.set_real(WList(
     w_done,
     Symbol(',', w_qargs)))
 
-w_alter_q = Queue(Word('set', w_qargs), name = 'queue')
+w_set_q = Word('set', w_qargs)
+
+w_alter_q = WList(
+        Symbol('*', w_set_q, name = 'queue'),
+        Queue(w_set_q, name = 'queue'))
 
 w_alter = Word('queue', w_alter_q, name = 'cmd2')
 
@@ -607,15 +609,17 @@ class AdminConsole:
         if not cmd:
             print 'parse error: no command found'
             return
+        runcmd = cmd
         if cmd2:
-            cmd = "%s_%s" % (cmd, cmd2)
+            runcmd += "_" + cmd2
         #print 'RUN', repr(params)
-        fn = getattr(self, 'cmd_' + cmd, self.bad_cmd)
+        fn = getattr(self, 'cmd_' + runcmd, self.bad_cmd)
         try:
             fn(params)
-            #print 'OK'
+            if cmd != "show":
+                print cmd.upper()
         except Exception, ex:
-            print str(ex)
+            print "ERROR: %s" % str(ex).strip()
 
     def bad_cmd(self, params):
         print 'unimplemented command'
@@ -699,6 +703,7 @@ class AdminConsole:
             "queue_ticker_max_count as max_cnt",
             "queue_ticker_max_lag as max_lag",
             "queue_ticker_idle_period as idle_period",
+            "queue_external_ticker as paused",
             "ticker_lag",
         ]
         pfx = "select " + ",".join(fields)
@@ -767,6 +772,33 @@ class AdminConsole:
         q = "select * from pgq.drop_queue(%(queue)s)"
         curs.execute(q, params)
 
+    def cmd_alter_queue(self, params):
+        """Alter queue parameters, accepts * for all queues"""
+        queue = params.get('queue')
+        curs = self.db.cursor()
+        if queue == '*':
+            # operate on list of queues
+            q = "select queue_name from pgq.get_queue_info()"
+            curs.execute(q)
+            qlist = [ r[0] for r in curs.fetchall() ]
+        else:
+            # just single queue specified
+            qlist = [ queue ]
+
+        for qname in qlist:
+            params['queue'] = qname
+
+            # loop through the parameters, passing any unrecognized
+            # key down pgq.set_queue_config
+            for k in params:
+                if k in ('queue', 'cmd', 'cmd2'):
+                    continue
+
+                q = "select * from pgq.set_queue_config" \
+                    "(%%(queue)s, '%s', %%(%s)s)" % (k, k)
+
+                curs.execute(q, params)
+
     def cmd_show_help(self, params):
         print __doc__
 
diff --git a/sql/pgq/functions/pgq.set_queue_config.sql b/sql/pgq/functions/pgq.set_queue_config.sql
new file mode 100644 (file)
index 0000000..dce20f8
--- /dev/null
@@ -0,0 +1,54 @@
+
+create or replace function pgq.set_queue_config(
+    x_queue_name    text,
+    x_param_name    text,
+    x_param_value   text)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq.set_queue_config(3)
+--
+--
+--     Set configuration for specified queue.
+--
+-- Parameters:
+--      x_queue_name    - Name of the queue to configure.
+--      x_param_name    - Configuration parameter name.
+--      x_param_value   - Configuration parameter value.
+--  
+-- Returns:
+--     0 if event was already in queue, 1 otherwise.
+-- ----------------------------------------------------------------------
+declare
+    v_param_name    text;
+begin
+    -- discard NULL input
+    if x_queue_name is null or x_param_name is null then
+        raise exception 'Invalid NULL value';
+    end if;
+
+    -- check if queue exists
+    perform 1 from pgq.queue where queue_name = x_queue_name;
+    if not found then
+        raise exception 'No such event queue';
+    end if;
+
+    -- check if valid parameter name
+    v_param_name := 'queue_' || x_param_name;
+    if v_param_name not in (
+        'queue_ticker_max_count',
+        'queue_ticker_max_lag',
+        'queue_ticker_idle_period',
+        'queue_rotation_period',
+        'queue_external_ticker')
+    then
+        raise exception 'cannot change parameter "%s"', x_param_name;
+    end if;
+
+    execute 'update pgq.queue set ' 
+        || v_param_name || ' = ' || quote_literal(x_param_value)
+        || ' where queue_name = ' || quote_literal(x_queue_name);
+
+    return 1;
+end;
+$$ language plpgsql security definer;
+
index 4440a22bde6a9cf5f218fee07f1eb4cf47204a9c..86be2b39940686ae7e3b3d067f1a6c87165262c2 100644 (file)
@@ -4,6 +4,7 @@
 
 \i functions/pgq.create_queue.sql
 \i functions/pgq.drop_queue.sql
+\i functions/pgq.set_queue_config.sql
 
 -- Group: Event publishing