londiste: moved hash_key logic from dispatch to part handler
authormartinko <gamato@users.sf.net>
Thu, 2 May 2013 09:46:53 +0000 (11:46 +0200)
committermartinko <gamato@users.sf.net>
Thu, 2 May 2013 09:46:53 +0000 (11:46 +0200)
python/londiste/handlers/dispatch.py
python/londiste/handlers/part.py

index 0f22e1fed9194b09031561fa052dae2dc9e886b0..d1af2f02a7dd30817d4219c4882e8065f2be865f 100644 (file)
@@ -725,7 +725,7 @@ class Dispatcher (PartHandler):
         return val
 
     def _validate_hash_key(self):
-        pass
+        pass # no need for hash key when not sharding
 
     def reset(self):
         """Called before starting to process a batch.
@@ -737,8 +737,7 @@ class Dispatcher (PartHandler):
         if self.conf.table_mode != 'ignore':
             self.batch_info = batch_info
             self.dst_curs = dst_curs
-        if self.hash_key is not None:
-            super(Dispatcher, self).prepare_batch(batch_info, dst_curs)
+        super(Dispatcher, self).prepare_batch(batch_info, dst_curs)
 
     def filter_data(self, data):
         """Process with fields skip and map"""
@@ -920,9 +919,7 @@ class Dispatcher (PartHandler):
     def get_copy_condition(self, src_curs, dst_curs):
         """ Prepare where condition for copy and replay filtering.
         """
-        if self.hash_key is not None:
-            return super(Dispatcher, self).get_copy_condition(src_curs, dst_curs)
-        return ''
+        return super(Dispatcher, self).get_copy_condition(src_curs, dst_curs)
 
     def real_copy(self, tablename, src_curs, dst_curs, column_list):
         """do actual table copy and return tuple with number of bytes and rows
index e3d7daee5fcc3d172043c66ea2740e9ddb53f198..1cbd99bc63f2770df8bc830b69e1dd9f6917e08a 100644 (file)
@@ -67,8 +67,9 @@ class PartHandler(TableHandler):
 
     def prepare_batch(self, batch_info, dst_curs):
         """Called on first event for this table in current batch."""
-        if not self.max_part:
-            self.load_part_info(dst_curs)
+        if self.hash_key is not None:
+            if not self.max_part:
+                self.load_part_info(dst_curs)
         TableHandler.prepare_batch(self, batch_info, dst_curs)
 
     def process_event(self, ev, sql_queue_func, arg):
@@ -88,6 +89,8 @@ class PartHandler(TableHandler):
 
     def get_copy_condition(self, src_curs, dst_curs):
         """Prepare the where condition for copy and replay filtering"""
+        if self.hash_key is None:
+            return TableHandler.get_copy_condition(self, src_curs, dst_curs)
         self.load_part_info(dst_curs)
         w = "(%s & %d) = %d" % (self.hashexpr, self.max_part, self.local_part)
         self.log.debug('part: copy_condition=%s', w)