OCM-2279: londiste3 should translate everything to utf8, in a lossy way
authorEgon Valdmees <egon.valdmees@skype.net>
Thu, 5 May 2011 12:37:54 +0000 (15:37 +0300)
committerMarko Kreen <markokr@gmail.com>
Wed, 11 May 2011 09:39:40 +0000 (12:39 +0300)
where appropriate

* added argument 'encoding' to dispatcher handler
* tests for invalid utf8 sequences
* support for renamed table copy in dispatcher handler

python/londiste/bublin.py
python/londiste/handler.py
python/londiste/handlers/dispatch.py
python/londiste/table_copy.py
python/skytools/sqltools.py
tests/handler/init.sh
tests/handler/regen.sh
tests/noqueue_merge/regen.sh

index 05ffc80e8c4261440b43d172889a012d069b5f07..78e2ae5ce9f6a1abd90325d196820fcc7e45f99f 100644 (file)
@@ -48,16 +48,17 @@ class Bublin(BaseHandler):
                 return
         BaseHandler.process_event(self, ev, sql_queue_func, arg)
 
-    def prepare_copy(self, expr_list, dst_curs):
+    def real_copy(self, tablename, src_curs, dst_curs, column_list, cond_list):
         """Copy only slots needed locally."""
         self.load_bubbles(dst_curs)
 
         slist = self.bubbles_local_slots.keys()
         fn = 'hashtext(%s)' % skytools.quote_ident(self.key)
         w = "(((%s) & %d) in (%s))" % (fn, self.bubbles_max_slot, slist)
-        expr_list.append(w)
+        cond_list.append(w)
 
-        BaseHandler.prepare_copy(self, expr_list, dst_curs)
+        return BaseHandler.real_copy(self, tablename, src_curs, dst_curs,
+                                     column_list, cond_list)
 
     def load_bubbles(self, curs):
         """Load slot info from database."""
index 7477675fe0ea1e6c8f999f322cd58a5ebd6785d0..a0c72b0742ea2db13ad28ea607b72f4cf88a1ad2 100644 (file)
@@ -99,12 +99,13 @@ class BaseHandler:
         """Called when batch finishes."""
         pass
 
-    def prepare_copy(self, expr_list, dst_curs):
-        """Can change COPY behaviour.
-
-        Returns new expr.
+    def real_copy(self, tablename, src_curs, dst_curs, column_list, cond_list):
+        """do actual table copy and return tuple with number of bytes and rows
+        copyed
         """
-        pass
+        condition = ' and '.join(cond_list)
+        return skytools.full_copy(tablename, src_curs, dst_curs, column_list,
+                                  condition)
 
 class TableHandler(BaseHandler):
     """Default Londiste handler, inserts events into tables with plain SQL."""
index cdb6086cb3e4f5c617aad208fdf952e3626cbc51..72704ac2ec443d3624087c9f121ed384319fbcd7 100644 (file)
@@ -34,6 +34,7 @@
 * bulk_yearly_batch
 * bulk_yearly_field
 * bulk_yearly_time
+* bulk_direct - functionally identical to bulk
 
 == HANDLER ARGUMENTS ==
 
@@ -130,6 +131,10 @@ post_part:
     sql statement(s) to execute after creating partition table. Usable
     variables are the same as in part_template
 
+encoding:
+    name of destination encoding. handler replaces all invalid encoding symbols
+    and logs them as warnings
+
 NB! londiste3 does not currently support table renaming and field mapping when
 creating or coping initial data to destination table.  --expect-sync and
 --skip-truncate should be used and --create switch is to be avoided.
@@ -138,6 +143,7 @@ creating or coping initial data to destination table.  --expect-sync and
 import sys
 import datetime
 import new
+import codecs
 import skytools
 from londiste.handler import BaseHandler
 from skytools import quote_ident, quote_fqident, UsageError
@@ -172,9 +178,10 @@ PART_FUNC_CALL = 'select %s(%s)' % (PART_FUNC,
         ', '.join('%%(%s)s' % arg for arg in PART_FUNC_ARGS))
 
 
-#----------------------------------------
+
+#------------------------------------------------------------------------------
 # LOADERS
-#----------------------------------------
+#------------------------------------------------------------------------------
 
 
 class BaseLoader:
@@ -449,7 +456,7 @@ class BulkLoader(BaseBulkTempLoader):
             else:
                 # fscking problems with long-lived temp tables
                 self.drop(curs)
-                
+
     def create_temp(self, curs):
         """ check if temp table exists. Returns False if using existing temp
         table and True if creating new
@@ -463,7 +470,7 @@ class BulkLoader(BaseBulkTempLoader):
         return True
 
     def bulk_insert(self, curs, data, table = None):
-        """Copy data to table. If table not provided, use temp table. 
+        """Copy data to table. If table not provided, use temp table.
         When re-using existing temp table, it is always truncated first and
         analyzed after copy.
         """
@@ -472,10 +479,10 @@ class BulkLoader(BaseBulkTempLoader):
         _use_temp = table is None
         # if table not specified use temp
         if _use_temp:
-            table = self.temp            
+            table = self.temp
             # truncate when re-using existing table
             if not self.create_temp(curs):
-                self.truncate(curs)        
+                self.truncate(curs)
         self.log.debug("bulk: COPY %d rows into %s" % (len(data), table))
         skytools.magic_insert(curs, table, data, self.fields,
                               quoted_table = True)
@@ -505,9 +512,10 @@ class BulkLoader(BaseBulkTempLoader):
 LOADERS = {'direct': DirectLoader, 'bulk': BulkLoader}
 
 
-#----------------------------------------
+
+#------------------------------------------------------------------------------
 # ROW HANDLERS
-#----------------------------------------
+#------------------------------------------------------------------------------
 
 
 class RowHandler:
@@ -562,16 +570,61 @@ ROW_HANDLERS = {'plain': RowHandler,
                 'keep_latest': KeepLatestRowHandler}
 
 
-#----------------------------------------
-# DISPATCHER
-#----------------------------------------
 
-class AttrDict(dict):
-    """Dict with values accessible with attributes"""
-    def __getattr__(self, name):
-        return self[name]
-    def __setattr__(self, name, value):
-        self[name] = value
+#------------------------------------------------------------------------------
+# ENCODING VALIDATOR
+#------------------------------------------------------------------------------
+
+
+class EncodingValidator:
+    def __init__(self, log, encoding = 'utf-8', replacement = u'\ufffd'):
+        self.log = log
+        self.encoding = encoding
+        self.replacement = replacement
+        self.columns = None
+        self.error_count = 0
+        codecs.register_error("error_handler", self._error_handler)
+
+    def _error_handler(self, exc):
+        # process only UnicodeDecodeError
+        if not isinstance(exc, UnicodeDecodeError):
+            raise exc
+        # find starting position of line with error and log warning
+        _line_start = exc.object.rfind('\n', 0, exc.start) + 1
+        _col = self.columns[exc.object.count('\t', _line_start, exc.start)]
+        _msg = "replacing invalid %s sequence %r in column %s"%\
+               (self.encoding, exc.object[exc.start:exc.end], _col)
+        self.log.warning(_msg)
+        # increase error count
+        self.error_count += 1
+        # return replacement char and position to continue from
+        # NB! doesn't replace multiple symbols, so it's harder to break file
+        # structure like replace \t or \n
+        return self.replacement, exc.start + 1
+
+    def validate(self, data, columns):
+        self.columns = columns
+        self.error_count = 0
+        _unicode = data.decode(self.encoding, "error_handler")
+        # when no erros then return input data as is, else re-encode fixed data
+        if self.error_count == 0:
+            return data
+        else:
+            return _unicode.encode(self.encoding)
+
+    def validate_dict(self, data):
+        _cols, _vals = zip(*data.items())
+        _fixed = self.validate('\t'.join(_vals), _cols)
+        if self.error_count == 0:
+            return data
+        else:
+            return dict(zip(_cols, _fixed.split('\t')))
+
+
+
+#------------------------------------------------------------------------------
+# DISPATCHER
+#------------------------------------------------------------------------------
 
 
 class Dispatcher(BaseHandler):
@@ -596,10 +649,15 @@ class Dispatcher(BaseHandler):
         self.conf = self.get_config()
         hdlr_cls = ROW_HANDLERS[self.conf.row_mode]
         self.row_handler = hdlr_cls(self.log)
+        if self.conf.encoding:
+            self.encoding_validator = EncodingValidator(self.log,
+                                                        self.conf.encoding)
+        else:
+            self.encoding_validator = None
 
     def get_config(self):
         """Processes args dict"""
-        conf = AttrDict()
+        conf = skytools.dbdict()
         # set table mode
         conf.table_mode = self.get_arg('table_mode', TABLE_MODES)
         if conf.table_mode == 'part':
@@ -641,6 +699,8 @@ class Dispatcher(BaseHandler):
                     conf.field_map[tmp[0]] = tmp[0]
                 else:
                     conf.field_map[tmp[0]] = tmp[1]
+        # encoding validator
+        conf.encoding = self.args.get('encoding')
         return conf
 
     def get_arg(self, name, value_list, default = None):
@@ -718,6 +778,8 @@ class Dispatcher(BaseHandler):
         if dst not in self.row_handler.table_map:
             self.row_handler.add_table(dst, LOADERS[self.conf.load_mode],
                                     self.pkeys, self.conf)
+        if self.encoding_validator:
+            data = self.encoding_validator.validate_dict(data)
         self.row_handler.process(dst, op, data)
         #BaseHandler.process_event(self, ev, sql_queue_func, arg)
 
@@ -800,11 +862,47 @@ class Dispatcher(BaseHandler):
         exec_with_vals(self.conf.post_part)
         self.log.info("Created table: %s" % dst)
 
+    def real_copy(self, tablename, src_curs, dst_curs, column_list, cond_list):
+        """do actual table copy and return tuple with number of bytes and rows
+        copyed
+        """
+        _src_cols = _dst_cols = column_list
+        _write_hook = None
+        condition = ' and '.join(cond_list)
+
+        if self.conf.skip_fields:
+            _src_cols = [col for col in column_list
+                         if col not in self.conf.skip_fields]
+            _dst_cols = _src_cols
 
+        if self.conf.field_map:
+            _src_cols = [col for col in _src_cols if col in self.conf.field_map]
+            _dst_cols = [self.conf.field_map[col] for col in _src_cols]
+
+        if self.encoding_validator:
+            def _write_hook(obj, data):
+                return self.encoding_validator.validate(data, _src_cols)
+
+        return skytools.full_copy(tablename, src_curs, dst_curs, _src_cols,
+                                  condition, self.table_name, _dst_cols,
+                                  write_hook = _write_hook)
+
+
+
+#------------------------------------------------------------------------------
 # register handler class
+#------------------------------------------------------------------------------
+
+
 __londiste_handlers__ = [Dispatcher]
 
+
+
+#------------------------------------------------------------------------------
 # helper function for creating dispachers with different default values
+#------------------------------------------------------------------------------
+
+
 def handler(name):
     def wrapper(func):
         def _init_override(self, table_name, args, log):
@@ -818,12 +916,20 @@ def handler(name):
         return func
     return wrapper
 
-def dupd(*p):
+
+def update(*p):
     """ Update dicts given in params with its precessor param dict
     in reverse order """
     return reduce(lambda x, y: x.update(y) or x,
             (p[i] for i in range(len(p)-1,-1,-1)), {})
 
+
+
+#------------------------------------------------------------------------------
+# build set of handlers with different default values for easier use
+#------------------------------------------------------------------------------
+
+
 LOAD = { '': { 'load_mode': 'direct' },
          'bulk': { 'load_mode': 'bulk' }
 }
@@ -841,17 +947,19 @@ BASE = { 'table_mode': 'part',
          'row_mode': 'keep_latest',
 }
 
-# build set of handlers with different default values for easier use
 for load, load_dict in LOAD.items():
     for period, period_dict in PERIOD.items():
         for mode, mode_dict in MODE.items():
             # define creator func to keep default dicts in separate context
             def create_handler():
                 handler_name = '_'.join(p for p in (load, period, mode) if p)
-                default = dupd(mode_dict, period_dict, load_dict, BASE)
+                default = update(mode_dict, period_dict, load_dict, BASE)
                 @handler(handler_name)
                 def handler_func(args):
-                    return dupd(args, default)
+                    return update(args, default)
             create_handler()
 
-# TODO: bulk & ignore handlers
+
+@handler('bulk_direct')
+def bulk_direct_handler(args):
+    return update(args, {'load_mode': 'bulk', 'table_mode': 'direct'})
index 5e10c889f9f9e640d719044faf63c1bcec16af8a..447047e97b45b501842f8dd54a0cec6366baa064 100644 (file)
@@ -205,9 +205,7 @@ class CopyTable(Replicator):
         cond = tbl_stat.table_attrs.get('copy_condition')
         if cond:
             cond_list.append(cond)
-        p.prepare_copy(cond_list, dstcurs)
-        w_cond = ' and '.join(cond_list)
-        stats = skytools.full_copy(tablename, srccurs, dstcurs, col_list, w_cond)
+        stats = p.real_copy(tablename, srccurs, dstcurs, col_list, cond_list)
         if stats:
             self.log.info("%s: copy finished: %d bytes, %d rows" % (
                           tablename, stats[0], stats[1]))
index b8a179be9fa830cdea98fa685a3c639301ca4be2..970432316ad7e8cbbc18a95f3c91d9e285362b5e 100644 (file)
@@ -331,18 +331,28 @@ def magic_insert(curs, tablename, data, fields = None, use_insert = 0, quoted_ta
 class CopyPipe(object):
     "Splits one big COPY to chunks."
 
-    def __init__(self, dstcurs, tablename = None, limit = 512*1024, cancel_func=None, sql_from = None):
+    def __init__(self, dstcurs, tablename = None, limit = 512*1024,
+                 sql_from = None):
         self.tablename = tablename
         self.sql_from = sql_from
         self.dstcurs = dstcurs
         self.buf = StringIO()
         self.limit = limit
-        self.cancel_func = None
+        #hook for new data, hook func should return new data
+        #def write_hook(obj, data):
+        #   return data
+        self.write_hook = None
+        #hook for flush, hook func result is discarded
+        # def flush_hook(obj):
+        #   return None
+        self.flush_hook = None
         self.total_rows = 0
         self.total_bytes = 0
 
     def write(self, data):
         "New data from psycopg"
+        if self.write_hook:
+            data = self.write_hook(self, data)
 
         self.total_bytes += len(data)
         self.total_rows += data.count("\n")
@@ -363,8 +373,8 @@ class CopyPipe(object):
     def flush(self):
         "Send data out."
 
-        if self.cancel_func:
-            self.cancel_func()
+        if self.flush_hook:
+            self.flush_hook(self)
 
         if self.buf.tell() <= 0:
             return
@@ -377,8 +387,10 @@ class CopyPipe(object):
         self.buf.seek(0)
         self.buf.truncate()
 
+
 def full_copy(tablename, src_curs, dst_curs, column_list = [], condition = None,
-        dst_tablename = None, dst_column_list = None):
+        dst_tablename = None, dst_column_list = None,
+        write_hook = None, flush_hook = None):
     """COPY table from one db to another."""
 
     # default dst table and dst columns to source ones
@@ -413,12 +425,16 @@ def full_copy(tablename, src_curs, dst_curs, column_list = [], condition = None,
         sql_to = "COPY %s TO stdout" % src
         sql_from = "COPY %s FROM stdin" % dst
         buf = CopyPipe(dst_curs, sql_from = sql_from)
+        buf.write_hook = write_hook
+        buf.flush_hook = flush_hook
         src_curs.copy_expert(sql_to, buf)
     else:
         if condition:
             # regular psycopg copy_to generates invalid sql for subselect copy
             raise Exception('copy_expert() is needed for conditional copy')
         buf = CopyPipe(dst_curs, dst)
+        buf.write_hook = write_hook
+        buf.flush_hook = flush_hook
         src_curs.copy_to(buf, src)
     buf.flush()
 
@@ -601,7 +617,7 @@ def mk_delete_sql(row, tbl, pkey_list, field_map = None):
         col = skytools.quote_ident(new_k)
         val = skytools.quote_literal(row[k])
         whe_list.append("%s = %s" % (col, val))
-    whe_str = " and ".join(whe_list) 
+    whe_str = " and ".join(whe_list)
     return "delete from only %s where %s;" % (skytools.quote_fqident(tbl), whe_str)
 
 if __name__ == '__main__':
index a0eb185caa3864a02334d69550fa91ebeab9b1fc..1ac0b9d40ea91b83ed6e8399db0e1309fdc8ca83 100755 (executable)
@@ -2,14 +2,13 @@
 
 . ../env.sh
 
-lst="hsrc hdst"
-
-for db in $lst; do
+for db in hsrc hdst; do
   echo dropdb $db
   dropdb $db
 done
-for db in $lst; do
-  echo createdb $db
-  createdb $db
-done
 
+echo createdb hsrc
+createdb hsrc --encoding=sql_ascii --template=template0
+
+echo createdb hdst
+createdb hdst --encoding=utf-8 --template=template0
index a5ccb7059da96081684d5d16d1e88585a4e183d1..182c27c30eb5184a1da245f052c1b644aaddd42d 100755 (executable)
@@ -72,16 +72,19 @@ done
 
 msg "Create table on root node and fill couple of rows"
 run_sql hsrc "create table mytable (id int4 primary key, data text, tstamp timestamptz default now())"
-for n in 1 2 3 4; do
+for n in 1 2 3; do
   run_sql hsrc "insert into mytable values ($n, 'row$n')"
 done
 
+msg "Insert row with encoding error"
+run_sql hsrc "insert into mytable values(4, E'row\xab4')"
+
 msg "Register table on root node"
-run londiste3 $v conf/londiste_hsrc.ini add-table mytable --handler="bulk(method=$meth)"
+run londiste3 $v conf/londiste_hsrc.ini add-table mytable
 
 msg "Register table on other node with creation"
 for db in hdst; do
-  run londiste3 $v conf/londiste_$db.ini add-table mytable --create-only=pkey --handler="bulk(method=$meth)"
+  run londiste3 $v conf/londiste_$db.ini add-table mytable --create --handler=bulk_direct --handler-arg="method=$meth" --handler-arg="encoding=utf8"
 done
 
 msg "Wait until table is in sync"
@@ -106,6 +109,9 @@ run_sql hsrc "delete from mytable where id = 7"
 run_sql hsrc "delete from mytable where id = 1"
 run_sql hsrc "update mytable set data = 'row2x' where id = 2"
 
+# row with error
+run_sql hsrc "insert into mytable values(8, E'row8\xaf')"
+
 run sleep 5
 
 msg "Check status"
index d67bc43c030af62ec07f09aa84cc372292d21590..93c47320a6d5ab06afd36fe459b8cfa8fb4b02e8 100755 (executable)
@@ -123,13 +123,18 @@ for n in 1 2 3 4; do
   run_sql part$n "insert into mydata values ($n, 'part$n')"
 done
 
+msg "Sleep a bit"
+run sleep 10
+
 msg "Create table and register it in full nodes"
 for db in $full_list; do
     job=l3_part1_q_${db}
-    run londiste3 $v conf/$job.ini add-table mydata --create
-    for src in $part_list; do
-        run londiste3 $v conf/l3_${src}_q_${db}.ini add-table mydata
-    done
+    run_sql $db "select * from londiste.table_info order by queue_name"
+    run londiste3 $v conf/$job.ini add-table mydata --create --merge-all
+    run_sql $db "select * from londiste.table_info order by queue_name"
+    #for src in $part_list; do
+    #    run londiste3 $v conf/l3_${src}_q_${db}.ini add-table mydata
+    #done
 done
 
 msg "Sleep a bit"