g = optparse.OptionGroup(p, "options for add")
g.add_option("--all", action="store_true",
help = "add: include all possible tables")
+ g.add_option("--dest-table",
+ help = "add: redirect changes to different table")
g.add_option("--force", action="store_true",
help = "add: ignore table differences, repair: ignore lag")
g.add_option("--expect-sync", action="store_true", dest="expect_sync",
"""Simple checker based in Syncer.
When tables are in sync runs simple SQL query on them.
"""
- def process_sync(self, tbl, src_db, dst_db):
+ def process_sync(self, src_tbl, dst_tbl, src_db, dst_db):
"""Actual comparison."""
src_curs = src_db.cursor()
dst_curs = dst_db.cursor()
- self.log.info('Counting %s' % tbl)
+ self.log.info('Counting %s' % dst_tbl)
q = "select count(1) as cnt, sum(hashtext(t.*::text)) as chksum from only _TABLE_ t"
q = self.cf.get('compare_sql', q)
- q = q.replace('_TABLE_', skytools.quote_fqident(tbl))
+ src_q = q.replace('_TABLE_', skytools.quote_fqident(src_tbl))
+ dst_q = q.replace('_TABLE_', skytools.quote_fqident(dst_tbl))
f = "%(cnt)d rows, checksum=%(chksum)s"
f = self.cf.get('compare_fmt', f)
- self.log.debug("srcdb: " + q)
- src_curs.execute(q)
+ self.log.debug("srcdb: " + src_q)
+ src_curs.execute(src_q)
src_row = src_curs.fetchone()
src_str = f % src_row
self.log.info("srcdb: %s" % src_str)
+ src_db.commit()
- self.log.debug("dstdb: " + q)
- dst_curs.execute(q)
+ self.log.debug("dstdb: " + dst_q)
+ dst_curs.execute(dst_q)
dst_row = dst_curs.fetchone()
dst_str = f % dst_row
self.log.info("dstdb: %s" % dst_str)
+ dst_db.commit()
if src_str != dst_str:
- self.log.warning("%s: Results do not match!" % tbl)
+ self.log.warning("%s: Results do not match!" % dst_tbl)
if __name__ == '__main__':
script = Comparator(sys.argv[1:])
"""
-import sys, skytools, londiste.handlers
+import sys
+import logging
+import skytools
+import londiste.handlers
__all__ = ['RowCache', 'BaseHandler', 'build_handler',
'load_handler_modules', 'create_handler_string']
"""Defines base API, does nothing.
"""
handler_name = 'nop'
- def __init__(self, table_name, args, log):
+ log = logging.getLogger('basehandler')
+
+ def __init__(self, table_name, args, dest_table):
self.table_name = table_name
+ self.dest_table = dest_table or table_name
+ self.fq_table_name = skytools.quote_fqident(self.table_name)
+ self.fq_dest_table = skytools.quote_fqident(self.dest_table)
self.args = args
- self.log = log
def add(self, trigger_arg_list):
"""Called when table is added.
"""Called when batch finishes."""
pass
- def real_copy(self, tablename, src_curs, dst_curs, column_list, cond_list):
+ def real_copy(self, src_tablename, src_curs, dst_curs, column_list, cond_list):
"""do actual table copy and return tuple with number of bytes and rows
copied
"""
condition = ' and '.join(cond_list)
- return skytools.full_copy(tablename, src_curs, dst_curs, column_list,
- condition)
+ return skytools.full_copy(src_tablename, src_curs, dst_curs,
+ column_list, condition,
+ dst_tablename = self.dest_table)
def needs_table(self):
"""Does the handler need the table to exist on destination."""
def process_event(self, ev, sql_queue_func, arg):
if len(ev.type) == 1:
# sql event
- fqname = skytools.quote_fqident(ev.extra1)
+ fqname = self.fq_dest_table
fmt = self.sql_command[ev.type]
sql = fmt % (fqname, ev.data)
else:
pklist = ev.type[2:].split(',')
row = skytools.db_urldecode(ev.data)
op = ev.type[0]
- tbl = ev.extra1
+ tbl = self.dest_table
if op == 'I':
sql = skytools.mk_insert_sql(row, tbl, pklist)
elif op == 'U':
args = skytools.db_urldecode(astr)
return (name, args)
-def build_handler(tblname, hstr, log):
+def build_handler(tblname, hstr, dest_table=None):
"""Parse and initialize handler.
hstr is result of create_handler_string()."""
# when no handler specified, use londiste
hname = hname or 'londiste'
klass = _handler_map[hname]
- return klass(tblname, args, log)
+ if not dest_table:
+ dest_table = tblname
+ return klass(tblname, args, dest_table)
def load_handler_modules(cf):
"""Load and register modules from config."""
Define successor for handler class cls with func as argument generator
"""
def wrapper(func):
- def _init_override(self, table_name, args, log):
- cls.__init__(self, table_name, func(args.copy()), log)
+ def _init_override(self, table_name, args, dest_table):
+ cls.__init__(self, table_name, func(args.copy()), dest_table)
dct = {'__init__': _init_override, 'handler_name': name}
module = sys.modules[cls.__module__]
newname = '%s_%s' % (cls.__name__, name.replace('.','_'))
in reverse order """
return reduce(lambda x, y: x.update(y) or x,
(p[i] for i in range(len(p)-1,-1,-1)), {})
+
"""
handler_name = 'bulk'
fake_seq = 0
- def __init__(self, table_name, args, log):
+ def __init__(self, table_name, args, dest_table):
"""Init per-batch table data cache."""
- BaseHandler.__init__(self, table_name, args, log)
+ BaseHandler.__init__(self, table_name, args, dest_table)
self.pkey_list = None
self.dist_fields = None
# create temp table
temp, qtemp = self.create_temp_table(curs)
- tbl = self.table_name
- qtbl = quote_fqident(self.table_name)
+ tbl = self.dest_table
+ qtbl = self.fq_dest_table
# where expr must have pkey and dist fields
klist = []
def create_temp_table(self, curs):
if USE_REAL_TABLE:
- tempname = self.table_name + "_loadertmpx"
+ tempname = self.dest_table + "_loadertmpx"
else:
# create temp table for loading
- tempname = self.table_name.replace('.', '_') + "_loadertmp"
+ tempname = self.dest_table.replace('.', '_') + "_loadertmp"
# check if exists
if USE_REAL_TABLE:
# create non-temp table
q = "create table %s (like %s)" % (
quote_fqident(tempname),
- quote_fqident(self.table_name))
+ quote_fqident(self.dest_table))
self.log.debug("bulk: Creating real table: %s" % q)
curs.execute(q)
return tempname, quote_fqident(tempname)
arg = "on commit preserve rows"
# create temp table for loading
q = "create temp table %s (like %s) %s" % (
- quote_ident(tempname), quote_fqident(self.table_name), arg)
+ quote_ident(tempname), quote_fqident(self.dest_table), arg)
self.log.debug("bulk: Creating temp table: %s" % q)
curs.execute(q)
return tempname, quote_ident(tempname)
def find_dist_fields(self, curs):
if not skytools.exists_table(curs, "pg_catalog.gp_distribution_policy"):
return []
- schema, name = skytools.fq_name_parts(self.table_name)
+ schema, name = skytools.fq_name_parts(self.dest_table)
q = "select a.attname"\
" from pg_class t, pg_namespace n, pg_attribute a,"\
" gp_distribution_policy p"\
"""
handler_name = 'dispatch'
- def __init__(self, table_name, args, log):
- BaseHandler.__init__(self, table_name, args, log)
+ def __init__(self, table_name, args, dest_table):
+
+ # compat for dest-table
+ dest_table = args.get('table', dest_table)
+
+ BaseHandler.__init__(self, table_name, args, dest_table)
+
# show args
self.log.debug("dispatch.init: table_name=%r, args=%r" % \
(table_name, args))
- # get table name
- self.table_name = args.get('table', self.table_name)
- self.quoted_name = quote_fqident(self.table_name)
self.batch_info = None
self.dst_curs = None
self.pkeys = None
if dst not in self.row_handler.table_map:
self.check_part(dst, part_time)
else:
- dst = self.table_name
+ dst = self.dest_table
if dst not in self.row_handler.table_map:
self.row_handler.add_table(dst, LOADERS[self.conf.load_mode],
else:
raise UsageError('Bad value for part_mode: %s' %\
self.conf.part_mode)
- vals = {'parent': self.table_name,
+ vals = {'parent': self.dest_table,
'year': "%04d" % dtm.year,
'month': "%02d" % dtm.month,
'day': "%02d" % dtm.day,
dst = quote_fqident(dst)
vals = {'dest': dst,
'part': dst,
- 'parent': self.quoted_name,
+ 'parent': self.fq_dest_table,
'pkeys': ",".join(self.pkeys), # quoting?
# we do this to make sure that constraints for
# tables who contain a schema will still work
else:
self.log.debug('part func %s not found, cloning table' %\
PART_FUNC)
- struct = TableStruct(curs, self.table_name)
+ struct = TableStruct(curs, self.dest_table)
struct.create(curs, T_ALL, dst)
exec_with_vals(self.conf.post_part)
self.log.info("Created table: %s" % dst)
else:
_write_hook = None
- return skytools.full_copy(tablename, src_curs, dst_curs, _src_cols,
- condition, self.table_name, _dst_cols,
+ return skytools.full_copy(tablename, src_curs, dst_curs, _src_cols, condition,
+ dst_tablename = self.dest_table,
+ dst_column_list = _dst_cols,
write_hook = _write_hook)
"""Handle multimaster replicas"""
handler_name = 'multimaster'
- def __init__(self, table_name, args, log):
+ def __init__(self, table_name, args, dest_table):
"""Init per-batch table data cache."""
conf = args.copy()
# remove Multimaster args from conf
conf.pop(name)
conf = skytools.db_urlencode(conf)
args = update(args, {'func_name': 'merge_on_time', 'func_conf': conf})
- ApplyFuncHandler.__init__(self, table_name, args, log)
+ ApplyFuncHandler.__init__(self, table_name, args, dest_table)
def add(self, trigger_arg_list):
"""Create SKIP and BEFORE INSERT trigger"""
class PartHandler(TableHandler):
handler_name = 'part'
- def __init__(self, table_name, args, log):
- TableHandler.__init__(self, table_name, args, log)
+ def __init__(self, table_name, args, dest_table):
+ TableHandler.__init__(self, table_name, args, dest_table)
self.max_part = None # max part number
self.local_part = None # part number of local node
self.key = args.get('key')
class QueueSplitterHandler(BaseHandler):
handler_name = 'qsplitter'
- def __init__(self, table_name, args, log):
+ def __init__(self, table_name, args, dest_table):
"""Init per-batch table data cache."""
- BaseHandler.__init__(self, table_name, args, log)
+ BaseHandler.__init__(self, table_name, args, dest_table)
try:
self.dst_queue_name = args['queue']
except KeyError:
def __init__(self, name, log):
"""Init TableState for one table."""
self.name = name
+ self.dest_table = name
self.log = log
# same as forget:
self.state = TABLE_MISSING
self.max_parallel_copy = int(self.table_attrs.get('max_parallel_copy',
self.max_parallel_copy))
+ if 'dest_table' in row and row['dest_table']:
+ self.dest_table = row['dest_table']
+ else:
+ self.dest_table = self.name
+
hstr = self.table_attrs.get('handlers', '') # compat
hstr = self.table_attrs.get('handler', hstr)
- self.plugin = build_handler(self.name, hstr, self.log)
+ self.plugin = build_handler(self.name, hstr, self.dest_table)
def max_parallel_copies_reached(self):
return self.max_parallel_copy and\
npossible -= 1
# drop all foreign keys to and from this table
- self.drop_fkeys(dst_db, t.name)
+ self.drop_fkeys(dst_db, t.dest_table)
# change state after fkeys are dropped thus allowing
# failure inbetween
self.stat_increase('ignored_events')
return
- fqname = skytools.quote_fqident(ev.extra1)
+ fqname = skytools.quote_fqident(t.dest_table)
if dst_curs.connection.server_version >= 80400:
sql = "TRUNCATE ONLY %s;" % fqname
else:
pkey_list = []
common_fields = []
- def process_sync(self, tbl, src_db, dst_db):
+ def process_sync(self, src_tbl, dst_tbl, src_db, dst_db):
"""Actual comparison."""
src_curs = src_db.cursor()
dst_curs = dst_db.cursor()
- self.log.info('Checking %s' % tbl)
+ self.log.info('Checking %s' % dst_tbl)
self.common_fields = []
+ self.fq_common_fields = []
self.pkey_list = []
- copy_tbl = self.gen_copy_tbl(tbl, src_curs, dst_curs)
+ self.load_common_columns(src_tbl, dst_tbl, src_curs, dst_curs)
- dump_src = tbl + ".src"
- dump_dst = tbl + ".dst"
+ dump_src = dst_tbl + ".src"
+ dump_dst = dst_tbl + ".dst"
- self.log.info("Dumping src table: %s" % tbl)
- self.dump_table(tbl, copy_tbl, src_curs, dump_src)
+ self.log.info("Dumping src table: %s" % src_tbl)
+ self.dump_table(src_tbl, src_curs, dump_src)
src_db.commit()
- self.log.info("Dumping dst table: %s" % tbl)
- self.dump_table(tbl, copy_tbl, dst_curs, dump_dst)
+ self.log.info("Dumping dst table: %s" % dst_tbl)
+ self.dump_table(dst_tbl, dst_curs, dump_dst)
dst_db.commit()
- self.log.info("Sorting src table: %s" % tbl)
+ self.log.info("Sorting src table: %s" % dump_src)
s_in, s_out = os.popen4("sort --version")
s_ver = s_out.read()
else:
args = ""
os.system("sort %s -T . -o %s.sorted %s" % (args, dump_src, dump_src))
- self.log.info("Sorting dst table: %s" % tbl)
+ self.log.info("Sorting dst table: %s" % dump_dst)
os.system("sort %s -T . -o %s.sorted %s" % (args, dump_dst, dump_dst))
- self.dump_compare(tbl, dump_src + ".sorted", dump_dst + ".sorted")
+ self.dump_compare(dst_tbl, dump_src + ".sorted", dump_dst + ".sorted")
os.unlink(dump_src)
os.unlink(dump_dst)
os.unlink(dump_src + ".sorted")
os.unlink(dump_dst + ".sorted")
- def gen_copy_tbl(self, tbl, src_curs, dst_curs):
- """Create COPY expession from common fields."""
- self.pkey_list = get_pkey_list(src_curs, tbl)
- dst_pkey = get_pkey_list(dst_curs, tbl)
+ def load_common_columns(self, src_tbl, dst_tbl, src_curs, dst_curs):
+ """Get common fields, put pkeys in start."""
+
+ self.pkey_list = get_pkey_list(src_curs, src_tbl)
+ dst_pkey = get_pkey_list(dst_curs, dst_tbl)
if dst_pkey != self.pkey_list:
self.log.error('pkeys do not match')
sys.exit(1)
- src_cols = get_column_list(src_curs, tbl)
- dst_cols = get_column_list(dst_curs, tbl)
+ src_cols = get_column_list(src_curs, src_tbl)
+ dst_cols = get_column_list(dst_curs, dst_tbl)
field_list = []
for f in self.pkey_list:
field_list.append(f)
self.common_fields = field_list
fqlist = [skytools.quote_ident(col) for col in field_list]
+ self.fq_common_fields = fqlist
- tbl_expr = "%s (%s)" % (skytools.quote_fqident(tbl), ",".join(fqlist))
-
- self.log.debug("using copy expr: %s" % tbl_expr)
+ cols = ",".join(fqlist)
+ self.log.debug("using columns: %s" % cols)
- return tbl_expr
-
- def dump_table(self, tbl, copy_tbl, curs, fn):
+ def dump_table(self, tbl, curs, fn):
"""Dump table to disk."""
+ cols = ','.join(self.fq_common_fields)
+ q = "copy %s (%s) to stdout" % (skytools.quote_fqident(tbl), cols)
+
f = open(fn, "w", 64*1024)
- curs.copy_to(f, copy_tbl)
+ curs.copy_expert(q, f)
size = f.tell()
f.close()
self.log.info('%s: Got %d bytes' % (tbl, size))
help="don't merge tables from source queues", default=False)
p.add_option("--max-parallel-copy", type = "int",
help="max number of parallel copy processes")
+ p.add_option("--dest-table",
+ help="add: name for actual table")
return p
# dont check for exist/not here (root handling)
problems = False
for tbl in args:
- if (tbl in src_tbls) and not src_tbls[tbl]:
+ if (tbl in src_tbls) and not src_tbls[tbl]['local']:
self.log.error("Table %s does not exist on provider, need to switch to different provider" % tbl)
problems = True
if problems:
else:
create_flags = 0
+ # sanity check
+ if self.options.dest_table and len(args) > 1:
+ self.log.error("--dest-table can be given only for single table")
+ sys.exit(1)
+
# seems ok
for tbl in args:
tbl = skytools.fq_name(tbl)
- self.add_table(src_db, dst_db, tbl, create_flags)
+ self.add_table(src_db, dst_db, tbl, create_flags, src_tbls)
- def add_table(self, src_db, dst_db, tbl, create_flags):
+ def add_table(self, src_db, dst_db, tbl, create_flags, src_tbls):
src_curs = src_db.cursor()
dst_curs = dst_db.cursor()
- tbl_exists = skytools.exists_table(dst_curs, tbl)
+ src_dest_table = src_tbls[tbl]['dest_table']
+ dest_table = self.options.dest_table or tbl
+ tbl_exists = skytools.exists_table(dst_curs, dest_table)
if create_flags:
if tbl_exists:
- self.log.info('Table %s already exist, not touching' % tbl)
+ self.log.info('Table %s already exist, not touching' % dest_table)
else:
- if not skytools.exists_table(src_curs, tbl):
+ if not skytools.exists_table(src_curs, src_dest_table):
# table not present on provider - nowhere to get the DDL from
self.log.warning('Table "%s" missing on provider, skipping' % tbl)
return
- schema = skytools.fq_name_parts(tbl)[0]
+ schema = skytools.fq_name_parts(dest_table)[0]
if not skytools.exists_schema(dst_curs, schema):
q = "create schema %s" % skytools.quote_ident(schema)
dst_curs.execute(q)
- s = skytools.TableStruct(src_curs, tbl)
+ s = skytools.TableStruct(src_curs, src_dest_table)
src_db.commit()
- s.create(dst_curs, create_flags, log = self.log)
+
+ # create, using rename logic only when necessary
+ newname = None
+ if src_dest_table != dest_table:
+ newname = dest_table
+ s.create(dst_curs, create_flags, log = self.log, new_table_name = newname)
tgargs = []
if self.options.trigger_arg:
if self.options.handler:
hstr = londiste.handler.create_handler_string(
self.options.handler, self.options.handler_arg)
- p = londiste.handler.build_handler(tbl, hstr, self.log)
+ p = londiste.handler.build_handler(tbl, hstr, self.options.dest_table)
attrs['handler'] = hstr
p.add(tgargs)
if self.options.handler:
hstr = londiste.handler.create_handler_string(
self.options.handler, self.options.handler_arg)
- p = londiste.handler.build_handler('unused.string', hstr, self.log)
+ p = londiste.handler.build_handler('unused.string', hstr, None)
return p.needs_table()
return True
if tbl not in dst_tbls:
self.log.info("Table %s info missing from subscriber, adding" % tbl)
self.exec_cmd(dst_curs, q, [self.set_name, tbl])
- dst_tbls[tbl] = False
+ dst_tbls[tbl] = {'local': False, 'dest_table': tbl}
for tbl in dst_tbls.keys():
q = "select * from londiste.global_remove_table(%s, %s)"
if tbl not in src_tbls:
del dst_tbls[tbl]
def fetch_set_tables(self, curs):
- q = "select table_name, local from londiste.get_table_list(%s)"
+ q = "select table_name, local, "\
+ " coalesce(dest_table, table_name) as dest_table "\
+ " from londiste.get_table_list(%s)"
curs.execute(q, [self.set_name])
res = {}
for row in curs.fetchall():
- res[row[0]] = row[1]
+ res[row[0]] = row
return res
def cmd_remove_table(self, *args):
import sys, time, skytools
+class ATable:
+ def __init__(self, row):
+ self.table_name = row['table_name']
+ self.dest_table = row['dest_table'] or row['table_name']
+ self.merge_state = row['merge_state']
+
class Syncer(skytools.DBScript):
"""Walks tables in primary key order and checks if data matches."""
self.log.error('Consumer lagging too much, cannot proceed')
sys.exit(1)
- def get_subscriber_table_state(self, dst_db):
- """Load table states from subscriber."""
- dst_curs = dst_db.cursor()
- q = "select * from londiste.get_table_list(%s) where local"
- dst_curs.execute(q, [self.queue_name])
- res = dst_curs.dictfetchall()
- dst_db.commit()
- return res
+ def get_tables(self, db):
+ """Load table info.
+
+ Returns tuple of (dict(name->ATable), namelist)"""
+
+ curs = db.cursor()
+ q = "select table_name, merge_state, dest_table"\
+ " from londiste.get_table_list(%s) where local"
+ curs.execute(q, [self.queue_name])
+ rows = curs.fetchall()
+ db.commit()
+
+ res = {}
+ names = []
+ for row in rows:
+ t = ATable(row)
+ res[t.table_name] = t
+ names.append(t.table_name)
+ return res, names
def work(self):
"""Syncer main function."""
self.check_consumer(setup_curs)
- state_list = self.get_subscriber_table_state(dst_db)
- state_map = {}
- full_list = []
- for ts in state_list:
- name = ts['table_name']
- full_list.append(name)
- state_map[name] = ts
+ src_tables, ignore = self.get_tables(src_db)
+ dst_tables, names = self.get_tables(dst_db)
if len(self.args) > 2:
tlist = self.args[2:]
else:
- tlist = full_list
+ tlist = names
for tbl in tlist:
tbl = skytools.fq_name(tbl)
- if not tbl in state_map:
+ if not tbl in dst_tables:
self.log.warning('Table not subscribed: %s' % tbl)
continue
- st = state_map[tbl]
- if st['merge_state'] != 'ok':
- self.log.info('Table %s not synced yet, no point' % tbl)
+ if not tbl in src_tables:
+ self.log.warning('Table not available on provider: %s' % tbl)
+ continue
+ t1 = src_tables[tbl]
+ t2 = dst_tables[tbl]
+
+ if t1.merge_state != 'ok':
+ self.log.warning('Table %s not ready yet on provider' % tbl)
continue
- self.check_table(tbl, lock_db, src_db, dst_db, setup_curs)
+ if t2.merge_state != 'ok':
+ self.log.warning('Table %s not synced yet, no point' % tbl)
+ continue
+ self.check_table(t1.dest_table, t2.dest_table, lock_db, src_db, dst_db, setup_curs)
lock_db.commit()
src_db.commit()
dst_db.commit()
if dur > 10 and not self.options.force:
raise Exception("Ticker seems dead")
- def check_table(self, tbl, lock_db, src_db, dst_db, setup_curs):
+ def check_table(self, src_tbl, dst_tbl, lock_db, src_db, dst_db, setup_curs):
"""Get transaction to same state, then process."""
src_curs = src_db.cursor()
dst_curs = dst_db.cursor()
- if not skytools.exists_table(src_curs, tbl):
- self.log.warning("Table %s does not exist on provider side" % tbl)
+ if not skytools.exists_table(src_curs, src_tbl):
+ self.log.warning("Table %s does not exist on provider side" % src_tbl)
return
- if not skytools.exists_table(dst_curs, tbl):
- self.log.warning("Table %s does not exist on subscriber side" % tbl)
+ if not skytools.exists_table(dst_curs, dst_tbl):
+ self.log.warning("Table %s does not exist on subscriber side" % dst_tbl)
return
# lock table in separate connection
- self.log.info('Locking %s' % tbl)
+ self.log.info('Locking %s' % src_tbl)
lock_db.commit()
self.set_lock_timeout(lock_curs)
lock_time = time.time()
- lock_curs.execute("LOCK TABLE %s IN SHARE MODE" % skytools.quote_fqident(tbl))
+ lock_curs.execute("LOCK TABLE %s IN SHARE MODE" % skytools.quote_fqident(src_tbl))
# now wait until consumer has updated target table until locking
- self.log.info('Syncing %s' % tbl)
+ self.log.info('Syncing %s' % dst_tbl)
# consumer must get further than this tick
tick_id = self.force_tick(setup_curs)
lock_db.commit()
# do work
- self.process_sync(tbl, src_db, dst_db)
+ self.process_sync(src_tbl, dst_tbl, src_db, dst_db)
# done
src_db.commit()
dst_db.commit()
- def process_sync(self, tbl, src_db, dst_db):
+ def process_sync(self, src_tbl, dst_tbl, src_db, dst_db):
"""It gets 2 connections in state where tbl should be in same state.
"""
raise Exception('process_sync not implemented')
q = "select * from pgq_node.get_node_info(%s)"
rows = self.exec_cmd(dst_db, q, [self.queue_name])
return rows[0]['provider_location']
+
self.log.warning("table %s not in sync yet on provider, waiting" % tbl_stat.name)
time.sleep(10)
+ src_real_table = pt.dest_table
+
# 0 - dont touch
# 1 - single tx
# 2 - multi tx
# just in case, drop all fkeys (in case "replay" was skipped)
# !! this may commit, so must be done before anything else !!
- self.drop_fkeys(dst_db, tbl_stat.name)
+ self.drop_fkeys(dst_db, tbl_stat.dest_table)
# now start ddl-dropping tx
- q = "lock table " + skytools.quote_fqident(tbl_stat.name)
+ q = "lock table " + skytools.quote_fqident(tbl_stat.dest_table)
dst_curs.execute(q)
# find dst struct
- src_struct = TableStruct(src_curs, tbl_stat.name)
- dst_struct = TableStruct(dst_curs, tbl_stat.name)
+ src_struct = TableStruct(src_curs, src_real_table)
+ dst_struct = TableStruct(dst_curs, tbl_stat.dest_table)
# take common columns, warn on missing ones
dlist = dst_struct.get_column_list()
q = "truncate "
if dst_db.server_version >= 80400:
q += "only "
- q += skytools.quote_fqident(tbl_stat.name)
+ q += skytools.quote_fqident(tbl_stat.dest_table)
dst_curs.execute(q)
if cmode == 2 and tbl_stat.dropped_ddl is None:
tbl_stat.dropped_ddl = ddl
# do truncate & copy
- self.real_copy(src_curs, dst_curs, tbl_stat, common_cols)
+ self.real_copy(src_curs, dst_curs, tbl_stat, common_cols, src_real_table)
# get snapshot
src_curs.execute("select txid_current_snapshot()")
src_curs.execute(q, [self.queue_name])
src_db.commit()
- def real_copy(self, srccurs, dstcurs, tbl_stat, col_list):
+ def real_copy(self, srccurs, dstcurs, tbl_stat, col_list, src_real_table):
"Actual copy."
tablename = tbl_stat.name
cond = tbl_stat.table_attrs.get('copy_condition')
if cond:
cond_list.append(cond)
- stats = p.real_copy(tablename, srccurs, dstcurs, col_list, cond_list)
+ stats = p.real_copy(src_real_table, srccurs, dstcurs, col_list, cond_list)
if stats:
self.log.info("%s: copy finished: %d bytes, %d rows" % (
tablename, stats[0], stats[1]))
(1 row)
select * from londiste.get_table_list('leafq');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.leafdata | t | | | | | | 0
- public.tmp | f | | | | | | 0
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table
+-----------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.leafdata | t | | | | | | 0 |
+ public.tmp | f | | | | | | 0 |
(2 rows)
select tgname, tgargs from pg_trigger
(1 row)
select * from londiste.get_table_list('leafq');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.leafdata | f | | | | | | 0
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table
+-----------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.leafdata | f | | | | | | 0 |
(1 row)
select * from londiste.local_show_missing('leafq');
(1 row)
select * from londiste.get_table_list('part1_set');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.tblmerge | t | | | | | | 0
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table
+-----------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.tblmerge | t | | | | | | 0 |
(1 row)
select * from londiste.get_table_list('part2_set');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.tblmerge | t | | | | | | 0
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table
+-----------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.tblmerge | t | | | | | | 0 |
(1 row)
select * from londiste.get_table_list('combined_set');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.tblmerge | t | ok | | | | | 0
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table
+-----------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.tblmerge | t | ok | | | | | 0 |
(1 row)
select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'in-copy');
(1 row)
select * from londiste.get_table_list('part1_set');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.tblmerge | t | in-copy | | | | lead | 0
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table
+-----------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.tblmerge | t | in-copy | | | | lead | 0 |
(1 row)
select * from londiste.get_table_list('part2_set');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.tblmerge | t | in-copy | | | | wait-copy | 1
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table
+-----------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.tblmerge | t | in-copy | | | | wait-copy | 1 |
(1 row)
select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', 'create index;');
(1 row)
select * from londiste.get_table_list('part1_set');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
------------------+-------+-------------+-----------------+-------------+---------------+-----------+----------
- public.tblmerge | t | in-copy | | | create index; | lead | 0
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table
+-----------------+-------+-------------+-----------------+-------------+---------------+-----------+----------+------------
+ public.tblmerge | t | in-copy | | | create index; | lead | 0 |
(1 row)
select * from londiste.get_table_list('part2_set');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
------------------+-------+-------------+-----------------+-------------+-------------+-------------+----------
- public.tblmerge | t | in-copy | | | | wait-replay | 1
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table
+-----------------+-------+-------------+-----------------+-------------+-------------+-------------+----------+------------
+ public.tblmerge | t | in-copy | | | | wait-replay | 1 |
(1 row)
select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up');
(1 row)
select * from londiste.get_table_list('part1_set');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
------------------+-------+-------------+-----------------+-------------+---------------+-----------+----------
- public.tblmerge | t | in-copy | | | create index; | lead | 0
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table
+-----------------+-------+-------------+-----------------+-------------+---------------+-----------+----------+------------
+ public.tblmerge | t | in-copy | | | create index; | lead | 0 |
(1 row)
select * from londiste.get_table_list('part2_set');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
------------------+-------+-------------+-----------------+-------------+-------------+-------------+----------
- public.tblmerge | t | catching-up | | | | wait-replay | 0
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table
+-----------------+-------+-------------+-----------------+-------------+-------------+-------------+----------+------------
+ public.tblmerge | t | catching-up | | | | wait-replay | 0 |
(1 row)
select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'catching-up');
(1 row)
select * from londiste.get_table_list('part1_set');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
------------------+-------+-------------+-----------------+-------------+---------------+-----------+----------
- public.tblmerge | t | catching-up | | | create index; | | 0
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table
+-----------------+-------+-------------+-----------------+-------------+---------------+-----------+----------+------------
+ public.tblmerge | t | catching-up | | | create index; | | 0 |
(1 row)
select * from londiste.get_table_list('part2_set');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
------------------+-------+-------------+-----------------+-------------+-------------+-------------+----------
- public.tblmerge | t | catching-up | | | | wait-replay | 0
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table
+-----------------+-------+-------------+-----------------+-------------+-------------+-------------+----------+------------
+ public.tblmerge | t | catching-up | | | | wait-replay | 0 |
(1 row)
select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', null);
(1 row)
select * from londiste.get_table_list('part1_set');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.tblmerge | t | catching-up | | | | | 0
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table
+-----------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.tblmerge | t | catching-up | | | | | 0 |
(1 row)
select * from londiste.get_table_list('part2_set');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.tblmerge | t | catching-up | | | | | 0
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table
+-----------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.tblmerge | t | catching-up | | | | | 0 |
(1 row)
-- test automatic registration on combined-root
(1 row)
select * from londiste.get_table_list('part2_set');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
------------------+-------+-------------+-----------------+----------------+-------------+-----------+----------
- public.tblmerge | t | catching-up | | | | | 0
- public.tblauto | t | ok | | handler=vtable | | | 0
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table
+-----------------+-------+-------------+-----------------+----------------+-------------+-----------+----------+------------
+ public.tblmerge | t | catching-up | | | | | 0 |
+ public.tblauto | t | ok | | handler=vtable | | | 0 |
(2 rows)
select * from londiste.get_table_list('combined_set');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
------------------+-------+-------------+-----------------+----------------+-------------+-----------+----------
- public.tblmerge | t | ok | | | | | 0
- public.tblauto | t | ok | | handler=vtable | | | 0
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table
+-----------------+-------+-------------+-----------------+----------------+-------------+-----------+----------+------------
+ public.tblmerge | t | ok | | | | | 0 |
+ public.tblauto | t | ok | | handler=vtable | | | 0 |
(2 rows)
--
insert into testdata (txt) values ('test-data');
select * from londiste.get_table_list('aset');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.testdata | t | ok | | | | | 0
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table
+-----------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.testdata | t | ok | | | | | 0 |
(1 row)
select * from londiste.local_show_missing('aset');
(0 rows)
select * from londiste.get_table_list('aset');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
-------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table
+------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
(0 rows)
select ev_id, ev_type, ev_data, ev_extra1 from pgq.event_template;
delete from londiste.table_info where table_name = 'public.trg_test';
select tgname from pg_trigger where tgrelid = 'public.trg_test'::regclass order by 1;
- tgname
---------
-(0 rows)
+ tgname
+-------------------------
+ _londiste_aset
+ _londiste_aset_truncate
+(2 rows)
(1 row)
select * from londiste.get_table_list('branch_set');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
-------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.slavedata | t | | | | | | 0
- public.tmp | f | | | | | | 0
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table
+------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.slavedata | t | | | | | | 0 |
+ public.tmp | f | | | | | | 0 |
(2 rows)
select * from londiste.global_remove_table('branch_set', 'public.tmp');
(1 row)
select * from londiste.get_table_list('branch_set');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
-------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.slavedata | f | | | | | | 0
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos | dest_table
+------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------+------------
+ public.slavedata | f | | | | | | 0 |
(1 row)
select * from londiste.local_show_missing('branch_set');
declare
logtrg_name text;
b_queue_name bytea;
+ _dest_table text;
begin
+ select coalesce(dest_table, table_name)
+ from londiste.table_info t
+ where t.queue_name = i_queue_name
+ and t.table_name = i_table_name
+ into _dest_table;
+ if not found then
+ return;
+ end if;
+
-- skip if no triggers found on that table
- perform 1 from pg_catalog.pg_trigger where tgrelid = londiste.find_table_oid(i_table_name);
+ perform 1 from pg_catalog.pg_trigger where tgrelid = londiste.find_table_oid(_dest_table);
if not found then
return;
end if;
+
-- cast to bytea
b_queue_name := replace(i_queue_name, E'\\', E'\\\\')::bytea;
-- dependency on naming standard or side-storage.
for logtrg_name in
select tgname from pg_catalog.pg_trigger
- where tgrelid = londiste.find_table_oid(i_table_name)
+ where tgrelid = londiste.find_table_oid(_dest_table)
and londiste.is_replica_func(tgfoid)
and octet_length(tgargs) > 0
and substring(tgargs for (position(E'\\000'::bytea in tgargs) - 1)) = b_queue_name
loop
execute 'drop trigger ' || quote_ident(logtrg_name)
- || ' on ' || londiste.quote_fqname(i_table_name);
+ || ' on ' || londiste.quote_fqname(_dest_table);
end loop;
end;
$$ language plpgsql strict;
out table_attrs text,
out dropped_ddl text,
out copy_role text,
- out copy_pos int4)
+ out copy_pos int4,
+ out dest_table text)
returns setof record as $$
-- ----------------------------------------------------------------------
-- Function: londiste.get_table_list(1)
n_combined_queue text;
begin
for v_table_name, local, merge_state, custom_snapshot, table_attrs, dropped_ddl,
- q_part1, q_part_ddl, n_parts, n_done, n_combined_queue, copy_pos
+ q_part1, q_part_ddl, n_parts, n_done, n_combined_queue, copy_pos, dest_table
in
select t.table_name, t.local, t.merge_state, t.custom_snapshot, t.table_attrs, t.dropped_ddl,
min(case when t2.local then t2.queue_name else null end) as _queue1,
count(case when t2.local then t2.table_name else null end) as _total,
count(case when t2.local then nullif(t2.merge_state, 'in-copy') else null end) as _done,
min(n.combined_queue) as _combined_queue,
- count(nullif(t2.queue_name < i_queue_name and t.merge_state = 'in-copy' and t2.merge_state = 'in-copy', false)) as _copy_pos
+ count(nullif(t2.queue_name < i_queue_name and t.merge_state = 'in-copy' and t2.merge_state = 'in-copy', false)) as _copy_pos,
+ t.dest_table as _dest_table
from londiste.table_info t
join pgq_node.node_info n on (n.queue_name = t.queue_name)
left join pgq_node.node_info n2 on (n2.combined_queue = n.combined_queue or
(n2.combined_queue is null and n.combined_queue is null))
- left join londiste.table_info t2 on (t2.table_name = t.table_name and
- t2.queue_name = n2.queue_name and (t2.merge_state is null or t2.merge_state != 'ok'))
+ left join londiste.table_info t2 on
+ (coalesce(t2.dest_table, t2.table_name) = coalesce(t.dest_table, t.table_name) and
+ t2.queue_name = n2.queue_name and
+ (t2.merge_state is null or t2.merge_state != 'ok'))
where t.queue_name = i_queue_name
- group by t.nr, t.table_name, t.local, t.merge_state, t.custom_snapshot, t.table_attrs, t.dropped_ddl
+ group by t.nr, t.table_name, t.local, t.merge_state, t.custom_snapshot, t.table_attrs, t.dropped_ddl, t.dest_table
order by t.nr, t.table_name
loop
-- if the table is in middle of copy from multiple partitions,
loop
perform 1
from londiste.table_info st_from
- where st_from.table_name = fkeys.from_table
+ where coalesce(st_from.dest_table, st_from.table_name) = fkeys.from_table
and st_from.merge_state = 'ok'
and st_from.custom_snapshot is null
and st_from.queue_name = i_queue_name;
end if;
perform 1
from londiste.table_info st_to
- where st_to.table_name = fkeys.to_table
+ where coalesce(st_to.dest_table, st_to.table_name) = fkeys.to_table
and st_to.merge_state = 'ok'
and st_to.custom_snapshot is null
and st_to.queue_name = i_queue_name;
in i_table_name text,
in i_trg_args text[],
in i_table_attrs text,
+ in i_dest_table text,
out ret_code int4,
out ret_note text)
as $$
-- ----------------------------------------------------------------------
--- Function: londiste.local_add_table(3)
+-- Function: londiste.local_add_table(5)
--
-- Register table on Londiste node, with customizable trigger args.
--
-- Parameters:
--- i_queue_name - queue name
--- i_table_name - table name
--- i_trg_args - args to trigger, or magic parameters.
+-- i_queue_name - queue name
+-- i_table_name - table name
+-- i_trg_args - args to trigger, or magic parameters.
+-- i_table_attrs - args to python handler
+-- i_dest_table - actual name of destination table (NULL if same)
--
-- Trigger args:
-- See documentation for pgq triggers.
_no_triggers boolean := false;
_skip boolean := false;
_virtual_table boolean := false;
+ _dest_table text;
+ _got_extra1 boolean := false;
+ _table_name2 text;
begin
-------- i_trg_args ARGUMENTS PARSING
_expect_sync := true; -- do not copy
_no_triggers := true; -- do not create triggers
else
+ if arg like 'ev_extra1=%' then
+ _got_extra1 := true;
+ end if;
-- ordinary arg
_args = array_append(_args, quote_literal(arg));
end if;
end if;
fq_table_name := londiste.make_fqname(i_table_name);
+ _dest_table := londiste.make_fqname(coalesce(i_dest_table, i_table_name));
+
+ if _dest_table <> fq_table_name and not _got_extra1 then
+ -- if renamed table, enforce trigger to put
+ -- global table name into extra1
+ arg := 'ev_extra1=' || quote_literal(fq_table_name);
+ _args := array_append(_args, quote_literal(arg));
+ end if;
-------- TABLE STRUCTURE CHECK
if not _virtual_table then
- _tbloid := londiste.find_table_oid(fq_table_name);
+ _tbloid := londiste.find_table_oid(_dest_table);
if _tbloid is null then
- select 404, 'Table does not exist: ' || fq_table_name into ret_code, ret_note;
+ select 404, 'Table does not exist: ' || _dest_table into ret_code, ret_note;
return;
end if;
- col_types := londiste.find_column_types(fq_table_name);
+ col_types := londiste.find_column_types(_dest_table);
if position('k' in col_types) < 1 then
-- allow missing primary key in case of combined table where
-- pkey was removed by londiste
and n_other.combined_queue = n_this.combined_queue
and n_other.queue_name <> n_this.queue_name
and t.queue_name = n_other.queue_name
- and t.table_name = fq_table_name
+ and coalesce(t.dest_table, t.table_name) = _dest_table
and t.dropped_ddl is not null;
if not found then
- select 400, 'Primary key missing on table: ' || fq_table_name into ret_code, ret_note;
+ select 400, 'Primary key missing on table: ' || _dest_table into ret_code, ret_note;
return;
end if;
end if;
update londiste.table_info
set local = true,
merge_state = new_state,
- table_attrs = coalesce(i_table_attrs, table_attrs)
+ table_attrs = coalesce(i_table_attrs, table_attrs),
+ dest_table = nullif(_dest_table, fq_table_name)
where queue_name = i_queue_name and table_name = fq_table_name;
if not found then
raise exception 'lost table: %', fq_table_name;
-- merge all table sources on leaf
if _node.node_type = 'leaf' and not _no_merge then
- for _queue_name, _local in
- select t2.queue_name, t2.local
+ for _queue_name, _table_name2, _local in
+ select t2.queue_name, t2.table_name, t2.local
from londiste.table_info t
join pgq_node.node_info n on (n.queue_name = t.queue_name)
left join pgq_node.node_info n2 on (n2.combined_queue = n.combined_queue or
(n2.combined_queue is null and n.combined_queue is null))
- left join londiste.table_info t2 on (t2.table_name = t.table_name and t2.queue_name = n2.queue_name)
+ left join londiste.table_info t2
+ on (t2.queue_name = n2.queue_name and
+ coalesce(t2.dest_table, t2.table_name) = coalesce(t.dest_table, t.table_name))
where t.queue_name = i_queue_name
and t.table_name = fq_table_name
and t2.queue_name != i_queue_name -- skip self
set local = true,
merge_state = new_state,
table_attrs = coalesce(i_table_attrs, table_attrs)
- where queue_name = _queue_name and table_name = fq_table_name;
+ where queue_name = _queue_name and table_name = _table_name2;
if not found then
- raise exception 'lost table: %', fq_table_name;
+ raise exception 'lost table: % on queue %', _table_name2, _queue_name;
end if;
end loop;
-- if this node has combined_queue, add table there too
+ -- note: we need to keep both table_name/dest_table values
select n2.queue_name, t.table_name
from pgq_node.node_info n1
join pgq_node.node_info n2
into _combined_queue, _combined_table;
if found and _combined_table is null then
select f.ret_code, f.ret_note
- from londiste.local_add_table(_combined_queue, fq_table_name, i_trg_args, i_table_attrs) f
+ from londiste.local_add_table(_combined_queue, fq_table_name, i_trg_args, i_table_attrs, _dest_table) f
into ret_code, ret_note;
if ret_code >= 300 then
return;
select count(*), min(t.tgname)
into _skip_trg_count, _skip_trg_name
from pg_catalog.pg_trigger t
- where t.tgrelid = londiste.find_table_oid(fq_table_name)
+ where t.tgrelid = londiste.find_table_oid(_dest_table)
and position(E'\\000skip\\000' in lower(tgargs::text)) > 0;
-- if no previous skip triggers, prefix name and add SKIP to args
if _skip_trg_count = 0 then
-- if not prefixed then rename
if position(_skip_prefix in _skip_trg_name) != 1 then
sql := 'alter trigger ' || _skip_trg_name
- || ' on ' || londiste.quote_fqname(fq_table_name)
+ || ' on ' || londiste.quote_fqname(_dest_table)
|| ' rename to ' || _skip_prefix || _skip_trg_name;
execute sql;
end if;
else
- select 405, 'Multiple SKIP triggers in table: ' || fq_table_name
+ select 405, 'Multiple SKIP triggers in table: ' || _dest_table
into ret_code, ret_note;
return;
end if;
-- create Ins/Upd/Del trigger if it does not exists already
perform 1 from pg_catalog.pg_trigger
- where tgrelid = londiste.find_table_oid(fq_table_name)
+ where tgrelid = londiste.find_table_oid(_dest_table)
and tgname = lg_name;
if not found then
-- create trigger
sql := 'create trigger ' || quote_ident(lg_name)
|| ' ' || lg_pos || ' ' || lg_event
- || ' on ' || londiste.quote_fqname(fq_table_name)
+ || ' on ' || londiste.quote_fqname(_dest_table)
|| ' for each row execute procedure '
|| lg_func || '(' || lg_args || _extra_args || ')';
execute sql;
if pgversion >= 80400 then
trunctrg_name := '_londiste_' || i_queue_name || '_truncate';
perform 1 from pg_catalog.pg_trigger
- where tgrelid = londiste.find_table_oid(fq_table_name)
+ where tgrelid = londiste.find_table_oid(_dest_table)
and tgname = trunctrg_name;
if not found then
sql := 'create trigger ' || quote_ident(trunctrg_name)
- || ' after truncate on ' || londiste.quote_fqname(fq_table_name)
+ || ' after truncate on ' || londiste.quote_fqname(_dest_table)
|| ' for each statement execute procedure pgq.sqltriga(' || quote_literal(i_queue_name)
|| _extra_args || ')';
execute sql;
if pgversion >= 90000 then
select tg.tgname into logtrg_previous
from pg_class r join pg_trigger tg on (tg.tgrelid = r.oid)
- where r.oid = londiste.find_table_oid(fq_table_name)
+ where r.oid = londiste.find_table_oid(_dest_table)
and not tg.tgisinternal
and tg.tgname < lg_name::name
-- per-row AFTER trigger
else
select tg.tgname into logtrg_previous
from pg_class r join pg_trigger tg on (tg.tgrelid = r.oid)
- where r.oid = londiste.find_table_oid(fq_table_name)
+ where r.oid = londiste.find_table_oid(_dest_table)
and not tg.tgisconstraint
and tg.tgname < lg_name::name
-- per-row AFTER trigger
end;
$$ language plpgsql;
+create or replace function londiste.local_add_table(
+ in i_queue_name text,
+ in i_table_name text,
+ in i_trg_args text[],
+ in i_table_attrs text,
+ out ret_code int4,
+ out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.local_add_table(4)
+--
+-- Register table on Londiste node.
+--
+-- Backward-compatibility wrapper: delegates to the 5-argument
+-- variant with i_dest_table = NULL (table is not redirected to a
+-- differently named destination table).
+--
+-- Parameters:
+--      i_queue_name    - queue name
+--      i_table_name    - table name
+--      i_trg_args      - args to trigger, or magic parameters
+--      i_table_attrs   - args to python handler
+--
+-- Returns:
+--      200 ok, 301 warning, 400+ error (same codes as 5-arg variant)
+-- ----------------------------------------------------------------------
+begin
+ select f.ret_code, f.ret_note into ret_code, ret_note
+ from londiste.local_add_table(i_queue_name, i_table_name, i_trg_args, i_table_attrs, null) f;
+ return;
+end;
+$$ language plpgsql;
+
create or replace function londiste.local_add_table(
in i_queue_name text,
in i_table_name text,
-- skip_truncate = null,
-- table_attrs = null,
-- dropped_ddl = null,
- merge_state = null
+ merge_state = null,
+ dest_table = null
where queue_name = i_queue_name
and table_name = fq_table_name;
else
and n.nspname !~ '^pg_(toast|temp)'
and not exists (select 1 from londiste.table_info
where queue_name = i_queue_name
- and table_name = (n.nspname || '.' || r.relname))
+ and coalesce(dest_table, table_name) = (n.nspname || '.' || r.relname))
order by 1, 2
loop
return next;
cnt := cnt + 1;
end if;
+ -- table_info.dest_table
+ perform 1 from information_schema.columns
+ where table_schema = 'londiste'
+ and table_name = 'table_info'
+ and column_name = 'dest_table';
+ if not found then
+ alter table londiste.table_info add column dest_table text;
+ end if;
+
return cnt;
end;
$$ language plpgsql;
custom_snapshot text,
dropped_ddl text,
table_attrs text,
+ dest_table text,
primary key (queue_name, table_name),
foreign key (queue_name)