walmgr: add option for init-slave to add password from file to .pgpass
authorTarvi Pillessaar <tarvi.pillessaar@skype.net>
Fri, 13 Apr 2012 08:57:22 +0000 (11:57 +0300)
committerTarvi Pillessaar <tarvi.pillessaar@skype.net>
Sat, 5 May 2012 18:41:47 +0000 (21:41 +0300)
walmgr: add command synch-standby

python/walmgr.py

index f990bdef5b5da2f204b6b3b5036453b86deefd5c..aa97607d5131c78bc1370fca9dcbb16b65a93456 100755 (executable)
@@ -10,6 +10,7 @@ Master commands:
   syncdaemon         Daemon mode for regular syncing
   stop               Stop archiving - de-configure PostgreSQL
   periodic           Run periodic command if configured.
+  synch-standby      Manage synchronous streaming replication.
 
 Slave commands:
   boot               Stop playback, accept queries
@@ -183,6 +184,74 @@ class BackupLabel:
             if m:
                 self.label_string = m.group(1)
 
+class Pgpass:
+    """Manipulate pgpass contents"""
+
+    def __init__(self, passfile):
+        """Load .pgpass contents"""
+        self.passfile = os.path.expanduser(passfile)
+        self.contents = []
+
+        if os.path.isfile(self.passfile):
+            self.contents = open(self.passfile).readlines()
+
+    def split_pgpass_line(selg, pgline):
+        """Parses pgpass line, returns dict"""
+        try:
+            (host, port, db, user, pwd) = pgline.rstrip('\n\r').split(":")
+            return {'host': host, 'port': port, 'db': db, 'user': user, 'pwd': pwd}
+        except ValueError:
+            return None
+
+    def ensure_user(self, host, port, user, pwd):
+        """Ensure that line for streaming replication exists in .pgpass"""
+        self.remove_user(host, port, user)
+        self.contents.insert(0, '%s:%s:%s:%s:%s\n' % (host, port, 'replication', user, pwd))
+
+    def remove_user(self, host, port, user):
+        """Remove all matching lines from .pgpass"""
+
+        new_contents = []
+        found = False
+        for l in self.contents:
+            p = self.split_pgpass_line(l)
+            if p and p['host'] == host and p['port'] == port and p['user'] == user and p['db'] == 'replication':
+                    found = True
+                    continue
+
+            new_contents.append(l)
+
+        self.contents = new_contents
+        return found
+
+    def write(self):
+        """Write contents back to file"""
+        f = open(self.passfile,'w')
+        os.chmod(self.passfile, 0600)
+        f.writelines(self.contents)
+        f.close()
+
+    def pgpass_fields_from_conninfo(self,conninfo):
+        """Extract host,user and port from primary-conninfo"""
+        m = re.match("^.*\s*host=\s*([^\s]+)\s*.*$", conninfo)
+        if m:
+            host = m.group(1)
+        else:
+            host = 'localhost'
+        m =  re.match("^.*\s*user=\s*([^\s]+)\s*.*$", conninfo)
+        if m:
+            user = m.group(1)
+        else:
+            user = os.environ['USER']
+        m = re.match("^.*\s*port=\s*([^\s]+)\s*.*$", conninfo)
+        if m:
+            port = m.group(1)
+        else:
+            port = '5432'
+
+        return host,port,user
+
+
 class PostgresConfiguration:
     """Postgres configuration manipulation"""
 
@@ -209,7 +278,7 @@ class PostgresConfiguration:
     def synchronous_standby_names(self):
         """Return value for specified parameter"""
         # see if explicitly set
-        m = re.search("^\s*synchronous_standby_names\s*=\s*'?([a-zA-Z01]+)'?\s*#?.*$", self.cf_buf, re.M | re.I)
+        m = re.search("^\s*synchronous_standby_names\s*=\s*'([^']*)'\s*#?.*$", self.cf_buf, re.M | re.I)
         if m:
             return m.group(1)
         # also, it could be commented out as initdb leaves it
@@ -273,6 +342,20 @@ class PostgresConfiguration:
         # polite method does not work, as usually not enough perms for it
         open(self.cf_file, "w").write(self.cf_buf)
 
+    def set_synchronous_standby_names(self,param_value):
+        """Helper function to change synchronous_standby_names and signal postmaster"""
+
+        self.log.info("Changing synchronous_standby_names from '%s' to '%s'" % (self.synchronous_standby_names(),param_value))
+        cf_params = dict()
+        cf_params['synchronous_standby_names'] = param_value
+        self.modify(cf_params)
+        self.write()
+
+        data_dir=self.walmgr.cf.getfile("master_data")
+        self.log.info("Sending SIGHUP to postmaster")
+        self.walmgr.signal_postmaster(data_dir, signal.SIGHUP)
+
+
 class WalMgr(skytools.DBScript):
 
     def init_optparse(self, parser=None):
@@ -294,6 +377,10 @@ class WalMgr(skytools.DBScript):
                      help = "slave: add public key to authorized_hosts", default=False)
         p.add_option("", "--ssh-remove-key", action="store", dest="ssh_remove_key",
                      help = "slave: remove master key from authorized_hosts", default=False)
+        p.add_option("", "--add-password", action="store", dest="add_password",
+                     help = "slave: add password from file to .pgpass. Additional fields will be extracted from primary-conninfo", default=False)
+        p.add_option("", "--remove-password", action="store_true", dest="remove_password",
+                     help = "slave: remove previously added line from .pgpass", default=False)
         p.add_option("", "--primary-conninfo", action="store", dest="primary_conninfo", default=None,
                      help = "slave: connect string for streaming replication master")
         p.add_option("", "--init-slave", action="store_true", dest="init_slave",
@@ -402,27 +489,28 @@ class WalMgr(skytools.DBScript):
             self.pidfile = None
 
         cmdtab = {
-            'init_master':  self.walmgr_init_master,
-            'init_slave':   self.walmgr_init_slave,
-            'setup':        self.walmgr_setup,
-            'stop':         self.master_stop,
-            'backup':       self.run_backup,
-            'listbackups':  self.list_backups,
-            'restore':      self.restore_database,
-            'periodic':     self.master_periodic,
-            'sync':         self.master_sync,
-            'syncdaemon':   self.master_syncdaemon,
-            'pause':        self.slave_pause,
-            'continue':     self.slave_continue,
-            'boot':         self.slave_boot,
-            'cleanup':      self.walmgr_cleanup,
-            'xlock':        self.slave_lock_backups_exit,
-            'xrelease':     self.slave_resume_backups,
-            'xrotate':      self.slave_rotate_backups,
-            'xpurgewals':   self.slave_purge_wals,
-            'xarchive':     self.master_xarchive,
-            'xrestore':     self.xrestore,
-            'xpartialsync': self.slave_append_partial,
+            'init_master':   self.walmgr_init_master,
+            'init_slave':    self.walmgr_init_slave,
+            'setup':         self.walmgr_setup,
+            'stop':          self.master_stop,
+            'backup':        self.run_backup,
+            'listbackups':   self.list_backups,
+            'restore':       self.restore_database,
+            'periodic':      self.master_periodic,
+            'sync':          self.master_sync,
+            'syncdaemon':    self.master_syncdaemon,
+            'pause':         self.slave_pause,
+            'continue':      self.slave_continue,
+            'boot':          self.slave_boot,
+            'cleanup':       self.walmgr_cleanup,
+            'synch-standby': self.master_synch_standby,
+            'xlock':         self.slave_lock_backups_exit,
+            'xrelease':      self.slave_resume_backups,
+            'xrotate':       self.slave_rotate_backups,
+            'xpurgewals':    self.slave_purge_wals,
+            'xarchive':      self.master_xarchive,
+            'xrestore':      self.xrestore,
+            'xpartialsync':  self.slave_append_partial,
         }
 
         if not cmdtab.has_key(self.cmd):
@@ -670,11 +758,63 @@ class WalMgr(skytools.DBScript):
                 else:
                     self.log.debug("authorized_keys:\n%s" % keys)
 
+            # remove password from .pgpass
+            primary_conninfo = self.cf.get("primary_conninfo", "")
+            if self.options.remove_password and primary_conninfo and not self.not_really:
+                pg = Pgpass('~/.pgpass')
+                host, port, user = pg.pgpass_fields_from_conninfo(primary_conninfo)
+                if pg.remove_user(host, port, user):
+                    self.log.info("Removing line from .pgpass")
+                    pg.write()
+
         # get rid of the configuration file, both master and slave
         self.log.info("Removing config file: %s" % self.cfgfile)
         if not self.not_really:
             os.remove(self.cfgfile)
 
+    def master_synch_standby(self):
+        """Manage synchronous_standby_names parameter"""
+
+        if len(self.args) < 1:
+            die(1, "usage: synch-standby SYNCHRONOUS_STANDBY_NAMES")
+
+        names = self.args[0]
+        cf = PostgresConfiguration(self, self.cf.getfile("master_config"))
+
+        self.assert_is_master(True)
+
+        # list of slaves
+        db = self.get_database("master_db")
+        cur = db.cursor()
+        cur.execute("select application_name from pg_stat_replication")
+        slave_names = [slave[0] for slave in cur.fetchall()]
+        self.close_database("master_db")
+
+        if names.strip() == "":
+            cf.set_synchronous_standby_names("")
+            return
+
+        if names.strip() == "*":
+            if slave_names:
+                cf.set_synchronous_standby_names(names)
+                return
+            else:
+                die(1,"At least one slave must be available when enabling synchronous mode")
+
+        # ensure that at least one slave is available from new parameter value
+        slave_found = None
+        for new_synch_slave in re.findall(r"[^\s,]+",names):
+            if new_synch_slave not in slave_names:
+                self.log.warning("No slave available with name %s" % new_synch_slave)
+            else:
+                slave_found = True
+                break
+
+        if not slave_found:
+            die(1,"At least one slave must be available from new list when enabling synchronous mode")
+        else:
+            cf.set_synchronous_standby_names(names)
+
     def master_configure_archiving(self, enable_archiving, can_restart):
         """Turn the archiving on or off"""
 
@@ -1055,6 +1195,21 @@ primary_conninfo     = %(primary_conninfo)s
                     af.write(master_pubkey)
                     af.close()
 
+        if self.options.add_password and self.options.primary_conninfo:
+            # add password to pgpass
+
+            self.log.debug("Reading password from file %s" % self.options.add_password)
+            pwd = open(self.options.add_password).readline().rstrip('\n\r')
+
+            pg = Pgpass('~/.pgpass')
+            host, port, user = pg.pgpass_fields_from_conninfo(self.options.primary_conninfo)
+            pg.ensure_user(host, port, user, pwd)
+            pg.write()
+
+            self.log.info("Added password from %s to .pgpass" % self.options.add_password)
+
+
+
     def walmgr_setup(self):
         if self.is_master:
             self.log.info("Configuring WAL archiving")