londiste: add-table supports --find-copy-node working with --create now
authormartinko <gamato@users.sf.net>
Mon, 27 May 2013 18:12:50 +0000 (20:12 +0200)
committermartinko <gamato@users.sf.net>
Mon, 27 May 2013 18:12:50 +0000 (20:12 +0200)
python/londiste/setup.py
python/londiste/util.py

index 5a7398565afb6d75c7c2641c42201dc958f01370..7d73f9432fbef518ffe2867f0debe3c06f5c0f13 100644 (file)
@@ -7,6 +7,7 @@ import sys, os, re, skytools
 
 from pgq.cascade.admin import CascadeAdmin
 from londiste.exec_attrs import ExecAttrs
+from londiste.util import find_copy_source
 
 import londiste.handler
 
@@ -137,6 +138,25 @@ class LondisteSetup(CascadeAdmin):
         needs_tbl = self.handler_needs_table()
         args = self.expand_arg_list(dst_db, 'r', False, args, needs_tbl)
 
+        # pick proper create flags
+        if self.options.create_full:
+            create_flags = skytools.T_ALL
+        elif self.options.create:
+            create_flags = skytools.T_TABLE | skytools.T_PKEY
+        else:
+            create_flags = 0
+
+        # search for usable copy node if requested & needed
+        if (self.options.find_copy_node and create_flags != 0
+                and needs_tbl and not self.is_root()):
+            src_name, src_loc, _ = find_copy_source(self, self.queue_name, args, None, self.provider_location)
+            self.options.copy_node = src_name
+            self.close_database('provider_db')
+            src_db = self.get_provider_db()
+            src_curs = src_db.cursor()
+            src_tbls = self.fetch_set_tables(src_curs)
+            src_db.commit()
+
         # dont check for exist/not here (root handling)
         if not self.is_root() and not self.options.expect_sync and not self.options.find_copy_node:
             problems = False
@@ -149,24 +169,11 @@ class LondisteSetup(CascadeAdmin):
                 self.log.error("Problems, canceling operation")
                 sys.exit(1)
 
-        # pick proper create flags
-        if self.options.create_full:
-            create_flags = skytools.T_ALL
-        elif self.options.create:
-            create_flags = skytools.T_TABLE | skytools.T_PKEY
-        else:
-            create_flags = 0
-
         # sanity check
         if self.options.dest_table and len(args) > 1:
             self.log.error("--dest-table can be given only for single table")
             sys.exit(1)
 
-        # not implemented
-        if self.options.find_copy_node and create_flags != 0:
-            self.log.error("--find-copy-node does not work with --create")
-            sys.exit(1)
-
         # seems ok
         for tbl in args:
             self.add_table(src_db, dst_db, tbl, create_flags, src_tbls)
@@ -540,9 +547,8 @@ class LondisteSetup(CascadeAdmin):
         db.commit()
 
     def get_provider_db(self):
-
-        # use custom node for copy
         if self.options.copy_node:
+            # use custom node for copy
             source_node = self.options.copy_node
             m = self.queue_info.get_member(source_node)
             if not m:
@@ -556,6 +562,7 @@ class LondisteSetup(CascadeAdmin):
             q = 'select * from pgq_node.get_node_info(%s)'
             res = self.exec_cmd(db, q, [self.queue_name], quiet = True)
             self.provider_location = res[0]['provider_location']
+
         return self.get_database('provider_db', connstr = self.provider_location, profile = 'remote')
 
     def expand_arg_list(self, db, kind, existing, args, needs_tbl=True):
index cba18f62688768848b998e1829b50b42335703b5..07ff94077089e92540b871da886f7b383b0270d6 100644 (file)
@@ -18,7 +18,7 @@ def find_copy_source(script, queue_name, copy_table_name, node_name, node_locati
 
     @param script: DbScript
     @param queue_name: name of the cascaded queue
-    @param copy_table_name: name of the table
+    @param copy_table_name: name of the table (or list of names)
     @param node_name: target node name
     @param node_location: target node location
     @returns (node_name, node_location, downstream_worker_name) of source node
@@ -27,6 +27,11 @@ def find_copy_source(script, queue_name, copy_table_name, node_name, node_locati
     # None means no steps upwards were taken, so local consumer is worker
     worker_name = None
 
+    if isinstance(copy_table_name, str):
+        need = set([copy_table_name])
+    else:
+        need = set(copy_table_name)
+
     while 1:
         src_db = script.get_database('_source_db', connstr = node_location, autocommit = 1, profile = 'remote')
         src_curs = src_db.cursor()
@@ -39,12 +44,12 @@ def find_copy_source(script, queue_name, copy_table_name, node_name, node_locati
 
         script.log.info("Checking if %s can be used for copy", info['node_name'])
 
-        q = "select table_name, local, table_attrs from londiste.get_table_list(%s) where table_name = %s"
-        src_curs.execute(q, [queue_name, copy_table_name])
-        got = False
+        q = "select table_name, local, table_attrs from londiste.get_table_list(%s)"
+        src_curs.execute(q, [queue_name])
+        got = set()
         for row in src_curs.fetchall():
             tbl = row['table_name']
-            if tbl != copy_table_name:
+            if tbl not in need:
                 continue
             if not row['local']:
                 script.log.debug("Problem: %s is not local", tbl)
@@ -53,14 +58,15 @@ def find_copy_source(script, queue_name, copy_table_name, node_name, node_locati
                 script.log.debug("Problem: %s handler does not store data [%s]", tbl, row['table_attrs'])
                 continue
             script.log.debug("Good: %s is usable", tbl)
-            got = True
-            break
+            got.add(tbl)
 
         script.close_database('_source_db')
 
-        if got:
+        if got == need:
             script.log.info("Node %s seems good source, using it", info['node_name'])
             return node_name, node_location, worker_name
+        else:
+            script.log.info("Node %s does not have all tables", info['node_name'])
 
         if info['node_type'] == 'root':
             raise skytools.UsageError("Found root and no source found")