import sys, time, skytools
 
+from pgq.rawconsumer import RawQueue
+
+__all__ = ['SetConsumer']
+
 ROOT = 'root'
 BRANCH = 'branch'
 LEAF = 'leaf'
         self.local_watermark = row['local_watermark']
         self.completed_tick = row['completed_tick']
         self.provider_node = row['provider_node']
+        self.provider_location = row['provider_location']
         self.paused = row['paused']
         self.resync = row['resync']
         self.up_to_date = row['up_to_date']
         self.combined_set = row['combined_set']
         self.combined_type = row['combined_type']
         self.combined_queue = row['combined_queue']
+        self.worker_name = row['worker_name']
 
     def need_action(self, action_name):
         typ = self.type
     last_global_wm_event = 0
     def work(self):
 
+
         self.tick_id_cache = {}
 
         self.set_name = self.cf.get('set_name')
         target_db = self.get_database('subscriber_db')
 
         node = self.load_node_info(target_db)
+        self.consumer_name = node.worker_name
 
         if not node.up_to_date:
             self.tag_node_uptodate(target_db)
         # batch processing follows
         #
 
+        source_db = self.get_database('source_db', connstr = node.provider_location)
         srcnode = self.load_node_info(source_db)
         
         # get batch
         curs.execute(q, [dst_queue.queue_name, src_queue.cur_tick])
 
 if __name__ == '__main__':
-    script = SetConsumer('setconsumer', ['test.ini'])
+    script = SetConsumer('setconsumer', sys.argv[1:])
     script.start()
 
 
         if info['node_type'] is not None:
             self.log.info("Node is already initialized as %s" % info['node_type'])
             return
+        
+        worker_name = "%s_%s_worker" % (self.set_name, node_name)
 
         # register member
         if node_type in ('root', 'combined-root'):
             provider_name = None
             self.exec_sql(db, "select pgq_set.add_member(%s, %s, %s, false)",
                           [self.set_name, node_name, node_location])
-            self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s)",
-                          [self.set_name, node_type, node_name, provider_name, global_watermark, combined_set])
+            self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s, %s)",
+                          [self.set_name, node_type, node_name, worker_name, provider_name, global_watermark, combined_set])
         else:
             root_db = self.find_root_db()
             set = self.load_root_info(root_db)
                 sys.exit(1)
 
             # register on provider
-            worker_name = "qweqweqwe"
             provider_db = self.get_database('provider_db', connstr = provider.location)
             self.exec_sql(provider_db, "select pgq_set.add_member(%s, %s, %s, false)",
                           [self.set_name, node_name, node_location])
                           [self.set_name, node_name, node_location])
             self.exec_sql(db, "select pgq_set.add_member(%s, %s, %s, false)",
                           [self.set_name, provider_name, provider.location])
-            self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s)",
-                          [self.set_name, node_type, node_name, provider_name, global_watermark, combined_set])
+            self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s, %s)",
+                          [self.set_name, node_type, node_name, worker_name, provider_name,
+                           global_watermark, combined_set])
             db.commit()
 
             
             skytools.DBLanguage("plpgsql"),
             skytools.DBFunction("txid_current_snapshot", 0, sql_file="txid.sql"),
             skytools.DBSchema("pgq", sql_file="pgq.sql"),
+            skytools.DBSchema("pgq_ext", sql_file="pgq_ext.sql"),
             skytools.DBSchema("pgq_set", sql_file="pgq_set.sql"),
         ]
-        skytools.db_install(db.cursor(), objs, self.log)
+        skytools.db_install(db.cursor(), objs, self.log.debug)
         db.commit()
 
 if __name__ == '__main__':