self.handle_data_event(ev, dst_curs)
elif ev.type[:2] in ('I:', 'U:', 'D:'):
self.handle_data_event(ev, dst_curs)
- elif ev.type == "TRUNCATE":
+ elif ev.type == "R":
self.flush_sql(dst_curs)
self.handle_truncate_event(ev, dst_curs)
elif ev.type == 'EXECUTE':
return
fqname = skytools.quote_fqident(ev.extra1)
+ if dst_curs.connection.server_version >= 80400:
+ sql = "TRUNCATE ONLY %s;" % fqname
+ else:
+ sql = "TRUNCATE %s;" % fqname
sql = "TRUNCATE %s;" % fqname
self.flush_sql(dst_curs)
def interesting(self, ev):
"""See if event is interesting."""
- if ev.type not in ('I', 'U', 'D'):
+ if ev.type not in ('I', 'U', 'D', 'R'):
raise Exception('bug - bad event type in .interesting')
t = self.get_table_by_name(ev.extra1)
if not t:
200 | Table added: public.testdata
(1 row)
-select tgname from pg_trigger where tgrelid = 'public.testdata'::regclass;
- tgname
-----------------
+select tgname from pg_trigger where tgrelid = 'public.testdata'::regclass order by 1;
+ tgname
+-------------------------
_londiste_aset
-(1 row)
+ _londiste_aset_truncate
+(2 rows)
insert into testdata (txt) values ('test-data');
select * from londiste.get_table_list('aset');
(1 row)
insert into trg_test values (1, 'data');
+truncate trg_test;
select ev_id, ev_type, ev_data, ev_extra1, ev_extra4 from pgq.event_template where ev_extra1 = 'public.trg_test';
ev_id | ev_type | ev_data | ev_extra1 | ev_extra4
-------+---------+------------------------------+-----------------+-----------
5 | I | (id,txt) values ('1','data') | public.trg_test | test=data
-(1 row)
+ 6 | R | | public.trg_test |
+(2 rows)
new_state text;
logtrg_name text;
+ trunctrg_name text;
+ pgversion int;
logtrg_previous text;
logtrg text;
tbl record;
return;
end if;
- -- create trigger if it does not exists already
+ -- create Ins/Upd/Del trigger if it does not exists already
logtrg_name := '_londiste_' || i_queue_name;
perform 1 from pg_catalog.pg_trigger
where tgrelid = londiste.find_table_oid(fq_table_name)
execute logtrg;
end if;
+ -- create tRuncate trigger if it does not exists already
+ show server_version_num into pgversion;
+ if pgversion >= 80400 then
+ trunctrg_name := '_londiste_' || i_queue_name || '_truncate';
+ perform 1 from pg_catalog.pg_trigger
+ where tgrelid = londiste.find_table_oid(fq_table_name)
+ and tgname = trunctrg_name;
+ if not found then
+ logtrg := 'create trigger ' || quote_ident(trunctrg_name)
+ || ' after truncate on ' || londiste.quote_fqname(fq_table_name)
+ || ' for each statement execute procedure pgq.sqltriga(' || quote_literal(i_queue_name)
+ || ')';
+ execute logtrg;
+ end if;
+ end if;
+
-- Check that no trigger exists on the target table that will get fired
-- before londiste one (this could have londiste replicate data
-- out-of-order
select * from londiste.local_add_table('aset', 'public.testdata_nopk');
select * from londiste.local_add_table('aset', 'public.testdata');
-select tgname from pg_trigger where tgrelid = 'public.testdata'::regclass;
+select tgname from pg_trigger where tgrelid = 'public.testdata'::regclass order by 1;
insert into testdata (txt) values ('test-data');
select * from londiste.get_table_list('aset');
select * from londiste.local_show_missing('aset');
select * from londiste.local_add_table('aset', 'public.trg_test', array['ev_extra4=''test='' || txt']);
insert into trg_test values (1, 'data');
+truncate trg_test;
select ev_id, ev_type, ev_data, ev_extra1, ev_extra4 from pgq.event_template where ev_extra1 = 'public.trg_test';
foo | 2 |
(1 row)
+-- test truncate
+create trigger trunc_triga after truncate on when_test
+for each statement execute procedure pgq.logutriga('que3');
+truncate when_test;
+NOTICE: insert_event(que3, R, , public.when_test)
+CONTEXT: SQL statement "select pgq.insert_event($1, $2, $3, $4, $5, $6, $7)"
delete from custom_expr;
NOTICE: insert_event(que3, bat, dat1='foo', test=foo)
CONTEXT: SQL statement "select pgq.insert_event($1, $2, $3, $4, $5, $6, $7)"
+-- test truncate
+create trigger customtrc_triga after truncate on custom_expr
+for each statement execute procedure pgq.sqltriga('que3');
+truncate custom_expr;
+NOTICE: insert_event(que3, R, , public.custom_expr)
+CONTEXT: SQL statement "select pgq.insert_event($1, $2, $3, $4, $5, $6, $7)"
insert into when_test values ('foo', '2');
select * from when_test;
-
+-- test truncate
+create trigger trunc_triga after truncate on when_test
+for each statement execute procedure pgq.logutriga('que3');
+truncate when_test;
update custom_expr set dat3 = 'bat';
delete from custom_expr;
-
+-- test truncate
+create trigger customtrc_triga after truncate on custom_expr
+for each statement execute procedure pgq.sqltriga('que3');
+truncate custom_expr;
elog(ERROR, "bad param to pgq trigger");
}
+ if (ev->op_type == 'R') {
+ if (ev->tgargs->ignore_list)
+ elog(ERROR, "Column ignore does not make sense for truncate trigger");
+ if (ev->tgargs->pkey_list)
+ elog(ERROR, "Custom pkey_list does not make sense for truncate trigger");
+ if (ev->tgargs->backup)
+ elog(ERROR, "Backup does not make sense for truncate trigger");
+ }
}
static void parse_oldstyle_args(PgqTriggerEvent *ev, TriggerData *tg)
*/
if (!TRIGGER_FIRED_AFTER(tg->tg_event))
/* dont care */ ;
- if (!TRIGGER_FIRED_FOR_ROW(tg->tg_event))
- elog(ERROR, "pgq trigger must be fired FOR EACH ROW");
+ if (TRIGGER_FIRED_BY_TRUNCATE(tg->tg_event)) {
+ if (!TRIGGER_FIRED_FOR_STATEMENT(tg->tg_event))
+ elog(ERROR, "pgq tRuncate trigger must be fired FOR EACH STATEMENT");
+ } else if (!TRIGGER_FIRED_FOR_ROW(tg->tg_event)) {
+ elog(ERROR, "pgq Ins/Upd/Del trigger must be fired FOR EACH ROW");
+ }
if (tg->tg_trigger->tgnargs < 1)
elog(ERROR, "pgq trigger must have destination queue as argument");
ev->op_type = 'U';
else if (TRIGGER_FIRED_BY_DELETE(tg->tg_event))
ev->op_type = 'D';
+ else if (TRIGGER_FIRED_BY_TRUNCATE(tg->tg_event))
+ ev->op_type = 'R';
else
elog(ERROR, "unknown event for pgq trigger");
Oid tgoid = tg->tg_trigger->tgoid;
const char *pfx = "select ";
+ if (ev->op_type == 'R')
+ elog(ERROR, "Custom expressions do not make sense for truncater trigger");
+
/* make sure tgargs exists */
if (!ev->tgargs)
ev->tgargs = find_trigger_info(ev->info, tgoid, true);
/* logutriga.c */
void pgq_urlenc_row(PgqTriggerEvent *ev, HeapTuple row, StringInfo buf);
+#ifndef TRIGGER_FIRED_BY_TRUNCATE
+#define TRIGGER_FIRED_BY_TRUNCATE(tg) 0
+#endif
+
const char *col_ident, *col_value;
int attkind_idx = -1;
+ if (ev->op_type == 'R')
+ return;
+
for (i = 0; i < tg->tg_relation->rd_att->natts; i++) {
/* Skip dropped columns */
if (tupdesc->attrs[i]->attisdropped)
pgq_prepare_event(&ev, tg, true);
- appendStringInfoChar(ev.field[EV_TYPE], ev.op_type);
- appendStringInfoChar(ev.field[EV_TYPE], ':');
- appendStringInfoString(ev.field[EV_TYPE], ev.pkey_list);
appendStringInfoString(ev.field[EV_EXTRA1], ev.info->table_name);
+ appendStringInfoChar(ev.field[EV_TYPE], ev.op_type);
+ if (ev.op_type != 'R') {
+ appendStringInfoChar(ev.field[EV_TYPE], ':');
+ appendStringInfoString(ev.field[EV_TYPE], ev.pkey_list);
+ }
if (is_interesting_change(&ev, tg)) {
/*
need_event = process_update(ev, sql);
} else if (TRIGGER_FIRED_BY_DELETE(tg->tg_event)) {
process_delete(ev, sql);
+ } else if (TRIGGER_FIRED_BY_TRUNCATE(tg->tg_event)) {
+ /* nothing to do for truncate */
} else
elog(ERROR, "logtriga fired for unhandled event");
* 1. queue name to be inserted to.
*
* Queue events will be in format:
- * ev_type - operation type, I/U/D
+ * ev_type - operation type, I/U/D/R
* ev_data - urlencoded column values
* ev_extra1 - table name
* ev_extra2 - optional urlencoded backup