new londiste wrapper
authorMarko Kreen <markokr@gmail.com>
Fri, 4 Apr 2008 09:02:08 +0000 (09:02 +0000)
committerMarko Kreen <markokr@gmail.com>
Fri, 4 Apr 2008 09:02:08 +0000 (09:02 +0000)
python/londiste.py
python/pgq/setadmin.py [new file with mode: 0644]
python/setadm.py

index 32af4a50746a7823bd04919399a8c6dded5ad709..b19ed35b7efacfb68b27c94fccbc687827560c5e 100755 (executable)
@@ -3,7 +3,7 @@
 """Londiste launcher.
 """
 
-import sys, os, optparse, skytools
+import sys, os, optparse, skytools, pgq, pgq.setadmin
 
 # python 2.3 will try londiste.py first...
 import sys, os.path
@@ -11,82 +11,82 @@ if os.path.exists(os.path.join(sys.path[0], 'londiste.py')) \
     and not os.path.exists(os.path.join(sys.path[0], 'londiste')):
     del sys.path[0]
 
-from londiste import *
+import londiste
 
 command_usage = """
 %prog [options] INI CMD [subcmd args]
 
-commands:
-  replay                        replay events to subscriber
-
-  provider install              installs modules, creates queue
-  provider add TBL ...          add table to queue
-  provider remove TBL ...       remove table from queue
-  provider tables               show all tables on provider
-  provider add-seq SEQ ...      add sequence to provider
-  provider remove-seq SEQ ...   remove sequence from provider
-  provider seqs                 show all sequences on provider
-
-  subscriber install            installs schema
-  subscriber add TBL ...        add table to subscriber
-  subscriber remove TBL ...     remove table from subscriber
-  subscriber add-seq SEQ ...    add table to subscriber
-  subscriber remove-seq SEQ ... remove table from subscriber
-  subscriber tables             list tables subscriber has attached to
-  subscriber seqs               list sequences subscriber is interested
-  subscriber missing            list tables subscriber has not yet attached to
-  subscriber check              compare table structure on both sides
-  subscriber fkeys              print out fkey drop/create commands
-  subscriber resync TBL ...     do full copy again
-
-  compare [TBL ...]             compare table contents on both sides
-  repair [TBL ...]              repair data on subscriber
-
-  copy                          [internal command - copy table logic]
+Node Initialization:
+  init-root   NODE_NAME NODE_CONSTR
+  init-branch NODE_NAME NODE_CONSTR --provider=<constr>
+  init-leaf   NODE_NAME NODE_CONSTR --provider=<constr>
+    Initializes node.  Given connstr is kept as global connstring
+    for that node.  Those commands ignore node_db in .ini.
+    The --provider connstr is used only for initial set info
+    fetching, later actual provider's connect string is used.
+
+Node Administration:
+  members               Show members in set
+  tag-dead NODE ..      Tag node as dead
+  tag-alive NODE ..     Tag node as alive
+
+  redirect              Switch provider
+  make-root             Promote to root
+
+Replication Daemon:
+  worker                replay events to subscriber
+
+Replication Administration:
+  add TBL ...           add table to queue
+  remove TBL ...        remove table from queue
+  add-seq SEQ ...       add sequence to provider
+  remove-seq SEQ ...    remove sequence from provider
+  tables                show all tables on provider
+  seqs                  show all sequences on provider
+  missing               list tables subscriber has not yet attached to
+  resync TBL ...        do full copy again
+
+Replication Extra:
+  check                 compare table structure on both sides
+  fkeys                 print out fkey drop/create commands
+  compare [TBL ...]     compare table contents on both sides
+  repair [TBL ...]      repair data on subscriber
+
+Internal Commands:
+  copy                  copy table logic
 """
 
+class NodeSetup(pgq.setadmin.SetAdmin):
+    def __init__(self, args):
+        pgq.setadmin.SetAdmin.__init__(self, 'londiste', args)
+
+cmd_handlers = (
+    (('init-root', 'init-branch', 'init-leaf', 'members', 'tag-dead', 'tag-alive',
+      'redirect', 'promote-root'), NodeSetup),
+    (('worker', 'replay'), londiste.Replicator),
+    (('add', 'remove', 'add-seq', 'remove-seq', 'tables', 'seqs',
+      'missing', 'resync', 'check', 'fkeys'), londiste.LondisteSetup),
+    (('compare',), londiste.Comparator),
+    (('repair',), londiste.Repairer),
+)
+
 class Londiste(skytools.DBScript):
     def __init__(self, args):
         skytools.DBScript.__init__(self, 'londiste', args)
 
-        if self.options.rewind or self.options.reset:
-            self.script = Replicator(args)
-            return
-
         if len(self.args) < 2:
             print "need command"
             sys.exit(1)
         cmd = self.args[1]
-
-        if cmd =="provider":
-            script = ProviderSetup(args)
-        elif cmd == "subscriber":
-            script = SubscriberSetup(args)
-        elif cmd == "replay":
-            method = self.cf.get('method', 'direct')
-            if method == 'direct':
-                script = Replicator(args)
-            elif method == 'file_write':
-                script = FileWrite(args)
-            elif method == 'file_write':
-                script = FileWrite(args)
-            else:
-                print "unknown method, quitting"
-                sys.exit(1)
-        elif cmd == "copy":
-            script = CopyTable(args)
-        elif cmd == "compare":
-            script = Comparator(args)
-        elif cmd == "repair":
-            script = Repairer(args)
-        elif cmd == "upgrade":
-            script = UpgradeV2(args)
-        else:
+        self.script = None
+        for names, cls in cmd_handlers:
+            if cmd in names:
+                self.script = cls(args)
+                break
+        if not self.script:
             print "Unknown command '%s', use --help for help" % cmd
             sys.exit(1)
 
-        self.script = script
-
     def start(self):
         self.script.start()
 
@@ -103,10 +103,6 @@ class Londiste(skytools.DBScript):
                 help = "add: no copy needed", default=False)
         g.add_option("--skip-truncate", action="store_true", dest="skip_truncate",
                 help = "add: keep old data", default=False)
-        g.add_option("--rewind", action="store_true",
-                help = "replay: sync queue pos with subscriber")
-        g.add_option("--reset", action="store_true",
-                help = "replay: forget queue pos on subscriber")
         p.add_option_group(g)
 
         return p
diff --git a/python/pgq/setadmin.py b/python/pgq/setadmin.py
new file mode 100644 (file)
index 0000000..606c798
--- /dev/null
@@ -0,0 +1,208 @@
+#! /usr/bin/env python
+
+import sys, optparse, skytools
+
+from pgq.setconsumer import MemberInfo, NodeInfo
+
+class SetInfo:
+    def __init__(self, set_name, info_row, member_rows):
+        self.root_info = info_row
+        self.set_name = set_name
+        self.member_map = {}
+        self.root_name = info_row['node_name']
+        self.root_type = info_row['node_type']
+        self.global_watermark = info_row['global_watermark']
+
+        for r in member_rows:
+            n = MemberInfo(r)
+            self.member_map[n.name] = n
+
+    def get_member(self, name):
+        return self.member_map.get(name)
+
+command_usage = """
+%prog [options] INI CMD [subcmd args]
+
+commands:
+"""
+
+class SetAdmin(skytools.DBScript):
+    root_name = None
+    root_info = None
+    member_map = {}
+    set_name = None
+
+    def init_optparse(self, parser = None):
+        p = skytools.DBScript.init_optparse(self, parser)
+        p.set_usage(command_usage.strip())
+
+        g = optparse.OptionGroup(p, "actual setadm options")
+        g.add_option("--connstr", action="store_true",
+                     help = "add: ignore table differences, repair: ignore lag")
+        g.add_option("--provider",
+                     help = "add: ignore table differences, repair: ignore lag")
+        p.add_option_group(g)
+        return p
+
+    def work(self):
+        self.set_single_loop(1)
+
+        self.set_name = self.cf.get('set_name')
+
+        if self.is_cmd("init-root", 2):
+            self.init_node("root", self.args[2], self.args[3])
+        elif self.is_cmd("init-branch", 2):
+            self.init_node("branch", self.args[2], self.args[3])
+        elif self.is_cmd("init-leaf", 2):
+            self.init_node("leaf", self.args[2], self.args[3])
+        else:
+            self.log.info("need command")
+
+    def is_cmd(self, name, argcnt):
+        if len(self.args) < 2:
+            return False
+        if self.args[1] != name:
+            return False
+        if len(self.args) != argcnt + 2:
+            self.log.error("cmd %s needs %d args" % (name, argcnt))
+            sys.exit(1)
+        return True
+
+    def init_node(self, node_type, node_name, node_location):
+        # connect to database
+        db = self.get_database("new_node", connstr = node_location)
+
+        # check if code is installed
+        self.install_code(db)
+
+        # query current status
+        res = self.exec_query(db, "select * from pgq_set.get_node_info(%s)", [self.set_name])
+        info = res[0]
+        if info['node_type'] is not None:
+            self.log.info("Node is already initialized as %s" % info['node_type'])
+            return
+        
+        worker_name = "%s_%s_worker" % (self.set_name, node_name)
+
+        # register member
+        if node_type in ('root', 'combined-root'):
+            global_watermark = None
+            combined_set = None
+            provider_name = None
+            self.exec_sql(db, "select pgq_set.add_member(%s, %s, %s, false)",
+                          [self.set_name, node_name, node_location])
+            self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s, %s)",
+                          [self.set_name, node_type, node_name, worker_name, provider_name, global_watermark, combined_set])
+        else:
+            root_db = self.find_root_db()
+            set = self.load_root_info(root_db)
+
+            # check if member already exists
+            if set.get_member(node_name) is not None:
+                self.log.error("Node '%s' already exists" % node_name)
+                sys.exit(1)
+
+            global_watermark = set.global_watermark
+            combined_set = None
+            provider_name = self.options.provider
+
+            # register member on root
+            self.exec_sql(root_db, "select pgq_set.add_member(%s, %s, %s, false)",
+                          [self.set_name, node_name, node_location])
+            root_db.commit()
+
+            # lookup provider
+            provider = set.get_member(provider_name)
+            if not provider:
+                self.log.error("Node %s does not exist" % provider_name)
+                sys.exit(1)
+
+            # register on provider
+            provider_db = self.get_database('provider_db', connstr = provider.location)
+            self.exec_sql(provider_db, "select pgq_set.add_member(%s, %s, %s, false)",
+                          [self.set_name, node_name, node_location])
+            self.exec_sql(provider_db, "select pgq_set.subscribe_node(%s, %s, %s)",
+                          [self.set_name, node_name, worker_name])
+            provider_db.commit()
+
+            # initialize node itself
+            self.exec_sql(db, "select pgq_set.add_member(%s, %s, %s, false)",
+                          [self.set_name, node_name, node_location])
+            self.exec_sql(db, "select pgq_set.add_member(%s, %s, %s, false)",
+                          [self.set_name, provider_name, provider.location])
+            self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s, %s)",
+                          [self.set_name, node_type, node_name, worker_name, provider_name,
+                           global_watermark, combined_set])
+            db.commit()
+
+            
+
+
+        self.log.info("Done")
+
+    def find_root_db(self):
+        db = self.get_database('root_db')
+
+        while 1:
+            # query current status
+            res = self.exec_query(db, "select * from pgq_set.get_node_info(%s)", [self.set_name])
+            info = res[0]
+            type = info['node_type']
+            if type is None:
+                self.log.info("Root node not initialized?")
+                sys.exit(1)
+
+            # configured db may not be root anymore, walk upwards then
+            if type in ('root', 'combined-root'):
+                db.commit()
+                return db
+
+            self.close_connection()
+            loc = info['provider_location']
+            if loc is None:
+                self.log.info("Sub node provider not initialized?")
+                sys.exit(1)
+
+            # walk upwards
+            db = self.get_database('root_db', connstr = loc)
+
+    def load_root_info(self, db):
+        res = self.exec_query(db, "select * from pgq_set.get_node_info(%s)", [self.set_name])
+        info = res[0]
+
+        q = "select * from pgq_set.get_member_info(%s)"
+        node_list = self.exec_query(db, q, [self.set_name])
+
+        db.commit()
+
+        return SetInfo(self.set_name, info, node_list)
+
+    def exec_sql(self, db, q, args):
+        self.log.debug(q)
+        curs = db.cursor()
+        curs.execute(q, args)
+        db.commit()
+
+    def exec_query(self, db, q, args):
+        self.log.debug(q)
+        curs = db.cursor()
+        curs.execute(q, args)
+        res = curs.dictfetchall()
+        db.commit()
+        return res
+
+    def install_code(self, db):
+        objs = [
+            skytools.DBLanguage("plpgsql"),
+            skytools.DBFunction("txid_current_snapshot", 0, sql_file="txid.sql"),
+            skytools.DBSchema("pgq", sql_file="pgq.sql"),
+            skytools.DBSchema("pgq_ext", sql_file="pgq_ext.sql"),
+            skytools.DBSchema("pgq_set", sql_file="pgq_set.sql"),
+        ]
+        skytools.db_install(db.cursor(), objs, self.log.debug)
+        db.commit()
+
+if __name__ == '__main__':
+    script = SetAdmin('set_admin', sys.argv[1:])
+    script.start()
+
index 9cc4c332d624b61ce1d02f410782df4348b321d6..61cfdc7753a27e8cdfeb4cf5ed243d894e625b21 100755 (executable)
@@ -1,215 +1,8 @@
 #! /usr/bin/env python
 
-import sys, optparse, skytools
-
-from pgq.setconsumer import MemberInfo, NodeInfo
-
-
-class MemberInfo:
-    def __init__(self, row):
-        self.name = row['node_name']
-        self.location = row['node_location']
-        self.dead = row['dead']
-
-class SetInfo:
-    def __init__(self, set_name, info_row, member_rows):
-        self.root_info = info_row
-        self.set_name = set_name
-        self.member_map = {}
-        self.root_name = info_row['node_name']
-        self.root_type = info_row['node_type']
-        self.global_watermark = info_row['global_watermark']
-
-        for r in member_rows:
-            n = MemberInfo(r)
-            self.member_map[n.name] = n
-
-    def get_member(self, name):
-        return self.member_map.get(name)
-
-command_usage = """
-%prog [options] INI CMD [subcmd args]
-
-commands:
-"""
-
-class SetAdmin(skytools.DBScript):
-    root_name = None
-    root_info = None
-    member_map = {}
-    set_name = None
-
-    def init_optparse(self, parser = None):
-        p = skytools.DBScript.init_optparse(self, parser)
-        p.set_usage(command_usage.strip())
-
-        g = optparse.OptionGroup(p, "actual setadm options")
-        g.add_option("--connstr", action="store_true",
-                     help = "add: ignore table differences, repair: ignore lag")
-        g.add_option("--provider",
-                     help = "add: ignore table differences, repair: ignore lag")
-        p.add_option_group(g)
-        return p
-
-    def work(self):
-        self.set_single_loop(1)
-
-        self.set_name = self.cf.get('set_name')
-
-        if self.is_cmd("init-root", 2):
-            self.init_node("root", self.args[2], self.args[3])
-        elif self.is_cmd("init-branch", 2):
-            self.init_node("branch", self.args[2], self.args[3])
-        elif self.is_cmd("init-leaf", 2):
-            self.init_node("leaf", self.args[2], self.args[3])
-        else:
-            self.log.info("need command")
-
-    def is_cmd(self, name, argcnt):
-        if len(self.args) < 2:
-            return False
-        if self.args[1] != name:
-            return False
-        if len(self.args) != argcnt + 2:
-            self.log.error("cmd %s needs %d args" % (name, argcnt))
-            sys.exit(1)
-        return True
-
-    def init_node(self, node_type, node_name, node_location):
-        # connect to database
-        db = self.get_database("new_node", connstr = node_location)
-
-        # check if code is installed
-        self.install_code(db)
-
-        # query current status
-        res = self.exec_query(db, "select * from pgq_set.get_node_info(%s)", [self.set_name])
-        info = res[0]
-        if info['node_type'] is not None:
-            self.log.info("Node is already initialized as %s" % info['node_type'])
-            return
-        
-        worker_name = "%s_%s_worker" % (self.set_name, node_name)
-
-        # register member
-        if node_type in ('root', 'combined-root'):
-            global_watermark = None
-            combined_set = None
-            provider_name = None
-            self.exec_sql(db, "select pgq_set.add_member(%s, %s, %s, false)",
-                          [self.set_name, node_name, node_location])
-            self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s, %s)",
-                          [self.set_name, node_type, node_name, worker_name, provider_name, global_watermark, combined_set])
-        else:
-            root_db = self.find_root_db()
-            set = self.load_root_info(root_db)
-
-            # check if member already exists
-            if set.get_member(node_name) is not None:
-                self.log.error("Node '%s' already exists" % node_name)
-                sys.exit(1)
-
-            global_watermark = set.global_watermark
-            combined_set = None
-            provider_name = self.options.provider
-
-            # register member on root
-            self.exec_sql(root_db, "select pgq_set.add_member(%s, %s, %s, false)",
-                          [self.set_name, node_name, node_location])
-            root_db.commit()
-
-            # lookup provider
-            provider = set.get_member(provider_name)
-            if not provider:
-                self.log.error("Node %s does not exist" % provider_name)
-                sys.exit(1)
-
-            # register on provider
-            provider_db = self.get_database('provider_db', connstr = provider.location)
-            self.exec_sql(provider_db, "select pgq_set.add_member(%s, %s, %s, false)",
-                          [self.set_name, node_name, node_location])
-            self.exec_sql(provider_db, "select pgq_set.subscribe_node(%s, %s, %s)",
-                          [self.set_name, node_name, worker_name])
-            provider_db.commit()
-
-            # initialize node itself
-            self.exec_sql(db, "select pgq_set.add_member(%s, %s, %s, false)",
-                          [self.set_name, node_name, node_location])
-            self.exec_sql(db, "select pgq_set.add_member(%s, %s, %s, false)",
-                          [self.set_name, provider_name, provider.location])
-            self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s, %s)",
-                          [self.set_name, node_type, node_name, worker_name, provider_name,
-                           global_watermark, combined_set])
-            db.commit()
-
-            
-
-
-        self.log.info("Done")
-
-    def find_root_db(self):
-        db = self.get_database('root_db')
-
-        while 1:
-            # query current status
-            res = self.exec_query(db, "select * from pgq_set.get_node_info(%s)", [self.set_name])
-            info = res[0]
-            type = info['node_type']
-            if type is None:
-                self.log.info("Root node not initialized?")
-                sys.exit(1)
-
-            # configured db may not be root anymore, walk upwards then
-            if type in ('root', 'combined-root'):
-                db.commit()
-                return db
-
-            self.close_connection()
-            loc = info['provider_location']
-            if loc is None:
-                self.log.info("Sub node provider not initialized?")
-                sys.exit(1)
-
-            # walk upwards
-            db = self.get_database('root_db', connstr = loc)
-
-    def load_root_info(self, db):
-        res = self.exec_query(db, "select * from pgq_set.get_node_info(%s)", [self.set_name])
-        info = res[0]
-
-        q = "select * from pgq_set.get_member_info(%s)"
-        node_list = self.exec_query(db, q, [self.set_name])
-
-        db.commit()
-
-        return SetInfo(self.set_name, info, node_list)
-
-    def exec_sql(self, db, q, args):
-        self.log.debug(q)
-        curs = db.cursor()
-        curs.execute(q, args)
-        db.commit()
-
-    def exec_query(self, db, q, args):
-        self.log.debug(q)
-        curs = db.cursor()
-        curs.execute(q, args)
-        res = curs.dictfetchall()
-        db.commit()
-        return res
-
-    def install_code(self, db):
-        objs = [
-            skytools.DBLanguage("plpgsql"),
-            skytools.DBFunction("txid_current_snapshot", 0, sql_file="txid.sql"),
-            skytools.DBSchema("pgq", sql_file="pgq.sql"),
-            skytools.DBSchema("pgq_ext", sql_file="pgq_ext.sql"),
-            skytools.DBSchema("pgq_set", sql_file="pgq_set.sql"),
-        ]
-        skytools.db_install(db.cursor(), objs, self.log.debug)
-        db.commit()
+import sys, pgq.setadmin
 
 if __name__ == '__main__':
-    script = SetAdmin('set_admin', sys.argv[1:])
+    script = pgq.setadmin.SetAdmin('set_admin', sys.argv[1:])
     script.start()