help = "no copy needed", default=False)
p.add_option("--skip-truncate", action="store_true", dest="skip_truncate",
help = "dont delete old data", default=False)
+ p.add_option("--find-copy-node", action="store_true", dest="find_copy_node",
+ help = "add: find table source for copy by walking upwards")
p.add_option("--copy-node", dest="copy_node",
help = "add: use NODE as source for initial copy")
p.add_option("--copy-condition", dest="copy_condition",
needs_tbl = self.handler_needs_table()
args = self.expand_arg_list(dst_db, 'r', False, args, needs_tbl)
+ # search for usable copy node if requested
+ if (self.options.find_copy_node
+ and not self.is_root()
+ and needs_tbl):
+ src_db = self.find_copy_node(dst_db, args)
+ src_curs = src_db.cursor()
+ src_tbls = self.fetch_set_tables(src_curs)
+ src_db.commit()
+
# dont check for exist/not here (root handling)
if not self.is_root() and not self.options.expect_sync:
problems = False
return p.needs_table()
return True
+ def handler_allows_copy(self, table_attrs):
+ """Decide if table is copyable based on attrs."""
+ if not table_attrs:
+ return True
+ attrs = skytools.db_urldecode(table_attrs)
+ hstr = attrs['handler']
+ p = londiste.handler.build_handler('unused.string', hstr, None)
+ return p.needs_table()
+
def sync_table_list(self, dst_curs, src_tbls, dst_tbls):
for tbl in src_tbls.keys():
q = "select * from londiste.global_add_table(%s, %s)"
self.exec_cmd(db, q, [self.queue_name, fname], commit = False)
db.commit()
+ def find_copy_node(self, dst_db, args):
+ src_db = self.get_provider_db()
+
+ need = {}
+ for t in args:
+ need[t] = 1
+
+ while 1:
+ src_curs = src_db.cursor()
+
+ q = "select * from pgq_node.get_node_info(%s)"
+ src_curs.execute(q, [self.queue_name])
+ info = src_curs.fetchone()
+ if info['ret_code'] >= 400:
+ raise UsageError("Node does not exists")
+
+ self.log.info("Checking if %s can be used for copy", info['node_name'])
+
+ q = "select table_name, local, table_attrs from londiste.get_table_list(%s)"
+ src_curs.execute(q, [self.queue_name])
+ got = {}
+ for row in src_curs.fetchall():
+ tbl = row['table_name']
+ if tbl not in need:
+ continue
+ if not row['local']:
+ self.log.debug("Problem: %s is not local", tbl)
+ continue
+ if not self.handler_allows_copy(row['table_attrs']):
+ self.log.debug("Problem: %s handler does not store data [%s]", tbl, row['table_attrs'])
+ continue
+ self.log.debug("Good: %s is usable", tbl)
+ got[row['table_name']] = 1
+
+ ok = 1
+ for t in args:
+ if t not in got:
+ self.log.info("Node %s does not have all tables", info['node_name'])
+ ok = 0
+ break
+
+ if ok:
+ self.options.copy_node = info['node_name']
+ self.log.info("Node %s seems good source, using it", info['node_name'])
+ break
+
+ if info['node_type'] == 'root':
+ raise skytools.UsageError("Found root and no source found")
+
+ self.close_database('provider_db')
+ src_db = self.get_database('provider_db', connstr = info['provider_location'])
+
+ return src_db
+
def get_provider_db(self):
# use custom node for copy