retention_period:
how long to keep partitions around. examples: '3 months', '1 year'
+ignore_old_events:
+ * 0 - handle all events in the same way (default)
+ * 1 - ignore events coming for obsolete partitions
+
encoding:
name of destination encoding. handler replaces all invalid encoding symbols
and logs them as warnings
# show args
self.log.debug("dispatch.init: table_name=%r, args=%r", table_name, args)
+ self.ignored_tables = set()
self.batch_info = None
self.dst_curs = None
self.pkeys = None
conf.post_part = self.args.get('post_part')
conf.part_func = self.args.get('part_func', PART_FUNC_NEW)
conf.retention_period = self.args.get('retention_period')
+ conf.ignore_old_events = self.get_arg('ignore_old_events', [0, 1], 0)
# set row mode and event types to process
conf.row_mode = self.get_arg('row_mode', ROW_MODES)
event_types = self.args.get('event_types', '*')
raise Exception('Unknown event type: %s' % ev.ev_type)
# process only operations specified
if not op in self.conf.event_types:
+ #self.log.debug('dispatch.process_event: ignored event type')
return
self.log.debug('dispatch.process_event: %s/%s', ev.ev_type, ev.ev_data)
if self.pkeys is None:
# prepare split table when needed
if self.conf.table_mode == 'part':
dst, part_time = self.split_format(ev, data)
+ if dst in self.ignored_tables:
+ return
if dst not in self.row_handler.table_map:
self.check_part(dst, part_time)
+ if dst in self.ignored_tables:
+ return
else:
dst = self.dest_table
if dst not in self.row_handler.table_map:
self.row_handler.add_table(dst, LOADERS[self.conf.load_mode],
- self.pkeys, self.conf)
+ self.pkeys, self.conf)
self.row_handler.process(dst, op, data)
def finish_batch(self, batch_info, dst_curs):
if self.conf.retention_period:
self.drop_obsolete_partitions (self.dest_table, self.conf.retention_period, self.conf.period)
+ if self.conf.ignore_old_events and not skytools.exists_table(curs, dst):
+ self.ignored_tables.add(dst) # must have been just dropped
def drop_obsolete_partitions (self, parent_table, retention_period, partition_period):
""" Drop obsolete partitions of partition-by-date parent table.