setconsumer fixes
authorMarko Kreen <markokr@gmail.com>
Fri, 7 Dec 2007 15:02:00 +0000 (15:02 +0000)
committerMarko Kreen <markokr@gmail.com>
Fri, 7 Dec 2007 15:02:00 +0000 (15:02 +0000)
python/pgq/__init__.py
python/pgq/rawconsumer.py
python/pgq/setconsumer.py
python/setadm.py

index f0e9c1a647a2a92d205df8843b21dbeeed6cbe08..40fb835bdb27ac6d56748aba6cceb33ec9eeca7b 100644 (file)
@@ -2,5 +2,6 @@
 
 from pgq.event import *
 from pgq.consumer import *
+from pgq.setconsumer import *
 from pgq.producer import *
 
index 0cff4b2a951cc70d0c0d16494e334d9e5e223cc5..c1df916db55f8a5aa640b86edc810c4dda85bc8f 100644 (file)
@@ -31,7 +31,7 @@ class RawQueue:
 
         return self.batch_id
 
-    def finish_batch(self, curs, batch_id): pass
+    def finish_batch(self, curs, batch_id):
         q = "select * from pgq.finish_batch(%s)"
         curs.execute(q, [self.batch_id])
 
@@ -46,6 +46,6 @@ class RawQueue:
 
     def finish_bulk_insert(self, curs):
         pgq.bulk_insert_events(curs, self.bulk_insert_buf,
-                               self.bulk_insert_fields, self.queue_name):
+                               self.bulk_insert_fields, self.queue_name)
         self.bulk_insert_buf = []
 
index 6acb230a7fbb71edafac7505e0de9d592bfe4447..ea01fc04f323fafe0d66648fd661ba2e9572371c 100644 (file)
@@ -2,6 +2,10 @@
 
 import sys, time, skytools
 
+from pgq.rawconsumer import RawQueue
+
+__all__ = ['SetConsumer']
+
 ROOT = 'root'
 BRANCH = 'branch'
 LEAF = 'leaf'
@@ -29,12 +33,14 @@ class NodeInfo:
         self.local_watermark = row['local_watermark']
         self.completed_tick = row['completed_tick']
         self.provider_node = row['provider_node']
+        self.provider_location = row['provider_location']
         self.paused = row['paused']
         self.resync = row['resync']
         self.up_to_date = row['up_to_date']
         self.combined_set = row['combined_set']
         self.combined_type = row['combined_type']
         self.combined_queue = row['combined_queue']
+        self.worker_name = row['worker_name']
 
     def need_action(self, action_name):
         typ = self.type
@@ -80,12 +86,14 @@ class SetConsumer(skytools.DBScript):
     last_global_wm_event = 0
     def work(self):
 
+
         self.tick_id_cache = {}
 
         self.set_name = self.cf.get('set_name')
         target_db = self.get_database('subscriber_db')
 
         node = self.load_node_info(target_db)
+        self.consumer_name = node.worker_name
 
         if not node.up_to_date:
             self.tag_node_uptodate(target_db)
@@ -105,6 +113,7 @@ class SetConsumer(skytools.DBScript):
         # batch processing follows
         #
 
+        source_db = self.get_database('source_db', connstr = node.provider_location)
         srcnode = self.load_node_info(source_db)
         
         # get batch
@@ -238,6 +247,6 @@ class SetConsumer(skytools.DBScript):
         curs.execute(q, [dst_queue.queue_name, src_queue.cur_tick])
 
 if __name__ == '__main__':
-    script = SetConsumer('setconsumer', ['test.ini'])
+    script = SetConsumer('setconsumer', sys.argv[1:])
     script.start()
 
index d9c44b9c6968bbd7cf44d8867238247fd88baa73..9cc4c332d624b61ce1d02f410782df4348b321d6 100755 (executable)
@@ -88,6 +88,8 @@ class SetAdmin(skytools.DBScript):
         if info['node_type'] is not None:
             self.log.info("Node is already initialized as %s" % info['node_type'])
             return
+        
+        worker_name = "%s_%s_worker" % (self.set_name, node_name)
 
         # register member
         if node_type in ('root', 'combined-root'):
@@ -96,8 +98,8 @@ class SetAdmin(skytools.DBScript):
             provider_name = None
             self.exec_sql(db, "select pgq_set.add_member(%s, %s, %s, false)",
                           [self.set_name, node_name, node_location])
-            self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s)",
-                          [self.set_name, node_type, node_name, provider_name, global_watermark, combined_set])
+            self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s, %s)",
+                          [self.set_name, node_type, node_name, worker_name, provider_name, global_watermark, combined_set])
         else:
             root_db = self.find_root_db()
             set = self.load_root_info(root_db)
@@ -123,7 +125,6 @@ class SetAdmin(skytools.DBScript):
                 sys.exit(1)
 
             # register on provider
-            worker_name = "qweqweqwe"
             provider_db = self.get_database('provider_db', connstr = provider.location)
             self.exec_sql(provider_db, "select pgq_set.add_member(%s, %s, %s, false)",
                           [self.set_name, node_name, node_location])
@@ -136,8 +137,9 @@ class SetAdmin(skytools.DBScript):
                           [self.set_name, node_name, node_location])
             self.exec_sql(db, "select pgq_set.add_member(%s, %s, %s, false)",
                           [self.set_name, provider_name, provider.location])
-            self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s)",
-                          [self.set_name, node_type, node_name, provider_name, global_watermark, combined_set])
+            self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s, %s)",
+                          [self.set_name, node_type, node_name, worker_name, provider_name,
+                           global_watermark, combined_set])
             db.commit()
 
             
@@ -201,9 +203,10 @@ class SetAdmin(skytools.DBScript):
             skytools.DBLanguage("plpgsql"),
             skytools.DBFunction("txid_current_snapshot", 0, sql_file="txid.sql"),
             skytools.DBSchema("pgq", sql_file="pgq.sql"),
+            skytools.DBSchema("pgq_ext", sql_file="pgq_ext.sql"),
             skytools.DBSchema("pgq_set", sql_file="pgq_set.sql"),
         ]
-        skytools.db_install(db.cursor(), objs, self.log)
+        skytools.db_install(db.cursor(), objs, self.log.debug)
         db.commit()
 
 if __name__ == '__main__':