clarify var names
authorMarko Kreen <markokr@gmail.com>
Fri, 4 Apr 2008 09:08:43 +0000 (09:08 +0000)
committerMarko Kreen <markokr@gmail.com>
Fri, 4 Apr 2008 09:08:43 +0000 (09:08 +0000)
python/pgq/setconsumer.py

index ea01fc04f323fafe0d66648fd661ba2e9572371c..6e01ccf6a96feab196701a0d08233b06553bbf68 100644 (file)
@@ -85,8 +85,6 @@ node_properties = {
 class SetConsumer(skytools.DBScript):
     last_global_wm_event = 0
     def work(self):
-
-
         self.tick_id_cache = {}
 
         self.set_name = self.cf.get('set_name')
@@ -158,41 +156,41 @@ class SetConsumer(skytools.DBScript):
         return 1
 
     def process_set_batch(self, src_db, dst_db, ev_list, copy_queue = None):
-        curs = db.cursor()
+        dst_curs = dst_db.cursor()
         for ev in ev_list:
-            self.process_set_event(curs, ev)
+            self.process_set_event(dst_curs, ev)
             if copy_queue:
-                copy_queue.bulk_insert(curs, ev)
+                copy_queue.bulk_insert(dst_curs, ev)
         self.stat_add('count', len(ev_list))
 
-    def process_set_event(self, curs, ev):
+    def process_set_event(self, dst_curs, ev):
         if ev.type == 'set-tick':
-            self.handle_set_tick(curs, ev)
+            self.handle_set_tick(dst_curs, ev)
         elif ev.type == 'set-member-info':
-            self.handle_member_info(curs, ev)
+            self.handle_member_info(dst_curs, ev)
         elif ev.type == 'global-watermark':
-            self.handle_global_watermark(curs, ev)
+            self.handle_global_watermark(dst_curs, ev)
         else:
             raise Exception('bad event for set consumer')
 
-    def handle_global_watermark(self, curs, ev):
+    def handle_global_watermark(self, dst_curs, ev):
         set_name = ev.extra1
         tick_id = ev.data
         if set_name == self.set_name:
-            self.set_global_watermark(curs, tick_id)
+            self.set_global_watermark(dst_curs, tick_id)
 
-    def handle_set_tick(self, curs, ev):
+    def handle_set_tick(self, dst_curs, ev):
         data = skytools.db_urldecode(ev.data)
         set_name = data['set_name']
         tick_id = data['tick_id']
         self.tick_id_cache[set_name] = tick_id
 
-    def move_part_positions(self, curs):
+    def move_part_positions(self, dst_curs):
         q = "select * from pgq_set.set_partition_watermark(%s, %s, %s)"
         for set_name, tick_id in self.tick_id_cache.items():
-            curs.execute(q, [self.set_name, set_name, tick_id])
+            dst_curs.execute(q, [self.set_name, set_name, tick_id])
 
-    def handle_member_info(self, curs, ev):
+    def handle_member_info(self, dst_curs, ev):
         data = skytools.db_urldecode(ev.data)
         set_name = data['set_name']
         node_name = data['node_name']
@@ -203,7 +201,7 @@ class SetConsumer(skytools.DBScript):
             return
 
         q = "select * from pgq_set.add_member(%s, %s, %s, %s)"
-        curs.execute(q, [set_name, node_name, node_location, dead])
+        dst_curs.execute(q, [set_name, node_name, node_location, dead])
 
     def send_local_watermark_upwards(self, target_db, source_db):
         target_curs = target_db.cursor()
@@ -216,9 +214,9 @@ class SetConsumer(skytools.DBScript):
         q = "select pgq_ext.set_subscriber_watermark(%s, %s, %s)"
         source_curs.execute(q, [self.set_name])
 
-    def set_global_watermark(self, curs, tick_id):
+    def set_global_watermark(self, dst_curs, tick_id):
         q = "select pgq_set.set_global_watermark(%s, %s)"
-        curs.execute(q, [self.set_name, tick_id])
+        dst_curs.execute(q, [self.set_name, tick_id])
 
     def load_node_info(self, db):
         curs = db.cursor()
@@ -236,15 +234,15 @@ class SetConsumer(skytools.DBScript):
 
         return NodeInfo(node_row, mbr_list)
 
-    def tag_node_uptodate(self, db):
-        curs = db.cursor()
+    def tag_node_uptodate(self, dst_db):
+        dst_curs = db.cursor()
         q = "select * from pgq_set.set_node_uptodate(%s, true)"
-        curs.execute(q, [self.set_name])
-        db.commit()
+        dst_curs.execute(q, [self.set_name])
+        dst_db.commit()
 
-    def copy_tick(self, curs, src_queue, dst_queue):
+    def copy_tick(self, dst_curs, src_queue, dst_queue):
         q = "select * from pgq.ticker(%s, %s)"
-        curs.execute(q, [dst_queue.queue_name, src_queue.cur_tick])
+        dst_curs.execute(q, [dst_queue.queue_name, src_queue.cur_tick])
 
 if __name__ == '__main__':
     script = SetConsumer('setconsumer', sys.argv[1:])