Pluggable per-table handlers for event processing.
authorMarko Kreen <markokr@gmail.com>
Fri, 7 May 2010 12:17:53 +0000 (15:17 +0300)
committerMarko Kreen <markokr@gmail.com>
Mon, 30 Aug 2010 10:50:44 +0000 (13:50 +0300)
1. Make handler modules known for Londiste:

  [londiste3]
  handler_modules = londiste.bublin, some_other.module

Modules are imported and classes found in __londiste_handlers__
module variable are registered.

2. Generic table setup:

  londiste3 cf.ini add-table foo --handler='handler1' --handler='handler2(arg1, arg2)'

Londiste standard handler is default and always appended to custom plugins.

python/londiste.py
python/londiste/__init__.py
python/londiste/handler.py [new file with mode: 0644]
python/londiste/playback.py
python/londiste/setup.py
python/londiste/table_copy.py

index 1939b2d0ff3c91d0d08becef4ca1e0a688dc5fbe..b1d4298555f5298c05456d753833e1b0891217ca 100755 (executable)
@@ -111,6 +111,8 @@ class Londiste(skytools.DBScript):
                 help = "takeover: old node was branch")
         p.add_option("--trigger-arg", action="append",
                 help="add: Custom trigger arg")
+        p.add_option("--handler", action="append",
+                help="add: Custom handler for table")
         p.add_option_group(g)
         return p
 
index 1b06de8a342d6b832ec568826c36d6a3d835d6f1..a47f602f8845dd8535f626cedf22be8e8bb2bbc7 100644 (file)
@@ -8,18 +8,20 @@ import londiste.compare
 import londiste.setup
 import londiste.table_copy
 import londiste.repair
+import londiste.handler
 
 from londiste.playback import *
 from londiste.compare import *
 from londiste.setup import *
 from londiste.table_copy import *
 from londiste.repair import *
+from londiste.handler import *
 
 __all__ = (
     londiste.playback.__all__ +
     londiste.compare.__all__ +
+    londiste.handler.__all__ +
     londiste.setup.__all__ +
     londiste.table_copy.__all__ +
     londiste.repair.__all__ )
 
-
diff --git a/python/londiste/handler.py b/python/londiste/handler.py
new file mode 100644 (file)
index 0000000..9346cf7
--- /dev/null
@@ -0,0 +1,166 @@
+
+"""Table handler.
+
+Per-table decision how to create trigger, copy data and apply events.
+"""
+
+"""
+-- redirect & create table
+partition by batch_time
+partition by date field
+
+-- sql handling:
+cube1 - I/U/D -> partition, insert
+cube2 - I/U/D -> partition, del/insert
+field remap
+name remap
+
+bublin filter
+- replay: filter events
+- copy: additional where
+- add: add trigger args
+
+multimaster
+- replay: conflict handling, add fncall to sql queue?
+- add: add 'backup' arg to trigger
+
+plain londiste:
+- replay: add to sql queue
+
+"""
+
+import sys, skytools
+
+__all__ = ['BaseHandler', 'parse_handler', 'build_handler', 'load_handlers']
+
+class BaseHandler:
+    handler_name = 'fwd'
+    def __init__(self, name, next, args):
+        self.name = name
+        self.next = next
+        self.args = args
+
+    def add(self, trigger_arg_list):
+        """Called when table is added.
+
+        Can modify trigger args.
+        """
+        if self.next:
+            self.next.add(trigger_arg_list)
+
+    def reset(self):
+        """Called before starting to process a batch.
+        Should clean any pending data.
+        """
+        if self.next:
+            self.next.reset()
+
+    def prepare_batch(self, batch_info, dst_curs):
+        """Called on first event for this table in current batch."""
+        if self.next:
+            self.next.prepare_batch(batch_info, dst_curs)
+
+    def process_event(self, ev, sql_queue_func, arg):
+        """Process a event.
+        
+        Event should be added to sql_queue or executed directly.
+        """
+        if self.next:
+            self.next.process_event(ev, sql_queue_func, arg)
+
+    def finish_batch(self, batch_info):
+        """Called when batch finishes."""
+        if self.next:
+            self.next.finish_batch(batch_info)
+
+    def prepare_copy(self, expr_list, dst_curs):
+        """Can change COPY behaviour.
+        
+        Returns new expr.
+        """
+        if self.next:
+            self.next.prepare_copy(expr_list, dst_curs)
+
+class TableHandler(BaseHandler):
+    handler_name = 'londiste'
+
+    sql_command = {
+        'I': "insert into %s %s;",
+        'U': "update only %s set %s;",
+        'D': "delete from only %s where %s;",
+    }
+
+    def process_event(self, ev, sql_queue_func, arg):
+        if len(ev.type) == 1:
+            # sql event
+            fqname = skytools.quote_fqident(ev.extra1)
+            fmt = self.sql_command[ev.type]
+            sql = fmt % (fqname, ev.data)
+        else:
+            # urlenc event
+            pklist = ev.type[2:].split(',')
+            row = skytools.db_urldecode(ev.data)
+            op = ev.type[0]
+            tbl = ev.extra1
+            if op == 'I':
+                sql = skytools.mk_insert_sql(row, tbl, pklist)
+            elif op == 'U':
+                sql = skytools.mk_update_sql(row, tbl, pklist)
+            elif op == 'D':
+                sql = skytools.mk_delete_sql(row, tbl, pklist)
+
+        sql_queue_func(sql, arg)
+
+_handler_map = {
+    'londiste': TableHandler,
+}
+
+def register_handler_module(modname):
+    """Import and module and register handlers."""
+    __import__(modname)
+    m = sys.modules[modname]
+    for h in m.__londiste_handlers__:
+        _handler_map[h.handler_name] = h
+
+def build_handler(tblname, hlist):
+    """Execute array of handler initializers."""
+    klist = []
+    for h in hlist:
+        if not h:
+            continue
+        pos = h.find('(')
+        if pos >= 0:
+            if h[-1] != ')':
+                raise Exception("handler fmt error")
+            name = h[:pos].strip()
+            args = h[pos+1 : -1].split(',')
+            args = [a.strip() for a in args]
+        else:
+            name = h
+            args = []
+
+        klass = _handler_map[name]
+        klist.append( (klass, args) )
+
+    # always append default handler
+    klist.append( (TableHandler, []) )
+
+    # link them together
+    p = None
+    klist.reverse()
+    for klass, args in klist:
+        p = klass(tblname, p, args)
+    return p
+
+def parse_handler(tblname, hstr):
+    """Parse and execute string of colon-separated handler initializers."""
+    hlist = hstr.split(':')
+    return build_handler(tblname, hlist)
+
+def load_handlers(cf):
+    """Load and register modules from config."""
+    lst = cf.getlist('handler_modules', [])
+
+    for m in lst:
+        register_handler_module(m)
+
index 4225e441b76be2c1445ff39131de970790844ef9..0d13a81b19c29bf084b2faa81f7d2ef08a40346e 100644 (file)
@@ -7,6 +7,8 @@ import skytools
 
 from pgq.cascade.worker import CascadedWorker
 
+from londiste.handler import *
+
 __all__ = ['Replicator', 'TableState',
     'TABLE_MISSING', 'TABLE_IN_COPY', 'TABLE_CATCHING_UP',
     'TABLE_WANNA_SYNC', 'TABLE_DO_SYNC', 'TABLE_OK']
@@ -69,6 +71,7 @@ class TableState(object):
         self.table_attrs = {}
         self.copy_role = None
         self.dropped_ddl = None
+        self.plugin = None
         # except this
         self.changed = 0
 
@@ -83,6 +86,7 @@ class TableState(object):
         self.last_tick = 0
         self.table_attrs = {}
         self.changed = 1
+        self.plugin = None
 
     def change_snapshot(self, str_snapshot, tag_changed = 1):
         """Set snapshot."""
@@ -171,6 +175,9 @@ class TableState(object):
         if row['merge_state'] == "?":
             self.changed = 1
 
+        hstr = row.get('handler', '')
+        self.plugin = parse_handler(self.name, hstr)
+
     def interesting(self, ev, tick_id, copy_thread):
         """Check if table wants this event."""
 
@@ -233,6 +240,12 @@ class TableState(object):
         if self.last_snapshot_tick < prev_tick:
             self.change_snapshot(None)
 
+    def process_data_event(self, ev, sql_queue_func, batch_info):
+        self.plugin.process_event(ev, sql_queue_func)
+
+    def get_plugin(self):
+        return self.plugin
+
 class Replicator(CascadedWorker):
     """Replication core.
 
@@ -254,12 +267,6 @@ class Replicator(CascadedWorker):
         #compare_fmt = %(cnt)d rows, checksum=%(chksum)s
     """
 
-    sql_command = {
-        'I': "insert into %s %s;",
-        'U': "update only %s set %s;",
-        'D': "delete from only %s where %s;",
-    }
-
     # batch info
     cur_tick = 0
     prev_tick = 0
@@ -275,11 +282,14 @@ class Replicator(CascadedWorker):
 
         self.copy_thread = 0
         self.set_name = self.queue_name
+        self.used_plugins = {}
 
         self.parallel_copies = self.cf.getint('parallel_copies', 1)
         if self.parallel_copies < 1:
             raise Exception('Bad value for parallel_copies: %d' % self.parallel_copies)
 
+        load_handlers(self.cf)
+
     def connection_hook(self, dbname, db):
         if dbname == 'db':
             curs = db.cursor()
@@ -306,6 +316,11 @@ class Replicator(CascadedWorker):
         if not self.copy_thread:
             self.restore_fkeys(dst_db)
 
+
+        for p in self.used_plugins.values():
+            p.reset()
+        self.used_plugins = {}
+
         # now the actual event processing happens.
         # they must be done all in one tx in dst side
         # and the transaction must be kept open so that
@@ -315,6 +330,10 @@ class Replicator(CascadedWorker):
         CascadedWorker.process_remote_batch(self, src_db, tick_id, ev_list, dst_db)
         self.flush_sql(dst_curs)
 
+        for p in self.used_plugins.values():
+            p.finish_batch(self.batch_info)
+        self.used_plugins = {}
+
         # finalize table changes
         self.save_table_state(dst_curs)
 
@@ -456,7 +475,7 @@ class Replicator(CascadedWorker):
         if ev.type in ('I', 'U', 'D'):
             self.handle_data_event(ev, dst_curs)
         elif ev.type[:2] in ('I:', 'U:', 'D:'):
-            self.handle_urlenc_event(ev, dst_curs)
+            self.handle_data_event(ev, dst_curs)
         elif ev.type == "TRUNCATE":
             self.flush_sql(dst_curs)
             self.handle_truncate_event(ev, dst_curs)
@@ -479,42 +498,20 @@ class Replicator(CascadedWorker):
             CascadedWorker.process_remote_event(self, src_curs, dst_curs, ev)
 
     def handle_data_event(self, ev, dst_curs):
-        """handle one data event"""
-        t = self.get_table_by_name(ev.extra1)
-        if t and t.interesting(ev, self.cur_tick, self.copy_thread):
-            # buffer SQL statements, then send them together
-            fqname = skytools.quote_fqident(ev.extra1)
-            fmt = self.sql_command[ev.type]
-            sql = fmt % (fqname, ev.data)
-
-            self.apply_sql(sql, dst_curs)
-        else:
-            self.stat_increase('ignored_events')
-
-    def handle_urlenc_event(self, ev, dst_curs):
         """handle one truncate event"""
         t = self.get_table_by_name(ev.extra1)
         if not t or not t.interesting(ev, self.cur_tick, self.copy_thread):
             self.stat_increase('ignored_events')
             return
         
-        # parse event
-        pklist = ev.type[2:].split(',')
-        row = skytools.db_urldecode(ev.data)
-        op = ev.type[0]
-        tbl = ev.extra1
-
-        # generate sql
-        if op == 'I':
-            sql = skytools.mk_insert_sql(row, tbl, pklist)
-        elif op == 'U':
-            sql = skytools.mk_update_sql(row, tbl, pklist)
-        elif op == 'D':
-            sql = skytools.mk_delete_sql(row, tbl, pklist)
-        else:
-            raise Exception('bug: bad op')
-
-        self.apply_sql(sql, dst_curs)
+        try:
+            p = self.used_plugins[ev.extra1]
+        except KeyError:
+            p = t.get_plugin()
+            self.used_plugins[ev.extra1] = p
+            p.prepare_batch(self.batch_info, dst_curs)
+     
+        p.process_event(ev, self.apply_sql, dst_curs)
 
     def handle_truncate_event(self, ev, dst_curs):
         """handle one truncate event"""
@@ -525,7 +522,9 @@ class Replicator(CascadedWorker):
 
         fqname = skytools.quote_fqident(ev.extra1)
         sql = "TRUNCATE %s;" % fqname
-        self.apply_sql(sql, dst_curs, True)
+
+        self.flush_sql(dst_curs)
+        dst_curs.execute(sql)
 
     def handle_execute_event(self, ev, dst_curs):
         """handle one EXECUTE event"""
@@ -549,15 +548,14 @@ class Replicator(CascadedWorker):
         q = "select * from londiste.execute_finish(%s, %s)"
         self.exec_cmd(dst_curs, q, [self.queue_name, fname], commit = False)
 
-    def apply_sql(self, sql, dst_curs, force = False):
-        if force:
-            self.flush_sql(dst_curs)
-
-        self.sql_list.append(sql)
+    def apply_sql(self, sql, dst_curs):
 
+        # how many queries to batch together, drop batching on error
         limit = 200
-        if self.work_state == -1 or force:
+        if self.work_state == -1:
             limit = 0
+
+        self.sql_list.append(sql)
         if len(self.sql_list) >= limit:
             self.flush_sql(dst_curs)
 
@@ -577,10 +575,11 @@ class Replicator(CascadedWorker):
         if ev.type not in ('I', 'U', 'D'):
             raise Exception('bug - bad event type in .interesting')
         t = self.get_table_by_name(ev.extra1)
-        if t:
-            return t.interesting(ev, self.cur_tick, self.copy_thread)
-        else:
+        if not t:
+            return 0
+        if not t.interesting(ev, self.cur_tick, self.copy_thread):
             return 0
+        return 1
 
     def add_set_table(self, dst_curs, tbl):
         """There was new table added to root, remember it."""
index 20101e2acf61e3dd213da513094361a69589638a..f7a4730b16b237de001ae4c7a17dcb696ff254a4 100644 (file)
@@ -8,6 +8,8 @@ import sys, os, re, skytools
 from pgq.cascade.admin import CascadeAdmin
 from skytools.scripting import UsageError
 
+import londiste.handler
+
 __all__ = ['LondisteSetup']
 
 class LondisteSetup(CascadeAdmin):
@@ -37,6 +39,8 @@ class LondisteSetup(CascadeAdmin):
 
         self.set_name = self.queue_name
 
+        londiste.handler.load_handlers(self.cf)
+
     def connection_hook(self, dbname, db):
         if dbname == 'db':
             curs = db.cursor()
@@ -63,6 +67,8 @@ class LondisteSetup(CascadeAdmin):
                     help="pkey,fkeys,indexes")
         p.add_option("--trigger-arg", action="append",
                     help="Custom trigger arg")
+        p.add_option("--handler", action="append",
+                    help="add: Custom handler for table")
         return p
 
     def extra_init(self, node_type, node_db, provider_db):
@@ -163,23 +169,30 @@ class LondisteSetup(CascadeAdmin):
             self.log.warning('Table "%s" missing on subscriber, use --create if necessary' % tbl)
             return
 
+        attrs = {}
+
+        hlist = self.options.handler
+        if hlist:
+            p = londiste.handler.build_handler(tbl, hlist)
+            attrs['handlers'] = ":".join(hlist)
+
         # actual table registration
         tgargs = self.options.trigger_arg # None by default
         q = "select * from londiste.local_add_table(%s, %s, %s)"
         self.exec_cmd(dst_curs, q, [self.set_name, tbl, tgargs])
+
         if self.options.expect_sync:
             q = "select * from londiste.local_set_table_state(%s, %s, NULL, 'ok')"
             self.exec_cmd(dst_curs, q, [self.set_name, tbl])
         else:
-            attrs = {}
             if self.options.skip_truncate:
                 attrs['skip_truncate'] = 1
             if self.options.copy_condition:
                 attrs['copy_condition'] = self.options.copy_condition
-            if attrs:
-                enc_attrs = skytools.db_urlencode(attrs)
-                q = "select * from londiste.local_set_table_attrs(%s, %s, %s)"
-                self.exec_cmd(dst_curs, q, [self.set_name, tbl, enc_attrs])
+        if attrs:
+            enc_attrs = skytools.db_urlencode(attrs)
+            q = "select * from londiste.local_set_table_attrs(%s, %s, %s)"
+            self.exec_cmd(dst_curs, q, [self.set_name, tbl, enc_attrs])
         dst_db.commit()
 
     def sync_table_list(self, dst_curs, src_tbls, dst_tbls):
index 07c9594a0a39ba71f9ba63a5a8d68a370a0204ea..5e10c889f9f9e640d719044faf63c1bcec16af8a 100644 (file)
@@ -200,7 +200,13 @@ class CopyTable(Replicator):
         tablename = tbl_stat.name
         # do copy
         self.log.info("%s: start copy" % tablename)
-        w_cond = tbl_stat.table_attrs.get('copy_condition')
+        p = tbl_stat.get_plugin()
+        cond_list = []
+        cond = tbl_stat.table_attrs.get('copy_condition')
+        if cond:
+            cond_list.append(cond)
+        p.prepare_copy(cond_list, dstcurs)
+        w_cond = ' and '.join(cond_list)
         stats = skytools.full_copy(tablename, srccurs, dstcurs, col_list, w_cond)
         if stats:
             self.log.info("%s: copy finished: %d bytes, %d rows" % (