-"""
-Experimental event filtering by hash.
+"""Event filtering by hash.
+
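+Parameters:
+ key=COLUMN: column whose hash is used for filtering (required)
+ hashfunc=FUNCNAME: function used to hash the key (default: partconf.get_hash_raw)
+ hashexpr=EXPR: full hash expression; overrides the one built from hashfunc and key
+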
+On root node:
+* Hash of key field will be added to ev_extra3.
+ This is implemented by adding additional trigger argument:
+
+ ev_extra3='hash='||partconf.get_hash_raw(key_column)
+
+On branch/leaf node:
+
+* At COPY time, the SELECT on the provider side is filtered by hash.
+* At replay time, events are filtered by looking at the hash in ev_extra3.
+
"""
import skytools
class PartHandler(TableHandler):
handler_name = 'part'
+ # hash function applied to the key column unless the 'hashfunc' argument is given
+ DEFAULT_HASHFUNC = "partconf.get_hash_raw"
+ # template for the hash expression; filled with (quoted function name, quoted key)
+ DEFAULT_HASHEXPR = "%s(%s)"
+
def __init__(self, table_name, args, dest_table):
+ """Read the 'key', 'hashfunc' and 'hashexpr' handler arguments.
+
+ 'key' is mandatory; an Exception is raised when it is missing.
+ 'hashfunc' defaults to DEFAULT_HASHFUNC and is combined with the
+ quoted key via DEFAULT_HASHEXPR.  An explicit 'hashexpr' argument
+ replaces the resulting expression entirely.
+ """
TableHandler.__init__(self, table_name, args, dest_table)
self.max_part = None # max part number
self.local_part = None # part number of local node
- self.key = args.get('key')
+
+ # primary key columns
+ self.key = args.get('key')
if self.key is None:
raise Exception('Specify key field as key agument')
+ # hash function & full expression
+ hashfunc = args.get('hashfunc', self.DEFAULT_HASHFUNC)
+ self.hashexpr = self.DEFAULT_HASHEXPR % (
+ skytools.quote_fqident(hashfunc),
+ skytools.quote_ident(self.key))
+ # an explicit 'hashexpr' argument is used verbatim, without quoting
+ self.hashexpr = args.get('hashexpr', self.hashexpr)
+
def reset(self):
"""Forget config info."""
+ # NOTE(review): only max_part is cleared in the lines visible here;
+ # presumably this forces the partition info to be loaded again - confirm.
self.max_part = None
def add(self, trigger_arg_list):
"""Let trigger put hash into extra3"""
- arg = "ev_extra3='hash='||hashtext(%s)" % skytools.quote_ident(self.key)
+ # self.hashexpr is built in __init__ and inserted verbatim, so the trigger
+ # stores the text 'hash=' followed by the hash value in ev_extra3
+ arg = "ev_extra3='hash='||%s" % self.hashexpr
trigger_arg_list.append(arg)
TableHandler.add(self, trigger_arg_list)
def real_copy(self, tablename, src_curs, dst_curs, column_list, cond_list):
"""Copy only slots needed locally."""
self.load_part_info(dst_curs)
- fn = 'hashtext(%s)' % skytools.quote_ident(self.key)
- w = "%s & %d = %d" % (fn, self.max_part, self.local_part)
+ # extra WHERE condition: keep rows whose hash, masked with max_part,
+ # equals local_part (presumably both are set by load_part_info() - confirm)
+ w = "%s & %d = %d" % (self.hashexpr, self.max_part, self.local_part)
self.log.debug('part: copy_condition=%s' % w)
cond_list.append(w)