dst_where = t2.plugin.get_copy_condition(src_curs, dst_curs)
src_where = dst_where
- self.log.info('Counting %s' % dst_tbl)
+ self.log.info('Counting %s', dst_tbl)
# get common cols
cols = self.calc_cols(src_curs, src_tbl, dst_curs, dst_tbl)
f += ", checksum=%(chksum)s"
f = self.cf.get('compare_fmt', f)
- self.log.debug("srcdb: " + src_q)
+ self.log.debug("srcdb: %s", src_q)
src_curs.execute(src_q)
src_row = src_curs.fetchone()
src_str = f % src_row
- self.log.info("srcdb: %s" % src_str)
+ self.log.info("srcdb: %s", src_str)
src_db.commit()
- self.log.debug("dstdb: " + dst_q)
+ self.log.debug("dstdb: %s", dst_q)
dst_curs.execute(dst_q)
dst_row = dst_curs.fetchone()
dst_str = f % dst_row
- self.log.info("dstdb: %s" % dst_str)
+ self.log.info("dstdb: %s", dst_str)
dst_db.commit()
if src_str != dst_str:
- self.log.warning("%s: Results do not match!" % dst_tbl)
+ self.log.warning("%s: Results do not match!", dst_tbl)
return 1
return 0
qfn = skytools.quote_fqident(fn)
qargs = [skytools.quote_literal(a) for a in args]
sql = "select %s(%s);" % (qfn, ', '.join(qargs))
- self.log.debug('applyfn.sql: %s' % sql)
+ self.log.debug('applyfn.sql: %s', sql)
sql_queue_func(sql, qfunc_arg)
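
# Why the lazy form used throughout this patch matters: with
# log.debug("sql: %s", sql) the logging module formats the message only
# when a handler actually emits the record, so calls below the active
# level cost almost nothing and a bad value cannot raise at the call
# site. A minimal self-contained sketch (logger name is an assumption):
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger('londiste')
sql = "select 1"
log.debug('applyfn.sql: %s' % sql)   # eager: string built, then discarded
log.debug('applyfn.sql: %s', sql)    # lazy: never formatted at INFO level
log.info('applyfn.sql: %s', sql)     # lazy: formatted only when emitted
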
#------------------------------------------------------------------------------
if not self.method in (0,1,2):
raise Exception('unknown method: %s' % self.method)
- self.log.debug('bulk_init(%s), method=%d' % (repr(args), self.method))
+ self.log.debug('bulk_init(%r), method=%d', args, self.method)
def reset(self):
self.pkey_ev_map = {}
op = ev.ev_type[0]
if op not in 'IUD':
raise Exception('Unknown event type: '+ev.ev_type)
- self.log.debug('bulk.process_event: %s/%s' % (ev.ev_type, ev.ev_data))
+ self.log.debug('bulk.process_event: %s/%s', ev.ev_type, ev.ev_data)
# pkey_list = ev.ev_type[2:].split(',')
data = skytools.db_urldecode(ev.ev_data)
real_update_count = len(upd_list)
- self.log.debug("bulk_flush: %s (I/U/D = %d/%d/%d)" % (
- self.table_name, len(ins_list), len(upd_list), len(del_list)))
+ self.log.debug("bulk_flush: %s (I/U/D = %d/%d/%d)",
+ self.table_name, len(ins_list), len(upd_list), len(del_list))
# hack to unbreak stuff
if self.method == METH_MERGED:
for fld in self.dist_fields:
if fld not in key_fields:
key_fields.append(fld)
- self.log.debug("PKey fields: %s Dist fields: %s" % (
- ",".join(self.pkey_list), ",".join(self.dist_fields)))
+ self.log.debug("PKey fields: %s Dist fields: %s",
+ ",".join(self.pkey_list), ",".join(self.dist_fields))
# create temp table
temp, qtemp = self.create_temp_table(curs)
# process deleted rows
if len(del_list) > 0:
- self.log.debug("bulk: Deleting %d rows from %s" % (len(del_list), tbl))
+ self.log.debug("bulk: Deleting %d rows from %s", len(del_list), tbl)
# delete old rows
q = "truncate %s" % qtemp
- self.log.debug('bulk: %s' % q)
+ self.log.debug('bulk: %s', q)
curs.execute(q)
# copy rows
- self.log.debug("bulk: COPY %d rows into %s" % (len(del_list), temp))
+ self.log.debug("bulk: COPY %d rows into %s", len(del_list), temp)
skytools.magic_insert(curs, qtemp, del_list, col_list, quoted_table=1)
# delete rows
- self.log.debug('bulk: ' + del_sql)
+ self.log.debug('bulk: %s', del_sql)
curs.execute(del_sql)
- self.log.debug("bulk: %s - %d" % (curs.statusmessage, curs.rowcount))
+ self.log.debug("bulk: %s - %d", curs.statusmessage, curs.rowcount)
if len(del_list) != curs.rowcount:
- self.log.warning("Delete mismatch: expected=%s deleted=%d"
- % (len(del_list), curs.rowcount))
+ self.log.warning("Delete mismatch: expected=%s deleted=%d",
+ len(del_list), curs.rowcount)
temp_used = True
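
# The delete path above is set-based: the doomed keys are COPYed into
# the temp table, then one DELETE joins the main table against it. A
# sketch of the shape del_sql might take (identifier quoting omitted;
# the names are assumptions, not this module's actual generated SQL):
def build_del_sql(qtbl, qtemp, pkey_list):
    cond = " and ".join("t.%s = m.%s" % (c, c) for c in pkey_list)
    return "delete from %s m using %s t where %s" % (qtbl, qtemp, cond)

print(build_del_sql('public.mytbl', 'mytbl_temp', ['id']))
# -> delete from public.mytbl m using mytbl_temp t where t.id = m.id
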
# process updated rows
if len(upd_list) > 0:
- self.log.debug("bulk: Updating %d rows in %s" % (len(upd_list), tbl))
+ self.log.debug("bulk: Updating %d rows in %s", len(upd_list), tbl)
# delete old rows
q = "truncate %s" % qtemp
- self.log.debug('bulk: ' + q)
+ self.log.debug('bulk: %s', q)
curs.execute(q)
# copy rows
- self.log.debug("bulk: COPY %d rows into %s" % (len(upd_list), temp))
+ self.log.debug("bulk: COPY %d rows into %s", len(upd_list), temp)
skytools.magic_insert(curs, qtemp, upd_list, col_list, quoted_table=1)
temp_used = True
if self.method == METH_CORRECT:
# update main table
- self.log.debug('bulk: ' + upd_sql)
+ self.log.debug('bulk: %s', upd_sql)
curs.execute(upd_sql)
- self.log.debug("bulk: %s - %d" % (curs.statusmessage, curs.rowcount))
+ self.log.debug("bulk: %s - %d", curs.statusmessage, curs.rowcount)
# check count
if len(upd_list) != curs.rowcount:
- self.log.warning("Update mismatch: expected=%s updated=%d"
- % (len(upd_list), curs.rowcount))
+ self.log.warning("Update mismatch: expected=%s updated=%d",
+ len(upd_list), curs.rowcount)
else:
# delete from main table
- self.log.debug('bulk: ' + del_sql)
+ self.log.debug('bulk: %s', del_sql)
curs.execute(del_sql)
- self.log.debug('bulk: ' + curs.statusmessage)
+ self.log.debug('bulk: %s', curs.statusmessage)
# check count
if real_update_count != curs.rowcount:
- self.log.warning("bulk: Update mismatch: expected=%s deleted=%d"
- % (real_update_count, curs.rowcount))
+ self.log.warning("bulk: Update mismatch: expected=%s deleted=%d",
+ real_update_count, curs.rowcount)
# insert into main table
if AVOID_BIZGRES_BUG:
# copy again, into main table
- self.log.debug("bulk: COPY %d rows into %s" % (len(upd_list), tbl))
+ self.log.debug("bulk: COPY %d rows into %s", len(upd_list), tbl)
skytools.magic_insert(curs, qtbl, upd_list, col_list, quoted_table=1)
else:
# better way, but does not work due to a bizgres bug
- self.log.debug('bulk: ' + ins_sql)
+ self.log.debug('bulk: %s', ins_sql)
curs.execute(ins_sql)
- self.log.debug('bulk: ' + curs.statusmessage)
+ self.log.debug('bulk: %s', curs.statusmessage)
# process new rows
if len(ins_list) > 0:
- self.log.debug("bulk: Inserting %d rows into %s" % (len(ins_list), tbl))
- self.log.debug("bulk: COPY %d rows into %s" % (len(ins_list), tbl))
+ self.log.debug("bulk: Inserting %d rows into %s", len(ins_list), tbl)
+ self.log.debug("bulk: COPY %d rows into %s", len(ins_list), tbl)
skytools.magic_insert(curs, qtbl, ins_list, col_list, quoted_table=1)
# delete remaining rows
else:
# fscking problems with long-lived temp tables
q = "drop table %s" % qtemp
- self.log.debug('bulk: ' + q)
+ self.log.debug('bulk: %s', q)
curs.execute(q)
self.reset()
# check if exists
if USE_REAL_TABLE:
if skytools.exists_table(curs, tempname):
- self.log.debug("bulk: Using existing real table %s" % tempname)
+ self.log.debug("bulk: Using existing real table %s", tempname)
return tempname, quote_fqident(tempname)
# create non-temp table
q = "create table %s (like %s)" % (
quote_fqident(tempname),
quote_fqident(self.dest_table))
- self.log.debug("bulk: Creating real table: %s" % q)
+ self.log.debug("bulk: Creating real table: %s", q)
curs.execute(q)
return tempname, quote_fqident(tempname)
elif USE_LONGLIVED_TEMP_TABLES:
if skytools.exists_temp_table(curs, tempname):
- self.log.debug("bulk: Using existing temp table %s" % tempname)
+ self.log.debug("bulk: Using existing temp table %s", tempname)
return tempname, quote_ident(tempname)
# bizgres crashes on deleting rows
# create temp table for loading
q = "create temp table %s (like %s) %s" % (
quote_ident(tempname), quote_fqident(self.dest_table), arg)
- self.log.debug("bulk: Creating temp table: %s" % q)
+ self.log.debug("bulk: Creating temp table: %s", q)
curs.execute(q)
return tempname, quote_ident(tempname)
def logexec(self, curs, sql):
"""Logs and executes sql statement"""
- self.log.debug('exec: %s' % sql)
+ self.log.debug('exec: %s', sql)
curs.execute(sql)
- self.log.debug('msg: %s, rows: %s' % (
- curs.statusmessage, curs.rowcount))
+ self.log.debug('msg: %s, rows: %s', curs.statusmessage, curs.rowcount)
# create sql parts
cnt = len(data)
if (cnt == 0):
return
- self.log.debug("bulk: Deleting %d rows from %s" % (cnt, self.table))
+ self.log.debug("bulk: Deleting %d rows from %s", cnt, self.table)
# copy rows to temp
self.bulk_insert(curs, data)
# delete rows using temp
self.delete(curs)
# check if right amount of rows deleted (only in direct mode)
if self.conf.table_mode == 'direct' and cnt != curs.rowcount:
- self.log.warning("%s: Delete mismatch: expected=%s deleted=%d"
- % (self.table, cnt, curs.rowcount))
+ self.log.warning("%s: Delete mismatch: expected=%s deleted=%d",
+ self.table, cnt, curs.rowcount)
def process_update(self, curs, op_map):
"""Process update list"""
cnt = len(data)
if (cnt == 0):
return
- self.log.debug("bulk: Updating %d rows in %s" % (cnt, self.table))
+ self.log.debug("bulk: Updating %d rows in %s", cnt, self.table)
# copy rows to temp
self.bulk_insert(curs, data)
if self.method == METH_CORRECT:
self.update(curs)
# check count (only in direct mode)
if self.conf.table_mode == 'direct' and cnt != curs.rowcount:
- self.log.warning("%s: Update mismatch: expected=%s updated=%d"
- % (self.table, cnt, curs.rowcount))
+ self.log.warning("%s: Update mismatch: expected=%s updated=%d",
+ self.table, cnt, curs.rowcount)
else:
# delete from main table using temp
self.delete(curs)
# check count (only in direct mode)
if self.conf.table_mode == 'direct' and real_cnt != curs.rowcount:
- self.log.warning("%s: Update mismatch: expected=%s deleted=%d"
- % (self.table, real_cnt, curs.rowcount))
+ self.log.warning("%s: Update mismatch: expected=%s deleted=%d",
+ self.table, real_cnt, curs.rowcount)
# insert into main table
if AVOID_BIZGRES_BUG:
# copy again, into main table
# merged method loads inserts together with updates
if (cnt == 0) or (self.method == METH_MERGED):
return
- self.log.debug("bulk: Inserting %d rows into %s" % (cnt, self.table))
+ self.log.debug("bulk: Inserting %d rows into %s", cnt, self.table)
# copy into target table (no temp used)
self.bulk_insert(curs, data, table = self.qtable)
def bulk_flush(self, curs, op_map):
- self.log.debug("bulk_flush: %s (I/U/D = %d/%d/%d)" % (
- self.table, len(op_map['I']), len(op_map['U']), len(op_map['D'])))
+ self.log.debug("bulk_flush: %s (I/U/D = %d/%d/%d)", self.table,
+ len(op_map['I']), len(op_map['U']), len(op_map['D']))
# fetch distribution fields
if self.dist_fields is None:
self.dist_fields = self.find_dist_fields(curs)
- self.log.debug("Key fields: %s Dist fields: %s" % (
- ",".join(self.pkeys), ",".join(self.dist_fields)))
+ self.log.debug("Key fields: %s Dist fields: %s",
+ ",".join(self.pkeys), ",".join(self.dist_fields))
# add them to key
for key in self.dist_fields:
if key not in self.keys:
"""
if USE_LONGLIVED_TEMP_TABLES or USE_REAL_TABLE:
if self.temp_present:
- self.log.debug("bulk: Using existing temp table %s" % self.temp)
+ self.log.debug("bulk: Using existing temp table %s", self.temp)
return False
self.create(curs)
self.temp_present = True
# truncate when re-using existing table
if not self.create_temp(curs):
self.truncate(curs)
- self.log.debug("bulk: COPY %d rows into %s" % (len(data), table))
+ self.log.debug("bulk: COPY %d rows into %s", len(data), table)
skytools.magic_insert(curs, table, data, self.fields,
quoted_table = True)
if _use_temp and self.run_analyze:
BaseHandler.__init__(self, table_name, args, dest_table)
# show args
- self.log.debug("dispatch.init: table_name=%r, args=%r" % \
- (table_name, args))
+ self.log.debug("dispatch.init: table_name=%r, args=%r", table_name, args)
self.batch_info = None
self.dst_curs = None
self.pkeys = None
# process only operations specified
if not op in self.conf.event_types:
return
- self.log.debug('dispatch.process_event: %s/%s' % (
- ev.ev_type, ev.ev_data))
+ self.log.debug('dispatch.process_event: %s/%s', ev.ev_type, ev.ev_data)
if self.pkeys is None:
self.pkeys = self.filter_pkeys(pkeys.split(','))
data = self.filter_data(data)
have_func = skytools.exists_function(curs, PART_FUNC_OLD, len(PART_FUNC_ARGS))
if have_func:
- self.log.debug('check_part.exec: func: %s, args: %s' % (pfcall, vals))
+ self.log.debug('check_part.exec: func: %s, args: %s', pfcall, vals)
curs.execute(pfcall, vals)
else:
#
# - check constraints
# - inheritance
#
- self.log.debug('part func %s not found, cloning table' % self.conf.part_func)
+ self.log.debug('part func %s not found, cloning table', self.conf.part_func)
struct = TableStruct(curs, self.dest_table)
struct.create(curs, T_ALL, dst)
exec_with_vals(self.conf.post_part)
- self.log.info("Created table: %s" % dst)
+ self.log.info("Created table: %s", dst)
if self.conf.retention_period:
self.drop_obsolete_partitions (self.dest_table, self.conf.retention_period, self.conf.period)
func = RETENTION_FUNC
args = [parent_table, retention_period, partition_period]
sql = "select " + func + " (%s, %s, %s)"
- self.log.debug("func: %s, args: %s" % (func, args))
- curs.execute (sql, args)
+ self.log.debug("func: %s, args: %s", func, args)
+ curs.execute(sql, args)
res = []
for row in curs.fetchall():
res.append(row[0])
"""Filter event by hash in extra3, apply only local part."""
if ev.extra3:
meta = skytools.db_urldecode(ev.extra3)
- self.log.debug('part.process_event: hash=%d, max_part=%s, local_part=%d' %\
- (int(meta['hash']), self.max_part, self.local_part))
+ self.log.debug('part.process_event: hash=%d, max_part=%s, local_part=%d',
+ int(meta['hash']), self.max_part, self.local_part)
if (int(meta['hash']) & self.max_part) != self.local_part:
self.log.debug('part.process_event: not my event')
return
"""Prepare the where condition for copy and replay filtering"""
self.load_part_info(dst_curs)
w = "(%s & %d) = %d" % (self.hashexpr, self.max_part, self.local_part)
- self.log.debug('part: copy_condition=%s' % w)
+ self.log.debug('part: copy_condition=%s', w)
return w
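
# Toy illustration of the mask arithmetic in the copy condition: with
# max_part = 3 the queue is split four ways and each node keeps only
# rows whose hash lands on its own partition number. zlib.crc32 stands
# in for the server-side hashexpr (an assumption made for the sketch):
import zlib

def part_hash(key):
    return zlib.crc32(str(key).encode())

max_part, local_part = 3, 2          # 4 partitions, this node owns #2
mine = [k for k in range(16) if (part_hash(k) & max_part) == local_part]
print(mine)   # roughly a quarter of the keys
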
def load_part_info(self, curs):
"""Set snapshot."""
if self.str_snapshot == str_snapshot:
return
- self.log.debug("%s: change_snapshot to %s" % (self.name, str_snapshot))
+ self.log.debug("%s: change_snapshot to %s", self.name, str_snapshot)
self.str_snapshot = str_snapshot
if str_snapshot:
self.from_snapshot = skytools.Snapshot(str_snapshot)
self.state = state
self.sync_tick_id = tick_id
self.changed = 1
- self.log.debug("%s: change_state to %s" % (self.name,
- self.render_state()))
+ self.log.debug("%s: change_state to %s", self.name, self.render_state())
def render_state(self):
"""Make a string to be stored in db."""
def loaded_state(self, row):
"""Update object with info from db."""
- self.log.debug("loaded_state: %s: %s / %s" % (
- self.name, row['merge_state'], row['custom_snapshot']))
+ self.log.debug("loaded_state: %s: %s / %s",
+ self.name, row['merge_state'], row['custom_snapshot'])
self.change_snapshot(row['custom_snapshot'], 0)
self.state = self.parse_state(row['merge_state'])
self.changed = 0
# compare: sql to use
#compare_sql = select count(1) as cnt, sum(hashtext(t.*::text)) as chksum from only _TABLE_ t
# workaround for hashtext change between 8.3 and 8.4
- #compare_sql = select count(1) as cnt, sum(('x'||substr(md5(t.*::text),1,16))::bit(64)::bigint) as chksum from only _TABLE_ t
+ #compare_sql = select count(1) as cnt, sum(('x'||substr(md5(t.*::text),1,16))::bit(64)::bigint) as chksum from only _TABLE_ t
#compare_fmt = %(cnt)d rows, checksum=%(chksum)s
"""
else:
# regular provider is used
if t.name not in pmap:
- self.log.warning("Table %s not available on provider" % t.name)
+ self.log.warning("Table %s not available on provider", t.name)
continue
pt = pmap[t.name]
if pt.state != TABLE_OK: # or pt.custom_snapshot: # FIXME: does snapshot matter?
- self.log.info("Table %s not OK on provider, waiting" % t.name)
+ self.log.info("Table %s not OK on provider, waiting", t.name)
continue
# don't allow more copies than configured
# somebody may have done remove-table in the meantime
if self.copy_table_name not in self.table_map:
- self.log.error("copy_sync: lost table: %s" % self.copy_table_name)
+ self.log.error("copy_sync: lost table: %s", self.copy_table_name)
return SYNC_EXIT
# This operates on single table
elif self.cur_tick < t.sync_tick_id:
return SYNC_OK
else:
- self.log.error("copy_sync: cur_tick=%d sync_tick=%d" % (
- self.cur_tick, t.sync_tick_id))
+ self.log.error("copy_sync: cur_tick=%d sync_tick=%d",
+ self.cur_tick, t.sync_tick_id)
raise Exception('Invalid table state')
elif t.state == TABLE_WANNA_SYNC:
# wait for main thread to react
def process_remote_event(self, src_curs, dst_curs, ev):
"""handle one event"""
- self.log.debug("New event: id=%s / type=%s / data=%s / extra1=%s" % (ev.id, ev.type, ev.data, ev.extra1))
+ self.log.debug("New event: id=%s / type=%s / data=%s / extra1=%s", ev.id, ev.type, ev.data, ev.extra1)
# set current_event only if processing them one-by-one
if self.work_state < 0:
if not t.changed:
continue
merge_state = t.render_state()
- self.log.info("storing state of %s: copy:%d new_state:%s" % (
- t.name, self.copy_thread, merge_state))
+ self.log.info("storing state of %s: copy:%d new_state:%s",
+ t.name, self.copy_thread, merge_state)
q = "select londiste.local_set_table_state(%s, %s, %s, %s)"
curs.execute(q, [self.set_name,
t.name, t.str_snapshot, merge_state])
self.save_table_state(dst_db.cursor())
dst_db.commit()
- self.log.info("Table %s status changed to '%s'" % (
- tbl.name, tbl.render_state()))
+ self.log.info("Table %s status changed to '%s'",
+ tbl.name, tbl.render_state())
def get_tables_in_state(self, state):
"get all tables with specific state"
time.sleep(2)
# launch and wait for daemonization result
- self.log.debug("Launch args: "+repr(cmd))
+ self.log.debug("Launch args: %r", cmd)
res = os.spawnvp(os.P_WAIT, script, cmd)
- self.log.debug("Launch result: "+repr(res))
+ self.log.debug("Launch result: %r", res)
if res != 0:
- self.log.error("Failed to launch copy process, result=%d" % res)
+ self.log.error("Failed to launch copy process, result=%d", res)
def sync_database_encodings(self, src_db, dst_db):
"""Make sure client_encoding is same on both side."""
if __name__ == '__main__':
script = Replicator(sys.argv[1:])
script.start()
-
src_curs = src_db.cursor()
dst_curs = dst_db.cursor()
- self.log.info('Checking %s' % dst_tbl)
+ self.log.info('Checking %s', dst_tbl)
self.common_fields = []
self.fq_common_fields = []
dst_where = t2.plugin.get_copy_condition(src_curs, dst_curs)
src_where = dst_where
- self.log.info("Dumping src table: %s" % src_tbl)
+ self.log.info("Dumping src table: %s", src_tbl)
self.dump_table(src_tbl, src_curs, dump_src, src_where)
src_db.commit()
- self.log.info("Dumping dst table: %s" % dst_tbl)
+ self.log.info("Dumping dst table: %s", dst_tbl)
self.dump_table(dst_tbl, dst_curs, dump_dst, dst_where)
dst_db.commit()
-
- self.log.info("Sorting src table: %s" % dump_src)
+
+ self.log.info("Sorting src table: %s", dump_src)
self.do_sort(dump_src, dump_src + '.sorted')
- self.log.info("Sorting dst table: %s" % dump_dst)
+ self.log.info("Sorting dst table: %s", dump_dst)
self.do_sort(dump_dst, dump_dst + '.sorted')
self.dump_compare(dst_tbl, dump_src + ".sorted", dump_dst + ".sorted")
self.fq_common_fields = fqlist
cols = ",".join(fqlist)
- self.log.debug("using columns: %s" % cols)
+ self.log.debug("using columns: %s", cols)
def dump_table(self, tbl, curs, fn, whr):
"""Dump table to disk."""
if len(whr) == 0:
whr = 'true'
q = "copy (SELECT %s FROM %s WHERE %s) to stdout" % (cols, skytools.quote_fqident(tbl), whr)
- self.log.debug("Query: %s" % q)
+ self.log.debug("Query: %s", q)
f = open(fn, "w", 64*1024)
curs.copy_expert(q, f)
size = f.tell()
f.close()
- self.log.info('%s: Got %d bytes' % (tbl, size))
+ self.log.info('%s: Got %d bytes', tbl, size)
def get_row(self, ln):
"""Parse a row into dict."""
def dump_compare(self, tbl, src_fn, dst_fn):
"""Dump + compare single table."""
- self.log.info("Comparing dumps: %s" % tbl)
+ self.log.info("Comparing dumps: %s", tbl)
self.cnt_insert = 0
self.cnt_update = 0
self.cnt_delete = 0
dst_ln = f2.readline()
if dst_ln: self.total_dst += 1
- self.log.info("finished %s: src: %d rows, dst: %d rows,"\
- " missed: %d inserts, %d updates, %d deletes" % (
+ self.log.info("finished %s: src: %d rows, dst: %d rows,"
+ " missed: %d inserts, %d updates, %d deletes",
tbl, self.total_src, self.total_dst,
- self.cnt_insert, self.cnt_update, self.cnt_delete))
+ self.cnt_insert, self.cnt_update, self.cnt_delete)
def got_missed_insert(self, tbl, src_row):
"""Create sql for missed insert."""
def show_fix(self, tbl, q, desc):
"""Print/write/apply repair sql."""
- self.log.info("missed %s: %s" % (desc, q))
+ self.log.info("missed %s: %s", desc, q)
if self.apply_curs:
self.apply_curs.execute(q)
else:
def cmp_keys(self, src_row, dst_row):
"""Compare primary keys of the rows.
-
+
Returns 1 if src > dst, -1 if src < dst and 0 if src == dst"""
# None means table is done. tag it larger than any existing row.
elif v1 > v2:
return 1
return 0
-
for tbl in args:
tbl = skytools.fq_name(tbl)
if (tbl in src_tbls) and not src_tbls[tbl]['local']:
- self.log.error("Table %s does not exist on provider, need to switch to different provider" % tbl)
+ self.log.error("Table %s does not exist on provider, need to switch to different provider", tbl)
problems = True
if problems:
self.log.error("Problems, canceling operation")
if create_flags:
if tbl_exists:
- self.log.info('Table %s already exist, not touching' % desc)
+ self.log.info('Table %s already exists, not touching', desc)
else:
src_dest_table = src_tbls[tbl]['dest_table']
if not skytools.exists_table(src_curs, src_dest_table):
# table not present on provider - nowhere to get the DDL from
- self.log.warning('Table %s missing on provider, cannot create, skipping' % desc)
+ self.log.warning('Table %s missing on provider, cannot create, skipping', desc)
return
schema = skytools.fq_name_parts(dest_table)[0]
if not skytools.exists_schema(dst_curs, schema):
for tbl in src_tbls.keys():
q = "select * from londiste.global_add_table(%s, %s)"
if tbl not in dst_tbls:
- self.log.info("Table %s info missing from subscriber, adding" % tbl)
+ self.log.info("Table %s info missing from subscriber, adding", tbl)
self.exec_cmd(dst_curs, q, [self.set_name, tbl])
dst_tbls[tbl] = {'local': False, 'dest_table': tbl}
for tbl in dst_tbls.keys():
" where table_name = %s and local"
curs.execute(q, [self.set_name, tbl])
if curs.rowcount == 0:
- self.log.error("Table %s not found on this node" % tbl)
+ self.log.error("Table %s not found on this node", tbl)
sys.exit(1)
attrs, dest_table = curs.fetchone()
seq_exists = skytools.exists_sequence(dst_curs, seq)
if create_flags:
if seq_exists:
- self.log.info('Sequence %s already exist, not creating' % seq)
+ self.log.info('Sequence %s already exists, not creating', seq)
else:
if not skytools.exists_sequence(src_curs, seq):
# sequence not present on provider - nowhere to get the DDL from
- self.log.warning('Sequence "%s" missing on provider, skipping' % seq)
+ self.log.warning('Sequence "%s" missing on provider, skipping', seq)
return
s = skytools.SeqStruct(src_curs, seq)
src_db.commit()
s.create(dst_curs, create_flags, log = self.log)
elif not seq_exists:
- self.log.warning('Sequence "%s" missing on subscriber, use --create if necessary' % seq)
+ self.log.warning('Sequence "%s" missing on subscriber, use --create if necessary', seq)
return
q = "select * from londiste.local_add_seq(%s, %s)"
for seq in src_seqs.keys():
q = "select * from londiste.global_update_seq(%s, %s, %s)"
if seq not in dst_seqs:
- self.log.info("Sequence %s info missing from subscriber, adding" % seq)
+ self.log.info("Sequence %s info missing from subscriber, adding", seq)
self.exec_cmd(dst_curs, q, [self.set_name, seq, src_seqs[seq]['last_value']])
tmp = src_seqs[seq].copy()
tmp['local'] = False
res = self.exec_cmd(db, q, [self.queue_name, fname, sql, attrs.to_urlenc()], commit = False)
ret = res[0]['ret_code']
if ret >= 300:
- self.log.warning("Skipping execution of '%s'" % fname)
+ self.log.warning("Skipping execution of '%s'", fname)
continue
if attrs.need_execute(curs, local_tables, local_seqs):
self.log.info("%s: executing sql", fname)
res_list.append(a)
res_map[a] = 1
elif a in reverse_map:
- self.log.info("%s already processed" % a)
+ self.log.info("%s already processed", a)
elif allow_nonexist:
res_list.append(a)
res_map[a] = 1
elif self.options.force:
- self.log.warning("%s not available, but --force is used" % a)
+ self.log.warning("%s not available, but --force is used", a)
res_list.append(a)
res_map[a] = 1
else:
- self.log.warning("%s not available" % a)
+ self.log.warning("%s not available", a)
err = 1
if err:
raise skytools.UsageError("Cannot proceed")
for tbl in tlist:
tbl = skytools.fq_name(tbl)
if not tbl in dst_tables:
- self.log.warning('Table not subscribed: %s' % tbl)
+ self.log.warning('Table not subscribed: %s', tbl)
continue
t2 = dst_tables[tbl]
if t2.merge_state != 'ok':
- self.log.warning('Table %s not synced yet, no point' % tbl)
+ self.log.warning('Table %s not synced yet, no point', tbl)
continue
pnode, ploc, wname = find_copy_source(self, self.queue_name, tbl, pnode, ploc)
src_tables, ignore = self.get_tables(src_db)
if not tbl in src_tables:
- self.log.warning('Table not available on provider: %s' % tbl)
+ self.log.warning('Table not available on provider: %s', tbl)
return
t1 = src_tables[tbl]
if t1.merge_state != 'ok':
- self.log.warning('Table %s not ready yet on provider' % tbl)
+ self.log.warning('Table %s not ready yet on provider', tbl)
return
#self.check_consumer(setup_db, dst_db)
dst_curs = dst_db.cursor()
if not skytools.exists_table(src_curs, src_tbl):
- self.log.warning("Table %s does not exist on provider side" % src_tbl)
+ self.log.warning("Table %s does not exist on provider side", src_tbl)
return
if not skytools.exists_table(dst_curs, dst_tbl):
- self.log.warning("Table %s does not exist on subscriber side" % dst_tbl)
+ self.log.warning("Table %s does not exist on subscriber side", dst_tbl)
return
# lock table against changes
lock_curs = lock_db.cursor()
# lock table in separate connection
- self.log.info('Locking %s' % src_tbl)
+ self.log.info('Locking %s', src_tbl)
lock_db.commit()
self.set_lock_timeout(lock_curs)
lock_time = time.time()
lock_curs.execute("LOCK TABLE %s IN SHARE MODE" % skytools.quote_fqident(src_tbl))
# now wait until consumer has updated target table until locking
- self.log.info('Syncing %s' % dst_tbl)
+ self.log.info('Syncing %s', dst_tbl)
# consumer must get further than this tick
tick_id = self.force_tick(setup_curs)
self.old_worker_paused = self.pause_consumer(setup_curs, self.provider_info['worker_name'])
lock_curs = lock_db.cursor()
- self.log.info('Syncing %s' % dst_tbl)
+ self.log.info('Syncing %s', dst_tbl)
# consumer must get further than this tick
tick_id = self.force_tick(setup_curs, False)
break
time.sleep(0.5)
return oldflag
-
if tbl_stat.copy_role == 'wait-copy':
self.log.info('waiting for first partition to initialize copy')
elif tbl_stat.max_parallel_copies_reached():
- self.log.info('number of max parallel copies (%s) reached' %\
+ self.log.info('max number of parallel copies (%s) reached',
tbl_stat.max_parallel_copy)
else:
break
if pt.state == TABLE_OK:
break
- self.log.warning("table %s not in sync yet on provider, waiting" % tbl_stat.name)
+ self.log.warning("table %s not in sync yet on provider, waiting", tbl_stat.name)
time.sleep(10)
src_real_table = pt.dest_table
self.sync_database_encodings(src_db, dst_db)
- self.log.info("Starting full copy of %s" % tbl_stat.name)
+ self.log.info("Starting full copy of %s", tbl_stat.name)
# just in case, drop all fkeys (in case "replay" was skipped)
# !! this may commit, so must be done before anything else !!
common_cols = []
for c in slist:
if c not in dlist:
- self.log.warning("Table %s column %s does not exist on subscriber"
- % (tbl_stat.name, c))
+ self.log.warning("Table %s column %s does not exist on subscriber",
+ tbl_stat.name, c)
else:
common_cols.append(c)
for c in dlist:
if c not in slist:
- self.log.warning("Table %s column %s does not exist on provider"
- % (tbl_stat.name, c))
+ self.log.warning("Table %s column %s does not exist on provider",
+ tbl_stat.name, c)
# drop unnecessary stuff
if cmode > 0:
# drop data
if tbl_stat.table_attrs.get('skip_truncate'):
- self.log.info("%s: skipping truncate" % tbl_stat.name)
+ self.log.info("%s: skipping truncate", tbl_stat.name)
else:
- self.log.info("%s: truncating" % tbl_stat.name)
+ self.log.info("%s: truncating", tbl_stat.name)
q = "truncate "
if dst_db.server_version >= 80400:
q += "only "
tbl_stat.dropped_ddl = ddl
# do truncate & copy
- self.log.info("%s: start copy" % tbl_stat.name)
+ self.log.info("%s: start copy", tbl_stat.name)
p = tbl_stat.get_plugin()
stats = p.real_copy(src_real_table, src_curs, dst_curs, common_cols)
if stats:
- self.log.info("%s: copy finished: %d bytes, %d rows" % (
- tbl_stat.name, stats[0], stats[1]))
+ self.log.info("%s: copy finished: %d bytes, %d rows",
+ tbl_stat.name, stats[0], stats[1])
# get snapshot
src_curs.execute("select txid_current_snapshot()")
if __name__ == '__main__':
script = CopyTable(sys.argv[1:])
script.start()
-