newadm: more commands work now
authorMarko Kreen <markokr@gmail.com>
Wed, 11 Mar 2009 13:53:05 +0000 (15:53 +0200)
committerMarko Kreen <markokr@gmail.com>
Thu, 12 Mar 2009 11:54:23 +0000 (13:54 +0200)
- register
- unregister
- create queue
- drop queue
- show batch
- show queue

python/newadm.py

index 00af5bf6af7d249443ffa3e1b627c34e947f3fbb..54fa1fef0fdc5f8e4929549ce38fdb58cf818570 100755 (executable)
@@ -9,32 +9,29 @@ connect queue=.. node=..;
 install pgq;
 install londiste;
 
----------------------
-show all queues | consumers;
-
-show queue ..;
-show consumer ..;
-show batch events <>;
-show batch info <>;
+create queue <qname>;
+drop queue <qname>;
 
-show_queue_info <q>;
-show_queue_stats <q>;
-show_consumer_batch <cons>;
-show_batch_info <bid>;
-show_batch_events <bid> [ev_id];
+show queue *;
 
----------------------
-alter queue <qname> set param = , ...;
+// following cmds expect default queue
 
-create queue <qname>; // db
 register consumer foo;
-unregister consumer foo
-drop queue <q>;
+unregister consumer foo;
+
+show queue <qname>;
+show batch <batch_id>;
+show batch <consumer>;
+
+// only syntax
+
+alter queue <qname> set param = , ...;
 
-------------
+---------------------
+show consumers;
+show_queue_stats <q>;
 create node <foo>; // 
 create node <qname>.<foo>; // 
-
 add location <node> <loc>; // db, queue
 ----------------
 
@@ -82,8 +79,39 @@ def unquote_any(self, s):
         elif c == '"':
             s = skytools.unquote_ident(c)
         # extquote?
+        else:
+            s = s.lower()
     return s
 
+def display_result(curs, desc, fields = []):
+    """Display multirow query as a table."""
+
+    rows = curs.fetchall()
+
+    if not fields:
+        fields = [f[0] for f in curs.description]
+
+    widths = [15] * len(fields)
+    for i, f in enumerate(fields):
+        rlen = len(f)
+        widths[i] = widths[i] > rlen and widths[i] or rlen
+    for row in rows:
+        for i, k in enumerate(fields):
+            rlen = row[k] and len(row) or 0
+            widths[i] = widths[i] > rlen and widths[i] or rlen
+    widths = [w + 2 for w in widths]
+
+    fmt = '%%-%ds' * (len(widths) - 1) + '%%s'
+    fmt = fmt % tuple(widths[:-1])
+    if desc:
+        print(desc)
+    print(fmt % tuple(fields))
+    print(fmt % tuple(['-'*15] * len(fields)))
+
+    for row in rows:
+        print(fmt % tuple([row[k] for k in fields]))
+    print('\n')
+
 class Node:
     def __init__(self, **kwargs):
         self.name = kwargs.get('name')
@@ -92,6 +120,10 @@ class Node:
     def get_completions(self, params):
         return []
 
+##
+## Token classes
+##
+
 class Proxy(Node):
     def set_real(self, node):
         self.get_next = node.get_next
@@ -101,49 +133,50 @@ class WList(Node):
     c_append = ' '
     def __init__(self, *args, **kwargs):
         Node.__init__(self, **kwargs)
-        self.wlist = args
-
-    def get_wlist(self):
-        return self.wlist
+        self.tok_list = args
 
     def get_next(self, typ, word, params):
         cw = word.lower()
-        for w in self.get_wlist():
-            if w.word == cw:
-                if self.name:
-                    params[self.name] = cw
-                return w.next
+        for w in self.tok_list:
+            n = w.get_next(typ, word, params)
+            if n:
+                if self.name: # and not self.name in params:
+                    params[self.name] = word
+                return n
         return None
 
     def get_completions(self, params):
-        wlist = self.get_wlist()
         comp_list = []
-        for w in wlist:
+        for w in self.tok_list:
             comp_list += w.get_completions(params)
         return comp_list
 
-class DynList(Node):
-    tk_type = ('ident',)
+class Word(Node):
+    tk_type = ("ident",)
     c_append = ' '
-    def __init__(self, next, **kwargs):
+    def __init__(self, word, next, **kwargs):
         Node.__init__(self, **kwargs)
+        self.word = word
         self.next = next
-
     def get_wlist(self):
-        return []
-
+        return [self.word]
     def get_next(self, typ, word, params):
         if typ not in self.tk_type:
             return None
-        if self.name:
+        if self.word and word != self.word:
+            return None
+        if self.name: # and not self.name in params:
             params[self.name] = word
         return self.next
-
     def get_completions(self, params):
         wlist = self.get_wlist()
         comp_list = [w + self.c_append for w in wlist]
         return comp_list
 
+class DynList(Word):
+    def __init__(self, next, **kwargs):
+        Word.__init__(self, None, next, **kwargs)
+
 class Queue(DynList):
     def get_wlist(self):
         return script.get_queue_list()
@@ -164,25 +197,20 @@ class User(DynList):
     def get_wlist(self):
         return script.get_user_list()
 
+class Consumer(DynList):
+    def get_wlist(self):
+        return script.get_consumer_list()
+
+class BatchId(DynList):
+    tk_type = ("num",)
+    def get_wlist(self):
+        return script.get_batch_list()
+
 class Port(DynList):
     tk_type = ("num",)
     def get_wlist(self):
         return ['5432', '6432']
 
-class Word(Node):
-    tk_type = ("ident",)
-    c_append = ' '
-    def __init__(self, word, next, **kwargs):
-        Node.__init__(self, **kwargs)
-        self.word = word
-        self.next = next
-    def get_next(self, typ, word, params):
-        if typ in self.tk_type and word == self.word:
-            return self.next
-        return None
-    def get_completions(self, params):
-        return [self.word + self.c_append]
-
 class SWord(Word):
     c_append = '='
 
@@ -194,18 +222,9 @@ class EQ(Symbol):
     def __init__(self, next):
         Symbol.__init__(self, '=', next)
 
-class Value(Node):
+class Value(DynList):
     tk_type = ("str", "num", "ident")
-    def __init__(self, next, **kwargs):
-        Node.__init__(self, **kwargs)
-        self.next = next
-    def get_next(self, typ, word, params):
-        if typ not in self.tk_type:
-            return None
-        if self.name:
-            params[self.name] = word
-        return self.next
-    def get_completions(self, params):
+    def get_wlist(self):
         return []
 
 ##
@@ -230,30 +249,58 @@ w_connect.set_real(
         SWord('node', EQ(DBNode(w_connect, name = 'node'))),
         w_done))
 
-w_set = WList(
-    SWord('queue', EQ(Queue(w_done, name = 'value'))),
-    SWord('consumer', EQ(Value(w_done, name = 'value'))),
-    name = "param")
+w_show_batch = WList(
+    BatchId(w_done, name = 'batch_id'),
+    Consumer(w_done, name = 'consumer'))
+
+w_show_queue = WList(
+    Symbol('*', w_done, name = 'queue'),
+    Queue(w_done, name = 'queue'),
+    w_done)
 
 w_show = WList(
-    Word('queues', w_done),
-    Word('databases', w_done),
-    Word('consumers', w_done),
-    Word('stats', w_done),
-    name = "show")
+    Word('batch', w_show_batch),
+    Word('queue', w_show_queue),
+    name = "cmd2")
 
 w_install = WList(
     Word('pgq', w_done),
     Word('londiste', w_done),
     name = 'module')
 
-w_create = Word('queue', Value(w_done, name = 'queue'))
+w_qargs2 = Proxy()
+
+w_qargs = WList(
+    SWord('idle_period', EQ(Value(w_qargs2, name = 'idle_period'))),
+    SWord('max_count', EQ(Value(w_qargs2, name = 'max_count'))),
+    SWord('max_lag', EQ(Value(w_qargs2, name = 'max_lag'))))
+
+w_qargs2.set_real(WList(
+    w_done,
+    Symbol(',', w_qargs)))
+
+w_alter_q = Queue(Word('set', w_qargs), name = 'queue')
+
+w_alter = Word('queue', w_alter_q, name = 'cmd2')
+
+w_create = Word('queue', Queue(w_done, name = 'queue'),
+        name = 'cmd2')
+
+w_drop = Word('queue', Queue(w_done, name = 'queue'),
+        name = 'cmd2')
+
+w_cons_name = Word('consumer',
+        Consumer(w_done, name = 'consumer'),
+        name = 'cmd2')
 
 w_top = WList(
+    Word('alter', w_alter),
     Word('connect', w_connect),
     Word('create', w_create),
+    Word('drop', w_drop),
     Word('install', w_install),
-    Word('set', w_set),
+    Word('register', w_cons_name),
+    Word('unregister', w_cons_name),
     Word('show', w_show),
     name = "cmd")
 
@@ -302,6 +349,14 @@ class AdminConsole:
         q = "select distinct node_name from pgq_node.node_location order by 1"
         return self._ccache('node_list', q, 'pgq_node')
 
+    def get_batch_list(self):
+        if not self.cur_queue:
+            return []
+        qname = skytools.quote_literal(self.cur_queue)
+        q = "select current_batch::text from pgq.get_consumer_info(%s)"\
+            " where current_batch is not null order by 1" % qname
+        return self._ccache('batch_list', q, 'pgq')
+
     def _ccache(self, cname, q, req_schema = None):
         if not self.db:
             return []
@@ -350,7 +405,7 @@ class AdminConsole:
         return clist
 
     def parse_cmdline(self, argv):
-        switches = "c:h:p:d:U:f:"
+        switches = "c:h:p:d:U:f:Q:"
         lswitches = ['help', 'version']
         try:
             opts, args = getopt.getopt(argv, switches, lswitches)
@@ -382,6 +437,8 @@ class AdminConsole:
                 cstr_map['dbname'] = a
             elif o == "-U":
                 cstr_map['user'] = a
+            elif o == "-Q":
+                self.cur_queue = a
             elif o == "-c":
                 self.cmd_str = a
             elif o == "-f":
@@ -494,21 +551,6 @@ class AdminConsole:
     def reset_comp_cache(self):
         self.comp_cache = {}
 
-    def find_suggestions_real(self, pfx, params):
-        # find level
-        node = top_level
-        for typ, w in self.sql_words(pfx):
-            w = w.lower()
-            node = node.get_next(typ, w, params)
-            if not node:
-                break
-
-        # find possible matches
-        if node:
-            return node.get_completions(params)
-        else:
-            return []
-
     def find_suggestions(self, pfx, curword, params = {}):
         c_pfx = self.comp_cache.get('comp_pfx')
         c_list = self.comp_cache.get('comp_list', [])
@@ -524,6 +566,21 @@ class AdminConsole:
                 res.append(cword)
         return res
 
+    def find_suggestions_real(self, pfx, params):
+        # find level
+        node = top_level
+        for typ, w in self.sql_words(pfx):
+            w = w.lower()
+            node = node.get_next(typ, w, params)
+            if not node:
+                break
+
+        # find possible matches
+        if node:
+            return node.get_completions(params)
+        else:
+            return []
+
     def exec_string(self, ln, eof = False):
         node = top_level
         params = {}
@@ -547,13 +604,21 @@ class AdminConsole:
             print "multi-line commands not supported:", repr(ln)
 
     def exec_params(self, params):
+        print 'RUN', params
         cmd = params.get('cmd')
+        cmd2 = params.get('cmd2')
         if not cmd:
             print 'parse error: no command found'
             return
+        if cmd2:
+            cmd = "%s_%s" % (cmd, cmd2)
         #print 'RUN', repr(params)
         fn = getattr(self, 'cmd_' + cmd, self.bad_cmd)
-        fn(params)
+        try:
+            fn(params)
+            print 'OK'
+        except Exception, ex:
+            print str(ex)
 
     def bad_cmd(self, params):
         print 'unimplemented command'
@@ -623,6 +688,88 @@ class AdminConsole:
         skytools.db_install(curs, objs, None)
         print "%s installed" % mod_name
 
+    def cmd_show_queue(self, params):
+        queue = params.get('queue')
+        if queue is None:
+            queue = self.cur_queue
+            if not queue:
+                print 'No default queue'
+                return
+        curs = self.db.cursor()
+        fields = [
+            "queue_name",
+            "queue_cur_table || '/' || queue_ntables as tables",
+            "queue_ticker_max_count as max_cnt",
+            "queue_ticker_max_lag as max_lag",
+            "queue_ticker_idle_period as idle_period",
+            "ticker_lag",
+        ]
+        pfx = "select " + ",".join(fields)
+
+        if queue == '*':
+            q = pfx + " from pgq.get_queue_info()"
+            curs.execute(q)
+        else:
+            q = pfx + " from pgq.get_queue_info(%s)"
+            curs.execute(q, [queue])
+
+        display_result(curs, 'Queues')
+
+    def cmd_show_batch(self, params):
+        batch_id = params.get('batch_id')
+        consumer = params.get('consumer')
+        queue = self.cur_queue
+        if not queue:
+            print 'No default queue'
+            return
+        curs = self.db.cursor()
+        if consumer:
+            q = "select current_batch from pgq.get_consumer_info(%s, %s)"
+            curs.execute(q, [queue, consumer])
+            res = curs.fetchall()
+            if len != 1:
+                print 'no such consumer'
+                return
+            batch_id = res[0]['current_batch']
+            if batch_id is None:
+                print 'consumer has no open batch'
+                return
+
+        q = "select * from pgq.get_batch_events(%s)"
+        curs.execute(q, [batch_id])
+
+        display_result(curs, 'Batch events')
+
+    def cmd_register_consumer(self, params):
+        queue = self.cur_queue
+        if not queue:
+            print 'No default queue'
+            return
+        consumer = params['consumer']
+        curs = self.db.cursor()
+        q = "select * from pgq.register_consumer(%s, %s)"
+        curs.execute(q, [queue, consumer])
+
+    def cmd_unregister_consumer(self, params):
+        queue = self.cur_queue
+        if not queue:
+            print 'No default queue'
+            return
+        consumer = params['consumer']
+        curs = self.db.cursor()
+        q = "select * from pgq.unregister_consumer(%s, %s)"
+        curs.execute(q, [queue, consumer])
+
+    def cmd_create_queue(self, params):
+        curs = self.db.cursor()
+        q = "select * from pgq.create_queue(%(queue)s)"
+        curs.execute(q, params)
+
+    def cmd_drop_queue(self, params):
+        curs = self.db.cursor()
+        q = "select * from pgq.drop_queue(%(queue)s)"
+        curs.execute(q, params)
+
 def main():
     global script