move --skip-truncate switch to subscriber-add, keep state in db
authorMarko Kreen <markokr@gmail.com>
Mon, 9 Apr 2007 12:09:13 +0000 (12:09 +0000)
committerMarko Kreen <markokr@gmail.com>
Mon, 9 Apr 2007 12:09:13 +0000 (12:09 +0000)
python/londiste.py
python/londiste/playback.py
python/londiste/setup.py
python/londiste/table_copy.py

index 9e5684ea97888d977ba0da11724e61e6b6407a26..916b104f7e4138b132b0e3f03ab7481a7c194b6c 100755 (executable)
@@ -115,7 +115,7 @@ class Londiste(skytools.DBScript):
         g.add_option("--expect-sync", action="store_true", dest="expect_sync",
                 help = "add: no copy needed", default=False)
         g.add_option("--skip-truncate", action="store_true", dest="skip_truncate",
-                help = "copy: keep old data", default=False)
+                help = "add: keep old data", default=False)
         g.add_option("--rewind", action="store_true",
                 help = "replay: sync queue pos with subscriber")
         g.add_option("--reset", action="store_true",
index 2bcb1bc78ecad42567a3a0b5c203e6159d5958b3..bba5604adf5c9f0f4f7015beacdf2bcb2da8a264 100644 (file)
@@ -57,6 +57,7 @@ class TableState(object):
         self.log = log
         self.forget()
         self.changed = 0
+        self.skip_truncate = False
 
     def forget(self):
         self.state = TABLE_MISSING
@@ -65,6 +66,7 @@ class TableState(object):
         self.sync_tick_id = None
         self.ok_batch_count = 0
         self.last_tick = 0
+        self.skip_truncate = False
         self.changed = 1
 
     def change_snapshot(self, str_snapshot, tag_changed = 1):
@@ -135,12 +137,13 @@ class TableState(object):
 
         return state
 
-    def loaded_state(self, merge_state, str_snapshot):
+    def loaded_state(self, merge_state, str_snapshot, skip_truncate):
         self.log.debug("loaded_state: %s: %s / %s" % (
                        self.name, merge_state, str_snapshot))
         self.change_snapshot(str_snapshot, 0)
         self.state = self.parse_state(merge_state)
         self.changed = 0
+        self.skip_truncate = skip_truncate
         if merge_state == "?":
             self.changed = 1
 
@@ -408,6 +411,7 @@ class Replicator(pgq.SerialConsumer):
             self.fill_mirror_queue(mirror_list, dst_curs)
 
     def handle_data_event(self, ev, dst_curs):
+        # buffer SQL statements, then send them together
         fmt = self.sql_command[ev.type]
         sql = fmt % (ev.extra1, ev.data)
         self.sql_list.append(sql)
@@ -416,6 +420,8 @@ class Replicator(pgq.SerialConsumer):
         ev.tag_done()
 
     def flush_sql(self, dst_curs):
+        # send all buffered statements at once
+
         if len(self.sql_list) == 0:
             return
 
@@ -472,9 +478,8 @@ class Replicator(pgq.SerialConsumer):
         to load state on every batch.
         """
 
-        q = """select table_name, snapshot, merge_state
-               from londiste.subscriber_get_table_list(%s)
-            """
+        q = "select table_name, snapshot, merge_state, skip_truncate"\
+            "  from londiste.subscriber_get_table_list(%s)"
         curs.execute(q, [self.pgq_queue_name])
 
         new_list = []
@@ -483,7 +488,7 @@ class Replicator(pgq.SerialConsumer):
             t = self.get_table_by_name(row['table_name'])
             if not t:
                 t = TableState(row['table_name'], self.log)
-            t.loaded_state(row['merge_state'], row['snapshot'])
+            t.loaded_state(row['merge_state'], row['snapshot'], row['skip_truncate'])
             new_list.append(t)
             new_map[t.name] = t
 
index ed44b093e8e861980ca40c8635811c2bfeb1a11e..d7fe6897711bfcd47a6a63db32e68c7fbb9e6982 100644 (file)
@@ -120,6 +120,8 @@ class CommonSetup(skytools.DBScript):
         p = skytools.DBScript.init_optparse(self, parser)
         p.add_option("--expect-sync", action="store_true", dest="expect_sync",
                     help = "no copy needed", default=False)
+        p.add_option("--skip-truncate", action="store_true", dest="skip_truncate",
+                    help = "dont delete old data", default=False)
         p.add_option("--force", action="store_true",
                     help="force", default=False)
         return p
@@ -459,13 +461,16 @@ class SubscriberSetup(CommonSetup):
             else:
                 sys.exit(1)
 
+        dst_db = self.get_database('subscriber_db')
+        dst_curs = dst_db.cursor()
         for tbl in table_list:
             tbl = skytools.fq_name(tbl)
             if tbl in subscriber_tables:
                 self.log.info("Table %s already added" % tbl)
             else:
                 self.log.info("Adding %s" % tbl)
-                self.subscriber_add_one_table(tbl)
+                self.subscriber_add_one_table(dst_curs, tbl)
+            dst_db.commit()
 
     def subscriber_remove_tables(self, table_list):
         subscriber_tables = self.get_subscriber_table_list()
@@ -497,16 +502,19 @@ class SubscriberSetup(CommonSetup):
                 dst_curs.execute(q, [self.pgq_queue_name, tbl])
         dst_db.commit()
 
-    def subscriber_add_one_table(self, tbl):
+    def subscriber_add_one_table(self, dst_curs, tbl):
         q = "select londiste.subscriber_add_table(%s, %s)"
 
-        dst_db = self.get_database('subscriber_db')
-        dst_curs = dst_db.cursor()
         dst_curs.execute(q, [self.pgq_queue_name, tbl])
-        if self.options.expect_sync:
+        if self.options.expect_sync and self.options.skip_truncate:
+            self.log.error("Too many options: --expect-sync and --skip-truncate")
+            sys.exit(1)
+        elif self.options.expect_sync:
             q = "select londiste.subscriber_set_table_state(%s, %s, null, 'ok')"
             dst_curs.execute(q, [self.pgq_queue_name, tbl])
-        dst_db.commit()
+        elif self.options.skip_truncate:
+            q = "select londiste.subscriber_set_skip_truncate(%s, %s, true)"
+            dst_curs.execute(q, [self.pgq_queue_name, tbl])
 
     def subscriber_remove_one_table(self, tbl):
         q = "select londiste.subscriber_remove_table(%s, %s)"
index 1754baaf1f24d323c72333aa734b1b48f7ae0c99..b8b4cb96f173342a670adae861938d509d1d17ee 100644 (file)
@@ -21,12 +21,6 @@ class CopyTable(Replicator):
             self.consumer_id += "_copy"
             self.copy_thread = 1
 
-    def init_optparse(self, parser=None):
-        p = Replicator.init_optparse(self, parser)
-        p.add_option("--skip-truncate", action="store_true", dest="skip_truncate",
-                    help = "avoid truncate", default=False)
-        return p
-
     def do_copy(self, tbl_stat):
         src_db = self.get_database('provider_db')
         dst_db = self.get_database('subscriber_db')
@@ -36,7 +30,7 @@ class CopyTable(Replicator):
         dst_db.commit()
 
         # change to SERIALIZABLE isolation level
-        src_db.set_isolation_level(2)
+        src_db.set_isolation_level(skytools.I_SERIALIZABLE)
         src_db.commit()
 
         # initial sync copy
@@ -60,7 +54,7 @@ class CopyTable(Replicator):
         dst_struct.drop(dst_curs, objs, log = self.log)
 
         # do truncate & copy
-        self.real_copy(src_curs, dst_curs, tbl_stat.name)
+        self.real_copy(src_curs, dst_curs, tbl_stat)
 
         # get snapshot
         src_curs.execute("select get_current_snapshot()")
@@ -83,11 +77,12 @@ class CopyTable(Replicator):
         self.save_table_state(dst_curs)
         dst_db.commit()
 
-    def real_copy(self, srccurs, dstcurs, tablename):
+    def real_copy(self, srccurs, dstcurs, tbl_stat):
         "Main copy logic."
 
+        tablename = tbl_stat.name
         # drop data
-        if self.options.skip_truncate:
+        if tbl_stat.skip_truncate:
             self.log.info("%s: skipping truncate" % tablename)
         else:
             self.log.info("%s: truncating" % tablename)