londiste: copy expression support
authorMarko Kreen <markokr@gmail.com>
Tue, 3 Nov 2009 12:21:28 +0000 (14:21 +0200)
committerMarko Kreen <markokr@gmail.com>
Tue, 3 Nov 2009 12:43:44 +0000 (14:43 +0200)
python/londiste.py
python/londiste/setup.py
python/londiste/table_copy.py
python/skytools/sqltools.py

index 11b45fbc784075d159cfa537d560135f897f9c7e..3fb6b6badf65e52b7d9abf7ec56604ee2981a777 100755 (executable)
@@ -91,6 +91,8 @@ class Londiste(skytools.DBScript):
                 help = "add: no copy needed", default=False)
         g.add_option("--skip-truncate", action="store_true", dest="skip_truncate",
                 help = "add: keep old data", default=False)
+        p.add_option("--copy-condition", dest="copy_condition",
+                help = "copy: where expression")
         g.add_option("--provider",
                 help = "init: upstream node temp connect string")
         g.add_option("--create", action="store_true",
index 0dbc0457963a722ff5cce1b53dd7b50f48c2b01d..c0e84e54c573e2549404e3e61f38275ab4db2583 100644 (file)
@@ -41,6 +41,8 @@ class LondisteSetup(CascadeAdmin):
                     help = "no copy needed", default=False)
         p.add_option("--skip-truncate", action="store_true", dest="skip_truncate",
                     help = "dont delete old data", default=False)
+        p.add_option("--copy-condition", dest="copy_condition",
+                help = "copy: where expression")
         p.add_option("--force", action="store_true",
                     help="force", default=False)
         p.add_option("--all", action="store_true",
@@ -155,6 +157,11 @@ class LondisteSetup(CascadeAdmin):
         if self.options.expect_sync:
             q = "select * from londiste.local_set_table_state(%s, %s, NULL, 'ok')"
             self.exec_cmd(dst_curs, q, [self.set_name, tbl])
+        if self.options.copy_condition:
+            q = "select * from londiste.local_set_table_attrs(%s, %s, %s)"
+            attrs = {'copy_condition': self.options.copy_condition}
+            enc_attrs = skytools.db_urlencode(attrs)
+            self.exec_cmd(dst_curs, q, [self.set_name, tbl, enc_attrs])
         dst_db.commit()
 
     def sync_table_list(self, dst_curs, src_tbls, dst_tbls):
index f229066413aaf55cee248103f82f467bb6abe276..07c9594a0a39ba71f9ba63a5a8d68a370a0204ea 100644 (file)
@@ -200,7 +200,8 @@ class CopyTable(Replicator):
         tablename = tbl_stat.name
         # do copy
         self.log.info("%s: start copy" % tablename)
-        stats = skytools.full_copy(tablename, srccurs, dstcurs, col_list)
+        w_cond = tbl_stat.table_attrs.get('copy_condition')
+        stats = skytools.full_copy(tablename, srccurs, dstcurs, col_list, w_cond)
         if stats:
             self.log.info("%s: copy finished: %d bytes, %d rows" % (
                           tablename, stats[0], stats[1]))
index 902bf9a6b99f19ee3d6d1596115de3f03da0d61e..76d73efad9f39c5fc71f1e887e9bd94238461fc3 100644 (file)
@@ -373,23 +373,31 @@ class CopyPipe(object):
         self.buf.seek(0)
         self.buf.truncate()
 
-def full_copy(tablename, src_curs, dst_curs, column_list = []):
+def full_copy(tablename, src_curs, dst_curs, column_list = [], condition = None):
     """COPY table from one db to another."""
 
     qtable = quote_fqident(tablename)
     if column_list:
-        qfields = [quote_ident(f) for f in column_list]
-        hdr = "%s (%s)" % (qtable, ",".join(qfields))
+        qfields = ",".join([quote_ident(f) for f in column_list])
+        src = dst = "%s (%s)" % (qtable, qfields)
     else:
-        hdr = qtable
+        qfields = '*'
+        src = dst = qtable
+
+    if condition:
+        src = "(SELECT %s FROM %s WHERE %s)" % (qfields, qtable, condition)
+
     if hasattr(src_curs, 'copy_expert'):
-        sql_to = "COPY %s TO stdout" % hdr
-        sql_from = "COPY %s FROM stdout" % hdr
+        sql_to = "COPY %s TO stdout" % src
+        sql_from = "COPY %s FROM stdin" % dst
         buf = CopyPipe(dst_curs, sql_from = sql_from)
         src_curs.copy_expert(sql_to, buf)
     else:
-        buf = CopyPipe(dst_curs, hdr)
-        src_curs.copy_to(buf, hdr)
+        if condition:
+            # regular psycopg copy_to generates invalid sql for subselect copy
+            raise Exception('copy_expert() is needed for conditional copy')
+        buf = CopyPipe(dst_curs, dst)
+        src_curs.copy_to(buf, src)
     buf.flush()
 
     return (buf.total_bytes, buf.total_rows)