From 5046d68589a5e13b63e5e1b7d2ea6fd6ce669a6f Mon Sep 17 00:00:00 2001 From: Martin Pihlak Date: Fri, 24 Apr 2009 10:51:18 +0300 Subject: [PATCH] Support for drop-node. --- python/londiste.py | 2 +- python/pgq/cascade/admin.py | 20 ++++++- sql/pgq_node/functions/pgq_node.drop_node.sql | 54 +++++++++++++------ .../pgq_node.unregister_subscriber.sql | 2 +- sql/pgq_node/structure/functions.sql | 2 +- 5 files changed, 61 insertions(+), 19 deletions(-) diff --git a/python/londiste.py b/python/londiste.py index 48e591ed..8f284b2b 100755 --- a/python/londiste.py +++ b/python/londiste.py @@ -40,7 +40,7 @@ Internal Commands: cmd_handlers = ( (('create-root', 'create-branch', 'create-leaf', 'members', 'tag-dead', 'tag-alive', 'change-provider', 'rename-node', 'status', 'pause', 'resume', - 'switchover', 'failover'), londiste.LondisteSetup), + 'switchover', 'failover', 'drop-node'), londiste.LondisteSetup), (('add-table', 'remove-table', 'add-seq', 'remove-seq', 'tables', 'seqs', 'missing', 'resync', 'check', 'fkeys', 'execute'), londiste.LondisteSetup), (('worker', 'replay'), londiste.Replicator), diff --git a/python/pgq/cascade/admin.py b/python/pgq/cascade/admin.py index ffb61af3..82bb1966 100644 --- a/python/pgq/cascade/admin.py +++ b/python/pgq/cascade/admin.py @@ -44,6 +44,7 @@ Works, naming problems: Broken: + drop-node NAME rename-node OLD NEW Rename a node show-consumers [--node] failover NEWROOT @@ -434,6 +435,23 @@ class CascadeAdmin(skytools.AdminScript): # resume node self.resume_node(old_name) + def cmd_drop_node(self, node_name): + """Drop a node.""" + + self.load_local_info() + + node = self.load_node_info(node_name) + + # see if we can safely drop + subscriber_list = self.get_node_subscriber_list(node_name) + if subscriber_list: + raise Exception('node still has subscribers') + + # remove the node from all the databases + for n in self.queue_info.member_map.values(): + q = "select * from pgq_node.drop_node(%s, %s)" + self.node_cmd(n.name, q, [self.queue_name, node_name]) + def node_depends(self, sub_node, top_node): cur_node = sub_node # walk upstream @@ -683,7 +701,7 @@ class CascadeAdmin(skytools.AdminScript): def get_node_subscriber_list(self, node_name): """Fetch subscriber list from a node.""" - q = "select node_name, local_watermark from pgq_node.get_subscriber_info(%s)" + q = "select node_name, node_watermark from pgq_node.get_subscriber_info(%s)" db = self.get_node_database(node_name) rows = self.exec_query(db, q, [self.queue_name]) return [r['node_name'] for r in rows] diff --git a/sql/pgq_node/functions/pgq_node.drop_node.sql b/sql/pgq_node/functions/pgq_node.drop_node.sql index 1507d878..8a1babf2 100644 --- a/sql/pgq_node/functions/pgq_node.drop_node.sql +++ b/sql/pgq_node/functions/pgq_node.drop_node.sql @@ -1,16 +1,19 @@ create or replace function pgq_node.drop_node( in i_queue_name text, + in i_node_name text, out ret_code int4, out ret_note text) returns record as $$ -- ---------------------------------------------------------------------- --- Function: pgq_node.drop_node(1) +-- Function: pgq_node.drop_node(2) -- --- Drop node. +-- Drop node. This needs to be run on all the members of a set +-- to properly get rid of the node. -- -- Parameters: -- i_queue_name - queue name +-- i_node_name - node_name -- -- Returns: -- ret_code - error code @@ -21,29 +24,50 @@ returns record as $$ -- 404 - No such queue -- ---------------------------------------------------------------------- declare - _wm_consumer text; - _global_wm bigint; - sub record; + _is_local boolean; begin - perform 1 from pgq_node.node_info + select (node_name = i_node_name) into _is_local + from pgq_node.node_info where queue_name = i_queue_name; + if not found then - select 404, 'No such queue: ' || i_node_name into ret_code, ret_note; + select 404, 'No such node: ' || i_node_name into ret_code, ret_note; return; end if; - perform pgq_node.unsubscribe_node(s.set_name, s.node_name) - from pgq_node.subscriber_info s - where set_name = i_node_name; + begin + perform pgq_node.unregister_subscriber(i_queue_name, i_node_name); + exception + when others then + null; + end; + + delete from pgq_node.subscriber_info + where queue_name = i_queue_name + and subscriber_node = i_node_name; + + if _is_local then + -- At the dropped node, remove local state + delete from londiste.table_info + where queue_name = i_queue_name; - delete from pgq_node.completed_tick - where set_name = i_node_name; + delete from londiste.seq_info + where queue_name = i_queue_name; - delete from pgq_node.set_info - where set_name = i_node_name; + delete from londiste.applied_execute + where queue_name = i_queue_name; + + delete from pgq_node.local_state + where queue_name = i_queue_name; + + delete from pgq_node.node_info + where queue_name = i_queue_name + and node_name = i_node_name; + end if; delete from pgq_node.node_location - where set_name = i_node_name; + where queue_name = i_queue_name + and node_name = i_node_name; select 200, 'Node dropped' into ret_code, ret_note; return; diff --git a/sql/pgq_node/functions/pgq_node.unregister_subscriber.sql b/sql/pgq_node/functions/pgq_node.unregister_subscriber.sql index 4a00de02..777607e9 100644 --- a/sql/pgq_node/functions/pgq_node.unregister_subscriber.sql +++ b/sql/pgq_node/functions/pgq_node.unregister_subscriber.sql @@ -11,7 +11,7 @@ returns record as $$ -- Unsubscribe remote node from local node. -- -- Parameters: --- i_node_name - set name +-- i_queue_name - set name -- i_remote_node_name - node name -- -- Returns: diff --git a/sql/pgq_node/structure/functions.sql b/sql/pgq_node/structure/functions.sql index 3c4b73d6..1914d625 100644 --- a/sql/pgq_node/structure/functions.sql +++ b/sql/pgq_node/structure/functions.sql @@ -9,7 +9,7 @@ -- Group: Node operations \i functions/pgq_node.create_node.sql --- \i functions/pgq_node.drop_node.sql +\i functions/pgq_node.drop_node.sql -- \i functions/pgq_node.rename_node.sql \i functions/pgq_node.get_node_info.sql \i functions/pgq_node.is_root_node.sql -- 2.39.5