londiste; drop chained handler, use single handler per table
authorMarko Kreen <markokr@gmail.com>
Fri, 11 Feb 2011 13:31:47 +0000 (15:31 +0200)
committerMarko Kreen <markokr@gmail.com>
Fri, 11 Feb 2011 13:54:22 +0000 (15:54 +0200)
Original idea was to let admins cascade different handlers,
but that seems too complicated.  Instead let programmer
use subclassing to create handlers with required functionality.

python/londiste.py
python/londiste/handler.py
python/londiste/playback.py
python/londiste/setup.py

index 5a852da2ab8b545bf91357b3153ab4bee5d84632..b260dc009b7fbb5d55cb6252a07e3fed7d6d4627 100755 (executable)
@@ -122,8 +122,10 @@ class Londiste(skytools.DBScript):
                 help="add: Custom trigger arg (can be specified multiply times)")
         g.add_option("--no-triggers", action="store_true",
                 help="add: Dont put triggers on table (makes sense on leaf)")
-        g.add_option("--handler", action="append",
+        g.add_option("--handler", action="store",
                 help="add: Custom handler for table")
+        g.add_option("--handler-arg", action="append",
+                    help="add: Argument to custom handler")
         g.add_option("--copy-condition", dest="copy_condition",
                 help = "add: set WHERE expression for copy")
         p.add_option_group(g)
index 265f83afa80cb1c15b9bef1f8424180965600ee4..27e23262aa28d5e288d0b6a9c23fa2cc09415cdc 100644 (file)
@@ -31,7 +31,8 @@ plain londiste:
 
 import sys, skytools, londiste.handlers
 
-__all__ = ['RowCache', 'BaseHandler', 'parse_handler', 'build_handler', 'load_handlers']
+__all__ = ['RowCache', 'BaseHandler', 'build_handler',
+           'load_handler_modules', 'create_handler_string']
 
 class RowCache:
     def __init__(self, table_name):
@@ -62,10 +63,11 @@ class RowCache:
         skytools.magic_insert(curs, self.table_name, self.rows, fields)
 
 class BaseHandler:
-    handler_name = 'fwd'
-    def __init__(self, table_name, next, args, log):
+    """Defines base API, does nothing.
+    """
+    handler_name = 'nop'
+    def __init__(self, table_name, args, log):
         self.table_name = table_name
-        self.next = next
         self.args = args
         self.log = log
 
@@ -74,43 +76,38 @@ class BaseHandler:
 
         Can modify trigger args.
         """
-        if self.next:
-            self.next.add(trigger_arg_list)
+        pass
 
     def reset(self):
         """Called before starting to process a batch.
         Should clean any pending data.
         """
-        if self.next:
-            self.next.reset()
+        pass
 
     def prepare_batch(self, batch_info, dst_curs):
         """Called on first event for this table in current batch."""
-        if self.next:
-            self.next.prepare_batch(batch_info, dst_curs)
+        pass
 
     def process_event(self, ev, sql_queue_func, arg):
         """Process a event.
-        
+
         Event should be added to sql_queue or executed directly.
         """
-        if self.next:
-            self.next.process_event(ev, sql_queue_func, arg)
+        pass
 
     def finish_batch(self, batch_info, dst_curs):
         """Called when batch finishes."""
-        if self.next:
-            self.next.finish_batch(batch_info, dst_curs)
+        pass
 
     def prepare_copy(self, expr_list, dst_curs):
         """Can change COPY behaviour.
-        
+
         Returns new expr.
         """
-        if self.next:
-            self.next.prepare_copy(expr_list, dst_curs)
+        pass
 
 class TableHandler(BaseHandler):
+    """Default Londiste handler, inserts events into tables with plain SQL."""
     handler_name = 'londiste'
 
     sql_command = {
@@ -151,42 +148,47 @@ def register_handler_module(modname):
     for h in m.__londiste_handlers__:
         _handler_map[h.handler_name] = h
 
-def build_handler(tblname, hlist, log):
-    """Execute array of handler initializers."""
-    klist = []
-    for h in hlist:
-        if not h:
-            continue
-        pos = h.find('(')
-        if pos >= 0:
-            if h[-1] != ')':
-                raise Exception("handler fmt error")
-            name = h[:pos].strip()
-            args = h[pos+1 : -1].split(',')
-            args = [a.strip() for a in args]
-        else:
-            name = h
-            args = []
-
-        klass = _handler_map[name]
-        klist.append( (klass, args) )
-
-    # always append default handler
-    klist.append( (TableHandler, []) )
-
-    # link them together
-    p = None
-    klist.reverse()
-    for klass, args in klist:
-        p = klass(tblname, p, args, log)
-    return p
-
-def parse_handler(tblname, hstr, log):
-    """Parse and execute string of colon-separated handler initializers."""
-    hlist = hstr.split(':')
-    return build_handler(tblname, hlist, log)
-
-def load_handlers(cf):
+def _parse_arglist(arglist):
+    args = {}
+    for arg in arglist or []:
+        key, _, val = arg.partition('=')
+        if key in args:
+            raise Exception('multiple handler arguments: %s' % key)
+        args[key] = val.strip()
+    return args
+
+def create_handler_string(name, arglist):
+    handler = name
+    if arglist:
+        args = _parse_arglist(arglist)
+        astr = skytools.db_urlencode(args)
+        handler = '%s(%s)' % (handler, astr)
+    return handler
+
+def _parse_handler(hstr):
+    """Parse result of create_handler_string()."""
+    args = {}
+    name = hstr
+    pos = hstr.find('(')
+    if pos > 0:
+        name = hstr[ : pos]
+        if hstr[-1] != ')':
+            raise Exception('invalid handler format: %s' % hstr)
+        astr = hstr[pos + 1 : -1]
+        if astr:
+            astr = astr.replace(',', '&')
+            args = skytools.db_urldecode(astr)
+    return (name, args)
+
+def build_handler(tblname, hstr, log):
+    """Parse and initialize handler.
+
+    hstr is result of create_handler_string()."""
+    hname, args = _parse_handler(hstr)
+    klass = _handler_map[hname]
+    return klass(tblname, args, log)
+
+def load_handler_modules(cf):
     """Load and register modules from config."""
     lst = londiste.handlers.DEFAULT_HANDLERS
     lst += cf.getlist('handler_modules', [])
index 1b44a5d9c9b7f15b51da94514826917dcb152fc0..c5fca72fe640a7581c2eb5ff931a205ca37eeb9b 100644 (file)
@@ -175,8 +175,9 @@ class TableState(object):
         if row['merge_state'] == "?":
             self.changed = 1
 
-        hstr = self.table_attrs.get('handlers', '')
-        self.plugin = parse_handler(self.name, hstr, self.log)
+        hstr = self.table_attrs.get('handlers', '') # compat
+        hstr = self.table_attrs.get('handler', hstr)
+        self.plugin = build_handler(self.name, hstr, self.log)
 
     def interesting(self, ev, tick_id, copy_thread):
         """Check if table wants this event."""
@@ -290,7 +291,7 @@ class Replicator(CascadedWorker):
 
         self.consumer_filter = None
 
-        load_handlers(self.cf)
+        load_handler_modules(self.cf)
 
     def connection_hook(self, dbname, db):
         if dbname == 'db' and db.server_version >= 80300:
index a2b4c899382a5e398cc6c674ff01f95f02c4e4e0..b82c03ac01f56ecbb11d681a23d0019f94100adc 100644 (file)
@@ -36,7 +36,7 @@ class LondisteSetup(CascadeAdmin):
 
         self.set_name = self.queue_name
 
-        londiste.handler.load_handlers(self.cf)
+        londiste.handler.load_handler_modules(self.cf)
 
     def init_optparse(self, parser=None):
         """Add londiste switches to cascadeadmin ones."""
@@ -62,8 +62,10 @@ class LondisteSetup(CascadeAdmin):
                     help="Custom trigger arg")
         p.add_option("--no-triggers", action="store_true",
                     help="Custom trigger arg")
-        p.add_option("--handler", action="append",
-                    help="add: Custom handler for table")
+        p.add_option("--handler", action="store",
+                help="add: Custom handler for table")
+        p.add_option("--handler-arg", action="append",
+                    help="add: Argument to custom handler")
         return p
 
     def extra_init(self, node_type, node_db, provider_db):
@@ -166,10 +168,11 @@ class LondisteSetup(CascadeAdmin):
             tgargs.append('no_triggers')
 
         attrs = {}
-        hlist = self.options.handler
-        if hlist:
-            p = londiste.handler.build_handler(tbl, hlist, self.log)
-            attrs['handlers'] = ":".join(hlist)
+        if self.options.handler:
+            hstr = londiste.handler.create_handler_string(
+                            self.options.handler, self.options.handler_arg)
+            p = londiste.handler.build_handler(tbl, hstr, self.log)
+            attrs['handler'] = hstr
             p.add(tgargs)
 
         # actual table registration
@@ -310,9 +313,12 @@ class LondisteSetup(CascadeAdmin):
 
     def cmd_tables(self):
         """Show attached tables."""
-        q = "select table_name, merge_state from londiste.get_table_list(%s) where local"
+        q = """select table_name, merge_state, table_attrs
+        from londiste.get_table_list(%s) where local"""
         db = self.get_database('db')
-        self.display_table(db, "Tables on node", q, [self.set_name])
+        self.display_table(db, "Tables on node", q, [self.set_name],
+                           fieldfmt = {'table_attrs': lambda f: '' if f is None
+                                       else skytools.db_urldecode(f)})
 
     def cmd_seqs(self):
         """Show attached seqs."""