help = "takeover: old node was branch")
p.add_option("--trigger-arg", action="append",
help="add: Custom trigger arg")
+ p.add_option("--handler", action="append",
+ help="add: Custom handler for table")
p.add_option_group(g)
return p
import londiste.setup
import londiste.table_copy
import londiste.repair
+import londiste.handler
from londiste.playback import *
from londiste.compare import *
from londiste.setup import *
from londiste.table_copy import *
from londiste.repair import *
+from londiste.handler import *
__all__ = (
londiste.playback.__all__ +
londiste.compare.__all__ +
+ londiste.handler.__all__ +
londiste.setup.__all__ +
londiste.table_copy.__all__ +
londiste.repair.__all__ )
-
--- /dev/null
+
+"""Table handler.
+
+Per-table decision how to create trigger, copy data and apply events.
+"""
+
+"""
+-- redirect & create table
+partition by batch_time
+partition by date field
+
+-- sql handling:
+cube1 - I/U/D -> partition, insert
+cube2 - I/U/D -> partition, del/insert
+field remap
+name remap
+
+bublin filter
+- replay: filter events
+- copy: additional where
+- add: add trigger args
+
+multimaster
+- replay: conflict handling, add fncall to sql queue?
+- add: add 'backup' arg to trigger
+
+plain londiste:
+- replay: add to sql queue
+
+"""
+
+import sys, skytools
+
+__all__ = ['BaseHandler', 'parse_handler', 'build_handler', 'load_handlers']
+
+class BaseHandler:
+    """Pass-through link in a chain of table handlers.
+
+    Every hook hands its arguments unchanged to the next handler when
+    one is set, and does nothing otherwise.  Subclasses override the
+    hooks they want to act on.
+    """
+    handler_name = 'fwd'
+
+    def __init__(self, name, next, args):
+        """Remember the given name, the next handler (may be None) and args."""
+        self.name, self.next, self.args = name, next, args
+
+    def add(self, trigger_arg_list):
+        """Called when table is added.
+
+        Can modify trigger args.
+        """
+        successor = self.next
+        if not successor:
+            return
+        successor.add(trigger_arg_list)
+
+    def reset(self):
+        """Called before starting to process a batch.
+
+        Should clean any pending data.
+        """
+        successor = self.next
+        if not successor:
+            return
+        successor.reset()
+
+    def prepare_batch(self, batch_info, dst_curs):
+        """Called on first event for this table in current batch."""
+        successor = self.next
+        if not successor:
+            return
+        successor.prepare_batch(batch_info, dst_curs)
+
+    def process_event(self, ev, sql_queue_func, arg):
+        """Process an event.
+
+        Event should be added to sql_queue or executed directly.
+        """
+        successor = self.next
+        if not successor:
+            return
+        successor.process_event(ev, sql_queue_func, arg)
+
+    def finish_batch(self, batch_info):
+        """Called when batch finishes."""
+        successor = self.next
+        if not successor:
+            return
+        successor.finish_batch(batch_info)
+
+    def prepare_copy(self, expr_list, dst_curs):
+        """Can change COPY behaviour.
+
+        expr_list is forwarded to the next handler as-is; this method
+        itself returns nothing.
+        """
+        successor = self.next
+        if not successor:
+            return
+        successor.prepare_copy(expr_list, dst_curs)
+
+class TableHandler(BaseHandler):
+    """Default handler: turn a data event into one SQL statement and queue it.
+
+    process_event() does not forward to the next handler.
+    """
+    handler_name = 'londiste'
+
+    # statement templates for single-letter event types, filled with
+    # (quoted table name, event data)
+    sql_command = {
+        'I': "insert into %s %s;",
+        'U': "update only %s set %s;",
+        'D': "delete from only %s where %s;",
+    }
+
+    def process_event(self, ev, sql_queue_func, arg):
+        """Build SQL for the event and call sql_queue_func(sql, arg).
+
+        A one-character ev.type is formatted through sql_command using
+        ev.data as the statement tail.  Any other ev.type is handled as
+        '<op>:<pkey list>' with urlencoded row data in ev.data.
+
+        Raises KeyError for an unknown one-character type and Exception
+        for an unknown op in the urlencoded form.
+        """
+        if len(ev.type) == 1:
+            # sql event
+            fqname = skytools.quote_fqident(ev.extra1)
+            fmt = self.sql_command[ev.type]
+            sql = fmt % (fqname, ev.data)
+        else:
+            # urlenc event
+            pklist = ev.type[2:].split(',')
+            row = skytools.db_urldecode(ev.data)
+            op = ev.type[0]
+            tbl = ev.extra1
+            if op == 'I':
+                sql = skytools.mk_insert_sql(row, tbl, pklist)
+            elif op == 'U':
+                sql = skytools.mk_update_sql(row, tbl, pklist)
+            elif op == 'D':
+                sql = skytools.mk_delete_sql(row, tbl, pklist)
+            else:
+                # fail clearly instead of reaching the queue call below
+                # with 'sql' never assigned
+                raise Exception('bug: bad op')
+
+        sql_queue_func(sql, arg)
+
+# registry of handler classes, keyed by their handler_name
+_handler_map = {TableHandler.handler_name: TableHandler}
+
+def register_handler_module(modname):
+    """Import a module and register the handlers it exports.
+
+    The module must define __londiste_handlers__, an iterable of handler
+    classes.  Each class is stored in _handler_map under its
+    handler_name, replacing any earlier entry with that name.
+    """
+    __import__(modname)
+    # __import__ returns the top-level package for dotted names, so the
+    # module itself is looked up in sys.modules
+    module = sys.modules[modname]
+    _handler_map.update(
+        (klass.handler_name, klass) for klass in module.__londiste_handlers__)
+
+def build_handler(tblname, hlist):
+    """Execute array of handler initializers.
+
+    Each entry of hlist is either 'name' or 'name(arg1, arg2, ...)'.
+    Surrounding whitespace is ignored and empty entries are skipped.
+    The handlers are linked in the order given, with the default
+    TableHandler always appended as the last link.
+
+    Returns the first handler of the chain.
+
+    Raises Exception("handler fmt error") when an entry opens a
+    parenthesis but does not end with ')', and KeyError when a handler
+    name is not registered in _handler_map.
+    """
+    klist = []
+    for h in hlist:
+        # strip first so that bare names and name(...) forms are
+        # treated alike
+        h = (h or '').strip()
+        if not h:
+            continue
+        pos = h.find('(')
+        if pos >= 0:
+            if h[-1] != ')':
+                raise Exception("handler fmt error")
+            name = h[:pos].strip()
+            inner = h[pos+1 : -1].strip()
+            # 'name()' means no arguments, not a single empty argument
+            if inner:
+                args = [a.strip() for a in inner.split(',')]
+            else:
+                args = []
+        else:
+            name = h
+            args = []
+
+        try:
+            klass = _handler_map[name]
+        except KeyError:
+            # same exception type as before, but say what was not found
+            raise KeyError("unknown handler: %s" % name)
+        klist.append( (klass, args) )
+
+    # always append default handler
+    klist.append( (TableHandler, []) )
+
+    # link them together, building from the tail so that each handler
+    # receives its successor as 'next'
+    p = None
+    klist.reverse()
+    for klass, args in klist:
+        p = klass(tblname, p, args)
+    return p
+
+def parse_handler(tblname, hstr):
+    """Parse and execute string of colon-separated handler initializers.
+
+    A missing handler string (None or '') produces the default chain
+    only.  Returns whatever build_handler() returns for the split list.
+    """
+    if not hstr:
+        # None would otherwise fail on .split(); '' already meant
+        # "no custom handlers"
+        return build_handler(tblname, [])
+    return build_handler(tblname, hstr.split(':'))
+
+def load_handlers(cf):
+    """Register every module named in the 'handler_modules' config list.
+
+    The option is read with cf.getlist() and defaults to an empty list.
+    """
+    for modname in cf.getlist('handler_modules', []):
+        register_handler_module(modname)
+
from pgq.cascade.worker import CascadedWorker
+from londiste.handler import *
+
__all__ = ['Replicator', 'TableState',
'TABLE_MISSING', 'TABLE_IN_COPY', 'TABLE_CATCHING_UP',
'TABLE_WANNA_SYNC', 'TABLE_DO_SYNC', 'TABLE_OK']
self.table_attrs = {}
self.copy_role = None
self.dropped_ddl = None
+ self.plugin = None
# except this
self.changed = 0
self.last_tick = 0
self.table_attrs = {}
self.changed = 1
+ self.plugin = None
def change_snapshot(self, str_snapshot, tag_changed = 1):
"""Set snapshot."""
if row['merge_state'] == "?":
self.changed = 1
+ hstr = row.get('handler', '')
+ self.plugin = parse_handler(self.name, hstr)
+
def interesting(self, ev, tick_id, copy_thread):
"""Check if table wants this event."""
if self.last_snapshot_tick < prev_tick:
self.change_snapshot(None)
+ def process_data_event(self, ev, sql_queue_func, batch_info, arg = None):
+     """Hand one data event to this table's handler chain.
+
+     arg is passed through to the handler, which gives it to
+     sql_queue_func together with the generated SQL.  It was added
+     with a default so existing three-argument calls keep working;
+     the handler's process_event() requires it.
+     batch_info is accepted but not used here.
+     """
+     self.plugin.process_event(ev, sql_queue_func, arg)
+
+ def get_plugin(self):
+ """Return the handler object currently stored in self.plugin."""
+ return self.plugin
+
class Replicator(CascadedWorker):
"""Replication core.
#compare_fmt = %(cnt)d rows, checksum=%(chksum)s
"""
- sql_command = {
- 'I': "insert into %s %s;",
- 'U': "update only %s set %s;",
- 'D': "delete from only %s where %s;",
- }
-
# batch info
cur_tick = 0
prev_tick = 0
self.copy_thread = 0
self.set_name = self.queue_name
+ self.used_plugins = {}
self.parallel_copies = self.cf.getint('parallel_copies', 1)
if self.parallel_copies < 1:
raise Exception('Bad value for parallel_copies: %d' % self.parallel_copies)
+ load_handlers(self.cf)
+
def connection_hook(self, dbname, db):
if dbname == 'db':
curs = db.cursor()
if not self.copy_thread:
self.restore_fkeys(dst_db)
+
+ for p in self.used_plugins.values():
+ p.reset()
+ self.used_plugins = {}
+
# now the actual event processing happens.
# they must be done all in one tx in dst side
# and the transaction must be kept open so that
CascadedWorker.process_remote_batch(self, src_db, tick_id, ev_list, dst_db)
self.flush_sql(dst_curs)
+ for p in self.used_plugins.values():
+ p.finish_batch(self.batch_info)
+ self.used_plugins = {}
+
# finalize table changes
self.save_table_state(dst_curs)
if ev.type in ('I', 'U', 'D'):
self.handle_data_event(ev, dst_curs)
elif ev.type[:2] in ('I:', 'U:', 'D:'):
- self.handle_urlenc_event(ev, dst_curs)
+ self.handle_data_event(ev, dst_curs)
elif ev.type == "TRUNCATE":
self.flush_sql(dst_curs)
self.handle_truncate_event(ev, dst_curs)
CascadedWorker.process_remote_event(self, src_curs, dst_curs, ev)
def handle_data_event(self, ev, dst_curs):
- """handle one data event"""
- t = self.get_table_by_name(ev.extra1)
- if t and t.interesting(ev, self.cur_tick, self.copy_thread):
- # buffer SQL statements, then send them together
- fqname = skytools.quote_fqident(ev.extra1)
- fmt = self.sql_command[ev.type]
- sql = fmt % (fqname, ev.data)
-
- self.apply_sql(sql, dst_curs)
- else:
- self.stat_increase('ignored_events')
-
- def handle_urlenc_event(self, ev, dst_curs):
- """handle one truncate event"""
+ """handle one data event"""
t = self.get_table_by_name(ev.extra1)
if not t or not t.interesting(ev, self.cur_tick, self.copy_thread):
self.stat_increase('ignored_events')
return
- # parse event
- pklist = ev.type[2:].split(',')
- row = skytools.db_urldecode(ev.data)
- op = ev.type[0]
- tbl = ev.extra1
-
- # generate sql
- if op == 'I':
- sql = skytools.mk_insert_sql(row, tbl, pklist)
- elif op == 'U':
- sql = skytools.mk_update_sql(row, tbl, pklist)
- elif op == 'D':
- sql = skytools.mk_delete_sql(row, tbl, pklist)
- else:
- raise Exception('bug: bad op')
-
- self.apply_sql(sql, dst_curs)
+ try:
+ p = self.used_plugins[ev.extra1]
+ except KeyError:
+ p = t.get_plugin()
+ self.used_plugins[ev.extra1] = p
+ p.prepare_batch(self.batch_info, dst_curs)
+
+ p.process_event(ev, self.apply_sql, dst_curs)
def handle_truncate_event(self, ev, dst_curs):
"""handle one truncate event"""
fqname = skytools.quote_fqident(ev.extra1)
sql = "TRUNCATE %s;" % fqname
- self.apply_sql(sql, dst_curs, True)
+
+ self.flush_sql(dst_curs)
+ dst_curs.execute(sql)
def handle_execute_event(self, ev, dst_curs):
"""handle one EXECUTE event"""
q = "select * from londiste.execute_finish(%s, %s)"
self.exec_cmd(dst_curs, q, [self.queue_name, fname], commit = False)
- def apply_sql(self, sql, dst_curs, force = False):
- if force:
- self.flush_sql(dst_curs)
-
- self.sql_list.append(sql)
+ def apply_sql(self, sql, dst_curs):
+ # how many queries to batch together, drop batching on error
limit = 200
- if self.work_state == -1 or force:
+ if self.work_state == -1:
limit = 0
+
+ self.sql_list.append(sql)
if len(self.sql_list) >= limit:
self.flush_sql(dst_curs)
if ev.type not in ('I', 'U', 'D'):
raise Exception('bug - bad event type in .interesting')
t = self.get_table_by_name(ev.extra1)
- if t:
- return t.interesting(ev, self.cur_tick, self.copy_thread)
- else:
+ if not t:
+ return 0
+ if not t.interesting(ev, self.cur_tick, self.copy_thread):
return 0
+ return 1
def add_set_table(self, dst_curs, tbl):
"""There was new table added to root, remember it."""
from pgq.cascade.admin import CascadeAdmin
from skytools.scripting import UsageError
+import londiste.handler
+
__all__ = ['LondisteSetup']
class LondisteSetup(CascadeAdmin):
self.set_name = self.queue_name
+ londiste.handler.load_handlers(self.cf)
+
def connection_hook(self, dbname, db):
if dbname == 'db':
curs = db.cursor()
help="pkey,fkeys,indexes")
p.add_option("--trigger-arg", action="append",
help="Custom trigger arg")
+ p.add_option("--handler", action="append",
+ help="add: Custom handler for table")
return p
def extra_init(self, node_type, node_db, provider_db):
self.log.warning('Table "%s" missing on subscriber, use --create if necessary' % tbl)
return
+ attrs = {}
+
+ hlist = self.options.handler
+ if hlist:
+ p = londiste.handler.build_handler(tbl, hlist)
+ attrs['handlers'] = ":".join(hlist)
+
# actual table registration
tgargs = self.options.trigger_arg # None by default
q = "select * from londiste.local_add_table(%s, %s, %s)"
self.exec_cmd(dst_curs, q, [self.set_name, tbl, tgargs])
+
if self.options.expect_sync:
q = "select * from londiste.local_set_table_state(%s, %s, NULL, 'ok')"
self.exec_cmd(dst_curs, q, [self.set_name, tbl])
else:
- attrs = {}
if self.options.skip_truncate:
attrs['skip_truncate'] = 1
if self.options.copy_condition:
attrs['copy_condition'] = self.options.copy_condition
- if attrs:
- enc_attrs = skytools.db_urlencode(attrs)
- q = "select * from londiste.local_set_table_attrs(%s, %s, %s)"
- self.exec_cmd(dst_curs, q, [self.set_name, tbl, enc_attrs])
+ if attrs:
+ enc_attrs = skytools.db_urlencode(attrs)
+ q = "select * from londiste.local_set_table_attrs(%s, %s, %s)"
+ self.exec_cmd(dst_curs, q, [self.set_name, tbl, enc_attrs])
dst_db.commit()
def sync_table_list(self, dst_curs, src_tbls, dst_tbls):
tablename = tbl_stat.name
# do copy
self.log.info("%s: start copy" % tablename)
- w_cond = tbl_stat.table_attrs.get('copy_condition')
+ p = tbl_stat.get_plugin()
+ cond_list = []
+ cond = tbl_stat.table_attrs.get('copy_condition')
+ if cond:
+ cond_list.append(cond)
+ p.prepare_copy(cond_list, dstcurs)
+ w_cond = ' and '.join(cond_list)
stats = skytools.full_copy(tablename, srccurs, dstcurs, col_list, w_cond)
if stats:
self.log.info("%s: copy finished: %d bytes, %d rows" % (