fqname = skytools.quote_fqident(ev.extra1)
sql = "TRUNCATE %s;" % fqname
- self.apply_sql(sql, dst_curs)
+ self.apply_sql(sql, dst_curs, True)
def handle_execute_event(self, ev, dst_curs):
"""handle one EXECUTE event"""
q = "select * from londiste.execute_finish(%s, %s)"
self.exec_cmd(dst_curs, q, [self.queue_name, fname], commit = False)
- def apply_sql(self, sql, dst_curs):
+ def apply_sql(self, sql, dst_curs, force = False):
+ if force:
+ self.flush_sql(dst_curs)
+
self.sql_list.append(sql)
- if len(self.sql_list) > 200:
+
+ limit = 200
+ if self.work_state == -1 or force:
+ limit = 0
+ if len(self.sql_list) >= limit:
self.flush_sql(dst_curs)
def flush_sql(self, dst_curs):
curs.execute(q, [self.queue_name, self.consumer_name])
return curs.fetchone()[0]
- def _flush_retry(self, curs, list):
+ def _flush_retry(self, curs, batch_id, list):
"""Tag retry events."""
retry = 0