From 76669ea5e38a633a6632c2f7c2c2cc51eb127361 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Mon, 9 Mar 2009 18:17:05 +0200 Subject: [PATCH] experimental newadm code --- doc/Makefile | 2 +- doc/newadm.txt | 92 +++++++ python/newadm.py | 657 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 750 insertions(+), 1 deletion(-) create mode 100644 doc/newadm.txt create mode 100755 python/newadm.py diff --git a/doc/Makefile b/doc/Makefile index 2d03da9f..a2578c7e 100644 --- a/doc/Makefile +++ b/doc/Makefile @@ -11,7 +11,7 @@ EPYARGS = --no-private --url="http://pgfoundry.org/projects/skytools/" \ HTMLS = londiste.cmdline.html londiste.config.html README.html INSTALL.html \ londiste.ref.html TODO.html pgq-sql.html pgq-admin.html pgq-nodupes.html \ - $(SCRIPT_HTMLS) faq.html set.notes.html + $(SCRIPT_HTMLS) faq.html set.notes.html newadm.html SCRIPT_TXTS = walmgr.txt cube_dispatcher.txt table_dispatcher.txt \ queue_mover.txt queue_splitter.txt bulk_loader.txt \ diff --git a/doc/newadm.txt b/doc/newadm.txt new file mode 100644 index 00000000..8ee1e263 --- /dev/null +++ b/doc/newadm.txt @@ -0,0 +1,92 @@ += newadm = + +== Goal == + +Easy to use admin console to examine and administer PgQ queues. +Main feature is psql like tab-completion for everything (queue/consumer names). + +== Current direction == + +* Cmdline switches similar to psql. +* No config file. (?) +* SQL-like syntax (potential to accept full SQL). +* Target is console usage, not scripts. +* Info about both plain and cascaded queues. +* Administer only plain queues. + +== Potential future directions == + + * SQL Scripts. + - Full SQL + - Variables? + - Logic? + * Admin scripts for cascaded queues. Eg. implement current "switchover", "change-provider" + commands with it. + - Needs variables and multi-node support (execute on this node). + Maybe look at the paralled exec features of psql. + * Admin scripts for something else. + - data maintainer? + - release_script? + +== Command line == + + + -U:: username + -h:: host + -p:: port + -d:: database + + --help:: help + --version:: version + +== Internal language == + +All the commands assume they are connected to a database. + +Raw connect, maybe set default queue: + + CONNECT dbname=.. host=.. port=.. user=.. password=.. queue=..; + +Set default queue: + + CONNECT queue=..; + +Connect to node on default queue: + + CONNECT node=..; + +Plain queue modification: + + CREATE QUEUE [params?]; + ALTER QUEUE SET ..; + REGISTER CONSUMER ON QUEUE [AT ]; + UNREGISTER CONSUMER FROM QUEUE ; + DROP QUEUE ; + +Install code: + + INSTALL pgq; -- txid, pgq, pgq_ext, pgq_node + INSTALL londiste; -- all of above + londiste + +Information: + + SHOW QUEUE ( | *); + SHOW CONSUMER ( | *) [ ON | ) [ON ?]; + SHOW BATCH EVENTS ( | ) [ON ?]; + +== Smaller Open Questions == + +* What arguments should newadm accept from command line? + + psql-style: newadm [switches] [dbname [username]] + ssh-style: newadm [switches] [command [args]] + +* Command style and syntax. + +* Multi-word vs. long words: + - SHOW BATCH EVENTS ; + - SHOW_BATCH_EVENTS ; + +* Default queue vs. queue name in commands. + diff --git a/python/newadm.py b/python/newadm.py new file mode 100755 index 00000000..00af5bf6 --- /dev/null +++ b/python/newadm.py @@ -0,0 +1,657 @@ +#! /usr/bin/env python + +"""New admin tool. + +connect dbname=.. host=.. service=.. queue=..; +connect queue=..; +connect queue=.. node=..; + +install pgq; +install londiste; + +--------------------- +show all queues | consumers; + +show queue ..; +show consumer ..; +show batch events <>; +show batch info <>; + +show_queue_info ; +show_queue_stats ; +show_consumer_batch ; +show_batch_info ; +show_batch_events [ev_id]; + +--------------------- +alter queue set param = , ...; + +create queue ; // db +register consumer foo; +unregister consumer foo +drop queue ; + +------------ +create node ; // +create node .; // + +add location ; // db, queue +---------------- + +""" + +__version__ = '0.1' + +cmdline_usage = '''\ +Usage: newadm [switches] + +Initial connection options: + -h host + -p port + -U user + -d dbname + +Command options: + -c cmd_string + -f execfile + +General options: + --help + --version +''' + +import sys, os, readline, skytools, getopt, re + +script = None + +IGNORE_HOSTS = { + 'ip6-allhosts': 1, + 'ip6-allnodes': 1, + 'ip6-allrouters': 1, + #'ip6-localhost': 1, + 'ip6-localnet': 1, + 'ip6-loopback': 1, + 'ip6-mcastprefix': 1, +} + +def unquote_any(self, s): + if s: + c = s[0] + if c == "'": + s = skytools.unquote_literal(c, stdstr = True) + elif c == '"': + s = skytools.unquote_ident(c) + # extquote? + return s + +class Node: + def __init__(self, **kwargs): + self.name = kwargs.get('name') + def get_next(self, typ, word, params): + return None + def get_completions(self, params): + return [] + +class Proxy(Node): + def set_real(self, node): + self.get_next = node.get_next + self.get_completions = node.get_completions + +class WList(Node): + c_append = ' ' + def __init__(self, *args, **kwargs): + Node.__init__(self, **kwargs) + self.wlist = args + + def get_wlist(self): + return self.wlist + + def get_next(self, typ, word, params): + cw = word.lower() + for w in self.get_wlist(): + if w.word == cw: + if self.name: + params[self.name] = cw + return w.next + return None + + def get_completions(self, params): + wlist = self.get_wlist() + comp_list = [] + for w in wlist: + comp_list += w.get_completions(params) + return comp_list + +class DynList(Node): + tk_type = ('ident',) + c_append = ' ' + def __init__(self, next, **kwargs): + Node.__init__(self, **kwargs) + self.next = next + + def get_wlist(self): + return [] + + def get_next(self, typ, word, params): + if typ not in self.tk_type: + return None + if self.name: + params[self.name] = word + return self.next + + def get_completions(self, params): + wlist = self.get_wlist() + comp_list = [w + self.c_append for w in wlist] + return comp_list + +class Queue(DynList): + def get_wlist(self): + return script.get_queue_list() + +class DBNode(DynList): + def get_wlist(self): + return script.get_node_list() + +class Database(DynList): + def get_wlist(self): + return script.get_database_list() + +class Host(DynList): + def get_wlist(self): + return script.get_host_list() + +class User(DynList): + def get_wlist(self): + return script.get_user_list() + +class Port(DynList): + tk_type = ("num",) + def get_wlist(self): + return ['5432', '6432'] + +class Word(Node): + tk_type = ("ident",) + c_append = ' ' + def __init__(self, word, next, **kwargs): + Node.__init__(self, **kwargs) + self.word = word + self.next = next + def get_next(self, typ, word, params): + if typ in self.tk_type and word == self.word: + return self.next + return None + def get_completions(self, params): + return [self.word + self.c_append] + +class SWord(Word): + c_append = '=' + +class Symbol(Word): + tk_type = ("sym",) + c_append = '' + +class EQ(Symbol): + def __init__(self, next): + Symbol.__init__(self, '=', next) + +class Value(Node): + tk_type = ("str", "num", "ident") + def __init__(self, next, **kwargs): + Node.__init__(self, **kwargs) + self.next = next + def get_next(self, typ, word, params): + if typ not in self.tk_type: + return None + if self.name: + params[self.name] = word + return self.next + def get_completions(self, params): + return [] + +## +## Now describe the syntax. +## + +top_level = Proxy() + +w_done = Symbol(';', top_level) + +eq_val = Symbol('=', Value(w_done, name = 'value')) + +w_connect = Proxy() +w_connect.set_real( + WList( + SWord('dbname', EQ(Database(w_connect, name = 'dbname'))), + SWord('host', EQ(Host(w_connect, name = 'host'))), + SWord('port', EQ(Port(w_connect, name = 'port'))), + SWord('user', EQ(User(w_connect, name = 'user'))), + SWord('password', EQ(Value(w_connect, name = 'password'))), + SWord('queue', EQ(Queue(w_connect, name = 'queue'))), + SWord('node', EQ(DBNode(w_connect, name = 'node'))), + w_done)) + +w_set = WList( + SWord('queue', EQ(Queue(w_done, name = 'value'))), + SWord('consumer', EQ(Value(w_done, name = 'value'))), + name = "param") + +w_show = WList( + Word('queues', w_done), + Word('databases', w_done), + Word('consumers', w_done), + Word('stats', w_done), + name = "show") + +w_install = WList( + Word('pgq', w_done), + Word('londiste', w_done), + name = 'module') + +w_create = Word('queue', Value(w_done, name = 'queue')) + +w_top = WList( + Word('connect', w_connect), + Word('create', w_create), + Word('install', w_install), + Word('set', w_set), + Word('show', w_show), + name = "cmd") + +top_level.set_real(w_top) + +## +## Main class for keeping the state. +## + +class AdminConsole: + cur_queue = None + cur_database = None + + cmd_file = None + cmd_str = None + + comp_cache = { + 'comp_pfx': None, + 'comp_list': None, + 'queue_list': None, + 'database_list': None, + 'consumer_list': None, + 'host_list': None, + 'user_list': None, + } + db = None + + rc_hosts = re.compile('\s+') + def get_queue_list(self): + q = "select queue_name from pgq.queue order by 1" + return self._ccache('queue_list', q, 'pgq') + + def get_database_list(self): + q = "select datname from pg_catalog.pg_database order by 1" + return self._ccache('database_list', q) + + def get_user_list(self): + q = "select usename from pg_catalog.pg_user order by 1" + return self._ccache('user_list', q) + + def get_consumer_list(self): + q = "select co_name from pgq.consumer order by 1" + return self._ccache('consumer_list', q, 'pgq') + + def get_node_list(self): + q = "select distinct node_name from pgq_node.node_location order by 1" + return self._ccache('node_list', q, 'pgq_node') + + def _ccache(self, cname, q, req_schema = None): + if not self.db: + return [] + + # check if schema exists + if req_schema: + k = "schema_exists_%s" % req_schema + ok = self.comp_cache.get(k) + if ok is None: + curs = self.db.cursor() + ok = skytools.exists_schema(curs, req_schema) + self.comp_cache[k] = ok + if not ok: + return [] + + # actual completion + clist = self.comp_cache.get(cname) + if clist is None: + curs = self.db.cursor() + curs.execute(q) + clist = [r[0] for r in curs.fetchall()] + self.comp_cache[cname] = clist + return clist + + def get_host_list(self): + clist = self.comp_cache.get('host_list') + if clist is None: + try: + f = open('/etc/hosts', 'r') + clist = [] + while 1: + ln = f.readline() + if not ln: + break + ln = ln.strip() + if ln == '' or ln[0] == '#': + continue + lst = self.rc_hosts.split(ln) + for h in lst[1:]: + if h not in IGNORE_HOSTS: + clist.append(h) + clist.sort() + self.comp_cache['host_list'] = clist + except: + clist = [] + return clist + + def parse_cmdline(self, argv): + switches = "c:h:p:d:U:f:" + lswitches = ['help', 'version'] + try: + opts, args = getopt.getopt(argv, switches, lswitches) + except getopt.GetoptError, ex: + print str(ex) + print "Use --help to see command line options" + sys.exit(1) + + cstr_map = { + 'dbname': None, + 'host': None, + 'port': None, + 'user': None, + 'password': None, + } + cmd_file = cmd_str = None + for o, a in opts: + if o == "--help": + print cmdline_usage + sys.exit(0) + elif o == "--version": + print "newadm version %s" % __version__ + sys.exit(0) + elif o == "-h": + cstr_map['host'] = a + elif o == "-p": + cstr_map['port'] = a + elif o == "-d": + cstr_map['dbname'] = a + elif o == "-U": + cstr_map['user'] = a + elif o == "-c": + self.cmd_str = a + elif o == "-f": + self.cmd_file = a + + cstr_list = [] + for k, v in cstr_map.items(): + if v is not None: + cstr_list.append("%s=%s" % (k, v)) + if len(args) == 1: + a = args[0] + if a.find('=') >= 0: + cstr_list.append(a) + else: + cstr_list.append("dbname=%s" % a) + elif len(args) > 1: + print "too many arguments, use --help to see syntax" + sys.exit(1) + + self.initial_connstr = " ".join(cstr_list) + + def db_connect(self, connstr): + db = skytools.connect_database(connstr) + db.set_isolation_level(0) # autocommit + + q = "select current_database(), current_setting('server_version')" + curs = db.cursor() + curs.execute(q) + res = curs.fetchone() + self.cur_database = res[0] + return db + + #print res + #print dir(self.db) + #print dir(self.db.cursor()) + #print self.db.status + #print "connected to", repr(self.initial_connstr) + + + def run(self, argv): + self.parse_cmdline(argv) + + if self.cmd_file is not None and self.cmd_str is not None: + print "cannot handle -c and -f together" + sys.exit(1) + + cmd_str = self.cmd_str + if self.cmd_file: + cmd_str = open(self.cmd_file, "r").read() + + self.db = self.db_connect(self.initial_connstr) + + if cmd_str: + self.exec_string(cmd_str) + else: + self.main_loop() + + def main_loop(self): + readline.parse_and_bind('tab: complete') + readline.set_completer(self.rl_completer_safe) + #print 'delims: ', repr(readline.get_completer_delims()) + hist_file = os.path.expanduser("~/.newadm_history") + try: + readline.read_history_file(hist_file) + except IOError: + pass + + while 1: + try: + ln = self.line_input() + #print 'line:', repr(ln) + self.exec_string(ln) + except KeyboardInterrupt: + print + except EOFError: + print + break + self.reset_comp_cache() + readline.write_history_file(hist_file) + + def rl_completer(self, curword, state): + curline = readline.get_line_buffer() + start = readline.get_begidx() + end = readline.get_endidx() + + pfx = curline[:start] + sglist = self.find_suggestions(pfx, curword) + if state < len(sglist): + return sglist[state] + return None + + def rl_completer_safe(self, curword, state): + try: + return self.rl_completer(curword, state) + except BaseException, det: + print 'got some error', str(det) + + def line_input(self): + qname = "(noqueue)" + if self.cur_queue: + qname = self.cur_queue + p = "%s@%s> " % (qname, self.cur_database) + return raw_input(p) + + def sql_words(self, sql): + return skytools.sql_tokenizer(sql, + standard_quoting = True, + ignore_whitespace = True) + + def reset_comp_cache(self): + self.comp_cache = {} + + def find_suggestions_real(self, pfx, params): + # find level + node = top_level + for typ, w in self.sql_words(pfx): + w = w.lower() + node = node.get_next(typ, w, params) + if not node: + break + + # find possible matches + if node: + return node.get_completions(params) + else: + return [] + + def find_suggestions(self, pfx, curword, params = {}): + c_pfx = self.comp_cache.get('comp_pfx') + c_list = self.comp_cache.get('comp_list', []) + if c_pfx != pfx: + c_list = self.find_suggestions_real(pfx, params) + self.comp_cache['comp_pfx'] = pfx + self.comp_cache['comp_list'] = c_list + + wlen = len(curword) + res = [] + for cword in c_list: + if curword == cword[:wlen]: + res.append(cword) + return res + + def exec_string(self, ln, eof = False): + node = top_level + params = {} + for typ, w in self.sql_words(ln): + w = w.lower() + #print repr(typ), repr(w) + if typ == 'error': + print 'syntax error:', repr(ln) + return + node = node.get_next(typ, w, params) + if not node: + print "syntax error:", repr(ln) + return + if node == top_level: + self.exec_params(params) + params = {} + if eof: + if params: + self.exec_params(params) + elif node != top_level: + print "multi-line commands not supported:", repr(ln) + + def exec_params(self, params): + cmd = params.get('cmd') + if not cmd: + print 'parse error: no command found' + return + #print 'RUN', repr(params) + fn = getattr(self, 'cmd_' + cmd, self.bad_cmd) + fn(params) + + def bad_cmd(self, params): + print 'unimplemented command' + + def cmd_connect(self, params): + qname = params.get('queue') + if not qname: + qname = self.cur_queue + if 'node' in params and not qname: + print 'node= needs a queue also' + return + + # load raw connection params + cdata = [] + for k in ('dbname', 'host', 'port', 'user', 'password'): + if k in params: + arg = "%s=%s" % (k, params[k]) + cdata.append(arg) + + # raw connect + if cdata: + if 'node' in params: + print 'node= cannot be used together with raw params' + return + cstr = " ".join(cdata) + self.db = self.db_connect(cstr) + + # connect to node + if 'node' in params: + curs = self.db.cursor() + q = "select node_location from pgq_node.get_queue_locations(%s)"\ + " where node_name = %s" + curs.execute(q, [qname, params['node']]) + res = curs.fetchall() + if len(res) == 0: + print "node not found" + return + cstr = res[0]['node_location'] + self.db = self.db_connect(cstr) + + # set default queue + if 'queue' in params: + self.cur_queue = qname + print 'queue=', qname + + def cmd_install(self, params): + pgq_objs = [ + skytools.DBLanguage("plpgsql"), + skytools.DBFunction("txid_current_snapshot", 0, sql_file="txid.sql"), + skytools.DBSchema("pgq", sql_file="pgq.sql"), + skytools.DBSchema("pgq_ext", sql_file="pgq_ext.sql"), + skytools.DBSchema("pgq_node", sql_file="pgq_node.sql"), + ] + londiste_objs = pgq_objs + [ + skytools.DBSchema("londiste", sql_file="londiste.sql"), + ] + mod_map = { + 'londiste': londiste_objs, + 'pgq': pgq_objs, + } + mod_name = params['module'] + objs = mod_map[mod_name] + if not self.db: + print "no db?" + return + curs = self.db.cursor() + skytools.db_install(curs, objs, None) + print "%s installed" % mod_name + +def main(): + global script + + script = AdminConsole() + script.run(sys.argv[1:]) + +def test(pfx, curword): + global script + params = {} + script = AdminConsole() + sgs = script.find_suggestions(pfx, curword, params) + #print repr(pfx), repr(curword), repr(sgs), repr(params) + +def sgtest(): + global script + script = AdminConsole() + test('', '') + test('', 'se') + test('', 'cr') + test('set ', '') + test('set ', 'q') + test('set queue = blah;', '') + test('set queue = ', '') + + script.exec_string('create queue foo;') + script.exec_string('create queue "foo";') + script.exec_string('create queue \'foo\';') + +if __name__ == '__main__': + #sgtest() + main() + -- 2.39.5