From 8cc52f36d91d237870e5dcef05519240ab56517b Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Fri, 7 May 2010 15:17:53 +0300 Subject: [PATCH] Pluggable per-table handlers for event processing. 1. Make handler modules known for Londiste: [londiste3] handler_modules = londiste.bublin, some_other.module Modules are imported and classes found in __londiste_handlers__ module variable are registered. 2. Generic table setup: londiste3 cf.ini add-table foo --handler='handler1' --handler='handler2(arg1, arg2)' Londiste standard handler is default and always appended to custom plugins. --- python/londiste.py | 2 + python/londiste/__init__.py | 4 +- python/londiste/handler.py | 166 ++++++++++++++++++++++++++++++++++ python/londiste/playback.py | 93 ++++++++++--------- python/londiste/setup.py | 23 ++++- python/londiste/table_copy.py | 8 +- 6 files changed, 242 insertions(+), 54 deletions(-) create mode 100644 python/londiste/handler.py diff --git a/python/londiste.py b/python/londiste.py index 1939b2d0..b1d42985 100755 --- a/python/londiste.py +++ b/python/londiste.py @@ -111,6 +111,8 @@ class Londiste(skytools.DBScript): help = "takeover: old node was branch") p.add_option("--trigger-arg", action="append", help="add: Custom trigger arg") + p.add_option("--handler", action="append", + help="add: Custom handler for table") p.add_option_group(g) return p diff --git a/python/londiste/__init__.py b/python/londiste/__init__.py index 1b06de8a..a47f602f 100644 --- a/python/londiste/__init__.py +++ b/python/londiste/__init__.py @@ -8,18 +8,20 @@ import londiste.compare import londiste.setup import londiste.table_copy import londiste.repair +import londiste.handler from londiste.playback import * from londiste.compare import * from londiste.setup import * from londiste.table_copy import * from londiste.repair import * +from londiste.handler import * __all__ = ( londiste.playback.__all__ + londiste.compare.__all__ + + londiste.handler.__all__ + londiste.setup.__all__ + 
class BaseHandler:
    """Pass-through link in a per-table handler chain.

    Each hook simply forwards the call to the next handler in the chain
    (if there is one).  Custom handlers subclass this, override the hooks
    they are interested in and call the base implementation to keep the
    rest of the chain running.
    """

    handler_name = 'fwd'

    def __init__(self, name, next, args):
        """Remember table name, next handler (or None) and handler args."""
        self.name = name
        self.next = next
        self.args = args

    def add(self, trigger_arg_list):
        """Called when table is added.

        Can modify trigger args.
        """
        if self.next:
            self.next.add(trigger_arg_list)

    def reset(self):
        """Called before starting to process a batch.

        Should clean any pending data.
        """
        if self.next:
            self.next.reset()

    def prepare_batch(self, batch_info, dst_curs):
        """Called on first event for this table in current batch."""
        if self.next:
            self.next.prepare_batch(batch_info, dst_curs)

    def process_event(self, ev, sql_queue_func, arg):
        """Process one event.

        Event should be added to the sql queue by calling
        sql_queue_func(sql, arg), or executed directly.
        """
        if self.next:
            self.next.process_event(ev, sql_queue_func, arg)

    def finish_batch(self, batch_info):
        """Called when batch finishes."""
        if self.next:
            self.next.finish_batch(batch_info)

    def prepare_copy(self, expr_list, dst_curs):
        """Can change COPY behaviour.

        expr_list is a list of condition strings.  Nothing is returned:
        the copy code keeps using the list it passed in, so a handler
        adds its conditions by modifying expr_list in place.
        """
        if self.next:
            self.next.prepare_copy(expr_list, dst_curs)


class TableHandler(BaseHandler):
    """Default Londiste handler: converts a data event to SQL and queues it.

    This handler is terminal, it does not forward process_event()
    to the next handler.
    """

    handler_name = 'londiste'

    # statement templates for plain sql events: (quoted table, event data)
    sql_command = {
        'I': "insert into %s %s;",
        'U': "update only %s set %s;",
        'D': "delete from only %s where %s;",
    }

    def process_event(self, ev, sql_queue_func, arg):
        """Build SQL for the event and hand it to sql_queue_func(sql, arg).

        Raises Exception on urlencoded event with unknown operation.
        """
        if len(ev.type) == 1:
            # sql event
            fqname = skytools.quote_fqident(ev.extra1)
            fmt = self.sql_command[ev.type]
            sql = fmt % (fqname, ev.data)
        else:
            # urlenc event, type is "<op>:<pkey columns>"
            op = ev.type[0]
            if op == 'I':
                mk_sql = skytools.mk_insert_sql
            elif op == 'U':
                mk_sql = skytools.mk_update_sql
            elif op == 'D':
                mk_sql = skytools.mk_delete_sql
            else:
                # without this the code below would fail with a
                # confusing UnboundLocalError
                raise Exception('bug: bad op')
            pklist = ev.type[2:].split(',')
            row = skytools.db_urldecode(ev.data)
            sql = mk_sql(row, ev.extra1, pklist)

        sql_queue_func(sql, arg)


# handler_name -> handler class
_handler_map = {
    'londiste': TableHandler,
}


def register_handler_module(modname):
    """Import a module and register the handlers it provides.

    The module must list its handler classes in the module-level
    variable __londiste_handlers__.
    """
    __import__(modname)
    m = sys.modules[modname]
    for h in m.__londiste_handlers__:
        _handler_map[h.handler_name] = h


def build_handler(tblname, hlist):
    """Execute array of handler initializers.

    Each element of hlist is either 'name' or 'name(arg1, arg2)'.
    Empty elements are ignored.  The default TableHandler is always
    appended to the end of the chain.

    Returns the first handler of the chain.  Raises Exception on
    malformed initializer and KeyError on unknown handler name.
    """
    klist = []
    for h in hlist:
        if not h:
            continue
        h = h.strip()
        if not h:
            continue
        pos = h.find('(')
        if pos >= 0:
            if h[-1] != ')':
                raise Exception("handler fmt error")
            name = h[:pos].strip()
            argstr = h[pos+1 : -1].strip()
            # 'name()' means no args, not one empty arg
            if argstr:
                args = [a.strip() for a in argstr.split(',')]
            else:
                args = []
        else:
            name = h
            args = []

        try:
            klass = _handler_map[name]
        except KeyError:
            raise KeyError("unknown handler: %r" % name)
        klist.append((klass, args))

    # always append default handler
    klist.append((TableHandler, []))

    # link them together, starting from the end of the chain
    p = None
    for klass, args in reversed(klist):
        p = klass(tblname, p, args)
    return p


def parse_handler(tblname, hstr):
    """Parse and execute string of colon-separated handler initializers.

    Empty or missing (None) hstr gives chain with only default handler.
    """
    hlist = (hstr or '').split(':')
    return build_handler(tblname, hlist)


def load_handlers(cf):
    """Load and register modules from config."""
    lst = cf.getlist('handler_modules', [])

    for m in lst:
        register_handler_module(m)
def handle_data_event(self, ev, dst_curs):
    """Route one data event to the handler chain of its table.

    Events for unknown tables, or for tables that are not interested
    in the event, are only counted under 'ignored_events'.

    The handler is cached in self.used_plugins under the table name.
    prepare_batch() is called on it only when it is put into the cache,
    after that each event goes straight to process_event().
    """
    table_name = ev.extra1
    tbl = self.get_table_by_name(table_name)
    wanted = tbl and tbl.interesting(ev, self.cur_tick, self.copy_thread)
    if not wanted:
        self.stat_increase('ignored_events')
        return

    if table_name in self.used_plugins:
        handler = self.used_plugins[table_name]
    else:
        # first event for this table, let the handler set itself up
        handler = tbl.get_plugin()
        self.used_plugins[table_name] = handler
        handler.prepare_batch(self.batch_info, dst_curs)

    # handler queues generated sql via self.apply_sql(sql, dst_curs)
    handler.process_event(ev, self.apply_sql, dst_curs)
self.flush_sql(dst_curs) @@ -577,10 +575,11 @@ class Replicator(CascadedWorker): if ev.type not in ('I', 'U', 'D'): raise Exception('bug - bad event type in .interesting') t = self.get_table_by_name(ev.extra1) - if t: - return t.interesting(ev, self.cur_tick, self.copy_thread) - else: + if not t: + return 0 + if not t.interesting(ev, self.cur_tick, self.copy_thread): return 0 + return 1 def add_set_table(self, dst_curs, tbl): """There was new table added to root, remember it.""" diff --git a/python/londiste/setup.py b/python/londiste/setup.py index 20101e2a..f7a4730b 100644 --- a/python/londiste/setup.py +++ b/python/londiste/setup.py @@ -8,6 +8,8 @@ import sys, os, re, skytools from pgq.cascade.admin import CascadeAdmin from skytools.scripting import UsageError +import londiste.handler + __all__ = ['LondisteSetup'] class LondisteSetup(CascadeAdmin): @@ -37,6 +39,8 @@ class LondisteSetup(CascadeAdmin): self.set_name = self.queue_name + londiste.handler.load_handlers(self.cf) + def connection_hook(self, dbname, db): if dbname == 'db': curs = db.cursor() @@ -63,6 +67,8 @@ class LondisteSetup(CascadeAdmin): help="pkey,fkeys,indexes") p.add_option("--trigger-arg", action="append", help="Custom trigger arg") + p.add_option("--handler", action="append", + help="add: Custom handler for table") return p def extra_init(self, node_type, node_db, provider_db): @@ -163,23 +169,30 @@ class LondisteSetup(CascadeAdmin): self.log.warning('Table "%s" missing on subscriber, use --create if necessary' % tbl) return + attrs = {} + + hlist = self.options.handler + if hlist: + p = londiste.handler.build_handler(tbl, hlist) + attrs['handlers'] = ":".join(hlist) + # actual table registration tgargs = self.options.trigger_arg # None by default q = "select * from londiste.local_add_table(%s, %s, %s)" self.exec_cmd(dst_curs, q, [self.set_name, tbl, tgargs]) + if self.options.expect_sync: q = "select * from londiste.local_set_table_state(%s, %s, NULL, 'ok')" self.exec_cmd(dst_curs, q, 
[self.set_name, tbl]) else: - attrs = {} if self.options.skip_truncate: attrs['skip_truncate'] = 1 if self.options.copy_condition: attrs['copy_condition'] = self.options.copy_condition - if attrs: - enc_attrs = skytools.db_urlencode(attrs) - q = "select * from londiste.local_set_table_attrs(%s, %s, %s)" - self.exec_cmd(dst_curs, q, [self.set_name, tbl, enc_attrs]) + if attrs: + enc_attrs = skytools.db_urlencode(attrs) + q = "select * from londiste.local_set_table_attrs(%s, %s, %s)" + self.exec_cmd(dst_curs, q, [self.set_name, tbl, enc_attrs]) dst_db.commit() def sync_table_list(self, dst_curs, src_tbls, dst_tbls): diff --git a/python/londiste/table_copy.py b/python/londiste/table_copy.py index 07c9594a..5e10c889 100644 --- a/python/londiste/table_copy.py +++ b/python/londiste/table_copy.py @@ -200,7 +200,13 @@ class CopyTable(Replicator): tablename = tbl_stat.name # do copy self.log.info("%s: start copy" % tablename) - w_cond = tbl_stat.table_attrs.get('copy_condition') + p = tbl_stat.get_plugin() + cond_list = [] + cond = tbl_stat.table_attrs.get('copy_condition') + if cond: + cond_list.append(cond) + p.prepare_copy(cond_list, dstcurs) + w_cond = ' and '.join(cond_list) stats = skytools.full_copy(tablename, srccurs, dstcurs, col_list, w_cond) if stats: self.log.info("%s: copy finished: %d bytes, %d rows" % ( -- 2.39.5