WalMgr changes:
authorMartin Pihlak <martin.pihlak@gmail.com>
Fri, 28 Nov 2008 13:27:38 +0000 (13:27 +0000)
committerMartin Pihlak <martin.pihlak@gmail.com>
Fri, 28 Nov 2008 13:27:38 +0000 (13:27 +0000)
- Added recovery restart point handling, retain WAL files since the last
  restart point. Additional slave config parameter "slave_bin" for specifying
  location for postgres binaries.
- Slave backup now creates backup_label and history files.

doc/walmgr.txt
python/conf/wal-slave.ini
python/walmgr.py

index 4685a233eaa5a1148b439652483f9e8a148c5dcf..5de754606d0c9924f669e227cc3eb8ec4d64e9ce 100644 (file)
@@ -222,7 +222,11 @@ Script to stop postmaster on slave.
 Script to start postmaster on slave.
 
 ==== slave ====
-Base directory for slave files (logs.comlete, data.master etc)
+Base directory for slave files (logs.complete, data.master etc)
+
+==== slave_bin ====
+Specifies the location of postgres binaries (pg_controldata, etc). Needed if
+they are not already in the PATH.
 
 ==== completed_wals ====
 Directory where complete WAL files are stored. Also miscellaneous control files
index e83bc57d245405e4e039a19e2aefd4c59603ac39..0c8d7d1db62d98fb9918b1d7f5fa18bde487726c 100644 (file)
@@ -4,6 +4,7 @@ logfile              = ~/log/wal-slave.log
 use_skylog           = 1
 
 slave_data           = /var/lib/postgresql/8.2/main
+slave_bin            = /usr/lib/postgresql/8.2/bin
 slave_stop_cmd       = /etc/init.d/postgresql-8.2 stop
 slave_start_cmd      = /etc/init.d/postgresql-8.2 start
 
index d3385d14547fbc44e4f6756c48ca96216d3cfc8a..67ce9927453f3a0f4c35f26c52f276863b992491 100755 (executable)
@@ -41,13 +41,9 @@ Switches:
 Additional features:
  * Simplified install. Master "setup" command should setup slave directories.
  * Add support for multiple targets on master.
- * Add an optional time based WAL retention parameter, this could be useful if base backups
- are taken from the standby (8.2 only)
  * WAL purge does not correctly purge old WAL-s if timelines are involved. The first
  useful WAL name is obtained from backup_label, WAL-s in the same timeline that are older 
  than first useful WAL are removed. 
- * xrestore should not attempt to copy the file on disk full condition - this
- will result in recovery failure. Pre 8.2 this means starting from zero.
  * Always copy the directory on "restore" add a special "--move" option.
 """
 
@@ -103,6 +99,7 @@ def copy_conf(src, dst):
     return True
 
 class WalChunk:
+    """Represents a chunk of WAL used in record based shipping"""
     def __init__(self,filename,pos=0,bytes=0):
         self.filename = filename
         self.pos = pos
@@ -114,33 +111,62 @@ class WalChunk:
     def __str__(self):
         return "%s @ %d +%d" % (self.filename, self.pos, self.bytes)
 
+class CheckpointInfo:
+    """Checkpoint info parsed from pg_controldata"""
+
+    def __init__(self, pg_controldata, findRestartPoint):
+        """Collect last checkpoint information from pg_controldata output"""
+        self.xlogid = None
+        self.xrecoff = None
+        self.timeline = None
+        self.wal_size = None
+        self.wal_name = None
+        self.is_valid = False
+        matches = 0
+        for line in os.popen(pg_controldata, "r"):
+            if findRestartPoint:
+                m = re.match("^Latest checkpoint's REDO location:\s+([0-9A-F]+)/([0-9A-F]+)", line)
+            else:
+                m = re.match("^Latest checkpoint location:\s+([0-9A-F]+)/([0-9A-F]+)", line)
+            if m:
+                matches += 1
+                self.xlogid = int(m.group(1), 16)
+                self.xrecoff = int(m.group(2), 16)
+            m = re.match("^Latest checkpoint's TimeLineID:\s+(\d+)", line)
+            if m:
+                matches += 1
+                self.timeline = int(m.group(1))
+            m = re.match("^Bytes per WAL segment:\s+(\d+)", line)
+            if m:
+                matches += 1
+                self.wal_size = int(m.group(1))
+
+        if matches == 3:
+            self.wal_name = "%08X%08X%08X" % \
+                (self.timeline, self.xlogid, self.xrecoff / self.wal_size)
+            self.is_valid = True
+
 class BackupLabel:
-    def __init__(self):
+    """Backup label contents"""
+
+    def __init__(self, backupdir):
+        """Initialize a new BackupLabel from existing file"""
+        filename = os.path.join(backupdir, "backup_label")
         self.first_wal = None
         self.start_time = None
         self.label_string = None
-        self.fromslave = None
-
-def get_backup_label(dirname):
-    label = BackupLabel()
-    filename = os.path.join(dirname, "backup_label")
-    if not os.path.exists(filename):
-        # perhaps this is a backup taken from slave, try .old suffix
-        filename += ".old"
         if not os.path.exists(filename):
-            return None
-        label.fromslave = True
-    for line in open(filename):
-        m = re.match('^START WAL LOCATION: [^\s]+ \(file ([0-9A-Z]+)\)$', line)
-        if m:
-            label.first_wal = m.group(1)
-        m = re.match('^START TIME:\s(.*)$', line)
-        if m:
-            label.start_time = m.group(1)
-        m = re.match('^LABEL: (.*)$', line)
-        if m:
-            label.label_string = m.group(1)
-    return label
+            return
+        for line in open(filename):
+            m = re.match('^START WAL LOCATION: [^\s]+ \(file ([0-9A-Z]+)\)$', line)
+            if m:
+                self.first_wal = m.group(1)
+            m = re.match('^START TIME:\s(.*)$', line)
+            if m:
+                self.start_time = m.group(1)
+            m = re.match('^LABEL: (.*)$', line)
+            if m:
+                self.label_string = m.group(1)
 
 class WalMgr(skytools.DBScript):
 
@@ -213,7 +239,7 @@ class WalMgr(skytools.DBScript):
 
     def assert_valid_role(self,role):
         if self.wtype != role:
-            self.log.warning("Action not available on current node.");
+            self.log.warning("Action not available on current node.")
             sys.exit(1)
 
     def pg_start_backup(self, code):
@@ -263,13 +289,13 @@ class WalMgr(skytools.DBScript):
     def exec_rsync(self,args,die_on_error=False):
         cmdline = [ "rsync", "-a", "--quiet" ]
         if self.cf.getint("compression", 0) > 0:
-            cmdline.append("-z");
+            cmdline.append("-z")
         cmdline += args
 
         cmd = "' '".join(cmdline)
         self.log.debug("Execute rsync cmd: '%s'" % (cmd))
         if self.not_really:
-            return
+            return 0
         res = os.spawnvp(os.P_WAIT, cmdline[0], cmdline)
         if res == 24:
             self.log.info("Some files vanished, but thats OK")
@@ -604,9 +630,10 @@ class WalMgr(skytools.DBScript):
         3. Wait for WAL apply to complete (look at PROGRESS file)
         4. Rotate old backups
         5. Copy data directory to data.master
-        6. Purge unneeded WAL-s
-        7. Resume WAL apply
-        8. Release backup lock
+        6. Create backup label and history file.
+        7. Purge unneeded WAL-s
+        8. Resume WAL apply
+        9. Release backup lock
         """
         self.assert_valid_role(SLAVE)
         if self.slave_lock_backups() != 0:
@@ -620,10 +647,73 @@ class WalMgr(skytools.DBScript):
                 self.slave_rotate_backups()
                 src = self.cf.get("slave_data")
                 dst = self.cf.get("full_backup")
+
+                start_time = time.localtime()
                 cmdline = ["cp", "-a", src, dst ]
                 self.log.info("Executing %s" % " ".join(cmdline))
                 if not self.not_really:
                     self.exec_cmd(cmdline)
+                stop_time = time.localtime()
+
+                # Obtain the last restart point information
+                slave_bin = self.cf.get("slave_bin", "")
+                pg_controldata = "%s %s" % (os.path.join(slave_bin, "pg_controldata"), dst)
+                rp = CheckpointInfo(pg_controldata, False)
+
+                # TODO: The newly created backup directory probably still contains
+                # backup_label.old and recovery.conf files. Remove these.
+
+                if not rp.is_valid:
+                    self.log.warning("Unable to determine last restart point, backup_label not created.")
+                else:
+                    # Write backup label and history file
+                    
+                    backup_label = \
+"""START WAL LOCATION: %(xlogid)X/%(xrecoff)X (file %(wal_name)s)
+CHECKPOINT LOCATION: %(xlogid)X/%(xrecoff)X
+START TIME: %(start_time)s
+LABEL: SlaveBackup"
+"""
+                    backup_history = \
+"""START WAL LOCATION: %(xlogid)X/%(xrecoff)X (file %(wal_name)s)
+STOP WAL LOCATION: %(xlogid)X/%(xrecoff)X (file %(wal_name)s)
+CHECKPOINT LOCATION: %(xlogid)X/%(xrecoff)X
+START TIME: %(start_time)s
+LABEL: SlaveBackup"
+STOP TIME: %(stop_time)s
+"""
+
+                    label_params = {
+                        "xlogid":       rp.xlogid,
+                        "xrecoff":      rp.xrecoff,
+                        "wal_name":     rp.wal_name,
+                        "start_time":   time.strftime("%Y-%m-%d %H:%M:%S %Z", start_time),
+                        "stop_time":    time.strftime("%Y-%m-%d %H:%M:%S %Z", stop_time),
+                    }
+
+                    # Write the label
+                    filename = os.path.join(dst, "backup_label")
+                    if self.not_really:
+                        self.log.info("Writing backup label to %s" % filename)
+                    else:
+                        lf = open(filename, "w")
+                        lf.write(backup_label % label_params)
+                        lf.close()
+
+                    # Now the history
+                    histfile = "%s.%08X.backup" % (rp.wal_name, rp.xrecoff % rp.wal_size)
+                    completed_wals = self.cf.get("completed_wals")
+                    filename = os.path.join(completed_wals, histfile)
+                    if os.path.exists(filename):
+                        self.log.warning("%s: already exists, refusing to overwrite." % filename)
+                    else:
+                        if self.not_really:
+                            self.log.info("Writing backup history to %s" % filename)
+                        else:
+                            lf = open(filename, "w")
+                            lf.write(backup_history % label_params)
+                            lf.close()
+
                 self.slave_purge_wals()
             finally:
                 self.slave_continue()
@@ -736,7 +826,7 @@ class WalMgr(skytools.DBScript):
             os._exit(0)
         chunk.sync_time += (time.time() - syncstart)
 
-        status = os.waitpid(childpid, 0);
+        status = os.waitpid(childpid, 0)
         rc = os.WEXITSTATUS(status[1]) 
         if rc == 0:
             log = daemon_mode and self.log.debug or self.log.info
@@ -950,10 +1040,12 @@ class WalMgr(skytools.DBScript):
         self.exec_cmd(cmdline)
 
         if self.cf.getint("keep_backups", 0) == 0:
-            # cleanup only if we don't keep backup history.
-            # historic WAL files are removed during backup rotation
+            # cleanup only if we don't keep backup history, keep the files needed
+            # to roll forward from last restart point. historic WAL files are
+            # removed during backup rotation
+            walname = self.last_restart_point(srcname)
             self.log.debug("%s: copy done, cleanup" % srcname)
-            self.slave_cleanup(srcname)
+            self.slave_cleanup(walname)
 
         if os.path.isfile(partfile) and not srcfile == partfile:
             # Remove any partial files after restore. Only leave the partial if
@@ -1225,26 +1317,52 @@ class WalMgr(skytools.DBScript):
             backups = self.get_backup_list(self.cf.get("full_backup"))
             if backups:
                 print "\nList of backups:\n"
-                print "%-15s %-24s %-10s %-24s" % \
+                print "%-15s %-24s %-11s %-24s" % \
                     ("Backup set", "Timestamp", "Label", "First WAL")
-                print "%s %s %s %s" % (15*'-', 24*'-', 10*'-',24*'-')
+                print "%s %s %s %s" % (15*'-', 24*'-', 11*'-',24*'-')
                 for backup in backups:
-                    lbl = get_backup_label(backup)
-                    print "%-15s %-24.24s %-10.10s %-24s%s" % \
-                        (os.path.basename(backup), lbl.start_time, lbl.label_string,
-                        lbl.first_wal, lbl.fromslave and "*" or "")
+                    lbl = BackupLabel(backup)
+                    print "%-15s %-24.24s %-11.11s %-24s" % \
+                        (os.path.basename(backup), lbl.start_time,
+                        lbl.label_string, lbl.first_wal)
                 print
             else:
                 print "\nNo backups found.\n"
 
     def get_first_walname(self,backupdir):
         """Returns the name of the first needed WAL segment for backupset"""
-        label = get_backup_label(backupdir)
+        label = BackupLabel(backupdir)
         if not label.first_wal:
             self.log.error("WAL name not found at %s" % backupdir)
             return None
         return label.first_wal
 
+    def last_restart_point(self,walname):
+        """
+        Determine the WAL file of the last restart point (recovery checkpoint).
+        For 8.3 this could be done with %r parameter to restore_command, for 8.2
+        we need to consult control file (parse pg_controldata output).
+        """
+        slave_data = self.cf.get("slave_data")
+        backup_label = os.path.join(slave_data, "backup_label")
+        if os.path.exists(backup_label):
+            # Label file still exists, use it for determining the restart point
+            lbl = BackupLabel(slave_data)
+            self.log.debug("Last restart point from backup_label: %s" % lbl.first_wal)
+            return lbl.first_wal
+
+        slave_bin = self.cf.get("slave_bin", "")
+        pg_controldata = "%s %s" % (os.path.join(slave_bin, "pg_controldata"), ".")
+
+        rp = CheckpointInfo(pg_controldata, True)
+        if not rp.is_valid:
+            # No restart point information, use the given wal name
+            self.log.warning("Unable to determine last restart point")
+            return walname
+
+        self.log.debug("Last restart point: %s" % rp.wal_name)
+        return rp.wal_name
+
     def order_backupdirs(self,prefix,a,b):
         """Compare the backup directory indexes numerically"""
         prefix = os.path.abspath(prefix)
@@ -1337,11 +1455,11 @@ class WalMgr(skytools.DBScript):
         completed_wals = self.cf.get("completed_wals")
         partial_wals = self.cf.get("partial_wals")
 
-        self.log.debug("cleaning completed wals since %s" % last_applied)
+        self.log.debug("cleaning completed wals before %s" % last_applied)
         last = self.del_wals(completed_wals, last_applied)
         if last:
             if os.path.isdir(partial_wals):
-                self.log.debug("cleaning partial wals since %s" % last)
+                self.log.debug("cleaning partial wals before %s" % last)
                 self.del_wals(partial_wals, last)
             else:
                 self.log.warning("partial_wals dir does not exist: %s"