londiste: sweeping change to postpone log string formatting
authormartinko <gamato@users.sf.net>
Thu, 21 Feb 2013 15:09:47 +0000 (16:09 +0100)
committermartinko <gamato@users.sf.net>
Thu, 21 Feb 2013 15:09:47 +0000 (16:09 +0100)
python/londiste/compare.py
python/londiste/handlers/applyfn.py
python/londiste/handlers/bulk.py
python/londiste/handlers/dispatch.py
python/londiste/handlers/part.py
python/londiste/playback.py
python/londiste/repair.py
python/londiste/setup.py
python/londiste/syncer.py
python/londiste/table_copy.py

index 83dac2e4a7615e6f54dbe6d2133a92c6a1e3cef6..3f74862f4a2d590e9aff64ce8f16da5e9d38314e 100644 (file)
@@ -27,7 +27,7 @@ class Comparator(Syncer):
         dst_where = t2.plugin.get_copy_condition(src_curs, dst_curs)
         src_where = dst_where
 
-        self.log.info('Counting %s' % dst_tbl)
+        self.log.info('Counting %s', dst_tbl)
 
         # get common cols
         cols = self.calc_cols(src_curs, src_tbl, dst_curs, dst_tbl)
@@ -61,22 +61,22 @@ class Comparator(Syncer):
             f += ", checksum=%(chksum)s"
         f = self.cf.get('compare_fmt', f)
 
-        self.log.debug("srcdb: " + src_q)
+        self.log.debug("srcdb: %s", src_q)
         src_curs.execute(src_q)
         src_row = src_curs.fetchone()
         src_str = f % src_row
-        self.log.info("srcdb: %s" % src_str)
+        self.log.info("srcdb: %s", src_str)
         src_db.commit()
 
-        self.log.debug("dstdb: " + dst_q)
+        self.log.debug("dstdb: %s", dst_q)
         dst_curs.execute(dst_q)
         dst_row = dst_curs.fetchone()
         dst_str = f % dst_row
-        self.log.info("dstdb: %s" % dst_str)
+        self.log.info("dstdb: %s", dst_str)
         dst_db.commit()
 
         if src_str != dst_str:
-            self.log.warning("%s: Results do not match!" % dst_tbl)
+            self.log.warning("%s: Results do not match!", dst_tbl)
             return 1
         return 0
 
index b7b1173cf9de237653f832e772cb17e017621e61..cbbf603bcc00e0b393c6baab330d61a05cdfa017 100644 (file)
@@ -34,7 +34,7 @@ class ApplyFuncHandler(BaseHandler):
         qfn = skytools.quote_fqident(fn)
         qargs = [skytools.quote_literal(a) for a in args]
         sql = "select %s(%s);" % (qfn, ', '.join(qargs))
-        self.log.debug('applyfn.sql: %s' % sql)
+        self.log.debug('applyfn.sql: %s', sql)
         sql_queue_func(sql, qfunc_arg)
 
 #------------------------------------------------------------------------------
index 0c0167ac319d21e3f1a206b470a00636cdcb6a16..e8b9104bee734345cfce97e3674f0107d920a7c6 100644 (file)
@@ -82,7 +82,7 @@ class BulkLoader(BaseHandler):
         if not self.method in (0,1,2):
             raise Exception('unknown method: %s' % self.method)
 
-        self.log.debug('bulk_init(%s), method=%d' % (repr(args), self.method))
+        self.log.debug('bulk_init(%r), method=%d', args, self.method)
 
     def reset(self):
         self.pkey_ev_map = {}
@@ -98,7 +98,7 @@ class BulkLoader(BaseHandler):
         op = ev.ev_type[0]
         if op not in 'IUD':
             raise Exception('Unknown event type: '+ev.ev_type)
-        self.log.debug('bulk.process_event: %s/%s' % (ev.ev_type, ev.ev_data))
+        self.log.debug('bulk.process_event: %s/%s', ev.ev_type, ev.ev_data)
         # pkey_list = ev.ev_type[2:].split(',')
         data = skytools.db_urldecode(ev.ev_data)
 
@@ -184,8 +184,8 @@ class BulkLoader(BaseHandler):
 
         real_update_count = len(upd_list)
 
-        self.log.debug("bulk_flush: %s  (I/U/D = %d/%d/%d)" % (
-                       self.table_name, len(ins_list), len(upd_list), len(del_list)))
+        self.log.debug("bulk_flush: %s  (I/U/D = %d/%d/%d)",
+                self.table_name, len(ins_list), len(upd_list), len(del_list))
 
         # hack to unbroke stuff
         if self.method == METH_MERGED:
@@ -200,8 +200,8 @@ class BulkLoader(BaseHandler):
         for fld in self.dist_fields:
             if fld not in key_fields:
                 key_fields.append(fld)
-        self.log.debug("PKey fields: %s  Dist fields: %s" % (
-                       ",".join(self.pkey_list), ",".join(self.dist_fields)))
+        self.log.debug("PKey fields: %s  Dist fields: %s",
+                       ",".join(self.pkey_list), ",".join(self.dist_fields))
 
         # create temp table
         temp, qtemp = self.create_temp_table(curs)
@@ -241,67 +241,67 @@ class BulkLoader(BaseHandler):
 
         # process deleted rows
         if len(del_list) > 0:
-            self.log.debug("bulk: Deleting %d rows from %s" % (len(del_list), tbl))
+            self.log.debug("bulk: Deleting %d rows from %s", len(del_list), tbl)
             # delete old rows
             q = "truncate %s" % qtemp
-            self.log.debug('bulk: %s' % q)
+            self.log.debug('bulk: %s', q)
             curs.execute(q)
             # copy rows
-            self.log.debug("bulk: COPY %d rows into %s" % (len(del_list), temp))
+            self.log.debug("bulk: COPY %d rows into %s", len(del_list), temp)
             skytools.magic_insert(curs, qtemp, del_list, col_list, quoted_table=1)
             # delete rows
-            self.log.debug('bulk: ' + del_sql)
+            self.log.debug('bulk: %s', del_sql)
             curs.execute(del_sql)
-            self.log.debug("bulk: %s - %d" % (curs.statusmessage, curs.rowcount))
+            self.log.debug("bulk: %s - %d", curs.statusmessage, curs.rowcount)
             if len(del_list) != curs.rowcount:
-                self.log.warning("Delete mismatch: expected=%s deleted=%d"
-                        % (len(del_list), curs.rowcount))
+                self.log.warning("Delete mismatch: expected=%s deleted=%d",
+                        len(del_list), curs.rowcount)
             temp_used = True
 
         # process updated rows
         if len(upd_list) > 0:
-            self.log.debug("bulk: Updating %d rows in %s" % (len(upd_list), tbl))
+            self.log.debug("bulk: Updating %d rows in %s", len(upd_list), tbl)
             # delete old rows
             q = "truncate %s" % qtemp
-            self.log.debug('bulk: ' + q)
+            self.log.debug('bulk: %s', q)
             curs.execute(q)
             # copy rows
-            self.log.debug("bulk: COPY %d rows into %s" % (len(upd_list), temp))
+            self.log.debug("bulk: COPY %d rows into %s", len(upd_list), temp)
             skytools.magic_insert(curs, qtemp, upd_list, col_list, quoted_table=1)
             temp_used = True
             if self.method == METH_CORRECT:
                 # update main table
-                self.log.debug('bulk: ' + upd_sql)
+                self.log.debug('bulk: %s', upd_sql)
                 curs.execute(upd_sql)
-                self.log.debug("bulk: %s - %d" % (curs.statusmessage, curs.rowcount))
+                self.log.debug("bulk: %s - %d", curs.statusmessage, curs.rowcount)
                 # check count
                 if len(upd_list) != curs.rowcount:
-                    self.log.warning("Update mismatch: expected=%s updated=%d"
-                            % (len(upd_list), curs.rowcount))
+                    self.log.warning("Update mismatch: expected=%s updated=%d",
+                            len(upd_list), curs.rowcount)
             else:
                 # delete from main table
-                self.log.debug('bulk: ' + del_sql)
+                self.log.debug('bulk: %s', del_sql)
                 curs.execute(del_sql)
-                self.log.debug('bulk: ' + curs.statusmessage)
+                self.log.debug('bulk: %s', curs.statusmessage)
                 # check count
                 if real_update_count != curs.rowcount:
-                    self.log.warning("bulk: Update mismatch: expected=%s deleted=%d"
-                            % (real_update_count, curs.rowcount))
+                    self.log.warning("bulk: Update mismatch: expected=%s deleted=%d",
+                            real_update_count, curs.rowcount)
                 # insert into main table
                 if AVOID_BIZGRES_BUG:
                     # copy again, into main table
-                    self.log.debug("bulk: COPY %d rows into %s" % (len(upd_list), tbl))
+                    self.log.debug("bulk: COPY %d rows into %s", len(upd_list), tbl)
                     skytools.magic_insert(curs, qtbl, upd_list, col_list, quoted_table=1)
                 else:
                     # better way, but does not work due bizgres bug
-                    self.log.debug('bulk: ' + ins_sql)
+                    self.log.debug('bulk: %s', ins_sql)
                     curs.execute(ins_sql)
-                    self.log.debug('bulk: ' + curs.statusmessage)
+                    self.log.debug('bulk: %s', curs.statusmessage)
 
         # process new rows
         if len(ins_list) > 0:
-            self.log.debug("bulk: Inserting %d rows into %s" % (len(ins_list), tbl))
-            self.log.debug("bulk: COPY %d rows into %s" % (len(ins_list), tbl))
+            self.log.debug("bulk: Inserting %d rows into %s", len(ins_list), tbl)
+            self.log.debug("bulk: COPY %d rows into %s", len(ins_list), tbl)
             skytools.magic_insert(curs, qtbl, ins_list, col_list, quoted_table=1)
 
         # delete remaining rows
@@ -311,7 +311,7 @@ class BulkLoader(BaseHandler):
             else:
                 # fscking problems with long-lived temp tables
                 q = "drop table %s" % qtemp
-            self.log.debug('bulk: ' + q)
+            self.log.debug('bulk: %s', q)
             curs.execute(q)
 
         self.reset()
@@ -326,19 +326,19 @@ class BulkLoader(BaseHandler):
         # check if exists
         if USE_REAL_TABLE:
             if skytools.exists_table(curs, tempname):
-                self.log.debug("bulk: Using existing real table %s" % tempname)
+                self.log.debug("bulk: Using existing real table %s", tempname)
                 return tempname, quote_fqident(tempname)
 
             # create non-temp table
             q = "create table %s (like %s)" % (
                         quote_fqident(tempname),
                         quote_fqident(self.dest_table))
-            self.log.debug("bulk: Creating real table: %s" % q)
+            self.log.debug("bulk: Creating real table: %s", q)
             curs.execute(q)
             return tempname, quote_fqident(tempname)
         elif USE_LONGLIVED_TEMP_TABLES:
             if skytools.exists_temp_table(curs, tempname):
-                self.log.debug("bulk: Using existing temp table %s" % tempname)
+                self.log.debug("bulk: Using existing temp table %s", tempname)
                 return tempname, quote_ident(tempname)
 
         # bizgres crashes on delete rows
@@ -347,7 +347,7 @@ class BulkLoader(BaseHandler):
         # create temp table for loading
         q = "create temp table %s (like %s) %s" % (
                 quote_ident(tempname), quote_fqident(self.dest_table), arg)
-        self.log.debug("bulk: Creating temp table: %s" % q)
+        self.log.debug("bulk: Creating temp table: %s", q)
         curs.execute(q)
         return tempname, quote_ident(tempname)
 
index 1a13c90ad9d2196766a01f04ad719cde9fa65603..758034c7ed33b1f2decd00b50b0a99de423f0d77 100644 (file)
@@ -316,10 +316,9 @@ class BaseBulkTempLoader(BaseBulkCollectingLoader):
 
     def logexec(self, curs, sql):
         """Logs and executes sql statement"""
-        self.log.debug('exec: %s' % sql)
+        self.log.debug('exec: %s', sql)
         curs.execute(sql)
-        self.log.debug('msg: %s, rows: %s' % (
-            curs.statusmessage, curs.rowcount))
+        self.log.debug('msg: %s, rows: %s', curs.statusmessage, curs.rowcount)
 
     # create sql parts
 
@@ -403,15 +402,15 @@ class BulkLoader(BaseBulkTempLoader):
         cnt = len(data)
         if (cnt == 0):
             return
-        self.log.debug("bulk: Deleting %d rows from %s" % (cnt, self.table))
+        self.log.debug("bulk: Deleting %d rows from %s", cnt, self.table)
         # copy rows to temp
         self.bulk_insert(curs, data)
         # delete rows using temp
         self.delete(curs)
         # check if right amount of rows deleted (only in direct mode)
         if self.conf.table_mode == 'direct' and cnt != curs.rowcount:
-            self.log.warning("%s: Delete mismatch: expected=%s deleted=%d"
-                    % (self.table, cnt, curs.rowcount))
+            self.log.warning("%s: Delete mismatch: expected=%s deleted=%d",
+                    self.table, cnt, curs.rowcount)
 
     def process_update(self, curs, op_map):
         """Process update list"""
@@ -424,7 +423,7 @@ class BulkLoader(BaseBulkTempLoader):
         cnt = len(data)
         if (cnt == 0):
             return
-        self.log.debug("bulk: Updating %d rows in %s" % (cnt, self.table))
+        self.log.debug("bulk: Updating %d rows in %s", cnt, self.table)
         # copy rows to temp
         self.bulk_insert(curs, data)
         if self.method == METH_CORRECT:
@@ -432,15 +431,15 @@ class BulkLoader(BaseBulkTempLoader):
             self.update(curs)
             # check count (only in direct mode)
             if self.conf.table_mode == 'direct' and cnt != curs.rowcount:
-                self.log.warning("%s: Update mismatch: expected=%s updated=%d"
-                        % (self.table, cnt, curs.rowcount))
+                self.log.warning("%s: Update mismatch: expected=%s updated=%d",
+                        self.table, cnt, curs.rowcount)
         else:
             # delete from main table using temp
             self.delete(curs)
             # check count (only in direct mode)
             if self.conf.table_mode == 'direct' and real_cnt != curs.rowcount:
-                self.log.warning("%s: Update mismatch: expected=%s deleted=%d"
-                        % (self.table, real_cnt, curs.rowcount))
+                self.log.warning("%s: Update mismatch: expected=%s deleted=%d",
+                        self.table, real_cnt, curs.rowcount)
             # insert into main table
             if AVOID_BIZGRES_BUG:
                 # copy again, into main table
@@ -457,19 +456,19 @@ class BulkLoader(BaseBulkTempLoader):
         # merged method loads inserts together with updates
         if (cnt == 0) or (self.method == METH_MERGED):
             return
-        self.log.debug("bulk: Inserting %d rows into %s" % (cnt, self.table))
+        self.log.debug("bulk: Inserting %d rows into %s", cnt, self.table)
         # copy into target table (no temp used)
         self.bulk_insert(curs, data, table = self.qtable)
 
     def bulk_flush(self, curs, op_map):
-        self.log.debug("bulk_flush: %s  (I/U/D = %d/%d/%d)" % (
-            self.table, len(op_map['I']), len(op_map['U']), len(op_map['D'])))
+        self.log.debug("bulk_flush: %s  (I/U/D = %d/%d/%d)", self.table,
+                len(op_map['I']), len(op_map['U']), len(op_map['D']))
 
         # fetch distribution fields
         if self.dist_fields is None:
             self.dist_fields = self.find_dist_fields(curs)
-            self.log.debug("Key fields: %s  Dist fields: %s" % (
-                           ",".join(self.pkeys), ",".join(self.dist_fields)))
+            self.log.debug("Key fields: %s  Dist fields: %s",
+                           ",".join(self.pkeys), ",".join(self.dist_fields))
             # add them to key
             for key in self.dist_fields:
                 if key not in self.keys:
@@ -505,7 +504,7 @@ class BulkLoader(BaseBulkTempLoader):
         """
         if USE_LONGLIVED_TEMP_TABLES or USE_REAL_TABLE:
             if self.temp_present:
-                self.log.debug("bulk: Using existing temp table %s" % self.temp)
+                self.log.debug("bulk: Using existing temp table %s", self.temp)
                 return False
         self.create(curs)
         self.temp_present = True
@@ -525,7 +524,7 @@ class BulkLoader(BaseBulkTempLoader):
             # truncate when re-using existing table
             if not self.create_temp(curs):
                 self.truncate(curs)
-        self.log.debug("bulk: COPY %d rows into %s" % (len(data), table))
+        self.log.debug("bulk: COPY %d rows into %s", len(data), table)
         skytools.magic_insert(curs, table, data, self.fields,
                               quoted_table = True)
         if _use_temp and self.run_analyze:
@@ -634,8 +633,7 @@ class Dispatcher(BaseHandler):
         BaseHandler.__init__(self, table_name, args, dest_table)
 
         # show args
-        self.log.debug("dispatch.init: table_name=%r, args=%r" % \
-                       (table_name, args))
+        self.log.debug("dispatch.init: table_name=%r, args=%r", table_name, args)
         self.batch_info = None
         self.dst_curs = None
         self.pkeys = None
@@ -784,8 +782,7 @@ class Dispatcher(BaseHandler):
         # process only operations specified
         if not op in self.conf.event_types:
             return
-        self.log.debug('dispatch.process_event: %s/%s' % (
-            ev.ev_type, ev.ev_data))
+        self.log.debug('dispatch.process_event: %s/%s', ev.ev_type, ev.ev_data)
         if self.pkeys is None:
             self.pkeys = self.filter_pkeys(pkeys.split(','))
         data = self.filter_data(data)
@@ -886,7 +883,7 @@ class Dispatcher(BaseHandler):
                 have_func = skytools.exists_function(curs, PART_FUNC_OLD, len(PART_FUNC_ARGS))
 
             if have_func:
-                self.log.debug('check_part.exec: func: %s, args: %s' % (pfcall, vals))
+                self.log.debug('check_part.exec: func: %s, args: %s', pfcall, vals)
                 curs.execute(pfcall, vals)
             else:
                 #
@@ -896,12 +893,12 @@ class Dispatcher(BaseHandler):
                 # - check constraints
                 # - inheritance
                 #
-                self.log.debug('part func %s not found, cloning table' % self.conf.part_func)
+                self.log.debug('part func %s not found, cloning table', self.conf.part_func)
                 struct = TableStruct(curs, self.dest_table)
                 struct.create(curs, T_ALL, dst)
 
         exec_with_vals(self.conf.post_part)
-        self.log.info("Created table: %s" % dst)
+        self.log.info("Created table: %s", dst)
 
         if self.conf.retention_period:
             self.drop_obsolete_partitions (self.dest_table, self.conf.retention_period, self.conf.period)
@@ -913,8 +910,8 @@ class Dispatcher(BaseHandler):
         func = RETENTION_FUNC
         args = [parent_table, retention_period, partition_period]
         sql = "select " + func + " (%s, %s, %s)"
-        self.log.debug("func: %s, args: %s" % (func, args))
-        curs.execute (sql, args)
+        self.log.debug("func: %s, args: %s", func, args)
+        curs.execute(sql, args)
         res = []
         for row in curs.fetchall():
             res.append(row[0])
index bdabb3e621857074763be440b6a6be1825631906..247256e467d1711b4f7305f041f1d8f20cdad70c 100644 (file)
@@ -72,8 +72,8 @@ class PartHandler(TableHandler):
         """Filter event by hash in extra3, apply only local part."""
         if ev.extra3:
             meta = skytools.db_urldecode(ev.extra3)
-            self.log.debug('part.process_event: hash=%d, max_part=%s, local_part=%d' %\
-                           (int(meta['hash']), self.max_part, self.local_part))
+            self.log.debug('part.process_event: hash=%d, max_part=%s, local_part=%d',
+                           int(meta['hash']), self.max_part, self.local_part)
             if (int(meta['hash']) & self.max_part) != self.local_part:
                 self.log.debug('part.process_event: not my event')
                 return
@@ -84,7 +84,7 @@ class PartHandler(TableHandler):
         """Prepare the where condition for copy and replay filtering"""
         self.load_part_info(dst_curs)
         w = "(%s & %d) = %d" % (self.hashexpr, self.max_part, self.local_part)
-        self.log.debug('part: copy_condition=%s' % w)
+        self.log.debug('part: copy_condition=%s', w)
         return w
 
     def load_part_info(self, curs):
index 4129cbe1bf5be421c70afb71a603d7710da2292d..4f509dcf1f24158fbcc8856339abd574c2e59537 100644 (file)
@@ -103,7 +103,7 @@ class TableState(object):
         """Set snapshot."""
         if self.str_snapshot == str_snapshot:
             return
-        self.log.debug("%s: change_snapshot to %s" % (self.name, str_snapshot))
+        self.log.debug("%s: change_snapshot to %s", self.name, str_snapshot)
         self.str_snapshot = str_snapshot
         if str_snapshot:
             self.from_snapshot = skytools.Snapshot(str_snapshot)
@@ -122,8 +122,7 @@ class TableState(object):
         self.state = state
         self.sync_tick_id = tick_id
         self.changed = 1
-        self.log.debug("%s: change_state to %s" % (self.name,
-                                    self.render_state()))
+        self.log.debug("%s: change_state to %s", self.name, self.render_state())
 
     def render_state(self):
         """Make a string to be stored in db."""
@@ -172,8 +171,8 @@ class TableState(object):
     def loaded_state(self, row):
         """Update object with info from db."""
 
-        self.log.debug("loaded_state: %s: %s / %s" % (
-                       self.name, row['merge_state'], row['custom_snapshot']))
+        self.log.debug("loaded_state: %s: %s / %s",
+                       self.name, row['merge_state'], row['custom_snapshot'])
         self.change_snapshot(row['custom_snapshot'], 0)
         self.state = self.parse_state(row['merge_state'])
         self.changed = 0
@@ -290,7 +289,7 @@ class Replicator(CascadedWorker):
         # compare: sql to use
         #compare_sql = select count(1) as cnt, sum(hashtext(t.*::text)) as chksum from only _TABLE_ t
         # workaround for hashtext change between 8.3 and 8.4
-        #compare_sql = select count(1) as cnt, sum(('x'||substr(md5(t.*::text),1,16))::bit(64)::bigint) as chksum from only _TABLE_ t     
+        #compare_sql = select count(1) as cnt, sum(('x'||substr(md5(t.*::text),1,16))::bit(64)::bigint) as chksum from only _TABLE_ t
         #compare_fmt = %(cnt)d rows, checksum=%(chksum)s
     """
 
@@ -485,11 +484,11 @@ class Replicator(CascadedWorker):
                 else:
                     # regular provider is used
                     if t.name not in pmap:
-                        self.log.warning("Table %s not available on provider" % t.name)
+                        self.log.warning("Table %s not available on provider", t.name)
                         continue
                     pt = pmap[t.name]
                     if pt.state != TABLE_OK: # or pt.custom_snapshot: # FIXME: does snapsnot matter?
-                        self.log.info("Table %s not OK on provider, waiting" % t.name)
+                        self.log.info("Table %s not OK on provider, waiting", t.name)
                         continue
 
                 # dont allow more copies than configured
@@ -519,7 +518,7 @@ class Replicator(CascadedWorker):
 
         # somebody may have done remove-table in the meantime
         if self.copy_table_name not in self.table_map:
-            self.log.error("copy_sync: lost table: %s" % self.copy_table_name)
+            self.log.error("copy_sync: lost table: %s", self.copy_table_name)
             return SYNC_EXIT
 
         # This operates on single table
@@ -537,8 +536,8 @@ class Replicator(CascadedWorker):
             elif self.cur_tick < t.sync_tick_id:
                 return SYNC_OK
             else:
-                self.log.error("copy_sync: cur_tick=%d sync_tick=%d" % (
-                                self.cur_tick, t.sync_tick_id))
+                self.log.error("copy_sync: cur_tick=%d sync_tick=%d",
+                                self.cur_tick, t.sync_tick_id)
                 raise Exception('Invalid table state')
         elif t.state == TABLE_WANNA_SYNC:
             # wait for main thread to react
@@ -597,7 +596,7 @@ class Replicator(CascadedWorker):
     def process_remote_event(self, src_curs, dst_curs, ev):
         """handle one event"""
 
-        self.log.debug("New event: id=%s / type=%s / data=%s / extra1=%s" % (ev.id, ev.type, ev.data, ev.extra1))
+        self.log.debug("New event: id=%s / type=%s / data=%s / extra1=%s", ev.id, ev.type, ev.data, ev.extra1)
 
         # set current_event only if processing them one-by-one
         if self.work_state < 0:
@@ -824,8 +823,8 @@ class Replicator(CascadedWorker):
             if not t.changed:
                 continue
             merge_state = t.render_state()
-            self.log.info("storing state of %s: copy:%d new_state:%s" % (
-                            t.name, self.copy_thread, merge_state))
+            self.log.info("storing state of %s: copy:%d new_state:%s",
+                            t.name, self.copy_thread, merge_state)
             q = "select londiste.local_set_table_state(%s, %s, %s, %s)"
             curs.execute(q, [self.set_name,
                              t.name, t.str_snapshot, merge_state])
@@ -838,8 +837,8 @@ class Replicator(CascadedWorker):
         self.save_table_state(dst_db.cursor())
         dst_db.commit()
 
-        self.log.info("Table %s status changed to '%s'" % (
-                      tbl.name, tbl.render_state()))
+        self.log.info("Table %s status changed to '%s'",
+                      tbl.name, tbl.render_state())
 
     def get_tables_in_state(self, state):
         "get all tables with specific state"
@@ -878,11 +877,11 @@ class Replicator(CascadedWorker):
             time.sleep(2)
 
         # launch and wait for daemonization result
-        self.log.debug("Launch args: "+repr(cmd))
+        self.log.debug("Launch args: %r", cmd)
         res = os.spawnvp(os.P_WAIT, script, cmd)
-        self.log.debug("Launch result: "+repr(res))
+        self.log.debug("Launch result: %r", res)
         if res != 0:
-            self.log.error("Failed to launch copy process, result=%d" % res)
+            self.log.error("Failed to launch copy process, result=%d", res)
 
     def sync_database_encodings(self, src_db, dst_db):
         """Make sure client_encoding is same on both side."""
@@ -979,4 +978,3 @@ class Replicator(CascadedWorker):
 if __name__ == '__main__':
     script = Replicator(sys.argv[1:])
     script.start()
-
index d33e6d62485b8aa8110d9b21d3e68cf28b9218f2..46ad067b5832f2ef5441bf11ff31bfb3a2d78cd7 100644 (file)
@@ -49,7 +49,7 @@ class Repairer(Syncer):
         src_curs = src_db.cursor()
         dst_curs = dst_db.cursor()
 
-        self.log.info('Checking %s' % dst_tbl)
+        self.log.info('Checking %s', dst_tbl)
 
         self.common_fields = []
         self.fq_common_fields = []
@@ -62,16 +62,16 @@ class Repairer(Syncer):
         dst_where = t2.plugin.get_copy_condition(src_curs, dst_curs)
         src_where = dst_where
 
-        self.log.info("Dumping src table: %s" % src_tbl)
+        self.log.info("Dumping src table: %s", src_tbl)
         self.dump_table(src_tbl, src_curs, dump_src, src_where)
         src_db.commit()
-        self.log.info("Dumping dst table: %s" % dst_tbl)
+        self.log.info("Dumping dst table: %s", dst_tbl)
         self.dump_table(dst_tbl, dst_curs, dump_dst, dst_where)
         dst_db.commit()
-        
-        self.log.info("Sorting src table: %s" % dump_src)
+
+        self.log.info("Sorting src table: %s", dump_src)
         self.do_sort(dump_src, dump_src + '.sorted')
-        self.log.info("Sorting dst table: %s" % dump_dst)
+        self.log.info("Sorting dst table: %s", dump_dst)
         self.do_sort(dump_dst, dump_dst + '.sorted')
 
         self.dump_compare(dst_tbl, dump_src + ".sorted", dump_dst + ".sorted")
@@ -127,7 +127,7 @@ class Repairer(Syncer):
         self.fq_common_fields = fqlist
 
         cols = ",".join(fqlist)
-        self.log.debug("using columns: %s" % cols)
+        self.log.debug("using columns: %s", cols)
 
     def dump_table(self, tbl, curs, fn, whr):
         """Dump table to disk."""
@@ -135,12 +135,12 @@ class Repairer(Syncer):
         if len(whr) == 0:
             whr = 'true'
         q = "copy (SELECT %s FROM %s WHERE %s) to stdout" % (cols, skytools.quote_fqident(tbl), whr)
-        self.log.debug("Query: %s" % q)
+        self.log.debug("Query: %s", q)
         f = open(fn, "w", 64*1024)
         curs.copy_expert(q, f)
         size = f.tell()
         f.close()
-        self.log.info('%s: Got %d bytes' % (tbl, size))
+        self.log.info('%s: Got %d bytes', tbl, size)
 
     def get_row(self, ln):
         """Parse a row into dict."""
@@ -154,7 +154,7 @@ class Repairer(Syncer):
 
     def dump_compare(self, tbl, src_fn, dst_fn):
         """Dump + compare single table."""
-        self.log.info("Comparing dumps: %s" % tbl)
+        self.log.info("Comparing dumps: %s", tbl)
         self.cnt_insert = 0
         self.cnt_update = 0
         self.cnt_delete = 0
@@ -197,10 +197,10 @@ class Repairer(Syncer):
                 dst_ln = f2.readline()
                 if dst_ln: self.total_dst += 1
 
-        self.log.info("finished %s: src: %d rows, dst: %d rows,"\
-                    " missed: %d inserts, %d updates, %d deletes" % (
+        self.log.info("finished %s: src: %d rows, dst: %d rows,"
+                " missed: %d inserts, %d updates, %d deletes",
                 tbl, self.total_src, self.total_dst,
-                self.cnt_insert, self.cnt_update, self.cnt_delete))
+                self.cnt_insert, self.cnt_update, self.cnt_delete)
 
     def got_missed_insert(self, tbl, src_row):
         """Create sql for missed insert."""
@@ -248,7 +248,7 @@ class Repairer(Syncer):
 
     def show_fix(self, tbl, q, desc):
         """Print/write/apply repair sql."""
-        self.log.info("missed %s: %s" % (desc, q))
+        self.log.info("missed %s: %s", desc, q)
         if self.apply_curs:
             self.apply_curs.execute(q)
         else:
@@ -300,7 +300,7 @@ class Repairer(Syncer):
 
     def cmp_keys(self, src_row, dst_row):
         """Compare primary keys of the rows.
-        
+
         Returns 1 if src > dst, -1 if src < dst and 0 if src == dst"""
 
         # None means table is done.  tag it larger than any existing row.
@@ -319,4 +319,3 @@ class Repairer(Syncer):
             elif v1 > v2:
                 return 1
         return 0
-
index 0cc4401eef2e46e9934f2ae1fa35e3636e460f7a..c54e26444fb9608c37f1fcc10b28b6278231a5d3 100644 (file)
@@ -140,7 +140,7 @@ class LondisteSetup(CascadeAdmin):
             for tbl in args:
                 tbl = skytools.fq_name(tbl)
                 if (tbl in src_tbls) and not src_tbls[tbl]['local']:
-                    self.log.error("Table %s does not exist on provider, need to switch to different provider" % tbl)
+                    self.log.error("Table %s does not exist on provider, need to switch to different provider", tbl)
                     problems = True
             if problems:
                 self.log.error("Problems, canceling operation")
@@ -189,12 +189,12 @@ class LondisteSetup(CascadeAdmin):
 
         if create_flags:
             if tbl_exists:
-                self.log.info('Table %s already exist, not touching' % desc)
+                self.log.info('Table %s already exist, not touching', desc)
             else:
                 src_dest_table = src_tbls[tbl]['dest_table']
                 if not skytools.exists_table(src_curs, src_dest_table):
                     # table not present on provider - nowhere to get the DDL from
-                    self.log.warning('Table %s missing on provider, cannot create, skipping' % desc)
+                    self.log.warning('Table %s missing on provider, cannot create, skipping', desc)
                     return
                 schema = skytools.fq_name_parts(dest_table)[0]
                 if not skytools.exists_schema(dst_curs, schema):
@@ -276,7 +276,7 @@ class LondisteSetup(CascadeAdmin):
         for tbl in src_tbls.keys():
             q = "select * from londiste.global_add_table(%s, %s)"
             if tbl not in dst_tbls:
-                self.log.info("Table %s info missing from subscriber, adding" % tbl)
+                self.log.info("Table %s info missing from subscriber, adding", tbl)
                 self.exec_cmd(dst_curs, q, [self.set_name, tbl])
                 dst_tbls[tbl] = {'local': False, 'dest_table': tbl}
         for tbl in dst_tbls.keys():
@@ -317,7 +317,7 @@ class LondisteSetup(CascadeAdmin):
             " where table_name = %s and local"
         curs.execute(q, [self.set_name, tbl])
         if curs.rowcount == 0:
-            self.log.error("Table %s not found on this node" % tbl)
+            self.log.error("Table %s not found on this node", tbl)
             sys.exit(1)
 
         attrs, dest_table = curs.fetchone()
@@ -382,17 +382,17 @@ class LondisteSetup(CascadeAdmin):
         seq_exists = skytools.exists_sequence(dst_curs, seq)
         if create_flags:
             if seq_exists:
-                self.log.info('Sequence %s already exist, not creating' % seq)
+                self.log.info('Sequence %s already exist, not creating', seq)
             else:
                 if not skytools.exists_sequence(src_curs, seq):
                     # sequence not present on provider - nowhere to get the DDL from
-                    self.log.warning('Sequence "%s" missing on provider, skipping' % seq)
+                    self.log.warning('Sequence "%s" missing on provider, skipping', seq)
                     return
                 s = skytools.SeqStruct(src_curs, seq)
                 src_db.commit()
                 s.create(dst_curs, create_flags, log = self.log)
         elif not seq_exists:
-            self.log.warning('Sequence "%s" missing on subscriber, use --create if necessary' % seq)
+            self.log.warning('Sequence "%s" missing on subscriber, use --create if necessary', seq)
             return
 
         q = "select * from londiste.local_add_seq(%s, %s)"
@@ -410,7 +410,7 @@ class LondisteSetup(CascadeAdmin):
         for seq in src_seqs.keys():
             q = "select * from londiste.global_update_seq(%s, %s, %s)"
             if seq not in dst_seqs:
-                self.log.info("Sequence %s info missing from subscriber, adding" % seq)
+                self.log.info("Sequence %s info missing from subscriber, adding", seq)
                 self.exec_cmd(dst_curs, q, [self.set_name, seq, src_seqs[seq]['last_value']])
                 tmp = src_seqs[seq].copy()
                 tmp['local'] = False
@@ -504,7 +504,7 @@ class LondisteSetup(CascadeAdmin):
             res = self.exec_cmd(db, q, [self.queue_name, fname, sql, attrs.to_urlenc()], commit = False)
             ret = res[0]['ret_code']
             if ret >= 300:
-                self.log.warning("Skipping execution of '%s'" % fname)
+                self.log.warning("Skipping execution of '%s'", fname)
                 continue
             if attrs.need_execute(curs, local_tables, local_seqs):
                 self.log.info("%s: executing sql", fname)
@@ -605,16 +605,16 @@ class LondisteSetup(CascadeAdmin):
                     res_list.append(a)
                     res_map[a] = 1
                 elif a in reverse_map:
-                    self.log.info("%s already processed" % a)
+                    self.log.info("%s already processed", a)
                 elif allow_nonexist:
                     res_list.append(a)
                     res_map[a] = 1
                 elif self.options.force:
-                    self.log.warning("%s not available, but --force is used" % a)
+                    self.log.warning("%s not available, but --force is used", a)
                     res_list.append(a)
                     res_map[a] = 1
                 else:
-                    self.log.warning("%s not available" % a)
+                    self.log.warning("%s not available", a)
                     err = 1
         if err:
             raise skytools.UsageError("Cannot proceed")
index befeaded9fee1f656eed88555be790dd0c72b47f..1713f6e9295a16e4008a24811909441f4bd9ca57 100644 (file)
@@ -145,11 +145,11 @@ class Syncer(skytools.DBScript):
         for tbl in tlist:
             tbl = skytools.fq_name(tbl)
             if not tbl in dst_tables:
-                self.log.warning('Table not subscribed: %s' % tbl)
+                self.log.warning('Table not subscribed: %s', tbl)
                 continue
             t2 = dst_tables[tbl]
             if t2.merge_state != 'ok':
-                self.log.warning('Table %s not synced yet, no point' % tbl)
+                self.log.warning('Table %s not synced yet, no point', tbl)
                 continue
 
             pnode, ploc, wname = find_copy_source(self, self.queue_name, tbl, pnode, ploc)
@@ -179,12 +179,12 @@ class Syncer(skytools.DBScript):
 
         src_tables, ignore = self.get_tables(src_db)
         if not tbl in src_tables:
-            self.log.warning('Table not available on provider: %s' % tbl)
+            self.log.warning('Table not available on provider: %s', tbl)
             return
         t1 = src_tables[tbl]
 
         if t1.merge_state != 'ok':
-            self.log.warning('Table %s not ready yet on provider' % tbl)
+            self.log.warning('Table %s not ready yet on provider', tbl)
             return
 
         #self.check_consumer(setup_db, dst_db)
@@ -231,10 +231,10 @@ class Syncer(skytools.DBScript):
         dst_curs = dst_db.cursor()
 
         if not skytools.exists_table(src_curs, src_tbl):
-            self.log.warning("Table %s does not exist on provider side" % src_tbl)
+            self.log.warning("Table %s does not exist on provider side", src_tbl)
             return
         if not skytools.exists_table(dst_curs, dst_tbl):
-            self.log.warning("Table %s does not exist on subscriber side" % dst_tbl)
+            self.log.warning("Table %s does not exist on subscriber side", dst_tbl)
             return
 
         # lock table against changes
@@ -273,14 +273,14 @@ class Syncer(skytools.DBScript):
         lock_curs = lock_db.cursor()
 
         # lock table in separate connection
-        self.log.info('Locking %s' % src_tbl)
+        self.log.info('Locking %s', src_tbl)
         lock_db.commit()
         self.set_lock_timeout(lock_curs)
         lock_time = time.time()
         lock_curs.execute("LOCK TABLE %s IN SHARE MODE" % skytools.quote_fqident(src_tbl))
 
         # now wait until consumer has updated target table until locking
-        self.log.info('Syncing %s' % dst_tbl)
+        self.log.info('Syncing %s', dst_tbl)
 
         # consumer must get futher than this tick
         tick_id = self.force_tick(setup_curs)
@@ -313,7 +313,7 @@ class Syncer(skytools.DBScript):
         self.old_worker_paused = self.pause_consumer(setup_curs, self.provider_info['worker_name'])
 
         lock_curs = lock_db.cursor()
-        self.log.info('Syncing %s' % dst_tbl)
+        self.log.info('Syncing %s', dst_tbl)
 
         # consumer must get futher than this tick
         tick_id = self.force_tick(setup_curs, False)
@@ -375,4 +375,3 @@ class Syncer(skytools.DBScript):
                 break
             time.sleep(0.5)
         return oldflag
-
index f325a7c2b2dd4a417af276248c53d68d8118d3ae..8e0262359ba109c625d23aef6bac1cbeb1acb5f5 100644 (file)
@@ -64,7 +64,7 @@ class CopyTable(Replicator):
             if tbl_stat.copy_role == 'wait-copy':
                 self.log.info('waiting for first partition to initialize copy')
             elif tbl_stat.max_parallel_copies_reached():
-                self.log.info('number of max parallel copies (%s) reached' %\
+                self.log.info('number of max parallel copies (%s) reached',
                                 tbl_stat.max_parallel_copy)
             else:
                 break
@@ -81,7 +81,7 @@ class CopyTable(Replicator):
             if pt.state == TABLE_OK:
                 break
 
-            self.log.warning("table %s not in sync yet on provider, waiting" % tbl_stat.name)
+            self.log.warning("table %s not in sync yet on provider, waiting", tbl_stat.name)
             time.sleep(10)
 
         src_real_table = pt.dest_table
@@ -102,7 +102,7 @@ class CopyTable(Replicator):
 
         self.sync_database_encodings(src_db, dst_db)
 
-        self.log.info("Starting full copy of %s" % tbl_stat.name)
+        self.log.info("Starting full copy of %s", tbl_stat.name)
 
         # just in case, drop all fkeys (in case "replay" was skipped)
         # !! this may commit, so must be done before anything else !!
@@ -124,14 +124,14 @@ class CopyTable(Replicator):
         common_cols = []
         for c in slist:
             if c not in dlist:
-                self.log.warning("Table %s column %s does not exist on subscriber"
-                                 % (tbl_stat.name, c))
+                self.log.warning("Table %s column %s does not exist on subscriber",
+                                 tbl_stat.name, c)
             else:
                 common_cols.append(c)
         for c in dlist:
             if c not in slist:
-                self.log.warning("Table %s column %s does not exist on provider"
-                                 % (tbl_stat.name, c))
+                self.log.warning("Table %s column %s does not exist on provider",
+                                 tbl_stat.name, c)
 
         # drop unnecessary stuff
         if cmode > 0:
@@ -140,9 +140,9 @@ class CopyTable(Replicator):
 
             # drop data
             if tbl_stat.table_attrs.get('skip_truncate'):
-                self.log.info("%s: skipping truncate" % tbl_stat.name)
+                self.log.info("%s: skipping truncate", tbl_stat.name)
             else:
-                self.log.info("%s: truncating" % tbl_stat.name)
+                self.log.info("%s: truncating", tbl_stat.name)
                 q = "truncate "
                 if dst_db.server_version >= 80400:
                     q += "only "
@@ -160,12 +160,12 @@ class CopyTable(Replicator):
                 tbl_stat.dropped_ddl = ddl
 
         # do truncate & copy
-        self.log.info("%s: start copy" % tbl_stat.name)
+        self.log.info("%s: start copy", tbl_stat.name)
         p = tbl_stat.get_plugin()
         stats = p.real_copy(src_real_table, src_curs, dst_curs, common_cols)
         if stats:
-            self.log.info("%s: copy finished: %d bytes, %d rows" % (
-                          tbl_stat.name, stats[0], stats[1]))
+            self.log.info("%s: copy finished: %d bytes, %d rows",
+                          tbl_stat.name, stats[0], stats[1])
 
         # get snapshot
         src_curs.execute("select txid_current_snapshot()")
@@ -269,4 +269,3 @@ class CopyTable(Replicator):
 if __name__ == '__main__':
     script = CopyTable(sys.argv[1:])
     script.start()
-