# override consumer name
#consumer_name = %(job_name)s
+ # filter out only events for specific tables
+ #table_filter = table1, table2
+
# whether to use cursor to fetch events (0 disables)
#pgq_lazy_fetch = 300
self.pgq_min_interval = self.cf.get("pgq_batch_collect_interval", '') or None
self.pgq_min_lag = self.cf.get("pgq_keep_lag", '') or None
+ # filter out specific tables only
+ tfilt = []
+ for t in self.cf.getlist('table_filter', ''):
+ tfilt.append(skytools.quote_literal(skytools.fq_name(t)))
+ if len(tfilt) > 0:
+ expr = "ev_extra1 in (%s)" % ','.join(tfilt)
+ self.consumer_filter = expr
+
def startup(self):
"""Handle commands here. __init__ does not have error logging."""
if self.options.register: