From 4e3d76ee0d6cd509bfbf9d0ecdf3a4774f9027b1 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Wed, 9 Sep 2009 14:09:40 +0300 Subject: [PATCH] londiste: on error, apply sql on-by-one Check whether last loop got error (work_state = -1) and apply sql one-by-one if thats true. Also, apply TRUNCATE separately from other SQL. --- python/londiste/playback.py | 13 ++++++++++--- python/pgq/consumer.py | 2 +- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/python/londiste/playback.py b/python/londiste/playback.py index ea55975a..a9d90702 100644 --- a/python/londiste/playback.py +++ b/python/londiste/playback.py @@ -507,7 +507,7 @@ class Replicator(CascadedWorker): fqname = skytools.quote_fqident(ev.extra1) sql = "TRUNCATE %s;" % fqname - self.apply_sql(sql, dst_curs) + self.apply_sql(sql, dst_curs, True) def handle_execute_event(self, ev, dst_curs): """handle one EXECUTE event""" @@ -531,9 +531,16 @@ class Replicator(CascadedWorker): q = "select * from londiste.execute_finish(%s, %s)" self.exec_cmd(dst_curs, q, [self.queue_name, fname], commit = False) - def apply_sql(self, sql, dst_curs): + def apply_sql(self, sql, dst_curs, force = False): + if force: + self.flush_sql(dst_curs) + self.sql_list.append(sql) - if len(self.sql_list) > 200: + + limit = 200 + if self.work_state == -1 or force: + limit = 0 + if len(self.sql_list) >= limit: self.flush_sql(dst_curs) def flush_sql(self, dst_curs): diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py index 32fa9f76..57a74a3e 100644 --- a/python/pgq/consumer.py +++ b/python/pgq/consumer.py @@ -245,7 +245,7 @@ class Consumer(skytools.DBScript): curs.execute(q, [self.queue_name, self.consumer_name]) return curs.fetchone()[0] - def _flush_retry(self, curs, list): + def _flush_retry(self, curs, batch_id, list): """Tag retry events.""" retry = 0 -- 2.39.5