Cleanup and fix remaining bugs in --dest-table code
authorMarko Kreen <markokr@gmail.com>
Tue, 1 Nov 2011 21:31:57 +0000 (23:31 +0200)
committerMarko Kreen <markokr@gmail.com>
Tue, 1 Nov 2011 21:31:57 +0000 (23:31 +0200)
python/londiste/setup.py
sql/londiste/functions/londiste.local_add_table.sql

index b9f69dc6f9acf052564fde12104ee59632fa24e0..0280142cfc9299dd3c72962283c3fc642a09e843 100644 (file)
@@ -104,18 +104,27 @@ class LondisteSetup(CascadeAdmin):
         node_db.commit()
         provider_db.commit()
 
+    def is_root(self):
+        return self.queue_info.local_node.type == 'root'
+
     def cmd_add_table(self, *args):
         """Attach table(s) to local node."""
 
-        dst_db = self.get_database('db')
-        dst_curs = dst_db.cursor()
+        self.load_local_info()
+
         src_db = self.get_provider_db()
-        src_curs = src_db.cursor()
+        if not self.is_root():
+            src_curs = src_db.cursor()
+            src_tbls = self.fetch_set_tables(src_curs)
+            src_db.commit()
 
-        src_tbls = self.fetch_set_tables(src_curs)
+        dst_db = self.get_database('db')
+        dst_curs = dst_db.cursor()
         dst_tbls = self.fetch_set_tables(dst_curs)
-        src_db.commit()
-        self.sync_table_list(dst_curs, src_tbls, dst_tbls)
+        if self.is_root():
+            src_tbls = dst_tbls
+        else:
+            self.sync_table_list(dst_curs, src_tbls, dst_tbls)
         dst_db.commit()
 
         needs_tbl = self.handler_needs_table()
@@ -124,6 +133,7 @@ class LondisteSetup(CascadeAdmin):
         # dont check for exist/not here (root handling)
         problems = False
         for tbl in args:
+            tbl = skytools.fq_name(tbl)
             if (tbl in src_tbls) and not src_tbls[tbl]['local']:
                 self.log.error("Table %s does not exist on provider, need to switch to different provider" % tbl)
                 problems = True
@@ -146,22 +156,31 @@ class LondisteSetup(CascadeAdmin):
 
         # seems ok
         for tbl in args:
-            tbl = skytools.fq_name(tbl)
             self.add_table(src_db, dst_db, tbl, create_flags, src_tbls)
 
     def add_table(self, src_db, dst_db, tbl, create_flags, src_tbls):
+        # use full names
+        tbl = skytools.fq_name(tbl)
+        dest_table = self.options.dest_table or tbl
+        dest_table = skytools.fq_name(dest_table)
+
         src_curs = src_db.cursor()
         dst_curs = dst_db.cursor()
-        src_dest_table = src_tbls[tbl]['dest_table']
-        dest_table = self.options.dest_table or tbl
         tbl_exists = skytools.exists_table(dst_curs, dest_table)
+
+        if dest_table == tbl:
+            desc = tbl
+        else:
+            desc = "%s(%s)" % (tbl, dest_table)
+
         if create_flags:
             if tbl_exists:
-                self.log.info('Table %s already exist, not touching' % dest_table)
+                self.log.info('Table %s already exist, not touching' % desc)
             else:
+                src_dest_table = src_tbls[tbl]['dest_table']
                 if not skytools.exists_table(src_curs, src_dest_table):
                     # table not present on provider - nowhere to get the DDL from
-                    self.log.warning('Table "%s" missing on provider, skipping' % tbl)
+                    self.log.warning('Table %s missing on provider, skipping' % desc)
                     return
                 schema = skytools.fq_name_parts(dest_table)[0]
                 if not skytools.exists_schema(dst_curs, schema):
@@ -209,15 +228,13 @@ class LondisteSetup(CascadeAdmin):
         if self.options.max_parallel_copy:
             attrs['max_parallel_copy'] = self.options.max_parallel_copy
 
-        args = [self.set_name, tbl, tgargs]
-
-        if attrs:
-            args.append(skytools.db_urlencode(attrs))
-
-        q = "select * from londiste.local_add_table(%s)" %\
-            ','.join(['%s']*len(args))
-
         # actual table registration
+        args = [self.set_name, tbl, tgargs, None, None]
+        if attrs:
+            args[3] = skytools.db_urlencode(attrs)
+        if dest_table != tbl:
+            args[4] = dest_table
+        q = "select * from londiste.local_add_table(%s, %s, %s, %s, %s)"
         self.exec_cmd(dst_curs, q, args)
         dst_db.commit()
 
index 6b477d06f0a4e212c0ec1bb81be9be76b4811eb1..f69ca568c6782cb4655c0e679b6d6335d738fea6 100644 (file)
@@ -97,6 +97,7 @@ declare
     _dest_table text;
     _got_extra1 boolean := false;
     _table_name2 text;
+    _desc text;
 begin
 
     -------- i_trg_args ARGUMENTS PARSING
@@ -154,12 +155,18 @@ begin
         _args := array_append(_args, quote_literal(arg));
     end if;
 
+    if _dest_table = fq_table_name then
+        _desc := fq_table_name;
+    else
+        _desc := fq_table_name || '(' || _dest_table || ')';
+    end if;
+
     -------- TABLE STRUCTURE CHECK
 
     if not _virtual_table then
         _tbloid := londiste.find_table_oid(_dest_table);
         if _tbloid is null then
-            select 404, 'Table does not exist: ' || _dest_table into ret_code, ret_note;
+            select 404, 'Table does not exist: ' || _desc into ret_code, ret_note;
             return;
         end if;
         col_types := londiste.find_column_types(_dest_table);
@@ -176,7 +183,7 @@ begin
                 and coalesce(t.dest_table, t.table_name) = _dest_table
                 and t.dropped_ddl is not null;
             if not found then
-                select 400, 'Primary key missing on table: ' || _dest_table into ret_code, ret_note;
+                select 400, 'Primary key missing on table: ' || _desc into ret_code, ret_note;
                 return;
             end if;
         end if;
@@ -202,7 +209,7 @@ begin
                 return;
             end if;
         else
-            select 404, 'Table not available on queue: ' || fq_table_name
+            select 404, 'Table not available on queue: ' || _desc
                 into ret_code, ret_note;
             return;
         end if;
@@ -214,7 +221,7 @@ begin
     end if;
 
     if tbl.local then
-        select 200, 'Table already added: ' || fq_table_name into ret_code, ret_note;
+        select 200, 'Table already added: ' || _desc into ret_code, ret_note;
         return;
     end if;
 
@@ -257,7 +264,7 @@ begin
             -- if table from some other source is already marked as local,
             -- raise error
             if _local then
-                select 405, 'Found local table '|| fq_table_name
+                select 405, 'Found local table '|| _desc
                         || ' in queue ' || _queue_name
                         || ', use remove-table first to remove all previous '
                         || 'table subscriptions'
@@ -268,7 +275,7 @@ begin
            -- when table comes from multiple sources, merge_all switch is
            -- required
            if not _merge_all then
-               select 405, 'Found multiple sources for table '|| fq_table_name
+               select 405, 'Found multiple sources for table '|| _desc
                        || ', use merge-all or no-merge to continue'
                into ret_code, ret_note;
                return;
@@ -354,7 +361,7 @@ begin
         perform 1 from pg_proc p join pg_namespace n on (n.oid = p.pronamespace)
             where n.nspname = 'pgq' and p.proname in ('logutriga', 'sqltriga');
         if not found then
-            select 200, 'Table added with no triggers: ' || fq_table_name into ret_code, ret_note;
+            select 200, 'Table added with no triggers: ' || _desc into ret_code, ret_note;
             return;
         end if;
         -- on regular leaf, install deny trigger
@@ -384,7 +391,7 @@ begin
                 execute sql;
             end if;
         else
-            select 405, 'Multiple SKIP triggers in table: ' || _dest_table
+            select 405, 'Multiple SKIP triggers in table: ' || _desc
             into ret_code, ret_note;
             return;
         end if;
@@ -397,7 +404,7 @@ begin
     if not found then
 
         if _no_triggers then
-            select 200, 'Table added with no triggers: ' || fq_table_name
+            select 200, 'Table added with no triggers: ' || _desc
             into ret_code, ret_note;
             return;
         end if;
@@ -472,14 +479,14 @@ begin
 
     if logtrg_previous is not null then
        select 301,
-              'Table added: ' || fq_table_name
+              'Table added: ' || _desc
                               || ', but londiste trigger is not first: '
                               || logtrg_previous
          into ret_code, ret_note;
         return;
     end if;
 
-    select 200, 'Table added: ' || fq_table_name into ret_code, ret_note;
+    select 200, 'Table added: ' || _desc into ret_code, ret_note;
     return;
 end;
 $$ language plpgsql;