From aae5c5b1dde3e456189cdff0fa1cd7b5cb369aa8 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Thu, 18 Apr 2013 14:55:25 +0300 Subject: [PATCH] londiste copy: fix data filtering when parallel_copies > 1 The problem appears in 'catching-up' phase were code checked table status only and not whether the table is owned by current process. Then events can be replayed by several copy processes when they are all in catching-up phase. The problem does not appear when local_only=1. Also remove unused function. --- python/londiste/playback.py | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/python/londiste/playback.py b/python/londiste/playback.py index 1c60c97b..73b6d298 100644 --- a/python/londiste/playback.py +++ b/python/londiste/playback.py @@ -201,10 +201,12 @@ class TableState(object): return self.max_parallel_copy and\ self.copy_pos >= self.max_parallel_copy - def interesting(self, ev, tick_id, copy_thread): + def interesting(self, ev, tick_id, copy_thread, copy_table_name): """Check if table wants this event.""" if copy_thread: + if self.name != copy_table_name: + return False if self.state not in (TABLE_CATCHING_UP, TABLE_DO_SYNC): return False else: @@ -651,7 +653,7 @@ class Replicator(CascadedWorker): def handle_data_event(self, ev, dst_curs): """handle one truncate event""" t = self.get_table_by_name(ev.extra1) - if not t or not t.interesting(ev, self.cur_tick, self.copy_thread): + if not t or not t.interesting(ev, self.cur_tick, self.copy_thread, self.copy_table_name): self.stat_increase('ignored_events') return @@ -667,7 +669,7 @@ class Replicator(CascadedWorker): def handle_truncate_event(self, ev, dst_curs): """handle one truncate event""" t = self.get_table_by_name(ev.extra1) - if not t or not t.interesting(ev, self.cur_tick, self.copy_thread): + if not t or not t.interesting(ev, self.cur_tick, self.copy_thread, self.copy_table_name): self.stat_increase('ignored_events') return @@ -754,17 +756,6 @@ class Replicator(CascadedWorker): dst_curs.execute(buf) - def interesting(self, ev): - """See if event is interesting.""" - if ev.type not in ('I', 'U', 'D', 'R'): - raise Exception('bug - bad event type in .interesting') - t = self.get_table_by_name(ev.extra1) - if not t: - return 0 - if not t.interesting(ev, self.cur_tick, self.copy_thread): - return 0 - return 1 - def add_set_table(self, dst_curs, tbl): """There was new table added to root, remember it.""" -- 2.39.5