londiste: on error, apply sql on-by-one
authorMarko Kreen <markokr@gmail.com>
Wed, 9 Sep 2009 11:09:40 +0000 (14:09 +0300)
committerMarko Kreen <markokr@gmail.com>
Thu, 10 Sep 2009 11:45:09 +0000 (14:45 +0300)
Check whether last loop got error (work_state = -1)
and apply sql one-by-one if thats true.

Also, apply TRUNCATE separately from other SQL.

python/londiste/playback.py
python/pgq/consumer.py

index ea55975a06691e73048992f541e0fb99d8f92178..a9d90702602fb7cd064531995b72b69ae893f33c 100644 (file)
@@ -507,7 +507,7 @@ class Replicator(CascadedWorker):
 
         fqname = skytools.quote_fqident(ev.extra1)
         sql = "TRUNCATE %s;" % fqname
-        self.apply_sql(sql, dst_curs)
+        self.apply_sql(sql, dst_curs, True)
 
     def handle_execute_event(self, ev, dst_curs):
         """handle one EXECUTE event"""
@@ -531,9 +531,16 @@ class Replicator(CascadedWorker):
         q = "select * from londiste.execute_finish(%s, %s)"
         self.exec_cmd(dst_curs, q, [self.queue_name, fname], commit = False)
 
-    def apply_sql(self, sql, dst_curs):
+    def apply_sql(self, sql, dst_curs, force = False):
+        if force:
+            self.flush_sql(dst_curs)
+
         self.sql_list.append(sql)
-        if len(self.sql_list) > 200:
+
+        limit = 200
+        if self.work_state == -1 or force:
+            limit = 0
+        if len(self.sql_list) >= limit:
             self.flush_sql(dst_curs)
 
     def flush_sql(self, dst_curs):
index 32fa9f76f97f11408b120043fad03ed4d38e89af..57a74a3ecc2173c49db935b5fe6505ff97d70527 100644 (file)
@@ -245,7 +245,7 @@ class Consumer(skytools.DBScript):
         curs.execute(q, [self.queue_name, self.consumer_name])
         return curs.fetchone()[0]
 
-    def _flush_retry(self, curs, list):
+    def _flush_retry(self, curs, batch_id, list):
         """Tag retry events."""
 
         retry = 0