parallel copy process limit
authorEgon Valdmees <egon.valdmees@skype.net>
Fri, 5 Aug 2011 10:48:47 +0000 (13:48 +0300)
committerMarko Kreen <markokr@gmail.com>
Fri, 16 Sep 2011 10:34:46 +0000 (13:34 +0300)
added max-parallel-copy londiste argument to specify max number of parallel copy processes

python/londiste.py
python/londiste/playback.py
python/londiste/setup.py
python/londiste/table_copy.py
sql/londiste/functions/londiste.get_table_list.sql
sql/londiste/functions/londiste.local_add_table.sql
sql/londiste/structure/tables.sql
upgrade/src/londiste.table_info.sql [new file with mode: 0644]

index 778b9f78b28858c8ea48f166642be55ed2015e39..0b938dbe6dea897bfcbb7293752abb8f44222f30 100755 (executable)
@@ -132,6 +132,8 @@ class Londiste(skytools.DBScript):
                 help="merge tables from all source queues", default=False)
         g.add_option("--no-merge", action="store_true",
                 help="don't merge tables from source queues", default=False)
+        g.add_option("--max-parallel-copy", type = "int",
+                help="max number of parallel copy processes")
 
         p.add_option_group(g)
 
index d1d3dc274fc052dbe1a69205666096b9830dd94a..03afe3e420b44b5240f0f9832c7581105dc45cc7 100644 (file)
@@ -25,6 +25,8 @@ SYNC_OK   = 0  # continue with batch
 SYNC_LOOP = 1  # sleep, try again
 SYNC_EXIT = 2  # nothing to do, exit skript
 
+MAX_PARALLEL_COPY = 8 # default number of allowed max parallel copy processes
+
 class Counter(object):
     """Counts table statuses."""
 
@@ -51,6 +53,7 @@ class Counter(object):
             elif t.state == TABLE_OK:
                 self.ok += 1
 
+
     def get_copy_count(self):
         return self.copy + self.catching_up + self.wanna_sync + self.do_sync
 
@@ -74,6 +77,10 @@ class TableState(object):
         self.plugin = None
         # except this
         self.changed = 0
+        # position in parallel copy work order
+        self.copy_pos = 0
+        # max number of parallel copy processesses allowed
+        self.max_parallel_copy = MAX_PARALLEL_COPY
 
     def forget(self):
         """Reset all info."""
@@ -87,6 +94,8 @@ class TableState(object):
         self.table_attrs = {}
         self.changed = 1
         self.plugin = None
+        self.copy_pos = 0
+        self.max_parallel_copy = MAX_PARALLEL_COPY
 
     def change_snapshot(self, str_snapshot, tag_changed = 1):
         """Set snapshot."""
@@ -175,10 +184,18 @@ class TableState(object):
         if row['merge_state'] == "?":
             self.changed = 1
 
+        self.copy_pos = int(row.get('copy_pos','0'))
+        self.max_parallel_copy = int(self.table_attrs.get('max_parallel_copy',
+                                                        self.max_parallel_copy))
+
         hstr = self.table_attrs.get('handlers', '') # compat
         hstr = self.table_attrs.get('handler', hstr)
         self.plugin = build_handler(self.name, hstr, self.log)
 
+    def max_parallel_copies_reached(self):
+        return self.max_parallel_copy and\
+                    self.copy_pos >= self.max_parallel_copy
+
     def interesting(self, ev, tick_id, copy_thread):
         """Check if table wants this event."""
 
@@ -210,7 +227,7 @@ class TableState(object):
 
     def gc_snapshot(self, copy_thread, prev_tick, cur_tick, no_lag):
         """Remove attached snapshot if possible.
-        
+
         If the event processing is in current moment. the snapshot
         is not needed beyond next batch.
 
@@ -318,7 +335,7 @@ class Replicator(CascadedWorker):
             self.code_check_done = 1
 
         self.sync_database_encodings(src_db, dst_db)
-        
+
         self.cur_tick = self.batch_info['tick_id']
         self.prev_tick = self.batch_info['prev_tick_id']
 
@@ -367,7 +384,7 @@ or (ev_extra1 in (%s)))
 
     def sync_tables(self, src_db, dst_db):
         """Table sync loop.
-        
+
         Calls appropriate handles, which is expected to
         return one of SYNC_* constants."""
 
@@ -395,7 +412,7 @@ or (ev_extra1 in (%s)))
             dst_db.commit()
             self.load_table_state(dst_db.cursor())
             dst_db.commit()
-    
+
     dsync_backup = None
     def sync_from_main_thread(self, cnt, src_db, dst_db):
         "Main thread sync logic."
@@ -403,7 +420,7 @@ or (ev_extra1 in (%s)))
         # This operates on all table, any amount can be in any state
 
         ret = SYNC_OK
-        
+
         if cnt.do_sync:
             # wait for copy thread to catch up
             ret = SYNC_LOOP
@@ -470,7 +487,7 @@ or (ev_extra1 in (%s)))
                 # there cannot be interesting events in current batch
                 # but maybe there's several tables, lets do them in one go
                 ret = SYNC_LOOP
-        
+
         return ret
 
 
@@ -506,7 +523,7 @@ or (ev_extra1 in (%s)))
         elif t.state == TABLE_CATCHING_UP:
 
             # partition merging
-            if t.copy_role == 'wait-replay':
+            if t.copy_role in ('wait-replay', 'lead'):
                 return SYNC_LOOP
 
             # is there more work?
@@ -566,14 +583,14 @@ or (ev_extra1 in (%s)))
         if not t or not t.interesting(ev, self.cur_tick, self.copy_thread):
             self.stat_increase('ignored_events')
             return
-        
+
         try:
             p = self.used_plugins[ev.extra1]
         except KeyError:
             p = t.get_plugin()
             self.used_plugins[ev.extra1] = p
             p.prepare_batch(self.batch_info, dst_curs)
-     
+
         p.process_event(ev, self.apply_sql, dst_curs)
 
     def handle_truncate_event(self, ev, dst_curs):
@@ -675,7 +692,7 @@ or (ev_extra1 in (%s)))
 
     def load_table_state(self, curs):
         """Load table state from database.
-        
+
         Todo: if all tables are OK, there is no need
         to load state on every batch.
         """
@@ -823,7 +840,7 @@ or (ev_extra1 in (%s)))
             q2 = "select londiste.restore_table_fkey(%(from_table)s, %(fkey_name)s)"
             dst_curs.execute(q2, row)
             dst_db.commit()
-    
+
     def drop_fkeys(self, dst_db, table_name):
         """Drop all foreign keys to and from this table.
 
@@ -839,7 +856,7 @@ or (ev_extra1 in (%s)))
             q2 = "select londiste.drop_table_fkey(%(from_table)s, %(fkey_name)s)"
             dst_curs.execute(q2, row)
             dst_db.commit()
-        
+
     def process_root_node(self, dst_db):
         """On root node send seq changes to queue."""
 
index 1bfd703925173141376f5aa5206ad55004a38568..6ec8ea602acfa360317784305d44751bb4b1b6d9 100644 (file)
@@ -69,6 +69,9 @@ class LondisteSetup(CascadeAdmin):
                     help="merge tables from all source queues", default=False)
         p.add_option("--no-merge", action="store_true",
                     help="don't merge tables from source queues", default=False)
+        p.add_option("--max-parallel-copy", type = "int",
+                    help="max number of parallel copy processes")
+
         return p
 
     def extra_init(self, node_type, node_db, provider_db):
@@ -183,19 +186,25 @@ class LondisteSetup(CascadeAdmin):
         if self.options.expect_sync:
             tgargs.append('expect_sync')
 
-        # actual table registration
-        q = "select * from londiste.local_add_table(%s, %s, %s)"
-        self.exec_cmd(dst_curs, q, [self.set_name, tbl, tgargs])
-
         if not self.options.expect_sync:
             if self.options.skip_truncate:
                 attrs['skip_truncate'] = 1
             if self.options.copy_condition:
                 attrs['copy_condition'] = self.options.copy_condition
+
+        if self.options.max_parallel_copy:
+            attrs['max_parallel_copy'] = self.options.max_parallel_copy
+
+        args = [self.set_name, tbl, tgargs]
+
         if attrs:
-            enc_attrs = skytools.db_urlencode(attrs)
-            q = "select * from londiste.local_set_table_attrs(%s, %s, %s)"
-            self.exec_cmd(dst_curs, q, [self.set_name, tbl, enc_attrs])
+            args.append(skytools.db_urlencode(attrs))
+
+        q = "select * from londiste.local_add_table(%s)" %\
+            ','.join(['%s']*len(args))
+
+        # actual table registration
+        self.exec_cmd(dst_curs, q, args)
         dst_db.commit()
 
     def handler_needs_table(self):
index 7422b5333aeb8aaed5fbc324a538b6c089c20102..257dbe60e4e1752133ee5c8a7444b836e61a4e0f 100644 (file)
@@ -56,8 +56,14 @@ class CopyTable(Replicator):
         src_curs = src_db.cursor()
         dst_curs = dst_db.cursor()
 
-        while tbl_stat.copy_role == 'wait-copy':
-            self.log.info('waiting for first partition to initialize copy')
+        while 1:
+            if tbl_stat.copy_role == 'wait-copy':
+                self.log.info('waiting for first partition to initialize copy')
+            elif tbl_stat.max_parallel_copies_reached():
+                self.log.info('number of max parallel copies (%s) reached' %\
+                                tbl_stat.max_parallel_copy)
+            else:
+                break
             time.sleep(10)
             tbl_stat = self.reload_table_stat(dst_curs, tbl_stat.name)
             dst_db.commit()
@@ -70,7 +76,7 @@ class CopyTable(Replicator):
             pt = pmap[tbl_stat.name]
             if pt.state == TABLE_OK:
                 break
-            
+
             self.log.warning("table %s not in sync yet on provider, waiting" % tbl_stat.name)
             time.sleep(10)
 
@@ -98,7 +104,7 @@ class CopyTable(Replicator):
         # find dst struct
         src_struct = TableStruct(src_curs, tbl_stat.name)
         dst_struct = TableStruct(dst_curs, tbl_stat.name)
-        
+
         # take common columns, warn on missing ones
         dlist = dst_struct.get_column_list()
         slist = src_struct.get_column_list()
@@ -154,6 +160,8 @@ class CopyTable(Replicator):
             dst_struct.create(dst_curs, objs, log = self.log)
         elif cmode == 2:
             dst_db.commit()
+            self.change_table_state(dst_db, tbl_stat, TABLE_CATCHING_UP)
+            # start waiting for other copy processes to finish
             while tbl_stat.copy_role == 'lead':
                 self.log.info('waiting for other partitions to finish copy')
                 time.sleep(10)
index 2f526eebfce6e0c8f27b6dbae94b9e1fb1ade392..d101f8616762c7f5916bd23109f82b4d7a003685 100644 (file)
@@ -7,8 +7,9 @@ create or replace function londiste.get_table_list(
     out custom_snapshot text,
     out table_attrs text,
     out dropped_ddl text,
-    out copy_role text)
-returns setof record as $$ 
+    out copy_role text,
+    out copy_pos int)
+returns setof record as $$
 -- ----------------------------------------------------------------------
 -- Function: londiste.get_table_list(1)
 --
@@ -25,6 +26,7 @@ returns setof record as $$
 --      table_attrs     - urlencoded dict of table attributes
 --      dropped_ddl     - partition combining: temp place to put DDL
 --      copy_role       - partition combining: how to handle copy
+--      copy_pos        - position in parallel copy working order
 --
 -- copy_role = lead:
 --      on copy start, drop indexes and store in dropped_ddl
@@ -42,12 +44,15 @@ declare
     n_parts     int4;
     n_done      int4;
     var_table_name text;
+    n_combined_queue text;
 begin
-    for var_table_name, local, merge_state, custom_snapshot, table_attrs, dropped_ddl, q_part1, n_parts, n_done in
+    for var_table_name, local, merge_state, custom_snapshot, table_attrs, dropped_ddl, q_part1, n_parts, n_done, n_combined_queue, copy_pos in
         select t.table_name, t.local, t.merge_state, t.custom_snapshot, t.table_attrs, t.dropped_ddl,
-               min(t2.queue_name) as _queue1,
-               count(t2.table_name) as _total,
-               count(nullif(t2.merge_state, 'in-copy')) as _done
+               min(case when t2.local then t2.queue_name else null end) as _queue1,
+               count(case when t2.local then t2.table_name else null end) as _total,
+               count(case when t2.local then nullif(t2.merge_state, 'in-copy') else null end) as _done,
+               min(n.combined_queue) as _combined_queue,
+               count(nullif(t2.queue_name < i_queue_name and t.merge_state = 'in-copy' and t2.merge_state = 'in-copy', false)) as _copy_pos
             from londiste.table_info t
             join pgq_node.node_info n on (n.queue_name = t.queue_name)
             left join pgq_node.node_info n2 on (n2.combined_queue = n.combined_queue or
@@ -61,12 +66,13 @@ begin
         -- if the table is in middle of copy from multiple partitions,
         -- the copy processes need coordination
         copy_role := null;
+
         if q_part1 is not null then
             if i_queue_name = q_part1 then
                 -- lead
-                if merge_state = 'in-copy' then
+                if merge_state in ('in-copy', 'catching-up') then
                     -- show copy_role only if need to wait for others
-                    if n_done < n_parts - 1 then
+                    if n_done < n_parts then
                         copy_role := 'lead';
                     end if;
                 end if;
@@ -93,8 +99,8 @@ begin
         end if;
         table_name:=var_table_name;
         return next;
-    end loop; 
+    end loop;
     return;
-end; 
+end;
 $$ language plpgsql strict stable;
 
index 3bfcfebfc048cf449150f7a2170beef708b7264f..78ffad6f606958062232cc1b19d6525d94db5944 100644 (file)
@@ -2,6 +2,7 @@ create or replace function londiste.local_add_table(
     in i_queue_name     text,
     in i_table_name     text,
     in i_trg_args       text[],
+    in i_table_attrs    text default null,
     out ret_code        int4,
     out ret_note        text)
 as $$
@@ -211,7 +212,8 @@ begin
 
     update londiste.table_info
         set local = true,
-            merge_state = new_state
+            merge_state = new_state,
+            table_attrs = coalesce(i_table_attrs, table_attrs)
         where queue_name = i_queue_name and table_name = fq_table_name;
     if not found then
         raise exception 'lost table: %', fq_table_name;
@@ -252,7 +254,8 @@ begin
 
             update londiste.table_info
                set local = true,
-                   merge_state = new_state
+                   merge_state = new_state,
+                   table_attrs = coalesce(i_table_attrs, table_attrs)
                where queue_name = _queue_name and table_name = fq_table_name;
             if not found then
                 raise exception 'lost table: %', fq_table_name;
@@ -266,7 +269,7 @@ begin
                                             fq_table_name,
                                             'skip_truncate=1');
     end if;
-
+    
     -------- TRIGGER LOGIC
 
     -- new trigger
index 020acaa6ed454c251dc0606f0ad7c6b3bb598855..ddfd880addc45b24698d4e6e193cfa33a53db64b 100644 (file)
@@ -99,7 +99,7 @@ create table londiste.table_info (
     foreign key (queue_name)
       references pgq_node.node_info (queue_name)
       on delete cascade,
-    check (dropped_ddl is null or merge_state = 'in-copy')
+    check (dropped_ddl is null or merge_state in ('in-copy', 'catching-up'))
 );
 
 
diff --git a/upgrade/src/londiste.table_info.sql b/upgrade/src/londiste.table_info.sql
new file mode 100644 (file)
index 0000000..b199bfd
--- /dev/null
@@ -0,0 +1,2 @@
+ALTER TABLE londiste.table_info DROP CONSTRAINT table_info_check;
+ALTER TABLE londiste.table_info ADD CHECK (dropped_ddl is null or merge_state in ('in-copy', 'catching-up'));