londiste: --find-copy-node switch for add-table
authorMarko Kreen <markokr@gmail.com>
Fri, 13 Jul 2012 10:46:30 +0000 (13:46 +0300)
committerMarko Kreen <markokr@gmail.com>
Fri, 13 Jul 2012 19:45:37 +0000 (22:45 +0300)
This will find node where tables are available for copy
by walking upwards to root.

python/londiste.py
python/londiste/setup.py
tests/londiste/regen.sh

index 78d90d6f6caef4a9f76a434721cde58e97d911ea..5d46bbdc826f33dd496bfa6e8ead23a569d51060 100755 (executable)
@@ -134,6 +134,8 @@ class Londiste(skytools.DBScript):
                 help="add: Custom handler for table")
         g.add_option("--handler-arg", action="append",
                 help="add: Argument to custom handler")
+        g.add_option("--find-copy-node", dest="find_copy_node", action="store_true",
+                help = "add: walk upstream to find node to copy from")
         g.add_option("--copy-node", dest="copy_node",
                 help = "add: use NODE as source for initial COPY")
         g.add_option("--copy-condition", dest="copy_condition",
index 67a6854dc865671c1dbd74958e0616b1ccc52aa8..b8ca4a0c45b0fa58a9cfda399776f749f552618d 100644 (file)
@@ -46,6 +46,8 @@ class LondisteSetup(CascadeAdmin):
                     help = "no copy needed", default=False)
         p.add_option("--skip-truncate", action="store_true", dest="skip_truncate",
                     help = "dont delete old data", default=False)
+        p.add_option("--find-copy-node", action="store_true", dest="find_copy_node",
+                help = "add: find table source for copy by walking upwards")
         p.add_option("--copy-node", dest="copy_node",
                 help = "add: use NODE as source for initial copy")
         p.add_option("--copy-condition", dest="copy_condition",
@@ -135,6 +137,15 @@ class LondisteSetup(CascadeAdmin):
         needs_tbl = self.handler_needs_table()
         args = self.expand_arg_list(dst_db, 'r', False, args, needs_tbl)
 
+        # search for usable copy node if requested
+        if (self.options.find_copy_node
+                and not self.is_root()
+                and needs_tbl):
+            src_db = self.find_copy_node(dst_db, args)
+            src_curs = src_db.cursor()
+            src_tbls = self.fetch_set_tables(src_curs)
+            src_db.commit()
+
         # dont check for exist/not here (root handling)
         if not self.is_root() and not self.options.expect_sync:
             problems = False
@@ -260,6 +271,15 @@ class LondisteSetup(CascadeAdmin):
             return p.needs_table()
         return True
 
+    def handler_allows_copy(self, table_attrs):
+        """Decide if table is copyable based on attrs."""
+        if not table_attrs:
+            return True
+        attrs = skytools.db_urldecode(table_attrs)
+        hstr = attrs['handler']
+        p = londiste.handler.build_handler('unused.string', hstr, None)
+        return p.needs_table()
+
     def sync_table_list(self, dst_curs, src_tbls, dst_tbls):
         for tbl in src_tbls.keys():
             q = "select * from londiste.global_add_table(%s, %s)"
@@ -461,6 +481,60 @@ class LondisteSetup(CascadeAdmin):
             self.exec_cmd(db, q, [self.queue_name, fname], commit = False)
         db.commit()
 
+    def find_copy_node(self, dst_db, args):
+        src_db = self.get_provider_db()
+
+        need = {}
+        for t in args:
+            need[t] = 1
+
+        while 1:
+            src_curs = src_db.cursor()
+
+            q = "select * from pgq_node.get_node_info(%s)"
+            src_curs.execute(q, [self.queue_name])
+            info = src_curs.fetchone()
+            if info['ret_code'] >= 400:
+                raise UsageError("Node does not exists")
+
+            self.log.info("Checking if %s can be used for copy", info['node_name'])
+
+            q = "select table_name, local, table_attrs from londiste.get_table_list(%s)"
+            src_curs.execute(q, [self.queue_name])
+            got = {}
+            for row in src_curs.fetchall():
+                tbl = row['table_name']
+                if tbl not in need:
+                    continue
+                if not row['local']:
+                    self.log.debug("Problem: %s is not local", tbl)
+                    continue
+                if not self.handler_allows_copy(row['table_attrs']):
+                    self.log.debug("Problem: %s handler does not store data [%s]", tbl, row['table_attrs'])
+                    continue
+                self.log.debug("Good: %s is usable", tbl)
+                got[row['table_name']] = 1
+
+            ok = 1
+            for t in args:
+                if t not in got:
+                    self.log.info("Node %s does not have all tables", info['node_name'])
+                    ok = 0
+                    break
+
+            if ok:
+                self.options.copy_node = info['node_name']
+                self.log.info("Node %s seems good source, using it", info['node_name'])
+                break
+
+            if info['node_type'] == 'root':
+                raise skytools.UsageError("Found root and no source found")
+
+            self.close_database('provider_db')
+            src_db = self.get_database('provider_db', connstr = info['provider_location'])
+
+        return src_db
+
     def get_provider_db(self):
 
         # use custom node for copy
index 72e0d73f666aeaa1113f51f342a37eab2bcdeb55..6e60f1dc1f468dddbf374e2fb68f1ba34b0fc5e5 100755 (executable)
@@ -120,10 +120,20 @@ run londiste3 conf/londiste_db5.ini wait-sync
 
 msg "Unregister table2 from root"
 run londiste3 $v conf/londiste_db1.ini remove-table mytable2
-msg "Wait until unregister reaches db5"
 
+msg "Wait until unregister reaches db5"
 run londiste3 conf/londiste_db5.ini wait-root
 
+
+
+run londiste3 conf/londiste_db5.ini status
+
+msg "Test skipped copy"
+run londiste3 $v conf/londiste_db1.ini add-table mytable2
+run londiste3 $v conf/londiste_db5.ini wait-root
+run londiste3 $v conf/londiste_db5.ini add-table mytable2 --find-copy-node
+run londiste3 $v conf/londiste_db5.ini wait-sync
+
 ##
 ## basic setup done
 ##