From 12ce61de6b7dac644cc3474bc9b83382031117ab Mon Sep 17 00:00:00 2001
From: Marko Kreen
Date: Tue, 17 Feb 2009 17:10:33 +0200
Subject: [PATCH] Combined queue_loader

Merge table_dispatcher, cube_dispatcher and bulk_loader together
---
 scripts/queue_loader.ini                     | 125 +++++
 scripts/queue_loader.py                      | 531 +++++++++++++++++++
 setup.py                                     |   2 +-
 tests/queue_loader/conf/loader_dst.ini       | 100 ++++
 tests/queue_loader/conf/loader_src.ini       |   8 +
 tests/queue_loader/conf/setadm_loaderq.ini   |  11 +
 tests/queue_loader/conf/ticker_loadersrc.ini |   6 +
 tests/queue_loader/init.sh                   |  14 +
 tests/queue_loader/regen.sh                  |  66 +++
 tests/queue_loader/send.data.sql             |  21 +
 tests/queue_loader/tables.sql                |  38 ++
 tests/queue_loader/triggers.sql              |  16 +
 tests/queue_loader/zcheck.sh                 |   4 +
 tests/queue_loader/zstop.sh                  |  14 +
 14 files changed, 955 insertions(+), 1 deletion(-)
 create mode 100644 scripts/queue_loader.ini
 create mode 100755 scripts/queue_loader.py
 create mode 100644 tests/queue_loader/conf/loader_dst.ini
 create mode 100644 tests/queue_loader/conf/loader_src.ini
 create mode 100644 tests/queue_loader/conf/setadm_loaderq.ini
 create mode 100644 tests/queue_loader/conf/ticker_loadersrc.ini
 create mode 100755 tests/queue_loader/init.sh
 create mode 100755 tests/queue_loader/regen.sh
 create mode 100644 tests/queue_loader/send.data.sql
 create mode 100644 tests/queue_loader/tables.sql
 create mode 100644 tests/queue_loader/triggers.sql
 create mode 100755 tests/queue_loader/zcheck.sh
 create mode 100755 tests/queue_loader/zstop.sh

diff --git a/scripts/queue_loader.ini b/scripts/queue_loader.ini
new file mode 100644
index 00000000..35e226fd
--- /dev/null
+++ b/scripts/queue_loader.ini
@@ -0,0 +1,125 @@
+
+[queue_loader]
+
+job_name =
+logfile =
+pidfile =
+
+db =
+
+rename_tables =
+
+[DEFAULT]
+
+# fields - which fields to send through
+#fields = col1, col2, col3:renamed3
+#fields = *
+
+# table_mode - how to handle a table
+#
+# ignore - ignore this table
+# direct - update table directly
+# split  - split data into partitions
+#table_mode = ignore
+
+# split_mode - how to split, if requested
+#
+# by-batch-time     - use batch time for splitting
+# by-event-time     - use event time for splitting
+# by-date-field:fld - use field "fld" for splitting
+#split_mode = by-batch-time
+
+# split_part - partition name format
+#
+# Available fields: %(table_name)s %(year)s %(month)s %(day)s %(hour)s
+#split_part = %(table_name)s_%(year)s_%(month)s_%(day)s
+
+# split_part_template - how to create new partition tables
+#
+# Available fields:
+#  %(part)s
+#  %(parent)s
+#  %(pkey)s
+#
+### Non-inherited partitions
+#split_part_template =
+#    create table %%(part)s (like %%(parent)s);
+#    alter table only %%(part)s add primary key (%%(pkey)s);
+#
+### Inherited partitions
+#split_part_template =
+#    create table %%(part)s () inherits (%%(parent)s);
+#    alter table only %%(part)s add primary key (%%(pkey)s);
+
+# row_mode - how to apply the events
+#
+# plain       - each event creates an SQL statement to run
+# keep_latest - change updates to DELETE + INSERT, ignore deletes
+# keep_all    - change updates to inserts, ignore deletes
+# bulk        - instead of a statement per row, do bulk updates
+#row_mode = plain
+
+# bulk_mode - how to do the bulk update
+#
+# correct - inserts as COPY into table,
+#           updates as COPY into temp table and single UPDATE from there,
+#           deletes as COPY into temp table and single DELETE from there
+# delete  - as 'correct', but do updates as DELETE + COPY
+# merged  - as 'delete', but merge insert rows with update rows
+#bulk_mode = correct
+
+# Per-table sections use the fully-qualified table name (as delivered
+# in ev_extra1) as the section name:
+#
+#[public.foo]
+#table_mode = direct
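+
+# A fuller sketch (hypothetical table and column names): route rows into
+# daily partitions and apply them in bulk.
+#
+#[public.event_log]
+#table_mode = split
+#row_mode = bulk
+#split_mode = by-date-field:created
+#split_part = %(table_name)s_%(year)s_%(month)s_%(day)s
+#split_part_template =
+#    create table %%(part)s (like %%(parent)s);
+#    alter table only %%(part)s add primary key (%%(pkey)s);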
+
+# Roughly, the merged scripts map to these settings:
+#
+#   table_dispatcher - table_mode = split,  row_mode = plain
+#   cube_dispatcher  - table_mode = split,  row_mode = keep_latest or keep_all
+#   bulk_loader      - table_mode = direct, row_mode = bulk
+
+# The old table_dispatcher sample (an orders table partitioned by
+# start_date, with a grant on each new partition) becomes:
+#
+#[public.orders]
+#table_mode = split
+#split_mode = by-date-field:start_date
+#split_part_template =
+#    create table %%(part)s () inherits (%%(parent)s);
+#    alter table only %%(part)s add primary key (%%(pkey)s);
+#    grant select on %%(part)s to group reporting;

diff --git a/scripts/queue_loader.py b/scripts/queue_loader.py
new file mode 100755
index 00000000..c14ecd01
--- /dev/null
+++ b/scripts/queue_loader.py
@@ -0,0 +1,531 @@
+#! /usr/bin/env python
+
+"""Load data from queue into tables, with optional partitioning."""
+
+import sys, skytools
+
+from pgq.cascade.worker import CascadedWorker
+from skytools import quote_ident, quote_fqident, UsageError
+
+# todo: auto table detect
+
+# BulkLoader load method
+METH_CORRECT = 0
+METH_DELETE = 1
+METH_MERGED = 2
+LOAD_METHOD = METH_CORRECT
+
+# BulkLoader hacks
+AVOID_BIZGRES_BUG = 0
+USE_LONGLIVED_TEMP_TABLES = True
+
+class BasicLoader:
+    """Apply events as-is."""
+    def __init__(self, table_name, parent_name, log):
+        self.table_name = table_name
+        self.parent_name = parent_name
+        self.sql_list = []
+        self.log = log
+
+    def add_row(self, op, data, pkey_list):
+        if op == 'I':
+            sql = skytools.mk_insert_sql(data, self.table_name, pkey_list)
+        elif op == 'U':
+            sql = skytools.mk_update_sql(data, self.table_name, pkey_list)
+        elif op == 'D':
+            sql = skytools.mk_delete_sql(data, self.table_name, pkey_list)
+        else:
+            raise Exception('bad operation: '+op)
+        self.sql_list.append(sql)
+
+    def flush(self, curs):
+        if len(self.sql_list) > 0:
+            curs.execute("\n".join(self.sql_list))
+            self.sql_list = []
+
+class KeepLatestLoader(BasicLoader):
+    """Keep latest row version.
+
+    Updates are changed to delete + insert, deletes are ignored.
+    Makes sense only for partitioned tables.
+    """
+    def add_row(self, op, data, pkey_list):
+        if op == 'U':
+            BasicLoader.add_row(self, 'D', data, pkey_list)
+            BasicLoader.add_row(self, 'I', data, pkey_list)
+        elif op == 'I':
+            BasicLoader.add_row(self, 'I', data, pkey_list)
+        else:
+            pass
+
+class KeepAllLoader(BasicLoader):
+    """Keep all row versions.
+
+    Updates are changed to inserts, deletes are ignored.
+    Makes sense only for partitioned tables.
+    """
+    def add_row(self, op, data, pkey_list):
+        if op == 'U':
+            op = 'I'
+        elif op == 'D':
+            return
+        BasicLoader.add_row(self, op, data, pkey_list)
+
+class BulkEvent(object):
+    """Helper class for BulkLoader to store relevant data."""
+    __slots__ = ('op', 'data', 'pk_data')
+    def __init__(self, op, data, pk_data):
+        self.op = op
+        self.data = data
+        self.pk_data = pk_data
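+
+# How BulkLoader folds each pkey's event sequence within one batch
+# (illustrative; implemented in prepare_data() below):
+#
+#   I       -> row appears           -> INSERT last version
+#   I, U    -> appears, then changes -> INSERT last version
+#   U, U    -> existed before        -> UPDATE to last version
+#   U, D    -> existed, now removed  -> DELETE
+#   I, D    -> short-lived row       -> nothing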
+class BulkLoader(BasicLoader):
+    """Instead of a statement per event, load all data with one
+    big COPY, UPDATE or DELETE statement.
+    """
+    fake_seq = 0
+    def __init__(self, table_name, parent_name, log):
+        """Init per-batch table data cache."""
+        BasicLoader.__init__(self, table_name, parent_name, log)
+
+        self.pkey_list = None
+        self.dist_fields = None
+        self.col_list = None
+
+        self.ev_list = []
+        self.pkey_ev_map = {}
+
+    def reset(self):
+        self.ev_list = []
+        self.pkey_ev_map = {}
+
+    def add_row(self, op, data, pkey_list):
+        """Store new event."""
+
+        # get pkey value
+        if self.pkey_list is None:
+            self.pkey_list = pkey_list
+        if len(self.pkey_list) > 0:
+            # must be a tuple, not a generator, to work as dict key below
+            pk_data = tuple(data[k] for k in self.pkey_list)
+        elif op == 'I':
+            # fake pkey, just to get them spread out
+            pk_data = self.fake_seq
+            self.fake_seq += 1
+        else:
+            raise Exception('non-pk tables not supported: %s' % self.table_name)
+
+        # get full column list, detect added columns
+        if not self.col_list:
+            self.col_list = data.keys()
+        elif self.col_list != data.keys():
+            # ^ supposedly python guarantees same order in keys()
+            self.col_list = data.keys()
+
+        # add to list
+        ev = BulkEvent(op, data, pk_data)
+        self.ev_list.append(ev)
+
+        # keep all versions of row data
+        if ev.pk_data in self.pkey_ev_map:
+            self.pkey_ev_map[ev.pk_data].append(ev)
+        else:
+            self.pkey_ev_map[ev.pk_data] = [ev]
+
+    def prepare_data(self):
+        """Got all data, prepare for insertion."""
+
+        del_list = []
+        ins_list = []
+        upd_list = []
+        for ev_list in self.pkey_ev_map.values():
+            # rewrite the list of I/U/D events into an optional DELETE
+            # followed by an optional INSERT/COPY command
+            exists_before = -1
+            exists_after = 1
+            for ev in ev_list:
+                if ev.op == "I":
+                    if exists_before < 0:
+                        exists_before = 0
+                    exists_after = 1
+                elif ev.op == "U":
+                    if exists_before < 0:
+                        exists_before = 1
+                    #exists_after = 1 # this shouldn't be needed
+                elif ev.op == "D":
+                    if exists_before < 0:
+                        exists_before = 1
+                    exists_after = 0
+                else:
+                    raise Exception('unknown event type: %s' % ev.op)
+
+            # skip short-lived rows
+            if exists_before == 0 and exists_after == 0:
+                continue
+
+            # take last event
+            ev = ev_list[-1]
+
+            # generate needed commands
+            if exists_before and exists_after:
+                upd_list.append(ev.data)
+            elif exists_before:
+                del_list.append(ev.data)
+            elif exists_after:
+                ins_list.append(ev.data)
+
+        return ins_list, upd_list, del_list
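+
+    # For a hypothetical table public.foo with pkey (id) and one extra
+    # column "data", flush() below builds statements shaped like:
+    #   delete from only public.foo using <temp> where public.foo.id = <temp>.id
+    #   update only public.foo set data = <temp>.data from <temp> where ...
+    #   insert into public.foo (id,data) select id,data from <temp>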
+    def flush(self, curs):
+        ins_list, upd_list, del_list = self.prepare_data()
+
+        # reorder cols: pkey fields first
+        col_list = self.pkey_list[:]
+        for k in self.col_list:
+            if k not in self.pkey_list:
+                col_list.append(k)
+
+        real_update_count = len(upd_list)
+
+        #self.log.debug("process_one_table: %s (I/U/D = %d/%d/%d)" % (
+        #        tbl, len(ins_list), len(upd_list), len(del_list)))
+
+        # merged load: handle updates as DELETE + COPY, together with inserts
+        if LOAD_METHOD == METH_MERGED:
+            upd_list += ins_list
+            ins_list = []
+
+        # fetch distribution fields
+        if self.dist_fields is None:
+            self.dist_fields = self.find_dist_fields(curs)
+
+        key_fields = self.pkey_list[:]
+        for fld in self.dist_fields:
+            if fld not in key_fields:
+                key_fields.append(fld)
+        #self.log.debug("PKey fields: %s  Extra fields: %s" % (
+        #        ",".join(cache.pkey_list), ",".join(extra_fields)))
+
+        # create temp table
+        temp = self.create_temp_table(curs)
+        tbl = self.table_name
+
+        # where expr must have pkey and dist fields
+        klist = []
+        for pk in key_fields:
+            exp = "%s.%s = %s.%s" % (quote_fqident(tbl), quote_ident(pk),
+                                     quote_fqident(temp), quote_ident(pk))
+            klist.append(exp)
+        whe_expr = " and ".join(klist)
+
+        # create del sql
+        del_sql = "delete from only %s using %s where %s" % (
+                  quote_fqident(tbl), quote_fqident(temp), whe_expr)
+
+        # create update sql
+        slist = []
+        for col in col_list:
+            if col not in key_fields:
+                exp = "%s = %s.%s" % (quote_ident(col), quote_fqident(temp), quote_ident(col))
+                slist.append(exp)
+        upd_sql = "update only %s set %s from %s where %s" % (
+                  quote_fqident(tbl), ", ".join(slist), quote_fqident(temp), whe_expr)
+
+        # insert sql
+        colstr = ",".join([quote_ident(c) for c in col_list])
+        ins_sql = "insert into %s (%s) select %s from %s" % (
+                  quote_fqident(tbl), colstr, colstr, quote_fqident(temp))
+
+        temp_used = False
+
+        # process deleted rows
+        if len(del_list) > 0:
+            #self.log.info("Deleting %d rows from %s" % (len(del_list), tbl))
+            # clear temp table
+            q = "truncate %s" % quote_fqident(temp)
+            self.log.debug(q)
+            curs.execute(q)
+            # copy rows
+            self.log.debug("COPY %d rows into %s" % (len(del_list), temp))
+            skytools.magic_insert(curs, temp, del_list, col_list)
+            # delete rows
+            self.log.debug(del_sql)
+            curs.execute(del_sql)
+            self.log.debug(curs.statusmessage)
+            if len(del_list) != curs.rowcount:
+                self.log.warning("Delete mismatch: expected=%d deleted=%d"
+                        % (len(del_list), curs.rowcount))
+            temp_used = True
+
+        # process updated rows
+        if len(upd_list) > 0:
+            #self.log.info("Updating %d rows in %s" % (len(upd_list), tbl))
+            # clear temp table
+            q = "truncate %s" % quote_fqident(temp)
+            self.log.debug(q)
+            curs.execute(q)
+            # copy rows
+            self.log.debug("COPY %d rows into %s" % (len(upd_list), temp))
+            skytools.magic_insert(curs, temp, upd_list, col_list)
+            temp_used = True
+            if LOAD_METHOD == METH_CORRECT:
+                # update main table
+                self.log.debug(upd_sql)
+                curs.execute(upd_sql)
+                self.log.debug(curs.statusmessage)
+                # check count
+                if len(upd_list) != curs.rowcount:
+                    self.log.warning("Update mismatch: expected=%d updated=%d"
+                            % (len(upd_list), curs.rowcount))
+            else:
+                # delete from main table
+                self.log.debug(del_sql)
+                curs.execute(del_sql)
+                self.log.debug(curs.statusmessage)
+                # check count
+                if real_update_count != curs.rowcount:
+                    self.log.warning("Update mismatch: expected=%d deleted=%d"
+                            % (real_update_count, curs.rowcount))
+                # insert into main table
+                if AVOID_BIZGRES_BUG:
+                    # copy again, into main table
+                    self.log.debug("COPY %d rows into %s" % (len(upd_list), tbl))
+                    skytools.magic_insert(curs, tbl, upd_list, col_list)
+                else:
+                    # better way, but does not work due to bizgres bug
+                    self.log.debug(ins_sql)
+                    curs.execute(ins_sql)
+                    self.log.debug(curs.statusmessage)
+
+        # process new rows
+        if len(ins_list) > 0:
+            self.log.info("Inserting %d rows into %s" % (len(ins_list), tbl))
+            skytools.magic_insert(curs, tbl, ins_list, col_list)
+
+        # drop remaining rows
+        if temp_used:
+            if USE_LONGLIVED_TEMP_TABLES:
+                q = "truncate %s" % quote_fqident(temp)
+            else:
+                # long-lived temp tables are problematic on some setups
+                q = "drop table %s" % quote_fqident(temp)
+            self.log.debug(q)
+            curs.execute(q)
+
+        self.reset()
+
+    def create_temp_table(self, curs):
+        # create temp table for loading
+        tempname = self.table_name.replace('.', '_') + "_loadertmp"
+
+        # check if it already exists
+        if USE_LONGLIVED_TEMP_TABLES:
+            if skytools.exists_temp_table(curs, tempname):
+                self.log.debug("Using existing temp table %s" % tempname)
+                return tempname
+
+        # bizgres crashes on "on commit delete rows",
+        # so use "on commit preserve rows" instead
+        arg = "on commit preserve rows"
+
+        # create temp table for loading
+        q = "create temp table %s (like %s) %s" % (
+                quote_fqident(tempname), quote_fqident(self.table_name), arg)
+        self.log.debug("Creating temp table: %s" % q)
+        curs.execute(q)
+        return tempname
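+
+    # Note: with USE_LONGLIVED_TEMP_TABLES the session keeps one temp table
+    # per target table and truncates it between batches; otherwise the temp
+    # table is dropped and recreated for every batch.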
+
+    def find_dist_fields(self, curs):
+        # find Greenplum/Bizgres-style distribution key columns, if any
+        if not skytools.exists_table(curs, "pg_catalog.mpp_distribution_policy"):
+            return []
+        schema, name = skytools.fq_name_parts(self.table_name)
+        q = "select a.attname"\
+            "  from pg_class t, pg_namespace n, pg_attribute a,"\
+            "       mpp_distribution_policy p"\
+            " where n.oid = t.relnamespace"\
+            "   and p.localoid = t.oid"\
+            "   and a.attrelid = t.oid"\
+            "   and a.attnum = any(p.attrnums)"\
+            "   and n.nspname = %s and t.relname = %s"
+        curs.execute(q, [schema, name])
+        res = []
+        for row in curs.fetchall():
+            res.append(row[0])
+        return res
+
+
+class TableHandler:
+    """Basic partitioned loader.
+
+    Splits events into partitions, if requested,
+    then applies them without further processing.
+    """
+    def __init__(self, rowhandler, table_name, table_mode, cf, log):
+        self.part_map = {}
+        self.rowhandler = rowhandler
+        self.table_name = table_name
+        self.quoted_name = quote_fqident(table_name)
+        self.log = log
+        if table_mode == 'direct':
+            self.split = False
+        elif table_mode == 'split':
+            self.split = True
+            smode = cf.get('split_mode', 'by-batch-time')
+            sfield = None
+            if smode.find(':') > 0:
+                smode, sfield = smode.split(':', 1)
+            self.split_field = sfield
+            self.split_part = cf.get('split_part', '%(table_name)s_%(year)s_%(month)s_%(day)s')
+            self.split_part_template = cf.get('split_part_template', '')
+            if smode == 'by-batch-time':
+                self.split_format = self.split_date_from_batch
+            elif smode == 'by-event-time':
+                self.split_format = self.split_date_from_event
+            elif smode == 'by-date-field':
+                self.split_format = self.split_date_from_field
+            else:
+                raise UsageError('Bad value for split_mode: '+smode)
+            self.log.debug("%s: split_mode=%s, split_field=%s, split_part=%s" % (
+                    self.table_name, smode, self.split_field, self.split_part))
+        elif table_mode == 'ignore':
+            pass
+        else:
+            raise UsageError('Bad value for table_mode: '+table_mode)
+
+    def split_date_from_batch(self, ev, data, batch_info):
+        d = batch_info['batch_end']
+        vals = {
+            'table_name': self.table_name,
+            'year': "%04d" % d.year,
+            'month': "%02d" % d.month,
+            'day': "%02d" % d.day,
+            'hour': "%02d" % d.hour,
+        }
+        dst = self.split_part % vals
+        return dst
+
+    def split_date_from_event(self, ev, data, batch_info):
+        # pgq events carry their creation time in ev_time
+        d = ev.ev_time
+        vals = {
+            'table_name': self.table_name,
+            'year': "%04d" % d.year,
+            'month': "%02d" % d.month,
+            'day': "%02d" % d.day,
+            'hour': "%02d" % d.hour,
+        }
+        dst = self.split_part % vals
+        return dst
+
+    def split_date_from_field(self, ev, data, batch_info):
+        # expects the field formatted as "YYYY-MM-DD HH:..." text
+        val = data[self.split_field]
+        date, time = val.split(' ', 1)
+        y, m, d = date.split('-')
+        h, rest = time.split(':', 1)
+        vals = {
+            'table_name': self.table_name,
+            'year': y,
+            'month': m,
+            'day': d,
+            'hour': h,
+        }
+        dst = self.split_part % vals
+        return dst
+
+    def add(self, curs, ev, batch_info):
+        data = skytools.db_urldecode(ev.data)
+        op, pkeys = ev.type.split(':', 1)
+        pkey_list = pkeys.split(',')
+        if self.split:
+            dst = self.split_format(ev, data, batch_info)
+            if dst not in self.part_map:
+                self.check_part(curs, dst, pkey_list)
+        else:
+            dst = self.table_name
+
+        if dst not in self.part_map:
+            self.part_map[dst] = self.rowhandler(dst, self.table_name, self.log)
+
+        p = self.part_map[dst]
+        p.add_row(op, data, pkey_list)
+
+    def flush(self, curs):
+        for part in self.part_map.values():
+            part.flush(curs)
+
+    def check_part(self, curs, dst, pkey_list):
+        if skytools.exists_table(curs, dst):
+            return
+        if not self.split_part_template:
+            raise UsageError('Partition %s does not exist and split_part_template is not specified' % dst)
+
+        vals = {
+            'dest': quote_fqident(dst),
+            'part': quote_fqident(dst),
+            'parent': quote_fqident(self.table_name),
+            'pkey': ",".join(pkey_list), # quoting?
+        }
+        sql = self.split_part_template % vals
+        curs.execute(sql)
+
+
+class IgnoreTable(TableHandler):
+    """Do-nothing."""
+    def add(self, curs, ev, batch_info):
+        pass
+
+
+class QueueLoader(CascadedWorker):
+    """Loader script."""
+    table_state = {}
+
+    def reset(self):
+        """Drop our caches on error."""
+        self.table_state = {}
+        CascadedWorker.reset(self)
+
+    def init_state(self, tbl):
+        cf = self.cf
+        if tbl in cf.cf.sections():
+            cf = cf.clone(tbl)
+        table_mode = cf.get('table_mode', 'ignore')
+        row_mode = cf.get('row_mode', 'plain')
+        if table_mode == 'ignore':
+            tblhandler = IgnoreTable
+        else:
+            tblhandler = TableHandler
+
+        if row_mode == 'plain':
+            rowhandler = BasicLoader
+        elif row_mode == 'keep_latest':
+            rowhandler = KeepLatestLoader
+        elif row_mode == 'keep_all':
+            rowhandler = KeepAllLoader
+        elif row_mode == 'bulk':
+            rowhandler = BulkLoader
+        else:
+            raise UsageError('Bad row_mode: '+row_mode)
+        self.table_state[tbl] = tblhandler(rowhandler, tbl, table_mode, cf, self.log)
+
+    def process_remote_event(self, src_curs, dst_curs, ev):
+        t = ev.type[:2]
+        if t not in ('I:', 'U:', 'D:'):
+            CascadedWorker.process_remote_event(self, src_curs, dst_curs, ev)
+            return
+
+        tbl = ev.extra1
+        if tbl not in self.table_state:
+            self.init_state(tbl)
+        st = self.table_state[tbl]
+        st.add(dst_curs, ev, self._batch_info)
+        ev.tag_done()
+
+    def finish_remote_batch(self, src_db, dst_db, tick_id):
+        curs = dst_db.cursor()
+        for st in self.table_state.values():
+            st.flush(curs)
+        CascadedWorker.finish_remote_batch(self, src_db, dst_db, tick_id)
+
+if __name__ == '__main__':
+    script = QueueLoader('queue_loader', 'db', sys.argv[1:])
+    script.start()

diff --git a/setup.py b/setup.py
index 825e6ca9..fba8c4de 100755
--- a/setup.py
+++ b/setup.py
@@ -39,7 +39,7 @@ setup(
         'scripts/cube_dispatcher.py', 'scripts/queue_mover.py',
         'scripts/table_dispatcher.py', 'scripts/bulk_loader.py',
         'scripts/scriptmgr.py', 'scripts/queue_splitter.py',
-        'scripts/skytools_upgrade.py',
+        'scripts/skytools_upgrade.py', 'scripts/queue_loader.py',
     ],
     data_files = [ ('share/doc/skytools/conf', [

diff --git a/tests/queue_loader/conf/loader_dst.ini b/tests/queue_loader/conf/loader_dst.ini
new file mode 100644
index 00000000..d12a7f44
--- /dev/null
+++ b/tests/queue_loader/conf/loader_dst.ini
@@ -0,0 +1,100 @@
+[queue_loader]
+
+job_name = loader_dst
+db = dbname=loaderdst
+queue_name = loaderq
+
+logfile = log/%(job_name)s.log
+pidfile = pid/%(job_name)s.pid
+
+rename_tables =
+
+[data.simple_tbl]
+table_mode = direct
+
+[data.bulk_tbl]
+table_mode = direct
+row_mode = bulk
+
+[data.keep_all_tbl]
+table_mode = split
+row_mode = keep_all
+split_mode = by-date-field:tstamp
+
+### Non-inherited partitions
+split_part_template =
+    create table %%(part)s (like %%(parent)s);
+    alter table only %%(part)s add primary key (%%(pkey)s);
+
+[data.keep_latest_tbl]
+table_mode = split
+row_mode = keep_latest
+split_mode = by-date-field:tstamp
+
+### Inherited partitions
+split_part_template =
+    create table %%(part)s () inherits (%%(parent)s);
+    alter table only %%(part)s add primary key (%%(pkey)s);
+
+
+[DEFAULT]
+
+# fields - which fields to send through
+#fields = col1, col2, col3:renamed3
+#fields = *
+
+# table_mode - how to handle a table
+#
+# ignore - ignore this table
+# direct - update table directly
+# split - split data into partitions
+#table_mode = ignore
+
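+# For example, the [data.keep_all_tbl] section above splits by the tstamp
+# field: a row with tstamp on 2009-02-17 goes to partition
+# data.keep_all_tbl_2009_02_17 (default split_part format, see below).
+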
+# split_mode - how to split, if requested +# +# by-batch-time: use batch time for splitting +# by-event-time: use event time for splitting +# by-date-field:fld - use fld for splitting +#split_mode = by-batch-time + +# split_part - partition name format +# +# %(table_name)s %(year)s %(month)s %(day)s %(hour)s +#split_part = %(table_name)s_%(year)s_%(month)s_%(day)s + +# split_part_template - How to create new partition tables +# +# Available fields: +# %(part)s +# %(parent)s +# %(pkey)s +# +### Non-inherited partitions +#split_part_template = +# create table %%(part)s (like %%(parent)s); +# alter table only %%(part)s add primary key (%%(pkey)s); +# +### Inherited partitions +#split_part_template = +# create table %%(part)s () inherits (%%(parent)s); +# alter table only %%(part)s add primary key (%%(pkey)s); + + +# row_mode - How to apply the events +# +# plain - each event creates SQL statement to run +# keep_latest - change updates to DELETE + INSERT +# keep_all - change updates to inserts, ignore deletes +# bulk - instead of statement-per-row, do bulk updates +#row_mode = plain + + +# bulk_mode - How to do the bulk update +# +# correct - inserts as COPY into table, +# update as COPY into temp table and single UPDATE from there +# delete as COPY into temp table and single DELETE from there +# delete - as 'correct', but do update as DELETE + COPY +# merged - as 'delete', but merge insert rows with update rows +#bulk_mode=correct + diff --git a/tests/queue_loader/conf/loader_src.ini b/tests/queue_loader/conf/loader_src.ini new file mode 100644 index 00000000..42a4b1e8 --- /dev/null +++ b/tests/queue_loader/conf/loader_src.ini @@ -0,0 +1,8 @@ +[queue_loader] + +job_name = loader_src +db = dbname=loadersrc +queue_name = loaderq + +logfile = log/%(job_name)s.log +pidfile = pid/%(job_name)s.pid diff --git a/tests/queue_loader/conf/setadm_loaderq.ini b/tests/queue_loader/conf/setadm_loaderq.ini new file mode 100644 index 00000000..ddb59434 --- /dev/null +++ b/tests/queue_loader/conf/setadm_loaderq.ini @@ -0,0 +1,11 @@ +[cascade_admin] + +job_name = setadm_loaderq + +node_db = dbname=loadersrc + +queue_name = loaderq + + +logfile = log/%(job_name)s.log +pidfile = pid/%(job_name)s.pid diff --git a/tests/queue_loader/conf/ticker_loadersrc.ini b/tests/queue_loader/conf/ticker_loadersrc.ini new file mode 100644 index 00000000..80256711 --- /dev/null +++ b/tests/queue_loader/conf/ticker_loadersrc.ini @@ -0,0 +1,6 @@ +[pgqadm] +job_name = ticker_loadersrc +db = dbname=loadersrc +loop_delay = 0.5 +logfile = log/%(job_name)s.log +pidfile = pid/%(job_name)s.pid diff --git a/tests/queue_loader/init.sh b/tests/queue_loader/init.sh new file mode 100755 index 00000000..0488e59d --- /dev/null +++ b/tests/queue_loader/init.sh @@ -0,0 +1,14 @@ +#! /bin/sh + +. ../env.sh + +lst="loadersrc loaderdst" + +for db in $lst; do + echo dropdb $db + dropdb $db +done +for db in $lst; do + echo createdb $db + createdb $db +done diff --git a/tests/queue_loader/regen.sh b/tests/queue_loader/regen.sh new file mode 100755 index 00000000..f9a9eb3d --- /dev/null +++ b/tests/queue_loader/regen.sh @@ -0,0 +1,66 @@ +#! /bin/sh + +. 
../env.sh + +mkdir -p log pid conf + +./zstop.sh + +v= +v=-q +v=-v + +(cd ../..; make -s python-install ) + +echo "" + +cleardb() { + echo "Clearing database $1" + psql -q -d $1 -c ' + set client_min_messages=warning; + drop schema if exists londiste cascade; + drop schema if exists pgq_node cascade; + drop schema if exists pgq cascade; + drop schema if exists data cascade; + ' +} + +run() { + echo "$ $*" + "$@" +} + +db_list="loadersrc loaderdst" + +for db in $db_list; do + cleardb $db +done + +echo "clean logs" +rm -f log/*.log + +set -e + +run setadm $v conf/setadm_loaderq.ini create-root ldr-src 'dbname=loadersrc' --worker=loader_src +run setadm $v conf/setadm_loaderq.ini create-leaf ldr-dst 'dbname=loaderdst' --worker=loader_dst --provider="dbname=loadersrc" + +run pgqadm $v conf/ticker_loadersrc.ini -d ticker + +run queue_loader $v -d conf/loader_src.ini +run queue_loader $v -d conf/loader_dst.ini + +run psql -d loadersrc -f tables.sql +run psql -d loadersrc -f triggers.sql + +run psql -d loaderdst -f tables.sql + +run psql -d loadersrc -f send.data.sql +run psql -d loadersrc -f send.data.sql +run psql -d loadersrc -f send.data.sql + +run sleep 2 + +run setadm $v conf/setadm_loaderq.ini status + +./zcheck.sh + diff --git a/tests/queue_loader/send.data.sql b/tests/queue_loader/send.data.sql new file mode 100644 index 00000000..1c1e256d --- /dev/null +++ b/tests/queue_loader/send.data.sql @@ -0,0 +1,21 @@ + +insert into data.simple_tbl (username, contactname, data) +values ('randuser'||random()::text, 'randcontact'||random()::text, 'link'); + +/* +insert into data.simple_tbl (username, contactname, data) +values ('sameuser', 'samecontact', 'link'); +update data.simple_tbl +*/ + +insert into data.bulk_tbl (data) values ('newdata'); + + +insert into data.keep_all_tbl (username, data) values ('sameuser', 'newdata'); + +insert into data.keep_latest_tbl (username, data) values ('sameuser', 'newdata'); + +insert into data.random_tbl (username, data) values ('sameuser', 'newdata'); + + + diff --git a/tests/queue_loader/tables.sql b/tests/queue_loader/tables.sql new file mode 100644 index 00000000..89ac0ed6 --- /dev/null +++ b/tests/queue_loader/tables.sql @@ -0,0 +1,38 @@ + +set client_min_messages = 'warning'; + +create schema data; + +create table data.simple_tbl ( + username text not null, + contactname text not null, + data text, + primary key (username, contactname) +); + +create table data.bulk_tbl ( + id serial primary key, + data text +); + +create table data.keep_all_tbl ( + id serial primary key, + username text not null, + tstamp timestamptz not null default now(), + data text +); + +create table data.keep_latest_tbl ( + id serial primary key, + username text not null, + tstamp timestamptz not null default now(), + data text +); + +create table data.random_tbl ( + id serial primary key, + username text not null, + tstamp timestamptz not null default now(), + data text +); + diff --git a/tests/queue_loader/triggers.sql b/tests/queue_loader/triggers.sql new file mode 100644 index 00000000..f8c874f5 --- /dev/null +++ b/tests/queue_loader/triggers.sql @@ -0,0 +1,16 @@ + +create trigger logger after insert or update or delete on data.simple_tbl +for each row execute procedure pgq.logutriga('loaderq'); + +create trigger logger after insert or update or delete on data.bulk_tbl +for each row execute procedure pgq.logutriga('loaderq'); + +create trigger logger after insert or update or delete on data.keep_all_tbl +for each row execute procedure pgq.logutriga('loaderq'); + +create trigger 
logger after insert or update or delete on data.keep_latest_tbl +for each row execute procedure pgq.logutriga('loaderq'); + +create trigger logger after insert or update or delete on data.random_tbl +for each row execute procedure pgq.logutriga('loaderq'); + diff --git a/tests/queue_loader/zcheck.sh b/tests/queue_loader/zcheck.sh new file mode 100755 index 00000000..96f59aed --- /dev/null +++ b/tests/queue_loader/zcheck.sh @@ -0,0 +1,4 @@ +#! /bin/sh + +grep -E 'ERR|WARN|CRIT' log/*.log || echo "All OK" + diff --git a/tests/queue_loader/zstop.sh b/tests/queue_loader/zstop.sh new file mode 100755 index 00000000..69e574cb --- /dev/null +++ b/tests/queue_loader/zstop.sh @@ -0,0 +1,14 @@ +#! /bin/sh + +#. ../env.sh + +for p in pid/*.pid*; do + test -f "$p" || continue + pid=`cat "$p"` + test -d "/proc/$pid" || { + rm -f "$p" + continue + } + kill "$pid" +done + -- 2.39.5