From 5dd59cd49795fdd5c281938c9336c1dc188f5324 Mon Sep 17 00:00:00 2001
From: Marko Kreen
Date: Fri, 13 Feb 2009 13:57:58 +0200
Subject: [PATCH] scripts/ update, mostly from 2.1-stable

Some docstring updates also.

Dispatchers have not been upgraded for cascading yet.
---
 scripts/bulk_loader.py      | 52 ++++++++++++++++++++++---------------
 scripts/catsql.py           |  9 +++----
 scripts/cube_dispatcher.py  |  9 +++----
 scripts/queue_mover.py      |  5 ++--
 scripts/queue_splitter.py   |  5 ++--
 scripts/scriptmgr.py        | 31 +++++++++++++++-------
 scripts/skytools_upgrade.py |  7 +++++
 scripts/table_dispatcher.py |  7 ++---
 8 files changed, 78 insertions(+), 47 deletions(-)

diff --git a/scripts/bulk_loader.py b/scripts/bulk_loader.py
index d8d6c871..20d37fc3 100755
--- a/scripts/bulk_loader.py
+++ b/scripts/bulk_loader.py
@@ -20,6 +20,8 @@ of minutes.
 """
 
 import sys, os, pgq, skytools
+from skytools import quote_ident, quote_fqident
+
 
 ## several methods for applying data
 
@@ -33,6 +35,8 @@ METH_MERGED = 2
 # no good method for temp table check before 8.2
 USE_LONGLIVED_TEMP_TABLES = False
 
+AVOID_BIZGRES_BUG = 1
+
 def find_dist_fields(curs, fqtbl):
     if not skytools.exists_table(curs, "pg_catalog.mpp_distribution_policy"):
         return []
@@ -65,7 +69,7 @@ def exists_temp_table(curs, tbl):
     #    and substr(n.nspname, 1, 8) = 'pg_temp_'
     #    and t.relname = %s;
     #"""
-    curs.execute(q, [tempname])
+    curs.execute(q, [tbl])
     tmp = curs.fetchall()
     return len(tmp) > 0
 
@@ -190,6 +194,9 @@ class TableCache:
         self.final_del_list = del_list
 
 class BulkLoader(pgq.SerialConsumer):
+    """Bulk loader script."""
+    load_method = METH_CORRECT
+    remap_tables = {}
     def __init__(self, args):
         pgq.SerialConsumer.__init__(self, "bulk_loader", "src_db", "dst_db", args)
 
@@ -197,12 +204,12 @@ class BulkLoader(pgq.SerialConsumer):
         pgq.SerialConsumer.reload(self)
 
         self.load_method = self.cf.getint("load_method", METH_CORRECT)
-        if self.load_method not in (0,1,2):
+        if self.load_method not in (METH_CORRECT,METH_DELETE,METH_MERGED):
             raise Exception("bad load_method")
 
         self.remap_tables = {}
-        for map in self.cf.getlist("remap_tables", ''):
-            tmp = map.split(':')
+        for mapelem in self.cf.getlist("remap_tables", ''):
+            tmp = mapelem.split(':')
             tbl = tmp[0].strip()
             new = tmp[1].strip()
             self.remap_tables[tbl] = new
@@ -267,32 +274,35 @@ class BulkLoader(pgq.SerialConsumer):
         # where expr must have pkey and dist fields
         klist = []
         for pk in cache.pkey_list + extra_fields:
-            exp = "%s.%s = %s.%s" % (tbl, pk, temp, pk)
+            exp = "%s.%s = %s.%s" % (quote_fqident(tbl), quote_ident(pk),
+                                     quote_fqident(temp), quote_ident(pk))
             klist.append(exp)
         whe_expr = " and ".join(klist)
 
         # create del sql
-        del_sql = "delete from only %s using %s where %s" % (tbl, temp, whe_expr)
+        del_sql = "delete from only %s using %s where %s" % (
+                  quote_fqident(tbl), quote_fqident(temp), whe_expr)
 
         # create update sql
         slist = []
         key_fields = cache.pkey_list + extra_fields
         for col in cache.col_list:
             if col not in key_fields:
-                exp = "%s = %s.%s" % (col, temp, col)
+                exp = "%s = %s.%s" % (quote_ident(col), quote_fqident(temp), quote_ident(col))
                 slist.append(exp)
         upd_sql = "update only %s set %s from %s where %s" % (
-                tbl, ", ".join(slist), temp, whe_expr)
+                quote_fqident(tbl), ", ".join(slist), quote_fqident(temp), whe_expr)
 
         # insert sql
-        colstr = ",".join(cache.col_list)
-        ins_sql = "insert into %s (%s) select %s from %s" % (tbl, colstr, colstr, temp)
+        colstr = ",".join([quote_ident(c) for c in cache.col_list])
+        ins_sql = "insert into %s (%s) select %s from %s" % (
+                  quote_fqident(tbl), colstr, colstr, quote_fqident(temp))
 
         # process deleted rows
         if len(del_list) > 0:
             self.log.info("Deleting %d rows from %s" % (len(del_list), tbl))
             # delete old rows
-            q = "truncate %s" % temp
+            q = "truncate %s" % quote_fqident(temp)
             self.log.debug(q)
             curs.execute(q)
             # copy rows
@@ -311,7 +321,7 @@ class BulkLoader(pgq.SerialConsumer):
         if len(upd_list) > 0:
             self.log.info("Updating %d rows in %s" % (len(upd_list), tbl))
             # delete old rows
-            q = "truncate %s" % temp
+            q = "truncate %s" % quote_fqident(temp)
             self.log.debug(q)
             curs.execute(q)
             # copy rows
@@ -336,15 +346,15 @@ class BulkLoader(pgq.SerialConsumer):
                     self.log.warning("Update mismatch: expected=%s deleted=%d"
                             % (real_update_count, curs.rowcount))
             # insert into main table
-            if 0:
-                # does not work due bizgres bug
-                self.log.debug(ins_sql)
-                curs.execute(ins_sql)
-                self.log.debug(curs.statusmessage)
-            else:
+            if AVOID_BIZGRES_BUG:
                 # copy again, into main table
                 self.log.debug("COPY %d rows into %s" % (len(upd_list), tbl))
                 skytools.magic_insert(curs, tbl, upd_list, col_list)
+            else:
+                # better way, but does not work due bizgres bug
+                self.log.debug(ins_sql)
+                curs.execute(ins_sql)
+                self.log.debug(curs.statusmessage)
 
         # process new rows
         if len(ins_list) > 0:
@@ -353,10 +363,10 @@ class BulkLoader(pgq.SerialConsumer):
 
         # delete remaining rows
         if USE_LONGLIVED_TEMP_TABLES:
-            q = "truncate %s" % temp
+            q = "truncate %s" % quote_fqident(temp)
         else:
             # fscking problems with long-lived temp tables
-            q = "drop table %s" % temp
+            q = "drop table %s" % quote_fqident(temp)
         self.log.debug(q)
         curs.execute(q)
 
@@ -375,7 +385,7 @@ class BulkLoader(pgq.SerialConsumer):
             arg = "on commit preserve rows"
         # create temp table for loading
         q = "create temp table %s (like %s) %s" % (
-                tempname, tbl, arg)
+                quote_fqident(tempname), quote_fqident(tbl), arg)
         self.log.debug("Creating temp table: %s" % q)
         curs.execute(q)
         return tempname
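The quoting changes in bulk_loader.py above matter because the script builds
DELETE/UPDATE/INSERT statements by interpolating table and column names
directly into SQL text; a mixed-case or otherwise odd identifier would break
the generated statement. A minimal sketch of the difference, using the same
skytools helpers (the names below are invented for illustration):

    from skytools import quote_ident, quote_fqident

    tbl = 'public.Big Table'    # hypothetical name; the space forces quoting
    col = 'userId'              # hypothetical name; mixed case forces quoting

    # Without quoting: delete from only public.Big Table where userId = ...
    # With quoting:    delete from only public."Big Table" where "userId" = ...
    sql = "delete from only %s where %s = %%s" % (
            quote_fqident(tbl), quote_ident(col))
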
diff --git a/scripts/catsql.py b/scripts/catsql.py
index 94fbacd8..80005851 100755
--- a/scripts/catsql.py
+++ b/scripts/catsql.py
@@ -34,7 +34,7 @@ Note:
 import sys, os, re, getopt
 
 def usage(x):
-    print "usage: catsql [--ndoc] FILE [FILE ...]"
+    print("usage: catsql [--ndoc] FILE [FILE ...]")
     sys.exit(x)
 
 # NDoc specific changes
@@ -65,7 +65,6 @@ def proc_func(f, ln):
         ln = fix_func(ln)
     pre_list = [ln]
     comm_list = []
-    n_comm = 0
     while 1:
         ln = f.readline()
         if not ln:
@@ -109,8 +108,8 @@ def cat_file(fn):
             elif cmd == "q": # quit
                 sys.exit(0)
             elif cmd == "cd": # chdir
-                dir = m.group(2).strip()
-                os.chdir(dir)
+                cd_dir = m.group(2).strip()
+                os.chdir(cd_dir)
             else: # skip all others
                 pass
         else:
@@ -126,7 +125,7 @@ def main():
     try:
         opts, args = getopt.gnu_getopt(sys.argv[1:], 'h', ['ndoc'])
     except getopt.error, d:
-        print d
+        print(str(d))
         usage(1)
     for o, v in opts:
         if o == "-h":
diff --git a/scripts/cube_dispatcher.py b/scripts/cube_dispatcher.py
index 11887134..55f4bc7c 100755
--- a/scripts/cube_dispatcher.py
+++ b/scripts/cube_dispatcher.py
@@ -1,7 +1,8 @@
 #! /usr/bin/env python
 
-# it accepts urlencoded rows for multiple tables from queue
-# and insert them into actual tables, with partitioning on tick time
+"""It accepts urlencoded rows for multiple tables from the queue
+and inserts them into actual tables, with partitioning on tick time.
+"""
 
 import sys, os, pgq, skytools
 
@@ -11,6 +12,7 @@ alter table only _DEST_TABLE add primary key (_PKEY);
 """
 
 class CubeDispatcher(pgq.SerialConsumer):
+    """Partition on tick time, multiple tables."""
     def __init__(self, args):
         pgq.SerialConsumer.__init__(self, "cube_dispatcher", "src_db", "dst_db", args)
 
@@ -40,9 +42,7 @@ class CubeDispatcher(pgq.SerialConsumer):
         return curs.fetchone()[0]
 
     def process_remote_batch(self, src_db, batch_id, ev_list, dst_db):
-        # actual processing
-        date_str = self.get_part_date(batch_id)
         self.dispatch(dst_db, ev_list, self.get_part_date(batch_id))
 
     def dispatch(self, dst_db, ev_list, date_str):
@@ -156,7 +156,6 @@ class CubeDispatcher(pgq.SerialConsumer):
         """
 
         dcur = dcon.cursor()
-        exist_map = {}
         for tbl, inf in tables.items():
             if skytools.exists_table(dcur, tbl):
                 continue
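cube_dispatcher picks the destination table from the batch's tick time, so
each day's events land in their own partition and get_part_date() runs once
per batch. A rough sketch of the naming step under that scheme (table name
and date are made up; the real date comes from the pgq tick, and missing
partitions are created from the template at the top of the script):

    date_str = '2009-02-13'      # would come from get_part_date(batch_id)
    base_tbl = 'public.events'   # hypothetical destination table
    part_tbl = "%s_%s" % (base_tbl, date_str.replace('-', '_'))
    # -> public.events_2009_02_13
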
+""" import sys, os, pgq, skytools @@ -11,6 +12,7 @@ alter table only _DEST_TABLE add primary key (_PKEY); """ class CubeDispatcher(pgq.SerialConsumer): + """Partition on tick time, multiple tables.""" def __init__(self, args): pgq.SerialConsumer.__init__(self, "cube_dispatcher", "src_db", "dst_db", args) @@ -40,9 +42,7 @@ class CubeDispatcher(pgq.SerialConsumer): return curs.fetchone()[0] def process_remote_batch(self, src_db, batch_id, ev_list, dst_db): - # actual processing - date_str = self.get_part_date(batch_id) self.dispatch(dst_db, ev_list, self.get_part_date(batch_id)) def dispatch(self, dst_db, ev_list, date_str): @@ -156,7 +156,6 @@ class CubeDispatcher(pgq.SerialConsumer): """ dcur = dcon.cursor() - exist_map = {} for tbl, inf in tables.items(): if skytools.exists_table(dcur, tbl): continue diff --git a/scripts/queue_mover.py b/scripts/queue_mover.py index 129728a3..bcfed8f0 100755 --- a/scripts/queue_mover.py +++ b/scripts/queue_mover.py @@ -1,10 +1,11 @@ #! /usr/bin/env python -# this script simply mover events from one queue to another +"""This script simply mover events from one queue to another.""" -import sys, os, pgq, skytools +import sys, os, pgq class QueueMover(pgq.SerialConsumer): + """Plain queue copy.""" def __init__(self, args): pgq.SerialConsumer.__init__(self, "queue_mover", "src_db", "dst_db", args) diff --git a/scripts/queue_splitter.py b/scripts/queue_splitter.py index c6714ca0..0a949dfc 100755 --- a/scripts/queue_splitter.py +++ b/scripts/queue_splitter.py @@ -1,10 +1,11 @@ #! /usr/bin/env python -# puts events into queue specified by field from 'queue_field' config parameter +"""Puts events into queue specified by field from 'queue_field' config parameter""" -import sys, os, pgq, skytools +import sys, pgq class QueueSplitter(pgq.SerialConsumer): + """Split events from one queue into several.""" def __init__(self, args): pgq.SerialConsumer.__init__(self, "queue_splitter", "src_db", "dst_db", args) diff --git a/scripts/scriptmgr.py b/scripts/scriptmgr.py index 3f589c8f..34d437fd 100755 --- a/scripts/scriptmgr.py +++ b/scripts/scriptmgr.py @@ -26,6 +26,11 @@ def job_sort_cmp(j1, j2): else: return 0 class ScriptMgr(skytools.DBScript): + svc_list = [] + svc_map = {} + config_list = [] + job_map = {} + job_list = [] def init_optparse(self, p = None): p = skytools.DBScript.init_optparse(self, p) p.add_option("-a", "--all", action="store_true", help="apply command to all jobs") @@ -94,7 +99,7 @@ class ScriptMgr(skytools.DBScript): 'args': svc['args'], 'service': svc['service'], 'job_name': cf.get('job_name'), - 'pidfile': cf.getfile('pidfile'), + 'pidfile': cf.getfile('pidfile', ''), } self.job_list.append(job) self.job_map[job['job_name']] = job @@ -103,20 +108,22 @@ class ScriptMgr(skytools.DBScript): for job in self.job_list: os.chdir(job['cwd']) cf = skytools.Config(job['service'], job['config']) - pidfile = cf.getfile('pidfile') + pidfile = cf.getfile('pidfile', '') name = job['job_name'] svc = job['service'] if job['disabled']: name += " (disabled)" - if os.path.isfile(pidfile): - print " OK [%s] %s" % (svc, name) + if not pidfile: + print(" pidfile? 
[%s] %s" % (svc, name)) + elif os.path.isfile(pidfile): + print(" OK [%s] %s" % (svc, name)) else: - print " STOPPED [%s] %s" % (svc, name) + print(" STOPPED [%s] %s" % (svc, name)) def cmd_info(self): for job in self.job_list: - print job + print(job) def cmd_start(self, job_name): if job_name not in self.job_map: @@ -129,6 +136,9 @@ class ScriptMgr(skytools.DBScript): self.log.info('Starting %s' % job_name) os.chdir(job['cwd']) pidfile = job['pidfile'] + if not pidfile: + self.log.warning("No pidfile for %s cannot launch") + return 0 if os.path.isfile(pidfile): self.log.warning("Script %s seems running" % job_name) return 0 @@ -166,6 +176,9 @@ class ScriptMgr(skytools.DBScript): def signal_job(self, job, sig): os.chdir(job['cwd']) pidfile = job['pidfile'] + if not pidfile: + self.log.warning("No pidfile for %s (%s)" % (job['job_name'], job['config'])) + return if os.path.isfile(pidfile): pid = int(open(pidfile).read()) try: @@ -182,7 +195,7 @@ class ScriptMgr(skytools.DBScript): self.load_jobs() if len(self.args) < 2: - print "need command" + print("need command") sys.exit(1) jobs = self.args[2:] @@ -201,7 +214,7 @@ class ScriptMgr(skytools.DBScript): return if len(jobs) == 0: - print "no jobs given?" + print("no jobs given?") sys.exit(1) if cmd == "start": @@ -223,7 +236,7 @@ class ScriptMgr(skytools.DBScript): for n in jobs: self.cmd_reload(n) else: - print "unknown command:", cmd + print("unknown command: " + cmd) sys.exit(1) if __name__ == '__main__': diff --git a/scripts/skytools_upgrade.py b/scripts/skytools_upgrade.py index 69439863..b78afb31 100755 --- a/scripts/skytools_upgrade.py +++ b/scripts/skytools_upgrade.py @@ -1,5 +1,7 @@ #! /usr/bin/env python +"""Simple upgrade script for versioned schemas.""" + import sys, os, re, skytools ver_rx = r"(\d+)([.](\d+)([.](\d+))?)?" @@ -16,6 +18,11 @@ version_list = [ ['pgq_ext', '2.1.6', 'v2.1.6_pgq_ext.sql', None], ['londiste', '2.1.6', 'v2.1.6_londiste.sql', None], + + ['pgq', '2.1.7', 'v2.1.7_pgq_core.sql', None], + ['londiste', '2.1.7', 'v2.1.7_londiste.sql', None], + + ['pgq', '2.1.8', 'v2.1.8_pgq_core.sql', None], ] def parse_ver(ver): diff --git a/scripts/table_dispatcher.py b/scripts/table_dispatcher.py index 2b645937..5b01370e 100755 --- a/scripts/table_dispatcher.py +++ b/scripts/table_dispatcher.py @@ -1,7 +1,8 @@ #! /usr/bin/env python -# it loads urlencoded rows for one trable from queue and inserts -# them into actual tables, with optional partitioning +"""It loads urlencoded rows for one trable from queue and inserts +them into actual tables, with optional partitioning. +""" import sys, os, pgq, skytools @@ -9,6 +10,7 @@ DEST_TABLE = "_DEST_TABLE" SCHEMA_TABLE = "_SCHEMA_TABLE" class TableDispatcher(pgq.SerialConsumer): + """Single-table partitioner.""" def __init__(self, args): pgq.SerialConsumer.__init__(self, "table_dispatcher", "src_db", "dst_db", args) @@ -95,7 +97,6 @@ class TableDispatcher(pgq.SerialConsumer): """ dcur = dcon.cursor() - exist_map = {} for tbl in tables.keys(): if not skytools.exists_table(dcur, tbl): if not self.part_template: -- 2.39.5