Move --find-copy-node logic to 'copy'
author Marko Kreen <markokr@gmail.com>
Thu, 18 Oct 2012 08:15:35 +0000 (11:15 +0300)
committer Marko Kreen <markokr@gmail.com>
Thu, 18 Oct 2012 08:15:35 +0000 (11:15 +0300)
This allows the --find-copy-node switch to be used in merge situations
as well, where the copy source node is different for each partition.

python/londiste/setup.py
python/londiste/table_copy.py

index f06d9b0b4d6df1e5623bd65d63e3fe580dbd4ff2..49a5eb9630a1640fde364984da622836ad78b2b1 100644 (file)
@@ -135,17 +135,8 @@ class LondisteSetup(CascadeAdmin):
         needs_tbl = self.handler_needs_table()
         args = self.expand_arg_list(dst_db, 'r', False, args, needs_tbl)
 
-        # search for usable copy node if requested
-        if (self.options.find_copy_node
-                and not self.is_root()
-                and needs_tbl):
-            src_db = self.find_copy_node(dst_db, args)
-            src_curs = src_db.cursor()
-            src_tbls = self.fetch_set_tables(src_curs)
-            src_db.commit()
-
         # dont check for exist/not here (root handling)
-        if not self.is_root() and not self.options.expect_sync:
+        if not self.is_root() and not self.options.expect_sync and not self.options.find_copy_node:
             problems = False
             for tbl in args:
                 tbl = skytools.fq_name(tbl)
@@ -169,6 +160,11 @@ class LondisteSetup(CascadeAdmin):
             self.log.error("--dest-table can be given only for single table")
             sys.exit(1)
 
+        # not implemented
+        if self.options.find_copy_node and create_flags != 0:
+            self.log.error("--find-copy-node does not work with --create")
+            sys.exit(1)
+
         # seems ok
         for tbl in args:
             self.add_table(src_db, dst_db, tbl, create_flags, src_tbls)
@@ -236,7 +232,9 @@ class LondisteSetup(CascadeAdmin):
             attrs['handler'] = hstr
             p.add(tgargs)
 
-        if self.options.copy_node:
+        if self.options.find_copy_node:
+            attrs['copy_node'] = '?'
+        elif self.options.copy_node:
             attrs['copy_node'] = self.options.copy_node
 
         if self.options.expect_sync:
@@ -267,15 +265,6 @@ class LondisteSetup(CascadeAdmin):
             return p.needs_table()
         return True
 
-    def handler_allows_copy(self, table_attrs):
-        """Decide if table is copyable based on attrs."""
-        if not table_attrs:
-            return True
-        attrs = skytools.db_urldecode(table_attrs)
-        hstr = attrs['handler']
-        p = londiste.handler.build_handler('unused.string', hstr, None)
-        return p.needs_table()
-
     def sync_table_list(self, dst_curs, src_tbls, dst_tbls):
         for tbl in src_tbls.keys():
             q = "select * from londiste.global_add_table(%s, %s)"
@@ -477,60 +466,6 @@ class LondisteSetup(CascadeAdmin):
             self.exec_cmd(db, q, [self.queue_name, fname], commit = False)
         db.commit()
 
-    def find_copy_node(self, dst_db, args):
-        src_db = self.get_provider_db()
-
-        need = {}
-        for t in args:
-            need[t] = 1
-
-        while 1:
-            src_curs = src_db.cursor()
-
-            q = "select * from pgq_node.get_node_info(%s)"
-            src_curs.execute(q, [self.queue_name])
-            info = src_curs.fetchone()
-            if info['ret_code'] >= 400:
-                raise UsageError("Node does not exists")
-
-            self.log.info("Checking if %s can be used for copy", info['node_name'])
-
-            q = "select table_name, local, table_attrs from londiste.get_table_list(%s)"
-            src_curs.execute(q, [self.queue_name])
-            got = {}
-            for row in src_curs.fetchall():
-                tbl = row['table_name']
-                if tbl not in need:
-                    continue
-                if not row['local']:
-                    self.log.debug("Problem: %s is not local", tbl)
-                    continue
-                if not self.handler_allows_copy(row['table_attrs']):
-                    self.log.debug("Problem: %s handler does not store data [%s]", tbl, row['table_attrs'])
-                    continue
-                self.log.debug("Good: %s is usable", tbl)
-                got[row['table_name']] = 1
-
-            ok = 1
-            for t in args:
-                if t not in got:
-                    self.log.info("Node %s does not have all tables", info['node_name'])
-                    ok = 0
-                    break
-
-            if ok:
-                self.options.copy_node = info['node_name']
-                self.log.info("Node %s seems good source, using it", info['node_name'])
-                break
-
-            if info['node_type'] == 'root':
-                raise skytools.UsageError("Found root and no source found")
-
-            self.close_database('provider_db')
-            src_db = self.get_database('provider_db', connstr = info['provider_location'])
-
-        return src_db
-
     def get_provider_db(self):
 
         # use custom node for copy
index a2138d988903017a3ff2f9874acbca7177ca37c1..6da4b1ee8ebff08e97783c374705b99805e824e0 100644 (file)
@@ -7,6 +7,8 @@ For internal usage.
 
 import sys, time, skytools
 
+import londiste
+
 from skytools.dbstruct import *
 from londiste.playback import *
 
@@ -241,26 +243,81 @@ class CopyTable(Replicator):
             if v_attrs:
                 attrs = skytools.db_urldecode(v_attrs)
 
+        # fetch parent consumer state
+        q = "select * from pgq_node.get_consumer_state(%s, %s)"
+        rows = self.exec_cmd(dst_db, q, [ self.queue_name, self.old_consumer_name ])
+        state = rows[0]
+        source_node = state['provider_node']
+        source_location = state['provider_location']
+
         # do we have node here?
         if 'copy_node' in attrs:
-            # take node from attrs
-            source_node = attrs['copy_node']
-            q = "select * from pgq_node.get_queue_locations(%s) where node_name = %s"
-            dst_curs.execute(q, [ self.queue_name, source_node ])
-            rows = dst_curs.fetchall()
-            if len(rows):
-                source_location = rows[0]['node_location']
-        else:
-            # fetch parent consumer state
-            q = "select * from pgq_node.get_consumer_state(%s, %s)"
-            rows = self.exec_cmd(dst_db, q, [ self.queue_name, self.old_consumer_name ])
-            state = rows[0]
-            source_node = state['provider_node']
-            source_location = state['provider_location']
+            if attrs['copy_node'] == '?':
+                source_node, source_location = self.find_copy_source(source_node, source_location)
+            else:
+                # take node from attrs
+                source_node = attrs['copy_node']
+                q = "select * from pgq_node.get_queue_locations(%s) where node_name = %s"
+                dst_curs.execute(q, [ self.queue_name, source_node ])
+                rows = dst_curs.fetchall()
+                if len(rows):
+                    source_location = rows[0]['node_location']
 
         self.log.info("Using '%s' as source node", source_node)
         self.register_consumer(source_location)
 
+    def handler_allows_copy(self, table_attrs):
+        """Decide if table is copyable based on attrs."""
+        if not table_attrs:
+            return True
+        attrs = skytools.db_urldecode(table_attrs)
+        hstr = attrs['handler']
+        p = londiste.handler.build_handler('unused.string', hstr, None)
+        return p.needs_table()
+
+    def find_copy_source(self, node_name, node_location):
+        while 1:
+            src_db = self.get_database('_source_db', connstr = node_location, autocommit = 1)
+            src_curs = src_db.cursor()
+
+            q = "select * from pgq_node.get_node_info(%s)"
+            src_curs.execute(q, [self.queue_name])
+            info = src_curs.fetchone()
+            if info['ret_code'] >= 400:
+                raise skytools.UsageError("Node does not exists")
+
+            self.log.info("Checking if %s can be used for copy", info['node_name'])
+
+            q = "select table_name, local, table_attrs from londiste.get_table_list(%s) where table_name = %s"
+            src_curs.execute(q, [self.queue_name, self.copy_table_name])
+            got = False
+            for row in src_curs.fetchall():
+                tbl = row['table_name']
+                if tbl != self.copy_table_name:
+                    continue
+                if not row['local']:
+                    self.log.debug("Problem: %s is not local", tbl)
+                    continue
+                if not self.handler_allows_copy(row['table_attrs']):
+                    self.log.debug("Problem: %s handler does not store data [%s]", tbl, row['table_attrs'])
+                    continue
+                self.log.debug("Good: %s is usable", tbl)
+                got = True
+                break
+
+            self.close_database('_source_db')
+
+            if got:
+                self.log.info("Node %s seems good source, using it", info['node_name'])
+                return node_name, node_location
+
+            if info['node_type'] == 'root':
+                raise skytools.UsageError("Found root and no source found")
+
+            # walk upwards
+            node_name = info['provider_node']
+            node_location = info['provider_location']
+
 if __name__ == '__main__':
     script = CopyTable(sys.argv[1:])
     script.start()