sql/*: Simplify queries to be able to run greenplum as leaf
authorAsko Tiidumaa <asko.tiidumaa@skype.net>
Fri, 26 Nov 2010 12:33:33 +0000 (12:33 +0000)
committerMarko Kreen <markokr@gmail.com>
Mon, 29 Nov 2010 12:56:19 +0000 (14:56 +0200)
execute:
  Move session_replication_role setting out of db functions.

14 files changed:
python/londiste/playback.py
python/londiste/setup.py
sql/londiste/functions/londiste.drop_table_triggers.sql
sql/londiste/functions/londiste.execute_finish.sql
sql/londiste/functions/londiste.execute_start.sql
sql/londiste/functions/londiste.is_replica_func.sql [new file with mode: 0644]
sql/londiste/functions/londiste.local_add_table.sql
sql/londiste/structure/functions.sql
sql/pgq/functions/pgq.get_consumer_info.sql
sql/pgq/functions/pgq.get_queue_info.sql
sql/pgq_node/expected/pgq_node_test.out
sql/pgq_node/functions/pgq_node.get_node_info.sql
sql/pgq_node/functions/pgq_node.get_subscriber_info.sql
sql/pgq_node/functions/pgq_node.set_consumer_uptodate.sql

index ed892c36c6deb48e6c237ecb8534d3e65e7041ca..fce32dbb03ba50560093bea175938681d561d667 100644 (file)
@@ -559,6 +559,9 @@ class Replicator(CascadedWorker):
         sql = ev.data
 
         # fixme: curs?
+        pgver = dst_curs.connection.server_version
+        if pgver >= 80300:
+            curs.execute("set local session_replication_role = 'local'")
         q = "select * from londiste.execute_start(%s, %s, %s, false)"
         res = self.exec_cmd(dst_curs, q, [self.queue_name, fname, sql], commit = False)
         ret = res[0]['ret_code']
@@ -569,6 +572,8 @@ class Replicator(CascadedWorker):
             dst_curs.execute(stmt)
         q = "select * from londiste.execute_finish(%s, %s)"
         self.exec_cmd(dst_curs, q, [self.queue_name, fname], commit = False)
+        if pgver >= 80300:
+            curs.execute("set local session_replication_role = 'replica'")
 
     def apply_sql(self, sql, dst_curs):
 
index 9ea44bbb1574d102f1aaf72287c3f9217b68b35c..e8b203101d33dbcf98c619ddbf19a08ab1655803 100644 (file)
@@ -353,7 +353,7 @@ class LondisteSetup(CascadeAdmin):
 
         # set replica role for EXECUTE transaction
         if db.server_version >= 80300:
-            curs.execute("set local session_replication_role = 'replica'")
+            curs.execute("set local session_replication_role = 'local'")
 
         for fn in files:
             fname = os.path.basename(fn)
index 2ab0c9c6425e1fd1677156942d331f42a5213bf7..3e1958b2c207ea439342fc771af78b578063a48c 100644 (file)
@@ -19,6 +19,11 @@ declare
     logtrg_name     text;
     b_queue_name    bytea;
 begin
+    -- skip if no triggers found on that table
+    perform 1 from pg_catalog.pg_trigger where tgrelid = londiste.find_table_oid(i_table_name);
+    if not found then
+        return;
+    end if;
     -- cast to bytea
     b_queue_name := replace(i_queue_name, E'\\', E'\\\\')::bytea;
 
@@ -28,7 +33,7 @@ begin
     for logtrg_name in
         select tgname from pg_catalog.pg_trigger
          where tgrelid = londiste.find_table_oid(i_table_name)
-           and tgfoid in ('pgq.sqltriga'::regproc::oid, 'pgq.logutriga'::regproc::oid)
+           and londiste.is_replica_func(tgfoid)
            and substring(tgargs for (position(E'\\000'::bytea in tgargs) - 1)) = b_queue_name
     loop
         execute 'drop trigger ' || quote_ident(logtrg_name)
index 3edf835c140bc33f108a702a1b7ed2e74f0cb16a..a7d510eb1b3688cf2525d2ce102b6b594fc9d051 100644 (file)
@@ -37,13 +37,6 @@ begin
         perform pgq.insert_event(i_queue_name, 'EXECUTE', sql, i_file_name, null, null, null);
     end if;
 
-    -- try educated guess of previous state
-    if is_root then
-        SET LOCAL session_replication_role = 'origin';
-    else
-        SET LOCAL session_replication_role = 'replica';
-    end if;
-
     select 200, 'Execute finished: ' || i_file_name into ret_code, ret_note;
     return;
 end;
index bc2874f54eb4805737169b16c01c1fc9947b00b1..9ce0071f2e8c0942a18ad03de5f8491f00394376 100644 (file)
@@ -53,8 +53,6 @@ begin
     insert into londiste.applied_execute (queue_name, execute_file, execute_sql)
         values (i_queue_name, i_file_name, i_sql);
 
-    SET LOCAL session_replication_role = 'local';
-
     select 200, 'Executing: ' || i_file_name into ret_code, ret_note;
     return;
 end;
diff --git a/sql/londiste/functions/londiste.is_replica_func.sql b/sql/londiste/functions/londiste.is_replica_func.sql
new file mode 100644 (file)
index 0000000..6bf17fa
--- /dev/null
@@ -0,0 +1,14 @@
+
+create or replace function londiste.is_replica_func(func_oid oid)
+returns boolean as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.is_replica_func(1)
+--
+--      Returns true if function is a PgQ-based replication functions.
+--      This also means it takes queue name as first argument.
+-- ----------------------------------------------------------------------
+select count(1) > 0
+  from pg_proc f join pg_namespace n on (n.oid = f.pronamespace)
+  where f.oid = $1 and n.nspname = 'pgq' and f.proname in ('sqltriga', 'logutriga');
+$$ language sql strict stable;
+
index 46d88fe34a75660ebc9ae13b3bf206bff62d13df..895673191b9569940792035879a970554f502172 100644 (file)
@@ -218,28 +218,28 @@ begin
 
     if pgversion >= 90000 then
         select tg.tgname into logtrg_previous
-        from pg_class r, pg_trigger tg
+        from pg_class r join pg_trigger tg on (tg.tgrelid = r.oid)
         where r.oid = londiste.find_table_oid(fq_table_name)
           and not tg.tgisinternal
           and tg.tgname < lg_name::name
           -- per-row AFTER trigger
           and (tg.tgtype & 3) = 1   -- bits: 0:ROW, 1:BEFORE
           -- current londiste
-          and tg.tgfoid not in ('pgq.sqltriga'::regproc::oid, 'pgq.logutriga'::regproc::oid)
+          and not londiste.is_replica_func(tg.tgfoid)
           -- old londiste
           and substring(tg.tgname from 1 for 10) != '_londiste_'
           and substring(tg.tgname from char_length(tg.tgname) - 6) != '_logger'
         order by 1 limit 1;
     else
         select tg.tgname into logtrg_previous
-        from pg_class r, pg_trigger tg
+        from pg_class r join pg_trigger tg on (tg.tgrelid = r.oid)
         where r.oid = londiste.find_table_oid(fq_table_name)
           and not tg.tgisconstraint
           and tg.tgname < lg_name::name
           -- per-row AFTER trigger
           and (tg.tgtype & 3) = 1   -- bits: 0:ROW, 1:BEFORE
           -- current londiste
-          and tg.tgfoid not in ('pgq.sqltriga'::regproc::oid, 'pgq.logutriga'::regproc::oid)
+          and not londiste.is_replica_func(t.tgfoid)
           -- old londiste
           and substring(tg.tgname from 1 for 10) != '_londiste_'
           and substring(tg.tgname from char_length(tg.tgname) - 6) != '_logger'
index a8320f098a04d6ad7f891c072d3a668b784efe93..adcf6258943b3c527e30d1517a5b3db59af3e24e 100644 (file)
@@ -40,4 +40,5 @@
 \i functions/londiste.split_fqname.sql
 \i functions/londiste.table_info_trigger.sql
 \i functions/londiste.drop_table_triggers.sql
+\i functions/londiste.is_replica_func.sql
 
index 76467a85cc5a39589c5e92727178c97402507304..6ea1d051b7d39ed7fcdee0fa5f1dd4a85907b353 100644 (file)
@@ -98,22 +98,19 @@ returns setof record as $$
 --      current_batch       - Current batch ID, if one is active or NULL
 --      next_tick           - If batch is active, then its final tick.
 -- ----------------------------------------------------------------------
+declare
+    _pending_events bigint;
+    _queue_id bigint;
 begin
     for queue_name, consumer_name, lag, last_seen,
-        last_tick, current_batch, next_tick, pending_events
+        last_tick, current_batch, next_tick, _pending_events, _queue_id
     in
         select q.queue_name, c.co_name,
                current_timestamp - t.tick_time,
                current_timestamp - s.sub_active,
                s.sub_last_tick, s.sub_batch, s.sub_next_tick,
-               top.tick_event_seq - t.tick_event_seq
-          from pgq.queue q
-               left join pgq.tick top
-                 on (top.tick_queue = q.queue_id
-                     and top.tick_id = (select tmp.tick_id from pgq.tick tmp
-                                         where tmp.tick_queue = q.queue_id
-                                         order by tmp.tick_queue desc, tmp.tick_id desc
-                                         limit 1)),
+               t.tick_event_seq, q.queue_id
+          from pgq.queue q,
                pgq.consumer c,
                pgq.subscription s
                left join pgq.tick t
@@ -124,6 +121,12 @@ begin
            and (i_consumer_name is null or c.co_name = i_consumer_name)
          order by 1,2
     loop
+        select t.tick_event_seq - _pending_events
+            into pending_events
+            from pgq.tick t
+            where t.tick_queue = _queue_id
+            order by t.tick_queue desc, t.tick_id desc
+            limit 1;
         return next;
     end loop;
     return;
index d512b69538f4bc066c6ed3255cb7acfbb6311679..b4a2f3c70bd65c117788c2843802a645205fa62a 100644 (file)
@@ -63,41 +63,54 @@ returns setof record as $$
 -- Returns:
 --      One pgq.ret_queue_info record.
 -- ----------------------------------------------------------------------
+declare
+    _ticker_lag interval;
+    _top_tick_id bigint;
+    _ht_tick_id bigint;
+    _top_tick_time timestamptz;
+    _top_tick_event_seq bigint;
+    _ht_tick_time timestamptz;
+    _ht_tick_event_seq bigint;
+    _queue_id integer;
+    _queue_event_seq text;
 begin
     for queue_name, queue_ntables, queue_cur_table, queue_rotation_period,
         queue_switch_time, queue_external_ticker, queue_ticker_paused,
         queue_ticker_max_count, queue_ticker_max_lag, queue_ticker_idle_period,
-        ticker_lag, ev_per_sec, ev_new
+        _queue_id, _queue_event_seq
     in select
         q.queue_name, q.queue_ntables, q.queue_cur_table,
         q.queue_rotation_period, q.queue_switch_time,
         q.queue_external_ticker, q.queue_ticker_paused,
         q.queue_ticker_max_count, q.queue_ticker_max_lag,
         q.queue_ticker_idle_period,
-        (select current_timestamp - tick_time
-           from pgq.tick where tick_queue = queue_id
-          order by tick_queue desc, tick_id desc limit 1),
-        case when ht.tick_time < top.tick_time
-             then (top.tick_event_seq - ht.tick_event_seq) / extract(epoch from (top.tick_time - ht.tick_time))
-             else null end,
-        pgq.seq_getval(q.queue_event_seq) - top.tick_event_seq
+        q.queue_id, q.queue_event_seq
         from pgq.queue q
-          left join pgq.tick top
-            on (top.tick_queue = q.queue_id
-                and top.tick_id = (select tmp.tick_id from pgq.tick tmp
-                                    where tmp.tick_queue = q.queue_id
-                                    order by tmp.tick_queue desc, tmp.tick_id desc
-                                    limit 1))
-          left join pgq.tick ht
-            on (ht.tick_queue = q.queue_id
-                and ht.tick_id = (select tmp2.tick_id from pgq.tick tmp2
-                                   where tmp2.tick_queue = q.queue_id
-                                     and tmp2.tick_id >= top.tick_id - 20
-                                   order by tmp2.tick_queue asc, tmp2.tick_id asc
-                                   limit 1))
         where (i_queue_name is null or q.queue_name = i_queue_name)
         order by q.queue_name
     loop
+        -- most recent tick
+        select (current_timestamp - t.tick_time),
+               tick_id, t.tick_time, t.tick_event_seq
+            into ticker_lag, _top_tick_id, _top_tick_time, _top_tick_event_seq
+            from pgq.tick t
+            where t.tick_queue = _queue_id
+            order by t.tick_queue desc, t.tick_id desc
+            limit 1;
+        -- slightly older tick
+        select ht.tick_id, ht.tick_time, ht.tick_event_seq
+            into _ht_tick_id, _ht_tick_time, _ht_tick_event_seq
+            from pgq.tick ht
+            where ht.tick_queue = _queue_id
+             and ht.tick_id >= _top_tick_id - 20
+            order by ht.tick_queue asc, ht.tick_id asc
+            limit 1;
+        if _ht_tick_time < _top_tick_time then
+            ev_per_sec = (_top_tick_event_seq - _ht_tick_event_seq) / extract(epoch from (_top_tick_time - _ht_tick_time));
+        else
+            ev_per_sec = null;
+        end if;
+        ev_new = pgq.seq_getval(_queue_event_seq) - _top_tick_event_seq;
         return next;
     end loop;
     return;
index 47195a2a05d3f05ec5fe03cc3e6088a35bc002bd..6b68660e19298cf784f6390d64513c1c9bdc3244 100644 (file)
@@ -283,9 +283,9 @@ select * from pgq_node.set_consumer_paused('bqueue', 'random_consumer2', true);
 (1 row)
 
 select * from pgq_node.set_consumer_uptodate('bqueue', 'random_consumer2', true);
- ret_code |         ret_note         
-----------+--------------------------
-      200 | Consumer uptodate = true
+ ret_code |       ret_note        
+----------+-----------------------
+      200 | Consumer uptodate = 1
 (1 row)
 
 select * from pgq_node.change_consumer_provider('bqueue', 'random_consumer2', 'node3');
index f64670a4b00b7dedacaff2c23fb73cd67d4db75e..306dde13815d0b8ade5cbd43a401495ced5d5a5e 100644 (file)
@@ -44,17 +44,16 @@ create or replace function pgq_node.get_node_info(
 declare
     sql text;
 begin
-    select 100, 'Ok', n.node_type, n.node_name, g.last_tick,
+    select 100, 'Ok', n.node_type, n.node_name,
            c.node_type, c.queue_name, w.provider_node, l.node_location,
            n.worker_name, w.paused, w.uptodate, w.last_tick_id
-      into ret_code, ret_note, node_type, node_name, global_watermark,
+      into ret_code, ret_note, node_type, node_name,
            combined_type, combined_queue, provider_node, provider_location,
            worker_name, worker_paused, worker_uptodate, worker_last_tick
       from pgq_node.node_info n
            left join pgq_node.node_info c on (c.queue_name = n.combined_queue)
            left join pgq_node.local_state w on (w.queue_name = n.queue_name and w.consumer_name = n.worker_name)
            left join pgq_node.node_location l on (l.queue_name = w.queue_name and l.node_name = w.provider_node)
-           left join pgq.get_consumer_info(i_queue_name, '.global_watermark') g on (g.queue_name = n.queue_name)
       where n.queue_name = i_queue_name;
     if not found then
         select 404, 'Unknown queue: ' || i_queue_name into ret_code, ret_note;
@@ -62,9 +61,10 @@ begin
     end if;
 
     if node_type in ('root', 'branch') then
-        select min(last_tick) into local_watermark
-          from pgq.get_consumer_info(i_queue_name)
-         where consumer_name <> '.global_watermark';
+        select min(case when consumer_name = '.global_watermark' then null else last_tick end),
+               min(case when consumer_name = '.global_watermark' then last_tick else null end)
+          into local_watermark, global_watermark
+          from pgq.get_consumer_info(i_queue_name);
         if local_watermark is null then
             select t.tick_id into local_watermark
               from pgq.tick t, pgq.queue q
index ec6981e6354a32336c93b9accada6d278f108ebb..70934da9b558bc5fac0c3f507eac1ec763c6f78d 100644 (file)
@@ -23,14 +23,17 @@ returns setof record as $$
 --      worker_name     - consumer that maintains remote node
 --      local_watermark - lowest tick_id on subscriber
 -- ----------------------------------------------------------------------
+declare
+    _watermark_name text;
 begin
-    for node_name, worker_name, node_watermark in
-        select s.subscriber_node, s.worker_name,
-               (select last_tick from pgq.get_consumer_info(i_queue_name, s.watermark_name)) as wm_pos
+    for node_name, worker_name, _watermark_name in
+        select s.subscriber_node, s.worker_name, s.watermark_name
           from pgq_node.subscriber_info s
          where s.queue_name = i_queue_name
          order by 1
     loop
+        select last_tick into node_watermark
+            from pgq.get_consumer_info(i_queue_name, _watermark_name);
         return next;
     end loop;
     return;
index 933c4672cc8aa84afae4f3782e75243de8c376e3..cbf565e39192a418a2dd0ab995adc90811d46e07 100644 (file)
@@ -25,7 +25,8 @@ begin
      where queue_name = i_queue_name
        and consumer_name = i_consumer_name;
     if found then
-        select 200, 'Consumer uptodate = '||i_uptodate into ret_code, ret_note;
+        select 200, 'Consumer uptodate = ' || i_uptodate::int4::text
+               into ret_code, ret_note;
     else
         select 404, 'Consumer not known: '
                || i_queue_name || '/' || i_consumer_name