install pgq | londiste;
create queue <qname>;
drop queue <qname>;
- show queue *;
+ show queue <qname | *>;
+ alter queue <qname | *> set param = , ...;
Following commands expect default queue:
show queue <qname>;
show batch <batch_id>;
show batch <consumer>;
-
-Only syntax:
-
- alter queue <qname> set param = , ...;
"""
# unimplemented:
w_qargs2 = Proxy()
w_qargs = WList(
- SWord('idle_period', EQ(Value(w_qargs2, name = 'idle_period'))),
- SWord('max_count', EQ(Value(w_qargs2, name = 'max_count'))),
- SWord('max_lag', EQ(Value(w_qargs2, name = 'max_lag'))))
+ SWord('idle_period', EQ(Value(w_qargs2, name = 'ticker_idle_period'))),
+ SWord('max_count', EQ(Value(w_qargs2, name = 'ticker_max_count'))),
+ SWord('max_lag', EQ(Value(w_qargs2, name = 'ticker_max_lag'))),
+ SWord('paused', EQ(Value(w_qargs2, name = 'external_ticker'))))
w_qargs2.set_real(WList(
w_done,
Symbol(',', w_qargs)))
-w_alter_q = Queue(Word('set', w_qargs), name = 'queue')
+w_set_q = Word('set', w_qargs)
+
+w_alter_q = WList(
+ Symbol('*', w_set_q, name = 'queue'),
+ Queue(w_set_q, name = 'queue'))
w_alter = Word('queue', w_alter_q, name = 'cmd2')
if not cmd:
print 'parse error: no command found'
return
+ runcmd = cmd
if cmd2:
- cmd = "%s_%s" % (cmd, cmd2)
+ runcmd += "_" + cmd2
#print 'RUN', repr(params)
- fn = getattr(self, 'cmd_' + cmd, self.bad_cmd)
+ fn = getattr(self, 'cmd_' + runcmd, self.bad_cmd)
try:
fn(params)
- #print 'OK'
+ if cmd != "show":
+ print cmd.upper()
except Exception, ex:
- print str(ex)
+ print "ERROR: %s" % str(ex).strip()
def bad_cmd(self, params):
print 'unimplemented command'
"queue_ticker_max_count as max_cnt",
"queue_ticker_max_lag as max_lag",
"queue_ticker_idle_period as idle_period",
+ "queue_external_ticker as paused",
"ticker_lag",
]
pfx = "select " + ",".join(fields)
q = "select * from pgq.drop_queue(%(queue)s)"
curs.execute(q, params)
+ def cmd_alter_queue(self, params):
+ """Alter queue parameters, accepts * for all queues"""
+ queue = params.get('queue')
+ curs = self.db.cursor()
+ if queue == '*':
+ # operate on list of queues
+ q = "select queue_name from pgq.get_queue_info()"
+ curs.execute(q)
+ qlist = [ r[0] for r in curs.fetchall() ]
+ else:
+ # just single queue specified
+ qlist = [ queue ]
+
+ for qname in qlist:
+ params['queue'] = qname
+
+ # loop through the parameters, passing any unrecognized
+ # key down pgq.set_queue_config
+ for k in params:
+ if k in ('queue', 'cmd', 'cmd2'):
+ continue
+
+ q = "select * from pgq.set_queue_config" \
+ "(%%(queue)s, '%s', %%(%s)s)" % (k, k)
+
+ curs.execute(q, params)
+
def cmd_show_help(self, params):
print __doc__
--- /dev/null
+
+create or replace function pgq.set_queue_config(
+ x_queue_name text,
+ x_param_name text,
+ x_param_value text)
+returns integer as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq.set_queue_config(3)
+--
+--
+-- Set configuration for specified queue.
+--
+-- Parameters:
+-- x_queue_name - Name of the queue to configure.
+-- x_param_name - Configuration parameter name.
+-- x_param_value - Configuration parameter value.
+--
+-- Returns:
+-- 0 if event was already in queue, 1 otherwise.
+-- ----------------------------------------------------------------------
+declare
+ v_param_name text;
+begin
+ -- discard NULL input
+ if x_queue_name is null or x_param_name is null then
+ raise exception 'Invalid NULL value';
+ end if;
+
+ -- check if queue exists
+ perform 1 from pgq.queue where queue_name = x_queue_name;
+ if not found then
+ raise exception 'No such event queue';
+ end if;
+
+ -- check if valid parameter name
+ v_param_name := 'queue_' || x_param_name;
+ if v_param_name not in (
+ 'queue_ticker_max_count',
+ 'queue_ticker_max_lag',
+ 'queue_ticker_idle_period',
+ 'queue_rotation_period',
+ 'queue_external_ticker')
+ then
+ raise exception 'cannot change parameter "%s"', x_param_name;
+ end if;
+
+ execute 'update pgq.queue set '
+ || v_param_name || ' = ' || quote_literal(x_param_value)
+ || ' where queue_name = ' || quote_literal(x_queue_name);
+
+ return 1;
+end;
+$$ language plpgsql security definer;
+