londiste,pgq: Support & use TRUNCATE trigger by default.
authorMarko Kreen <markokr@gmail.com>
Mon, 6 Sep 2010 12:57:13 +0000 (15:57 +0300)
committerMarko Kreen <markokr@gmail.com>
Mon, 6 Sep 2010 13:02:59 +0000 (16:02 +0300)
Event format:
 ev_type   - R
 ev_extra1 - table name

Works only on Postgres 8.4+.

Patch by Hannu Krosing.

13 files changed:
python/londiste/playback.py
sql/londiste/expected/londiste_provider.out
sql/londiste/functions/londiste.local_add_table.sql
sql/londiste/sql/londiste_provider.sql
sql/pgq/expected/logutriga.out
sql/pgq/expected/sqltriga.out
sql/pgq/sql/logutriga.sql
sql/pgq/sql/sqltriga.sql
sql/pgq/triggers/common.c
sql/pgq/triggers/common.h
sql/pgq/triggers/logutriga.c
sql/pgq/triggers/makesql.c
sql/pgq/triggers/sqltriga.c

index 0d13a81b19c29bf084b2faa81f7d2ef08a40346e..499728ad400ae24a878dc281c3d4eb6b5f67bb02 100644 (file)
@@ -476,7 +476,7 @@ class Replicator(CascadedWorker):
             self.handle_data_event(ev, dst_curs)
         elif ev.type[:2] in ('I:', 'U:', 'D:'):
             self.handle_data_event(ev, dst_curs)
-        elif ev.type == "TRUNCATE":
+        elif ev.type == "R":
             self.flush_sql(dst_curs)
             self.handle_truncate_event(ev, dst_curs)
         elif ev.type == 'EXECUTE':
@@ -521,6 +521,10 @@ class Replicator(CascadedWorker):
             return
 
         fqname = skytools.quote_fqident(ev.extra1)
+        if dst_curs.connection.server_version >= 80400:
+            sql = "TRUNCATE ONLY %s;" % fqname
+        else:
+            sql = "TRUNCATE %s;" % fqname
         sql = "TRUNCATE %s;" % fqname
 
         self.flush_sql(dst_curs)
@@ -572,7 +576,7 @@ class Replicator(CascadedWorker):
 
     def interesting(self, ev):
         """See if event is interesting."""
-        if ev.type not in ('I', 'U', 'D'):
+        if ev.type not in ('I', 'U', 'D', 'R'):
             raise Exception('bug - bad event type in .interesting')
         t = self.get_table_by_name(ev.extra1)
         if not t:
index e7c21fbb4c90d4b843ce0d2a39f8bfd128a74455..877618ddf4b8b70e879d094da58df3da8ebbd5c7 100644 (file)
@@ -41,11 +41,12 @@ select * from londiste.local_add_table('aset', 'public.testdata');
       200 | Table added: public.testdata
 (1 row)
 
-select tgname from pg_trigger where tgrelid = 'public.testdata'::regclass;
-     tgname     
-----------------
+select tgname from pg_trigger where tgrelid = 'public.testdata'::regclass order by 1;
+         tgname          
+-------------------------
  _londiste_aset
-(1 row)
+ _londiste_aset_truncate
+(2 rows)
 
 insert into testdata (txt) values ('test-data');
 select * from londiste.get_table_list('aset');
@@ -113,9 +114,11 @@ select * from londiste.local_add_table('aset', 'public.trg_test', array['ev_extr
 (1 row)
 
 insert into trg_test values (1, 'data');
+truncate trg_test;
 select ev_id, ev_type, ev_data, ev_extra1, ev_extra4 from pgq.event_template where ev_extra1 = 'public.trg_test';
  ev_id | ev_type |           ev_data            |    ev_extra1    | ev_extra4 
 -------+---------+------------------------------+-----------------+-----------
      5 | I       | (id,txt) values ('1','data') | public.trg_test | test=data
-(1 row)
+     6 | R       |                              | public.trg_test | 
+(2 rows)
 
index 4fd0ddbec1a1bf5f4dab932f7c1c523d34e8e75d..d89a2fc8c9ca4a76c9680e08f7123d7d7e9d8cfe 100644 (file)
@@ -21,6 +21,8 @@ declare
     new_state text;
 
     logtrg_name text;
+    trunctrg_name text;
+    pgversion int;
     logtrg_previous text;
     logtrg text;
     tbl record;
@@ -101,7 +103,7 @@ begin
         return;
     end if;
 
-    -- create trigger if it does not exists already
+    -- create Ins/Upd/Del trigger if it does not exists already
     logtrg_name := '_londiste_' || i_queue_name;
     perform 1 from pg_catalog.pg_trigger
         where tgrelid = londiste.find_table_oid(fq_table_name)
@@ -119,6 +121,22 @@ begin
         execute logtrg;
     end if;
 
+    -- create tRuncate trigger if it does not exists already
+    show server_version_num into pgversion;
+    if pgversion >= 80400 then
+        trunctrg_name  := '_londiste_' || i_queue_name || '_truncate';
+        perform 1 from pg_catalog.pg_trigger
+          where tgrelid = londiste.find_table_oid(fq_table_name)
+            and tgname = trunctrg_name;
+        if not found then
+            logtrg := 'create trigger ' || quote_ident(trunctrg_name)
+                || ' after truncate on ' || londiste.quote_fqname(fq_table_name)
+                || ' for each statement execute procedure pgq.sqltriga(' || quote_literal(i_queue_name)
+                || ')';
+            execute logtrg;
+        end if;
+    end if;
+
     -- Check that no trigger exists on the target table that will get fired
     -- before londiste one (this could have londiste replicate data
     -- out-of-order
index 6530737985b4a82cbde7efc7d1ac78818406cd1c..63a62f847edf48a6f661d7b752bb98ff063c832b 100644 (file)
@@ -21,7 +21,7 @@ select * from pgq_node.create_node('aset', 'root', 'rnode', 'londiste_root', nul
 
 select * from londiste.local_add_table('aset', 'public.testdata_nopk');
 select * from londiste.local_add_table('aset', 'public.testdata');
-select tgname from pg_trigger where tgrelid = 'public.testdata'::regclass;
+select tgname from pg_trigger where tgrelid = 'public.testdata'::regclass order by 1;
 insert into testdata (txt) values ('test-data');
 select * from londiste.get_table_list('aset');
 select * from londiste.local_show_missing('aset');
@@ -42,6 +42,7 @@ create table trg_test (
 
 select * from londiste.local_add_table('aset', 'public.trg_test', array['ev_extra4=''test='' || txt']);
 insert into trg_test values (1, 'data');
+truncate trg_test;
 select ev_id, ev_type, ev_data, ev_extra1, ev_extra4 from pgq.event_template where ev_extra1 = 'public.trg_test';
 
 
index 65ebd544d3e153749fa9178dc977669eb17a2e08..597960206482532d4f0ae0bd7bd06b664dd491dd 100644 (file)
@@ -146,3 +146,9 @@ select * from when_test;
  foo  |    2 | 
 (1 row)
 
+-- test truncate
+create trigger trunc_triga after truncate on when_test
+for each statement execute procedure pgq.logutriga('que3');
+truncate when_test;
+NOTICE:  insert_event(que3, R, , public.when_test)
+CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, $5, $6, $7)"
index 139ab1d49775d8b03c1fe4e71bdf2e0c0b99d026..8140051180d84c59c92a95070f0e9035b8dce9d6 100644 (file)
@@ -141,3 +141,9 @@ CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, $5, $6, $7)"
 delete from custom_expr;
 NOTICE:  insert_event(que3, bat, dat1='foo', test=foo)
 CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, $5, $6, $7)"
+-- test truncate
+create trigger customtrc_triga after truncate on custom_expr
+for each statement execute procedure pgq.sqltriga('que3');
+truncate custom_expr;
+NOTICE:  insert_event(que3, R, , public.custom_expr)
+CONTEXT:  SQL statement "select pgq.insert_event($1, $2, $3, $4, $5, $6, $7)"
index 647d6b80ffe25f44d36887f6036607358a0725df..428ee871dedc97b960127246cf2d0d4c33a987ba 100644 (file)
@@ -109,4 +109,7 @@ for each row execute procedure pgq.logutriga('que3', 'when=current_user=''random
 insert into when_test values ('foo', '2');
 select * from when_test;
 
-
+-- test truncate
+create trigger trunc_triga after truncate on when_test
+for each statement execute procedure pgq.logutriga('que3');
+truncate when_test;
index 9e655b125167f3ea64138cc58b00199cec7acab4..e39d445ca7d02c6818d035640203f02acea29d9c 100644 (file)
@@ -105,4 +105,7 @@ insert into custom_expr values ('foo', '2');
 update custom_expr set dat3 = 'bat';
 delete from custom_expr;
 
-
+-- test truncate
+create trigger customtrc_triga after truncate on custom_expr
+for each statement execute procedure pgq.sqltriga('que3');
+truncate custom_expr;
index 0a1d6c8177ca061250bd5fa853d99d8b5fbf882b..f43efad7693cde40058ba0b89aeaaad2f8752523 100644 (file)
@@ -425,6 +425,14 @@ static void parse_newstyle_args(PgqTriggerEvent *ev, TriggerData *tg)
                        elog(ERROR, "bad param to pgq trigger");
        }
 
+       if (ev->op_type == 'R') {
+               if (ev->tgargs->ignore_list)
+                       elog(ERROR, "Column ignore does not make sense for truncate trigger");
+               if (ev->tgargs->pkey_list)
+                       elog(ERROR, "Custom pkey_list does not make sense for truncate trigger");
+               if (ev->tgargs->backup)
+                       elog(ERROR, "Backup does not make sense for truncate trigger");
+       }
 }
 
 static void parse_oldstyle_args(PgqTriggerEvent *ev, TriggerData *tg)
@@ -471,8 +479,12 @@ void pgq_prepare_event(struct PgqTriggerEvent *ev, TriggerData *tg, bool newstyl
         */
        if (!TRIGGER_FIRED_AFTER(tg->tg_event))
                /* dont care */ ;
-       if (!TRIGGER_FIRED_FOR_ROW(tg->tg_event))
-               elog(ERROR, "pgq trigger must be fired FOR EACH ROW");
+       if (TRIGGER_FIRED_BY_TRUNCATE(tg->tg_event)) {
+               if (!TRIGGER_FIRED_FOR_STATEMENT(tg->tg_event))
+                       elog(ERROR, "pgq tRuncate trigger must be fired FOR EACH STATEMENT");
+       } else if (!TRIGGER_FIRED_FOR_ROW(tg->tg_event)) {
+               elog(ERROR, "pgq Ins/Upd/Del trigger must be fired FOR EACH ROW");
+       }
        if (tg->tg_trigger->tgnargs < 1)
                elog(ERROR, "pgq trigger must have destination queue as argument");
 
@@ -485,6 +497,8 @@ void pgq_prepare_event(struct PgqTriggerEvent *ev, TriggerData *tg, bool newstyl
                ev->op_type = 'U';
        else if (TRIGGER_FIRED_BY_DELETE(tg->tg_event))
                ev->op_type = 'D';
+       else if (TRIGGER_FIRED_BY_TRUNCATE(tg->tg_event))
+               ev->op_type = 'R';
        else
                elog(ERROR, "unknown event for pgq trigger");
 
@@ -668,6 +682,9 @@ static void make_query(struct PgqTriggerEvent *ev, int fld, const char *arg)
        Oid tgoid = tg->tg_trigger->tgoid;
        const char *pfx = "select ";
 
+       if (ev->op_type == 'R')
+               elog(ERROR, "Custom expressions do not make sense for truncater trigger");
+
        /* make sure tgargs exists */
        if (!ev->tgargs)
                ev->tgargs = find_trigger_info(ev->info, tgoid, true);
index 64ee4d117af578134ac19fbd6b3400416895c780..78a4e7f65e29b6d00c437afbfdf96703b0b09cc1 100644 (file)
@@ -93,3 +93,7 @@ int pgqtriga_make_sql(PgqTriggerEvent *ev, StringInfo sql);
 /* logutriga.c */
 void pgq_urlenc_row(PgqTriggerEvent *ev, HeapTuple row, StringInfo buf);
 
+#ifndef TRIGGER_FIRED_BY_TRUNCATE
+#define TRIGGER_FIRED_BY_TRUNCATE(tg)  0
+#endif
+
index 1c105819acb18ab21a6680cbaa21aa0137b7de5a..8e51b06414863dba409688a605703ed784025dc3 100644 (file)
@@ -135,6 +135,9 @@ void pgq_urlenc_row(PgqTriggerEvent *ev, HeapTuple row, StringInfo buf)
        const char *col_ident, *col_value;
        int attkind_idx = -1;
 
+       if (ev->op_type == 'R')
+               return;
+
        for (i = 0; i < tg->tg_relation->rd_att->natts; i++) {
                /* Skip dropped columns */
                if (tupdesc->attrs[i]->attisdropped)
@@ -202,10 +205,12 @@ Datum pgq_logutriga(PG_FUNCTION_ARGS)
 
        pgq_prepare_event(&ev, tg, true);
 
-       appendStringInfoChar(ev.field[EV_TYPE], ev.op_type);
-       appendStringInfoChar(ev.field[EV_TYPE], ':');
-       appendStringInfoString(ev.field[EV_TYPE], ev.pkey_list);
        appendStringInfoString(ev.field[EV_EXTRA1], ev.info->table_name);
+       appendStringInfoChar(ev.field[EV_TYPE], ev.op_type);
+       if (ev.op_type != 'R') {
+               appendStringInfoChar(ev.field[EV_TYPE], ':');
+               appendStringInfoString(ev.field[EV_TYPE], ev.pkey_list);
+       }
 
        if (is_interesting_change(&ev, tg)) {
                /*
index de4ab5659520442cb9c18d18d922d45777a8b716..7dc85a81eef2b3514f3083b7deaa685643435179 100644 (file)
@@ -333,6 +333,8 @@ int pgqtriga_make_sql(PgqTriggerEvent *ev, StringInfo sql)
                need_event = process_update(ev, sql);
        } else if (TRIGGER_FIRED_BY_DELETE(tg->tg_event)) {
                process_delete(ev, sql);
+       } else if (TRIGGER_FIRED_BY_TRUNCATE(tg->tg_event)) {
+               /* nothing to do for truncate */
        } else
                elog(ERROR, "logtriga fired for unhandled event");
 
index 056dc343c4e7b6271e4e8f9d93b51e000049bb8d..f895a1d8d942a6a30f00087e16a83266c263a5d2 100644 (file)
@@ -32,7 +32,7 @@ Datum pgq_sqltriga(PG_FUNCTION_ARGS);
  * 1. queue name to be inserted to.
  *
  * Queue events will be in format:
- *    ev_type   - operation type, I/U/D
+ *    ev_type   - operation type, I/U/D/R
  *    ev_data   - urlencoded column values
  *    ev_extra1 - table name
  *    ev_extra2 - optional urlencoded backup