# handlers module
+import new
+import sys
DEFAULT_HANDLERS = [
'londiste.handlers.bulk',
'londiste.handlers.qtable',
'londiste.handlers.dispatch',
'londiste.handlers.applyfn',
- 'londiste.handlers.part'
+ 'londiste.handlers.part',
+ 'londiste.handlers.multimaster',
]
+def handler_args(name, cls):
+ """Handler arguments initialization decorator
+
+ Define successor for handler class cls with func as argument generator
+ """
+ def wrapper(func):
+ def _init_override(self, table_name, args, log):
+ cls.__init__(self, table_name, func(args.copy()), log)
+ dct = {'__init__': _init_override, 'handler_name': name}
+ module = sys.modules[cls.__module__]
+ newname = '%s_%s' % (cls.__name__, name.replace('.','_'))
+ newcls = new.classobj(newname, (cls,), dct)
+ setattr(module, newname, newcls)
+ module.__londiste_handlers__.append(newcls)
+ module.__all__.append(newname)
+ return func
+ return wrapper
+
+def update(*p):
+ """ Update dicts given in params with its precessor param dict
+ in reverse order """
+ return reduce(lambda x, y: x.update(y) or x,
+ (p[i] for i in range(len(p)-1,-1,-1)), {})
qfn = skytools.quote_fqident(fn)
qargs = [skytools.quote_literal(a) for a in args]
sql = "select %s(%s);" % (qfn, ', '.join(qargs))
-
+ self.log.debug('applyfn.sql: %s' % sql)
sql_queue_func(sql, qfunc_arg)
+#------------------------------------------------------------------------------
+# register handler class
+#------------------------------------------------------------------------------
+
__londiste_handlers__ = [ApplyFuncHandler]
+
import sys
import datetime
-import new
import codecs
import re
import skytools
from skytools import quote_ident, quote_fqident, UsageError
from skytools.dbstruct import *
from skytools.utf8 import safe_utf8_decode
+from functools import partial
+from londiste.handlers import handler_args, update
+
__all__ = ['Dispatcher']
__londiste_handlers__ = [Dispatcher]
-
#------------------------------------------------------------------------------
# helper function for creating dispachers with different default values
#------------------------------------------------------------------------------
-
-def handler(name):
- def wrapper(func):
- def _init_override(self, table_name, args, log):
- Dispatcher.__init__(self, table_name, func(args.copy()), log)
- dct = {'__init__': _init_override, 'handler_name': name}
- clsname = 'Dispatcher_%s' % name.replace('.','_')
- cls = new.classobj(clsname, (Dispatcher,), dct)
- setattr(sys.modules[__name__], clsname, cls)
- __londiste_handlers__.append(cls)
- __all__.append(clsname)
- return func
- return wrapper
-
-
-def update(*p):
- """ Update dicts given in params with its precessor param dict
- in reverse order """
- return reduce(lambda x, y: x.update(y) or x,
- (p[i] for i in range(len(p)-1,-1,-1)), {})
-
-
+handler_args = partial(handler_args, cls=Dispatcher)
#------------------------------------------------------------------------------
# build set of handlers with different default values for easier use
def create_handler():
handler_name = '_'.join(p for p in (load, period, mode) if p)
default = update(mode_dict, period_dict, load_dict, BASE)
- @handler(handler_name)
+ @handler_args(handler_name)
def handler_func(args):
return update(args, default)
create_handler()
-@handler('bulk_direct')
+@handler_args('bulk_direct')
def bulk_direct_handler(args):
return update(args, {'load_mode': 'bulk', 'table_mode': 'direct'})
-@handler('direct')
+@handler_args('direct')
def direct_handler(args):
return update(args, {'load_mode': 'direct', 'table_mode': 'direct'})
--- /dev/null
+#!/usr/bin/env python
+# encoding: utf-8
+"""
+Handler for replica with multiple master nodes.
+
+Can only handle initial copy from one master. Add other masters with
+expect-sync option.
+
+NB! needs merge_on_time function to be compiled on database first.
+"""
+
+import skytools
+from londiste.handlers.applyfn import ApplyFuncHandler
+from londiste.handlers import update
+
+__all__ = ['MultimasterHandler']
+
+class MultimasterHandler(ApplyFuncHandler):
+ """Handle multimaster replicas"""
+ handler_name = 'multimaster'
+
+ def __init__(self, table_name, args, log):
+ """Init per-batch table data cache."""
+ conf = args.copy()
+ # remove Multimaster args from conf
+ for name in ['func_name','func_conf']:
+ if name in conf:
+ conf.pop(name)
+ conf = skytools.db_urlencode(conf)
+ args = update(args, {'func_name': 'merge_on_time', 'func_conf': conf})
+ ApplyFuncHandler.__init__(self, table_name, args, log)
+
+ def add(self, trigger_arg_list):
+ """Create SKIP and BEFORE INSERT trigger"""
+ trigger_arg_list.append('no_merge')
+
+
+#------------------------------------------------------------------------------
+# register handler class
+#------------------------------------------------------------------------------
+
+__londiste_handlers__ = [MultimasterHandler]
Merge function to be used with londiste 'applyfn' handler.
-londiste3 add-table foo --handler="applyfn(func_name=merge_on_time,func_conf=timefield%3dmodified_date)"
+londiste3 add-table foo --handler=applyfn --handler-arg="func_name=merge_on_time" --handler-arg="func_conf=timefield=modified_date"
timecol timestamp
);
-- insert to empty
-select merge_on_time('timefield=timecol', 'I:intcol', 'intcol=5&txtcol=v1&timecol=2010-09-09+12:12', 'mergetest', null, null, null);
+select merge_on_time('timefield=timecol', null, null, null, null, null, 'I:intcol', 'intcol=5&txtcol=v1&timecol=2010-09-09+12:12', 'mergetest', null, null, null);
merge_on_time
---------------------
data ok, no old row
(1 row)
-- insert to with time earlier
-select merge_on_time('timefield=timecol', 'I:intcol', 'intcol=5&txtcol=v2&timecol=2010-09-08+12:12', 'mergetest', null, null, null);
+select merge_on_time('timefield=timecol', null, null, null, null, null, 'I:intcol', 'intcol=5&txtcol=v2&timecol=2010-09-08+12:12', 'mergetest', null, null, null);
merge_on_time
---------------------------------------------------
IGN:data ok, old row, current row more up-to-date
(1 row)
-- insert to with time later
-select merge_on_time('timefield=timecol', 'I:intcol', 'intcol=5&txtcol=v3&timecol=2010-09-10+12:12', 'mergetest', null, null, null);
+select merge_on_time('timefield=timecol', null, null, null, null, null, 'I:intcol', 'intcol=5&txtcol=v3&timecol=2010-09-10+12:12', 'mergetest', null, null, null);
merge_on_time
----------------------------------
data ok, old row, new row better
-
create or replace function merge_on_time(
fn_conf text,
+ cur_tick text,
+ ev_id text,
+ ev_time text,
+ ev_txid text,
+ ev_retry text,
ev_type text,
ev_data text,
ev_extra1 text,
import pkgloader
pkgloader.require('skytools', '3.0')
from skytools.plpy_applyrow import ts_conflict_handler
-
+ args = [fn_conf, ev_type, ev_data, ev_extra1, ev_extra2, ev_extra3, ev_extra4]
return ts_conflict_handler(SD, args)
except:
import traceback
$$ language plpythonu;
--- select merge_on_time('timefield=modified_date', 'I:id_ccard', 'key_user=foo&id_ccard=1&modified_date=2005-01-01', 'ccdb.ccard', '', '', '');
-
+-- select merge_on_time('timefield=modified_date', 'I:id_ccard', 'key_user=foo&id_ccard=1&modified_date=2005-01-01', 'ccdb.ccard', '', '', '');
\ No newline at end of file
);
-- insert to empty
-select merge_on_time('timefield=timecol', 'I:intcol', 'intcol=5&txtcol=v1&timecol=2010-09-09+12:12', 'mergetest', null, null, null);
+select merge_on_time('timefield=timecol', null, null, null, null, null, 'I:intcol', 'intcol=5&txtcol=v1&timecol=2010-09-09+12:12', 'mergetest', null, null, null);
select * from mergetest;
-- insert to with time earlier
-select merge_on_time('timefield=timecol', 'I:intcol', 'intcol=5&txtcol=v2&timecol=2010-09-08+12:12', 'mergetest', null, null, null);
+select merge_on_time('timefield=timecol', null, null, null, null, null, 'I:intcol', 'intcol=5&txtcol=v2&timecol=2010-09-08+12:12', 'mergetest', null, null, null);
select * from mergetest;
-- insert to with time later
-select merge_on_time('timefield=timecol', 'I:intcol', 'intcol=5&txtcol=v3&timecol=2010-09-10+12:12', 'mergetest', null, null, null);
+select merge_on_time('timefield=timecol', null, null, null, null, null, 'I:intcol', 'intcol=5&txtcol=v3&timecol=2010-09-10+12:12', 'mergetest', null, null, null);
select * from mergetest;
--- /dev/null
+#! /bin/sh
+
+. ../env.sh
+
+lst="src1 src2 dst"
+
+../zstop.sh
+
+for db in $lst; do
+ echo dropdb $db
+ dropdb $db
+done
+for db in $lst; do
+ echo createdb $db
+ createdb $db
+done
--- /dev/null
+#! /bin/bash
+
+. ../testlib.sh
+
+../zstop.sh
+
+v='-v'
+
+# bulkloader method
+meth=0
+
+src_db_list="src1 src2"
+dst_db_list="dst"
+db_list="$src_db_list $dst_db_list"
+kdb_list=`echo $db_list | sed 's/ /,/g'`
+
+#( cd ../..; make -s install )
+
+echo " * create configs * "
+
+# create ticker conf
+cat > conf/pgqd.ini <<EOF
+[pgqd]
+database_list = $kdb_list
+logfile = log/pgqd.log
+pidfile = pid/pgqd.pid
+EOF
+
+# londiste configurations
+for db in $src_db_list; do
+
+# londiste on source
+cat > conf/londiste_$db.ini << EOF
+[londiste3]
+job_name = londiste_$db
+db = dbname=$db
+queue_name = replika_$db
+logfile = log/%(job_name)s.log
+pidfile = pid/%(job_name)s.pid
+EOF
+
+# londiste on source to target
+for dst in $dst_db_list; do
+cat > conf/londiste_${db}_${dst}.ini << EOF
+[londiste3]
+job_name = londiste_${db}_${dst}
+db = dbname=$dst
+queue_name = replika_$db
+logfile = log/%(job_name)s.log
+pidfile = pid/%(job_name)s.pid
+EOF
+
+done
+done
+
+for db in $db_list; do
+ cleardb $db
+done
+
+clearlogs
+
+set -e
+
+msg "Install londiste3 and initialize nodes"
+
+for db in $src_db_list; do
+run londiste3 $v conf/londiste_$db.ini create-root $db "dbname=$db"
+for dst in $dst_db_list; do
+run londiste3 $v conf/londiste_${db}_${dst}.ini create-leaf $dst "dbname=$dst" --provider="dbname=$db"
+done
+done
+
+for db in $db_list; do
+ run_sql $db "update pgq.queue set queue_ticker_idle_period='5 secs'"
+done
+
+msg "Run ticker"
+run pgqd -d conf/pgqd.ini
+run sleep 5
+
+msg "See topology"
+for db in $src_db_list; do
+run londiste3 $v conf/londiste_$db.ini status
+done
+
+msg "Run londiste3 daemon for each node"
+for db in $src_db_list; do
+run londiste3 $v -d conf/londiste_$db.ini replay
+for dst in $dst_db_list; do
+run londiste3 $v -d conf/londiste_${db}_${dst}.ini replay
+done
+done
+
+for db in $dst_db_list; do
+ run createlang -d $db plpythonu
+ run psql $db -f ../../sql/conflicthandler/merge_on_time.sql
+done
+
+msg "Create table on root nodes, fill couple of rows and register"
+for db in $src_db_list; do
+run_sql $db "create table mytable (id int4 primary key, data text, tstamp timestamptz default now())"
+for n in 1 2 3; do
+ run_sql $db "insert into mytable values ($n, 'row$n')"
+done
+run londiste3 $v conf/londiste_$db.ini add-table mytable
+done
+
+sleep 10
+
+msg "Register table on dst node with creation"
+#run londiste3 $v conf/londiste_src1_dst.ini add-table mytable --create --no-merge --handler=applyfn --handler-arg="func_name=merge_on_time" --handler-arg="func_conf=timefield=tstamp"
+run londiste3 $v conf/londiste_src1_dst.ini add-table mytable --create --handler=multimaster --handler-arg="timefield=tstamp"
+sleep 10
+#run londiste3 $v conf/londiste_src2_dst.ini add-table mytable --expect-sync --no-merge --handler=applyfn --handler-arg="func_name=merge_on_time" --handler-arg="func_conf=timefield=tstamp"
+run londiste3 $v conf/londiste_src2_dst.ini add-table mytable --expect-sync --handler=multimaster --handler-arg="timefield=tstamp"
+
+
+for db in $src_db_list; do
+for n in 4 5 6; do
+ run_sql $db "insert into mytable values ($n, 'row$n::$db')"
+done
+sleep 3
+done
+
+
+for n in 2 3 4; do
+ run_sql src1 "update mytable set data = 'ok', tstamp = now() where id = $n"
+done
+
+for n in 1 5 6; do
+ run_sql src2 "update mytable set data = 'ok', tstamp = now() where id = $n"
+done
+
+run sleep 10
+
+for dst in $dst_db_list; do
+run_sql $dst "select * from mytable"
+done
+
+../zcheck.sh
+