londiste: support table renaming
authorMarko Kreen <markokr@gmail.com>
Thu, 27 Oct 2011 13:28:06 +0000 (16:28 +0300)
committerMarko Kreen <markokr@gmail.com>
Thu, 27 Oct 2011 19:47:45 +0000 (22:47 +0300)
- add .dest_table field to londiste.table_info
- use it for mapping merged tables on leaf
- make handlers use it for SQL generation
- fkey-related functions keep using the real table name
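
In other words, a table stays registered under its queue-side name, but its changes can be applied to a differently named table on the local node, and handlers generate SQL against that destination name. A hypothetical invocation would be "londiste.py replic.ini add public.orders --dest-table public.orders_hist" (config and table names invented). Short sketches of the new handler contract and of the table-list shape used by setup follow the handler.py and setup.py hunks below.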

26 files changed:
python/londiste.py
python/londiste/compare.py
python/londiste/handler.py
python/londiste/handlers/__init__.py
python/londiste/handlers/bulk.py
python/londiste/handlers/dispatch.py
python/londiste/handlers/multimaster.py
python/londiste/handlers/part.py
python/londiste/handlers/qtable.py
python/londiste/playback.py
python/londiste/repair.py
python/londiste/setup.py
python/londiste/syncer.py
python/londiste/table_copy.py
sql/londiste/expected/londiste_leaf.out
sql/londiste/expected/londiste_merge.out
sql/londiste/expected/londiste_provider.out
sql/londiste/expected/londiste_subscriber.out
sql/londiste/functions/londiste.drop_table_triggers.sql
sql/londiste/functions/londiste.get_table_list.sql
sql/londiste/functions/londiste.handle_fkeys.sql
sql/londiste/functions/londiste.local_add_table.sql
sql/londiste/functions/londiste.local_remove_table.sql
sql/londiste/functions/londiste.local_show_missing.sql
sql/londiste/functions/londiste.upgrade_schema.sql
sql/londiste/structure/tables.sql

python/londiste.py
index 542c9c5d5e86a5034560f7080c78e873290a8342..66b196480cbfd2066f196cb26fb5d82d93d4d083 100755 (executable)
@@ -108,6 +108,8 @@ class Londiste(skytools.DBScript):
         g = optparse.OptionGroup(p, "options for add")
         g.add_option("--all", action="store_true",
                 help = "add: include add possible tables")
+        g.add_option("--dest-table",
+                help = "add: redirect changes to different table")
         g.add_option("--force", action="store_true",
                 help = "add: ignore table differences, repair: ignore lag")
         g.add_option("--expect-sync", action="store_true", dest="expect_sync",
python/londiste/compare.py
index a52d24bed975be1aada74179e32b3b6b285a5880..7274d18080c8e427958c3fc0e7415160bb9acd11 100644 (file)
@@ -15,35 +15,38 @@ class Comparator(Syncer):
     """Simple checker based in Syncer.
     When tables are in sync runs simple SQL query on them.
     """
-    def process_sync(self, tbl, src_db, dst_db):
+    def process_sync(self, src_tbl, dst_tbl, src_db, dst_db):
         """Actual comparision."""
 
         src_curs = src_db.cursor()
         dst_curs = dst_db.cursor()
 
-        self.log.info('Counting %s' % tbl)
+        self.log.info('Counting %s' % dst_tbl)
 
         q = "select count(1) as cnt, sum(hashtext(t.*::text)) as chksum from only _TABLE_ t"
         q = self.cf.get('compare_sql', q)
-        q = q.replace('_TABLE_', skytools.quote_fqident(tbl))
+        src_q = q.replace('_TABLE_', skytools.quote_fqident(src_tbl))
+        dst_q = q.replace('_TABLE_', skytools.quote_fqident(dst_tbl))
 
         f = "%(cnt)d rows, checksum=%(chksum)s"
         f = self.cf.get('compare_fmt', f)
 
-        self.log.debug("srcdb: " + q)
-        src_curs.execute(q)
+        self.log.debug("srcdb: " + src_q)
+        src_curs.execute(src_q)
         src_row = src_curs.fetchone()
         src_str = f % src_row
         self.log.info("srcdb: %s" % src_str)
+        src_db.commit()
 
-        self.log.debug("dstdb: " + q)
-        dst_curs.execute(q)
+        self.log.debug("dstdb: " + dst_q)
+        dst_curs.execute(dst_q)
         dst_row = dst_curs.fetchone()
         dst_str = f % dst_row
         self.log.info("dstdb: %s" % dst_str)
+        dst_db.commit()
 
         if src_str != dst_str:
-            self.log.warning("%s: Results do not match!" % tbl)
+            self.log.warning("%s: Results do not match!" % dst_tbl)
 
 if __name__ == '__main__':
     script = Comparator(sys.argv[1:])
python/londiste/handler.py
index aff0e4cedbf140f10bc584f05f4b89a2ef084bab..2fe69f213de9321ac20974bcfffee69e35c8d0a9 100644 (file)
@@ -29,7 +29,10 @@ plain londiste:
 
 """
 
-import sys, skytools, londiste.handlers
+import sys
+import logging
+import skytools
+import londiste.handlers
 
 __all__ = ['RowCache', 'BaseHandler', 'build_handler',
            'load_handler_modules', 'create_handler_string']
@@ -66,10 +69,14 @@ class BaseHandler:
     """Defines base API, does nothing.
     """
     handler_name = 'nop'
-    def __init__(self, table_name, args, log):
+    log = logging.getLogger('basehandler')
+
+    def __init__(self, table_name, args, dest_table):
         self.table_name = table_name
+        self.dest_table = dest_table or table_name
+        self.fq_table_name = skytools.quote_fqident(self.table_name)
+        self.fq_dest_table = skytools.quote_fqident(self.dest_table)
         self.args = args
-        self.log = log
 
     def add(self, trigger_arg_list):
         """Called when table is added.
@@ -99,13 +106,14 @@ class BaseHandler:
         """Called when batch finishes."""
         pass
 
-    def real_copy(self, tablename, src_curs, dst_curs, column_list, cond_list):
+    def real_copy(self, src_tablename, src_curs, dst_curs, column_list, cond_list):
         """do actual table copy and return tuple with number of bytes and rows
         copyed
         """
         condition = ' and '.join(cond_list)
-        return skytools.full_copy(tablename, src_curs, dst_curs, column_list,
-                                  condition)
+        return skytools.full_copy(src_tablename, src_curs, dst_curs,
+                                  column_list, condition,
+                                  dst_tablename = self.dest_table)
 
     def needs_table(self):
         """Does the handler need the table to exist on destination."""
@@ -124,7 +132,7 @@ class TableHandler(BaseHandler):
     def process_event(self, ev, sql_queue_func, arg):
         if len(ev.type) == 1:
             # sql event
-            fqname = skytools.quote_fqident(ev.extra1)
+            fqname = self.fq_dest_table
             fmt = self.sql_command[ev.type]
             sql = fmt % (fqname, ev.data)
         else:
@@ -132,7 +140,7 @@ class TableHandler(BaseHandler):
             pklist = ev.type[2:].split(',')
             row = skytools.db_urldecode(ev.data)
             op = ev.type[0]
-            tbl = ev.extra1
+            tbl = self.dest_table
             if op == 'I':
                 sql = skytools.mk_insert_sql(row, tbl, pklist)
             elif op == 'U':
@@ -188,7 +196,7 @@ def _parse_handler(hstr):
             args = skytools.db_urldecode(astr)
     return (name, args)
 
-def build_handler(tblname, hstr, log):
+def build_handler(tblname, hstr, dest_table=None):
     """Parse and initialize handler.
 
     hstr is result of create_handler_string()."""
@@ -196,7 +204,9 @@ def build_handler(tblname, hstr, log):
     # when no handler specified, use londiste
     hname = hname or 'londiste'
     klass = _handler_map[hname]
-    return klass(tblname, args, log)
+    if not dest_table:
+        dest_table = tblname
+    return klass(tblname, args, dest_table)
 
 def load_handler_modules(cf):
     """Load and register modules from config."""
python/londiste/handlers/__init__.py
index 764518efa476268adbe068e55112628e81c67d62..8467916ab7f82ccbdfd98e2037b02b58b4316423 100644 (file)
@@ -19,8 +19,8 @@ def handler_args(name, cls):
     Define successor for handler class cls with func as argument generator
     """
     def wrapper(func):
-        def _init_override(self, table_name, args, log):
-            cls.__init__(self, table_name, func(args.copy()), log)
+        def _init_override(self, table_name, args, dest_table):
+            cls.__init__(self, table_name, func(args.copy()), dest_table)
         dct = {'__init__': _init_override, 'handler_name': name}
         module = sys.modules[cls.__module__]
         newname = '%s_%s' % (cls.__name__, name.replace('.','_'))
@@ -36,3 +36,4 @@ def update(*p):
     in reverse order """
     return reduce(lambda x, y: x.update(y) or x,
             (p[i] for i in range(len(p)-1,-1,-1)), {})
+
python/londiste/handlers/bulk.py
index 5b5dcc01117d4b3befb60e2df29aa9705fadd6ef..7bc3797be710dee931b79ca18e662aaac2dda1d8 100644 (file)
@@ -57,10 +57,10 @@ class BulkLoader(BaseHandler):
     """
     handler_name = 'bulk'
     fake_seq = 0
-    def __init__(self, table_name, args, log):
+    def __init__(self, table_name, args, dest_table):
         """Init per-batch table data cache."""
 
-        BaseHandler.__init__(self, table_name, args, log)
+        BaseHandler.__init__(self, table_name, args, dest_table)
 
         self.pkey_list = None
         self.dist_fields = None
@@ -194,8 +194,8 @@ class BulkLoader(BaseHandler):
 
         # create temp table
         temp, qtemp = self.create_temp_table(curs)
-        tbl = self.table_name
-        qtbl = quote_fqident(self.table_name)
+        tbl = self.dest_table
+        qtbl = self.fq_dest_table
 
         # where expr must have pkey and dist fields
         klist = []
@@ -307,10 +307,10 @@ class BulkLoader(BaseHandler):
 
     def create_temp_table(self, curs):
         if USE_REAL_TABLE:
-            tempname = self.table_name + "_loadertmpx"
+            tempname = self.dest_table + "_loadertmpx"
         else:
             # create temp table for loading
-            tempname = self.table_name.replace('.', '_') + "_loadertmp"
+            tempname = self.dest_table.replace('.', '_') + "_loadertmp"
 
         # check if exists
         if USE_REAL_TABLE:
@@ -321,7 +321,7 @@ class BulkLoader(BaseHandler):
             # create non-temp table
             q = "create table %s (like %s)" % (
                         quote_fqident(tempname),
-                        quote_fqident(self.table_name))
+                        quote_fqident(self.dest_table))
             self.log.debug("bulk: Creating real table: %s" % q)
             curs.execute(q)
             return tempname, quote_fqident(tempname)
@@ -335,7 +335,7 @@ class BulkLoader(BaseHandler):
         arg = "on commit preserve rows"
         # create temp table for loading
         q = "create temp table %s (like %s) %s" % (
-                quote_ident(tempname), quote_fqident(self.table_name), arg)
+                quote_ident(tempname), quote_fqident(self.dest_table), arg)
         self.log.debug("bulk: Creating temp table: %s" % q)
         curs.execute(q)
         return tempname, quote_ident(tempname)
@@ -343,7 +343,7 @@ class BulkLoader(BaseHandler):
     def find_dist_fields(self, curs):
         if not skytools.exists_table(curs, "pg_catalog.gp_distribution_policy"):
             return []
-        schema, name = skytools.fq_name_parts(self.table_name)
+        schema, name = skytools.fq_name_parts(self.dest_table)
         q = "select a.attname"\
             "  from pg_class t, pg_namespace n, pg_attribute a,"\
             "       gp_distribution_policy p"\
python/londiste/handlers/dispatch.py
index 0c67061c8f81755f30563b3d5ee14e82fab23b4f..b9c3e3fb7bd0d6c8e8627b16e93ea222eb488c21 100644 (file)
@@ -663,14 +663,16 @@ class Dispatcher(BaseHandler):
     """
     handler_name = 'dispatch'
 
-    def __init__(self, table_name, args, log):
-        BaseHandler.__init__(self, table_name, args, log)
+    def __init__(self, table_name, args, dest_table):
+
+        # compat for dest-table
+        dest_table = args.get('table', dest_table)
+
+        BaseHandler.__init__(self, table_name, args, dest_table)
+
         # show args
         self.log.debug("dispatch.init: table_name=%r, args=%r" % \
                        (table_name, args))
-        # get table name
-        self.table_name = args.get('table', self.table_name)
-        self.quoted_name = quote_fqident(self.table_name)
         self.batch_info = None
         self.dst_curs = None
         self.pkeys = None
@@ -805,7 +807,7 @@ class Dispatcher(BaseHandler):
             if dst not in self.row_handler.table_map:
                 self.check_part(dst, part_time)
         else:
-            dst = self.table_name
+            dst = self.dest_table
 
         if dst not in self.row_handler.table_map:
             self.row_handler.add_table(dst, LOADERS[self.conf.load_mode],
@@ -841,7 +843,7 @@ class Dispatcher(BaseHandler):
         else:
             raise UsageError('Bad value for part_mode: %s' %\
                     self.conf.part_mode)
-        vals = {'parent': self.table_name,
+        vals = {'parent': self.dest_table,
                 'year': "%04d" % dtm.year,
                 'month': "%02d" % dtm.month,
                 'day': "%02d" % dtm.day,
@@ -861,7 +863,7 @@ class Dispatcher(BaseHandler):
         dst = quote_fqident(dst)
         vals = {'dest': dst,
                 'part': dst,
-                'parent': self.quoted_name,
+                'parent': self.fq_dest_table,
                 'pkeys': ",".join(self.pkeys), # quoting?
                 # we do this to make sure that constraints for
                 # tables who contain a schema will still work
@@ -887,7 +889,7 @@ class Dispatcher(BaseHandler):
             else:
                 self.log.debug('part func %s not found, cloning table' %\
                         PART_FUNC)
-                struct = TableStruct(curs, self.table_name)
+                struct = TableStruct(curs, self.dest_table)
                 struct.create(curs, T_ALL, dst)
         exec_with_vals(self.conf.post_part)
         self.log.info("Created table: %s" % dst)
@@ -914,8 +916,9 @@ class Dispatcher(BaseHandler):
         else:
             _write_hook = None
 
-        return skytools.full_copy(tablename, src_curs, dst_curs, _src_cols,
-                                  condition, self.table_name, _dst_cols,
+        return skytools.full_copy(tablename, src_curs, dst_curs, _src_cols, condition,
+                                  dst_tablename = self.dest_table,
+                                  dst_column_list = _dst_cols,
                                   write_hook = _write_hook)
 
 
python/londiste/handlers/multimaster.py
index 494bd86685eaaf61f62aaf61fd9f8e70a8de5873..872b77e167900819802ecc3a463442f9f39005a0 100644 (file)
@@ -19,7 +19,7 @@ class MultimasterHandler(ApplyFuncHandler):
     """Handle multimaster replicas"""
     handler_name = 'multimaster'
 
-    def __init__(self, table_name, args, log):
+    def __init__(self, table_name, args, dest_table):
         """Init per-batch table data cache."""
         conf = args.copy()
         # remove Multimaster args from conf
@@ -28,7 +28,7 @@ class MultimasterHandler(ApplyFuncHandler):
                 conf.pop(name)
         conf = skytools.db_urlencode(conf)
         args = update(args, {'func_name': 'merge_on_time', 'func_conf': conf})  
-        ApplyFuncHandler.__init__(self, table_name, args, log)
+        ApplyFuncHandler.__init__(self, table_name, args, dest_table)
     
     def add(self, trigger_arg_list):
         """Create SKIP and BEFORE INSERT trigger"""
python/londiste/handlers/part.py
index fa5ccb7787af67ed83d1b825012192cc3414787a..6e6440270df25a470406178f326919cc11085eb0 100644 (file)
@@ -10,8 +10,8 @@ __all__ = ['PartHandler']
 class PartHandler(TableHandler):
     handler_name = 'part'
 
-    def __init__(self, table_name, args, log):
-        TableHandler.__init__(self, table_name, args, log)
+    def __init__(self, table_name, args, dest_table):
+        TableHandler.__init__(self, table_name, args, dest_table)
         self.max_part = None       # max part number
         self.local_part = None     # part number of local node
         self.key = args.get('key')        
python/londiste/handlers/qtable.py
index b904e0e1c460584c7882317fad5cec4bfe836fae..cd8cb03d486772924d11bcbb4c14fe42b0d2a225 100644 (file)
@@ -39,9 +39,9 @@ class QueueTableHandler(BaseHandler):
 class QueueSplitterHandler(BaseHandler):
     handler_name = 'qsplitter'
 
-    def __init__(self, table_name, args, log):
+    def __init__(self, table_name, args, dest_table):
         """Init per-batch table data cache."""
-        BaseHandler.__init__(self, table_name, args, log)
+        BaseHandler.__init__(self, table_name, args, dest_table)
         try:
             self.dst_queue_name = args['queue']
         except KeyError:
python/londiste/playback.py
index b7a3ac9595d9d1cc8c9f41c199ac4e08cf75df4d..e9c20ea8c13ca68639c94c081337686c3d8ccae4 100644 (file)
@@ -62,6 +62,7 @@ class TableState(object):
     def __init__(self, name, log):
         """Init TableState for one table."""
         self.name = name
+        self.dest_table = name
         self.log = log
         # same as forget:
         self.state = TABLE_MISSING
@@ -188,9 +189,14 @@ class TableState(object):
         self.max_parallel_copy = int(self.table_attrs.get('max_parallel_copy',
                                                         self.max_parallel_copy))
 
+        if 'dest_table' in row and row['dest_table']:
+            self.dest_table = row['dest_table']
+        else:
+            self.dest_table = self.name
+
         hstr = self.table_attrs.get('handlers', '') # compat
         hstr = self.table_attrs.get('handler', hstr)
-        self.plugin = build_handler(self.name, hstr, self.log)
+        self.plugin = build_handler(self.name, hstr, self.dest_table)
 
     def max_parallel_copies_reached(self):
         return self.max_parallel_copy and\
@@ -480,7 +486,7 @@ class Replicator(CascadedWorker):
                 npossible -= 1
 
                 # drop all foreign keys to and from this table
-                self.drop_fkeys(dst_db, t.name)
+                self.drop_fkeys(dst_db, t.dest_table)
 
                 # change state after fkeys are dropped thus allowing
                 # failure inbetween
@@ -627,7 +633,7 @@ class Replicator(CascadedWorker):
             self.stat_increase('ignored_events')
             return
 
-        fqname = skytools.quote_fqident(ev.extra1)
+        fqname = skytools.quote_fqident(t.dest_table)
         if dst_curs.connection.server_version >= 80400:
             sql = "TRUNCATE ONLY %s;" % fqname
         else:
python/londiste/repair.py
index dd0081ab17e4ab381752223d43327bdb9fc75c89..984920b812a8dbf0afa797b3aec6fa538d3e1cbc 100644 (file)
@@ -54,29 +54,30 @@ class Repairer(Syncer):
     pkey_list = []
     common_fields = []
 
-    def process_sync(self, tbl, src_db, dst_db):
+    def process_sync(self, src_tbl, dst_tbl, src_db, dst_db):
         """Actual comparision."""
 
         src_curs = src_db.cursor()
         dst_curs = dst_db.cursor()
 
-        self.log.info('Checking %s' % tbl)
+        self.log.info('Checking %s' % dst_tbl)
 
         self.common_fields = []
+        self.fq_common_fields = []
         self.pkey_list = []
-        copy_tbl = self.gen_copy_tbl(tbl, src_curs, dst_curs)
+        self.load_common_columns(src_tbl, dst_tbl, src_curs, dst_curs)
 
-        dump_src = tbl + ".src"
-        dump_dst = tbl + ".dst"
+        dump_src = dst_tbl + ".src"
+        dump_dst = dst_tbl + ".dst"
 
-        self.log.info("Dumping src table: %s" % tbl)
-        self.dump_table(tbl, copy_tbl, src_curs, dump_src)
+        self.log.info("Dumping src table: %s" % src_tbl)
+        self.dump_table(src_tbl, src_curs, dump_src)
         src_db.commit()
-        self.log.info("Dumping dst table: %s" % tbl)
-        self.dump_table(tbl, copy_tbl, dst_curs, dump_dst)
+        self.log.info("Dumping dst table: %s" % dst_tbl)
+        self.dump_table(dst_tbl, dst_curs, dump_dst)
         dst_db.commit()
         
-        self.log.info("Sorting src table: %s" % tbl)
+        self.log.info("Sorting src table: %s" % dump_src)
 
         s_in, s_out = os.popen4("sort --version")
         s_ver = s_out.read()
@@ -86,26 +87,27 @@ class Repairer(Syncer):
         else:
             args = ""
         os.system("sort %s -T . -o %s.sorted %s" % (args, dump_src, dump_src))
-        self.log.info("Sorting dst table: %s" % tbl)
+        self.log.info("Sorting dst table: %s" % dump_dst)
         os.system("sort %s -T . -o %s.sorted %s" % (args, dump_dst, dump_dst))
 
-        self.dump_compare(tbl, dump_src + ".sorted", dump_dst + ".sorted")
+        self.dump_compare(dst_tbl, dump_src + ".sorted", dump_dst + ".sorted")
 
         os.unlink(dump_src)
         os.unlink(dump_dst)
         os.unlink(dump_src + ".sorted")
         os.unlink(dump_dst + ".sorted")
 
-    def gen_copy_tbl(self, tbl, src_curs, dst_curs):
-        """Create COPY expession from common fields."""
-        self.pkey_list = get_pkey_list(src_curs, tbl)
-        dst_pkey = get_pkey_list(dst_curs, tbl)
+    def load_common_columns(self, src_tbl, dst_tbl, src_curs, dst_curs):
+        """Get common fields, put pkeys in start."""
+
+        self.pkey_list = get_pkey_list(src_curs, src_tbl)
+        dst_pkey = get_pkey_list(dst_curs, dst_tbl)
         if dst_pkey != self.pkey_list:
             self.log.error('pkeys do not match')
             sys.exit(1)
 
-        src_cols = get_column_list(src_curs, tbl)
-        dst_cols = get_column_list(dst_curs, tbl)
+        src_cols = get_column_list(src_curs, src_tbl)
+        dst_cols = get_column_list(dst_curs, dst_tbl)
         field_list = []
         for f in self.pkey_list:
             field_list.append(f)
@@ -118,17 +120,18 @@ class Repairer(Syncer):
         self.common_fields = field_list
 
         fqlist = [skytools.quote_ident(col) for col in field_list]
+        self.fq_common_fields = fqlist
 
-        tbl_expr = "%s (%s)" % (skytools.quote_fqident(tbl), ",".join(fqlist))
-
-        self.log.debug("using copy expr: %s" % tbl_expr)
+        cols = ",".join(fqlist)
+        self.log.debug("using columns: %s" % cols)
 
-        return tbl_expr
-
-    def dump_table(self, tbl, copy_tbl, curs, fn):
+    def dump_table(self, tbl, curs, fn):
         """Dump table to disk."""
+        cols = ','.join(self.fq_common_fields)
+        q = "copy %s (%s) to stdout" % (skytools.quote_fqident(tbl), cols)
+
         f = open(fn, "w", 64*1024)
-        curs.copy_to(f, copy_tbl)
+        curs.copy_expert(q, f)
         size = f.tell()
         f.close()
         self.log.info('%s: Got %d bytes' % (tbl, size))
python/londiste/setup.py
index b59fd56f2536d1a7a01dfeef8ab838ea61f5c170..b9f69dc6f9acf052564fde12104ee59632fa24e0 100644 (file)
@@ -71,6 +71,8 @@ class LondisteSetup(CascadeAdmin):
                     help="don't merge tables from source queues", default=False)
         p.add_option("--max-parallel-copy", type = "int",
                     help="max number of parallel copy processes")
+        p.add_option("--dest-table",
+                    help="add: name for actual table")
 
         return p
 
@@ -122,7 +124,7 @@ class LondisteSetup(CascadeAdmin):
         # dont check for exist/not here (root handling)
         problems = False
         for tbl in args:
-            if (tbl in src_tbls) and not src_tbls[tbl]:
+            if (tbl in src_tbls) and not src_tbls[tbl]['local']:
                 self.log.error("Table %s does not exist on provider, need to switch to different provider" % tbl)
                 problems = True
         if problems:
@@ -137,30 +139,42 @@ class LondisteSetup(CascadeAdmin):
         else:
             create_flags = 0
 
+        # sanity check
+        if self.options.dest_table and len(args) > 1:
+            self.log.error("--dest-table can be given only for single table")
+            sys.exit(1)
+
         # seems ok
         for tbl in args:
             tbl = skytools.fq_name(tbl)
-            self.add_table(src_db, dst_db, tbl, create_flags)
+            self.add_table(src_db, dst_db, tbl, create_flags, src_tbls)
 
-    def add_table(self, src_db, dst_db, tbl, create_flags):
+    def add_table(self, src_db, dst_db, tbl, create_flags, src_tbls):
         src_curs = src_db.cursor()
         dst_curs = dst_db.cursor()
-        tbl_exists = skytools.exists_table(dst_curs, tbl)
+        src_dest_table = src_tbls[tbl]['dest_table']
+        dest_table = self.options.dest_table or tbl
+        tbl_exists = skytools.exists_table(dst_curs, dest_table)
         if create_flags:
             if tbl_exists:
-                self.log.info('Table %s already exist, not touching' % tbl)
+                self.log.info('Table %s already exist, not touching' % dest_table)
             else:
-                if not skytools.exists_table(src_curs, tbl):
+                if not skytools.exists_table(src_curs, src_dest_table):
                     # table not present on provider - nowhere to get the DDL from
                     self.log.warning('Table "%s" missing on provider, skipping' % tbl)
                     return
-                schema = skytools.fq_name_parts(tbl)[0]
+                schema = skytools.fq_name_parts(dest_table)[0]
                 if not skytools.exists_schema(dst_curs, schema):
                     q = "create schema %s" % skytools.quote_ident(schema)
                     dst_curs.execute(q)
-                s = skytools.TableStruct(src_curs, tbl)
+                s = skytools.TableStruct(src_curs, src_dest_table)
                 src_db.commit()
-                s.create(dst_curs, create_flags, log = self.log)
+
+                # create, using rename logic only when necessary
+                newname = None
+                if src_dest_table != dest_table:
+                    newname = dest_table
+                s.create(dst_curs, create_flags, log = self.log, new_table_name = newname)
 
         tgargs = []
         if self.options.trigger_arg:
@@ -179,7 +193,7 @@ class LondisteSetup(CascadeAdmin):
         if self.options.handler:
             hstr = londiste.handler.create_handler_string(
                             self.options.handler, self.options.handler_arg)
-            p = londiste.handler.build_handler(tbl, hstr, self.log)
+            p = londiste.handler.build_handler(tbl, hstr, self.options.dest_table)
             attrs['handler'] = hstr
             p.add(tgargs)
 
@@ -211,7 +225,7 @@ class LondisteSetup(CascadeAdmin):
         if self.options.handler:
             hstr = londiste.handler.create_handler_string(
                             self.options.handler, self.options.handler_arg)
-            p = londiste.handler.build_handler('unused.string', hstr, self.log)
+            p = londiste.handler.build_handler('unused.string', hstr, None)
             return p.needs_table()
         return True
 
@@ -221,7 +235,7 @@ class LondisteSetup(CascadeAdmin):
             if tbl not in dst_tbls:
                 self.log.info("Table %s info missing from subscriber, adding" % tbl)
                 self.exec_cmd(dst_curs, q, [self.set_name, tbl])
-                dst_tbls[tbl] = False
+                dst_tbls[tbl] = {'local': False, 'dest_table': tbl}
         for tbl in dst_tbls.keys():
             q = "select * from londiste.global_remove_table(%s, %s)"
             if tbl not in src_tbls:
@@ -230,11 +244,13 @@ class LondisteSetup(CascadeAdmin):
                 del dst_tbls[tbl]
 
     def fetch_set_tables(self, curs):
-        q = "select table_name, local from londiste.get_table_list(%s)"
+        q = "select table_name, local, "\
+            " coalesce(dest_table, table_name) as dest_table "\
+            " from londiste.get_table_list(%s)"
         curs.execute(q, [self.set_name])
         res = {}
         for row in curs.fetchall():
-            res[row[0]] = row[1]
+            res[row[0]] = row
         return res
 
     def cmd_remove_table(self, *args):
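
For orientation only, the mapping fetch_set_tables() now builds looks roughly like this (a hand-written stand-in for the database rows; values invented):

    # before: res['public.orders'] was just the boolean 'local' flag;
    # now the whole row is kept, so callers can also read row['dest_table'].
    src_tbls = {
        'public.orders': {'table_name': 'public.orders',
                          'local': True,
                          'dest_table': 'public.orders_hist'},
    }
    assert src_tbls['public.orders']['local']

This is what lets the provider check above read src_tbls[tbl]['local'] and add_table() read src_tbls[tbl]['dest_table'].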
python/londiste/syncer.py
index 5bce2485b6554bb7c2774168420b9e8ed402cf4a..02955f934d778dbec6659f0a21642d4267a0f982 100644 (file)
@@ -4,6 +4,12 @@
 
 import sys, time, skytools
 
+class ATable:
+    def __init__(self, row):
+        self.table_name = row['table_name']
+        self.dest_table = row['dest_table'] or row['table_name']
+        self.merge_state = row['merge_state']
+
 class Syncer(skytools.DBScript):
     """Walks tables in primary key order and checks if data matches."""
 
@@ -60,14 +66,25 @@ class Syncer(skytools.DBScript):
             self.log.error('Consumer lagging too much, cannot proceed')
             sys.exit(1)
 
-    def get_subscriber_table_state(self, dst_db):
-        """Load table states from subscriber."""
-        dst_curs = dst_db.cursor()
-        q = "select * from londiste.get_table_list(%s) where local"
-        dst_curs.execute(q, [self.queue_name])
-        res = dst_curs.dictfetchall()
-        dst_db.commit()
-        return res
+    def get_tables(self, db):
+        """Load table info.
+
+        Returns tuple of (dict(name->ATable), namelist)"""
+
+        curs = db.cursor()
+        q = "select table_name, merge_state, dest_table"\
+            " from londiste.get_table_list(%s) where local"
+        curs.execute(q, [self.queue_name])
+        rows = curs.fetchall()
+        db.commit()
+
+        res = {}
+        names = []
+        for row in rows:
+            t = ATable(row)
+            res[t.table_name] = t
+            names.append(t.table_name)
+        return res, names
 
     def work(self):
         """Syncer main function."""
@@ -84,29 +101,32 @@ class Syncer(skytools.DBScript):
 
         self.check_consumer(setup_curs)
 
-        state_list = self.get_subscriber_table_state(dst_db)
-        state_map = {}
-        full_list = []
-        for ts in state_list:
-            name = ts['table_name']
-            full_list.append(name)
-            state_map[name] = ts
+        src_tables, ignore = self.get_tables(src_db)
+        dst_tables, names = self.get_tables(dst_db)
 
         if len(self.args) > 2:
             tlist = self.args[2:]
         else:
-            tlist = full_list
+            tlist = names
 
         for tbl in tlist:
             tbl = skytools.fq_name(tbl)
-            if not tbl in state_map:
+            if not tbl in dst_tables:
                 self.log.warning('Table not subscribed: %s' % tbl)
                 continue
-            st = state_map[tbl]
-            if st['merge_state'] != 'ok':
-                self.log.info('Table %s not synced yet, no point' % tbl)
+            if not tbl in src_tables:
+                self.log.warning('Table not available on provider: %s' % tbl)
+                continue
+            t1 = src_tables[tbl]
+            t2 = dst_tables[tbl]
+
+            if t1.merge_state != 'ok':
+                self.log.warning('Table %s not ready yet on provider' % tbl)
                 continue
-            self.check_table(tbl, lock_db, src_db, dst_db, setup_curs)
+            if t2.merge_state != 'ok':
+                self.log.warning('Table %s not synced yet, no point' % tbl)
+                continue
+            self.check_table(t1.dest_table, t2.dest_table, lock_db, src_db, dst_db, setup_curs)
             lock_db.commit()
             src_db.commit()
             dst_db.commit()
@@ -131,7 +151,7 @@ class Syncer(skytools.DBScript):
             if dur > 10 and not self.options.force:
                 raise Exception("Ticker seems dead")
 
-    def check_table(self, tbl, lock_db, src_db, dst_db, setup_curs):
+    def check_table(self, src_tbl, dst_tbl, lock_db, src_db, dst_db, setup_curs):
         """Get transaction to same state, then process."""
 
 
@@ -139,22 +159,22 @@ class Syncer(skytools.DBScript):
         src_curs = src_db.cursor()
         dst_curs = dst_db.cursor()
 
-        if not skytools.exists_table(src_curs, tbl):
-            self.log.warning("Table %s does not exist on provider side" % tbl)
+        if not skytools.exists_table(src_curs, src_tbl):
+            self.log.warning("Table %s does not exist on provider side" % src_tbl)
             return
-        if not skytools.exists_table(dst_curs, tbl):
-            self.log.warning("Table %s does not exist on subscriber side" % tbl)
+        if not skytools.exists_table(dst_curs, dst_tbl):
+            self.log.warning("Table %s does not exist on subscriber side" % dst_tbl)
             return
 
         # lock table in separate connection
-        self.log.info('Locking %s' % tbl)
+        self.log.info('Locking %s' % src_tbl)
         lock_db.commit()
         self.set_lock_timeout(lock_curs)
         lock_time = time.time()
-        lock_curs.execute("LOCK TABLE %s IN SHARE MODE" % skytools.quote_fqident(tbl))
+        lock_curs.execute("LOCK TABLE %s IN SHARE MODE" % skytools.quote_fqident(src_tbl))
 
         # now wait until consumer has updated target table until locking
-        self.log.info('Syncing %s' % tbl)
+        self.log.info('Syncing %s' % dst_tbl)
 
         # consumer must get futher than this tick
         tick_id = self.force_tick(setup_curs)
@@ -199,13 +219,13 @@ class Syncer(skytools.DBScript):
         lock_db.commit()
 
         # do work
-        self.process_sync(tbl, src_db, dst_db)
+        self.process_sync(src_tbl, dst_tbl, src_db, dst_db)
 
         # done
         src_db.commit()
         dst_db.commit()
 
-    def process_sync(self, tbl, src_db, dst_db):
+    def process_sync(self, src_tbl, dst_tbl, src_db, dst_db):
         """It gets 2 connections in state where tbl should be in same state.
         """
         raise Exception('process_sync not implemented')
@@ -215,3 +235,4 @@ class Syncer(skytools.DBScript):
         q = "select * from pgq_node.get_node_info(%s)"
         rows = self.exec_cmd(dst_db, q, [self.queue_name])
         return rows[0]['provider_location']
+
python/londiste/table_copy.py
index d7d900c47ba7e9a8c6d739e84107c5b47f118215..5f0f8bc53d342515adedc020644c454ab60ba056 100644 (file)
@@ -80,6 +80,8 @@ class CopyTable(Replicator):
             self.log.warning("table %s not in sync yet on provider, waiting" % tbl_stat.name)
             time.sleep(10)
 
+        src_real_table = pt.dest_table
+
         # 0 - dont touch
         # 1 - single tx
         # 2 - multi tx
@@ -100,15 +102,15 @@ class CopyTable(Replicator):
 
         # just in case, drop all fkeys (in case "replay" was skipped)
         # !! this may commit, so must be done before anything else !!
-        self.drop_fkeys(dst_db, tbl_stat.name)
+        self.drop_fkeys(dst_db, tbl_stat.dest_table)
 
         # now start ddl-dropping tx
-        q = "lock table " + skytools.quote_fqident(tbl_stat.name)
+        q = "lock table " + skytools.quote_fqident(tbl_stat.dest_table)
         dst_curs.execute(q)
 
         # find dst struct
-        src_struct = TableStruct(src_curs, tbl_stat.name)
-        dst_struct = TableStruct(dst_curs, tbl_stat.name)
+        src_struct = TableStruct(src_curs, src_real_table)
+        dst_struct = TableStruct(dst_curs, tbl_stat.dest_table)
 
         # take common columns, warn on missing ones
         dlist = dst_struct.get_column_list()
@@ -138,7 +140,7 @@ class CopyTable(Replicator):
                 q = "truncate "
                 if dst_db.server_version >= 80400:
                     q += "only "
-                q += skytools.quote_fqident(tbl_stat.name)
+                q += skytools.quote_fqident(tbl_stat.dest_table)
                 dst_curs.execute(q)
 
             if cmode == 2 and tbl_stat.dropped_ddl is None:
@@ -152,7 +154,7 @@ class CopyTable(Replicator):
                 tbl_stat.dropped_ddl = ddl
 
         # do truncate & copy
-        self.real_copy(src_curs, dst_curs, tbl_stat, common_cols)
+        self.real_copy(src_curs, dst_curs, tbl_stat, common_cols, src_real_table)
 
         # get snapshot
         src_curs.execute("select txid_current_snapshot()")
@@ -208,7 +210,7 @@ class CopyTable(Replicator):
         src_curs.execute(q, [self.queue_name])
         src_db.commit()
 
-    def real_copy(self, srccurs, dstcurs, tbl_stat, col_list):
+    def real_copy(self, srccurs, dstcurs, tbl_stat, col_list, src_real_table):
         "Actual copy."
 
         tablename = tbl_stat.name
@@ -219,7 +221,7 @@ class CopyTable(Replicator):
         cond = tbl_stat.table_attrs.get('copy_condition')
         if cond:
             cond_list.append(cond)
-        stats = p.real_copy(tablename, srccurs, dstcurs, col_list, cond_list)
+        stats = p.real_copy(src_real_table, srccurs, dstcurs, col_list, cond_list)
         if stats:
             self.log.info("%s: copy finished: %d bytes, %d rows" % (
                           tablename, stats[0], stats[1]))
sql/londiste/expected/londiste_leaf.out
index a7a1b93cda01e190e92cd53807497eae30925206..38fc5a8f0095d8f457f8a59f328581111882c469 100644 (file)
@@ -69,10 +69,10 @@ select * from londiste.global_add_table('leafq', 'public.tmp');
 (1 row)
 
 select * from londiste.get_table_list('leafq');
-   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos 
------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.leafdata | t     |             |                 |             |             |           |        0
- public.tmp      | f     |             |                 |             |             |           |        0
+   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table 
+-----------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.leafdata | t     |             |                 |             |             |           |        0 | 
+ public.tmp      | f     |             |                 |             |             |           |        0 | 
 (2 rows)
 
 select tgname, tgargs from pg_trigger
@@ -105,9 +105,9 @@ select * from londiste.local_remove_table('leafq', 'public.leafdata');
 (1 row)
 
 select * from londiste.get_table_list('leafq');
-   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos 
------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.leafdata | f     |             |                 |             |             |           |        0
+   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table 
+-----------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.leafdata | f     |             |                 |             |             |           |        0 | 
 (1 row)
 
 select * from londiste.local_show_missing('leafq');
sql/londiste/expected/londiste_merge.out
index 828b57e06f2164b593c798ec9219ddef87619f15..478f6b1f74ccf2fae77053c385d92f2cf2894ef3 100644 (file)
@@ -98,21 +98,21 @@ select * from londiste.local_add_table('part1_set', 'tblmerge', array['merge_all
 (1 row)
 
 select * from londiste.get_table_list('part1_set');
-   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos 
------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.tblmerge | t     |             |                 |             |             |           |        0
+   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table 
+-----------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.tblmerge | t     |             |                 |             |             |           |        0 | 
 (1 row)
 
 select * from londiste.get_table_list('part2_set');
-   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos 
------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.tblmerge | t     |             |                 |             |             |           |        0
+   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table 
+-----------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.tblmerge | t     |             |                 |             |             |           |        0 | 
 (1 row)
 
 select * from londiste.get_table_list('combined_set');
-   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos 
------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.tblmerge | t     | ok          |                 |             |             |           |        0
+   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table 
+-----------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.tblmerge | t     | ok          |                 |             |             |           |        0 | 
 (1 row)
 
 select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'in-copy');
@@ -128,15 +128,15 @@ select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', nul
 (1 row)
 
 select * from londiste.get_table_list('part1_set');
-   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos 
------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.tblmerge | t     | in-copy     |                 |             |             | lead      |        0
+   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table 
+-----------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.tblmerge | t     | in-copy     |                 |             |             | lead      |        0 | 
 (1 row)
 
 select * from londiste.get_table_list('part2_set');
-   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos 
------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.tblmerge | t     | in-copy     |                 |             |             | wait-copy |        1
+   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table 
+-----------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.tblmerge | t     | in-copy     |                 |             |             | wait-copy |        1 | 
 (1 row)
 
 select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', 'create index;');
@@ -146,15 +146,15 @@ select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', 'c
 (1 row)
 
 select * from londiste.get_table_list('part1_set');
-   table_name    | local | merge_state | custom_snapshot | table_attrs |  dropped_ddl  | copy_role | copy_pos 
------------------+-------+-------------+-----------------+-------------+---------------+-----------+----------
- public.tblmerge | t     | in-copy     |                 |             | create index; | lead      |        0
+   table_name    | local | merge_state | custom_snapshot | table_attrs |  dropped_ddl  | copy_role | copy_pos | dest_table 
+-----------------+-------+-------------+-----------------+-------------+---------------+-----------+----------+------------
+ public.tblmerge | t     | in-copy     |                 |             | create index; | lead      |        0 | 
 (1 row)
 
 select * from londiste.get_table_list('part2_set');
-   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl |  copy_role  | copy_pos 
------------------+-------+-------------+-----------------+-------------+-------------+-------------+----------
- public.tblmerge | t     | in-copy     |                 |             |             | wait-replay |        1
+   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl |  copy_role  | copy_pos | dest_table 
+-----------------+-------+-------------+-----------------+-------------+-------------+-------------+----------+------------
+ public.tblmerge | t     | in-copy     |                 |             |             | wait-replay |        1 | 
 (1 row)
 
 select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up');
@@ -164,15 +164,15 @@ select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', nul
 (1 row)
 
 select * from londiste.get_table_list('part1_set');
-   table_name    | local | merge_state | custom_snapshot | table_attrs |  dropped_ddl  | copy_role | copy_pos 
------------------+-------+-------------+-----------------+-------------+---------------+-----------+----------
- public.tblmerge | t     | in-copy     |                 |             | create index; | lead      |        0
+   table_name    | local | merge_state | custom_snapshot | table_attrs |  dropped_ddl  | copy_role | copy_pos | dest_table 
+-----------------+-------+-------------+-----------------+-------------+---------------+-----------+----------+------------
+ public.tblmerge | t     | in-copy     |                 |             | create index; | lead      |        0 | 
 (1 row)
 
 select * from londiste.get_table_list('part2_set');
-   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl |  copy_role  | copy_pos 
------------------+-------+-------------+-----------------+-------------+-------------+-------------+----------
- public.tblmerge | t     | catching-up |                 |             |             | wait-replay |        0
+   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl |  copy_role  | copy_pos | dest_table 
+-----------------+-------+-------------+-----------------+-------------+-------------+-------------+----------+------------
+ public.tblmerge | t     | catching-up |                 |             |             | wait-replay |        0 | 
 (1 row)
 
 select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'catching-up');
@@ -182,15 +182,15 @@ select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', nul
 (1 row)
 
 select * from londiste.get_table_list('part1_set');
-   table_name    | local | merge_state | custom_snapshot | table_attrs |  dropped_ddl  | copy_role | copy_pos 
------------------+-------+-------------+-----------------+-------------+---------------+-----------+----------
- public.tblmerge | t     | catching-up |                 |             | create index; |           |        0
+   table_name    | local | merge_state | custom_snapshot | table_attrs |  dropped_ddl  | copy_role | copy_pos | dest_table 
+-----------------+-------+-------------+-----------------+-------------+---------------+-----------+----------+------------
+ public.tblmerge | t     | catching-up |                 |             | create index; |           |        0 | 
 (1 row)
 
 select * from londiste.get_table_list('part2_set');
-   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl |  copy_role  | copy_pos 
------------------+-------+-------------+-----------------+-------------+-------------+-------------+----------
- public.tblmerge | t     | catching-up |                 |             |             | wait-replay |        0
+   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl |  copy_role  | copy_pos | dest_table 
+-----------------+-------+-------------+-----------------+-------------+-------------+-------------+----------+------------
+ public.tblmerge | t     | catching-up |                 |             |             | wait-replay |        0 | 
 (1 row)
 
 select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', null);
@@ -200,15 +200,15 @@ select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', nu
 (1 row)
 
 select * from londiste.get_table_list('part1_set');
-   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos 
------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.tblmerge | t     | catching-up |                 |             |             |           |        0
+   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table 
+-----------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.tblmerge | t     | catching-up |                 |             |             |           |        0 | 
 (1 row)
 
 select * from londiste.get_table_list('part2_set');
-   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos 
------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.tblmerge | t     | catching-up |                 |             |             |           |        0
+   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table 
+-----------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.tblmerge | t     | catching-up |                 |             |             |           |        0 | 
 (1 row)
 
 -- test automatic registration on combined-root
@@ -231,17 +231,17 @@ select * from londiste.local_add_table('part1_set', 'tblauto', array['merge_all'
 (1 row)
 
 select * from londiste.get_table_list('part2_set');
-   table_name    | local | merge_state | custom_snapshot |  table_attrs   | dropped_ddl | copy_role | copy_pos 
------------------+-------+-------------+-----------------+----------------+-------------+-----------+----------
- public.tblmerge | t     | catching-up |                 |                |             |           |        0
- public.tblauto  | t     | ok          |                 | handler=vtable |             |           |        0
+   table_name    | local | merge_state | custom_snapshot |  table_attrs   | dropped_ddl | copy_role | copy_pos | dest_table 
+-----------------+-------+-------------+-----------------+----------------+-------------+-----------+----------+------------
+ public.tblmerge | t     | catching-up |                 |                |             |           |        0 | 
+ public.tblauto  | t     | ok          |                 | handler=vtable |             |           |        0 | 
 (2 rows)
 
 select * from londiste.get_table_list('combined_set');
-   table_name    | local | merge_state | custom_snapshot |  table_attrs   | dropped_ddl | copy_role | copy_pos 
------------------+-------+-------------+-----------------+----------------+-------------+-----------+----------
- public.tblmerge | t     | ok          |                 |                |             |           |        0
- public.tblauto  | t     | ok          |                 | handler=vtable |             |           |        0
+   table_name    | local | merge_state | custom_snapshot |  table_attrs   | dropped_ddl | copy_role | copy_pos | dest_table 
+-----------------+-------+-------------+-----------------+----------------+-------------+-----------+----------+------------
+ public.tblmerge | t     | ok          |                 |                |             |           |        0 | 
+ public.tblauto  | t     | ok          |                 | handler=vtable |             |           |        0 | 
 (2 rows)
 
 --
sql/londiste/expected/londiste_provider.out
index 6faf1ff716a7d783304b0dde94879ffc152bfb1c..b5c16e8f0e61d68ec2e443e4ebb878c848903c21 100644 (file)
@@ -50,9 +50,9 @@ select tgname from pg_trigger where tgrelid = 'public.testdata'::regclass order
 
 insert into testdata (txt) values ('test-data');
 select * from londiste.get_table_list('aset');
-   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos 
------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.testdata | t     | ok          |                 |             |             |           |        0
+   table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table 
+-----------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.testdata | t     | ok          |                 |             |             |           |        0 | 
 (1 row)
 
 select * from londiste.local_show_missing('aset');
@@ -81,8 +81,8 @@ select tgname from pg_trigger where tgrelid = 'public.testdata'::regclass;
 (0 rows)
 
 select * from londiste.get_table_list('aset');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos 
-------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table 
+------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
 (0 rows)
 
 select ev_id, ev_type, ev_data, ev_extra1 from pgq.event_template;
@@ -131,7 +131,9 @@ select tgname from pg_trigger where tgrelid = 'public.trg_test'::regclass order
 
 delete from londiste.table_info where table_name = 'public.trg_test';
 select tgname from pg_trigger where tgrelid = 'public.trg_test'::regclass order by 1;
- tgname 
---------
-(0 rows)
+         tgname          
+-------------------------
+ _londiste_aset
+ _londiste_aset_truncate
+(2 rows)
 
sql/londiste/expected/londiste_subscriber.out
index 49015a99530f0a660d4d4116f0bc8ef8d4e82823..42aa3e4db629b4077966e532367f606856b96d23 100644 (file)
@@ -61,10 +61,10 @@ select * from londiste.global_add_table('branch_set', 'public.tmp');
 (1 row)
 
 select * from londiste.get_table_list('branch_set');
-    table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos 
-------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.slavedata | t     |             |                 |             |             |           |        0
- public.tmp       | f     |             |                 |             |             |           |        0
+    table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table 
+------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.slavedata | t     |             |                 |             |             |           |        0 | 
+ public.tmp       | f     |             |                 |             |             |           |        0 | 
 (2 rows)
 
 select * from londiste.global_remove_table('branch_set', 'public.tmp');
@@ -86,9 +86,9 @@ select * from londiste.local_remove_table('branch_set', 'public.slavedata');
 (1 row)
 
 select * from londiste.get_table_list('branch_set');
-    table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos 
-------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.slavedata | f     |             |                 |             |             |           |        0
+    table_name    | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table 
+------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.slavedata | f     |             |                 |             |             |           |        0 | 
 (1 row)
 
 select * from londiste.local_show_missing('branch_set');
sql/londiste/functions/londiste.drop_table_triggers.sql
index 9e085c7c606c14b908892cb236e216e4e17cd7c8..54681cc8135d5960bed00df8d7cc4894b4c4472a 100644 (file)
@@ -18,12 +18,23 @@ returns void as $$
 declare
     logtrg_name     text;
     b_queue_name    bytea;
+    _dest_table     text;
 begin
+    select coalesce(dest_table, table_name)
+        from londiste.table_info t
+        where t.queue_name = i_queue_name
+          and t.table_name = i_table_name
+        into _dest_table;
+    if not found then
+        return;
+    end if;
+
     -- skip if no triggers found on that table
-    perform 1 from pg_catalog.pg_trigger where tgrelid = londiste.find_table_oid(i_table_name);
+    perform 1 from pg_catalog.pg_trigger where tgrelid = londiste.find_table_oid(_dest_table);
     if not found then
         return;
     end if;
+
     -- cast to bytea
     b_queue_name := replace(i_queue_name, E'\\', E'\\\\')::bytea;
 
@@ -32,13 +43,13 @@ begin
     -- dependency on naming standard or side-storage.
     for logtrg_name in
         select tgname from pg_catalog.pg_trigger
-         where tgrelid = londiste.find_table_oid(i_table_name)
+         where tgrelid = londiste.find_table_oid(_dest_table)
            and londiste.is_replica_func(tgfoid)
            and octet_length(tgargs) > 0
            and substring(tgargs for (position(E'\\000'::bytea in tgargs) - 1)) = b_queue_name
     loop
         execute 'drop trigger ' || quote_ident(logtrg_name)
-                || ' on ' || londiste.quote_fqname(i_table_name);
+                || ' on ' || londiste.quote_fqname(_dest_table);
     end loop;
 end;
 $$ language plpgsql strict;
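
With renaming in place, drop_table_triggers first resolves the effective table from
londiste.table_info, so callers keep passing the replicated (logical) name even when
the triggers actually live on a differently named local table. A minimal sketch,
assuming the usual (queue name, table name) call form:

    -- drops this queue's replica triggers from the destination table, if any
    select londiste.drop_table_triggers('branch_set', 'public.slavedata');
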
index 37a02ad858d31687f62b268b2eeef60daa4d7353..d1bbec879001933725f9ef50087c709e4dbf7492 100644 (file)
@@ -10,7 +10,8 @@ create or replace function londiste.get_table_list(
     out table_attrs text,
     out dropped_ddl text,
     out copy_role text,
-    out copy_pos int4)
+    out copy_pos int4,
+    out dest_table text)
 returns setof record as $$
 -- ----------------------------------------------------------------------
 -- Function: londiste.get_table_list(1)
@@ -49,7 +50,7 @@ declare
     n_combined_queue text;
 begin
     for v_table_name, local, merge_state, custom_snapshot, table_attrs, dropped_ddl,
-        q_part1, q_part_ddl, n_parts, n_done, n_combined_queue, copy_pos
+        q_part1, q_part_ddl, n_parts, n_done, n_combined_queue, copy_pos, dest_table
     in
         select t.table_name, t.local, t.merge_state, t.custom_snapshot, t.table_attrs, t.dropped_ddl,
                min(case when t2.local then t2.queue_name else null end) as _queue1,
@@ -57,15 +58,18 @@ begin
                count(case when t2.local then t2.table_name else null end) as _total,
                count(case when t2.local then nullif(t2.merge_state, 'in-copy') else null end) as _done,
                min(n.combined_queue) as _combined_queue,
-               count(nullif(t2.queue_name < i_queue_name and t.merge_state = 'in-copy' and t2.merge_state = 'in-copy', false)) as _copy_pos
+               count(nullif(t2.queue_name < i_queue_name and t.merge_state = 'in-copy' and t2.merge_state = 'in-copy', false)) as _copy_pos,
+               t.dest_table as _dest_table
             from londiste.table_info t
             join pgq_node.node_info n on (n.queue_name = t.queue_name)
             left join pgq_node.node_info n2 on (n2.combined_queue = n.combined_queue or
                 (n2.combined_queue is null and n.combined_queue is null))
-            left join londiste.table_info t2 on (t2.table_name = t.table_name and
-                t2.queue_name = n2.queue_name and (t2.merge_state is null or t2.merge_state != 'ok'))
+            left join londiste.table_info t2 on
+               (coalesce(t2.dest_table, t2.table_name) = coalesce(t.dest_table, t.table_name) and
+                t2.queue_name = n2.queue_name and
+                (t2.merge_state is null or t2.merge_state != 'ok'))
             where t.queue_name = i_queue_name
-            group by t.nr, t.table_name, t.local, t.merge_state, t.custom_snapshot, t.table_attrs, t.dropped_ddl
+            group by t.nr, t.table_name, t.local, t.merge_state, t.custom_snapshot, t.table_attrs, t.dropped_ddl, t.dest_table
             order by t.nr, t.table_name
     loop
         -- if the table is in middle of copy from multiple partitions,
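
The source-matching join above now compares effective destinations rather than raw
table names. The same coalesce() idiom can be used ad hoc to see which registrations
end up in one local table (an illustrative query, not part of this commit):

    select coalesce(dest_table, table_name) as effective_dest,
           count(*) as registrations
      from londiste.table_info
     group by 1
     order by 1;
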
index 94aac7f3ef0b486b1418d79dd15492543e7239b6..97b11e7481016031c15377aaf0a3a20691b2a2e6 100644 (file)
@@ -51,7 +51,7 @@ begin
     loop
         perform 1
            from londiste.table_info st_from
-          where st_from.table_name = fkeys.from_table
+          where coalesce(st_from.dest_table, st_from.table_name) = fkeys.from_table
             and st_from.merge_state = 'ok'
             and st_from.custom_snapshot is null
             and st_from.queue_name = i_queue_name;
@@ -60,7 +60,7 @@ begin
         end if;
         perform 1
            from londiste.table_info st_to
-          where st_to.table_name = fkeys.to_table
+          where coalesce(st_to.dest_table, st_to.table_name) = fkeys.to_table
             and st_to.merge_state = 'ok'
             and st_to.custom_snapshot is null
             and st_to.queue_name = i_queue_name;
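
Pending foreign keys are recorded by physical table names, so the readiness checks
above match them against coalesce(dest_table, table_name). Roughly, a fkey is only
restored once a row like this exists for both of its ends (names hypothetical):

    select 1
      from londiste.table_info st
     where coalesce(st.dest_table, st.table_name) = 'public.slavedata_copy'
       and st.merge_state = 'ok'
       and st.custom_snapshot is null
       and st.queue_name = 'branch_set';
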
index 0e4350877d4fb3b16b94dfeba9d5dc20f2bb4259..6b477d06f0a4e212c0ec1bb81be9be76b4811eb1 100644 (file)
@@ -3,18 +3,21 @@ create or replace function londiste.local_add_table(
     in i_table_name     text,
     in i_trg_args       text[],
     in i_table_attrs    text,
+    in i_dest_table     text,
     out ret_code        int4,
     out ret_note        text)
 as $$
 -- ----------------------------------------------------------------------
--- Function: londiste.local_add_table(3)
+-- Function: londiste.local_add_table(5)
 --
 --      Register table on Londiste node, with customizable trigger args.
 --
 -- Parameters:
---      i_queue_name - queue name
---      i_table_name - table name
---      i_trg_args   - args to trigger, or magic parameters.
+--      i_queue_name    - queue name
+--      i_table_name    - table name
+--      i_trg_args      - args to trigger, or magic parameters.
+--      i_table_attrs   - args to Python handler
+--      i_dest_table    - actual name of destination table (NULL if same)
 --
 -- Trigger args:
 --      See documentation for pgq triggers.
@@ -91,6 +94,9 @@ declare
     _no_triggers boolean := false;
     _skip boolean := false;
     _virtual_table boolean := false;
+    _dest_table text;
+    _got_extra1 boolean := false;
+    _table_name2 text;
 begin
 
     -------- i_trg_args ARGUMENTS PARSING
@@ -123,6 +129,9 @@ begin
                 _expect_sync := true;   -- do not copy
                 _no_triggers := true;   -- do not create triggers
             else
+                if arg like 'ev_extra1=%' then
+                    _got_extra1 := true;
+                end if;
                 -- ordinary arg
                 _args = array_append(_args, quote_literal(arg));
             end if;
@@ -136,16 +145,24 @@ begin
     end if;
 
     fq_table_name := londiste.make_fqname(i_table_name);
+    _dest_table := londiste.make_fqname(coalesce(i_dest_table, i_table_name));
+
+    if _dest_table <> fq_table_name and not _got_extra1 then
+        -- if the table is renamed, force the trigger to put
+        -- the global table name into ev_extra1
+        arg := 'ev_extra1=' || quote_literal(fq_table_name);
+        _args := array_append(_args, quote_literal(arg));
+    end if;
 
     -------- TABLE STRUCTURE CHECK
 
     if not _virtual_table then
-        _tbloid := londiste.find_table_oid(fq_table_name);
+        _tbloid := londiste.find_table_oid(_dest_table);
         if _tbloid is null then
-            select 404, 'Table does not exist: ' || fq_table_name into ret_code, ret_note;
+            select 404, 'Table does not exist: ' || _dest_table into ret_code, ret_note;
             return;
         end if;
-        col_types := londiste.find_column_types(fq_table_name);
+        col_types := londiste.find_column_types(_dest_table);
         if position('k' in col_types) < 1 then
             -- allow missing primary key in case of combined table where
             -- pkey was removed by londiste
@@ -156,10 +173,10 @@ begin
                 and n_other.combined_queue = n_this.combined_queue
                 and n_other.queue_name <> n_this.queue_name
                 and t.queue_name = n_other.queue_name
-                and t.table_name = fq_table_name
+                and coalesce(t.dest_table, t.table_name) = _dest_table
                 and t.dropped_ddl is not null;
             if not found then
-                select 400, 'Primary key missing on table: ' || fq_table_name into ret_code, ret_note;
+                select 400, 'Primary key missing on table: ' || _dest_table into ret_code, ret_note;
                 return;
             end if;
         end if;
@@ -215,7 +232,8 @@ begin
     update londiste.table_info
         set local = true,
             merge_state = new_state,
-            table_attrs = coalesce(i_table_attrs, table_attrs)
+            table_attrs = coalesce(i_table_attrs, table_attrs),
+            dest_table = nullif(_dest_table, fq_table_name)
         where queue_name = i_queue_name and table_name = fq_table_name;
     if not found then
         raise exception 'lost table: %', fq_table_name;
@@ -223,13 +241,15 @@ begin
 
     -- merge all table sources on leaf
     if _node.node_type = 'leaf' and not _no_merge then
-        for _queue_name, _local in
-            select t2.queue_name, t2.local
+        for _queue_name, _table_name2, _local in
+            select t2.queue_name, t2.table_name, t2.local
             from londiste.table_info t
             join pgq_node.node_info n on (n.queue_name = t.queue_name)
             left join pgq_node.node_info n2 on (n2.combined_queue = n.combined_queue or
                     (n2.combined_queue is null and n.combined_queue is null))
-            left join londiste.table_info t2 on (t2.table_name = t.table_name and t2.queue_name = n2.queue_name)
+            left join londiste.table_info t2
+              on (t2.queue_name = n2.queue_name and
+                  coalesce(t2.dest_table, t2.table_name) = coalesce(t.dest_table, t.table_name))
             where t.queue_name = i_queue_name
               and t.table_name = fq_table_name
               and t2.queue_name != i_queue_name -- skip self
@@ -258,13 +278,14 @@ begin
                set local = true,
                    merge_state = new_state,
                    table_attrs = coalesce(i_table_attrs, table_attrs)
-               where queue_name = _queue_name and table_name = fq_table_name;
+               where queue_name = _queue_name and table_name = _table_name2;
             if not found then
-                raise exception 'lost table: %', fq_table_name;
+                raise exception 'lost table: % on queue %', _table_name2, _queue_name;
             end if;
         end loop;
 
         -- if this node has combined_queue, add table there too
+        -- note: we need to keep both table_name/dest_table values
         select n2.queue_name, t.table_name
             from pgq_node.node_info n1
             join pgq_node.node_info n2
@@ -275,7 +296,7 @@ begin
             into _combined_queue, _combined_table;
         if found and _combined_table is null then
             select f.ret_code, f.ret_note
-                from londiste.local_add_table(_combined_queue, fq_table_name, i_trg_args, i_table_attrs) f
+                from londiste.local_add_table(_combined_queue, fq_table_name, i_trg_args, i_table_attrs, _dest_table) f
                 into ret_code, ret_note;
             if ret_code >= 300 then
                 return;
@@ -346,7 +367,7 @@ begin
         select count(*), min(t.tgname)
         into _skip_trg_count, _skip_trg_name
         from pg_catalog.pg_trigger t
-        where t.tgrelid = londiste.find_table_oid(fq_table_name)
+        where t.tgrelid = londiste.find_table_oid(_dest_table)
             and position(E'\\000skip\\000' in lower(tgargs::text)) > 0;
         -- if no previous skip triggers, prefix name and add SKIP to args
         if _skip_trg_count = 0 then
@@ -358,12 +379,12 @@ begin
             -- if not prefixed then rename
             if position(_skip_prefix in _skip_trg_name) != 1 then
                 sql := 'alter trigger ' || _skip_trg_name
-                    || ' on ' || londiste.quote_fqname(fq_table_name)
+                    || ' on ' || londiste.quote_fqname(_dest_table)
                     || ' rename to ' || _skip_prefix || _skip_trg_name;
                 execute sql;
             end if;
         else
-            select 405, 'Multiple SKIP triggers in table: ' || fq_table_name
+            select 405, 'Multiple SKIP triggers in table: ' || _dest_table
             into ret_code, ret_note;
             return;
         end if;
@@ -371,7 +392,7 @@ begin
 
     -- create Ins/Upd/Del trigger if it does not exists already
     perform 1 from pg_catalog.pg_trigger
-        where tgrelid = londiste.find_table_oid(fq_table_name)
+        where tgrelid = londiste.find_table_oid(_dest_table)
             and tgname = lg_name;
     if not found then
 
@@ -390,7 +411,7 @@ begin
         -- create trigger
         sql := 'create trigger ' || quote_ident(lg_name)
             || ' ' || lg_pos || ' ' || lg_event
-            || ' on ' || londiste.quote_fqname(fq_table_name)
+            || ' on ' || londiste.quote_fqname(_dest_table)
             || ' for each row execute procedure '
             || lg_func || '(' || lg_args || _extra_args || ')';
         execute sql;
@@ -401,11 +422,11 @@ begin
     if pgversion >= 80400 then
         trunctrg_name  := '_londiste_' || i_queue_name || '_truncate';
         perform 1 from pg_catalog.pg_trigger
-          where tgrelid = londiste.find_table_oid(fq_table_name)
+          where tgrelid = londiste.find_table_oid(_dest_table)
             and tgname = trunctrg_name;
         if not found then
             sql := 'create trigger ' || quote_ident(trunctrg_name)
-                || ' after truncate on ' || londiste.quote_fqname(fq_table_name)
+                || ' after truncate on ' || londiste.quote_fqname(_dest_table)
                 || ' for each statement execute procedure pgq.sqltriga(' || quote_literal(i_queue_name)
                 || _extra_args || ')';
             execute sql;
@@ -422,7 +443,7 @@ begin
     if pgversion >= 90000 then
         select tg.tgname into logtrg_previous
         from pg_class r join pg_trigger tg on (tg.tgrelid = r.oid)
-        where r.oid = londiste.find_table_oid(fq_table_name)
+        where r.oid = londiste.find_table_oid(_dest_table)
           and not tg.tgisinternal
           and tg.tgname < lg_name::name
           -- per-row AFTER trigger
@@ -436,7 +457,7 @@ begin
     else
         select tg.tgname into logtrg_previous
         from pg_class r join pg_trigger tg on (tg.tgrelid = r.oid)
-        where r.oid = londiste.find_table_oid(fq_table_name)
+        where r.oid = londiste.find_table_oid(_dest_table)
           and not tg.tgisconstraint
           and tg.tgname < lg_name::name
           -- per-row AFTER trigger
@@ -463,6 +484,26 @@ begin
 end;
 $$ language plpgsql;
 
+create or replace function londiste.local_add_table(
+    in i_queue_name     text,
+    in i_table_name     text,
+    in i_trg_args       text[],
+    in i_table_attrs    text,
+    out ret_code        int4,
+    out ret_note        text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.local_add_table(4)
+--
+--      Register table on Londiste node.
+-- ----------------------------------------------------------------------
+begin
+    select f.ret_code, f.ret_note into ret_code, ret_note
+      from londiste.local_add_table(i_queue_name, i_table_name, i_trg_args, i_table_attrs, null) f;
+    return;
+end;
+$$ language plpgsql;
+
 create or replace function londiste.local_add_table(
     in i_queue_name     text,
     in i_table_name     text,
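
Both call forms stay valid after this change; the 4-argument overload simply passes
NULL for the destination. A hedged usage sketch (table names hypothetical; the
destination table must already exist locally):

    -- plain registration: replicated and local names are the same
    select * from londiste.local_add_table('branch_set', 'public.slavedata', null, null);
    -- renamed registration: tracked as public.slavedata, handled locally under another name
    select * from londiste.local_add_table('branch_set', 'public.slavedata',
                                           null, null, 'public.slavedata_copy');
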
index 097e0b6f294247767c77612b105f6656563ef9d2..30d02dbf8fd66b7055bb5794977dd80fe3b0eef5 100644 (file)
@@ -43,7 +43,8 @@ begin
                 -- skip_truncate = null,
                 -- table_attrs = null,
                 -- dropped_ddl = null,
-                merge_state = null
+                merge_state = null,
+                dest_table = null
             where queue_name = i_queue_name
                 and table_name = fq_table_name;
     else
index 3269aa464fd929da469a400f49b00c076d2cb703..dab5c0aff0407bca420669d33068eb48499b1401 100644 (file)
@@ -21,7 +21,7 @@ begin
                   and n.nspname !~ '^pg_(toast|temp)'
                   and not exists (select 1 from londiste.table_info
                                    where queue_name = i_queue_name
-                                     and table_name = (n.nspname || '.' || r.relname))
+                                     and coalesce(dest_table, table_name) = (n.nspname || '.' || r.relname))
                 order by 1, 2
         loop
             return next;
index 455fe4146e7e575e5bf722527f7b117fabdc790f..b982a0cc1b213e00645ef613f1588bb8ef08742f 100644 (file)
@@ -19,6 +19,15 @@ begin
         cnt := cnt + 1;
     end if;
 
+    -- table_info.dest_table
+    perform 1 from information_schema.columns
+      where table_schema = 'londiste'
+        and table_name = 'table_info'
+        and column_name = 'dest_table';
+    if not found then
+        alter table londiste.table_info add column dest_table text;
+    end if;
+
     return cnt;
 end;
 $$ language plpgsql;
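
The column add is guarded by an information_schema lookup, so re-running the schema
upgrade on an already patched node is a no-op for this step. Assuming the function
keeps its zero-argument form:

    select londiste.upgrade_schema();  -- repeatable; adds dest_table only when missing
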
index ddfd880addc45b24698d4e6e193cfa33a53db64b..7d082dbb3bcb078a5988f8a2d9dabc1bad8e390e 100644 (file)
@@ -94,6 +94,7 @@ create table londiste.table_info (
     custom_snapshot     text,
     dropped_ddl         text,
     table_attrs         text,
+    dest_table          text,
 
     primary key (queue_name, table_name),
     foreign key (queue_name)
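
dest_table is nullable and not part of the primary key, so existing rows are
unaffected by the upgrade; only renamed registrations carry a value. An illustrative
check for such rows:

    select queue_name, table_name, dest_table
      from londiste.table_info
     where dest_table is not null;
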