pgq: rewrite triggers in C, thus not depending on plpython anymore
authorMarko Kreen <markokr@gmail.com>
Mon, 30 Jul 2007 12:31:24 +0000 (12:31 +0000)
committerMarko Kreen <markokr@gmail.com>
Mon, 30 Jul 2007 12:31:24 +0000 (12:31 +0000)
16 files changed:
sql/pgq/Makefile
sql/pgq/README.pgq
sql/pgq/expected/logutriga.out
sql/pgq/expected/sqltriga.out
sql/pgq/sql/sqltriga.sql
sql/pgq/structure/triggers.sql
sql/pgq/triggers/Makefile [new file with mode: 0644]
sql/pgq/triggers/common.c [new file with mode: 0644]
sql/pgq/triggers/common.h [new file with mode: 0644]
sql/pgq/triggers/logtriga.c [new file with mode: 0644]
sql/pgq/triggers/logutriga.c [new file with mode: 0644]
sql/pgq/triggers/makesql.c [new file with mode: 0644]
sql/pgq/triggers/pgq_triggers.sql.in [new file with mode: 0644]
sql/pgq/triggers/sqltriga.c [new file with mode: 0644]
sql/pgq/triggers/stringutil.c [new file with mode: 0644]
sql/pgq/triggers/stringutil.h [new file with mode: 0644]

index 3df56237ef6e4c925af0a8b990dd35b3da841682..ff23ff7e10ed0009de3f927c334eb24ac58a69e6 100644 (file)
@@ -6,10 +6,11 @@ DATA = structure/uninstall_pgq.sql
 SRCS = $(wildcard structure/*.sql) \
        $(wildcard functions/*.sql) \
        $(wildcard triggers/*.sql) \
-       lowlevel/pgq_lowlevel.sql
+       lowlevel/pgq_lowlevel.sql \
+       triggers/pgq_triggers.sql
 
 REGRESS = pgq_init pgq_core logutriga sqltriga
-REGRESS_OPTS = --load-language=plpythonu --load-language=plpgsql
+REGRESS_OPTS = --load-language=plpgsql
 
 PGXS = $(shell pg_config --pgxs)
 include $(PGXS)
@@ -18,15 +19,20 @@ NDOC = NaturalDocs
 NDOCARGS = -r -o html docs/html -p docs -i docs/sql
 CATSQL = ../../scripts/catsql.py
 
+SUBDIRS = lowlevel triggers
+
 # PGXS does not have subdir support, thus hack to recurse into lowlevel/
-all: low-all
-install: low-install
-clean: low-clean
-distclean: low-distclean
-low-all low-install low-clean low-distclean:
-       $(MAKE) -C lowlevel $(subst low-,,$@) DESTDIR=$(DESTDIR) 
-
-lowlevel/pgq_lowlevel.sql: low-all
+all: sub-all
+install: sub-install
+clean: sub-clean
+distclean: sub-distclean
+sub-all sub-install sub-clean sub-distclean:
+       for dir in $(SUBDIRS); do \
+               $(MAKE) -C $$dir $(subst sub-,,$@) DESTDIR=$(DESTDIR); \
+       done
+
+lowlevel/pgq_lowlevel.sql: sub-all
+triggers/pgq_triggers.sql: sub-all
 
 #
 # combined SQL files
index b8757161ae9ff5c18584e43ce465a2f961ff6f88..7ef5a1eab914f2261b27041a89be3a72de0d16c7 100644 (file)
@@ -10,10 +10,3 @@ pgq.retry_queue              events to be retried
 pgq.failed_queue       events that have failed
 pgq.event_*            data tables
 
-Random ideas
-============
-
-- all ticker logic in DB (plpython)
-- more funcs in plpython
-- insert_event in C (way to get rid of plpython)
-
index 5a0384d17d04992ef734c277fa4c21927057a6b2..324a239b459b268753597b8667ae5255a14e6810 100644 (file)
@@ -15,8 +15,8 @@ NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "udata_pkey" for
 create trigger utest AFTER insert or update or delete ON udata
 for each row execute procedure pgq.logutriga('udata_que');
 insert into udata (txt) values ('text1');
-NOTICE:  insert_event(udata_que, I, bin&txt=text1&id=1, public.udata)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, $5, null, null)"
+NOTICE:  insert_event(udata_que, I:id, id=1&txt=text1&bin, public.udata)
+CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, null, null, null)"
 insert into udata (bin) values (E'bi\tn\\000bin');
-NOTICE:  insert_event(udata_que, I, bin=bi%5C011n%5C000bin&txt&id=2, public.udata)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, $5, null, null)"
+NOTICE:  insert_event(udata_que, I:id, id=2&txt&bin=bi%5c011n%5c000bin, public.udata)
+CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, null, null, null)"
index 8e39621292e2ba3f80d533fbff17595948d2a9bf..37bc2ec16d016e55f1f896e851ac1940730da42e 100644 (file)
@@ -8,79 +8,71 @@ create trigger rtest_triga after insert or update or delete on rtest
 for each row execute procedure pgq.sqltriga('que');
 -- simple test
 insert into rtest values (1, 'value1');
-NOTICE:  insert_event(que, I, (dat,id) values ('value1','1'), public.rtest)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, null, null, null)"
+NOTICE:  insert_event(que, I, (id,dat) values ('1','value1'), public.rtest)
+CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, null, null, null)"
 update rtest set dat = 'value2';
 NOTICE:  insert_event(que, U, dat='value2' where id='1', public.rtest)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, null, null, null)"
+CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, null, null, null)"
 delete from rtest;
 NOTICE:  insert_event(que, D, id='1', public.rtest)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, null, null, null)"
+CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, null, null, null)"
 -- test new fields
 alter table rtest add column dat2 text;
 insert into rtest values (1, 'value1');
-NOTICE:  insert_event(que, I, (dat,id) values ('value1','1'), public.rtest)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, null, null, null)"
+NOTICE:  insert_event(que, I, (id,dat,dat2) values ('1','value1',null), public.rtest)
+CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, null, null, null)"
 update rtest set dat = 'value2';
 NOTICE:  insert_event(que, U, dat='value2' where id='1', public.rtest)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, null, null, null)"
+CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, null, null, null)"
 delete from rtest;
 NOTICE:  insert_event(que, D, id='1', public.rtest)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, null, null, null)"
+CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, null, null, null)"
 -- test field ignore
 drop trigger rtest_triga on rtest;
 create trigger rtest_triga after insert or update or delete on rtest
 for each row execute procedure pgq.sqltriga('que2', 'ignore=dat2');
 insert into rtest values (1, '666', 'newdat');
-NOTICE:  insert_event(que2, I, (dat,id) values ('666','1'), public.rtest)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, null, null, null)"
+NOTICE:  insert_event(que2, I, (id,dat) values ('1','666'), public.rtest)
+CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, null, null, null)"
 update rtest set dat = 5, dat2 = 'newdat2';
 NOTICE:  insert_event(que2, U, dat='5' where id='1', public.rtest)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, null, null, null)"
+CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, null, null, null)"
 update rtest set dat = 6;
 NOTICE:  insert_event(que2, U, dat='6' where id='1', public.rtest)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, null, null, null)"
+CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, null, null, null)"
 delete from rtest;
 NOTICE:  insert_event(que2, D, id='1', public.rtest)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, null, null, null)"
+CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, null, null, null)"
 -- test hashed pkey
-drop trigger rtest_triga on rtest;
-create trigger rtest_triga after insert or update or delete on rtest
-for each row execute procedure pgq.sqltriga('que2', 'ignore=dat2&pkey=dat,hashtext(dat)');
-insert into rtest values (1, '666', 'newdat');
-NOTICE:  insert_event(que2, I, (dat,id) values ('666','1'), public.rtest)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, null, null, null)"
-update rtest set dat = 5, dat2 = 'newdat2';
-NOTICE:  insert_event(que2, U, dat='5' where dat='5' and hashtext(dat) = hashtext('5'), public.rtest)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, null, null, null)"
-update rtest set dat = 6;
-NOTICE:  insert_event(que2, U, dat='6' where dat='6' and hashtext(dat) = hashtext('6'), public.rtest)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, null, null, null)"
-delete from rtest;
-NOTICE:  insert_event(que2, D, dat='6' and hashtext(dat) = hashtext('6'), public.rtest)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, null, null, null)"
+-- drop trigger rtest_triga on rtest;
+-- create trigger rtest_triga after insert or update or delete on rtest
+-- for each row execute procedure pgq.sqltriga('que2', 'ignore=dat2','pkey=dat,hashtext(dat)');
+-- insert into rtest values (1, '666', 'newdat');
+-- update rtest set dat = 5, dat2 = 'newdat2';
+-- update rtest set dat = 6;
+-- delete from rtest;
 -- test wrong key
 drop trigger rtest_triga on rtest;
 create trigger rtest_triga after insert or update or delete on rtest
 for each row execute procedure pgq.sqltriga('que3');
 insert into rtest values (1, 0, 'non-null');
-NOTICE:  insert_event(que3, I, (dat,id) values ('0','1'), public.rtest)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, null, null, null)"
+NOTICE:  insert_event(que3, I, (id,dat,dat2) values ('1','0','non-null'), public.rtest)
+CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, null, null, null)"
 insert into rtest values (2, 0, NULL);
-NOTICE:  insert_event(que3, I, (dat,id) values ('0','2'), public.rtest)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, null, null, null)"
+NOTICE:  insert_event(que3, I, (id,dat,dat2) values ('2','0',null), public.rtest)
+CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, null, null, null)"
 update rtest set dat2 = 'non-null2' where id=1;
-NOTICE:  insert_event(que3, U, id='1' where id='1', public.rtest)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, null, null, null)"
+NOTICE:  insert_event(que3, U, dat2='non-null2' where id='1', public.rtest)
+CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, null, null, null)"
 update rtest set dat2 = NULL where id=1;
-NOTICE:  insert_event(que3, U, id='1' where id='1', public.rtest)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, null, null, null)"
+NOTICE:  insert_event(que3, U, dat2=NULL where id='1', public.rtest)
+CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, null, null, null)"
 update rtest set dat2 = 'new-nonnull' where id=2;
-NOTICE:  insert_event(que3, U, id='2' where id='2', public.rtest)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, null, null, null)"
+NOTICE:  insert_event(que3, U, dat2='new-nonnull' where id='2', public.rtest)
+CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, null, null, null)"
 delete from rtest where id=1;
 NOTICE:  insert_event(que3, D, id='1', public.rtest)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, null, null, null)"
+CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, null, null, null)"
 delete from rtest where id=2;
 NOTICE:  insert_event(que3, D, id='2', public.rtest)
-CONTEXT:  SQL statement "SELECT pgq.insert_event($1, $2, $3, $4, null, null, null)"
+CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, null, null, null)"
index 49b86ee78fa6ef59b9b17e2a20c9388bda2d40ec..808440b4154ae1f13d59ed12f327f48967e353b8 100644 (file)
@@ -30,14 +30,14 @@ update rtest set dat = 6;
 delete from rtest;
 
 -- test hashed pkey
-drop trigger rtest_triga on rtest;
-create trigger rtest_triga after insert or update or delete on rtest
-for each row execute procedure pgq.sqltriga('que2', 'ignore=dat2&pkey=dat,hashtext(dat)');
-
-insert into rtest values (1, '666', 'newdat');
-update rtest set dat = 5, dat2 = 'newdat2';
-update rtest set dat = 6;
-delete from rtest;
+-- drop trigger rtest_triga on rtest;
+-- create trigger rtest_triga after insert or update or delete on rtest
+-- for each row execute procedure pgq.sqltriga('que2', 'ignore=dat2','pkey=dat,hashtext(dat)');
+
+-- insert into rtest values (1, '666', 'newdat');
+-- update rtest set dat = 5, dat2 = 'newdat2';
+-- update rtest set dat = 6;
+-- delete from rtest;
 
 
 -- test wrong key
index e732347fae51f5fcfb7eff5d621ee6898c88dfd1..c36d3df9aa76cfc5cceeca04fe72a5813c0d8e55 100644 (file)
@@ -3,6 +3,6 @@
 
 -- Group: Trigger Functions
 
-\i triggers/pgq.logutriga.sql
-\i triggers/pgq.sqltriga.sql
+-- \i triggers/pgq.logutriga.sql
+\i triggers/pgq_triggers.sql
 
diff --git a/sql/pgq/triggers/Makefile b/sql/pgq/triggers/Makefile
new file mode 100644 (file)
index 0000000..582b087
--- /dev/null
@@ -0,0 +1,13 @@
+
+include ../../../config.mak
+
+MODULE_big = pgq_triggers
+SRCS = common.c logtriga.c logutriga.c sqltriga.c makesql.c stringutil.c
+OBJS = $(SRCS:.c=.o)
+DATA_built = pgq_triggers.sql
+
+include $(PGXS)
+
+cs:
+       cscope -b -f .cscope.out *.c
+
diff --git a/sql/pgq/triggers/common.c b/sql/pgq/triggers/common.c
new file mode 100644 (file)
index 0000000..7f93c54
--- /dev/null
@@ -0,0 +1,352 @@
+/*
+ * common.c - functions used by all trigger variants.
+ *
+ * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <postgres.h>
+
+#include <commands/trigger.h>
+#include <catalog/pg_type.h>
+#include <catalog/pg_namespace.h>
+#include <executor/spi.h>
+#include <utils/memutils.h>
+
+#include "common.h"
+#include "stringutil.h"
+
+/*
+ * Module tag
+ */
+#ifdef PG_MODULE_MAGIC
+PG_MODULE_MAGIC;
+#endif
+
+/*
+ * helper for queue insertion.
+ *
+ * does not support NULL arguments.
+ */
+void pgq_simple_insert(const char *queue_name, Datum ev_type, Datum ev_data, Datum ev_extra1)
+{
+       Datum values[4];
+       static void *plan = NULL;
+       int res;
+
+       if (!plan) {
+               const char *sql;
+               Oid   types[4] = { TEXTOID, TEXTOID, TEXTOID, TEXTOID };
+
+               sql = "select pgq.insert_event($1, $2, $3, $4, null, null, null)";
+               plan = SPI_saveplan(SPI_prepare(sql, 4, types));
+               if (plan == NULL)
+                       elog(ERROR, "logtriga: SPI_prepare() failed");
+       }
+       values[0] = DirectFunctionCall1(textin, (Datum)queue_name);
+       values[1] = ev_type;
+       values[2] = ev_data;
+       values[3] = ev_extra1;
+       res = SPI_execute_plan(plan, values, NULL, false, 0);
+       if (res != SPI_OK_SELECT)
+               elog(ERROR, "call of pgq.insert_event failed");
+}
+
+char *pgq_find_table_name(Relation rel)
+{
+       NameData        tname = rel->rd_rel->relname;
+       Oid                     nsoid = rel->rd_rel->relnamespace;
+       char        namebuf[NAMEDATALEN * 2 + 3];
+       HeapTuple   ns_tup;
+       Form_pg_namespace ns_struct;
+       NameData        nspname;
+
+       /* find namespace info */
+       ns_tup = SearchSysCache(NAMESPACEOID,
+                                                       ObjectIdGetDatum(nsoid), 0, 0, 0);
+       if (!HeapTupleIsValid(ns_tup))
+               elog(ERROR, "Cannot find namespace %u", nsoid);
+       ns_struct = (Form_pg_namespace) GETSTRUCT(ns_tup);
+       nspname = ns_struct->nspname;
+
+       /* fill name */
+       sprintf(namebuf, "%s.%s", NameStr(nspname), NameStr(tname));
+
+       ReleaseSysCache(ns_tup);
+       return pstrdup(namebuf);
+}
+
+/*
+ * primary key info
+ */
+
+static MemoryContext tbl_cache_ctx;
+static HTAB *tbl_cache_map;
+
+static const char pkey_sql [] =
+       "SELECT k.attnum, k.attname FROM pg_index i, pg_attribute k"
+       " WHERE i.indrelid = $1 AND k.attrelid = i.indexrelid"
+       "   AND i.indisprimary AND k.attnum > 0 AND NOT k.attisdropped"
+       " ORDER BY k.attnum";
+static void * pkey_plan;
+
+/*
+ * Prepare utility plans and plan cache.
+ */
+static void
+init_tbl_cache(void)
+{
+       static int init_done = 0;
+       Oid types[1] = { OIDOID };
+       HASHCTL     ctl;
+       int         flags;
+       int         max_tables = 128;
+
+       if (init_done)
+               return;
+
+       /*
+        * Init plans.
+        */
+       pkey_plan = SPI_saveplan(SPI_prepare(pkey_sql, 1, types));
+       if (pkey_plan == NULL)
+               elog(ERROR, "pgq_triggers: SPI_prepare() failed");
+
+       /*
+        * create own context
+        */
+       tbl_cache_ctx = AllocSetContextCreate(TopMemoryContext,
+                                             "pgq_triggers table info",
+                                             ALLOCSET_SMALL_MINSIZE,
+                                             ALLOCSET_SMALL_INITSIZE,
+                                             ALLOCSET_SMALL_MAXSIZE);
+       /*
+        * init pkey cache.
+        */
+       MemSet(&ctl, 0, sizeof(ctl));
+       ctl.keysize = sizeof(Oid);
+       ctl.entrysize = sizeof(struct PgqTableInfo);
+       ctl.hash = oid_hash;
+       flags = HASH_ELEM | HASH_FUNCTION;
+       tbl_cache_map = hash_create("pgq_triggers pkey cache", max_tables, &ctl, flags);
+
+       init_done = 1;
+}
+
+/*
+ * Create new plan for insertion into current queue table.
+ */
+static void
+fill_tbl_info(Relation rel, struct PgqTableInfo *info)
+{
+       StringInfo pkeys;
+       Datum values[1];
+       const char *name = pgq_find_table_name(rel);
+       TupleDesc desc;
+       HeapTuple row;
+       bool isnull;
+       int res, i, attno;
+
+       values[0] = ObjectIdGetDatum(rel->rd_id);
+       res = SPI_execute_plan(pkey_plan, values, NULL, false, 0);
+       if (res != SPI_OK_SELECT)
+               elog(ERROR, "pkey_plan exec failed");
+       desc = SPI_tuptable->tupdesc;
+
+       pkeys = makeStringInfo();
+       info->n_pkeys = SPI_processed;
+       info->table_name = MemoryContextStrdup(tbl_cache_ctx, name);
+       info->pkey_attno = MemoryContextAlloc(tbl_cache_ctx, info->n_pkeys * sizeof(int));
+
+       for (i = 0; i < SPI_processed; i++) {
+               row = SPI_tuptable->vals[i];
+
+               attno = DatumGetInt16(SPI_getbinval(row, desc, 1, &isnull));
+               name = SPI_getvalue(row, desc, 2);
+               info->pkey_attno[i] = attno;
+               if (i > 0)
+                       appendStringInfoChar(pkeys, ',');
+               appendStringInfoString(pkeys, name);
+       }
+       info->pkey_list = MemoryContextStrdup(tbl_cache_ctx, pkeys->data);
+}
+
+/*
+ * fetch insert plan from cache.
+ */
+struct PgqTableInfo *
+pgq_find_table_info(Relation rel)
+{
+        struct PgqTableInfo *entry;
+        bool did_exist = false;
+
+        init_tbl_cache();
+
+        entry = hash_search(tbl_cache_map, &rel->rd_id, HASH_ENTER, &did_exist);
+        if (!did_exist)
+                fill_tbl_info(rel, entry);
+        return entry;
+}
+
+static void
+parse_newstyle_args(PgqTriggerEvent *ev, TriggerData *tg)
+{
+       int i;
+       /*
+        * parse args
+        */
+       ev->skip = false;
+       ev->queue_name = tg->tg_trigger->tgargs[0];
+       for (i = 1; i < tg->tg_trigger->tgnargs; i++) {
+               const char *arg = tg->tg_trigger->tgargs[i];
+               if (strcmp(arg, "SKIP") == 0)
+                       ev->skip = true;
+               else if (strncmp(arg, "ignore=", 7) == 0)
+                       ev->ignore_list = arg + 7;
+               else if (strncmp(arg, "pkey=", 5) == 0)
+                       ev->pkey_list = arg + 5;
+               else
+                       elog(ERROR, "bad param to pgq trigger");
+       }
+}
+
+static void
+parse_oldstyle_args(PgqTriggerEvent *ev, TriggerData *tg)
+{
+       const char *kpos;
+       int attcnt, i;
+       TupleDesc tupdesc = tg->tg_relation->rd_att;
+
+       ev->skip = false;
+       if (tg->tg_trigger->tgnargs < 2 || tg->tg_trigger->tgnargs > 3)
+               elog(ERROR, "pgq.logtriga must be used with 2 or 3 args");
+       ev->queue_name = tg->tg_trigger->tgargs[0];
+       ev->attkind = tg->tg_trigger->tgargs[1];
+       ev->attkind_len = strlen(ev->attkind);
+       if (tg->tg_trigger->tgnargs > 2)
+               ev->table_name =  tg->tg_trigger->tgargs[2];
+
+
+       /*
+        * Count number of active columns
+        */
+       tupdesc = tg->tg_relation->rd_att;
+       for (i = 0, attcnt = 0; i < tupdesc->natts; i++)
+       {
+               if (!tupdesc->attrs[i]->attisdropped)
+                       attcnt++;
+       }
+
+       /*
+        * look if last pkey column exists
+        */
+       kpos = strrchr(ev->attkind, 'k');
+       if (kpos == NULL)
+               elog(ERROR, "need at least one key column");
+       if (kpos - ev->attkind >= attcnt)
+               elog(ERROR, "key column does not exist");
+}
+
+/*
+ * parse trigger arguments.
+ */
+void pgq_prepare_event(struct PgqTriggerEvent *ev, TriggerData *tg, bool newstyle)
+{
+       memset(ev, 0, sizeof(*ev));
+
+       /*
+        * Check trigger calling conventions
+        */
+       if (!TRIGGER_FIRED_AFTER(tg->tg_event))
+               /* dont care */;
+       if (!TRIGGER_FIRED_FOR_ROW(tg->tg_event))
+               elog(ERROR, "pgq trigger must be fired FOR EACH ROW");
+       if (tg->tg_trigger->tgnargs < 1)
+               elog(ERROR, "pgq trigger must have destination queue as argument");
+
+       /*
+        * check operation type
+        */
+       if (TRIGGER_FIRED_BY_INSERT(tg->tg_event))
+               ev->op_type = 'I';
+       else if (TRIGGER_FIRED_BY_UPDATE(tg->tg_event))
+               ev->op_type = 'U';
+       else if (TRIGGER_FIRED_BY_DELETE(tg->tg_event))
+               ev->op_type = 'D';
+       else
+               elog(ERROR, "unknown event for pgq trigger");
+
+       /*
+        * init data
+        */
+       ev->ev_type = pgq_init_varbuf();
+       ev->ev_data = pgq_init_varbuf();
+       ev->ev_extra1 = pgq_init_varbuf();
+       ev->ev_extra2 = pgq_init_varbuf();
+
+       /*
+        * load table info
+        */
+       ev->info = pgq_find_table_info(tg->tg_relation);
+       ev->table_name = ev->info->table_name;
+       ev->pkey_list = ev->info->pkey_list;
+
+       /*
+        * parse args
+        */
+       if (newstyle)
+               parse_newstyle_args(ev, tg);
+       else
+               parse_oldstyle_args(ev, tg);
+}
+
+
+bool pgqtriga_skip_col(PgqTriggerEvent *ev, TriggerData *tg, int i, int attkind_idx)
+{
+       TupleDesc tupdesc;
+       const char *name;
+
+       if (ev->attkind) {
+               if (attkind_idx >= ev->attkind_len)
+                       return true;
+               return ev->attkind[attkind_idx] == 'i';
+       } else if (ev->ignore_list) {
+               tupdesc = tg->tg_relation->rd_att;
+               if (tupdesc->attrs[i]->attisdropped)
+                       return true;
+               name = NameStr(tupdesc->attrs[i]->attname);
+               return pgq_strlist_contains(ev->ignore_list, name);
+       }
+       return false;
+}
+
+bool pgqtriga_is_pkey(PgqTriggerEvent *ev, TriggerData *tg, int i, int attkind_idx)
+{
+       TupleDesc tupdesc;
+       const char *name;
+
+       if (ev->attkind) {
+               if (attkind_idx >= ev->attkind_len)
+                       return false;
+               return ev->attkind[attkind_idx] == 'k';
+       } else if (ev->pkey_list) {
+               tupdesc = tg->tg_relation->rd_att;
+               if (tupdesc->attrs[i]->attisdropped)
+                       return false;
+               name = NameStr(tupdesc->attrs[i]->attname);
+               return pgq_strlist_contains(ev->pkey_list, name);
+       }
+       return false;
+}
+
diff --git a/sql/pgq/triggers/common.h b/sql/pgq/triggers/common.h
new file mode 100644 (file)
index 0000000..e37f2b9
--- /dev/null
@@ -0,0 +1,51 @@
+struct PgqTriggerEvent {
+       const char *table_name;
+       const char *queue_name;
+       const char *ignore_list;
+       const char *pkey_list;
+       char op_type;
+       bool skip;
+
+       const char *attkind;
+       int attkind_len;
+
+       struct PgqTableInfo *info;
+
+       StringInfo ev_type;
+       StringInfo ev_data;
+       StringInfo ev_extra1;
+       StringInfo ev_extra2;
+};
+typedef struct PgqTriggerEvent PgqTriggerEvent;
+
+void pgq_prepare_event(struct PgqTriggerEvent *ev, TriggerData *tg, bool newstyle);
+
+
+char *pgq_find_table_name(Relation rel);
+void pgq_simple_insert(const char *queue_name, Datum ev_type, Datum ev_data, Datum ev_extra1);
+
+struct PgqColumnInfo {
+       int col_no;
+       char *col_name;
+};
+
+struct PgqTableInfo {
+       Oid oid;
+       char *table_name;
+       const char *pkey_list;
+       int n_pkeys;
+       int *pkey_attno;
+};
+
+struct PgqTableInfo *
+pgq_find_table_info(Relation rel);
+
+
+int pgqtriga_make_sql(PgqTriggerEvent *ev, TriggerData *tg, StringInfo sql);
+
+bool pgqtriga_skip_col(PgqTriggerEvent *ev, TriggerData *tg, int i, int attkind_idx);
+bool pgqtriga_is_pkey(PgqTriggerEvent *ev, TriggerData *tg, int i, int attkind_idx);
+
+
+
+
diff --git a/sql/pgq/triggers/logtriga.c b/sql/pgq/triggers/logtriga.c
new file mode 100644 (file)
index 0000000..f2b0e52
--- /dev/null
@@ -0,0 +1,85 @@
+/*
+ * logtriga.c - Dumb SQL logging trigger.
+ *
+ * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <postgres.h>
+
+#include <executor/spi.h>
+#include <commands/trigger.h>
+
+#include "common.h"
+#include "stringutil.h"
+
+PG_FUNCTION_INFO_V1(pgq_logtriga);
+Datum pgq_logtriga(PG_FUNCTION_ARGS);
+
+/*
+ * PGQ log trigger, takes 2 arguments:
+ * 1. queue name to be inserted to.
+ * 2. column type string
+ *
+ * Queue events will be in format:
+ *    ev_type   - operation type, I/U/D
+ *    ev_data   - partial SQL describing operation
+ *    ev_extra1 - table name
+ */
+
+Datum
+pgq_logtriga(PG_FUNCTION_ARGS)
+{
+       TriggerData *tg;
+       PgqTriggerEvent ev;
+
+       /*
+        * Get the trigger call context
+        */
+       if (!CALLED_AS_TRIGGER(fcinfo))
+               elog(ERROR, "pgq.logutriga not called as trigger");
+
+       tg = (TriggerData *) (fcinfo->context);
+
+       if (!TRIGGER_FIRED_AFTER(tg->tg_event))
+               elog(ERROR, "pgq.logtriga must be fired AFTER");
+
+       /*
+        * Connect to the SPI manager
+        */
+       if (SPI_connect() < 0)
+               elog(ERROR, "logtriga: SPI_connect() failed");
+
+       pgq_prepare_event(&ev, tg, false);
+
+       appendStringInfoChar(ev.ev_type, ev.op_type);
+       appendStringInfoString(ev.ev_extra1, ev.info->table_name);
+
+       /*
+        * create sql and insert if interesting
+        */
+       if (pgqtriga_make_sql(&ev, tg, ev.ev_data))
+       {
+               pgq_simple_insert(ev.queue_name,
+                                                 pgq_finish_varbuf(ev.ev_type),
+                                                 pgq_finish_varbuf(ev.ev_data),
+                                                 pgq_finish_varbuf(ev.ev_extra1));
+       }
+
+       if (SPI_finish() < 0)
+               elog(ERROR, "SPI_finish failed");
+
+       return PointerGetDatum(NULL);
+}
+
diff --git a/sql/pgq/triggers/logutriga.c b/sql/pgq/triggers/logutriga.c
new file mode 100644 (file)
index 0000000..c5af2cb
--- /dev/null
@@ -0,0 +1,135 @@
+/*
+ * logutriga.c - Smart trigger that logs urlencoded changes.
+ *
+ * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <postgres.h>
+#include <executor/spi.h>
+#include <commands/trigger.h>
+
+#include "common.h"
+#include "stringutil.h"
+
+PG_FUNCTION_INFO_V1(pgq_logutriga);
+Datum pgq_logutriga(PG_FUNCTION_ARGS);
+
+static void
+process_row_data(PgqTriggerEvent *ev, TriggerData *tg, HeapTuple row, StringInfo buf)
+{
+       TupleDesc       tupdesc = tg->tg_relation->rd_att;
+       bool first = true;
+       int i;
+       const char *col_ident, *col_value;
+       int attkind_idx = -1;
+
+       for (i = 0; i < tg->tg_relation->rd_att->natts; i++)
+       {
+               /* Skip dropped columns */
+               if (tupdesc->attrs[i]->attisdropped)
+                       continue;
+
+               attkind_idx++;
+
+               if (pgqtriga_skip_col(ev, tg, i, attkind_idx))
+                       continue;
+
+               if (first)
+                       first = false;
+               else
+                       appendStringInfoChar(buf, '&');
+
+               /* quote column name */
+               col_ident = SPI_fname(tupdesc, i + 1);
+               pgq_encode_cstring(buf, col_ident, TBUF_QUOTE_URLENC);
+
+               /* quote column value */
+               col_value = SPI_getvalue(row, tupdesc, i + 1);
+               if (col_value != NULL)
+               {
+                       appendStringInfoChar(buf, '=');
+                       pgq_encode_cstring(buf, col_value, TBUF_QUOTE_URLENC);
+               }
+       }
+}
+
+/*
+ * PgQ log trigger, takes 2 arguments:
+ * 1. queue name to be inserted to.
+ *
+ * Queue events will be in format:
+ *    ev_type   - operation type, I/U/D
+ *    ev_data   - urlencoded column values
+ *    ev_extra1 - table name
+ */
+Datum
+pgq_logutriga(PG_FUNCTION_ARGS)
+{
+       TriggerData *tg;
+       struct PgqTriggerEvent  ev;
+       HeapTuple       row;
+
+       /*
+        * Get the trigger call context
+        */
+       if (!CALLED_AS_TRIGGER(fcinfo))
+               elog(ERROR, "pgq.logutriga not called as trigger");
+
+       tg = (TriggerData *) (fcinfo->context);
+       if (TRIGGER_FIRED_BY_UPDATE(tg->tg_event))
+               row = tg->tg_newtuple;
+       else
+               row = tg->tg_trigtuple;
+
+
+       /*
+        * Connect to the SPI manager
+        */
+       if (SPI_connect() < 0)
+               elog(ERROR, "logtriga: SPI_connect() failed");
+
+       pgq_prepare_event(&ev, tg, true);
+
+       appendStringInfoChar(ev.ev_type, ev.op_type);
+       appendStringInfoChar(ev.ev_type, ':');
+       appendStringInfoString(ev.ev_type, ev.info->pkey_list);
+       appendStringInfoString(ev.ev_extra1, ev.info->table_name);
+
+       /*
+        * create type, data
+        */
+       process_row_data(&ev, tg, row, ev.ev_data);
+
+       /*
+        * Construct the parameter array and insert the log row.
+        */
+       pgq_simple_insert(ev.queue_name,
+                                         pgq_finish_varbuf(ev.ev_type),
+                                         pgq_finish_varbuf(ev.ev_data),
+                                         pgq_finish_varbuf(ev.ev_extra1));
+
+       if (SPI_finish() < 0)
+               elog(ERROR, "SPI_finish failed");
+
+       /*
+        * After trigger ignores result,
+        * before trigger skips event if NULL.
+        */
+       if (TRIGGER_FIRED_AFTER(tg->tg_event) || ev.skip)
+               return PointerGetDatum(NULL);
+       else
+               return PointerGetDatum(row);
+}
+
diff --git a/sql/pgq/triggers/makesql.c b/sql/pgq/triggers/makesql.c
new file mode 100644 (file)
index 0000000..0de7452
--- /dev/null
@@ -0,0 +1,359 @@
+/*
+ * makesql.c - generate partial SQL statement for row change.
+ *
+ * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+ *
+ * Based on Slony-I log trigger:
+ *
+ *   Copyright (c) 2003-2006, PostgreSQL Global Development Group
+ *      Author: Jan Wieck, Afilias USA INC.
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <postgres.h>
+#include <executor/spi.h>
+#include <commands/trigger.h>
+#include <catalog/pg_operator.h>
+#include <utils/typcache.h>
+
+#include "common.h"
+#include "stringutil.h"
+
+
+static void
+append_key_eq(StringInfo buf, const char *col_ident, const char *col_value)
+{
+       if (col_value == NULL)
+               elog(ERROR, "logtriga: Unexpected NULL key value");
+
+       pgq_encode_cstring(buf, col_ident, TBUF_QUOTE_IDENT);
+       appendStringInfoChar(buf, '=');
+       pgq_encode_cstring(buf, col_value, TBUF_QUOTE_LITERAL);
+}
+
+static void
+append_normal_eq(StringInfo buf, const char *col_ident, const char *col_value)
+{
+       pgq_encode_cstring(buf, col_ident, TBUF_QUOTE_IDENT);
+       appendStringInfoChar(buf, '=');
+       if (col_value != NULL)
+               pgq_encode_cstring(buf, col_value, TBUF_QUOTE_LITERAL);
+       else
+               appendStringInfoString(buf, "NULL");
+}
+
+static void
+process_insert(PgqTriggerEvent *ev, TriggerData *tg, StringInfo sql)
+{
+       HeapTuple       new_row = tg->tg_trigtuple;
+       TupleDesc       tupdesc = tg->tg_relation->rd_att;
+       int                     i;
+       int                     need_comma = false;
+       int                     attkind_idx;
+
+       /*
+        * Specify all the columns
+        */
+       appendStringInfoChar(sql, '(');
+       attkind_idx = -1;
+       for (i = 0; i < tupdesc->natts; i++)
+       {
+               char *col_ident;
+
+               /* Skip dropped columns */
+               if (tupdesc->attrs[i]->attisdropped)
+                       continue;
+
+               /* Check if allowed by colstring */
+               attkind_idx++;
+               if (pgqtriga_skip_col(ev, tg, i, attkind_idx))
+                       continue;
+
+               if (need_comma)
+                       appendStringInfoChar(sql, ',');
+               else
+                       need_comma = true;
+
+               /* quote column name */
+               col_ident = SPI_fname(tupdesc, i + 1);
+               pgq_encode_cstring(sql, col_ident, TBUF_QUOTE_IDENT);
+       }
+
+       /*
+        * Append the string ") values ("
+        */
+       appendStringInfoString(sql, ") values (");
+
+       /*
+        * Append the values
+        */
+       need_comma = false;
+       attkind_idx = -1;
+       for (i = 0; i < tupdesc->natts; i++)
+       {
+               char *col_value;
+
+               /* Skip dropped columns */
+               if (tupdesc->attrs[i]->attisdropped)
+                       continue;
+
+               /* Check if allowed by colstring */
+               attkind_idx++;
+               if (pgqtriga_skip_col(ev, tg, i, attkind_idx))
+                       continue;
+
+               if (need_comma)
+                       appendStringInfoChar(sql, ',');
+               else
+                       need_comma = true;
+
+               /* quote column value */
+               col_value = SPI_getvalue(new_row, tupdesc, i + 1);
+               if (col_value == NULL)
+                       appendStringInfoString(sql, "null");
+               else
+                       pgq_encode_cstring(sql, col_value, TBUF_QUOTE_LITERAL);
+       }
+
+       /*
+        * Terminate and done
+        */
+       appendStringInfoChar(sql, ')');
+}
+
+static int
+process_update(PgqTriggerEvent *ev, TriggerData *tg, StringInfo sql)
+{
+       HeapTuple       old_row = tg->tg_trigtuple;
+       HeapTuple       new_row = tg->tg_newtuple;
+       TupleDesc       tupdesc = tg->tg_relation->rd_att;
+       Datum           old_value;
+       Datum           new_value;
+       bool            old_isnull;
+       bool            new_isnull;
+
+       char       *col_ident;
+       char       *col_value;
+       int                     i;
+       int                     need_comma = false;
+       int                     need_and = false;
+       int                     attkind_idx;
+       int                     ignore_count = 0;
+
+       attkind_idx = -1;
+       for (i = 0; i < tupdesc->natts; i++)
+       {
+               /*
+                * Ignore dropped columns
+                */
+               if (tupdesc->attrs[i]->attisdropped)
+                       continue;
+
+               attkind_idx++;
+
+               old_value = SPI_getbinval(old_row, tupdesc, i + 1, &old_isnull);
+               new_value = SPI_getbinval(new_row, tupdesc, i + 1, &new_isnull);
+
+               /*
+                * If old and new value are NULL, the column is unchanged
+                */
+               if (old_isnull && new_isnull)
+                       continue;
+
+               /*
+                * If both are NOT NULL, we need to compare the values and skip
+                * setting the column if equal
+                */
+               if (!old_isnull && !new_isnull)
+               {
+                       Oid                     opr_oid;
+                       FmgrInfo   *opr_finfo_p;
+
+                       /*
+                        * Lookup the equal operators function call info using the
+                        * typecache if available
+                        */
+                       TypeCacheEntry *type_cache;
+
+                       type_cache = lookup_type_cache(SPI_gettypeid(tupdesc, i + 1),
+                                                         TYPECACHE_EQ_OPR | TYPECACHE_EQ_OPR_FINFO);
+                       opr_oid = type_cache->eq_opr;
+                       if (opr_oid == ARRAY_EQ_OP)
+                               opr_oid = InvalidOid;
+                       else
+                               opr_finfo_p = &(type_cache->eq_opr_finfo);
+
+                       /*
+                        * If we have an equal operator, use that to do binary
+                        * comparision. Else get the string representation of both
+                        * attributes and do string comparision.
+                        */
+                       if (OidIsValid(opr_oid))
+                       {
+                               if (DatumGetBool(FunctionCall2(opr_finfo_p,
+                                                                                          old_value, new_value)))
+                                       continue;
+                       }
+                       else
+                       {
+                               char       *old_strval = SPI_getvalue(old_row, tupdesc, i + 1);
+                               char       *new_strval = SPI_getvalue(new_row, tupdesc, i + 1);
+
+                               if (strcmp(old_strval, new_strval) == 0)
+                                       continue;
+                       }
+               }
+
+               if (pgqtriga_skip_col(ev, tg, i, attkind_idx))
+               {
+                       /* this change should be ignored */
+                       ignore_count++;
+                       continue;
+               }
+
+               if (need_comma)
+                       appendStringInfoChar(sql, ',');
+               else
+                       need_comma = true;
+
+               col_ident = SPI_fname(tupdesc, i + 1);
+               col_value = SPI_getvalue(new_row, tupdesc, i + 1);
+
+               append_normal_eq(sql, col_ident, col_value);
+       }
+
+       /*
+        * It can happen that the only UPDATE an application does is to set a
+        * column to the same value again. In that case, we'd end up here with
+        * no columns in the SET clause yet. We add the first key column here
+        * with it's old value to simulate the same for the replication
+        * engine.
+        */
+       if (!need_comma)
+       {
+               /* there was change in ignored columns, skip whole event */
+               if (ignore_count > 0)
+                       return 0;
+
+               for (i = 0, attkind_idx = -1; i < tupdesc->natts; i++)
+               {
+                       if (tupdesc->attrs[i]->attisdropped)
+                               continue;
+
+                       attkind_idx++;
+                       if (pgqtriga_is_pkey(ev, tg, i, attkind_idx))
+                               break;
+               }
+               col_ident = SPI_fname(tupdesc, i + 1);
+               col_value = SPI_getvalue(old_row, tupdesc, i + 1);
+
+               append_key_eq(sql, col_ident, col_value);
+       }
+
+       appendStringInfoString(sql, " where ");
+
+       for (i = 0, attkind_idx = -1; i < tupdesc->natts; i++)
+       {
+               /*
+                * Ignore dropped columns
+                */
+               if (tupdesc->attrs[i]->attisdropped)
+                       continue;
+
+               attkind_idx++;
+               if (!pgqtriga_is_pkey(ev, tg, i, attkind_idx))
+                       continue;
+
+               col_ident = SPI_fname(tupdesc, i + 1);
+               col_value = SPI_getvalue(old_row, tupdesc, i + 1);
+
+               if (need_and)
+                       appendStringInfoString(sql, " and ");
+               else
+                       need_and = true;
+
+               append_key_eq(sql, col_ident, col_value);
+       }
+       return 1;
+}
+
+static void
+process_delete(PgqTriggerEvent *ev, TriggerData *tg, StringInfo sql)
+{
+       HeapTuple       old_row = tg->tg_trigtuple;
+       TupleDesc       tupdesc = tg->tg_relation->rd_att;
+       char       *col_ident;
+       char       *col_value;
+       int                     i;
+       int                     need_and = false;
+       int                     attkind_idx;
+
+       for (i = 0, attkind_idx = -1; i < tupdesc->natts; i++)
+       {
+               if (tupdesc->attrs[i]->attisdropped)
+                       continue;
+
+               attkind_idx++;
+               if (!pgqtriga_is_pkey(ev, tg, i, attkind_idx))
+                       continue;
+               col_ident = SPI_fname(tupdesc, i + 1);
+               col_value = SPI_getvalue(old_row, tupdesc, i + 1);
+
+               if (need_and)
+                       appendStringInfoString(sql, " and ");
+               else
+                       need_and = true;
+
+               append_key_eq(sql, col_ident, col_value);
+       }
+}
+
+int
+pgqtriga_make_sql(PgqTriggerEvent *ev, TriggerData *tg, StringInfo sql)
+{
+       TupleDesc       tupdesc;
+       int                     i;
+       int                     attcnt;
+       int                     need_event = 1;
+
+       tupdesc = tg->tg_relation->rd_att;
+
+       /*
+        * Count number of active columns
+        */
+       for (i = 0, attcnt = 0; i < tupdesc->natts; i++)
+       {
+               if (tupdesc->attrs[i]->attisdropped)
+                       continue;
+               attcnt++;
+       }
+
+       /*
+        * Determine cmdtype and op_data depending on the command type
+        */
+       if (TRIGGER_FIRED_BY_INSERT(tg->tg_event)) {
+               //appendStringInfoChar(op_type, 'I');
+               process_insert(ev, tg, sql);
+       } else if (TRIGGER_FIRED_BY_UPDATE(tg->tg_event)) {
+               //appendStringInfoChar(op_type, 'U');
+               need_event = process_update(ev, tg, sql);
+       } else if (TRIGGER_FIRED_BY_DELETE(tg->tg_event)) {
+               //appendStringInfoChar(op_type, 'D');
+               process_delete(ev, tg, sql);
+       } else
+               elog(ERROR, "logtriga fired for unhandled event");
+
+       return need_event;
+}
+
diff --git a/sql/pgq/triggers/pgq_triggers.sql.in b/sql/pgq/triggers/pgq_triggers.sql.in
new file mode 100644 (file)
index 0000000..df29599
--- /dev/null
@@ -0,0 +1,81 @@
+
+-- ----------------------------------------------------------------------
+-- Function: pgq.logtriga()
+--
+--      Non-automatic SQL trigger.  It puts row data in partial SQL form into
+--      queue.  It does not aut-detect table structure, it needs to be passed
+--      as trigger arg.
+--
+-- Parameters:
+--      arg1 - queue name
+--      arg2 - columnt type spec string where each column corresponds to one char (k/v/i).
+--              if spec string is shorter than column list, rest of columns default to 'i'.
+--
+-- Column types:
+--      k   - pkey column
+--      v   - normal data column
+--      i   - ignore column
+--
+-- Queue event fields:
+--    ev_type     - I/U/D
+--    ev_data     - partial SQL statement
+--    ev_extra1   - table name
+--
+-- ----------------------------------------------------------------------
+CREATE OR REPLACE FUNCTION pgq.logtriga() RETURNS trigger
+AS 'MODULE_PATHNAME', 'pgq_logtriga' LANGUAGE C;
+
+-- ----------------------------------------------------------------------
+-- Function: pgq.sqltriga()
+--
+--      Automatic SQL trigger.  It puts row data in partial SQL form into
+--      queue.  It autodetects table structure.
+--
+-- Parameters:
+--    arg1 - queue name
+--    argX - any number of optional arg, in any order
+--
+-- Optinal arguments:
+--      SKIP                - The actual operation should be skipped
+--      ignore=col1[,col2]  - don't look at the specified arguments
+--      pkey=col1[,col2]    - Set pkey fields for the table, autodetection will be skipped
+--
+-- Queue event fields:
+--    ev_type     - I/U/D
+--    ev_data     - partial SQL statement
+--    ev_extra1   - table name
+--
+-- ----------------------------------------------------------------------
+CREATE OR REPLACE FUNCTION pgq.sqltriga() RETURNS trigger
+AS 'MODULE_PATHNAME', 'pgq_sqltriga' LANGUAGE C;
+
+-- ----------------------------------------------------------------------
+-- Function: pgq.logutriga()
+--
+--      Trigger function that puts row data in urlencoded into queue.
+--
+-- Trigger parameters:
+--      arg1 - queue name
+--      argX - any number of optional arg, in any order
+--
+-- Optinal arguments:
+--      SKIP                - The actual operation should be skipped
+--      ignore=col1[,col2]  - don't look at the specified arguments
+--      pkey=col1[,col2]    - Set pkey fields for the table, autodetection will be skipped
+--
+-- Queue event fields:
+--      ev_type      - I/U/D ':' pkey_column_list
+--      ev_data      - column values urlencoded
+--      ev_extra1    - table name
+--
+-- Regular listen trigger example:
+-- >  CREATE TRIGGER triga_nimi AFTER INSERT OR UPDATE ON customer
+-- >  FOR EACH ROW EXECUTE PROCEDURE pgq.logutriga('qname');
+--
+-- Redirect trigger example:
+-- >   CREATE TRIGGER triga_nimi AFTER INSERT OR UPDATE ON customer
+-- >   FOR EACH ROW EXECUTE PROCEDURE pgq.logutriga('qname', 'SKIP');
+-- ----------------------------------------------------------------------
+CREATE OR REPLACE FUNCTION pgq.logutriga() RETURNS TRIGGER
+AS 'MODULE_PATHNAME', 'pgq_logutriga' LANGUAGE C;
+
diff --git a/sql/pgq/triggers/sqltriga.c b/sql/pgq/triggers/sqltriga.c
new file mode 100644 (file)
index 0000000..a1bb05b
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * sqltriga.c - Smart SQL-logging trigger.
+ *
+ * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <postgres.h>
+#include <executor/spi.h>
+#include <commands/trigger.h>
+
+#include "common.h"
+#include "stringutil.h"
+
+PG_FUNCTION_INFO_V1(pgq_sqltriga);
+Datum pgq_sqltriga(PG_FUNCTION_ARGS);
+
+/*
+ * PgQ log trigger, takes 2 arguments:
+ * 1. queue name to be inserted to.
+ *
+ * Queue events will be in format:
+ *    ev_type   - operation type, I/U/D
+ *    ev_data   - urlencoded column values
+ *    ev_extra1 - table name
+ */
+Datum
+pgq_sqltriga(PG_FUNCTION_ARGS)
+{
+       TriggerData *tg;
+       PgqTriggerEvent ev;
+
+       /*
+        * Get the trigger call context
+        */
+       if (!CALLED_AS_TRIGGER(fcinfo))
+               elog(ERROR, "pgq.logutriga not called as trigger");
+
+       /*
+        * Connect to the SPI manager
+        */
+       if (SPI_connect() < 0)
+               elog(ERROR, "logtriga: SPI_connect() failed");
+
+       tg = (TriggerData *) (fcinfo->context);
+       pgq_prepare_event(&ev, tg, true);
+
+       appendStringInfoChar(ev.ev_type, ev.op_type);
+       appendStringInfoString(ev.ev_extra1, ev.info->table_name);
+
+       /*
+        * create sql and insert if interesting
+        */
+       if (pgqtriga_make_sql(&ev, tg, ev.ev_data))
+       {
+               pgq_simple_insert(ev.queue_name,
+                                                 pgq_finish_varbuf(ev.ev_type),
+                                                 pgq_finish_varbuf(ev.ev_data),
+                                                 pgq_finish_varbuf(ev.ev_extra1));
+       }
+
+       if (SPI_finish() < 0)
+               elog(ERROR, "SPI_finish failed");
+
+       /*
+        * After trigger ignores result,
+        * before trigger skips event if NULL.
+        */
+       if (TRIGGER_FIRED_AFTER(tg->tg_event) || ev.skip)
+               return PointerGetDatum(NULL);
+       else if (TRIGGER_FIRED_BY_UPDATE(tg->tg_event))
+               return PointerGetDatum(tg->tg_newtuple);
+       else
+               return PointerGetDatum(tg->tg_trigtuple);
+}
+
diff --git a/sql/pgq/triggers/stringutil.c b/sql/pgq/triggers/stringutil.c
new file mode 100644 (file)
index 0000000..2d7a378
--- /dev/null
@@ -0,0 +1,278 @@
+/*
+ * stringutil.c - some tools for string handling
+ *
+ * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <postgres.h>
+#include <lib/stringinfo.h>
+#include <mb/pg_wchar.h>
+#include <parser/keywords.h>
+#include <utils/memutils.h>
+
+#include "stringutil.h"
+
+#ifndef SET_VARSIZE
+#define SET_VARSIZE(x, len) VARATT_SIZEP(x) = len
+#endif
+
+
+StringInfo pgq_init_varbuf(void)
+{
+       StringInfo buf;
+       buf = makeStringInfo();
+       appendStringInfoString(buf, "XXXX");
+       return buf;
+}
+
+Datum pgq_finish_varbuf(StringInfo buf)
+{
+       SET_VARSIZE(buf->data, buf->len);
+       return PointerGetDatum(buf->data);
+}
+
+
+/*
+ * Find a string in comma-separated list.
+ *
+ * It does not support space inside tokens.
+ */
+bool pgq_strlist_contains(const char *liststr, const char *str)
+{
+       int c, len = strlen(str);
+       const char *p, *listpos = liststr;
+       
+loop:
+       /* find string fragment, later check if actual token */
+       p = strstr(listpos, str);
+       if (p == NULL)
+               return false;
+
+       /* move listpos further */
+       listpos = p + len;
+       /* survive len=0 and avoid unneccesary compare */
+       if (*listpos)
+               listpos++;
+
+       /* check previous symbol */
+       if (p > liststr) {
+               c = *(p - 1);
+               if (!isspace(c) && c != ',')
+                       goto loop;
+       }
+
+       /* check following symbol */
+       c = p[len];
+       if (c != 0 && !isspace(c) && c != ',')
+               goto loop;
+
+       return true;
+}
+
+/*
+ * quoting
+ */
+
+static int pgq_urlencode(char *dst, const uint8 *src, int srclen)
+{
+       static const char hextbl[] = "0123456789abcdef";
+       const uint8 *end = src + srclen;
+       char *p = dst;
+       while (src < end) {
+               unsigned c = *src++;
+               if (c == ' ') {
+                       *p++ = '+';
+               } else if ((c >= '0' && c <= '9')
+                       || (c >= 'A' && c <= 'Z')
+                       || (c >= 'a' && c <= 'z')
+                       || c == '_' || c == '.')
+               {
+                       *p++ = c;
+               } else {
+                       *p++ = '%';
+                       *p++ = hextbl[c >> 4];
+                       *p++ = hextbl[c & 15];
+               }
+       }
+       return p - dst;
+}
+
+static int pgq_quote_literal(char *dst, const uint8 *src, int srclen)
+{
+       const uint8 *cp1;
+       char       *cp2;
+       int                     wl;
+
+       cp1 = src;
+       cp2 = dst;
+
+       *cp2++ = '\'';
+       while (srclen > 0)
+       {
+               if ((wl = pg_mblen((const char *)cp1)) != 1)
+               {
+                       srclen -= wl;
+
+                       while (wl-- > 0)
+                               *cp2++ = *cp1++;
+                       continue;
+               }
+
+               if (*cp1 == '\'')
+                       *cp2++ = '\'';
+               if (*cp1 == '\\')
+                       *cp2++ = '\\';
+               *cp2++ = *cp1++;
+               srclen--;
+       }
+
+       *cp2++ = '\'';
+
+       return cp2 - dst;
+}
+
+
+/*
+ * slon_quote_identifier - Quote an identifier only if needed
+ */
+static int
+pgq_quote_ident(char *dst, const uint8 *src, int srclen)
+{
+       /*
+        * Can avoid quoting if ident starts with a lowercase letter or
+        * underscore and contains only lowercase letters, digits, and
+        * underscores, *and* is not any SQL keyword.  Otherwise, supply
+        * quotes.
+        */
+       int                     nquotes = 0;
+       bool            safe;
+       const char *ptr;
+       char       *optr;
+       char ident[NAMEDATALEN + 1];
+
+       /* expect idents be not bigger than NAMEDATALEN */
+       if (srclen > NAMEDATALEN)
+               srclen = NAMEDATALEN;
+       memcpy(ident, src, srclen);
+       ident[srclen] = 0;
+
+       /*
+        * would like to use <ctype.h> macros here, but they might yield
+        * unwanted locale-specific results...
+        */
+       safe = ((ident[0] >= 'a' && ident[0] <= 'z') || ident[0] == '_');
+
+       for (ptr = ident; *ptr; ptr++)
+       {
+               char            ch = *ptr;
+
+               if ((ch >= 'a' && ch <= 'z') ||
+                       (ch >= '0' && ch <= '9') ||
+                       (ch == '_'))
+                       continue; /* okay */
+
+               safe = false;
+               if (ch == '"')
+                       nquotes++;
+       }
+
+       if (safe)
+       {
+               /*
+                * Check for keyword.  This test is overly strong, since many of
+                * the "keywords" known to the parser are usable as column names,
+                * but the parser doesn't provide any easy way to test for whether
+                * an identifier is safe or not... so be safe not sorry.
+                *
+                * Note: ScanKeywordLookup() does case-insensitive comparison, but
+                * that's fine, since we already know we have all-lower-case.
+                */
+               if (ScanKeywordLookup(ident) != NULL)
+                       safe = false;
+       }
+
+       optr = dst;
+       if (!safe)
+               *optr++ = '"';
+
+       for (ptr = ident; *ptr; ptr++)
+       {
+               char            ch = *ptr;
+
+               if (ch == '"')
+                       *optr++ = '"';
+               *optr++ = ch;
+       }
+       if (!safe)
+               *optr++ = '"';
+
+       return optr - dst;
+}
+
+static char *start_append(StringInfo buf, int alloc_len)
+{
+       enlargeStringInfo(buf, alloc_len);
+       return buf->data + buf->len;
+}
+
+static void finish_append(StringInfo buf, int final_len)
+{
+       if (buf->len + final_len > buf->maxlen)
+               elog(FATAL, "buffer overflow");
+       buf->len += final_len;
+}
+
+
+static void
+tbuf_encode_data(StringInfo buf,
+                                const uint8 *data, int len,
+                                enum PgqEncode encoding)
+{
+       int dlen = 0;
+       char *dst;
+
+       switch (encoding) {
+       case TBUF_QUOTE_LITERAL:
+               dst = start_append(buf, len * 2 + 2);
+               dlen = pgq_quote_literal(dst, data, len);
+               break;
+
+       case TBUF_QUOTE_IDENT:
+               dst = start_append(buf, len * 2 + 2);
+               dlen = pgq_quote_ident(dst, data, len);
+               break;
+
+       case TBUF_QUOTE_URLENC:
+               dst = start_append(buf, len * 3 + 2);
+               dlen = pgq_urlencode(dst, data, len);
+               break;
+
+       default:
+               elog(ERROR, "bad encoding");
+       }
+
+       finish_append(buf, dlen);
+}
+
+void
+pgq_encode_cstring(StringInfo tbuf,
+                                       const char *str,
+                                       enum PgqEncode encoding)
+{
+       if (str == NULL)
+               elog(ERROR, "tbuf_encode_cstring: NULL");
+       tbuf_encode_data(tbuf, (const uint8 *)str, strlen(str), encoding);
+}
+
diff --git a/sql/pgq/triggers/stringutil.h b/sql/pgq/triggers/stringutil.h
new file mode 100644 (file)
index 0000000..8a55c76
--- /dev/null
@@ -0,0 +1,14 @@
+
+enum PgqEncode {
+       TBUF_QUOTE_IDENT,
+       TBUF_QUOTE_LITERAL,
+       TBUF_QUOTE_URLENC,
+};
+
+StringInfo pgq_init_varbuf(void);
+Datum pgq_finish_varbuf(StringInfo buf);
+bool pgq_strlist_contains(const char *liststr, const char *str);
+void pgq_encode_cstring(StringInfo tbuf,
+                                               const char *str,
+                                               enum PgqEncode encoding);
+