londiste.handlers.dispatch: added switch to ignore events aiming at obsolete (dropped...
authormartinko <gamato@users.sf.net>
Thu, 16 May 2013 12:59:23 +0000 (14:59 +0200)
committermartinko <gamato@users.sf.net>
Thu, 16 May 2013 12:59:23 +0000 (14:59 +0200)
python/londiste/handlers/dispatch.py

index fd172969e846a80923045c5195d65989df0f5c71..e5c5ca7a94744d409da1cd59bd92c83f1b10befa 100644 (file)
@@ -138,6 +138,10 @@ post_part:
 retention_period:
     how long to keep partitions around. examples: '3 months', '1 year'
 
+ignore_old_events:
+    * 0 - handle all events in the same way (default)
+    * 1 - ignore events coming for obsolete partitions
+
 encoding:
     name of destination encoding. handler replaces all invalid encoding symbols
     and logs them as warnings
@@ -637,6 +641,7 @@ class Dispatcher (PartHandler):
 
         # show args
         self.log.debug("dispatch.init: table_name=%r, args=%r", table_name, args)
+        self.ignored_tables = set()
         self.batch_info = None
         self.dst_curs = None
         self.pkeys = None
@@ -686,6 +691,7 @@ class Dispatcher (PartHandler):
             conf.post_part = self.args.get('post_part')
             conf.part_func = self.args.get('part_func', PART_FUNC_NEW)
             conf.retention_period = self.args.get('retention_period')
+            conf.ignore_old_events = self.get_arg('ignore_old_events', [0, 1], 0)
         # set row mode and event types to process
         conf.row_mode = self.get_arg('row_mode', ROW_MODES)
         event_types = self.args.get('event_types', '*')
@@ -780,6 +786,7 @@ class Dispatcher (PartHandler):
             raise Exception('Unknown event type: %s' % ev.ev_type)
         # process only operations specified
         if not op in self.conf.event_types:
+            #self.log.debug('dispatch.process_event: ignored event type')
             return
         self.log.debug('dispatch.process_event: %s/%s', ev.ev_type, ev.ev_data)
         if self.pkeys is None:
@@ -788,14 +795,18 @@ class Dispatcher (PartHandler):
         # prepare split table when needed
         if self.conf.table_mode == 'part':
             dst, part_time = self.split_format(ev, data)
+            if dst in self.ignored_tables:
+                return
             if dst not in self.row_handler.table_map:
                 self.check_part(dst, part_time)
+                if dst in self.ignored_tables:
+                    return
         else:
             dst = self.dest_table
 
         if dst not in self.row_handler.table_map:
             self.row_handler.add_table(dst, LOADERS[self.conf.load_mode],
-                                    self.pkeys, self.conf)
+                                       self.pkeys, self.conf)
         self.row_handler.process(dst, op, data)
 
     def finish_batch(self, batch_info, dst_curs):
@@ -900,6 +911,8 @@ class Dispatcher (PartHandler):
 
         if self.conf.retention_period:
             self.drop_obsolete_partitions (self.dest_table, self.conf.retention_period, self.conf.period)
+            if self.conf.ignore_old_events and not skytools.exists_table(curs, dst):
+                self.ignored_tables.add(dst) # must have been just dropped
 
     def drop_obsolete_partitions (self, parent_table, retention_period, partition_period):
         """ Drop obsolete partitions of partition-by-date parent table.