part handler
authorEgon Valdmees <egon.valdmees@skype.net>
Tue, 14 Jun 2011 13:50:53 +0000 (16:50 +0300)
committerEgon Valdmees <egon.valdmees@skype.net>
Wed, 15 Jun 2011 13:15:07 +0000 (16:15 +0300)
handler adds trigger argument to full extra3 field with hash value and
filter to event_processing and full_copy

python/londiste/handlers/__init__.py
python/londiste/handlers/part.py [new file with mode: 0644]
tests/part/init.sh [new file with mode: 0755]
tests/part/regen.sh [new file with mode: 0755]

index b94935edbe965d32d00e7ae9e2cd6d609c433c41..ef3bc87deff70b17f29ab7739dfa26a3263138ce 100644 (file)
@@ -5,5 +5,6 @@ DEFAULT_HANDLERS = [
     'londiste.handlers.qtable',
     'londiste.handlers.dispatch',
     'londiste.handlers.applyfn',
+    'londiste.handlers.part'
 ]
 
diff --git a/python/londiste/handlers/part.py b/python/londiste/handlers/part.py
new file mode 100644 (file)
index 0000000..784162f
--- /dev/null
@@ -0,0 +1,73 @@
+"""
+Experimental event filtering by hash.
+"""
+
+import skytools
+from londiste.handler import TableHandler
+
+__all__ = ['PartHandler']
+
+class PartHandler(TableHandler):
+    handler_name = 'part'
+
+    def __init__(self, table_name, args, log):
+        TableHandler.__init__(self, table_name, args, log)
+        self.max_part = None       # max part number
+        self.local_part = None     # part number of local node
+        self.key = args.get('key')        
+        if self.key is None:
+            raise Exception('Specify key field as key agument')
+
+    def reset(self):
+        """Forget config info."""
+        self.max_part = None
+        self.local_part = None
+        TableHandler.reset(self)
+
+    def add(self, trigger_arg_list):
+        """Let trigger put hash into extra3"""
+
+        arg = "ev_extra3='hash='||hashtext(%s)" % skytools.quote_ident(self.key)
+        trigger_arg_list.append(arg)        
+        TableHandler.add(self, trigger_arg_list)
+
+    def prepare_batch(self, batch_info, dst_curs):
+        """Called on first event for this table in current batch."""
+        if not self.max_part:
+            self.load_part_info(dst_curs)
+        TableHandler.prepare_batch(self, batch_info, dst_curs)
+
+    def process_event(self, ev, sql_queue_func, arg):
+        """Filter event by hash in extra3, apply only local part."""
+        if ev.extra3:
+            meta = skytools.db_urldecode(ev.extra3)
+            self.log.debug('part.process_event: hash=%d, max_part=%s, local_part=%d' %\
+                           (int(meta['hash']), self.max_part, self.local_part))
+            if (int(meta['hash']) & self.max_part) != self.local_part:
+                self.log.debug('part.process_event: not my event')
+                return
+        self.log.debug('part.process_event: my event, processing')
+        TableHandler.process_event(self, ev, sql_queue_func, arg)
+
+    def real_copy(self, tablename, src_curs, dst_curs, column_list, cond_list):
+        """Copy only slots needed locally."""
+        self.load_part_info(dst_curs)
+        fn = 'hashtext(%s)' % skytools.quote_ident(self.key)
+        w = "%s & %d = %d" % (fn, self.max_part, self.local_part)
+        self.log.debug('part: copy_condition=%s' % w)
+        cond_list.append(w)
+
+        return TableHandler.real_copy(self, tablename, src_curs, dst_curs,
+                                     column_list, cond_list)
+
+    def load_part_info(self, curs):
+        """Load slot info from database."""
+        q = "select part_nr, max_part from partconf.conf"
+        curs.execute(q)
+        self.local_part, self.max_part = curs.fetchone()
+        if self.local_part is None or self.max_part is None:
+            raise Exeption('Error loading part info')
+
+# register handler class
+__londiste_handlers__ = [PartHandler]
+
diff --git a/tests/part/init.sh b/tests/part/init.sh
new file mode 100755 (executable)
index 0000000..aa3444d
--- /dev/null
@@ -0,0 +1,16 @@
+#! /bin/sh
+
+. ../env.sh
+
+lst="full1 part1 part2 part3 part4"
+
+../zstop.sh
+
+for db in $lst; do
+  echo dropdb $db
+  dropdb $db
+done
+for db in $lst; do
+  echo createdb $db
+  createdb $db
+done
diff --git a/tests/part/regen.sh b/tests/part/regen.sh
new file mode 100755 (executable)
index 0000000..431c5b3
--- /dev/null
@@ -0,0 +1,179 @@
+#! /bin/sh
+
+. ../testlib.sh
+
+title "Part"
+v=-v
+
+part_list="part1 part2 part3 part4"
+full_list="full1"
+all_list="$part_list $full_list"
+kdb_list="`echo $all_list|sed 's/ /,/g'`"
+
+for db in $all_list; do
+  cleardb $db
+done
+msg "clean logs"
+rm -f log/*.log
+
+msg "Create configs"
+
+# create ticker conf
+cat > conf/pgqd.ini << EOF
+[pgqd]
+database_list = $kdb_list
+logfile = log/pgqd.log
+pidfile = pid/pgqd.pid
+EOF
+
+# full replicas
+for db in $full_list; do
+cat > conf/londiste_$db.ini << EOF
+[londiste3]
+job_name = londiste_$db
+db = dbname=$db
+queue_name = replika
+logfile = log/%(job_name)s.log
+pidfile = pid/%(job_name)s.pid
+EOF
+
+# part replicas
+for dst in $part_list; do
+cat > conf/londiste_${db}_${dst}.ini << EOF
+[londiste3]
+job_name = londiste_${db}_${dst}
+db = dbname=$dst
+queue_name = replika
+logfile = log/%(job_name)s.log
+pidfile = pid/%(job_name)s.pid
+EOF
+done
+done
+
+set -e
+
+msg "Create nodes for full queue"
+run londiste3 $v conf/londiste_full1.ini create-root root_full1 'dbname=full1'
+#run londiste3 $v conf/londiste_full2.ini create-branch branch_full2 'dbname=full2' --provider='dbname=full1'
+
+msg "Create nodes for replicas"
+for dst in $part_list; do
+  for src in $full_list; do
+    run londiste3 $v conf/londiste_${src}_${dst}.ini \
+                    create-leaf leaf_${src}_${dst} "dbname=$dst" \
+                    --provider="dbname=$src"
+  done
+done
+
+#msg "Create nodes for partition root queues"
+#for db in $part_list; do
+#run londiste3 $v conf/londiste_$db.ini create-root root_$db "dbname=$db"
+#done
+
+msg "Tune PgQ"
+for db in $all_list; do
+  run_sql $db "update pgq.queue set queue_ticker_idle_period='3 secs'"
+done
+
+msg "Launch ticker"
+run pgqd $v -d conf/pgqd.ini
+
+msg "Launch londiste replay"
+for db in $full_list; do
+  run londiste3 $v -d conf/londiste_$db.ini worker
+done
+
+msg "Launch merge londiste"
+for src in $full_list; do
+  for dst in $part_list; do
+    run londiste3 $v -d conf/londiste_${src}_${dst}.ini worker
+  done
+done
+
+msg "Create partconf in partition nodes"
+part_count=$(echo $part_list|wc -w)
+max_part=$(( $part_count-1 ))
+i=0
+for db in $part_list; do
+run psql $db <<EOF
+create schema partconf;
+CREATE TABLE partconf.conf (
+    part_nr integer,
+    max_part integer,
+    db_code bigint,
+    is_primary boolean,
+    max_slot integer,
+    cluster_name text
+);
+insert into partconf.conf(part_nr, max_part) values($i, $max_part);
+EOF
+i=$(( $i+1 ))
+done
+
+msg "Create table in root node"
+run_sql full1 "create table mydata (id int4 primary key, data text)"
+
+msg "Insert few rows"
+for n in 1 2 3 4; do
+  run_sql full1 "insert into mydata values ($n, 'foo$n')"
+done
+
+msg "Register table in root node"
+run londiste3 $v conf/londiste_full1.ini add-table mydata --handler=part --handler-arg="key=data"
+
+#msg "Register and create table in branch node"
+#run londiste3 $v conf/londiste_full2.ini add-table mydata --create --handler="part" --handler-arg="key=data"
+
+msg "Wait until add-table events are distributed to leafs"
+parts=$(echo "$part_list"|wc -w)
+for db in $part_list; do
+cnt=0
+while test $cnt -ne 1; do
+ #sleep 5
+ cnt=`psql ${db} -Atc "select count(*) from londiste.table_info"`
+ echo "$db: cnt=$cnt"
+ if [ $cnt != 1 ]; then
+     sleep 5
+ fi
+done
+done
+
+msg "Register table in partition nodes"
+for src in $full_list; do
+  for dst in $part_list; do
+    run londiste3 $v -d conf/londiste_${src}_${dst}.ini add-table mydata --create --handler=part --handler-arg="key=data"
+  done
+done
+
+msg "Wait until tables are sync in part nodes"
+for db in $part_list; do
+cnt=0
+while test $cnt -ne 1; do
+  #sleep 5
+  cnt=`psql -A -t -d $db -c "select count(*) from londiste.table_info where merge_state = 'ok'"`
+  echo "$db: cnt=$cnt"
+   if [ $cnt != 1 ]; then
+      sleep 5
+   fi
+done
+done
+
+msg "Sleep a bit"
+run sleep 10
+
+msg "Insert few rows"
+for n in $(seq 5 10); do
+  run_sql full1 "insert into mydata values ($n, 'foo$n')"
+done
+
+msg "Sleep a bit"
+run sleep 10
+
+msg "Now check if data apprered"
+for db in $part_list; do
+run_sql $db "select * from mydata order by id"
+#run_sql $db "select * from londiste.table_info order by queue_name"
+done
+
+../zcheck.sh
+