fake_local and qsplitter handlers
authorEgon Valdmees <egon.valdmees@skype.net>
Wed, 1 Jun 2011 14:07:36 +0000 (17:07 +0300)
committerEgon Valdmees <egon.valdmees@skype.net>
Tue, 7 Jun 2011 12:43:53 +0000 (15:43 +0300)
added new handlers:
* fake_local  - dummy handler to setup queue tables. All events are ignored. Table structure is not required. Use in branch/leaf
* qsplitter  -  dummy handler to setup queue tables. All events are ignored. Table structure is not required. All table events are inserted to destination queue, specified with handler arg 'queue'.

changed local_add_table to allow adding of non-existing tables

python/londiste/handlers/qtable.py
python/londiste/setup.py
sql/londiste/functions/londiste.local_add_table.sql
tests/qtable/init.sh [new file with mode: 0755]
tests/qtable/regen.sh [new file with mode: 0755]

index f32bb1eb14719cf85f3b4d32527c29175a03a569..a06280bcc6da513f480fc772fbe7526b3cde3dad 100644 (file)
@@ -1,10 +1,21 @@
 """
-Dummy handler to setup queue tables. All events are ignored.
+
+Handlers:
+
+qtable      - dummy handler to setup queue tables. All events are ignored. use in 
+              root node
+fake_local  - dummy handler to setup queue tables. All events are ignored. Table
+              structure is not required. Use in branch/leaf
+qsplitter  -  dummy handler to setup queue tables. All events are ignored. Table
+              structure is not required. All table events are inserted to
+              destination queue, specified with handler arg 'queue'.
+
 """
 
 from londiste.handler import BaseHandler
 
-__all__ = ['QueueTableHandler']
+__all__ = ['QueueTableHandler', 'FakeLocalHandler', 'QueueSplitterHandler']
+
 
 class QueueTableHandler(BaseHandler):
     """Queue table handler. Do nothing"""
@@ -14,9 +25,60 @@ class QueueTableHandler(BaseHandler):
         """Create SKIP and BEFORE INSERT trigger"""
         trigger_arg_list.append('tgflags=BI')
         trigger_arg_list.append('SKIP')
+        trigger_arg_list.append('expect_sync')
+
+    def real_copy(self, tablename, src_curs, dst_curs, column_list, cond_list):
+        """Force copy not to start"""
+        return (0,0)
+
+
+
+class FakeLocalHandler(BaseHandler):
+    handler_name = 'fake_local'
+
+    def add(self, trigger_arg_list):
+        trigger_arg_list.append('virtual_table')
+
+
+
+class QueueSplitterHandler(BaseHandler):
+    handler_name = 'qsplitter'
+
+    def __init__(self, table_name, args, log):
+        """Init per-batch table data cache."""
+        BaseHandler.__init__(self, table_name, args, log)
+        try:
+            self.dst_queue_name = args['queue']
+        except KeyError:
+            raise Exception('specify queue with handler-arg')
+        self.rows = []
+
+    def add(self, trigger_arg_list):
+        trigger_arg_list.append('virtual_table')
+
+    def prepare_batch(self, batch_info, dst_curs):
+        """Called on first event for this table in current batch."""
+        self.rows = []
 
     def process_event(self, ev, sql_queue_func, arg):
-        """Ignore events for this table"""
+        """Process a event.
+
+        Event should be added to sql_queue or executed directly.
+        """
+        if self.dst_queue_name is None: return
+
+        data = [ev.type, ev.data,
+                ev.extra1, ev.extra2, ev.extra3, ev.extra4, ev.time]
+        self.rows.append(data)
+
+    def finish_batch(self, batch_info, dst_curs):
+        """Called when batch finishes."""
+        if self.dst_queue_name is None: return
+
+        fields = ['type', 'data',
+                  'extra1', 'extra2', 'extra3', 'extra4', 'time']
+        pgq.bulk_insert_events(dst_curs, self.rows, fields, self.dst_queue_name)
 
-__londiste_handlers__ = [QueueTableHandler]
 
+__londiste_handlers__ = [QueueTableHandler, FakeLocalHandler,
+                         QueueSplitterHandler]
index 4f2e3d9dc46991559e5f61a54407f7a7a748c772..ac65cb6554248e675ba425ea05d1597b7916202a 100644 (file)
@@ -155,9 +155,6 @@ class LondisteSetup(CascadeAdmin):
                 s = skytools.TableStruct(src_curs, tbl)
                 src_db.commit()
                 s.create(dst_curs, create_flags, log = self.log)
-        elif not tbl_exists:
-            self.log.warning('Table "%s" missing on subscriber, use --create if necessary' % tbl)
-            return
 
         tgargs = []
         if self.options.trigger_arg:
index 3a1d1eddbdcda47133e17b6722c0507f2d5117a9..ba3e31c5269a0abfa9e79a9d7c8f0c936243a17e 100644 (file)
@@ -23,6 +23,10 @@ as $$
 --      skip_truncate   - set 'skip_truncate' table attribute
 --      expect_sync     - set table state to 'ok'
 --      tgflags=X       - trigger creation flags
+--      merge_all       - merge table from all sources. required for 
+--                        multi-source table
+--      skip            - create skip trigger. same as S flag
+--      virtual_table   - skips structure check and trigger creation
 --
 -- Trigger creation flags (default: AIUDL):
 --      I - ON INSERT
@@ -32,6 +36,7 @@ as $$
 --      L - use pgq.logutriga() as trigger function
 --      B - BEFORE
 --      A - AFTER
+--      S - SKIP
 --
 -- Example:
 --      > londiste.local_add_table('q', 'tbl', array['tgflags=BI', 'SKIP', 'pkey=col1,col2'])
@@ -40,12 +45,11 @@ as $$
 --      200 - Ok
 --      301 - Warning, trigger exists that will fire before londiste one
 --      400 - No such set
--- ----------------------------------------------------------------------
+------------------------------------------------------------------------
 declare
     col_types text;
     fq_table_name text;
     new_state text;
-
     trunctrg_name text;
     pgversion int;
     logtrg_previous text;
@@ -54,49 +58,102 @@ declare
     lg_pos text;
     lg_event text;
     lg_args text;
+    _extra_args text;
     tbl record;
     i integer;
+    j integer;
     sql text;
     arg text;
     _node record;
     _tbloid oid;
-    _extra_args text;
+    -- skip trigger
     _skip_prefix text := 'zzz_';
     _skip_trg_count integer;
     _skip_trg_name text;
+    -- check local tables from all sources
+    _queue_name text;
+    _local boolean;
+    -- array with all tgflags values
+    _check_flags char[] := array['B','A','Q','L','I','U','D','S'];
+    -- given tgflags array
+    _tgflags char[];
+    -- ordinary argument array
+    _args text[];
+    -- argument flags
+    _expect_sync boolean := false;
     _merge_all boolean := false;
     _skip_truncate boolean := false;
     _no_triggers boolean := false;
     _skip boolean := false;
-    _queue_name text;
-    _local boolean;
+    _virtual_table boolean := false;
 begin
-    _extra_args := '';
-    fq_table_name := londiste.make_fqname(i_table_name);
-    _tbloid := londiste.find_table_oid(fq_table_name);
-    if _tbloid is null then
-        select 404, 'Table does not exist: ' || fq_table_name into ret_code, ret_note;
-        return;
+
+    -------- i_trg_args ARGUMENTS PARSING
+
+    if array_lower(i_trg_args, 1) is not null then
+        for i in array_lower(i_trg_args, 1) .. array_upper(i_trg_args, 1) loop
+            arg := i_trg_args[i];
+            if arg like 'tgflags=%' then
+                -- special flag handling
+                arg := upper(substr(arg, 9));
+                for j in array_lower(_check_flags, 1) .. array_upper(_check_flags, 1) loop
+                    if position(_check_flags[j] in arg) > 0 then
+                        _tgflags := array_append(_tgflags, _check_flags[j]);
+                    end if;
+                end loop;
+            elsif arg = 'expect_sync' then
+                _expect_sync := true;
+            elsif arg = 'skip_truncate' then
+                _skip_truncate := true;
+            elsif arg = 'no_triggers' then
+                _no_triggers := true;
+            elsif arg = 'merge_all' then
+                _merge_all = true;
+            elsif lower(arg) = 'skip' then
+                _skip := true;
+            elsif arg = 'virtual_table' then
+                _virtual_table := true;
+                _expect_sync := true;   -- do not copy
+                _no_triggers := true;   -- do not create triggers
+            else
+                -- ordinary arg
+                _args = array_append(_args, quote_literal(arg));
+            end if;
+        end loop;
     end if;
-    col_types := londiste.find_column_types(fq_table_name);
-    if position('k' in col_types) < 1 then
-        -- allow missing primary key in case of combined table where
-        -- pkey was removed by londiste
-        perform 1 from londiste.table_info t,
-            pgq_node.node_info n_this,
-            pgq_node.node_info n_other
-          where n_this.queue_name = i_queue_name
-            and n_other.combined_queue = n_this.combined_queue
-            and n_other.queue_name <> n_this.queue_name
-            and t.queue_name = n_other.queue_name
-            and t.table_name = fq_table_name
-            and t.dropped_ddl is not null;
-        if not found then
-            select 400, 'Primary key missing on table: ' || fq_table_name into ret_code, ret_note;
+
+    fq_table_name := londiste.make_fqname(i_table_name);
+
+    -------- TABLE STRUCTURE CHECK
+
+    if not _virtual_table then
+        _tbloid := londiste.find_table_oid(fq_table_name);
+        if _tbloid is null then
+            select 404, 'Table does not exist: ' || fq_table_name into ret_code, ret_note;
             return;
         end if;
+        col_types := londiste.find_column_types(fq_table_name);
+        if position('k' in col_types) < 1 then
+            -- allow missing primary key in case of combined table where
+            -- pkey was removed by londiste
+            perform 1 from londiste.table_info t,
+                pgq_node.node_info n_this,
+                pgq_node.node_info n_other
+              where n_this.queue_name = i_queue_name
+                and n_other.combined_queue = n_this.combined_queue
+                and n_other.queue_name <> n_this.queue_name
+                and t.queue_name = n_other.queue_name
+                and t.table_name = fq_table_name
+                and t.dropped_ddl is not null;
+            if not found then
+                select 400, 'Primary key missing on table: ' || fq_table_name into ret_code, ret_note;
+                return;
+            end if;
+        end if;
     end if;
 
+    -------- TABLE REGISTRATION LOGIC
+
     select * from pgq_node.get_node_info(i_queue_name) into _node;
     if not found or _node.ret_code >= 400 then
         select 400, 'No such set: ' || i_queue_name into ret_code, ret_note;
@@ -136,7 +193,7 @@ begin
         perform londiste.root_notify_change(i_queue_name, 'londiste.add-table', fq_table_name);
     elsif _node.node_type = 'leaf' and _node.combined_type = 'branch' then
         new_state := 'ok';
-    elsif 'expect_sync' = any (i_trg_args) then
+    elsif _expect_sync then
         new_state := 'ok';
     else
         new_state := NULL;
@@ -150,61 +207,6 @@ begin
         raise exception 'lost table: %', fq_table_name;
     end if;
 
-    -- new trigger
-    lg_name := '_londiste_' || i_queue_name;
-    lg_func := 'pgq.logutriga';
-    lg_event := '';
-    lg_args := quote_literal(i_queue_name);
-    lg_pos := 'after';
-
-    -- parse extra args
-    if array_lower(i_trg_args, 1) is not null then
-        for i in array_lower(i_trg_args, 1) .. array_upper(i_trg_args, 1) loop
-            arg := i_trg_args[i];
-            if arg like 'tgflags=%' then
-                -- special flag handling
-                arg := upper(substr(arg, 9));
-                if position('B' in arg) > 0 then
-                    lg_pos := 'before';
-                end if;
-                if position('A' in arg) > 0 then
-                    lg_pos := 'after';
-                end if;
-                if position('Q' in arg) > 0 then
-                    lg_func := 'pgq.sqltriga';
-                end if;
-                if position('L' in arg) > 0 then
-                    lg_func := 'pgq.logutriga';
-                end if;
-                if position('I' in arg) > 0 then
-                    lg_event := lg_event || ' or insert';
-                end if;
-                if position('U' in arg) > 0 then
-                    lg_event := lg_event || ' or update';
-                end if;
-                if position('D' in arg) > 0 then
-                    lg_event := lg_event || ' or delete';
-                end if;
-                if position('S' in arg) > 0 then
-                    _skip := true;
-                end if;
-            elsif arg = 'expect_sync' then
-                -- already handled
-            elsif arg = 'skip_truncate' then
-                _skip_truncate := true;
-            elsif arg = 'no_triggers' then
-                _no_triggers := true;
-            elsif arg = 'merge_all' then
-                _merge_all = true;
-            elsif lower(arg) = 'skip' then
-                _skip := true;
-            else
-                -- ordinary arg
-                lg_args := lg_args || ', ' || quote_literal(arg);
-            end if;
-        end loop;
-    end if;
-
     -- merge all table sources on leaf
     if _node.node_type = 'leaf' then
         for _queue_name, _local in
@@ -216,7 +218,6 @@ begin
             left join londiste.table_info t2 on (t2.table_name = t.table_name and t2.queue_name = n2.queue_name)
             where t.queue_name = i_queue_name
               and t.table_name = fq_table_name
-              -- and t2.local = false
               and t2.queue_name != i_queue_name -- skip self
         loop
             -- if table from some other source is already marked as local,
@@ -247,7 +248,55 @@ begin
                 raise exception 'lost table: %', fq_table_name;
             end if;
         end loop;
+    end if;
+
+    if _skip_truncate then
+        perform 1
+        from londiste.local_set_table_attrs(i_queue_name,
+                                            fq_table_name,
+                                            'skip_truncate=1');
+    end if;
 
+    -------- TRIGGER LOGIC
+
+    -- new trigger
+    _extra_args := '';
+    lg_name := '_londiste_' || i_queue_name;
+    lg_func := 'pgq.logutriga';
+    lg_event := '';
+    lg_args := quote_literal(i_queue_name);
+    lg_pos := 'after';
+
+    if array_lower(_args, 1) is not null then
+        lg_args := lg_args || ', ' || array_to_string(_args, ', ');
+    end if;
+
+    if 'B' = any(_tgflags) then
+        lg_pos := 'before';
+    end if;
+    if 'A' = any(_tgflags)  then
+        lg_pos := 'after';
+    end if;
+    if 'Q' = any(_tgflags) then
+        lg_func := 'pgq.sqltriga';
+    end if;
+    if 'L' = any(_tgflags) then
+        lg_func := 'pgq.logutriga';
+    end if;
+    if 'I' = any(_tgflags) then
+        lg_event := lg_event || ' or insert';
+    end if;
+    if 'U' = any(_tgflags) then
+        lg_event := lg_event || ' or update';
+    end if;
+    if 'D' = any(_tgflags) then
+        lg_event := lg_event || ' or delete';
+    end if;
+    if 'S' = any(_tgflags) then
+        _skip := true;
+    end if;
+
+    if _node.node_type = 'leaf' then
         -- on weird leafs the trigger funcs may not exist
         perform 1 from pg_proc p join pg_namespace n on (n.oid = p.pronamespace)
             where n.nspname = 'pgq' and p.proname in ('logutriga', 'sqltriga');
@@ -259,7 +308,7 @@ begin
         _extra_args := ', ' || quote_literal('deny');
     end if;
 
-    -- if skip param given
+    -- if skip param given, rename previous skip triggers and prefix current
     if _skip then
         -- get count and name of existing skip triggers
         select count(*), min(t.tgname)
@@ -293,19 +342,13 @@ begin
         where tgrelid = londiste.find_table_oid(fq_table_name)
             and tgname = lg_name;
     if not found then
+
         if _no_triggers then
             select 200, 'Table added with no triggers: ' || fq_table_name
             into ret_code, ret_note;
             return;
         end if;
 
-        if _skip_truncate then
-            perform 1
-            from londiste.local_set_table_attrs(i_queue_name,
-                                                fq_table_name,
-                                                'skip_truncate=1');
-        end if;
-
         -- finalize event
         lg_event := substr(lg_event, 4);
         if lg_event = '' then
@@ -321,7 +364,7 @@ begin
         execute sql;
     end if;
 
-    -- create tRuncate trigger if it does not exists already
+    -- create truncate trigger if it does not exists already
     show server_version_num into pgversion;
     if pgversion >= 80400 then
         trunctrg_name  := '_londiste_' || i_queue_name || '_truncate';
diff --git a/tests/qtable/init.sh b/tests/qtable/init.sh
new file mode 100755 (executable)
index 0000000..b7b98d2
--- /dev/null
@@ -0,0 +1,16 @@
+#! /bin/sh
+
+. ../env.sh
+
+db_list="hsrc hdst"
+
+for db in $db_list; do
+  echo dropdb $db
+  dropdb $db
+done
+
+
+for db in $db_list; do
+  echo createdb $db
+  createdb $db
+done
diff --git a/tests/qtable/regen.sh b/tests/qtable/regen.sh
new file mode 100755 (executable)
index 0000000..f602eef
--- /dev/null
@@ -0,0 +1,122 @@
+#! /bin/bash
+
+. ../testlib.sh
+
+../zstop.sh
+
+v='-v'
+
+# bulkloader method
+meth=0
+
+db_list="hsrc hdst"
+
+kdb_list=`echo $db_list | sed 's/ /,/g'`
+
+#( cd ../..; make -s install )
+
+echo " * create configs * "
+
+# create ticker conf
+cat > conf/pgqd.ini <<EOF
+[pgqd]
+database_list = $kdb_list
+logfile = log/pgqd.log
+pidfile = pid/pgqd.pid
+EOF
+
+# londiste3 configs
+for db in $db_list; do
+cat > conf/londiste_$db.ini <<EOF
+[londiste3]
+job_name = londiste_$db
+db = dbname=$db
+queue_name = replika
+logfile = log/%(job_name)s.log
+pidfile = pid/%(job_name)s.pid
+EOF
+done
+
+for db in $db_list; do
+  cleardb $db
+done
+
+clearlogs
+
+set -e
+
+msg "Basic config"
+run cat conf/pgqd.ini
+run cat conf/londiste_hsrc.ini
+
+msg "Install londiste3 and initialize nodes"
+run londiste3 $v conf/londiste_hsrc.ini create-root hsrc 'dbname=hsrc'
+run londiste3 $v conf/londiste_hdst.ini create-leaf hdst 'dbname=hdst' --provider='dbname=hsrc'
+for db in $db_list; do
+  run_sql $db "update pgq.queue set queue_ticker_idle_period='5 secs'"
+done
+
+msg "Run ticker"
+run pgqd -d conf/pgqd.ini
+run sleep 5
+
+msg "See topology"
+run londiste3 $v conf/londiste_hsrc.ini status
+
+msg "Run londiste3 daemon for each node"
+for db in $db_list; do
+  run londiste3 $v -d conf/londiste_$db.ini replay
+done
+
+msg "Create table on root node and fill couple of rows"
+run_sql hsrc "create table mytable (id int4 primary key, data text, tstamp timestamptz default now())"
+for n in 1 2 3; do
+  run_sql hsrc "insert into mytable values ($n, 'row$n')"
+done
+
+msg "Register table on root node"
+run londiste3 $v conf/londiste_hsrc.ini add-table mytable
+
+msg "Create queue replika on hdst"
+run_sql hdst "select pgq.create_queue('replika')"
+
+msg "Register table on other node with creation, qtable->replika handler"
+run londiste3 $v conf/londiste_hdst.ini add-table mytable --handler=qsplitter --handler-arg="queue=replika"
+
+msg "Wait until table is in sync"
+cnt=0
+while test $cnt -ne 1; do
+  sleep 3
+  cnt=`psql -A -t -d hdst -c "select count(*) from londiste.table_info where merge_state = 'ok'"`
+  echo "  cnt=$cnt"
+done
+
+msg "Do some updates"
+run_sql hsrc "insert into mytable values (5, 'row5')"
+run_sql hsrc "update mytable set data = 'row5x' where id = 5"
+
+run_sql hsrc "insert into mytable values (6, 'row6')"
+run_sql hsrc "delete from mytable where id = 6"
+
+run_sql hsrc "insert into mytable values (7, 'row7')"
+run_sql hsrc "update mytable set data = 'row7x' where id = 7"
+run_sql hsrc "delete from mytable where id = 7"
+
+run_sql hsrc "delete from mytable where id = 1"
+run_sql hsrc "update mytable set data = 'row2x' where id = 2"
+
+run sleep 5
+
+msg "Check status"
+run londiste3 $v conf/londiste_hsrc.ini status
+
+run sleep 5
+
+tbl=$(psql hdst -qAtc "select * from pgq.current_event_table('replika');")
+msg "Check queue 'replika' form table $tbl"
+run_sql hdst "select * from $tbl"
+
+#run_sql hdst 'select * from mytable order by id'
+
+../zcheck.sh
+