londiste copy: copy table from another node
authorMarko Kreen <markokr@gmail.com>
Thu, 17 May 2012 15:03:29 +0000 (18:03 +0300)
committerMarko Kreen <markokr@gmail.com>
Thu, 17 May 2012 15:03:29 +0000 (18:03 +0300)
add-table: --copy-node=NODENAME, store it into table_attrs
copy: take node name from table_attrs

python/londiste.py
python/londiste/setup.py
python/londiste/table_copy.py

index b7bc99e0f617e9e2f22198e50a59c12bc78c3bce..3a183ab609eaefe434ba2f31c2effaef9f5c21a2 100755 (executable)
@@ -130,6 +130,8 @@ class Londiste(skytools.DBScript):
                 help="add: Custom handler for table")
         g.add_option("--handler-arg", action="append",
                 help="add: Argument to custom handler")
+        g.add_option("--copy-node", dest="copy_node",
+                help = "add: use NODE as source for initial COPY")
         g.add_option("--copy-condition", dest="copy_condition",
                 help = "add: set WHERE expression for copy")
         g.add_option("--merge-all", action="store_true",
index bb4b152f93d35eb19cabb64b2015528a4a9df741..866945b2e84c8b9ad21ad518429de77d937aa766 100644 (file)
@@ -45,6 +45,8 @@ class LondisteSetup(CascadeAdmin):
                     help = "no copy needed", default=False)
         p.add_option("--skip-truncate", action="store_true", dest="skip_truncate",
                     help = "dont delete old data", default=False)
+        p.add_option("--copy-node", dest="copy_node",
+                help = "add: use NODE as source for initial copy")
         p.add_option("--copy-condition", dest="copy_condition",
                 help = "copy: where expression")
         p.add_option("--force", action="store_true",
@@ -217,6 +219,9 @@ class LondisteSetup(CascadeAdmin):
             attrs['handler'] = hstr
             p.add(tgargs)
 
+        if self.options.copy_node:
+            attrs['copy_node'] = self.options.copy_node
+
         if self.options.expect_sync:
             tgargs.append('expect_sync')
 
@@ -430,6 +435,17 @@ class LondisteSetup(CascadeAdmin):
         db.commit()
 
     def get_provider_db(self):
+
+        # use custom node for copy
+        if self.options.copy_node:
+            source_node = self.options.copy_node
+            m = self.queue_info.get_member(source_node)
+            if not m:
+                raise UsageError("Cannot find node <%s>", source_node)
+            if source_node == self.local_node:
+                raise UsageError("Cannot use itself as provider")
+            self.provider_location = m.location
+
         if not self.provider_location:
             db = self.get_database('db')
             q = 'select * from pgq_node.get_node_info(%s)'
index 3c137ae62c4be42f1f1e8d9b1d1c51ea56d65aa6..65a702fb851cdcd8857694f99b8ed8b5a7cf1201 100644 (file)
@@ -239,14 +239,38 @@ class CopyTable(Replicator):
         return Replicator.work(self)
 
     def register_copy_consumer(self):
-        # fetch parent consumer state
         dst_db = self.get_database('db')
-        q = "select * from pgq_node.get_consumer_state(%s, %s)"
-        rows = self.exec_cmd(dst_db, q, [ self.queue_name, self.old_consumer_name ])
-        state = rows[0]
-        loc = state['provider_location']
+        dst_curs = dst_db.cursor()
 
-        self.register_consumer(loc)
+        # fetch table attrs
+        q = "select * from londiste.get_table_list(%s) where table_name = %s"
+        dst_curs.execute(q, [ self.queue_name, self.copy_table_name ])
+        rows = dst_curs.fetchall()
+        attrs = {}
+        if len(rows) > 0:
+            v_attrs = rows[0]['table_attrs']
+            if v_attrs:
+                attrs = skytools.db_urldecode(v_attrs)
+
+        # do we have node here?
+        if 'copy_node' in attrs:
+            # take node from attrs
+            source_node = attrs['copy_node']
+            q = "select * from pgq_node.get_queue_locations(%s) where node_name = %s"
+            dst_curs.execute(q, [ self.queue_name, source_node ])
+            rows = dst_curs.fetchall()
+            if len(rows):
+                source_location = rows[0]['node_location']
+        else:
+            # fetch parent consumer state
+            q = "select * from pgq_node.get_consumer_state(%s, %s)"
+            rows = self.exec_cmd(dst_db, q, [ self.queue_name, self.old_consumer_name ])
+            state = rows[0]
+            source_node = state['provider_node']
+            source_location = state['provider_location']
+
+        self.log.info("Using '%s' as source node", source_node)
+        self.register_consumer(source_location)
 
 if __name__ == '__main__':
     script = CopyTable(sys.argv[1:])