londiste.handlers: .needs_table method
authorMarko Kreen <markokr@gmail.com>
Tue, 28 Jun 2011 12:28:38 +0000 (15:28 +0300)
committerMarko Kreen <markokr@gmail.com>
Tue, 28 Jun 2011 14:01:49 +0000 (17:01 +0300)
handler can specify whether is actually operates on table

python/londiste/handler.py
python/londiste/handlers/qtable.py
python/londiste/setup.py

index a0c72b0742ea2db13ad28ea607b72f4cf88a1ad2..9e52db5f1fecca5789e938ddd74d132f25ae540d 100644 (file)
@@ -107,6 +107,10 @@ class BaseHandler:
         return skytools.full_copy(tablename, src_curs, dst_curs, column_list,
                                   condition)
 
+    def needs_table(self):
+        """Does the handler need the table to exist on destination."""
+        return True
+
 class TableHandler(BaseHandler):
     """Default Londiste handler, inserts events into tables with plain SQL."""
     handler_name = 'londiste'
index a06280bcc6da513f480fc772fbe7526b3cde3dad..97b0b976b95bd0684adce156ca775f49d24a3c10 100644 (file)
@@ -31,7 +31,8 @@ class QueueTableHandler(BaseHandler):
         """Force copy not to start"""
         return (0,0)
 
-
+    def needs_table(self):
+        return False
 
 class FakeLocalHandler(BaseHandler):
     handler_name = 'fake_local'
@@ -39,6 +40,9 @@ class FakeLocalHandler(BaseHandler):
     def add(self, trigger_arg_list):
         trigger_arg_list.append('virtual_table')
 
+    def needs_table(self):
+        return False
+
 
 
 class QueueSplitterHandler(BaseHandler):
@@ -79,6 +83,9 @@ class QueueSplitterHandler(BaseHandler):
                   'extra1', 'extra2', 'extra3', 'extra4', 'time']
         pgq.bulk_insert_events(dst_curs, self.rows, fields, self.dst_queue_name)
 
+    def needs_table(self):
+        return False
+
 
 __londiste_handlers__ = [QueueTableHandler, FakeLocalHandler,
                          QueueSplitterHandler]
index 1234fcdb0696bef47c79ff7ccab77aad040bfe14..e9ef4102368876aa6a0a4c4506e856c2259aecce 100644 (file)
@@ -111,7 +111,8 @@ class LondisteSetup(CascadeAdmin):
         self.sync_table_list(dst_curs, src_tbls, dst_tbls)
         dst_db.commit()
 
-        args = self.expand_arg_list(dst_db, 'r', False, args)
+        needs_tbl = self.handler_needs_table()
+        args = self.expand_arg_list(dst_db, 'r', False, args, needs_tbl)
 
         # dont check for exist/not here (root handling)
         problems = False
@@ -193,6 +194,14 @@ class LondisteSetup(CascadeAdmin):
             self.exec_cmd(dst_curs, q, [self.set_name, tbl, enc_attrs])
         dst_db.commit()
 
+    def handler_needs_table(self):
+        if self.options.handler:
+            hstr = londiste.handler.create_handler_string(
+                            self.options.handler, self.options.handler_arg)
+            p = londiste.handler.build_handler('unused.string', hstr, self.log)
+            return p.needs_table()
+        return True
+
     def sync_table_list(self, dst_curs, src_tbls, dst_tbls):
         for tbl in src_tbls.keys():
             q = "select * from londiste.global_add_table(%s, %s)"
@@ -378,7 +387,7 @@ class LondisteSetup(CascadeAdmin):
         return self.get_database('provider_db', connstr = self.provider_location)
 
 
-    def expand_arg_list(self, db, kind, existing, args):
+    def expand_arg_list(self, db, kind, existing, args, needs_tbl=True):
         curs = db.cursor()
 
         if kind == 'S':
@@ -411,14 +420,16 @@ class LondisteSetup(CascadeAdmin):
             else:
                 return lst_missing
 
+
+        allow_nonexist = not needs_tbl
         if existing:
-            res = self.solve_globbing(args, lst_exists, map_exists, map_missing)
+            res = self.solve_globbing(args, lst_exists, map_exists, map_missing, allow_nonexist)
         else:
-            res = self.solve_globbing(args, lst_missing, map_missing, map_exists)
+            res = self.solve_globbing(args, lst_missing, map_missing, map_exists, allow_nonexist)
         return res
 
 
-    def solve_globbing(self, args, full_list, full_map, reverse_map):
+    def solve_globbing(self, args, full_list, full_map, reverse_map, allow_nonexist):
         def glob2regex(s):
             s = s.replace('.', '[.]').replace('?', '.').replace('*', '.*')
             return '^%s$' % s
@@ -445,12 +456,15 @@ class LondisteSetup(CascadeAdmin):
                     res_map[a] = 1
                 elif a in reverse_map:
                     self.log.info("%s already processed" % a)
+                elif allow_nonexist:
+                    res_list.append(a)
+                    res_map[a] = 1
                 elif self.options.force:
                     self.log.warning("%s not available, but --force is used" % a)
                     res_list.append(a)
                     res_map[a] = 1
                 else:
-                    self.log.error("%s not available" % a)
+                    self.log.warning("%s not available" % a)
                     err = 1
         if err:
             raise skytools.UsageError("Cannot proceed")