Sync walmgr from internal repo:
authorMarko Kreen <markokr@gmail.com>
Tue, 18 Sep 2007 10:49:17 +0000 (10:49 +0000)
committerMarko Kreen <markokr@gmail.com>
Tue, 18 Sep 2007 10:49:17 +0000 (10:49 +0000)
- added "compression" option
- syncdaemon will retry on rsync errors
- syncdaemon adds -n to ssh options

By Martin Pihlak

python/walmgr.py

index cb7b735069760a416dc5738fd8e0b6dc4ef85559..01e06099d3ea2f3c9bf4687449b678456504110e 100755 (executable)
@@ -39,12 +39,10 @@ Switches:
 
 """
 Additional features:
+ * Simplified install. Master "setup" command should setup slave directories.
  * Add support for multiple targets on master.
  * Add an optional time based WAL retention parameter, this could be useful if base backups
  are taken from the standby (8.2 only)
- * On 8.2 ship all WAL-s in syncdaemon mode, use archive_command only for validating the 
- assembled log and filling gaps where necessary.
- * Allow backup label to be specified by user.
  * WAL purge does not correctly purge old WAL-s if timelines are involved. The first
  useful WAL name is obtained from backup_label, WAL-s in the same timeline that are older 
  than first useful WAL are removed. 
@@ -157,6 +155,7 @@ class WalMgr(skytools.DBScript):
             usage(1)
         self.cmd = self.args[1]
         self.args = self.args[2:]
+        self.script = os.path.abspath(sys.argv[0])
 
         cmdtab = {
             'setup':        self.walmgr_setup,
@@ -229,16 +228,29 @@ class WalMgr(skytools.DBScript):
         if not self.not_really:
             os.kill(pid, sgn)
 
-    def exec_big_rsync(self, cmdline):
+    def exec_rsync(self,args,die_on_error=False):
+        cmdline = [ "rsync", "-a", "--quiet" ]
+        if self.cf.getint("compression", 0) > 0:
+            cmdline.append("-z");
+        cmdline += args
+
         cmd = "' '".join(cmdline)
-        self.log.debug("Execute big rsync cmd: '%s'" % (cmd))
+        self.log.debug("Execute rsync cmd: '%s'" % (cmd))
         if self.not_really:
             return
         res = os.spawnvp(os.P_WAIT, cmdline[0], cmdline)
         if res == 24:
             self.log.info("Some files vanished, but thats OK")
+            res = 0
         elif res != 0:
-            self.log.fatal("exec failed, res=%d" % res)
+            self.log.fatal("rsync exec failed, res=%d" % res)
+            if die_on_error:
+                sys.exit(1)
+        return res
+
+    def exec_big_rsync(self, args):
+        if self.exec_rsync(args) != 0:
+            self.log.fatal("Big rsync failed")
             self.pg_stop_backup()
             sys.exit(1)
 
@@ -289,28 +301,6 @@ class WalMgr(skytools.DBScript):
         except:
             self.log.fatal("Cannot write to %s" % fn)
 
-    def walmgr_setup(self):
-        if self.wtype == MASTER:
-            self.log.info("Configuring WAL archiving")
-
-            script = os.path.abspath(sys.argv[0])
-            cf_file = os.path.abspath(self.cf.filename)
-            cf_val = "%s %s %s" % (script, cf_file, "xarchive %p %f")
-
-            self.master_configure_archiving(cf_val)
-            # ask slave to init
-            self.remote_walmgr("setup")
-        else:
-            # create slave directory structure
-            def mkdir(dir):
-                if not os.path.exists(dir):
-                    self.log.debug("Creating directory %s" % dir)
-                    os.mkdir(dir)
-            mkdir(self.cf.get("slave"))
-            mkdir(self.cf.get("completed_wals"))
-            mkdir(self.cf.get("partial_wals"))
-            mkdir(self.cf.get("full_backup"))
-
     def master_stop(self):
         self.assert_valid_role(MASTER)
         self.log.info("Disabling WAL archiving")
@@ -387,19 +377,23 @@ class WalMgr(skytools.DBScript):
             cmdline = ["ssh", "-nT", host, "mkdir", "-p", path]
             self.exec_cmd(cmdline)
 
-    def remote_walmgr(self, command):
+    def remote_walmgr(self, command, stdin_disabled = True):
         """Pass a command to slave WalManager"""
         slave = self.cf.get("slave")
         slave_config = self.cf.get("slave_config", "")
         tmp = slave.split(":", 1)
 
+        sshopt = "-T"
+        if stdin_disabled:
+            sshopt += "n"
+
         cmdline = None
 
         if len(tmp) < 2:
             raise Exception("cannot find slave hostname")
         else:
             host, path = tmp
-            cmdline = [ "ssh", "-T", host, os.path.abspath(sys.argv[0]) ]
+            cmdline = [ "ssh", sshopt, host, self.script  ]
 
         if slave_config:
             cmdline += [ slave_config ]
@@ -410,6 +404,27 @@ class WalMgr(skytools.DBScript):
         else:
             self.exec_cmd(cmdline)
 
+    def walmgr_setup(self):
+        if self.wtype == MASTER:
+            self.log.info("Configuring WAL archiving")
+
+            cf_file = os.path.abspath(self.cf.filename)
+            cf_val = "%s %s %s" % (self.script, cf_file, "xarchive %p %f")
+
+            self.master_configure_archiving(cf_val)
+            # ask slave to init
+            self.remote_walmgr("setup")
+        else:
+            # create slave directory structure
+            def mkdir(dir):
+                if not os.path.exists(dir):
+                    self.log.debug("Creating directory %s" % dir)
+                    os.mkdir(dir)
+            mkdir(self.cf.get("slave"))
+            mkdir(self.cf.get("completed_wals"))
+            mkdir(self.cf.get("partial_wals"))
+            mkdir(self.cf.get("full_backup"))
+
     def master_periodic(self):
         """
         Run periodic command on master node. 
@@ -457,6 +472,7 @@ class WalMgr(skytools.DBScript):
         """
 
         self.remote_walmgr("xlock")
+        errors = False
 
         try:
             self.pg_start_backup("FullBackup")
@@ -472,17 +488,16 @@ class WalMgr(skytools.DBScript):
 
             # copy data
             self.chdir(data_dir)
-            cmdline = ["rsync", "-a", "--delete",
+            cmdline = [
+                    "--delete",
                     "--exclude", ".*",
                     "--exclude", "*.pid",
                     "--exclude", "*.opts",
                     "--exclude", "*.conf",
-                    "--exclude", "*.conf.*",
                     "--exclude", "pg_xlog",
                     "--exclude", "pg_tblspc",
                     "--exclude", "pg_log/*",
                     "--copy-unsafe-links",
-                    "--quiet",
                     ".", dst_loc]
             self.exec_big_rsync(cmdline)
 
@@ -508,16 +523,12 @@ class WalMgr(skytools.DBScript):
                     except Exception, det:
                         self.log.warning("Broken link:" + str(det))
                         continue
-                    cmdline = ["rsync", "-a", "--delete",
-                                        "--exclude", ".*",
-                                        "--quiet",
-                                        "--copy-unsafe-links",
-                                        ".", dstfn]
+                    cmdline = [ "--delete", "--exclude", ".*", "--copy-unsafe-links", ".", dstfn]
                     self.exec_big_rsync(cmdline)
 
             # copy pg_xlog
             self.chdir(data_dir)
-            cmdline = ["rsync", "-a", "--quiet",
+            cmdline = [
                 "--exclude", "*.done",
                 "--exclude", "*.backup",
                 "--copy-unsafe-links",
@@ -525,13 +536,23 @@ class WalMgr(skytools.DBScript):
             self.exec_big_rsync(cmdline)
 
             self.remote_walmgr("xpurgewals")
-        finally:
-            try:
-                self.pg_stop_backup()
-            except:
-                pass
+        except:
+            errors = True
+
+        try:
+            self.pg_stop_backup()
+        except:
+            pass
+
+        try:
             self.remote_walmgr("xrelease")
-        self.log.info("Full backup successful")
+        except:
+            pass
+
+        if not errors:
+            self.log.info("Full backup successful")
+        else:
+            self.log.error("Full backup failed.")
 
     def slave_backup(self):
         """
@@ -595,8 +616,7 @@ class WalMgr(skytools.DBScript):
             dst_loc += "/"
 
         # copy data
-        cmdline = ["rsync", "--quiet", "-t", srcpath, dst_loc]
-        self.exec_cmd(cmdline)
+        self.exec_rsync([ srcpath, dst_loc ], True)
 
         self.log.debug("%s: done", srcname)
         end_time = time.time()
@@ -669,7 +689,7 @@ class WalMgr(skytools.DBScript):
         if childpid == 0:
             os.dup2(xlog.fileno(), sys.stdin.fileno())
             try:
-                self.remote_walmgr("xpartialsync %s %d %d" % (chunk.filename, chunk.pos, chunk.bytes))
+                self.remote_walmgr("xpartialsync %s %d %d" % (chunk.filename, chunk.pos, chunk.bytes), False)
             except:
                 os._exit(1)
             os._exit(0)
@@ -730,7 +750,7 @@ class WalMgr(skytools.DBScript):
             if not self.walchunk or self.walchunk.filename != file_name:
                 # Switched to new WAL segment. Don't bother to copy the last bits - it
                 # will be obsoleted by the archive_command.
-                if self.walchunk:
+                if self.walchunk and self.walchunk.sync_count > 0:
                     self.log.info("Switched in %d seconds, %f sec in %d interim syncs, avg %f"
                         % (time.time() - self.walchunk.start_time,
                         self.walchunk.sync_time,
@@ -774,9 +794,11 @@ class WalMgr(skytools.DBScript):
                 # got interesting WAL
                 xlog = os.path.join(xlog_dir, fn)
                 # copy data
-                cmdline = ["rsync", "--quiet", "-t", xlog, dst_loc]
-                self.exec_cmd(cmdline)
-            self.log.info("Partial copy done")
+                if self.exec_rsync([xlog, dst_loc]) != 0:
+                    self.log.error('Cannot sync %s' % xlog)
+                    break
+            else:
+                self.log.info("Partial copy done")
 
     def xrestore(self):
         if len(self.args) < 2:
@@ -815,9 +837,7 @@ class WalMgr(skytools.DBScript):
         for src in paths:
             self.log.debug("Looking in %s" % src)
             srcfile = os.path.join(src, srcname)
-            cmdline = [ "rsync", "--quiet", "-t", srcfile, dstpath ]
-            res = os.spawnvp(os.P_WAIT, cmdline[0], cmdline)
-            if res == 0:
+            if self.exec_rsync([srcfile, dstpath]) == 0:
                 return
         self.log.warning("Could not restore file %s" % srcname)
 
@@ -969,8 +989,7 @@ class WalMgr(skytools.DBScript):
             if not setname:
                 os.rename(full_dir, data_dir)
             else:
-                self.exec_cmd(["rsync", "--quiet", "-a", "--delete", "--no-relative",
-                    "--exclude=pg_xlog/*", os.path.join(full_dir,""), data_dir])
+                self.exec_rsync(["--delete", "--no-relative", "--exclude=pg_xlog/*", os.path.join(full_dir,""), data_dir], True)
                 if self.wtype == MASTER and createbackup and os.path.isdir(bak):
                     # restore original xlog files to data_dir/pg_xlog   
                     # symlinked directories are dereferences
@@ -1003,10 +1022,9 @@ class WalMgr(skytools.DBScript):
 
         # write recovery.conf
         rconf = os.path.join(data_dir, "recovery.conf")
-        script = os.path.abspath(sys.argv[0])
         cf_file = os.path.abspath(self.cf.filename)
 
-        conf = "\nrestore_command = '%s %s %s'\n" % (script, cf_file, 'xrestore %f "%p"')
+        conf = "\nrestore_command = '%s %s %s'\n" % (self.script, cf_file, 'xrestore %f "%p"')
         conf += "#recovery_target_time=''\n" + \
                 "#recovery_target_xid=''\n" + \
                 "#recovery_target_inclusive=true\n" + \