Combined queue_loader
author Marko Kreen <markokr@gmail.com>
Tue, 17 Feb 2009 15:10:33 +0000 (17:10 +0200)
committer Marko Kreen <markokr@gmail.com>
Tue, 17 Feb 2009 15:11:22 +0000 (17:11 +0200)
Merge table_dispatcher, cube_dispatcher and bulk_loader together

14 files changed:
scripts/queue_loader.ini [new file with mode: 0644]
scripts/queue_loader.py [new file with mode: 0755]
setup.py
tests/queue_loader/conf/loader_dst.ini [new file with mode: 0644]
tests/queue_loader/conf/loader_src.ini [new file with mode: 0644]
tests/queue_loader/conf/setadm_loaderq.ini [new file with mode: 0644]
tests/queue_loader/conf/ticker_loadersrc.ini [new file with mode: 0644]
tests/queue_loader/init.sh [new file with mode: 0755]
tests/queue_loader/regen.sh [new file with mode: 0755]
tests/queue_loader/send.data.sql [new file with mode: 0644]
tests/queue_loader/tables.sql [new file with mode: 0644]
tests/queue_loader/triggers.sql [new file with mode: 0644]
tests/queue_loader/zcheck.sh [new file with mode: 0755]
tests/queue_loader/zstop.sh [new file with mode: 0755]

diff --git a/scripts/queue_loader.ini b/scripts/queue_loader.ini
new file mode 100644 (file)
index 0000000..35e226f
--- /dev/null
@@ -0,0 +1,125 @@
+
+[queue_loader]
+
+job_name =
+logfile =
+pidfile =
+
+db = 
+
+rename_tables = 
+
+[DEFAULT]
+
+# fields - which fields to send through
+#fields = col1, col2, col3:renamed3
+#fields = *
+
+# table_mode - how to handle a table
+#
+# ignore - ignore this table
+# direct - update table directly
+# split - split data into partitions
+#table_mode = ignore
+
+# split_mode - how to split, if requested
+#
+# by-batch-time - use batch time for splitting
+# by-event-time - use event time for splitting
+# by-date-field:fld - use fld for splitting
+#split_mode = by-batch-time
+
+# split_part - partition name format
+#
+# %(table_name)s %(year)s %(month)s %(day)s %(hour)s
+#split_part = %(table_name)s_%(year)s_%(month)s_%(day)s
+
+# split_part_template - How to create new partition tables
+#
+# Available fields:
+# %(part)s
+# %(parent)s
+# %(pkey)s
+#
+### Non-inherited partitions
+#split_part_template =
+#    create table %%(part)s (like %%(parent)s);
+#    alter table only %%(part)s add primary key (%%(pkey)s);
+#
+### Inherited partitions
+#split_part_template = 
+#    create table %%(part)s () inherits (%%(parent)s);
+#    alter table only %%(part)s add primary key (%%(pkey)s);
+
+
+# row_mode - How to apply the events
+#
+# plain - each event creates an SQL statement to run
+# keep_latest - change updates to DELETE + INSERT
+# keep_all - change updates to inserts, ignore deletes
+# bulk - instead of statement-per-row, do bulk updates
+#row_mode = plain
+
+
+# bulk_mode - How to do the bulk update
+#
+# correct - inserts as COPY into table,
+#           updates as COPY into temp table and a single UPDATE from there,
+#           deletes as COPY into temp table and a single DELETE from there
+# delete - as 'correct', but do updates as DELETE + COPY
+# merged - as 'delete', but merge insert rows with update rows
+#bulk_mode = correct
+
+[table public.foo]
+mode = 
+create_sql =
+
+# partition by date field
+# partition by batch time
+
+# apply all
+# keep_all
+# apply latest
+
+cube:
+
+table:
+
+bulk:
+
+
+
+
+
+
+[udata_dispatcher]
+job_name          = test_move
+
+src_db            = dbname=sourcedb_test
+dst_db            = dbname=dataminedb_test
+
+pgq_queue_name    = OrderLog
+
+logfile           = ~/log/%(job_name)s.log
+pidfile           = ~/pid/%(job_name)s.pid
+
+# where to put data.  when partitioning, will be used as base name
+dest_table = orders
+
+# date field which will be used for partitioning
+# special value: _EVTIME - event creation time
+part_column = start_date
+
+#fields = *
+#fields = id, name
+#fields = id:newid, name, bar:baz
+
+
+# template used for creating partition tables
+# _DEST_TABLE
+part_template     = 
+    create table _DEST_TABLE () inherits (orders);
+    alter table only _DEST_TABLE add constraint _DEST_TABLE_pkey primary key (id);
+    grant select on _DEST_TABLE to group reporting;
+
+
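
Note: the split_part placeholders documented above are expanded with plain Python %-formatting, the same way the script's split_date_from_* helpers build partition names. A minimal sketch of that expansion (table name and date are illustrative only):

    # Sketch: how a split_part pattern such as
    #   %(table_name)s_%(year)s_%(month)s_%(day)s
    # becomes a partition name.  Values are made up for illustration.
    import datetime

    split_part = "%(table_name)s_%(year)s_%(month)s_%(day)s"
    batch_end = datetime.datetime(2009, 2, 17, 15, 10)

    vals = {
        'table_name': 'data.keep_all_tbl',
        'year': "%04d" % batch_end.year,
        'month': "%02d" % batch_end.month,
        'day': "%02d" % batch_end.day,
        'hour': "%02d" % batch_end.hour,
    }
    print split_part % vals        # -> data.keep_all_tbl_2009_02_17
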
diff --git a/scripts/queue_loader.py b/scripts/queue_loader.py
new file mode 100755 (executable)
index 0000000..c14ecd0
--- /dev/null
@@ -0,0 +1,531 @@
+#! /usr/bin/env python
+
+"""Load data from queue into tables, with optional partitioning."""
+
+import sys, time, skytools
+
+from pgq.cascade.worker import CascadedWorker
+
+from skytools import quote_ident, quote_fqident, UsageError
+
+# todo: auto table detect
+
+# BulkLoader load method
+METH_CORRECT = 0
+METH_DELETE = 1
+METH_MERGED = 2
+LOAD_METHOD = METH_CORRECT
+# BulkLoader hacks
+AVOID_BIZGRES_BUG = 0
+USE_LONGLIVED_TEMP_TABLES = True
+
+class BasicLoader:
+    """Apply events as-is."""
+    def __init__(self, table_name, parent_name, log):
+        self.table_name = table_name
+        self.parent_name = parent_name
+        self.sql_list = []
+        self.log = log
+
+    def add_row(self, op, data, pkey_list):
+        if op == 'I':
+            sql = skytools.mk_insert_sql(data, self.table_name, pkey_list)
+        elif op == 'U':
+            sql = skytools.mk_update_sql(data, self.table_name, pkey_list)
+        elif op == 'D':
+            sql = skytools.mk_delete_sql(data, self.table_name, pkey_list)
+        else:
+            raise Exception('bad operation: '+op)
+        self.sql_list.append(sql)
+
+    def flush(self, curs):
+        if len(self.sql_list) > 0:
+            curs.execute("\n".join(self.sql_list))
+            self.sql_list = []
+
+class KeepLatestLoader(BasicLoader):
+    """Keep latest row version.
+
+    Updates are changed to delete + insert, deletes are ignored.
+    Makes sense only for partitioned tables.
+    """
+    def add_row(self, op, data, pkey_list):
+        if op == 'U':
+            BasicLoader.add_row(self, 'D', data, pkey_list)
+            BasicLoader.add_row(self, 'I', data, pkey_list)
+        elif op == 'I':
+            BasicLoader.add_row(self, 'I', data, pkey_list)
+        else:
+            pass
+
+
+class KeepAllLoader(BasicLoader):
+    """Keep all row versions.
+
+    Updates are changed to inserts, deletes are ignored.
+    Makes sense only for partitioned tables.
+    """
+    def add_row(self, op, data, pkey_list):
+        if op == 'U':
+            op = 'I'
+        elif op == 'D':
+            return
+        BasicLoader.add_row(self, op, data, pkey_list)
+
+
+class BulkEvent(object):
+    """Helper class for BulkLoader to store relevant data."""
+    __slots__ = ('op', 'data', 'pk_data')
+    def __init__(self, op, data, pk_data):
+        self.op = op
+        self.data = data
+        self.pk_data = pk_data
+
+class BulkLoader(BasicLoader):
+    """Instead of statement-per event, load all data with one
+    big COPY, UPDATE or DELETE statement.
+    """
+    fake_seq = 0
+    def __init__(self, table_name, parent_name, log):
+        """Init per-batch table data cache."""
+        BasicLoader.__init__(self, table_name, parent_name, log)
+
+        self.pkey_list = None
+        self.dist_fields = None
+        self.col_list = None
+
+        self.ev_list = []
+        self.pkey_ev_map = {}
+
+    def reset(self):
+        self.ev_list = []
+        self.pkey_ev_map = {}
+
+    def add_row(self, op, data, pkey_list):
+        """Store new event."""
+
+        # get pkey value
+        if self.pkey_list is None:
+            self.pkey_list = pkey_list
+        if len(self.pkey_list) > 0:
+            pk_data = tuple(data[k] for k in self.pkey_list)
+        elif op == 'I':
+            # fake pkey, just to get them spread out
+            pk_data = self.fake_seq
+            self.fake_seq += 1
+        else:
+            raise Exception('non-pk tables not supported: %s' % self.table_name)
+
+        # get full column list, detect added columns
+        if not self.col_list:
+            self.col_list = data.keys()
+        elif self.col_list != data.keys():
+            # ^ supposedly python guarantees same order in keys()
+            self.col_list = data.keys()
+
+        # add to list
+        ev = BulkEvent(op, data, pk_data)
+        self.ev_list.append(ev)
+
+        # keep all versions of row data
+        if ev.pk_data in self.pkey_ev_map:
+            self.pkey_ev_map[ev.pk_data].append(ev)
+        else:
+            self.pkey_ev_map[ev.pk_data] = [ev]
+
+    def prepare_data(self):
+        """Got all data, prepare for insertion."""
+
+        del_list = []
+        ins_list = []
+        upd_list = []
+        for ev_list in self.pkey_ev_map.values():
+            # rewrite list of I/U/D events to
+            # optional DELETE and optional INSERT/COPY command
+            exists_before = -1
+            exists_after = 1
+            for ev in ev_list:
+                if ev.op == "I":
+                    if exists_before < 0:
+                        exists_before = 0
+                    exists_after = 1
+                elif ev.op == "U":
+                    if exists_before < 0:
+                        exists_before = 1
+                    #exists_after = 1 # this shouldn't be needed
+                elif ev.op == "D":
+                    if exists_before < 0:
+                        exists_before = 1
+                    exists_after = 0
+                else:
+                    raise Exception('unknown event type: %s' % ev.op)
+
+            # skip short-lived rows
+            if exists_before == 0 and exists_after == 0:
+                continue
+
+            # take last event
+            ev = ev_list[-1]
+            
+            # generate needed commands
+            if exists_before and exists_after:
+                upd_list.append(ev.data)
+            elif exists_before:
+                del_list.append(ev.data)
+            elif exists_after:
+                ins_list.append(ev.data)
+
+        return ins_list, upd_list, del_list
+
+    def flush(self, curs):
+        ins_list, upd_list, del_list = self.prepare_data()
+
+        # reorder cols
+        col_list = self.pkey_list[:]
+        for k in self.col_list:
+            if k not in self.pkey_list:
+                col_list.append(k)
+
+        real_update_count = len(upd_list)
+
+        #self.log.debug("process_one_table: %s  (I/U/D = %d/%d/%d)" % (
+        #               tbl, len(ins_list), len(upd_list), len(del_list)))
+
+        # merged method: process inserts together with updates
+        if LOAD_METHOD == METH_MERGED:
+            upd_list += ins_list
+            ins_list = []
+
+        # fetch distribution fields
+        if self.dist_fields is None:
+            self.dist_fields = self.find_dist_fields(curs)
+
+        key_fields = self.pkey_list[:]
+        for fld in self.dist_fields:
+            if fld not in key_fields:
+                key_fields.append(fld)
+        #self.log.debug("PKey fields: %s  Extra fields: %s" % (
+        #               ",".join(cache.pkey_list), ",".join(extra_fields)))
+
+        # create temp table
+        temp = self.create_temp_table(curs)
+        tbl = self.table_name
+        
+        # where expr must have pkey and dist fields
+        klist = []
+        for pk in key_fields:
+            exp = "%s.%s = %s.%s" % (quote_fqident(tbl), quote_ident(pk),
+                                     quote_fqident(temp), quote_ident(pk))
+            klist.append(exp)
+        whe_expr = " and ".join(klist)
+
+        # create del sql
+        del_sql = "delete from only %s using %s where %s" % (
+                  quote_fqident(tbl), quote_fqident(temp), whe_expr)
+
+        # create update sql
+        slist = []
+        for col in col_list:
+            if col not in key_fields:
+                exp = "%s = %s.%s" % (quote_ident(col), quote_fqident(temp), quote_ident(col))
+                slist.append(exp)
+        upd_sql = "update only %s set %s from %s where %s" % (
+                    quote_fqident(tbl), ", ".join(slist), quote_fqident(temp), whe_expr)
+
+        # insert sql
+        colstr = ",".join([quote_ident(c) for c in col_list])
+        ins_sql = "insert into %s (%s) select %s from %s" % (
+                  quote_fqident(tbl), colstr, colstr, quote_fqident(temp))
+
+        temp_used = False
+
+        # process deleted rows
+        if len(del_list) > 0:
+            #self.log.info("Deleting %d rows from %s" % (len(del_list), tbl))
+            # delete old rows
+            q = "truncate %s" % quote_fqident(temp)
+            self.log.debug(q)
+            curs.execute(q)
+            # copy rows
+            self.log.debug("COPY %d rows into %s" % (len(del_list), temp))
+            skytools.magic_insert(curs, temp, del_list, col_list)
+            # delete rows
+            self.log.debug(del_sql)
+            curs.execute(del_sql)
+            self.log.debug("%s - %d" % (curs.statusmessage, curs.rowcount))
+            self.log.debug(curs.statusmessage)
+            if len(del_list) != curs.rowcount:
+                self.log.warning("Delete mismatch: expected=%s updated=%d"
+                        % (len(del_list), curs.rowcount))
+            temp_used = True
+
+        # process updated rows
+        if len(upd_list) > 0:
+            #self.log.info("Updating %d rows in %s" % (len(upd_list), tbl))
+            # delete old rows
+            q = "truncate %s" % quote_fqident(temp)
+            self.log.debug(q)
+            curs.execute(q)
+            # copy rows
+            self.log.debug("COPY %d rows into %s" % (len(upd_list), temp))
+            skytools.magic_insert(curs, temp, upd_list, col_list)
+            temp_used = True
+            if LOAD_METHOD == METH_CORRECT:
+                # update main table
+                self.log.debug(upd_sql)
+                curs.execute(upd_sql)
+                self.log.debug(curs.statusmessage)
+                # check count
+                if len(upd_list) != curs.rowcount:
+                    self.log.warning("Update mismatch: expected=%s updated=%d"
+                            % (len(upd_list), curs.rowcount))
+            else:
+                # delete from main table
+                self.log.debug(del_sql)
+                curs.execute(del_sql)
+                self.log.debug(curs.statusmessage)
+                # check count
+                if real_update_count != curs.rowcount:
+                    self.log.warning("Update mismatch: expected=%s deleted=%d"
+                            % (real_update_count, curs.rowcount))
+                # insert into main table
+                if AVOID_BIZGRES_BUG:
+                    # copy again, into main table
+                    self.log.debug("COPY %d rows into %s" % (len(upd_list), tbl))
+                    skytools.magic_insert(curs, tbl, upd_list, col_list)
+                else:
+                    # better way, but does not work due to a bizgres bug
+                    self.log.debug(ins_sql)
+                    curs.execute(ins_sql)
+                    self.log.debug(curs.statusmessage)
+
+        # process new rows
+        if len(ins_list) > 0:
+            self.log.info("Inserting %d rows into %s" % (len(ins_list), tbl))
+            skytools.magic_insert(curs, tbl, ins_list, col_list)
+
+        # delete remaining rows
+        if temp_used:
+            if USE_LONGLIVED_TEMP_TABLES:
+                q = "truncate %s" % quote_fqident(temp)
+            else:
+                # fscking problems with long-lived temp tables
+                q = "drop table %s" % quote_fqident(temp)
+            self.log.debug(q)
+            curs.execute(q)
+
+        self.reset()
+
+    def create_temp_table(self, curs):
+        # create temp table for loading
+        tempname = self.table_name.replace('.', '_') + "_loadertmp"
+
+        # check if exists
+        if USE_LONGLIVED_TEMP_TABLES:
+            if skytools.exists_temp_table(curs, tempname):
+                self.log.debug("Using existing temp table %s" % tempname)
+                return tempname
+    
+        # "on commit delete rows" would be preferable here,
+        # but bizgres crashes on it, so preserve rows instead
+        arg = "on commit preserve rows"
+        # create temp table for loading
+        q = "create temp table %s (like %s) %s" % (
+                quote_fqident(tempname), quote_fqident(self.table_name), arg)
+        self.log.debug("Creating temp table: %s" % q)
+        curs.execute(q)
+        return tempname
+
+    def find_dist_fields(self, curs):
+        if not skytools.exists_table(curs, "pg_catalog.mpp_distribution_policy"):
+            return []
+        schema, name = skytools.fq_name_parts(self.table_name)
+        q = "select a.attname"\
+            "  from pg_class t, pg_namespace n, pg_attribute a,"\
+            "       mpp_distribution_policy p"\
+            " where n.oid = t.relnamespace"\
+            "   and p.localoid = t.oid"\
+            "   and a.attrelid = t.oid"\
+            "   and a.attnum = any(p.attrnums)"\
+            "   and n.nspname = %s and t.relname = %s"
+        curs.execute(q, [schema, name])
+        res = []
+        for row in curs.fetchall():
+            res.append(row[0])
+        return res
+
+
+class TableHandler:
+    """Basic partitioned loader.
+    Splits events into partitions, if requested.
+    Then applies them without further processing.
+    """
+    def __init__(self, rowhandler, table_name, table_mode, cf, log):
+        self.part_map = {}
+        self.rowhandler = rowhandler
+        self.table_name = table_name
+        self.quoted_name = quote_fqident(table_name)
+        self.log = log
+        if table_mode == 'direct':
+            self.split = False
+        elif table_mode == 'split':
+            self.split = True
+            smode = cf.get('split_mode', 'by-batch-time')
+            sfield = None
+            if smode.find(':') > 0:
+                smode, sfield = smode.split(':', 1)
+            self.split_field = sfield
+            self.split_part = cf.get('split_part', '%(table_name)s_%(year)s_%(month)s_%(day)s')
+            self.split_part_template = cf.get('split_part_template', '')
+            if smode == 'by-batch-time':
+                self.split_format = self.split_date_from_batch
+            elif smode == 'by-event-time':
+                self.split_format = self.split_date_from_event
+            elif smode == 'by-date-field':
+                self.split_format = self.split_date_from_field
+            else:
+                raise UsageError('Bad value for split_mode: '+smode)
+            self.log.debug("%s: split_mode=%s, split_field=%s, split_part=%s" % (
+                self.table_name, smode, self.split_field, self.split_part))
+        elif table_mode == 'ignore':
+            pass
+        else:
+            raise UsageError('Bad value for table_mode: '+table_mode)
+
+    def split_date_from_batch(self, ev, data, batch_info):
+        d = batch_info['batch_end']
+        vals = {
+            'table_name': self.table_name,
+            'year': "%04d" % d.year,
+            'month': "%02d" % d.month,
+            'day': "%02d" % d.day,
+            'hour': "%02d" % d.hour,
+        }
+        dst = self.split_part % vals
+        return dst
+
+    def split_date_from_event(self, ev, data, batch_info):
+        d = ev.ev_time
+        vals = {
+            'table_name': self.table_name,
+            'year': "%04d" % d.year,
+            'month': "%02d" % d.month,
+            'day': "%02d" % d.day,
+            'hour': "%02d" % d.hour,
+        }
+        dst = self.split_part % vals
+        return dst
+
+    def split_date_from_field(self, ev, data, batch_info):
+        val = data[self.split_field]
+        date, time = val.split(' ', 1)
+        y, m, d = date.split('-')
+        h, rest = time.split(':', 1)
+        vals = {
+            'table_name': self.table_name,
+            'year': y,
+            'month': m,
+            'day': d,
+            'hour': h,
+        }
+        dst = self.split_part % vals
+        return dst
+
+    def add(self, curs, ev, batch_info):
+        data = skytools.db_urldecode(ev.data)
+        op, pkeys = ev.type.split(':', 1)
+        pkey_list = pkeys.split(',')
+        if self.split:
+            dst = self.split_format(ev, data, batch_info)
+            if dst not in self.part_map:
+                self.check_part(curs, dst, pkey_list)
+        else:
+            dst = self.table_name
+
+        if dst not in self.part_map:
+            self.part_map[dst] = self.rowhandler(dst, self.table_name, self.log)
+
+        p = self.part_map[dst]
+        p.add_row(op, data, pkey_list)
+
+    def flush(self, curs):
+        for part in self.part_map.values():
+            part.flush(curs)
+
+    def check_part(self, curs, dst, pkey_list):
+        if skytools.exists_table(curs, dst):
+            return
+        if not self.split_part_template:
+            raise UsageError('Partition %s does not exist and split_part_template not specified' % dst)
+
+        vals = {
+            'dest': quote_fqident(dst),
+            'part': quote_fqident(dst),
+            'parent': quote_fqident(self.table_name),
+            'pkey': ",".join(pkey_list), # quoting?
+        }
+        sql = self.split_part_template % vals
+        curs.execute(sql)
+
+
+class IgnoreTable(TableHandler):
+    """Do-nothing."""
+    def add(self, curs, ev, batch_info):
+        pass
+
+
+class QueueLoader(CascadedWorker):
+    """Loader script."""
+    table_state = {}
+
+    def reset(self):
+        """Drop our caches on error."""
+        self.table_state = {}
+        CascadedWorker.reset(self)
+
+    def init_state(self, tbl):
+        cf = self.cf
+        if tbl in cf.cf.sections():
+            cf = cf.clone(tbl)
+        table_mode = cf.get('table_mode', 'ignore')
+        row_mode = cf.get('row_mode', 'plain')
+        if table_mode == 'ignore':
+            tblhandler = IgnoreTable
+        else:
+            tblhandler = TableHandler
+
+        if row_mode == 'plain':
+            rowhandler = BasicLoader
+        elif row_mode == 'keep_latest':
+            rowhandler = KeepLatestLoader
+        elif row_mode == 'keep_all':
+            rowhandler = KeepAllLoader
+        elif row_mode == 'bulk':
+            rowhandler = BulkLoader
+        else:
+            raise UsageError('Bad row_mode: '+row_mode)
+        self.table_state[tbl] = tblhandler(rowhandler, tbl, table_mode, cf, self.log)
+
+    def process_remote_event(self, src_curs, dst_curs, ev):
+        t = ev.type[:2]
+        if t not in ('I:', 'U:', 'D:'):
+            CascadedWorker.process_remote_event(self, src_curs, dst_curs, ev)
+            return
+
+        tbl = ev.extra1
+        if tbl not in self.table_state:
+            self.init_state(tbl)
+        st = self.table_state[tbl]
+        st.add(dst_curs, ev, self._batch_info)
+        ev.tag_done()
+
+    def finish_remote_batch(self, src_db, dst_db, tick_id):
+        curs = dst_db.cursor()
+        for st in self.table_state.values():
+            st.flush(curs)
+        CascadedWorker.finish_remote_batch(self, src_db, dst_db, tick_id)
+
+if __name__ == '__main__':
+    script = QueueLoader('queue_loader', 'db', sys.argv[1:])
+    script.start()
+
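
Note: the core of BulkLoader.prepare_data() above is collapsing each row's I/U/D event history into at most one DELETE plus one INSERT or UPDATE. A standalone sketch of the same exists_before/exists_after bookkeeping, with made-up event sequences:

    # Sketch of the prepare_data() collapse: a sequence of I/U/D events for one
    # primary key reduces to "did the row exist before the batch" and
    # "does it exist after the batch".
    def collapse(ops):
        exists_before = -1      # -1 = unknown, 0 = no, 1 = yes
        exists_after = 1
        for op in ops:
            if op == 'I':
                if exists_before < 0:
                    exists_before = 0
                exists_after = 1
            elif op == 'U':
                if exists_before < 0:
                    exists_before = 1
            elif op == 'D':
                if exists_before < 0:
                    exists_before = 1
                exists_after = 0
        if exists_before == 0 and exists_after == 0:
            return 'skip'            # short-lived row, nothing to do
        if exists_before and exists_after:
            return 'update'
        if exists_before:
            return 'delete'
        return 'insert'

    print collapse(['I', 'U', 'U'])  # insert  (row is new in this batch)
    print collapse(['U', 'D'])       # delete  (existed before, gone after)
    print collapse(['I', 'D'])       # skip    (created and dropped in-batch)
    print collapse(['U'])            # update
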
diff --git a/setup.py b/setup.py
index 825e6ca97a57df3a5b8ac213cf19de13370b9a7a..fba8c4de37ac5726ed73be175b1df8d9d392ffd8 100755 (executable)
--- a/setup.py
+++ b/setup.py
@@ -39,7 +39,7 @@ setup(
                'scripts/cube_dispatcher.py', 'scripts/queue_mover.py',
                'scripts/table_dispatcher.py', 'scripts/bulk_loader.py',
                'scripts/scriptmgr.py', 'scripts/queue_splitter.py',
-               'scripts/skytools_upgrade.py',
+               'scripts/skytools_upgrade.py', 'scripts/queue_loader.py',
                ],
     data_files = [
       ('share/doc/skytools/conf', [
diff --git a/tests/queue_loader/conf/loader_dst.ini b/tests/queue_loader/conf/loader_dst.ini
new file mode 100644 (file)
index 0000000..d12a7f4
--- /dev/null
@@ -0,0 +1,100 @@
+[queue_loader]
+
+job_name = loader_dst
+db = dbname=loaderdst
+queue_name = loaderq
+
+logfile = log/%(job_name)s.log
+pidfile = pid/%(job_name)s.pid
+
+rename_tables = 
+
+[data.simple_tbl]
+table_mode = direct
+
+[data.bulk_tbl]
+table_mode = direct
+row_mode = bulk
+
+[data.keep_all_tbl]
+table_mode = split
+row_mode = bulk
+split_mode = by-date-field:tstamp
+
+### Non-inherited partitions
+split_part_template =
+    create table %%(part)s (like %%(parent)s);
+    alter table only %%(part)s add primary key (%%(pkey)s);
+
+[data.keep_latest_tbl]
+table_mode = split
+row_mode = bulk
+split_mode = by-date-field:tstamp
+
+### Inherited partitions
+split_part_template = 
+    create table %%(part)s () inherits (%%(parent)s);
+    alter table only %%(part)s add primary key (%%(pkey)s);
+
+
+[DEFAULT]
+
+# fields - which fields to send through
+#fields = col1, col2, col3:renamed3
+#fields = *
+
+# table_mode - how to handle a table
+#
+# ignore - ignore this table
+# direct - update table directly
+# split - split data into partitions
+#table_mode = ignore
+
+# split_mode - how to split, if requested
+#
+# by-batch-time - use batch time for splitting
+# by-event-time - use event time for splitting
+# by-date-field:fld - use fld for splitting
+#split_mode = by-batch-time
+
+# split_part - partition name format
+#
+# %(table_name)s %(year)s %(month)s %(day)s %(hour)s
+#split_part = %(table_name)s_%(year)s_%(month)s_%(day)s
+
+# split_part_template - How to create new partition tables
+#
+# Available fields:
+# %(part)s
+# %(parent)s
+# %(pkey)s
+#
+### Non-inherited partitions
+#split_part_template =
+#    create table %%(part)s (like %%(parent)s);
+#    alter table only %%(part)s add primary key (%%(pkey)s);
+#
+### Inherited partitions
+#split_part_template = 
+#    create table %%(part)s () inherits (%%(parent)s);
+#    alter table only %%(part)s add primary key (%%(pkey)s);
+
+
+# row_mode - How to apply the events
+#
+# plain - each event creates an SQL statement to run
+# keep_latest - change updates to DELETE + INSERT
+# keep_all - change updates to inserts, ignore deletes
+# bulk - instead of statement-per-row, do bulk updates
+#row_mode = plain
+
+
+# bulk_mode - How to do the bulk update
+#
+# correct - inserts as COPY into table,
+#           updates as COPY into temp table and a single UPDATE from there,
+#           deletes as COPY into temp table and a single DELETE from there
+# delete - as 'correct', but do updates as DELETE + COPY
+# merged - as 'delete', but merge insert rows with update rows
+#bulk_mode = correct
+
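
Note: the doubled percent signs in split_part_template exist because the value goes through two rounds of %-substitution: ConfigParser-style interpolation when the ini is read (the same mechanism that expands %(job_name)s in logfile/pidfile), and then the loader's check_part(), which fills in %(part)s, %(parent)s and %(pkey)s. A small sketch of the two stages, with invented names (the first stage is simplified here to a plain replace standing in for ConfigParser interpolation):

    # Sketch: why the ini uses %%(part)s.  Stage 1 (ini interpolation) turns
    # %% into %; stage 2 (check_part) fills in part/parent/pkey.
    raw_ini_value = "create table %%(part)s (like %%(parent)s);"

    # stage 1: roughly what cf.get('split_part_template') hands back
    template = raw_ini_value.replace('%%', '%')

    # stage 2: what check_part() does with it
    vals = {
        'part': 'data.keep_all_tbl_2009_02_17',   # illustrative names
        'parent': 'data.keep_all_tbl',
        'pkey': 'id',
    }
    print template % vals
    # -> create table data.keep_all_tbl_2009_02_17 (like data.keep_all_tbl);
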
diff --git a/tests/queue_loader/conf/loader_src.ini b/tests/queue_loader/conf/loader_src.ini
new file mode 100644 (file)
index 0000000..42a4b1e
--- /dev/null
@@ -0,0 +1,8 @@
+[queue_loader]
+
+job_name = loader_src
+db = dbname=loadersrc
+queue_name = loaderq
+
+logfile = log/%(job_name)s.log
+pidfile = pid/%(job_name)s.pid
diff --git a/tests/queue_loader/conf/setadm_loaderq.ini b/tests/queue_loader/conf/setadm_loaderq.ini
new file mode 100644 (file)
index 0000000..ddb5943
--- /dev/null
@@ -0,0 +1,11 @@
+[cascade_admin]
+
+job_name = setadm_loaderq
+
+node_db = dbname=loadersrc
+
+queue_name = loaderq
+
+
+logfile = log/%(job_name)s.log
+pidfile = pid/%(job_name)s.pid
diff --git a/tests/queue_loader/conf/ticker_loadersrc.ini b/tests/queue_loader/conf/ticker_loadersrc.ini
new file mode 100644 (file)
index 0000000..8025671
--- /dev/null
@@ -0,0 +1,6 @@
+[pgqadm]
+job_name = ticker_loadersrc
+db = dbname=loadersrc
+loop_delay = 0.5
+logfile = log/%(job_name)s.log
+pidfile = pid/%(job_name)s.pid
diff --git a/tests/queue_loader/init.sh b/tests/queue_loader/init.sh
new file mode 100755 (executable)
index 0000000..0488e59
--- /dev/null
@@ -0,0 +1,14 @@
+#! /bin/sh
+
+. ../env.sh
+
+lst="loadersrc loaderdst"
+
+for db in $lst; do
+  echo dropdb $db
+  dropdb $db
+done
+for db in $lst; do
+  echo createdb $db
+  createdb $db
+done
diff --git a/tests/queue_loader/regen.sh b/tests/queue_loader/regen.sh
new file mode 100755 (executable)
index 0000000..f9a9eb3
--- /dev/null
@@ -0,0 +1,66 @@
+#! /bin/sh
+
+. ../env.sh
+
+mkdir -p log pid conf
+
+./zstop.sh
+
+v=
+v=-q
+v=-v
+
+(cd ../..; make -s python-install )
+
+echo ""
+
+cleardb() {
+  echo "Clearing database $1"
+  psql -q -d $1 -c '
+      set client_min_messages=warning;
+      drop schema if exists londiste cascade;
+      drop schema if exists pgq_node cascade;
+      drop schema if exists pgq cascade;
+      drop schema if exists data cascade;
+  '
+}
+
+run() {
+  echo "$ $*"
+  "$@"
+}
+
+db_list="loadersrc loaderdst"
+
+for db in $db_list; do
+  cleardb $db
+done
+
+echo "clean logs"
+rm -f log/*.log
+
+set -e
+
+run setadm $v conf/setadm_loaderq.ini create-root ldr-src 'dbname=loadersrc' --worker=loader_src
+run setadm $v conf/setadm_loaderq.ini create-leaf ldr-dst 'dbname=loaderdst' --worker=loader_dst --provider="dbname=loadersrc"
+
+run pgqadm $v conf/ticker_loadersrc.ini -d ticker
+
+run queue_loader $v -d conf/loader_src.ini
+run queue_loader $v -d conf/loader_dst.ini
+
+run psql -d loadersrc -f tables.sql
+run psql -d loadersrc -f triggers.sql
+
+run psql -d loaderdst -f tables.sql
+
+run psql -d loadersrc -f send.data.sql
+run psql -d loadersrc -f send.data.sql
+run psql -d loadersrc -f send.data.sql
+
+run sleep 2
+
+run setadm $v conf/setadm_loaderq.ini status
+
+./zcheck.sh
+
diff --git a/tests/queue_loader/send.data.sql b/tests/queue_loader/send.data.sql
new file mode 100644 (file)
index 0000000..1c1e256
--- /dev/null
@@ -0,0 +1,21 @@
+
+insert into data.simple_tbl (username, contactname, data)
+values ('randuser'||random()::text, 'randcontact'||random()::text, 'link');
+
+/*
+insert into data.simple_tbl (username, contactname, data)
+values ('sameuser', 'samecontact', 'link');
+update data.simple_tbl 
+*/
+
+insert into data.bulk_tbl (data) values ('newdata');
+
+
+insert into data.keep_all_tbl (username, data) values ('sameuser', 'newdata');
+
+insert into data.keep_latest_tbl (username, data) values ('sameuser', 'newdata');
+
+insert into data.random_tbl (username, data) values ('sameuser', 'newdata');
+
+
+
diff --git a/tests/queue_loader/tables.sql b/tests/queue_loader/tables.sql
new file mode 100644 (file)
index 0000000..89ac0ed
--- /dev/null
@@ -0,0 +1,38 @@
+
+set client_min_messages = 'warning';
+
+create schema data;
+
+create table data.simple_tbl (
+    username text not null,
+    contactname text not null,
+    data text,
+    primary key (username, contactname)
+);
+
+create table data.bulk_tbl (
+    id serial primary key,
+    data text
+);
+
+create table data.keep_all_tbl (
+    id serial primary key,
+    username text not null,
+    tstamp timestamptz not null default now(),
+    data text
+);
+
+create table data.keep_latest_tbl (
+    id serial primary key,
+    username text not null,
+    tstamp timestamptz not null default now(),
+    data text
+);
+
+create table data.random_tbl (
+    id serial primary key,
+    username text not null,
+    tstamp timestamptz not null default now(),
+    data text
+);
+
diff --git a/tests/queue_loader/triggers.sql b/tests/queue_loader/triggers.sql
new file mode 100644 (file)
index 0000000..f8c874f
--- /dev/null
@@ -0,0 +1,16 @@
+
+create trigger logger after insert or update or delete on data.simple_tbl
+for each row execute procedure pgq.logutriga('loaderq');
+
+create trigger logger after insert or update or delete on data.bulk_tbl
+for each row execute procedure pgq.logutriga('loaderq');
+
+create trigger logger after insert or update or delete on data.keep_all_tbl
+for each row execute procedure pgq.logutriga('loaderq');
+
+create trigger logger after insert or update or delete on data.keep_latest_tbl
+for each row execute procedure pgq.logutriga('loaderq');
+
+create trigger logger after insert or update or delete on data.random_tbl
+for each row execute procedure pgq.logutriga('loaderq');
+
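
Note: pgq.logutriga() writes one event per modified row, and process_remote_event() in queue_loader.py relies on that shape: ev_type carries the operation plus the primary-key column list, ev_data carries the row urlencoded, and ev_extra1 carries the table name. A rough sketch of how such an event is taken apart (sample values are invented; in the real script skytools.db_urldecode() does the value decoding and NULL handling):

    # Sketch: the event shape pgq.logutriga() produces and how the loader
    # splits it up.
    ev_type = 'I:username,contactname'          # operation + pkey column list
    ev_data = 'username=bob&contactname=alice&data=link'
    ev_extra1 = 'data.simple_tbl'               # table the event came from

    op, pkeys = ev_type.split(':', 1)           # 'I', 'username,contactname'
    pkey_list = pkeys.split(',')
    row = dict(item.split('=', 1) for item in ev_data.split('&'))

    print op, pkey_list, ev_extra1, row
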
diff --git a/tests/queue_loader/zcheck.sh b/tests/queue_loader/zcheck.sh
new file mode 100755 (executable)
index 0000000..96f59ae
--- /dev/null
@@ -0,0 +1,4 @@
+#! /bin/sh
+
+grep -E 'ERR|WARN|CRIT' log/*.log || echo "All OK"
+
diff --git a/tests/queue_loader/zstop.sh b/tests/queue_loader/zstop.sh
new file mode 100755 (executable)
index 0000000..69e574c
--- /dev/null
@@ -0,0 +1,14 @@
+#! /bin/sh
+
+#. ../env.sh
+
+for p in pid/*.pid*; do
+  test -f "$p" || continue
+  pid=`cat "$p"`
+  test -d "/proc/$pid" || {
+    rm -f "$p"
+    continue
+  }
+  kill "$pid"
+done
+