def __init__(self, args):
- if len(args) > 0:
- # hack to determine the role of the node
- cf = ConfigParser.ConfigParser()
- cf.read(args[0])
- for (self.wtype, self.service_name) in [ (MASTER, "wal-master"), (SLAVE, "wal-slave") ]:
- if cf.has_section(self.service_name):
- break
+ if len(args) < 2:
+ # need at least config file and command
+ usage(1)
+
+ # determine the role of the node from provided configuration
+ cf = ConfigParser.ConfigParser()
+ cf.read(args[0])
+ for (self.wtype, self.service_name) in [ (MASTER, "wal-master"), (SLAVE, "wal-slave") ]:
+ if cf.has_section(self.service_name):
+ break
+ else:
+ print >> sys.stderr, "Invalid config file: %s" % args[0]
+ sys.exit(1)
skytools.DBScript.__init__(self, self.service_name, args)
self.set_single_loop(1)
if len(self.args) < 2:
usage(1)
+ self.cfgfile = self.args[0]
self.cmd = self.args[1]
self.args = self.args[2:]
self.script = os.path.abspath(sys.argv[0])
except:
self.log.fatal("Cannot write to %s" % fn)
+ def authfile_name(self):
+ """Return the path of the pg_auth file inside the "global" subdirectory of the configured master_data directory."""
+ master_data = self.cf.get("master_data")
+ return os.path.join(master_data, "global", "pg_auth")
+
def master_stop(self):
+ """Deconfigure archiving, attempt to stop syncdaemon.
+
+ Requires the MASTER role (checked via assert_valid_role).  Archiving is
+ disabled by calling master_configure_archiving('').  If the configured
+ pidfile exists on disk, this script is re-invoked with the same config
+ file and the arguments "syncdaemon -s" to stop the running daemon.
+ """
self.assert_valid_role(MASTER)
self.log.info("Disabling WAL archiving")
self.master_configure_archiving('')
+ # stop any running syncdaemons
+ # Default to "" so a config without a pidfile entry does not abort the
+ # stop sequence after archiving was already disabled; os.path.exists("")
+ # is False, so the syncdaemon step is simply skipped in that case.
+ # NOTE(review): assumes self.cf.get() raises on a missing key when no
+ # default is given - confirm against the skytools Config class.
+ pidfile = self.cf.get("pidfile", "")
+ if os.path.exists(pidfile):
+ self.log.info('Pidfile %s exists, attempting to stop syncdaemon.' % pidfile)
+ self.exec_cmd([self.script, self.cfgfile, "syncdaemon", "-s"])
+ self.log.info("Done")
+
def master_configure_archiving(self, cf_val):
cf_file = self.cf.get("master_config")
data_dir = self.cf.get("master_data")
self.log.info("Sending SIGHUP to postmaster")
self.signal_postmaster(data_dir, signal.SIGHUP)
- self.log.info("Done")
def change_config(self, cf_file, buf):
cf_old = cf_file + ".old"
def remote_walmgr(self, command, stdin_disabled = True):
"""Pass a command to slave WalManager"""
- slave = self.cf.get("slave")
- slave_config = self.cf.get("slave_config", "")
- tmp = slave.split(":", 1)
sshopt = "-T"
if stdin_disabled:
sshopt += "n"
- cmdline = None
+ slave_config = self.cf.get("slave_config")
+ if not slave_config:
+ raise Exception("slave_config not specified in %s" % self.cfgfile)
- if len(tmp) < 2:
- raise Exception("cannot find slave hostname")
- else:
- host, path = tmp
- cmdline = [ "ssh", sshopt, host, self.script ]
+ try:
+ slave = self.cf.get("slave")
+ host, path = slave.split(":", 1)
+ except:
+ raise Exception("invalid value for 'slave' in %s" % self.cfgfile)
- if slave_config:
- cmdline += [ slave_config ]
- cmdline += [ command ]
+ cmdline = [ "ssh", sshopt, host, self.script, slave_config, command ]
if self.not_really:
self.log.info("remote_walmgr: %s" % command)
self.master_configure_archiving(cf_val)
# ask slave to init
self.remote_walmgr("setup")
+ self.log.info("Done")
else:
# create slave directory structure
def mkdir(dir):
self.exec_big_rsync(cmdline)
self.remote_walmgr("xpurgewals")
- except:
+ except Exception, e:
+ self.log.error(e)
errors = True
try:
dst_loc += "/"
# copy data
- self.exec_rsync([ srcpath, dst_loc ], True)
+ self.exec_rsync([ srcpath, self.authfile_name(), dst_loc ], True)
self.log.debug("%s: done", srcname)
end_time = time.time()
use_xlog_functions = self.cf.getint("use_xlog_functions", False)
data_dir = self.cf.get("master_data")
xlog_dir = os.path.join(data_dir, "pg_xlog")
- dst_loc = self.cf.get("partial_wals")
- if dst_loc[-1] != "/":
- dst_loc += "/"
+
+ auth_loc = os.path.join(self.cf.get("completed_wals"), "")
+ dst_loc = os.path.join(self.cf.get("partial_wals"), "")
+
+ # sync the auth file
+ if not daemon_mode:
+ # avoid the extra rsync in daemon mode - the file is fairly static, so we
+ # don't need to sync it every N seconds.
+ if self.exec_rsync([self.authfile_name(), auth_loc]) != 0:
+ self.log.warning('Cannot sync auth file')
db = None
if use_xlog_functions:
self.assert_valid_role(SLAVE)
srcdir = self.cf.get("completed_wals")
+ datadir = self.cf.get("slave_data")
stopfile = os.path.join(srcdir, "STOP")
+ src_authfile = os.path.join(srcdir, "pg_auth")
+ dst_authfile = os.path.join(datadir, os.path.join("global", "pg_auth"))
+
if self.not_really:
self.log.info("Writing STOP file: %s" % stopfile)
else:
open(stopfile, "w").write("1")
self.log.info("Stopping recovery mode")
+ if os.path.isfile(src_authfile):
+ self.log.debug("Using pg_auth file from master.")
+ try:
+ os.rename(dst_authfile, "%s.old" % dst_authfile)
+ self.exec_cmd(["cp", src_authfile, dst_authfile])
+ except Exception, e:
+ self.log.warning("Unable to restore pg_auth file: %s" % e)
+
def slave_pause(self, waitcomplete=0):
"""Pause the WAL apply, wait until last file applied if needed"""
self.assert_valid_role(SLAVE)