help = "add: no copy needed", default=False)
g.add_option("--skip-truncate", action="store_true", dest="skip_truncate",
help = "add: keep old data", default=False)
+ p.add_option("--copy-condition", dest="copy_condition",
+ help = "copy: where expression")
g.add_option("--provider",
help = "init: upstream node temp connect string")
g.add_option("--create", action="store_true",
help = "no copy needed", default=False)
p.add_option("--skip-truncate", action="store_true", dest="skip_truncate",
help = "dont delete old data", default=False)
+ p.add_option("--copy-condition", dest="copy_condition",
+ help = "copy: where expression")
p.add_option("--force", action="store_true",
help="force", default=False)
p.add_option("--all", action="store_true",
if self.options.expect_sync:
q = "select * from londiste.local_set_table_state(%s, %s, NULL, 'ok')"
self.exec_cmd(dst_curs, q, [self.set_name, tbl])
+ if self.options.copy_condition:
+ q = "select * from londiste.local_set_table_attrs(%s, %s, %s)"
+ attrs = {'copy_condition': self.options.copy_condition}
+ enc_attrs = skytools.db_urlencode(attrs)
+ self.exec_cmd(dst_curs, q, [self.set_name, tbl, enc_attrs])
dst_db.commit()
def sync_table_list(self, dst_curs, src_tbls, dst_tbls):
tablename = tbl_stat.name
# do copy
self.log.info("%s: start copy" % tablename)
- stats = skytools.full_copy(tablename, srccurs, dstcurs, col_list)
+ w_cond = tbl_stat.table_attrs.get('copy_condition')
+ stats = skytools.full_copy(tablename, srccurs, dstcurs, col_list, w_cond)
if stats:
self.log.info("%s: copy finished: %d bytes, %d rows" % (
tablename, stats[0], stats[1]))
self.buf.seek(0)
self.buf.truncate()
-def full_copy(tablename, src_curs, dst_curs, column_list = []):
+def full_copy(tablename, src_curs, dst_curs, column_list = [], condition = None):
"""COPY table from one db to another."""
qtable = quote_fqident(tablename)
if column_list:
- qfields = [quote_ident(f) for f in column_list]
- hdr = "%s (%s)" % (qtable, ",".join(qfields))
+ qfields = ",".join([quote_ident(f) for f in column_list])
+ src = dst = "%s (%s)" % (qtable, qfields)
else:
- hdr = qtable
+ qfields = '*'
+ src = dst = qtable
+
+ if condition:
+ src = "(SELECT %s FROM %s WHERE %s)" % (qfields, qtable, condition)
+
if hasattr(src_curs, 'copy_expert'):
- sql_to = "COPY %s TO stdout" % hdr
- sql_from = "COPY %s FROM stdout" % hdr
+ sql_to = "COPY %s TO stdout" % src
+ sql_from = "COPY %s FROM stdin" % dst
buf = CopyPipe(dst_curs, sql_from = sql_from)
src_curs.copy_expert(sql_to, buf)
else:
- buf = CopyPipe(dst_curs, hdr)
- src_curs.copy_to(buf, hdr)
+ if condition:
+ # regular psycopg copy_to generates invalid sql for subselect copy
+ raise Exception('copy_expert() is needed for conditional copy')
+ buf = CopyPipe(dst_curs, dst)
+ src_curs.copy_to(buf, src)
buf.flush()
return (buf.total_bytes, buf.total_rows)