self.provider_location = res[0]['provider_location']
return self.get_database('provider_db', connstr = self.provider_location)
-
def expand_arg_list(self, db, kind, existing, args, needs_tbl=True):
curs = db.cursor()
else:
return lst_missing
-
allow_nonexist = not needs_tbl
if existing:
res = self.solve_globbing(args, lst_exists, map_exists, map_missing, allow_nonexist)
res = self.solve_globbing(args, lst_missing, map_missing, map_exists, allow_nonexist)
return res
-
def solve_globbing(self, args, full_list, full_map, reverse_map, allow_nonexist):
def glob2regex(s):
s = s.replace('.', '[.]').replace('?', '.').replace('*', '.*')
## NB: not all commands work ##
-"""cascaded queue administration.
+"""Cascaded queue administration.
londiste.py INI pause [NODE [CONS]]
setadm.py INI pause NODE [CONS]
-
"""
import sys, time, optparse, skytools, os.path
class CascadeAdmin(skytools.AdminScript):
- """Cascaded pgq administration."""
+ """Cascaded PgQ administration."""
queue_name = None
queue_info = None
extra_objs = []
[ self.queue_name, node_type, node_name, worker_name,
provider_name, global_watermark, combined_queue ])
-
self.extra_init(node_type, db, provider_db)
if node_attrs:
while 1:
db = self.get_database('root_db', connstr = loc)
-
# query current status
res = self.exec_query(db, "select * from pgq_node.get_node_info(%s)", [self.queue_name])
info = res[0]
except skytools.DBError, d:
self.log.warning("Failed to remove from '%s': %s", n.name, str(d))
-
-
-
def node_depends(self, sub_node, top_node):
cur_node = sub_node
# walk upstream
if info.completed_tick >= last_tick:
return info
-
def takeover_root(self, old_node_name, new_node_name, failover = False):
"""Root switchover."""
node_db.commit()
if len(cons_rows) == 1:
if prov_node:
- raise Exception('Unexcpeted situation: there are two gravestones - on nodes %s and %s' % (prov_node, node_name))
+ raise Exception('Unexpected situation: there are two gravestones - on nodes %s and %s' % (prov_node, node_name))
prov_node = node_name
failover_tick = cons_rows[0]['last_tick']
self.log.info("Found gravestone on node: %s", node_name)
def __init__(self, service_name, db_name, args):
"""Initialize new consumer.
-
+
@param service_name: service_name for DBScript
@param db_name: target database name for get_database()
@param args: cmdline args for DBScript
dst_curs.execute(q, [self.pgq_queue_name])
dst_db.commit()
self.global_wm_publish_time = t
-