g.add_option("--expect-sync", action="store_true", dest="expect_sync",
help = "add: no copy needed", default=False)
g.add_option("--skip-truncate", action="store_true", dest="skip_truncate",
- help = "copy: keep old data", default=False)
+ help = "add: keep old data", default=False)
g.add_option("--rewind", action="store_true",
help = "replay: sync queue pos with subscriber")
g.add_option("--reset", action="store_true",
self.log = log
self.forget()
self.changed = 0
+ self.skip_truncate = False
def forget(self):
self.state = TABLE_MISSING
self.sync_tick_id = None
self.ok_batch_count = 0
self.last_tick = 0
+ self.skip_truncate = False
self.changed = 1
def change_snapshot(self, str_snapshot, tag_changed = 1):
return state
- def loaded_state(self, merge_state, str_snapshot):
+ def loaded_state(self, merge_state, str_snapshot, skip_truncate):
self.log.debug("loaded_state: %s: %s / %s" % (
self.name, merge_state, str_snapshot))
self.change_snapshot(str_snapshot, 0)
self.state = self.parse_state(merge_state)
self.changed = 0
+ self.skip_truncate = skip_truncate
if merge_state == "?":
self.changed = 1
self.fill_mirror_queue(mirror_list, dst_curs)
def handle_data_event(self, ev, dst_curs):
+ # buffer SQL statements, then send them together
fmt = self.sql_command[ev.type]
sql = fmt % (ev.extra1, ev.data)
self.sql_list.append(sql)
ev.tag_done()
def flush_sql(self, dst_curs):
+ # send all buffered statements at once
+
if len(self.sql_list) == 0:
return
to load state on every batch.
"""
- q = """select table_name, snapshot, merge_state
- from londiste.subscriber_get_table_list(%s)
- """
+ q = "select table_name, snapshot, merge_state, skip_truncate"\
+ " from londiste.subscriber_get_table_list(%s)"
curs.execute(q, [self.pgq_queue_name])
new_list = []
t = self.get_table_by_name(row['table_name'])
if not t:
t = TableState(row['table_name'], self.log)
- t.loaded_state(row['merge_state'], row['snapshot'])
+ t.loaded_state(row['merge_state'], row['snapshot'], row['skip_truncate'])
new_list.append(t)
new_map[t.name] = t
p = skytools.DBScript.init_optparse(self, parser)
p.add_option("--expect-sync", action="store_true", dest="expect_sync",
help = "no copy needed", default=False)
+ p.add_option("--skip-truncate", action="store_true", dest="skip_truncate",
+ help = "dont delete old data", default=False)
p.add_option("--force", action="store_true",
help="force", default=False)
return p
else:
sys.exit(1)
+ dst_db = self.get_database('subscriber_db')
+ dst_curs = dst_db.cursor()
for tbl in table_list:
tbl = skytools.fq_name(tbl)
if tbl in subscriber_tables:
self.log.info("Table %s already added" % tbl)
else:
self.log.info("Adding %s" % tbl)
- self.subscriber_add_one_table(tbl)
+ self.subscriber_add_one_table(dst_curs, tbl)
+ dst_db.commit()
def subscriber_remove_tables(self, table_list):
subscriber_tables = self.get_subscriber_table_list()
dst_curs.execute(q, [self.pgq_queue_name, tbl])
dst_db.commit()
- def subscriber_add_one_table(self, tbl):
+ def subscriber_add_one_table(self, dst_curs, tbl):
q = "select londiste.subscriber_add_table(%s, %s)"
- dst_db = self.get_database('subscriber_db')
- dst_curs = dst_db.cursor()
dst_curs.execute(q, [self.pgq_queue_name, tbl])
- if self.options.expect_sync:
+ if self.options.expect_sync and self.options.skip_truncate:
+ self.log.error("Too many options: --expect-sync and --skip-truncate")
+ sys.exit(1)
+ elif self.options.expect_sync:
q = "select londiste.subscriber_set_table_state(%s, %s, null, 'ok')"
dst_curs.execute(q, [self.pgq_queue_name, tbl])
- dst_db.commit()
+ elif self.options.skip_truncate:
+ q = "select londiste.subscriber_set_skip_truncate(%s, %s, true)"
+ dst_curs.execute(q, [self.pgq_queue_name, tbl])
def subscriber_remove_one_table(self, tbl):
q = "select londiste.subscriber_remove_table(%s, %s)"
self.consumer_id += "_copy"
self.copy_thread = 1
- def init_optparse(self, parser=None):
- p = Replicator.init_optparse(self, parser)
- p.add_option("--skip-truncate", action="store_true", dest="skip_truncate",
- help = "avoid truncate", default=False)
- return p
-
def do_copy(self, tbl_stat):
src_db = self.get_database('provider_db')
dst_db = self.get_database('subscriber_db')
dst_db.commit()
# change to SERIALIZABLE isolation level
- src_db.set_isolation_level(2)
+ src_db.set_isolation_level(skytools.I_SERIALIZABLE)
src_db.commit()
# initial sync copy
dst_struct.drop(dst_curs, objs, log = self.log)
# do truncate & copy
- self.real_copy(src_curs, dst_curs, tbl_stat.name)
+ self.real_copy(src_curs, dst_curs, tbl_stat)
# get snapshot
src_curs.execute("select get_current_snapshot()")
self.save_table_state(dst_curs)
dst_db.commit()
- def real_copy(self, srccurs, dstcurs, tablename):
+ def real_copy(self, srccurs, dstcurs, tbl_stat):
"Main copy logic."
+ tablename = tbl_stat.name
# drop data
- if self.options.skip_truncate:
+ if tbl_stat.skip_truncate:
self.log.info("%s: skipping truncate" % tablename)
else:
self.log.info("%s: truncating" % tablename)