From de83f8095d7eb100f21778b61922b6e71be4741a Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Fri, 11 Feb 2011 15:31:47 +0200 Subject: [PATCH] londiste; drop chained handler, use single handler per table Original idea was to let admins cascade different handlers, but that seems too complicated. Instead let programmer use subclassing to create handlers with required functionality. --- python/londiste.py | 4 +- python/londiste/handler.py | 110 ++++++++++++++++++------------------ python/londiste/playback.py | 7 ++- python/londiste/setup.py | 24 +++++--- 4 files changed, 78 insertions(+), 67 deletions(-) diff --git a/python/londiste.py b/python/londiste.py index 5a852da2..b260dc00 100755 --- a/python/londiste.py +++ b/python/londiste.py @@ -122,8 +122,10 @@ class Londiste(skytools.DBScript): help="add: Custom trigger arg (can be specified multiply times)") g.add_option("--no-triggers", action="store_true", help="add: Dont put triggers on table (makes sense on leaf)") - g.add_option("--handler", action="append", + g.add_option("--handler", action="store", help="add: Custom handler for table") + g.add_option("--handler-arg", action="append", + help="add: Argument to custom handler") g.add_option("--copy-condition", dest="copy_condition", help = "add: set WHERE expression for copy") p.add_option_group(g) diff --git a/python/londiste/handler.py b/python/londiste/handler.py index 265f83af..27e23262 100644 --- a/python/londiste/handler.py +++ b/python/londiste/handler.py @@ -31,7 +31,8 @@ plain londiste: import sys, skytools, londiste.handlers -__all__ = ['RowCache', 'BaseHandler', 'parse_handler', 'build_handler', 'load_handlers'] +__all__ = ['RowCache', 'BaseHandler', 'build_handler', + 'load_handler_modules', 'create_handler_string'] class RowCache: def __init__(self, table_name): @@ -62,10 +63,11 @@ class RowCache: skytools.magic_insert(curs, self.table_name, self.rows, fields) class BaseHandler: - handler_name = 'fwd' - def __init__(self, table_name, next, args, log): + """Defines base API, does nothing. + """ + handler_name = 'nop' + def __init__(self, table_name, args, log): self.table_name = table_name - self.next = next self.args = args self.log = log @@ -74,43 +76,38 @@ class BaseHandler: Can modify trigger args. """ - if self.next: - self.next.add(trigger_arg_list) + pass def reset(self): """Called before starting to process a batch. Should clean any pending data. """ - if self.next: - self.next.reset() + pass def prepare_batch(self, batch_info, dst_curs): """Called on first event for this table in current batch.""" - if self.next: - self.next.prepare_batch(batch_info, dst_curs) + pass def process_event(self, ev, sql_queue_func, arg): """Process a event. - + Event should be added to sql_queue or executed directly. """ - if self.next: - self.next.process_event(ev, sql_queue_func, arg) + pass def finish_batch(self, batch_info, dst_curs): """Called when batch finishes.""" - if self.next: - self.next.finish_batch(batch_info, dst_curs) + pass def prepare_copy(self, expr_list, dst_curs): """Can change COPY behaviour. - + Returns new expr. """ - if self.next: - self.next.prepare_copy(expr_list, dst_curs) + pass class TableHandler(BaseHandler): + """Default Londiste handler, inserts events into tables with plain SQL.""" handler_name = 'londiste' sql_command = { @@ -151,42 +148,47 @@ def register_handler_module(modname): for h in m.__londiste_handlers__: _handler_map[h.handler_name] = h -def build_handler(tblname, hlist, log): - """Execute array of handler initializers.""" - klist = [] - for h in hlist: - if not h: - continue - pos = h.find('(') - if pos >= 0: - if h[-1] != ')': - raise Exception("handler fmt error") - name = h[:pos].strip() - args = h[pos+1 : -1].split(',') - args = [a.strip() for a in args] - else: - name = h - args = [] - - klass = _handler_map[name] - klist.append( (klass, args) ) - - # always append default handler - klist.append( (TableHandler, []) ) - - # link them together - p = None - klist.reverse() - for klass, args in klist: - p = klass(tblname, p, args, log) - return p - -def parse_handler(tblname, hstr, log): - """Parse and execute string of colon-separated handler initializers.""" - hlist = hstr.split(':') - return build_handler(tblname, hlist, log) - -def load_handlers(cf): +def _parse_arglist(arglist): + args = {} + for arg in arglist or []: + key, _, val = arg.partition('=') + if key in args: + raise Exception('multiple handler arguments: %s' % key) + args[key] = val.strip() + return args + +def create_handler_string(name, arglist): + handler = name + if arglist: + args = _parse_arglist(arglist) + astr = skytools.db_urlencode(args) + handler = '%s(%s)' % (handler, astr) + return handler + +def _parse_handler(hstr): + """Parse result of create_handler_string().""" + args = {} + name = hstr + pos = hstr.find('(') + if pos > 0: + name = hstr[ : pos] + if hstr[-1] != ')': + raise Exception('invalid handler format: %s' % hstr) + astr = hstr[pos + 1 : -1] + if astr: + astr = astr.replace(',', '&') + args = skytools.db_urldecode(astr) + return (name, args) + +def build_handler(tblname, hstr, log): + """Parse and initialize handler. + + hstr is result of create_handler_string().""" + hname, args = _parse_handler(hstr) + klass = _handler_map[hname] + return klass(tblname, args, log) + +def load_handler_modules(cf): """Load and register modules from config.""" lst = londiste.handlers.DEFAULT_HANDLERS lst += cf.getlist('handler_modules', []) diff --git a/python/londiste/playback.py b/python/londiste/playback.py index 1b44a5d9..c5fca72f 100644 --- a/python/londiste/playback.py +++ b/python/londiste/playback.py @@ -175,8 +175,9 @@ class TableState(object): if row['merge_state'] == "?": self.changed = 1 - hstr = self.table_attrs.get('handlers', '') - self.plugin = parse_handler(self.name, hstr, self.log) + hstr = self.table_attrs.get('handlers', '') # compat + hstr = self.table_attrs.get('handler', hstr) + self.plugin = build_handler(self.name, hstr, self.log) def interesting(self, ev, tick_id, copy_thread): """Check if table wants this event.""" @@ -290,7 +291,7 @@ class Replicator(CascadedWorker): self.consumer_filter = None - load_handlers(self.cf) + load_handler_modules(self.cf) def connection_hook(self, dbname, db): if dbname == 'db' and db.server_version >= 80300: diff --git a/python/londiste/setup.py b/python/londiste/setup.py index a2b4c899..b82c03ac 100644 --- a/python/londiste/setup.py +++ b/python/londiste/setup.py @@ -36,7 +36,7 @@ class LondisteSetup(CascadeAdmin): self.set_name = self.queue_name - londiste.handler.load_handlers(self.cf) + londiste.handler.load_handler_modules(self.cf) def init_optparse(self, parser=None): """Add londiste switches to cascadeadmin ones.""" @@ -62,8 +62,10 @@ class LondisteSetup(CascadeAdmin): help="Custom trigger arg") p.add_option("--no-triggers", action="store_true", help="Custom trigger arg") - p.add_option("--handler", action="append", - help="add: Custom handler for table") + p.add_option("--handler", action="store", + help="add: Custom handler for table") + p.add_option("--handler-arg", action="append", + help="add: Argument to custom handler") return p def extra_init(self, node_type, node_db, provider_db): @@ -166,10 +168,11 @@ class LondisteSetup(CascadeAdmin): tgargs.append('no_triggers') attrs = {} - hlist = self.options.handler - if hlist: - p = londiste.handler.build_handler(tbl, hlist, self.log) - attrs['handlers'] = ":".join(hlist) + if self.options.handler: + hstr = londiste.handler.create_handler_string( + self.options.handler, self.options.handler_arg) + p = londiste.handler.build_handler(tbl, hstr, self.log) + attrs['handler'] = hstr p.add(tgargs) # actual table registration @@ -310,9 +313,12 @@ class LondisteSetup(CascadeAdmin): def cmd_tables(self): """Show attached tables.""" - q = "select table_name, merge_state from londiste.get_table_list(%s) where local" + q = """select table_name, merge_state, table_attrs + from londiste.get_table_list(%s) where local""" db = self.get_database('db') - self.display_table(db, "Tables on node", q, [self.set_name]) + self.display_table(db, "Tables on node", q, [self.set_name], + fieldfmt = {'table_attrs': lambda f: '' if f is None + else skytools.db_urldecode(f)}) def cmd_seqs(self): """Show attached seqs.""" -- 2.39.5