Additional features:
* Simplified install. Master "setup" command should set up slave directories.
* Add support for multiple targets on master.
- * Add an optional time based WAL retention parameter, this could be useful if base backups
- are taken from the standby (8.2 only)
* WAL purge does not correctly purge old WAL-s if timelines are involved. The first
useful WAL name is obtained from backup_label; WAL-s in the same timeline that are older
than the first useful WAL are removed.
- * xrestore should not attempt to copy the file on disk full condition - this
- will result in recovery failure. Pre 8.2 this means starting from zero.
* Always copy the directory on "restore"; add a special "--move" option.
"""
return True
class WalChunk:
+ """Represents a chunk of WAL used in record based shipping"""
def __init__(self,filename,pos=0,bytes=0):
self.filename = filename
self.pos = pos
def __str__(self):
return "%s @ %d +%d" % (self.filename, self.pos, self.bytes)
class CheckpointInfo:
    """Checkpoint info parsed from pg_controldata output.

    Attributes (all None until found in the output):
      xlogid, xrecoff -- the two hexadecimal halves of the checkpoint location
      timeline        -- value of "Latest checkpoint's TimeLineID"
      wal_size        -- value of "Bytes per WAL segment"
      wal_name        -- 24 hex digit name built from timeline, xlogid and
                         the segment number (xrecoff divided by wal_size)
      is_valid        -- True only when all of the above could be determined
    """

    def __init__(self, pg_controldata, findRestartPoint):
        """Collect last checkpoint information from pg_controldata output.

        pg_controldata   -- command line handed to os.popen(); its standard
                            output is scanned line by line
        findRestartPoint -- when true the "Latest checkpoint's REDO location"
                            line is used, otherwise "Latest checkpoint location"
        """
        self.xlogid = None
        self.xrecoff = None
        self.timeline = None
        self.wal_size = None
        self.wal_name = None
        self.is_valid = False

        # Only the location pattern depends on the flag, so pick it once.
        if findRestartPoint:
            location_re = r"^Latest checkpoint's REDO location:\s+([0-9A-F]+)/([0-9A-F]+)"
        else:
            location_re = r"^Latest checkpoint location:\s+([0-9A-F]+)/([0-9A-F]+)"

        pipe = os.popen(pg_controldata, "r")
        try:
            for line in pipe:
                m = re.match(location_re, line)
                if m:
                    self.xlogid = int(m.group(1), 16)
                    self.xrecoff = int(m.group(2), 16)
                m = re.match(r"^Latest checkpoint's TimeLineID:\s+(\d+)", line)
                if m:
                    self.timeline = int(m.group(1))
                m = re.match(r"^Bytes per WAL segment:\s+(\d+)", line)
                if m:
                    self.wal_size = int(m.group(1))
        finally:
            # Always reap the child process and release the pipe.
            pipe.close()

        have_all = None not in (self.xlogid, self.xrecoff, self.timeline, self.wal_size)
        # A zero segment size cannot be used to compute a segment number.
        if have_all and self.wal_size > 0:
            # Floor division keeps the segment number an integer regardless of
            # the division semantics in effect, as "%X" requires.
            self.wal_name = "%08X%08X%08X" % \
                (self.timeline, self.xlogid, self.xrecoff // self.wal_size)
            self.is_valid = True
+
class BackupLabel:
    """Backup label contents.

    Attributes (each stays None when the corresponding line is not found):
      first_wal    -- WAL file name from the "START WAL LOCATION" line
      start_time   -- text following "START TIME:"
      label_string -- text following "LABEL: "
    """

    def __init__(self, backupdir):
        """Initialize a new BackupLabel from existing file.

        backupdir -- directory expected to contain a "backup_label" file.
                     If the file does not exist all attributes remain None.
        """
        filename = os.path.join(backupdir, "backup_label")
        self.first_wal = None
        self.start_time = None
        self.label_string = None
        if not os.path.exists(filename):
            return

        label_file = open(filename)
        try:
            for line in label_file:
                m = re.match(r'^START WAL LOCATION: [^\s]+ \(file ([0-9A-Z]+)\)$', line)
                if m:
                    self.first_wal = m.group(1)
                m = re.match(r'^START TIME:\s(.*)$', line)
                if m:
                    self.start_time = m.group(1)
                m = re.match(r'^LABEL: (.*)$', line)
                if m:
                    self.label_string = m.group(1)
        finally:
            # Do not leave the descriptor open until garbage collection.
            label_file.close()
class WalMgr(skytools.DBScript):
def assert_valid_role(self, role):
    """Terminate the process with exit status 1 unless self.wtype equals role.

    A warning is logged through self.log before exiting.
    """
    if self.wtype == role:
        return
    self.log.warning("Action not available on current node.")
    sys.exit(1)
def pg_start_backup(self, code):
def exec_rsync(self,args,die_on_error=False):
cmdline = [ "rsync", "-a", "--quiet" ]
if self.cf.getint("compression", 0) > 0:
- cmdline.append("-z");
+ cmdline.append("-z")
cmdline += args
cmd = "' '".join(cmdline)
self.log.debug("Execute rsync cmd: '%s'" % (cmd))
if self.not_really:
- return
+ return 0
res = os.spawnvp(os.P_WAIT, cmdline[0], cmdline)
if res == 24:
self.log.info("Some files vanished, but thats OK")
3. Wait for WAL apply to complete (look at PROGRESS file)
4. Rotate old backups
5. Copy data directory to data.master
- 6. Purge unneeded WAL-s
- 7. Resume WAL apply
- 8. Release backup lock
+ 6. Create backup label and history file.
+ 7. Purge unneeded WAL-s
+ 8. Resume WAL apply
+ 9. Release backup lock
"""
self.assert_valid_role(SLAVE)
if self.slave_lock_backups() != 0:
self.slave_rotate_backups()
src = self.cf.get("slave_data")
dst = self.cf.get("full_backup")
+
+ start_time = time.localtime()
cmdline = ["cp", "-a", src, dst ]
self.log.info("Executing %s" % " ".join(cmdline))
if not self.not_really:
self.exec_cmd(cmdline)
+ stop_time = time.localtime()
+
+ # Obtain the last restart point information
+ slave_bin = self.cf.get("slave_bin", "")
+ pg_controldata = "%s %s" % (os.path.join(slave_bin, "pg_controldata"), dst)
+ rp = CheckpointInfo(pg_controldata, False)
+
+ # TODO: The newly created backup directory probably still contains
+ # backup_label.old and recovery.conf files. Remove these.
+
+ if not rp.is_valid:
+ self.log.warning("Unable to determine last restart point, backup_label not created.")
+ else:
+ # Write backup label and history file
+
+ backup_label = \
+"""START WAL LOCATION: %(xlogid)X/%(xrecoff)X (file %(wal_name)s)
+CHECKPOINT LOCATION: %(xlogid)X/%(xrecoff)X
+START TIME: %(start_time)s
+LABEL: SlaveBackup"
+"""
+ backup_history = \
+"""START WAL LOCATION: %(xlogid)X/%(xrecoff)X (file %(wal_name)s)
+STOP WAL LOCATION: %(xlogid)X/%(xrecoff)X (file %(wal_name)s)
+CHECKPOINT LOCATION: %(xlogid)X/%(xrecoff)X
+START TIME: %(start_time)s
+LABEL: SlaveBackup"
+STOP TIME: %(stop_time)s
+"""
+
+ label_params = {
+ "xlogid": rp.xlogid,
+ "xrecoff": rp.xrecoff,
+ "wal_name": rp.wal_name,
+ "start_time": time.strftime("%Y-%m-%d %H:%M:%S %Z", start_time),
+ "stop_time": time.strftime("%Y-%m-%d %H:%M:%S %Z", stop_time),
+ }
+
+ # Write the label
+ filename = os.path.join(dst, "backup_label")
+ if self.not_really:
+ self.log.info("Writing backup label to %s" % filename)
+ else:
+ lf = open(filename, "w")
+ lf.write(backup_label % label_params)
+ lf.close()
+
+ # Now the history
+ histfile = "%s.%08X.backup" % (rp.wal_name, rp.xrecoff % rp.wal_size)
+ completed_wals = self.cf.get("completed_wals")
+ filename = os.path.join(completed_wals, histfile)
+ if os.path.exists(filename):
+ self.log.warning("%s: already exists, refusing to overwrite." % filename)
+ else:
+ if self.not_really:
+ self.log.info("Writing backup history to %s" % filename)
+ else:
+ lf = open(filename, "w")
+ lf.write(backup_history % label_params)
+ lf.close()
+
self.slave_purge_wals()
finally:
self.slave_continue()
os._exit(0)
chunk.sync_time += (time.time() - syncstart)
- status = os.waitpid(childpid, 0);
+ status = os.waitpid(childpid, 0)
rc = os.WEXITSTATUS(status[1])
if rc == 0:
log = daemon_mode and self.log.debug or self.log.info
self.exec_cmd(cmdline)
if self.cf.getint("keep_backups", 0) == 0:
- # cleanup only if we don't keep backup history.
- # historic WAL files are removed during backup rotation
+ # cleanup only if we don't keep backup history, keep the files needed
+ # to roll forward from last restart point. historic WAL files are
+ # removed during backup rotation
+ walname = self.last_restart_point(srcname)
self.log.debug("%s: copy done, cleanup" % srcname)
- self.slave_cleanup(srcname)
+ self.slave_cleanup(walname)
if os.path.isfile(partfile) and not srcfile == partfile:
# Remove any partial files after restore. Only leave the partial if
backups = self.get_backup_list(self.cf.get("full_backup"))
if backups:
print "\nList of backups:\n"
- print "%-15s %-24s %-10s %-24s" % \
+ print "%-15s %-24s %-11s %-24s" % \
("Backup set", "Timestamp", "Label", "First WAL")
- print "%s %s %s %s" % (15*'-', 24*'-', 10*'-',24*'-')
+ print "%s %s %s %s" % (15*'-', 24*'-', 11*'-',24*'-')
for backup in backups:
- lbl = get_backup_label(backup)
- print "%-15s %-24.24s %-10.10s %-24s%s" % \
- (os.path.basename(backup), lbl.start_time, lbl.label_string,
- lbl.first_wal, lbl.fromslave and "*" or "")
+ lbl = BackupLabel(backup)
+ print "%-15s %-24.24s %-11.11s %-24s" % \
+ (os.path.basename(backup), lbl.start_time,
+ lbl.label_string, lbl.first_wal)
print
else:
print "\nNo backups found.\n"
def get_first_walname(self, backupdir):
    """Return the first WAL name recorded in the backup set's label.

    Logs an error and returns None when the label in backupdir yields no
    WAL name.
    """
    first_wal = BackupLabel(backupdir).first_wal
    if first_wal:
        return first_wal
    self.log.error("WAL name not found at %s" % backupdir)
    return None
def last_restart_point(self, walname):
    """
    Determine the WAL file of the last restart point (recovery checkpoint).
    For 8.3 this could be done with %r parameter to restore_command, for 8.2
    we need to consult control file (parse pg_controldata output).

    Returns the WAL name from backup_label while that file still exists in
    the "slave_data" directory, otherwise the name computed from
    pg_controldata output; walname is returned unchanged when the control
    data could not be parsed.
    """
    data_dir = self.cf.get("slave_data")
    if os.path.exists(os.path.join(data_dir, "backup_label")):
        # Label file still exists, use it for determining the restart point
        label = BackupLabel(data_dir)
        self.log.debug("Last restart point from backup_label: %s" % label.first_wal)
        return label.first_wal

    # pg_controldata is pointed at "." (the current working directory).
    controldata_bin = os.path.join(self.cf.get("slave_bin", ""), "pg_controldata")
    restart_point = CheckpointInfo("%s %s" % (controldata_bin, "."), True)
    if restart_point.is_valid:
        self.log.debug("Last restart point: %s" % restart_point.wal_name)
        return restart_point.wal_name

    # No restart point information, use the given wal name
    self.log.warning("Unable to determine last restart point")
    return walname
+
def order_backupdirs(self,prefix,a,b):
"""Compare the backup directory indexes numerically"""
prefix = os.path.abspath(prefix)
completed_wals = self.cf.get("completed_wals")
partial_wals = self.cf.get("partial_wals")
- self.log.debug("cleaning completed wals since %s" % last_applied)
+ self.log.debug("cleaning completed wals before %s" % last_applied)
last = self.del_wals(completed_wals, last_applied)
if last:
if os.path.isdir(partial_wals):
- self.log.debug("cleaning partial wals since %s" % last)
+ self.log.debug("cleaning partial wals before %s" % last)
self.del_wals(partial_wals, last)
else:
self.log.warning("partial_wals dir does not exist: %s"