node_db.commit()
provider_db.commit()
- def cmd_add(self, args = []):
+ def cmd_add(self, *args):
q = "select * from londiste.node_add_table(%s, %s)"
db = self.get_database('node_db')
self.exec_cmd_many(db, q, [self.set_name], args)
- def cmd_remove(self, args = []):
+ def cmd_remove(self, *args):
q = "select * from londiste.node_remove_table(%s, %s)"
db = self.get_database('node_db')
self.exec_cmd_many(db, q, [self.set_name], args)
- def cmd_add_seq(self, args = []):
+ def cmd_add_seq(self, *args):
q = "select * from londiste.node_add_seq(%s, %s)"
db = self.get_database('node_db')
self.exec_cmd_many(db, q, [self.set_name], args)
- def cmd_remove_seq(self, args = []):
+ def cmd_remove_seq(self, *args):
q = "select * from londiste.node_remove_seq(%s, %s)"
db = self.get_database('node_db')
self.exec_cmd_many(db, q, [self.set_name], args)
- def cmd_resync(self, args = []):
+ def cmd_resync(self, *args):
q = "select * from londiste.node_resync_table(%s, %s)"
db = self.get_database('node_db')
self.exec_cmd_many(db, q, [self.set_name], args)
- def cmd_tables(self, args = []):
+ def cmd_tables(self):
q = "select table_name, merge_state from londiste.node_get_table_list(%s)"
db = self.get_database('node_db')
self.db_display_table(db, "Tables on node", q, [self.set_name])
- def cmd_seqs(self, args = []):
+ def cmd_seqs(self):
q = "select seq_namefrom londiste.node_get_seq_list(%s)"
db = self.get_database('node_db')
self.db_display_table(db, "Sequences on node", q, [self.set_name])
- def cmd_missing(self, args = []):
+ def cmd_missing(self):
q = "select * from londiste.node_show_missing(%s)"
db = self.get_database('node_db')
self.db_display_table(db, "Missing objects on node", q, [self.set_name])
- def cmd_check(self, args = []):
+ def cmd_check(self):
pass
- def cmd_fkeys(self, args = []):
+ def cmd_fkeys(self):
pass
- def cmd_triggers(self, args = []):
+ def cmd_triggers(self):
pass
#
__all__ = ['SetAdmin']
-command_usage = """
+command_usage = """\
%prog [options] INI CMD [subcmd args]
-commands:
+Node Initialization:
+ init-root NODE_NAME NODE_CONSTR
+ init-branch NODE_NAME NODE_CONSTR --provider=<constr>
+ init-leaf NODE_NAME NODE_CONSTR --provider=<constr>
+ Initializes node. Given connstr is kept as global connstring
+ for that node. Those commands ignore node_db in .ini.
+ The --provider connstr is used only for initial set info
+ fetching, later actual provider's connect string is used.
+
+Node Administration:
+ status Show set state
+ members Show members in set
+ rename-node OLD NEW Rename a node
+ change-provider NODE NEWSRC
+ pause NODE
+ resume NODE
+
+ switchover NEWROOT
+ failover NEWROOT
+ tag-dead NODE .. Tag node as dead
+ tag-alive NODE .. Tag node as alive
"""
class SetAdmin(skytools.AdminScript):
# Node initialization.
#
- def cmd_init_root(self, args):
- if len(args) != 2:
- raise Exception('init-root needs 2 args')
- self.init_node('root', args[0], args[1])
+ def cmd_init_root(self, node_name, node_location):
+ self.init_node('root', node_name, node_location)
- def cmd_init_branch(self, args):
+ def cmd_init_branch(self, node_name, node_location):
if len(args) != 2:
raise Exception('init-branch needs 2 args')
- self.init_node('branch', args[0], args[1])
+ self.init_node('branch', node_name, node_location)
- def cmd_init_leaf(self, args):
- if len(args) != 2:
- raise Exception('init-leaf needs 2 args')
- self.init_node('leaf', args[0], args[1])
+ def cmd_init_leaf(self, node_name, node_location):
+ self.init_node('leaf', node_name, node_location)
def init_node(self, node_type, node_name, node_location):
provider_loc = self.options.provider
# Print status of whole set.
#
- def cmd_status(self, args):
+ def cmd_status(self):
root_db = self.find_root_db()
sinf = self.load_set_info(root_db)
# Normal commands.
#
- def cmd_change_provider(self, args):
- node_name = args[0]
- new_provider = args[1]
+ def cmd_change_provider(self, node_name, new_provider):
old_provider = None
self.load_local_info()
# resume node
self.resume_node(node_name)
- def cmd_rename_node(self, args):
- old_name = args[0]
- new_name = args[1]
+ def cmd_rename_node(self, old_name, new_name):
self.load_local_info()
self.pause_node(old_name)
node = self.load_node_info(old_name)
provider_node = node.provider_node
+ subscriber_list = self.get_node_subscriber_list(old_name)
# create copy of member info / subscriber+queue info
self.exec_cmd(root_db, step1, [self.set_name, old_name, new_name])
self.node_cmd(provider_node, step1, [self.set_name, old_name, new_name])
self.node_cmd(old_name, step1, [self.set_name, old_name, new_name])
+ for child in subscriber_list:
+ self.node_cmd(child, step1, [self.set_name, old_name, new_name])
# step1
self.node_cmd(old_name, step2, [self.set_name, old_name, new_name])
self.node_cmd(provider_node, step1, [self.set_name, old_name, new_name])
+ for child in subscriber_list:
+ self.node_cmd(child, step2, [self.set_name, old_name, new_name])
self.exec_cmd(root_db, step2, [self.set_name, old_name, new_name])
# resume node
self.resume_node(old_name)
- def cmd_promote(self):
- old_root = 'foo'
- new_root = ''
- self.pause_node(old_root)
- ctx = self.load_node_info(old_root)
- [['old-root', 'PAUSE']]
- [['old-root', 'demote, set-provider?']]
- [['new-root', 'wait-for-catch-up']]
- [['new-root', 'pause']]
- [['new-root', 'promote']]
- [['new-root', 'resume']]
- [['old-root', 'resume']]
- [['new_parent', 'select * from pgq_set.subscribe_node(%(set_name)s, %(node_name)s, %(node_pos)s)']]
- [['node', 'select * from pgq_set.change_provider(%(set_name)s, %(new_provider)s)']]
- [['old_parent', 'select * from pgq_set.unsubscribe_node(%(set_name)s, %(node_name)s, %(node_pos)s)']]
- [['node', 'RESUME']]
-
- def cmd_pause(self, args):
+ def switchover_nonroot(self, old_node, new_node):
+ # see if we need to change new nodes' provider
+ tmp_node = new_node
+ while 1:
+ if tmp_node.is_root():
+ break
+ if tmp_node.name == old_node:
+ # yes, old_node is new_nodes provider,
+ # switch it around
+ self.change_provider(new_node, old_node.parent_node)
+ break
+ self.change_provider(old_node.name, new_node.name)
+
+ def switchover_root(self, old_node, new_node):
+ self.pause_node(old_node.name)
+ self.extra_lockdown(old_node)
+
+ self.wait_for_catchup(new_node, old_node)
+ self.pause_node(new_node.name)
+ self.promote_node(new_node.name)
+ self.subscribe_node(new_node.name, old_node.name, tick_pos)
+ self.unsubscribe_node(new_node.parent_node, new_node.name)
+ self.resume_node(new_node.name)
+
+ # demote & set provider on node
+ q = 'select * from pgq_set.demote_root(%s, %s)'
+ self.node_cmd(old_node.name, q, [self.set_name, new_node.name])
+
+ self.resume_node(old_node.name)
+
+ def cmd_switchover(self, old_node_name, new_node_name):
self.load_local_info()
- self.pause_node(args[0])
+ old_node = self.get_node_info(old_node_name)
+ new_node = self.get_node_info(new_node_name)
+ if old_node.name == new_node.name:
+ self.log.info("same node?")
+ return
- def cmd_resume(self, args):
+ if old_node.is_root():
+ self.switchover_root(old_node, new_node)
+ else:
+ self.switchover_nonroot(old_node, new_node)
+
+ # switch subscribers around
+ if self.options.all:
+ for n in self.get_node_subscriber_list(old_node.name):
+ self.change_provider(n, new_node.name)
+
+ def cmd_pause(self, node_name):
+ self.load_local_info()
+ self.pause_node(node_name)
+
+ def cmd_resume(self, node_name):
self.load_local_info()
- self.resume_node(args[0])
+ self.resume_node(node_name)
- def cmd_members(self, args):
+ def cmd_members(self):
db = self.get_database(self.initial_db_name)
q = "select node_name from pgq_set.get_node_info(%s)"
rows = self.exec_query(db, q, [self.set_name])
def subscribe_node(self, target_node, subscriber_node, tick_pos):
q = "select * from pgq_set.subscribe_node(%s, %s, %s)"
- self.node_cmd(target_node, q, [self.set_name, target_node, tick_pos])
+ self.node_cmd(target_node, q, [self.set_name, subscribe_node, tick_pos])
- def unsubscribe_node(self, target_node, subscriber_node, tick_pos):
- q = "select * from pgq_set.subscribe_node(%s, %s, %s)"
- self.node_cmd(target_node, q, [self.set_name, target_node, tick_pos])
+ def unsubscribe_node(self, target_node, subscriber_node):
+ q = "select * from pgq_set.unsubscribe_node(%s, %s)"
+ self.node_cmd(target_node, q, [self.set_name, subscribe_node])
def load_node_info(self, node_name):
db = self.get_node_database(node_name)
return SetInfo(self.set_name, info, member_list)
+ def get_node_subscriber_list(self, node_name):
+ q = "select node_name, local_watermark from pgq_set.get_subscriber_info(%s)"
+ db = self.get_node_database(node_name)
+ rows = self.exec_query(db, q, [self.set_name])
+ return [r['node_name'] for r in rows]
+
if __name__ == '__main__':
script = SetAdmin('set_admin', sys.argv[1:])
script.start()
last_global_wm_publish_time = 0
main_worker = True
reg_ok = False
+ actual_dst_event_id = 0
+ batch_max_event_id = 0
+ seq_buffer = 10000
def __init__(self, service_name, args,
node_db_name = 'node_db'):
skytools.DBScript.__init__(self, service_name, args)
# COMBINED_BRANCH needs to sync with part sets
if dst_node.need_action('sync-part-pos'):
self.move_part_positions(dst_curs)
+ if dst_node.need_action('update-event-seq'):
+ self.update_event_seq(dst_curs)
# we are done on target
self.set_tick_complete(dst_curs, src_queue.cur_tick)
def process_set_batch(self, src_db, dst_db, ev_list):
dst_curs = dst_db.cursor()
+ max_id = 0
for ev in ev_list:
self.process_set_event(dst_curs, ev)
if self.dst_queue:
self.dst_queue.bulk_insert(dst_curs, ev)
+ if ev.id > max_id:
+ max_id = ev.id
+ self.batch_max_event_id = max_id
self.stat_increase('count', len(ev_list))
+ def update_event_seq(self, dst_curs):
+ qname = self.dst_queue.queue_name
+ if self.actual_dst_event_id == 0:
+ q = "select pgq.seq_getval(queue_event_seq) from pgq.queue where queue_name = %s"
+ dst_curs.execute(q, [qname])
+ self.actual_dst_event_id = dst_curs.fetchone()[0]
+ self.log.debug('got local event_id value = %d' % self.actual_dst_event_id)
+
+ if self.batch_max_event_id + self.seq_buffer >= self.actual_dst_event_id:
+ next_id = self.batch_max_event_id + 2 * self.seq_buffer
+ q = "select pgq.seq_setval(queue_event_seq, %s) from pgq.queue where queue_name = %s"
+ self.log.debug('set local event_id value = %d' % next_id)
+ dst_curs.execute(q, [next_id, qname])
+ self.actual_dst_event_id = next_id
+
def process_set_event(self, dst_curs, ev):
if ev.type == 'set-tick':
self.handle_set_tick(dst_curs, ev)