added londiste --max-parallel-copy argument to specify the max number of parallel copy processes
help="merge tables from all source queues", default=False)
g.add_option("--no-merge", action="store_true",
help="don't merge tables from source queues", default=False)
+ g.add_option("--max-parallel-copy", type = "int",
+ help="max number of parallel copy processes")
p.add_option_group(g)
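
As a hedged illustration of the new flag (values hypothetical, not an actual londiste invocation): optparse stores the parsed integer on the options object, and an unset option stays None, so callers can fall back to the MAX_PARALLEL_COPY default.

    import optparse

    p = optparse.OptionParser()
    p.add_option("--max-parallel-copy", type="int",
                 help="max number of parallel copy processes")
    opts, _ = p.parse_args(["--max-parallel-copy", "4"])
    assert opts.max_parallel_copy == 4
    opts2, _ = p.parse_args([])
    assert opts2.max_parallel_copy is None  # caller falls back to the default
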
SYNC_LOOP = 1 # sleep, try again
SYNC_EXIT = 2 # nothing to do, exit script
+MAX_PARALLEL_COPY = 8 # default max number of parallel copy processes
+
class Counter(object):
"""Counts table statuses."""
elif t.state == TABLE_OK:
self.ok += 1
+
def get_copy_count(self):
return self.copy + self.catching_up + self.wanna_sync + self.do_sync
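
For reference, a toy standalone sketch of the tally (the real Counter iterates table objects and compares their TABLE_* states; names here are hypothetical):

    # minimal sketch, assuming states arrive as plain strings
    class TinyCounter(object):
        def __init__(self, states):
            self.copy = states.count('in-copy')
            self.catching_up = states.count('catching-up')
            self.wanna_sync = states.count('wanna-sync')
            self.do_sync = states.count('do-sync')
            self.ok = states.count('ok')
        def get_copy_count(self):
            # every state except 'ok' still occupies a copy slot
            return self.copy + self.catching_up + self.wanna_sync + self.do_sync

    assert TinyCounter(['in-copy', 'catching-up', 'ok']).get_copy_count() == 2
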
self.plugin = None
# except this
self.changed = 0
+ # position in parallel copy work order
+ self.copy_pos = 0
+ # max number of parallel copy processes allowed
+ self.max_parallel_copy = MAX_PARALLEL_COPY
def forget(self):
"""Reset all info."""
self.table_attrs = {}
self.changed = 1
self.plugin = None
+ self.copy_pos = 0
+ self.max_parallel_copy = MAX_PARALLEL_COPY
def change_snapshot(self, str_snapshot, tag_changed = 1):
"""Set snapshot."""
if row['merge_state'] == "?":
self.changed = 1
+ self.copy_pos = int(row.get('copy_pos', '0'))
+ self.max_parallel_copy = int(self.table_attrs.get('max_parallel_copy',
+ self.max_parallel_copy))
+
hstr = self.table_attrs.get('handlers', '') # compat
hstr = self.table_attrs.get('handler', hstr)
self.plugin = build_handler(self.name, hstr, self.log)
+ def max_parallel_copies_reached(self):
+ return self.max_parallel_copy and \
+ self.copy_pos >= self.max_parallel_copy
+
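
A minimal sketch of the gating semantics (class name hypothetical): a falsy limit (0 or None) disables throttling, while a worker whose work-order position meets the limit must wait.

    class Gate(object):
        def __init__(self, copy_pos, max_parallel_copy):
            self.copy_pos = copy_pos                  # position in copy work order
            self.max_parallel_copy = max_parallel_copy
        def max_parallel_copies_reached(self):
            return self.max_parallel_copy and \
                   self.copy_pos >= self.max_parallel_copy

    assert not Gate(3, 8).max_parallel_copies_reached()   # free slot, copy may start
    assert Gate(8, 8).max_parallel_copies_reached()       # limit hit, must wait
    assert not Gate(99, 0).max_parallel_copies_reached()  # limit 0 disables the check
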
def interesting(self, ev, tick_id, copy_thread):
"""Check if table wants this event."""
def gc_snapshot(self, copy_thread, prev_tick, cur_tick, no_lag):
"""Remove attached snapshot if possible.
-
+
If event processing is at the current moment, the snapshot
is not needed beyond the next batch.
self.code_check_done = 1
self.sync_database_encodings(src_db, dst_db)
-
+
self.cur_tick = self.batch_info['tick_id']
self.prev_tick = self.batch_info['prev_tick_id']
def sync_tables(self, src_db, dst_db):
"""Table sync loop.
-
+
Calls the appropriate handler, which is expected to
return one of the SYNC_* constants."""
dst_db.commit()
self.load_table_state(dst_db.cursor())
dst_db.commit()
-
+
dsync_backup = None
def sync_from_main_thread(self, cnt, src_db, dst_db):
"Main thread sync logic."
# This operates on all tables; any number can be in any state
ret = SYNC_OK
-
+
if cnt.do_sync:
# wait for copy thread to catch up
ret = SYNC_LOOP
# there cannot be interesting events in current batch
# but maybe there's several tables, lets do them in one go
ret = SYNC_LOOP
-
+
return ret
elif t.state == TABLE_CATCHING_UP:
# partition merging
- if t.copy_role == 'wait-replay':
+ if t.copy_role in ('wait-replay', 'lead'):
return SYNC_LOOP
# is there more work?
if not t or not t.interesting(ev, self.cur_tick, self.copy_thread):
self.stat_increase('ignored_events')
return
-
+
try:
p = self.used_plugins[ev.extra1]
except KeyError:
p = t.get_plugin()
self.used_plugins[ev.extra1] = p
p.prepare_batch(self.batch_info, dst_curs)
-
+
p.process_event(ev, self.apply_sql, dst_curs)
def handle_truncate_event(self, ev, dst_curs):
def load_table_state(self, curs):
"""Load table state from database.
-
+
Todo: if all tables are OK, there is no need
to load state on every batch.
"""
q2 = "select londiste.restore_table_fkey(%(from_table)s, %(fkey_name)s)"
dst_curs.execute(q2, row)
dst_db.commit()
-
+
def drop_fkeys(self, dst_db, table_name):
"""Drop all foreign keys to and from this table.
q2 = "select londiste.drop_table_fkey(%(from_table)s, %(fkey_name)s)"
dst_curs.execute(q2, row)
dst_db.commit()
-
+
def process_root_node(self, dst_db):
"""On root node send seq changes to queue."""
help="merge tables from all source queues", default=False)
p.add_option("--no-merge", action="store_true",
help="don't merge tables from source queues", default=False)
+ p.add_option("--max-parallel-copy", type = "int",
+ help="max number of parallel copy processes")
+
return p
def extra_init(self, node_type, node_db, provider_db):
if self.options.expect_sync:
tgargs.append('expect_sync')
- # actual table registration
- q = "select * from londiste.local_add_table(%s, %s, %s)"
- self.exec_cmd(dst_curs, q, [self.set_name, tbl, tgargs])
-
if not self.options.expect_sync:
if self.options.skip_truncate:
attrs['skip_truncate'] = 1
if self.options.copy_condition:
attrs['copy_condition'] = self.options.copy_condition
+
+ if self.options.max_parallel_copy:
+ attrs['max_parallel_copy'] = self.options.max_parallel_copy
+
+ args = [self.set_name, tbl, tgargs]
+
if attrs:
- enc_attrs = skytools.db_urlencode(attrs)
- q = "select * from londiste.local_set_table_attrs(%s, %s, %s)"
- self.exec_cmd(dst_curs, q, [self.set_name, tbl, enc_attrs])
+ args.append(skytools.db_urlencode(attrs))
+
+ q = "select * from londiste.local_add_table(%s)" %\
+ ','.join(['%s']*len(args))
+
+ # actual table registration
+ self.exec_cmd(dst_curs, q, args)
dst_db.commit()
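
To illustrate the dynamic placeholder construction above (argument values hypothetical): the query gains a fourth %s only when the urlencoded attributes are appended.

    args = ['myqueue', 'public.mytable', ['expect_sync']]
    args.append('max_parallel_copy=4')  # what skytools.db_urlencode(attrs) might yield
    q = "select * from londiste.local_add_table(%s)" % \
        ','.join(['%s'] * len(args))
    assert q == "select * from londiste.local_add_table(%s,%s,%s,%s)"
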
def handler_needs_table(self):
src_curs = src_db.cursor()
dst_curs = dst_db.cursor()
- while tbl_stat.copy_role == 'wait-copy':
- self.log.info('waiting for first partition to initialize copy')
+ while 1:
+ if tbl_stat.copy_role == 'wait-copy':
+ self.log.info('waiting for first partition to initialize copy')
+ elif tbl_stat.max_parallel_copies_reached():
+ self.log.info('max number of parallel copies (%s) reached' % \
+ tbl_stat.max_parallel_copy)
+ else:
+ break
time.sleep(10)
tbl_stat = self.reload_table_stat(dst_curs, tbl_stat.name)
dst_db.commit()
pt = pmap[tbl_stat.name]
if pt.state == TABLE_OK:
break
-
+
self.log.warning("table %s not in sync yet on provider, waiting" % tbl_stat.name)
time.sleep(10)
# find dst struct
src_struct = TableStruct(src_curs, tbl_stat.name)
dst_struct = TableStruct(dst_curs, tbl_stat.name)
-
+
# take common columns, warn on missing ones
dlist = dst_struct.get_column_list()
slist = src_struct.get_column_list()
dst_struct.create(dst_curs, objs, log = self.log)
elif cmode == 2:
dst_db.commit()
+ self.change_table_state(dst_db, tbl_stat, TABLE_CATCHING_UP)
+ # start waiting for other copy processes to finish
while tbl_stat.copy_role == 'lead':
self.log.info('waiting for other partitions to finish copy')
time.sleep(10)
out custom_snapshot text,
out table_attrs text,
out dropped_ddl text,
- out copy_role text)
-returns setof record as $$
+ out copy_role text,
+ out copy_pos int)
+returns setof record as $$
-- ----------------------------------------------------------------------
-- Function: londiste.get_table_list(1)
--
-- table_attrs - urlencoded dict of table attributes
-- dropped_ddl - partition combining: temp place to put DDL
-- copy_role - partition combining: how to handle copy
+-- copy_pos - position in parallel copy work order
--
-- copy_role = lead:
-- on copy start, drop indexes and store in dropped_ddl
n_parts int4;
n_done int4;
var_table_name text;
+ n_combined_queue text;
begin
- for var_table_name, local, merge_state, custom_snapshot, table_attrs, dropped_ddl, q_part1, n_parts, n_done in
+ for var_table_name, local, merge_state, custom_snapshot, table_attrs, dropped_ddl, q_part1, n_parts, n_done, n_combined_queue, copy_pos in
select t.table_name, t.local, t.merge_state, t.custom_snapshot, t.table_attrs, t.dropped_ddl,
- min(t2.queue_name) as _queue1,
- count(t2.table_name) as _total,
- count(nullif(t2.merge_state, 'in-copy')) as _done
+ min(case when t2.local then t2.queue_name else null end) as _queue1,
+ count(case when t2.local then t2.table_name else null end) as _total,
+ count(case when t2.local then nullif(t2.merge_state, 'in-copy') else null end) as _done,
+ min(n.combined_queue) as _combined_queue,
+ count(nullif(t2.queue_name < i_queue_name and t.merge_state = 'in-copy' and t2.merge_state = 'in-copy', false)) as _copy_pos
from londiste.table_info t
join pgq_node.node_info n on (n.queue_name = t.queue_name)
left join pgq_node.node_info n2 on (n2.combined_queue = n.combined_queue or
-- if the table is in the middle of a copy from multiple partitions,
-- the copy processes need coordination
copy_role := null;
+
if q_part1 is not null then
if i_queue_name = q_part1 then
-- lead
- if merge_state = 'in-copy' then
+ if merge_state in ('in-copy', 'catching-up') then
-- show copy_role only if need to wait for others
- if n_done < n_parts - 1 then
+ if n_done < n_parts then
copy_role := 'lead';
end if;
end if;
end if;
table_name := var_table_name;
return next;
- end loop;
+ end loop;
return;
-end;
+end;
$$ language plpgsql strict stable;
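
The _copy_pos aggregate above effectively ranks the in-copy partitions by queue name; the position is only meaningful while the table itself is 'in-copy'. A rough Python equivalent of the counting logic (data shapes hypothetical):

    def copy_pos(my_queue, siblings):
        """siblings: (queue_name, merge_state) pairs for the same table."""
        # count sibling queues sorting before ours that are still copying;
        # the count is this worker's position in the parallel copy order
        return sum(1 for q, state in siblings
                   if q < my_queue and state == 'in-copy')

    parts = [('q1', 'in-copy'), ('q2', 'in-copy'), ('q3', 'in-copy')]
    assert copy_pos('q3', parts) == 2  # two earlier partitions still copying
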
in i_queue_name text,
in i_table_name text,
in i_trg_args text[],
+ in i_table_attrs text default null,
out ret_code int4,
out ret_note text)
as $$
update londiste.table_info
set local = true,
- merge_state = new_state
+ merge_state = new_state,
+ table_attrs = coalesce(i_table_attrs, table_attrs)
where queue_name = i_queue_name and table_name = fq_table_name;
if not found then
raise exception 'lost table: %', fq_table_name;
update londiste.table_info
set local = true,
- merge_state = new_state
+ merge_state = new_state,
+ table_attrs = coalesce(i_table_attrs, table_attrs)
where queue_name = _queue_name and table_name = fq_table_name;
if not found then
raise exception 'lost table: %', fq_table_name;
fq_table_name,
'skip_truncate=1');
end if;
-
+
-------- TRIGGER LOGIC
-- new trigger
foreign key (queue_name)
references pgq_node.node_info (queue_name)
on delete cascade,
- check (dropped_ddl is null or merge_state = 'in-copy')
+ check (dropped_ddl is null or merge_state in ('in-copy', 'catching-up'))
);
--- /dev/null
+ALTER TABLE londiste.table_info DROP CONSTRAINT table_info_check;
+ALTER TABLE londiste.table_info ADD CHECK (dropped_ddl is null or merge_state in ('in-copy', 'catching-up'));