-- ----------------------------------------------------------------------
declare
fq_table_name text;
+ _cqueue text;
begin
fq_table_name := londiste.make_fqname(i_table_name);
- perform 1 from pgq_node.node_info where queue_name = i_queue_name;
+ select combined_queue into _cqueue
+ from pgq_node.node_info
+ where queue_name = i_queue_name
+ for update;
if not found then
- select 400, 'No such set: ' || i_queue_name into ret_code, ret_note;
+ select 400, 'No such queue: ' || i_queue_name into ret_code, ret_note;
return;
end if;
values (i_queue_name, fq_table_name);
select 200, 'Table added: ' || i_table_name
into ret_code, ret_note;
- return;
+ -- let the combined node know about it too
+ if _cqueue is not null then
+ perform londiste.global_add_table(_cqueue, i_table_name);
+ end if;
+
+ return;
exception
-- seems the row was added from parallel connection (setup vs. replay)
when unique_violation then
join pgq_node.node_info n2
on (n2.queue_name = n1.combined_queue)
left join londiste.table_info t
- on (t.queue_name = n2.queue_name and t.table_name = fq_table_name)
+ on (t.queue_name = n2.queue_name and t.table_name = fq_table_name and t.local)
where n1.queue_name = i_queue_name and n2.node_type = 'root'
into _combined_queue, _combined_table;
if found and _combined_table is null then