londiste execute: fix downstream apply
authorMarko Kreen <markokr@gmail.com>
Thu, 12 Jul 2012 13:10:00 +0000 (16:10 +0300)
committerMarko Kreen <markokr@gmail.com>
Fri, 13 Jul 2012 19:44:33 +0000 (22:44 +0300)
- Downgrade "already applied" message from WARN to INFO
  Unnecessary to warn about expected situation.

- ExecAttr: urldecoding fixes
  This path was not tested properly.

- Test execute-through-combined-queue

python/londiste/exec_attrs.py
python/londiste/playback.py
sql/londiste/functions/londiste.execute_start.sql
tests/merge/regen.sh

index 7e35723c1484cc51a2743848a0b7bef6e5991813..ce0f8221afe632e3c9071814df1f3b33ece1ba27 100644 (file)
@@ -175,14 +175,14 @@ class ExecAttrsException(skytools.UsageError):
 
 class ExecAttrs:
     """Container and parser for EXECUTE attributes."""
-    def __init__(self, sql=None, ustr=None):
+    def __init__(self, sql=None, urlenc=None):
         """Create container and parse either sql or urlenc string."""
 
         self.attrs = {}
-        if sql and ustr:
-            raise Exception("Both sql and ustr set.")
-        if ustr:
-            self.parse_urlenc(ustr)
+        if sql and urlenc:
+            raise Exception("Both sql and urlenc set.")
+        if urlenc:
+            self.parse_urlenc(urlenc)
         elif sql:
             self.parse_sql(sql)
 
@@ -208,7 +208,7 @@ class ExecAttrs:
     def parse_urlenc(self, ustr):
         """Parse urlencoded string adding values to current container."""
         sdict = skytools.db_urldecode(ustr)
-        for k, v in sdict:
+        for k, v in sdict.items():
             for v1 in v.split(','):
                 self.add_value(k, v1)
 
index 328286d6f98fe15d7a82814363a4b9e35c73ceff..e5fc83a77336ece3634db3419273980494eb04f4 100644 (file)
@@ -676,15 +676,13 @@ class Replicator(CascadedWorker):
 
         tbl_map = {}
         for tbl, t in self.table_map.items():
-            if not t.local:
-                continue
             tbl_map[t.name] = t.dest_table
 
         q = "select * from londiste.execute_start(%s, %s, %s, false, %s)"
         res = self.exec_cmd(dst_curs, q, [self.queue_name, fname, sql, s_attrs], commit = False)
         ret = res[0]['ret_code']
-        if ret >= 300:
-            self.log.warning("Skipping execution of '%s'", fname)
+        if ret > 200:
+            self.log.info("Skipping execution of '%s'", fname)
             return
 
         if exec_attrs.need_execute(dst_curs, tbl_map, seq_map):
index 7817cb6caa88f8360252033bd017528323ed283c..d50807ca261167f39f56a4167d1b32b021da1a7d 100644 (file)
@@ -28,7 +28,7 @@ as $$
 --
 -- Returns:
 --      200 - Proceed.
---      301 - Already applied
+--      201 - Already applied
 --      401 - Not root.
 --      404 - No such queue
 -- ----------------------------------------------------------------------
@@ -47,7 +47,7 @@ begin
     perform 1 from londiste.applied_execute
         where execute_file = i_file_name;
     if found then
-        select 301, 'EXECUTE(' || i_file_name || ') already applied'
+        select 201, 'EXECUTE: "' || i_file_name || '" already applied, skipping'
             into ret_code, ret_note;
         return;
     end if;
index fe31be1a31cbd0fb7dfc5dc5cc0fbd9e7b05d67a..8a3c3ffc6a4d1ac5cae41e43efd35e318c94e4cd 100755 (executable)
@@ -188,3 +188,17 @@ run_sql full2 "select * from londiste.get_table_list('replika_part2')"
 
 ../zcheck.sh
 
+msg "Test EXECUTE through cascade"
+
+for db in part1 part2 part3 part4; do
+  run londiste3 $v conf/londiste_$db.ini execute addcol-data2.sql
+done
+msg "Sleep a bit"
+run sleep 10
+
+psql -d part1 -c '\d mydata'
+psql -d full1 -c '\d mydata'
+psql -d part1 -c '\d mydata'
+
+../zcheck.sh
+