import sys, skytools, londiste.handlers
-__all__ = ['RowCache', 'BaseHandler', 'parse_handler', 'build_handler', 'load_handlers']
+__all__ = ['RowCache', 'BaseHandler', 'build_handler',
+ 'load_handler_modules', 'create_handler_string']
class RowCache:
def __init__(self, table_name):
skytools.magic_insert(curs, self.table_name, self.rows, fields)
class BaseHandler:
- handler_name = 'fwd'
- def __init__(self, table_name, next, args, log):
+ """Defines base API, does nothing.
+ """
+ handler_name = 'nop'
+ def __init__(self, table_name, args, log):
self.table_name = table_name
- self.next = next
self.args = args
self.log = log
Can modify trigger args.
"""
- if self.next:
- self.next.add(trigger_arg_list)
+ pass
def reset(self):
"""Called before starting to process a batch.
Should clean any pending data.
"""
- if self.next:
- self.next.reset()
+ pass
def prepare_batch(self, batch_info, dst_curs):
"""Called on first event for this table in current batch."""
- if self.next:
- self.next.prepare_batch(batch_info, dst_curs)
+ pass
def process_event(self, ev, sql_queue_func, arg):
"""Process a event.
-
+
Event should be added to sql_queue or executed directly.
"""
- if self.next:
- self.next.process_event(ev, sql_queue_func, arg)
+ pass
def finish_batch(self, batch_info, dst_curs):
"""Called when batch finishes."""
- if self.next:
- self.next.finish_batch(batch_info, dst_curs)
+ pass
def prepare_copy(self, expr_list, dst_curs):
"""Can change COPY behaviour.
-
+
Returns new expr.
"""
- if self.next:
- self.next.prepare_copy(expr_list, dst_curs)
+ pass
class TableHandler(BaseHandler):
+ """Default Londiste handler, inserts events into tables with plain SQL."""
handler_name = 'londiste'
sql_command = {
for h in m.__londiste_handlers__:
_handler_map[h.handler_name] = h
-def build_handler(tblname, hlist, log):
- """Execute array of handler initializers."""
- klist = []
- for h in hlist:
- if not h:
- continue
- pos = h.find('(')
- if pos >= 0:
- if h[-1] != ')':
- raise Exception("handler fmt error")
- name = h[:pos].strip()
- args = h[pos+1 : -1].split(',')
- args = [a.strip() for a in args]
- else:
- name = h
- args = []
-
- klass = _handler_map[name]
- klist.append( (klass, args) )
-
- # always append default handler
- klist.append( (TableHandler, []) )
-
- # link them together
- p = None
- klist.reverse()
- for klass, args in klist:
- p = klass(tblname, p, args, log)
- return p
-
-def parse_handler(tblname, hstr, log):
- """Parse and execute string of colon-separated handler initializers."""
- hlist = hstr.split(':')
- return build_handler(tblname, hlist, log)
-
-def load_handlers(cf):
+def _parse_arglist(arglist):
+ args = {}
+ for arg in arglist or []:
+ key, _, val = arg.partition('=')
+ if key in args:
+ raise Exception('multiple handler arguments: %s' % key)
+ args[key] = val.strip()
+ return args
+
+def create_handler_string(name, arglist):
+ handler = name
+ if arglist:
+ args = _parse_arglist(arglist)
+ astr = skytools.db_urlencode(args)
+ handler = '%s(%s)' % (handler, astr)
+ return handler
+
+def _parse_handler(hstr):
+ """Parse result of create_handler_string()."""
+ args = {}
+ name = hstr
+ pos = hstr.find('(')
+ if pos > 0:
+ name = hstr[ : pos]
+ if hstr[-1] != ')':
+ raise Exception('invalid handler format: %s' % hstr)
+ astr = hstr[pos + 1 : -1]
+ if astr:
+ astr = astr.replace(',', '&')
+ args = skytools.db_urldecode(astr)
+ return (name, args)
+
+def build_handler(tblname, hstr, log):
+ """Parse and initialize handler.
+
+ hstr is result of create_handler_string()."""
+ hname, args = _parse_handler(hstr)
+ klass = _handler_map[hname]
+ return klass(tblname, args, log)
+
+def load_handler_modules(cf):
"""Load and register modules from config."""
lst = londiste.handlers.DEFAULT_HANDLERS
lst += cf.getlist('handler_modules', [])
self.set_name = self.queue_name
- londiste.handler.load_handlers(self.cf)
+ londiste.handler.load_handler_modules(self.cf)
def init_optparse(self, parser=None):
"""Add londiste switches to cascadeadmin ones."""
help="Custom trigger arg")
p.add_option("--no-triggers", action="store_true",
help="Custom trigger arg")
- p.add_option("--handler", action="append",
- help="add: Custom handler for table")
+ p.add_option("--handler", action="store",
+ help="add: Custom handler for table")
+ p.add_option("--handler-arg", action="append",
+ help="add: Argument to custom handler")
return p
def extra_init(self, node_type, node_db, provider_db):
tgargs.append('no_triggers')
attrs = {}
- hlist = self.options.handler
- if hlist:
- p = londiste.handler.build_handler(tbl, hlist, self.log)
- attrs['handlers'] = ":".join(hlist)
+ if self.options.handler:
+ hstr = londiste.handler.create_handler_string(
+ self.options.handler, self.options.handler_arg)
+ p = londiste.handler.build_handler(tbl, hstr, self.log)
+ attrs['handler'] = hstr
p.add(tgargs)
# actual table registration
def cmd_tables(self):
"""Show attached tables."""
- q = "select table_name, merge_state from londiste.get_table_list(%s) where local"
+ q = """select table_name, merge_state, table_attrs
+ from londiste.get_table_list(%s) where local"""
db = self.get_database('db')
- self.display_table(db, "Tables on node", q, [self.set_name])
+ self.display_table(db, "Tables on node", q, [self.set_name],
+ fieldfmt = {'table_attrs': lambda f: '' if f is None
+ else skytools.db_urldecode(f)})
def cmd_seqs(self):
"""Show attached seqs."""