_no_triggers boolean := false;
_skip boolean := false;
_queue_name text;
+ _local boolean;
begin
_extra_args := '';
fq_table_name := londiste.make_fqname(i_table_name);
-- merge all table sources on leaf
if _node.node_type = 'leaf' then
- for _queue_name in
- select t2.queue_name
+ for _queue_name, _local in
+ select t2.queue_name, t2.local
from londiste.table_info t
join pgq_node.node_info n on (n.queue_name = t.queue_name)
left join pgq_node.node_info n2 on (n2.combined_queue = n.combined_queue or
left join londiste.table_info t2 on (t2.table_name = t.table_name and t2.queue_name = n2.queue_name)
where t.queue_name = i_queue_name
and t.table_name = fq_table_name
- and t2.local = false
+ -- and t2.local = false
and t2.queue_name != i_queue_name -- skip self
loop
- if not _merge_all then
- select 405, 'multiple source tables '|| fq_table_name
+ -- if table from some other source is already marked as local,
+ -- raise error
+ if _local then
+ select 406, 'found local table '|| fq_table_name
+ || ' in queue ' || _queue_name
+ || ', use remove-table first to remove all previous '
+ || 'table subscriptions'
+ into ret_code, ret_note;
+ return;
+ end if;
+
+ -- when table comes from multiple sources, merge_all switch is
+ -- required
+ if not _merge_all then
+ select 405, 'multiple source tables '|| fq_table_name
|| ' found, use merge_all'
- into ret_code, ret_note;
- return;
- end if;
+ into ret_code, ret_note;
+ return;
+ end if;
- update londiste.table_info
- set local = true,
- merge_state = new_state
- where queue_name = _queue_name and table_name = fq_table_name;
+ update londiste.table_info
+ set local = true,
+ merge_state = new_state
+ where queue_name = _queue_name and table_name = fq_table_name;
if not found then
raise exception 'lost table: %', fq_table_name;
end if;