qadmin: londiste commands
authorMarko Kreen <markokr@gmail.com>
Thu, 25 Nov 2010 09:53:17 +0000 (11:53 +0200)
committerMarko Kreen <markokr@gmail.com>
Thu, 25 Nov 2010 09:53:17 +0000 (11:53 +0200)
add_table, remove_table, tables,
add_seq, remove_seq, seqs,
missing

python/qadmin.py

index bdc47cc22c25ec1bb3b4e68eefefaaca4c1dd245..a31e2a7568ce8c4bb8d093e5b8317d331daac974 100755 (executable)
@@ -22,6 +22,16 @@ Following commands expect default queue:
     show batch <batch_id>;
     show batch <consumer>;
 
+Londiste commands:
+
+    londiste add_table <tbl>;
+    londiste remove_table <tbl>;
+    londiste add_seq <seq>;
+    londiste remove_seq <seq>;
+    londiste tables;
+    londiste seqs;
+    londiste missing;
+
 Other commands:
 
     exit;   (or press ^D)
@@ -240,6 +250,22 @@ class User(DynList):
     def get_wlist(self):
         return script.get_user_list()
 
+class NewTable(DynList):
+    def get_wlist(self):
+        return script.get_new_table_list()
+
+class KnownTable(DynList):
+    def get_wlist(self):
+        return script.get_known_table_list()
+
+class NewSeq(DynList):
+    def get_wlist(self):
+        return script.get_new_seq_list()
+
+class KnownSeq(DynList):
+    def get_wlist(self):
+        return script.get_known_seq_list()
+
 class BatchId(DynList):
     tk_type = ("num",)
     def get_wlist(self):
@@ -375,6 +401,21 @@ w_cons_from_queue = Word('consumer',
         Consumer(List(w_done, w_from_queue), name = 'consumer'),
         name = 'cmd2')
 
+w_londiste_add_table = NewTable(w_done, name = 'table')
+w_londiste_add_seq = NewSeq(w_done, name = 'seq')
+w_londiste_remove_table = KnownTable(w_done, name = 'table')
+w_londiste_remove_seq = KnownSeq(w_done, name = 'seq')
+
+w_londiste = List(
+    Word('add_table', w_londiste_add_table),
+    Word('add_seq', w_londiste_add_seq),
+    Word('remove_table', w_londiste_remove_table),
+    Word('remove_seq', w_londiste_remove_seq),
+    Word('missing', w_done),
+    Word('tables', w_done),
+    Word('seqs', w_done),
+    name = "cmd2")
+
 w_top = List(
     Word('alter', w_alter),
     Word('connect', w_connect),
@@ -385,6 +426,7 @@ w_top = List(
     Word('unregister', w_cons_from_queue),
     Word('show', w_show),
     Word('exit', w_done),
+    Word('londiste', w_londiste),
     name = "cmd")
 
 top_level.set_real(w_top)
@@ -434,6 +476,52 @@ class AdminConsole:
         q = "select distinct node_name from pgq_node.node_location order by 1"
         return self._ccache('node_list', q, 'pgq_node')
 
+    def get_new_table_list(self):
+        if not self.cur_queue:
+            return []
+        qname = skytools.quote_literal(self.cur_queue)
+        q = """select n.nspname || '.' || r.relname
+            from pg_catalog.pg_class r
+            join pg_catalog.pg_namespace n on (n.oid = r.relnamespace)
+            left join londiste.table_info i on (i.queue_name = %s and i.table_name = (n.nspname || '.' || r.relname))
+            where r.relkind='r'
+              and n.nspname not in ('pg_catalog', 'information_schema', 'pgq', 'londiste', 'pgq_node', 'pgq_ext')
+              and n.nspname !~ 'pg_.*'
+              and i.table_name is null
+            order by 1 """ % qname
+        return self._ccache('new_table_list', q, 'londiste')
+
+    def get_known_table_list(self):
+        if not self.cur_queue:
+            return []
+        qname = skytools.quote_literal(self.cur_queue)
+        q = "select table_name from londiste.table_info"\
+            " where queue_name = %s order by 1" % qname
+        return self._ccache('known_table_list', q, 'londiste')
+
+    def get_new_seq_list(self):
+        if not self.cur_queue:
+            return []
+        qname = skytools.quote_literal(self.cur_queue)
+        q = """select n.nspname || '.' || r.relname
+            from pg_catalog.pg_class r
+            join pg_catalog.pg_namespace n on (n.oid = r.relnamespace)
+            left join londiste.seq_info i on (i.queue_name = %s and i.seq_name = (n.nspname || '.' || r.relname))
+            where r.relkind='S'
+              and n.nspname not in ('pg_catalog', 'information_schema', 'pgq', 'londiste', 'pgq_node', 'pgq_ext')
+              and n.nspname !~ 'pg_.*'
+              and i.seq_name is null
+            order by 1 """ % qname
+        return self._ccache('new_seq_list', q, 'londiste')
+
+    def get_known_seq_list(self):
+        if not self.cur_queue:
+            return []
+        qname = skytools.quote_literal(self.cur_queue)
+        q = "select seq_name from londiste.seq_info"\
+            " where queue_name = %s order by 1" % qname
+        return self._ccache('known_seq_list', q, 'londiste')
+
     def get_batch_list(self):
         if not self.cur_queue:
             return []
@@ -645,6 +733,7 @@ class AdminConsole:
     def sql_words(self, sql):
         return skytools.sql_tokenizer(sql,
                 standard_quoting = True,
+                fqident = True,
                 ignore_whitespace = True)
 
     def reset_comp_cache(self):
@@ -687,11 +776,12 @@ class AdminConsole:
             w = w.lower()
             #print repr(typ), repr(w)
             if typ == 'error':
-                print 'syntax error:', repr(ln)
+                print 'syntax error 1:', repr(ln)
                 return
+            onode = node
             node = node.get_next(typ, w, params)
             if not node:
-                print "syntax error:", repr(ln)
+                print "syntax error 2:", repr(ln), repr(typ), repr(w), repr(params)
                 return
             if node == top_level:
                 self.exec_params(params)
@@ -717,6 +807,7 @@ class AdminConsole:
             fn(params)
         except Exception, ex:
             print "ERROR: %s" % str(ex).strip()
+            raise
 
     def bad_cmd(self, params):
         print 'unimplemented command'
@@ -984,6 +1075,83 @@ class AdminConsole:
     def cmd_exit(self, params):
         sys.exit(0)
 
+    ##
+    ## Londiste
+    ##
+
+    def cmd_londiste_missing(self, params):
+        """Show missing objects."""
+
+        queue = self.cur_queue
+
+        curs = self.db.cursor()
+        q = """select * from londiste.local_show_missing(%s)"""
+        curs.execute(q, [queue])
+
+        display_result(curs, 'Missing objects queue "%s"' % (queue))
+
+    def cmd_londiste_tables(self, params):
+        """Show local tables."""
+
+        queue = self.cur_queue
+
+        curs = self.db.cursor()
+        q = """select * from londiste.get_table_list(%s) where local"""
+        curs.execute(q, [queue])
+
+        display_result(curs, 'Local tables on queue "%s"' % (queue))
+
+    def cmd_londiste_seqs(self, params):
+        """Show local seqs."""
+
+        queue = self.cur_queue
+
+        curs = self.db.cursor()
+        q = """select * from londiste.get_seq_list(%s) where local"""
+        curs.execute(q, [queue])
+
+        display_result(curs, 'Sequences on queue "%s"' % (queue))
+
+    def cmd_londiste_add_table(self, params):
+        """Add table."""
+
+        curs = self.db.cursor()
+        q = """select * from londiste.local_add_table(%s, %s)"""
+        curs.execute(q, [self.cur_queue, params['table']])
+
+        res = curs.fetchone()
+        print 'ADD_TABLE:', res[0], res[1]
+
+    def cmd_londiste_remove_table(self, params):
+        """Remove table."""
+
+        curs = self.db.cursor()
+        q = """select * from londiste.local_remove_table(%s, %s)"""
+        curs.execute(q, [self.cur_queue, params['table']])
+
+        res = curs.fetchone()
+        print 'REMOVE_TABLE:', res[0], res[1]
+
+    def cmd_londiste_add_seq(self, params):
+        """Add seq."""
+
+        curs = self.db.cursor()
+        q = """select * from londiste.local_add_seq(%s, %s)"""
+        curs.execute(q, [self.cur_queue, params['seq']])
+
+        res = curs.fetchone()
+        print 'ADD_SEQ:', res[0], res[1]
+
+    def cmd_londiste_remove_seq(self, params):
+        """Remove seq."""
+
+        curs = self.db.cursor()
+        q = """select * from londiste.local_remove_seq(%s, %s)"""
+        curs.execute(q, [self.cur_queue, params['seq']])
+
+        res = curs.fetchone()
+        print 'REMOVE_SEQ:', res[0], res[1]
+
 def main():
     global script
     script = AdminConsole()