experimental newadm code
authorMarko Kreen <markokr@gmail.com>
Mon, 9 Mar 2009 16:17:05 +0000 (18:17 +0200)
committerMarko Kreen <markokr@gmail.com>
Tue, 10 Mar 2009 12:10:08 +0000 (14:10 +0200)
doc/Makefile
doc/newadm.txt [new file with mode: 0644]
python/newadm.py [new file with mode: 0755]

index 2d03da9f87a8ffefaf17409d3928590a9134db4d..a2578c7e500bc1bed396d0eaf161a7ceb5093914 100644 (file)
@@ -11,7 +11,7 @@ EPYARGS = --no-private --url="http://pgfoundry.org/projects/skytools/" \
 
 HTMLS = londiste.cmdline.html londiste.config.html README.html INSTALL.html \
        londiste.ref.html TODO.html pgq-sql.html pgq-admin.html pgq-nodupes.html \
-       $(SCRIPT_HTMLS) faq.html set.notes.html
+       $(SCRIPT_HTMLS) faq.html set.notes.html newadm.html
 
 SCRIPT_TXTS = walmgr.txt cube_dispatcher.txt table_dispatcher.txt \
              queue_mover.txt queue_splitter.txt bulk_loader.txt \
diff --git a/doc/newadm.txt b/doc/newadm.txt
new file mode 100644 (file)
index 0000000..8ee1e26
--- /dev/null
@@ -0,0 +1,92 @@
+= newadm =
+
+== Goal ==
+
+Easy to use admin console to examine and administer PgQ queues.
+Main feature is psql like tab-completion for everything (queue/consumer names).
+
+== Current direction ==
+
+* Cmdline switches similar to psql.
+* No config file. (?)
+* SQL-like syntax (potential to accept full SQL).
+* Target is console usage, not scripts.
+* Info about both plain and cascaded queues.
+* Administer only plain queues.
+
+== Potential future directions ==
+
+ * SQL Scripts.
+  - Full SQL
+  - Variables?
+  - Logic?
+ * Admin scripts for cascaded queues.  Eg. implement current "switchover", "change-provider"
+   commands with it.
+  - Needs variables and multi-node support (execute on this node).
+    Maybe look at the paralled exec features of psql.
+ * Admin scripts for something else.
+  - data maintainer?
+  - release_script?
+
+== Command line ==
+
+
+  -U:: username
+  -h:: host
+  -p:: port
+  -d:: database
+  --help:: help
+  --version:: version
+
+== Internal language ==
+
+All the commands assume they are connected to a database.
+
+Raw connect, maybe set default queue:
+
+  CONNECT dbname=.. host=.. port=.. user=.. password=.. queue=..;
+
+Set default queue:
+
+  CONNECT queue=..;
+
+Connect to node on default queue:
+
+  CONNECT node=..;
+
+Plain queue modification:
+
+  CREATE QUEUE <qname> [params?];
+  ALTER QUEUE <qname> SET ..;
+  REGISTER CONSUMER <cons> ON QUEUE <qname> [AT <tick_id>];
+  UNREGISTER CONSUMER <cons> FROM QUEUE <qname>;
+  DROP QUEUE <qname>;
+
+Install code:
+
+  INSTALL pgq;      -- txid, pgq, pgq_ext, pgq_node
+  INSTALL londiste; -- all of above + londiste
+
+Information:
+
+  SHOW QUEUE (<qname> | *);
+  SHOW CONSUMER (<cons> | *) [ ON <qname | * ];
+  SHOW BATCH INFO (<id> | <consumer>) [ON <queue>?];
+  SHOW BATCH EVENTS (<id> | <consumer>) [ON <queue>?];
+
+== Smaller Open Questions ==
+
+* What arguments should newadm accept from command line?
+
+  psql-style: newadm [switches] [dbname [username]]
+  ssh-style:  newadm [switches] [command [args]]
+
+* Command style and syntax.
+
+* Multi-word vs. long words:
+ - SHOW BATCH EVENTS <id>;
+ - SHOW_BATCH_EVENTS <id>;
+
+* Default queue vs. queue name in commands.
+
diff --git a/python/newadm.py b/python/newadm.py
new file mode 100755 (executable)
index 0000000..00af5bf
--- /dev/null
@@ -0,0 +1,657 @@
+#! /usr/bin/env python
+
+"""New admin tool.
+
+connect dbname=.. host=.. service=.. queue=..;
+connect queue=..;
+connect queue=.. node=..;
+
+install pgq;
+install londiste;
+
+---------------------
+show all queues | consumers;
+
+show queue ..;
+show consumer ..;
+show batch events <>;
+show batch info <>;
+
+show_queue_info <q>;
+show_queue_stats <q>;
+show_consumer_batch <cons>;
+show_batch_info <bid>;
+show_batch_events <bid> [ev_id];
+
+---------------------
+alter queue <qname> set param = , ...;
+
+create queue <qname>; // db
+register consumer foo;
+unregister consumer foo
+drop queue <q>;
+
+------------
+create node <foo>; // 
+create node <qname>.<foo>; // 
+
+add location <node> <loc>; // db, queue
+----------------
+
+"""
+
+__version__ = '0.1'
+
+cmdline_usage = '''\
+Usage: newadm [switches]
+
+Initial connection options:
+    -h host
+    -p port
+    -U user
+    -d dbname
+
+Command options:
+    -c cmd_string
+    -f execfile
+
+General options:
+    --help
+    --version
+'''
+
+import sys, os, readline, skytools, getopt, re
+
+script = None
+
+IGNORE_HOSTS = {
+    'ip6-allhosts': 1,
+    'ip6-allnodes': 1,
+    'ip6-allrouters': 1,
+    #'ip6-localhost': 1,
+    'ip6-localnet': 1,
+    'ip6-loopback': 1,
+    'ip6-mcastprefix': 1,
+}
+
+def unquote_any(self, s):
+    if s:
+        c = s[0]
+        if c == "'":
+            s = skytools.unquote_literal(c, stdstr = True)
+        elif c == '"':
+            s = skytools.unquote_ident(c)
+        # extquote?
+    return s
+
+class Node:
+    def __init__(self, **kwargs):
+        self.name = kwargs.get('name')
+    def get_next(self, typ, word, params):
+        return None
+    def get_completions(self, params):
+        return []
+
+class Proxy(Node):
+    def set_real(self, node):
+        self.get_next = node.get_next
+        self.get_completions = node.get_completions
+
+class WList(Node):
+    c_append = ' '
+    def __init__(self, *args, **kwargs):
+        Node.__init__(self, **kwargs)
+        self.wlist = args
+
+    def get_wlist(self):
+        return self.wlist
+
+    def get_next(self, typ, word, params):
+        cw = word.lower()
+        for w in self.get_wlist():
+            if w.word == cw:
+                if self.name:
+                    params[self.name] = cw
+                return w.next
+        return None
+
+    def get_completions(self, params):
+        wlist = self.get_wlist()
+        comp_list = []
+        for w in wlist:
+            comp_list += w.get_completions(params)
+        return comp_list
+
+class DynList(Node):
+    tk_type = ('ident',)
+    c_append = ' '
+    def __init__(self, next, **kwargs):
+        Node.__init__(self, **kwargs)
+        self.next = next
+
+    def get_wlist(self):
+        return []
+
+    def get_next(self, typ, word, params):
+        if typ not in self.tk_type:
+            return None
+        if self.name:
+            params[self.name] = word
+        return self.next
+
+    def get_completions(self, params):
+        wlist = self.get_wlist()
+        comp_list = [w + self.c_append for w in wlist]
+        return comp_list
+
+class Queue(DynList):
+    def get_wlist(self):
+        return script.get_queue_list()
+
+class DBNode(DynList):
+    def get_wlist(self):
+        return script.get_node_list()
+
+class Database(DynList):
+    def get_wlist(self):
+        return script.get_database_list()
+
+class Host(DynList):
+    def get_wlist(self):
+        return script.get_host_list()
+
+class User(DynList):
+    def get_wlist(self):
+        return script.get_user_list()
+
+class Port(DynList):
+    tk_type = ("num",)
+    def get_wlist(self):
+        return ['5432', '6432']
+
+class Word(Node):
+    tk_type = ("ident",)
+    c_append = ' '
+    def __init__(self, word, next, **kwargs):
+        Node.__init__(self, **kwargs)
+        self.word = word
+        self.next = next
+    def get_next(self, typ, word, params):
+        if typ in self.tk_type and word == self.word:
+            return self.next
+        return None
+    def get_completions(self, params):
+        return [self.word + self.c_append]
+
+class SWord(Word):
+    c_append = '='
+
+class Symbol(Word):
+    tk_type = ("sym",)
+    c_append = ''
+
+class EQ(Symbol):
+    def __init__(self, next):
+        Symbol.__init__(self, '=', next)
+
+class Value(Node):
+    tk_type = ("str", "num", "ident")
+    def __init__(self, next, **kwargs):
+        Node.__init__(self, **kwargs)
+        self.next = next
+    def get_next(self, typ, word, params):
+        if typ not in self.tk_type:
+            return None
+        if self.name:
+            params[self.name] = word
+        return self.next
+    def get_completions(self, params):
+        return []
+
+##
+##  Now describe the syntax.
+##
+
+top_level = Proxy()
+
+w_done = Symbol(';', top_level)
+
+eq_val = Symbol('=', Value(w_done, name = 'value'))
+
+w_connect = Proxy()
+w_connect.set_real(
+    WList(
+        SWord('dbname', EQ(Database(w_connect, name = 'dbname'))),
+        SWord('host', EQ(Host(w_connect, name = 'host'))),
+        SWord('port', EQ(Port(w_connect, name = 'port'))),
+        SWord('user', EQ(User(w_connect, name = 'user'))),
+        SWord('password', EQ(Value(w_connect, name = 'password'))),
+        SWord('queue', EQ(Queue(w_connect, name = 'queue'))),
+        SWord('node', EQ(DBNode(w_connect, name = 'node'))),
+        w_done))
+
+w_set = WList(
+    SWord('queue', EQ(Queue(w_done, name = 'value'))),
+    SWord('consumer', EQ(Value(w_done, name = 'value'))),
+    name = "param")
+
+w_show = WList(
+    Word('queues', w_done),
+    Word('databases', w_done),
+    Word('consumers', w_done),
+    Word('stats', w_done),
+    name = "show")
+
+w_install = WList(
+    Word('pgq', w_done),
+    Word('londiste', w_done),
+    name = 'module')
+
+w_create = Word('queue', Value(w_done, name = 'queue'))
+
+w_top = WList(
+    Word('connect', w_connect),
+    Word('create', w_create),
+    Word('install', w_install),
+    Word('set', w_set),
+    Word('show', w_show),
+    name = "cmd")
+
+top_level.set_real(w_top)
+
+##
+## Main class for keeping the state.
+##
+
+class AdminConsole:
+    cur_queue = None
+    cur_database = None
+
+    cmd_file = None
+    cmd_str = None
+
+    comp_cache = {
+        'comp_pfx': None,
+        'comp_list': None,
+        'queue_list': None,
+        'database_list': None,
+        'consumer_list': None,
+        'host_list': None,
+        'user_list': None,
+    }
+    db = None
+
+    rc_hosts = re.compile('\s+')
+    def get_queue_list(self):
+        q = "select queue_name from pgq.queue order by 1"
+        return self._ccache('queue_list', q, 'pgq')
+
+    def get_database_list(self):
+        q = "select datname from pg_catalog.pg_database order by 1"
+        return self._ccache('database_list', q)
+
+    def get_user_list(self):
+        q = "select usename from pg_catalog.pg_user order by 1"
+        return self._ccache('user_list', q)
+
+    def get_consumer_list(self):
+        q = "select co_name from pgq.consumer order by 1"
+        return self._ccache('consumer_list', q, 'pgq')
+
+    def get_node_list(self):
+        q = "select distinct node_name from pgq_node.node_location order by 1"
+        return self._ccache('node_list', q, 'pgq_node')
+
+    def _ccache(self, cname, q, req_schema = None):
+        if not self.db:
+            return []
+
+        # check if schema exists
+        if req_schema:
+            k = "schema_exists_%s" % req_schema
+            ok = self.comp_cache.get(k)
+            if ok is None:
+                curs = self.db.cursor()
+                ok = skytools.exists_schema(curs, req_schema)
+                self.comp_cache[k] = ok
+            if not ok:
+                return []
+
+        # actual completion
+        clist = self.comp_cache.get(cname)
+        if clist is None:
+            curs = self.db.cursor()
+            curs.execute(q)
+            clist = [r[0] for r in curs.fetchall()]
+            self.comp_cache[cname] = clist
+        return clist
+
+    def get_host_list(self):
+        clist = self.comp_cache.get('host_list')
+        if clist is None:
+            try:
+                f = open('/etc/hosts', 'r')
+                clist = []
+                while 1:
+                    ln = f.readline()
+                    if not ln:
+                        break
+                    ln = ln.strip()
+                    if ln == '' or ln[0] == '#':
+                        continue
+                    lst = self.rc_hosts.split(ln)
+                    for h in lst[1:]:
+                        if h not in IGNORE_HOSTS:
+                            clist.append(h)
+                clist.sort()
+                self.comp_cache['host_list'] = clist
+            except:
+                clist = []
+        return clist
+
+    def parse_cmdline(self, argv):
+        switches = "c:h:p:d:U:f:"
+        lswitches = ['help', 'version']
+        try:
+            opts, args = getopt.getopt(argv, switches, lswitches)
+        except getopt.GetoptError, ex:
+            print str(ex)
+            print "Use --help to see command line options"
+            sys.exit(1)
+
+        cstr_map = {
+            'dbname': None,
+            'host': None,
+            'port': None,
+            'user': None,
+            'password': None,
+        }
+        cmd_file = cmd_str = None
+        for o, a in opts:
+            if o == "--help":
+                print cmdline_usage
+                sys.exit(0)
+            elif o == "--version":
+                print "newadm version %s" % __version__
+                sys.exit(0)
+            elif o == "-h":
+                cstr_map['host'] = a
+            elif o == "-p":
+                cstr_map['port'] = a
+            elif o == "-d":
+                cstr_map['dbname'] = a
+            elif o == "-U":
+                cstr_map['user'] = a
+            elif o == "-c":
+                self.cmd_str = a
+            elif o == "-f":
+                self.cmd_file = a
+
+        cstr_list = []
+        for k, v in cstr_map.items():
+            if v is not None:
+                cstr_list.append("%s=%s" % (k, v))
+        if len(args) == 1:
+            a = args[0]
+            if a.find('=') >= 0:
+                cstr_list.append(a)
+            else:
+                cstr_list.append("dbname=%s" % a)
+        elif len(args) > 1:
+            print "too many arguments, use --help to see syntax"
+            sys.exit(1)
+
+        self.initial_connstr = " ".join(cstr_list)
+
+    def db_connect(self, connstr):
+        db = skytools.connect_database(connstr)
+        db.set_isolation_level(0) # autocommit
+
+        q = "select current_database(), current_setting('server_version')"
+        curs = db.cursor()
+        curs.execute(q)
+        res = curs.fetchone()
+        self.cur_database = res[0]
+        return db
+
+        #print res
+        #print dir(self.db)
+        #print dir(self.db.cursor())
+        #print self.db.status
+        #print "connected to", repr(self.initial_connstr)
+
+
+    def run(self, argv):
+        self.parse_cmdline(argv)
+
+        if self.cmd_file is not None and self.cmd_str is not None:
+            print "cannot handle -c and -f together"
+            sys.exit(1)
+
+        cmd_str = self.cmd_str
+        if self.cmd_file:
+            cmd_str = open(self.cmd_file, "r").read()
+
+        self.db = self.db_connect(self.initial_connstr)
+
+        if cmd_str:
+            self.exec_string(cmd_str)
+        else:
+            self.main_loop()
+
+    def main_loop(self):
+        readline.parse_and_bind('tab: complete')
+        readline.set_completer(self.rl_completer_safe)
+        #print 'delims: ', repr(readline.get_completer_delims())
+        hist_file = os.path.expanduser("~/.newadm_history")
+        try:
+            readline.read_history_file(hist_file)
+        except IOError:
+            pass
+
+        while 1:
+            try:
+                ln = self.line_input()
+                #print 'line:', repr(ln)
+                self.exec_string(ln)
+            except KeyboardInterrupt:
+                print
+            except EOFError:
+                print
+                break
+            self.reset_comp_cache()
+        readline.write_history_file(hist_file)
+
+    def rl_completer(self, curword, state):
+        curline = readline.get_line_buffer()
+        start = readline.get_begidx()
+        end = readline.get_endidx()
+
+        pfx = curline[:start]
+        sglist = self.find_suggestions(pfx, curword)
+        if state < len(sglist):
+            return sglist[state]
+        return None
+
+    def rl_completer_safe(self, curword, state):
+        try:
+            return self.rl_completer(curword, state)
+        except BaseException, det:
+            print 'got some error', str(det)
+
+    def line_input(self):
+        qname = "(noqueue)"
+        if self.cur_queue:
+            qname = self.cur_queue
+        p = "%s@%s> " % (qname, self.cur_database)
+        return raw_input(p)
+
+    def sql_words(self, sql):
+        return skytools.sql_tokenizer(sql,
+                standard_quoting = True,
+                ignore_whitespace = True)
+
+    def reset_comp_cache(self):
+        self.comp_cache = {}
+
+    def find_suggestions_real(self, pfx, params):
+        # find level
+        node = top_level
+        for typ, w in self.sql_words(pfx):
+            w = w.lower()
+            node = node.get_next(typ, w, params)
+            if not node:
+                break
+
+        # find possible matches
+        if node:
+            return node.get_completions(params)
+        else:
+            return []
+
+    def find_suggestions(self, pfx, curword, params = {}):
+        c_pfx = self.comp_cache.get('comp_pfx')
+        c_list = self.comp_cache.get('comp_list', [])
+        if c_pfx != pfx:
+            c_list = self.find_suggestions_real(pfx, params)
+            self.comp_cache['comp_pfx'] = pfx
+            self.comp_cache['comp_list'] = c_list
+
+        wlen = len(curword)
+        res = []
+        for cword in c_list:
+            if curword == cword[:wlen]:
+                res.append(cword)
+        return res
+
+    def exec_string(self, ln, eof = False):
+        node = top_level
+        params = {}
+        for typ, w in self.sql_words(ln):
+            w = w.lower()
+            #print repr(typ), repr(w)
+            if typ == 'error':
+                print 'syntax error:', repr(ln)
+                return
+            node = node.get_next(typ, w, params)
+            if not node:
+                print "syntax error:", repr(ln)
+                return
+            if node == top_level:
+                self.exec_params(params)
+                params = {}
+        if eof:
+            if params:
+                self.exec_params(params)
+        elif node != top_level:
+            print "multi-line commands not supported:", repr(ln)
+
+    def exec_params(self, params):
+        cmd = params.get('cmd')
+        if not cmd:
+            print 'parse error: no command found'
+            return
+        #print 'RUN', repr(params)
+        fn = getattr(self, 'cmd_' + cmd, self.bad_cmd)
+        fn(params)
+
+    def bad_cmd(self, params):
+        print 'unimplemented command'
+
+    def cmd_connect(self, params):
+        qname = params.get('queue')
+        if not qname:
+            qname = self.cur_queue
+        if 'node' in params and not qname:
+            print 'node= needs a queue also'
+            return
+
+        # load raw connection params
+        cdata = []
+        for k in ('dbname', 'host', 'port', 'user', 'password'):
+            if k in params:
+                arg = "%s=%s" % (k, params[k])
+                cdata.append(arg)
+
+        # raw connect
+        if cdata:
+            if 'node' in params:
+                print 'node= cannot be used together with raw params'
+                return
+            cstr = " ".join(cdata)
+            self.db = self.db_connect(cstr)
+
+        # connect to node
+        if 'node' in params:
+            curs = self.db.cursor()
+            q = "select node_location from pgq_node.get_queue_locations(%s)"\
+                " where node_name = %s"
+            curs.execute(q, [qname, params['node']])
+            res = curs.fetchall()
+            if len(res) == 0:
+                print "node not found"
+                return
+            cstr = res[0]['node_location']
+            self.db = self.db_connect(cstr)
+
+        # set default queue
+        if 'queue' in params:
+            self.cur_queue = qname
+            print 'queue=', qname
+
+    def cmd_install(self, params):
+        pgq_objs = [
+            skytools.DBLanguage("plpgsql"),
+            skytools.DBFunction("txid_current_snapshot", 0, sql_file="txid.sql"),
+            skytools.DBSchema("pgq", sql_file="pgq.sql"),
+            skytools.DBSchema("pgq_ext", sql_file="pgq_ext.sql"),
+            skytools.DBSchema("pgq_node", sql_file="pgq_node.sql"),
+        ]
+        londiste_objs = pgq_objs + [
+            skytools.DBSchema("londiste", sql_file="londiste.sql"),
+        ]
+        mod_map = {
+            'londiste': londiste_objs,
+            'pgq': pgq_objs,
+        }
+        mod_name = params['module']
+        objs = mod_map[mod_name]
+        if not self.db:
+            print "no db?"
+            return
+        curs = self.db.cursor()
+        skytools.db_install(curs, objs, None)
+        print "%s installed" % mod_name
+
+def main():
+    global script
+
+    script = AdminConsole()
+    script.run(sys.argv[1:])
+
+def test(pfx, curword):
+    global script
+    params = {}
+    script = AdminConsole()
+    sgs = script.find_suggestions(pfx, curword, params)
+    #print repr(pfx), repr(curword), repr(sgs), repr(params)
+
+def sgtest():
+    global script
+    script = AdminConsole()
+    test('', '')
+    test('', 'se')
+    test('', 'cr')
+    test('set ', '')
+    test('set ', 'q')
+    test('set queue = blah;', '')
+    test('set queue = ', '')
+
+    script.exec_string('create queue foo;')
+    script.exec_string('create queue "foo";')
+    script.exec_string('create queue \'foo\';')
+
+if __name__ == '__main__':
+    #sgtest()
+    main()
+