handlers.part: make hash func configurable
authorMarko Kreen <markokr@gmail.com>
Wed, 2 May 2012 12:27:40 +0000 (15:27 +0300)
committerMarko Kreen <markokr@gmail.com>
Wed, 2 May 2012 12:27:40 +0000 (15:27 +0300)
python/londiste/handlers/part.py

index 6e6440270df25a470406178f326919cc11085eb0..b6585a7bebf2fe5c256688722c0a1dd0f3f0be49 100644 (file)
@@ -1,5 +1,19 @@
-"""
-Experimental event filtering by hash.
+"""Event filtering by hash.
+
+Parameters:
+
+
+On root node:
+* Hash of key field will be added to ev_extra3.
+  This is implemented by adding additional trigger argument:
+
+        ev_extra3='hash='||partconf.get_hash_raw(key_column)
+
+On branch/leaf node:
+
+* On COPY time, the SELECT on provider side gets filtered by hash.
+* On replay time, the events gets filtered by looking at hash in ev_extra3.
+
 """
 
 import skytools
@@ -10,14 +24,26 @@ __all__ = ['PartHandler']
 class PartHandler(TableHandler):
     handler_name = 'part'
 
+    DEFAULT_HASHFUNC = "partconf.get_hash_raw"
+    DEFAULT_HASHEXPR = "%s(%s)"
+
     def __init__(self, table_name, args, dest_table):
         TableHandler.__init__(self, table_name, args, dest_table)
         self.max_part = None       # max part number
         self.local_part = None     # part number of local node
-        self.key = args.get('key')        
+
+        # primary key columns
+        self.key = args.get('key')
         if self.key is None:
             raise Exception('Specify key field as key agument')
 
+        # hash function & full expression
+        hashfunc = args.get('hashfunc', self.DEFAULT_HASHFUNC)
+        self.hashexpr = self.DEFAULT_HASHEXPR % (
+                skytools.quite_fqident(hashfunc),
+                skytools.quite_ident(self.key))
+        self.hashexpr = args.get('hashexpr', self.hashexpr)
+
     def reset(self):
         """Forget config info."""
         self.max_part = None
@@ -27,7 +53,7 @@ class PartHandler(TableHandler):
     def add(self, trigger_arg_list):
         """Let trigger put hash into extra3"""
 
-        arg = "ev_extra3='hash='||hashtext(%s)" % skytools.quote_ident(self.key)
+        arg = "ev_extra3='hash='||%s" % self.hashexpr
         trigger_arg_list.append(arg)        
         TableHandler.add(self, trigger_arg_list)
 
@@ -52,8 +78,7 @@ class PartHandler(TableHandler):
     def real_copy(self, tablename, src_curs, dst_curs, column_list, cond_list):
         """Copy only slots needed locally."""
         self.load_part_info(dst_curs)
-        fn = 'hashtext(%s)' % skytools.quote_ident(self.key)
-        w = "%s & %d = %d" % (fn, self.max_part, self.local_part)
+        w = "%s & %d = %d" % (self.hashexpr, self.max_part, self.local_part)
         self.log.debug('part: copy_condition=%s' % w)
         cond_list.append(w)