qadmin: parser cleanup, make * optional.
authorMarko Kreen <markokr@gmail.com>
Tue, 24 Nov 2009 15:43:33 +0000 (17:43 +0200)
committerMarko Kreen <markokr@gmail.com>
Tue, 24 Nov 2009 15:43:33 +0000 (17:43 +0200)
python/qadmin.py

index 0c8d625be0271c34b37b67cbfb3fda2664f11c7d..54abf8724c00b7f48ee1d0e71174a773b1e5773e 100755 (executable)
@@ -3,23 +3,22 @@
 """Commands that require only database connection:
 
     connect dbname=.. host=.. service=.. queue=..;
-    connect queue=.. [ node=.. ];
+    connect [ queue=.. ] [ node=.. ];
     install pgq | londiste;
 
     create queue <qname>;
     alter queue <qname | *> set param = , ...;
     drop queue <qname>;
-    show queue <qname | *>;
+    show queue [ <qname | *> ];
 
     register consumer foo [on <qname> | at <tick_id> | copy <consumer> ]* ;
     unregister consumer foo [from <qname>];
-    show consumer <cname | *> [on <qname>];
+    show consumer [ <cname | *> [on <qname>] ];
 
-    show node <node | *> [on <qname>];
+    show node [ <node | *> [on <qname>] ];
 
 Following commands expect default queue:
 
-    show queue;
     show batch <batch_id>;
     show batch <consumer>;
 """
@@ -30,6 +29,13 @@ create <root | branch | leaf> node <node> location <loc> [on <qname>];
 drop node <node> [on <qname>];
 alter node <node> [location=<loc>]
 show_queue_stats <q>;
+
+change provider
+drop node
+status
+rename node
+
+
 """
 
 __version__ = '0.1'
@@ -53,11 +59,10 @@ General options:
     --version
 '''
 
-import sys, os, readline, getopt, re
+import sys, os, readline, getopt, re, psycopg2
 
 import pkgloader
 pkgloader.require('skytools', '3.0')
-
 import skytools
 
 script = None
@@ -92,7 +97,7 @@ def display_result(curs, desc, fields = []):
     if not fields:
         fields = [f[0] for f in curs.description]
 
-    widths = [15] * len(fields)
+    widths = [10] * len(fields)
     for i, f in enumerate(fields):
         rlen = len(f)
         if rlen > widths[i]:
@@ -113,29 +118,37 @@ def display_result(curs, desc, fields = []):
 
     for row in rows:
         print(fmt % tuple([row[k] for k in fields]))
-    print('\n')
+    print('')
+
+##
+## Base token classes
+##
 
-class Node:
+class Token:
+    """Base class for tokens.
+    
+    The optional 'param' kwarg will set corresponding key in
+    'params' dict to final token value.
+    """
     def __init__(self, **kwargs):
         self.name = kwargs.get('name')
     def get_next(self, typ, word, params):
+        """Return next token if 'word' matches this token."""
         return None
     def get_completions(self, params):
+        """Return list of all completions possible at this point."""
         return []
 
-##
-## Token classes
-##
-
-class Proxy(Node):
+class Proxy(Token):
+    """Forward def for Token, in case it needs to be referenced recursively."""
     def set_real(self, node):
         self.get_next = node.get_next
         self.get_completions = node.get_completions
 
-class WList(Node):
-    c_append = ' '
+class List(Token):
+    """List of Tokens, will be tried sequentially until one matches."""
     def __init__(self, *args, **kwargs):
-        Node.__init__(self, **kwargs)
+        Token.__init__(self, **kwargs)
         self.tok_list = args
 
     def get_next(self, typ, word, params):
@@ -143,7 +156,7 @@ class WList(Node):
         for w in self.tok_list:
             n = w.get_next(typ, word, params)
             if n:
-                if self.name: # and not self.name in params:
+                if self.name:
                     params[self.name] = word
                 return n
         return None
@@ -154,11 +167,18 @@ class WList(Node):
             comp_list += w.get_completions(params)
         return comp_list
 
-class Word(Node):
+##
+## Dynamic token classes
+##
+
+class Word(Token):
+    """Single fixed word."""
+    # token types to accept
     tk_type = ("ident",)
+    # string to append to completions
     c_append = ' '
     def __init__(self, word, next, **kwargs):
-        Node.__init__(self, **kwargs)
+        Token.__init__(self, **kwargs)
         self.word = word
         self.next = next
     def get_wlist(self):
@@ -177,14 +197,18 @@ class Word(Node):
         return comp_list
 
 class DynList(Word):
+    """Dynamically generated list of words."""
     def __init__(self, next, **kwargs):
         Word.__init__(self, None, next, **kwargs)
+    def get_wlist(self):
+        return []
 
 class DynIdentList(DynList):
+    """Accept quoted words."""
     def get_next(self, typ, word, params):
         """Allow quoted queue names"""
         next = DynList.get_next(self, typ, word, params)
-        if next:
+        if next and self.name:
             params[self.name] = unquote_any(word)
         return next
 
@@ -227,16 +251,16 @@ class Port(DynList):
     def get_wlist(self):
         return ['5432', '6432']
 
-class SWord(Word):
-    c_append = '='
-
 class Symbol(Word):
     tk_type = ("sym",)
     c_append = ''
 
-class EQ(Symbol):
-    def __init__(self, next):
-        Symbol.__init__(self, '=', next)
+class WordEQ(Word):
+    """Word that is followed by '='."""
+    c_append = '='
+    def __init__(self, word, next, **kwargs):
+        next = Symbol('=', next)
+        Word.__init__(self, word, next, **kwargs)
 
 class Value(DynList):
     tk_type = ("str", "num", "ident")
@@ -255,43 +279,43 @@ eq_val = Symbol('=', Value(w_done, name = 'value'))
 
 w_connect = Proxy()
 w_connect.set_real(
-    WList(
-        SWord('dbname', EQ(Database(w_connect, name = 'dbname'))),
-        SWord('host', EQ(Host(w_connect, name = 'host'))),
-        SWord('port', EQ(Port(w_connect, name = 'port'))),
-        SWord('user', EQ(User(w_connect, name = 'user'))),
-        SWord('password', EQ(Value(w_connect, name = 'password'))),
-        SWord('queue', EQ(Queue(w_connect, name = 'queue'))),
-        SWord('node', EQ(DBNode(w_connect, name = 'node'))),
+    List(
+        WordEQ('dbname', Database(w_connect, name = 'dbname')),
+        WordEQ('host', Host(w_connect, name = 'host')),
+        WordEQ('port', Port(w_connect, name = 'port')),
+        WordEQ('user', User(w_connect, name = 'user')),
+        WordEQ('password', Value(w_connect, name = 'password')),
+        WordEQ('queue', Queue(w_connect, name = 'queue')),
+        WordEQ('node', DBNode(w_connect, name = 'node')),
         w_done))
 
-w_show_batch = WList(
+w_show_batch = List(
     BatchId(w_done, name = 'batch_id'),
     Consumer(w_done, name = 'consumer'))
 
-w_show_queue = WList(
+w_show_queue = List(
     Symbol('*', w_done, name = 'queue'),
     Queue(w_done, name = 'queue'),
     w_done)
 
-w_show_on_queue = WList(
+w_show_on_queue = List(
     Symbol('*', w_done, name = 'queue'),
     Queue(w_done, name = 'queue'),
     )
 
-w_on_queue = WList(Word('on', w_show_on_queue), w_done)
+w_on_queue = List(Word('on', w_show_on_queue), w_done)
 
-w_show_consumer = WList(
+w_show_consumer = List(
     Symbol('*', w_on_queue, name = 'consumer'),
     Consumer(w_on_queue, name = 'consumer'),
-    )
+    w_done)
 
-w_show_node = WList(
+w_show_node = List(
     Symbol('*', w_on_queue, name = 'node'),
     DBNode(w_on_queue, name = 'node'),
-    )
+    w_done)
 
-w_show = WList(
+w_show = List(
     Word('batch', w_show_batch),
     Word('help', w_done),
     Word('queue', w_show_queue),
@@ -299,26 +323,26 @@ w_show = WList(
     Word('node', w_show_node),
     name = "cmd2")
 
-w_install = WList(
+w_install = List(
     Word('pgq', w_done),
     Word('londiste', w_done),
     name = 'module')
 
 w_qargs2 = Proxy()
 
-w_qargs = WList(
-    SWord('idle_period', EQ(Value(w_qargs2, name = 'ticker_idle_period'))),
-    SWord('max_count', EQ(Value(w_qargs2, name = 'ticker_max_count'))),
-    SWord('max_lag', EQ(Value(w_qargs2, name = 'ticker_max_lag'))),
-    SWord('paused', EQ(Value(w_qargs2, name = 'ticker_paused'))))
+w_qargs = List(
+    WordEQ('idle_period', Value(w_qargs2, name = 'ticker_idle_period')),
+    WordEQ('max_count', Value(w_qargs2, name = 'ticker_max_count')),
+    WordEQ('max_lag', Value(w_qargs2, name = 'ticker_max_lag')),
+    WordEQ('paused', Value(w_qargs2, name = 'ticker_paused')))
 
-w_qargs2.set_real(WList(
+w_qargs2.set_real(List(
     w_done,
     Symbol(',', w_qargs)))
 
 w_set_q = Word('set', w_qargs)
 
-w_alter_q = WList(
+w_alter_q = List(
         Symbol('*', w_set_q, name = 'queue'),
         Queue(w_set_q, name = 'queue'))
 
@@ -331,7 +355,7 @@ w_drop = Word('queue', Queue(w_done, name = 'queue'),
         name = 'cmd2')
 
 w_reg_target = Proxy()
-w_reg_target.set_real(WList(
+w_reg_target.set_real(List(
         Word('on', Queue(w_reg_target, name = 'queue')),
         Word('copy', Consumer(w_reg_target, name = 'copy_reg')),
         Word('at', TickId(w_reg_target, name = 'at_tick')),
@@ -341,16 +365,13 @@ w_cons_on_queue = Word('consumer',
         Consumer(w_reg_target, name = 'consumer'),
         name = 'cmd2')
 
-w_from_queue = Word('from',
-        Queue(w_done, name = 'queue'),
-        name = 'fromqueue')
+w_from_queue = Word('from', Queue(w_done, name = 'queue'))
 
 w_cons_from_queue = Word('consumer',
-        Consumer(WList(w_done, w_from_queue),
-                name = 'consumer'),
+        Consumer(List(w_done, w_from_queue), name = 'consumer'),
         name = 'cmd2')
 
-w_top = WList(
+w_top = List(
     Word('alter', w_alter),
     Word('connect', w_connect),
     Word('create', w_create),
@@ -387,6 +408,7 @@ class AdminConsole:
     initial_connstr = None
 
     rc_hosts = re.compile('\s+')
+
     def get_queue_list(self):
         q = "select queue_name from pgq.queue order by 1"
         return self._ccache('queue_list', q, 'pgq')
@@ -458,7 +480,7 @@ class AdminConsole:
                             clist.append(h)
                 clist.sort()
                 self.comp_cache['host_list'] = clist
-            except:
+            except IOError:
                 clist = []
         return clist
 
@@ -526,6 +548,7 @@ class AdminConsole:
         curs = db.cursor()
         curs.execute(q)
         res = curs.fetchone()
+        print 'Server version', res[1]
         self.cur_database = res[0]
         return db
 
@@ -551,7 +574,11 @@ class AdminConsole:
         if self.cmd_file:
             cmd_str = open(self.cmd_file, "r").read()
 
-        self.db = self.db_connect(self.initial_connstr)
+        try:
+            self.db = self.db_connect(self.initial_connstr)
+        except psycopg2.Error, d:
+            print str(d).strip()
+            sys.exit(1)
 
         if cmd_str:
             self.exec_string(cmd_str)
@@ -562,6 +589,7 @@ class AdminConsole:
         readline.parse_and_bind('tab: complete')
         readline.set_completer(self.rl_completer_safe)
         #print 'delims: ', repr(readline.get_completer_delims())
+
         hist_file = os.path.expanduser("~/.qadmin_history")
         try:
             readline.read_history_file(hist_file)
@@ -572,7 +600,6 @@ class AdminConsole:
         while 1:
             try:
                 ln = self.line_input()
-                #print 'line:', repr(ln)
                 self.exec_string(ln)
             except KeyboardInterrupt:
                 print
@@ -580,7 +607,11 @@ class AdminConsole:
                 print
                 break
             self.reset_comp_cache()
-        readline.write_history_file(hist_file)
+
+        try:
+            readline.write_history_file(hist_file)
+        except IOError:
+            pass
 
     def rl_completer(self, curword, state):
         curline = readline.get_line_buffer()
@@ -686,7 +717,7 @@ class AdminConsole:
         print 'unimplemented command'
 
     def cmd_connect(self, params):
-        qname = params.get('queue')
+        qname = params.get('queue', self.cur_queue)
 
         if 'node' in params and not qname:
             print 'node= needs a queue also'
@@ -731,7 +762,7 @@ class AdminConsole:
         # set default queue
         if 'queue' in params:
             self.cur_queue = qname
-        
+
         print "CONNECT"
 
     def cmd_install(self, params):
@@ -775,6 +806,8 @@ class AdminConsole:
             "queue_ticker_idle_period as idle_period",
             "queue_ticker_paused as paused",
             "ticker_lag",
+            "ev_per_sec",
+            "ev_new",
         ]
         pfx = "select " + ",".join(fields)
 
@@ -785,20 +818,12 @@ class AdminConsole:
             q = pfx + " from pgq.get_queue_info(%s)"
             curs.execute(q, [queue])
 
-        display_result(curs, 'Queues')
+        display_result(curs, 'Queue: %s' % queue)
 
     def cmd_show_consumer(self, params):
         """Show consumer status"""
-        consumer = params.get('consumer')
-        queue = params.get('queue')
-
-        if not queue:
-            # queue not provided, see if we have default.
-            queue = self.cur_queue
-
-        if not consumer:
-            print 'Consumer not specified'
-            return
+        consumer = params.get('consumer', '*')
+        queue = params.get('queue', '*')
 
         q_queue = (queue != '*' and queue or None)
         q_consumer = (consumer != '*' and consumer or None)
@@ -807,36 +832,29 @@ class AdminConsole:
         q = "select * from pgq.get_consumer_info(%s, %s)"
         curs.execute(q, [q_queue, q_consumer])
 
-        display_result(curs, 'Consumers')
+        display_result(curs, 'Consumer "%s" on queue "%s"' % (consumer, queue))
 
     def cmd_show_node(self, params):
         """Show node information."""
 
-        # TODO: This should additionally show node roles, lags and hierarchy. 
+        # TODO: This should additionally show node roles, lags and hierarchy.
         # Similar to londiste "status".
 
-        node = params.get('node')
-        queue = params.get('queue')
-
-        if not queue:
-            # queue not provided, see if we have default.
-            queue = self.cur_queue
-
-        if not node:
-            print 'Node not specified'
-            return
+        node = params.get('node', '*')
+        queue = params.get('queue', '*')
 
         q_queue = (queue != '*' and queue or None)
         q_node = (node != '*' and node or None)
 
         curs = self.db.cursor()
-        q = """select node_name, node_name, node_location, dead, queue_name
+        q = """select queue_name, node_name, node_location, dead
                from pgq_node.node_location
                where node_name = coalesce(%s, node_name)
-                     and queue_name = coalesce(%s, queue_name)"""
+                     and queue_name = coalesce(%s, queue_name)
+               order by 1,2"""
         curs.execute(q, [q_node, q_queue])
 
-        display_result(curs, 'Database nodes')
+        display_result(curs, 'Node "%s" on queue "%s"' % (node, queue))
 
     def cmd_show_batch(self, params):
         batch_id = params.get('batch_id')
@@ -960,33 +978,9 @@ class AdminConsole:
 
 def main():
     global script
-
     script = AdminConsole()
     script.run(sys.argv[1:])
 
-def test(pfx, curword):
-    global script
-    params = {}
-    script = AdminConsole()
-    sgs = script.find_suggestions(pfx, curword, params)
-    #print repr(pfx), repr(curword), repr(sgs), repr(params)
-
-def sgtest():
-    global script
-    script = AdminConsole()
-    test('', '')
-    test('', 'se')
-    test('', 'cr')
-    test('set ', '')
-    test('set ', 'q')
-    test('set queue = blah;', '')
-    test('set queue = ', '')
-
-    script.exec_string('create queue foo;')
-    script.exec_string('create queue "foo";')
-    script.exec_string('create queue \'foo\';')
-
 if __name__ == '__main__':
-    #sgtest()
     main()