From 1d2342d0ec3c3e01fa6c1256fe81628592fbdf24 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Mon, 9 Apr 2007 12:09:13 +0000 Subject: [PATCH] move --skip-truncate switch to subscriber-add, keep state in db --- python/londiste.py | 2 +- python/londiste/playback.py | 15 ++++++++++----- python/londiste/setup.py | 20 ++++++++++++++------ python/londiste/table_copy.py | 15 +++++---------- 4 files changed, 30 insertions(+), 22 deletions(-) diff --git a/python/londiste.py b/python/londiste.py index 9e5684ea..916b104f 100755 --- a/python/londiste.py +++ b/python/londiste.py @@ -115,7 +115,7 @@ class Londiste(skytools.DBScript): g.add_option("--expect-sync", action="store_true", dest="expect_sync", help = "add: no copy needed", default=False) g.add_option("--skip-truncate", action="store_true", dest="skip_truncate", - help = "copy: keep old data", default=False) + help = "add: keep old data", default=False) g.add_option("--rewind", action="store_true", help = "replay: sync queue pos with subscriber") g.add_option("--reset", action="store_true", diff --git a/python/londiste/playback.py b/python/londiste/playback.py index 2bcb1bc7..bba5604a 100644 --- a/python/londiste/playback.py +++ b/python/londiste/playback.py @@ -57,6 +57,7 @@ class TableState(object): self.log = log self.forget() self.changed = 0 + self.skip_truncate = False def forget(self): self.state = TABLE_MISSING @@ -65,6 +66,7 @@ class TableState(object): self.sync_tick_id = None self.ok_batch_count = 0 self.last_tick = 0 + self.skip_truncate = False self.changed = 1 def change_snapshot(self, str_snapshot, tag_changed = 1): @@ -135,12 +137,13 @@ class TableState(object): return state - def loaded_state(self, merge_state, str_snapshot): + def loaded_state(self, merge_state, str_snapshot, skip_truncate): self.log.debug("loaded_state: %s: %s / %s" % ( self.name, merge_state, str_snapshot)) self.change_snapshot(str_snapshot, 0) self.state = self.parse_state(merge_state) self.changed = 0 + self.skip_truncate = skip_truncate if merge_state == "?": self.changed = 1 @@ -408,6 +411,7 @@ class Replicator(pgq.SerialConsumer): self.fill_mirror_queue(mirror_list, dst_curs) def handle_data_event(self, ev, dst_curs): + # buffer SQL statements, then send them together fmt = self.sql_command[ev.type] sql = fmt % (ev.extra1, ev.data) self.sql_list.append(sql) @@ -416,6 +420,8 @@ class Replicator(pgq.SerialConsumer): ev.tag_done() def flush_sql(self, dst_curs): + # send all buffered statements at once + if len(self.sql_list) == 0: return @@ -472,9 +478,8 @@ class Replicator(pgq.SerialConsumer): to load state on every batch. """ - q = """select table_name, snapshot, merge_state - from londiste.subscriber_get_table_list(%s) - """ + q = "select table_name, snapshot, merge_state, skip_truncate"\ + " from londiste.subscriber_get_table_list(%s)" curs.execute(q, [self.pgq_queue_name]) new_list = [] @@ -483,7 +488,7 @@ class Replicator(pgq.SerialConsumer): t = self.get_table_by_name(row['table_name']) if not t: t = TableState(row['table_name'], self.log) - t.loaded_state(row['merge_state'], row['snapshot']) + t.loaded_state(row['merge_state'], row['snapshot'], row['skip_truncate']) new_list.append(t) new_map[t.name] = t diff --git a/python/londiste/setup.py b/python/londiste/setup.py index ed44b093..d7fe6897 100644 --- a/python/londiste/setup.py +++ b/python/londiste/setup.py @@ -120,6 +120,8 @@ class CommonSetup(skytools.DBScript): p = skytools.DBScript.init_optparse(self, parser) p.add_option("--expect-sync", action="store_true", dest="expect_sync", help = "no copy needed", default=False) + p.add_option("--skip-truncate", action="store_true", dest="skip_truncate", + help = "dont delete old data", default=False) p.add_option("--force", action="store_true", help="force", default=False) return p @@ -459,13 +461,16 @@ class SubscriberSetup(CommonSetup): else: sys.exit(1) + dst_db = self.get_database('subscriber_db') + dst_curs = dst_db.cursor() for tbl in table_list: tbl = skytools.fq_name(tbl) if tbl in subscriber_tables: self.log.info("Table %s already added" % tbl) else: self.log.info("Adding %s" % tbl) - self.subscriber_add_one_table(tbl) + self.subscriber_add_one_table(dst_curs, tbl) + dst_db.commit() def subscriber_remove_tables(self, table_list): subscriber_tables = self.get_subscriber_table_list() @@ -497,16 +502,19 @@ class SubscriberSetup(CommonSetup): dst_curs.execute(q, [self.pgq_queue_name, tbl]) dst_db.commit() - def subscriber_add_one_table(self, tbl): + def subscriber_add_one_table(self, dst_curs, tbl): q = "select londiste.subscriber_add_table(%s, %s)" - dst_db = self.get_database('subscriber_db') - dst_curs = dst_db.cursor() dst_curs.execute(q, [self.pgq_queue_name, tbl]) - if self.options.expect_sync: + if self.options.expect_sync and self.options.skip_truncate: + self.log.error("Too many options: --expect-sync and --skip-truncate") + sys.exit(1) + elif self.options.expect_sync: q = "select londiste.subscriber_set_table_state(%s, %s, null, 'ok')" dst_curs.execute(q, [self.pgq_queue_name, tbl]) - dst_db.commit() + elif self.options.skip_truncate: + q = "select londiste.subscriber_set_skip_truncate(%s, %s, true)" + dst_curs.execute(q, [self.pgq_queue_name, tbl]) def subscriber_remove_one_table(self, tbl): q = "select londiste.subscriber_remove_table(%s, %s)" diff --git a/python/londiste/table_copy.py b/python/londiste/table_copy.py index 1754baaf..b8b4cb96 100644 --- a/python/londiste/table_copy.py +++ b/python/londiste/table_copy.py @@ -21,12 +21,6 @@ class CopyTable(Replicator): self.consumer_id += "_copy" self.copy_thread = 1 - def init_optparse(self, parser=None): - p = Replicator.init_optparse(self, parser) - p.add_option("--skip-truncate", action="store_true", dest="skip_truncate", - help = "avoid truncate", default=False) - return p - def do_copy(self, tbl_stat): src_db = self.get_database('provider_db') dst_db = self.get_database('subscriber_db') @@ -36,7 +30,7 @@ class CopyTable(Replicator): dst_db.commit() # change to SERIALIZABLE isolation level - src_db.set_isolation_level(2) + src_db.set_isolation_level(skytools.I_SERIALIZABLE) src_db.commit() # initial sync copy @@ -60,7 +54,7 @@ class CopyTable(Replicator): dst_struct.drop(dst_curs, objs, log = self.log) # do truncate & copy - self.real_copy(src_curs, dst_curs, tbl_stat.name) + self.real_copy(src_curs, dst_curs, tbl_stat) # get snapshot src_curs.execute("select get_current_snapshot()") @@ -83,11 +77,12 @@ class CopyTable(Replicator): self.save_table_state(dst_curs) dst_db.commit() - def real_copy(self, srccurs, dstcurs, tablename): + def real_copy(self, srccurs, dstcurs, tbl_stat): "Main copy logic." + tablename = tbl_stat.name # drop data - if self.options.skip_truncate: + if tbl_stat.skip_truncate: self.log.info("%s: skipping truncate" % tablename) else: self.log.info("%s: truncating" % tablename) -- 2.39.5