cascade: make wm publish periods configurable
authorMarko Kreen <markokr@gmail.com>
Mon, 4 May 2009 07:37:00 +0000 (10:37 +0300)
committerMarko Kreen <markokr@gmail.com>
Mon, 4 May 2009 07:37:00 +0000 (10:37 +0300)
This is mostly meant for testing.  The options may be removed or
renamed in the future.

Also fix bug in local wm publish function

python/pgq/cascade/worker.py

index 20fbbf2688277cc95f9396cafb59e6765d438f67..79ba5479d4b3253bc9d596ce88cc795efd19427b 100644 (file)
@@ -98,6 +98,12 @@ class CascadedWorker(CascadedConsumer):
 
         CascadedConsumer.__init__(self, service_name, db_name, args)
 
+    def reload(self):
+        CascadedConsumer.reload(self)
+
+        self.global_wm_publish_period = self.cf.getfloat('global_wm_publish_period', CascadedWorker.global_wm_publish_period)
+        self.local_wm_publish_period = self.cf.getfloat('local_wm_publish_period', CascadedWorker.local_wm_publish_period)
+
     def process_remote_batch(self, src_db, tick_id, event_list, dst_db):
         """Worker-specific event processing."""
         self.ev_buf = []
@@ -147,8 +153,8 @@ class CascadedWorker(CascadedConsumer):
         if t - self.local_wm_publish_time < self.local_wm_publish_period:
             return
 
-        self.log.debug("Publishing local watermark: %d" % st.local_watermark)
         st = self._worker_state
+        self.log.debug("Publishing local watermark: %d" % st.local_watermark)
         src_curs = src_db.cursor()
         q = "select * from pgq_node.set_subscriber_watermark(%s, %s, %s)"
         src_curs.execute(q, [self.pgq_queue_name, st.node_name, st.local_watermark])