return val
def _validate_hash_key(self):
- pass
+ pass # no need for hash key when not sharding
def reset(self):
"""Called before starting to process a batch.
if self.conf.table_mode != 'ignore':
self.batch_info = batch_info
self.dst_curs = dst_curs
- if self.hash_key is not None:
- super(Dispatcher, self).prepare_batch(batch_info, dst_curs)
+ super(Dispatcher, self).prepare_batch(batch_info, dst_curs)
def filter_data(self, data):
"""Process with fields skip and map"""
def get_copy_condition(self, src_curs, dst_curs):
""" Prepare where condition for copy and replay filtering.
"""
- if self.hash_key is not None:
- return super(Dispatcher, self).get_copy_condition(src_curs, dst_curs)
- return ''
+ return super(Dispatcher, self).get_copy_condition(src_curs, dst_curs)
def real_copy(self, tablename, src_curs, dst_curs, column_list):
"""do actual table copy and return tuple with number of bytes and rows
def prepare_batch(self, batch_info, dst_curs):
"""Called on first event for this table in current batch."""
- if not self.max_part:
- self.load_part_info(dst_curs)
+ if self.hash_key is not None:
+ if not self.max_part:
+ self.load_part_info(dst_curs)
TableHandler.prepare_batch(self, batch_info, dst_curs)
def process_event(self, ev, sql_queue_func, arg):
def get_copy_condition(self, src_curs, dst_curs):
"""Prepare the where condition for copy and replay filtering"""
+ if self.hash_key is None:
+ return TableHandler.get_copy_condition(self, src_curs, dst_curs)
self.load_part_info(dst_curs)
w = "(%s & %d) = %d" % (self.hashexpr, self.max_part, self.local_part)
self.log.debug('part: copy_condition=%s', w)