-<!-- $PostgreSQL: pgsql/doc/src/sgml/protocol.sgml,v 1.87 2010/04/03 07:22:55 petere Exp $ -->
+<!-- $PostgreSQL: pgsql/doc/src/sgml/protocol.sgml,v 1.88 2010/06/03 22:17:32 tgl Exp $ -->
 
 <chapter id="protocol">
  <title>Frontend/Backend Protocol</title>
   </sect2>
  </sect1>
 
+<sect1 id="protocol-replication">
+<title>Streaming Replication Protocol</title>
+
+<para>
+To initiate streaming replication, the frontend sends the
+<literal>replication</> parameter in the startup message. This tells the
+backend to go into walsender mode, wherein a small set of replication commands
+can be issued instead of SQL statements. Only the simple query protocol can be
+used in walsender mode.
+
+The commands accepted in walsender mode are:
+
+<variablelist>
+  <varlistentry>
+    <term>IDENTIFY_SYSTEM</term>
+    <listitem>
+     <para>
+      Requests the server to identify itself. Server replies with a result
+      set of a single row, containing two fields:
+     </para>
+
+     <para>
+      <variablelist>
+      <varlistentry>
+      <term>
+       systemid
+      </term>
+      <listitem>
+      <para>
+       The unique system identifier identifying the cluster. This
+       can be used to check that the base backup used to initialize the
+       slave came from the same cluster.
+      </para>
+      </listitem>
+      </varlistentry>
+
+      <varlistentry>
+      <term>
+       timeline
+      </term>
+      <listitem>
+      <para>
+       Current TimelineID. Also useful to check that the slave is
+       consistent with the master.
+      </para>
+      </listitem>
+      </varlistentry>
+      </variablelist>
+     </para>
+    </listitem>
+  </varlistentry>
+
+  <varlistentry>
+    <term>START_REPLICATION <replaceable>XXX</>/<replaceable>XXX</></term>
+    <listitem>
+     <para>
+      Instructs server to start streaming WAL, starting at
+      WAL position <replaceable>XXX</>/<replaceable>XXX</>.
+      The server can reply with an error, e.g. if the requested section of WAL
+      has already been recycled. On success, server responds with a
+      CopyOutResponse message, and then starts to stream WAL to the frontend.
+      WAL will continue to be streamed until the connection is broken;
+      no further commands will be accepted.
+     </para>
+
+     <para>
+      WAL data is sent as a series of CopyData messages.  (This allows
+      other information to be intermixed; in particular the server can send
+      an ErrorResponse message if it encounters a failure after beginning
+      to stream.)  The payload in each CopyData message follows this format:
+     </para>
+
+     <para>
+      <variablelist>
+      <varlistentry>
+      <term>
+          XLogData (B)
+      </term>
+      <listitem>
+      <para>
+      <variablelist>
+      <varlistentry>
+      <term>
+          Byte1('w')
+      </term>
+      <listitem>
+      <para>
+          Identifies the message as WAL data.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Byte8
+      </term>
+      <listitem>
+      <para>
+          The starting point of the WAL data in this message, given in
+          XLogRecPtr format.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Byte8
+      </term>
+      <listitem>
+      <para>
+          The current end of WAL on the server, given in
+          XLogRecPtr format.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Byte8
+      </term>
+      <listitem>
+      <para>
+          The server's system clock at the time of transmission,
+          given in TimestampTz format.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Byte<replaceable>n</replaceable>
+      </term>
+      <listitem>
+      <para>
+          A section of the WAL data stream.
+      </para>
+      </listitem>
+      </varlistentry>
+      </variablelist>
+      </para>
+      </listitem>
+      </varlistentry>
+      </variablelist>
+     </para>
+     <para>
+       A single WAL record is never split across two CopyData messages.
+       When a WAL record crosses a WAL page boundary, and is therefore
+       already split using continuation records, it can be split at the page
+       boundary. In other words, the first main WAL record and its
+       continuation records can be sent in different CopyData messages.
+     </para>
+     <para>
+       Note that all fields within the WAL data and the above-described header
+       will be in the sending server's native format.  Endianness, and the
+       format for the timestamp, are unpredictable unless the receiver has
+       verified that the sender's system identifier matches its own
+       <filename>pg_control</> contents.
+     </para>
+     <para>
+       If the WAL sender process is terminated normally (during postmaster
+       shutdown), it will send a CommandComplete message before exiting.
+       This might not happen during an abnormal shutdown, of course.
+     </para>
+    </listitem>
+  </varlistentry>
+</variablelist>
+
+</para>
+
+</sect1>
+
 <sect1 id="protocol-message-types">
 <title>Message Data Types</title>
 
 
 </sect1>
 
-<sect1 id="protocol-replication">
-<title>Streaming Replication Protocol</title>
-
-<para>
-To initiate streaming replication, the frontend sends the "replication"
-parameter in the startup message. This tells the backend to go into
-walsender mode, where a small set of replication commands can be issued
-instead of SQL statements. Only the simple query protocol can be used in
-walsender mode.
-
-The commands accepted in walsender mode are:
-
-<variablelist>
-  <varlistentry>
-    <term>IDENTIFY_SYSTEM</term>
-    <listitem>
-     <para>
-      Requests the server to identify itself. Server replies with a result
-      set of a single row, and two fields:
-
-      systemid: The unique system identifier identifying the cluster. This
-      can be used to check that the base backup used to initialize the
-      slave came from the same cluster.
-
-      timeline: Current TimelineID. Also used to check that the slave is
-      consistent with the master.
-     </para>
-    </listitem>
-  </varlistentry>
-
-  <varlistentry>
-    <term>START_REPLICATION XXX/XXX</term>
-    <listitem>
-     <para>
-      Instructs backend to start streaming WAL, starting at point XXX/XXX.
-      Server can reply with an error e.g if the requested piece of WAL has
-      already been recycled. On success, server responds with a
-      CopyOutResponse message, and backend starts to stream WAL as CopyData
-      messages.
-      The payload in CopyData message consists of the following format.
-     </para>
-
-     <para>
-      <variablelist>
-      <varlistentry>
-      <term>
-          XLogData (B)
-      </term>
-      <listitem>
-      <para>
-      <variablelist>
-      <varlistentry>
-      <term>
-          Byte1('w')
-      </term>
-      <listitem>
-      <para>
-          Identifies the message as WAL data.
-      </para>
-      </listitem>
-      </varlistentry>
-      <varlistentry>
-      <term>
-          Int32
-      </term>
-      <listitem>
-      <para>
-          The log file number of the LSN, indicating the starting point of
-          the WAL in the message.
-      </para>
-      </listitem>
-      </varlistentry>
-      <varlistentry>
-      <term>
-          Int32
-      </term>
-      <listitem>
-      <para>
-          The byte offset of the LSN, indicating the starting point of
-          the WAL in the message.
-      </para>
-      </listitem>
-      </varlistentry>
-      <varlistentry>
-      <term>
-          Byte<replaceable>n</replaceable>
-      </term>
-      <listitem>
-      <para>
-          Data that forms part of WAL data stream.
-      </para>
-      </listitem>
-      </varlistentry>
-      </variablelist>
-      </para>
-      </listitem>
-      </varlistentry>
-      </variablelist>
-     </para>
-     <para>
-       A single WAL record is never split across two CopyData messages. When
-       a WAL record crosses a WAL page boundary, however, and is therefore
-       already split using continuation records, it can be split at the page
-       boundary. In other words, the first main WAL record and its
-       continuation records can be split across different CopyData messages.
-     </para>
-    </listitem>
-  </varlistentry>
-</variablelist>
-
-</para>
-
-</sect1>
-
 <sect1 id="protocol-changes">
 <title>Summary of Changes since Protocol 2.0</title>
 
 
  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
  * charge of XLOG streaming sender in the primary server. At first, it is
  * started by the postmaster when the walreceiver in the standby server
- * connects to the primary server and requests XLOG streaming replication,
- * i.e., unlike any auxiliary process, it is not an always-running process.
+ * connects to the primary server and requests XLOG streaming replication.
  * It attempts to keep reading XLOG records from the disk and sending them
  * to the standby server, as long as the connection is alive (i.e., like
- * any backend, there is an one to one relationship between a connection
+ * any backend, there is a one-to-one relationship between a connection
  * and a walsender process).
  *
  * Normal termination is by SIGTERM, which instructs the walsender to
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.24 2010/06/03 21:02:12 petere Exp $
+ *       $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.25 2010/06/03 22:17:32 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "libpq/pqformat.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
+#include "replication/walprotocol.h"
 #include "replication/walsender.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 
 /*
  * How far have we sent WAL already? This is also advertised in
- * MyWalSnd->sentPtr.
+ * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
  */
 static XLogRecPtr sentPtr = {0, 0};
 
 static void WalSndHandshake(void);
 static void WalSndKill(int code, Datum arg);
 static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
-static bool XLogSend(StringInfo outMsg, bool *caughtup);
+static bool XLogSend(char *msgbuf, bool *caughtup);
 static void CheckClosedConnection(void);
 
-/*
- * How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
- *
- * We don't have a good idea of what a good value would be; there's some
- * overhead per message in both walsender and walreceiver, but on the other
- * hand sending large batches makes walsender less responsive to signals
- * because signals are checked only between messages. 128kB (with
- * default 8k blocks) seems like a reasonable guess for now.
- */
-#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
 
 /* Main entry point for walsender process */
 int
        return WalSndLoop();
 }
 
+/*
+ * Execute commands from walreceiver, until we enter streaming mode.
+ */
 static void
 WalSndHandshake(void)
 {
                /* Wait for a command to arrive */
                firstchar = pq_getbyte();
 
+               /*
+                * Emergency bailout if postmaster has died.  This is to avoid the
+                * necessity for manual cleanup of all postmaster children.
+                */
+               if (!PostmasterIsAlive(true))
+                       exit(1);
+
                /*
                 * Check for any other interesting events that happened while we
                 * slept.
 
                                                /*
                                                 * Reply with a result set with one row, two columns.
-                                                * First col is system ID, and second if timeline ID
+                                                * First col is system ID, and second is timeline ID
                                                 */
 
                                                snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
                                                /* Send CommandComplete and ReadyForQuery messages */
                                                EndCommand("SELECT", DestRemote);
                                                ReadyForQuery(DestRemote);
+                                               /* ReadyForQuery did pq_flush for us */
                                        }
                                        else if (sscanf(query_string, "START_REPLICATION %X/%X",
                                                                        &recptr.xlogid, &recptr.xrecoff) == 2)
 static int
 WalSndLoop(void)
 {
-       StringInfoData output_message;
+       char       *output_message;
        bool            caughtup = false;
 
-       initStringInfo(&output_message);
+       /*
+        * Allocate buffer that will be used for each output message.  We do this
+        * just once to reduce palloc overhead.  The buffer must be made large
+        * enough for maximum-sized messages.
+        */
+       output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
 
-       /* Loop forever */
+       /* Loop forever, unless we get an error */
        for (;;)
        {
                long    remain;         /* remaining time (us) */
                 */
                if (!PostmasterIsAlive(true))
                        exit(1);
+
                /* Process any requests or signals received recently */
                if (got_SIGHUP)
                {
                 */
                if (ready_to_stop)
                {
-                       if (!XLogSend(&output_message, &caughtup))
-                               goto eof;
+                       if (!XLogSend(output_message, &caughtup))
+                               break;
                        if (caughtup)
                                shutdown_requested = true;
                }
                                remain -= NAPTIME_PER_CYCLE;
                        }
                }
+
                /* Attempt to send the log once every loop */
-               if (!XLogSend(&output_message, &caughtup))
-                       goto eof;
+               if (!XLogSend(output_message, &caughtup))
+                       break;
        }
 
-       /* can't get here because the above loop never exits */
-       return 1;
-
-eof:
-
        /*
+        * Get here on send failure.  Clean up and exit.
+        *
         * Reset whereToSendOutput to prevent ereport from attempting to send any
         * more messages to the standby.
         */
 
 /*
  * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
+ *
+ * XXX probably this should be improved to suck data directly from the
+ * WAL buffers when possible.
  */
 static void
 XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
 
 /*
  * Read up to MAX_SEND_SIZE bytes of WAL that's been written (and flushed),
- * but not yet sent to the client, and send it. If there is no unsent WAL,
- * *caughtup is set to true and nothing is sent, otherwise *caughtup is set
- * to false.
+ * but not yet sent to the client, and send it.
+ *
+ * msgbuf is a work area in which the output message is constructed.  It's
+ * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
+ * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
+ *
+ * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
+ * *caughtup is set to false.
  *
  * Returns true if OK, false if trouble.
  */
 static bool
-XLogSend(StringInfo outMsg, bool *caughtup)
+XLogSend(char *msgbuf, bool *caughtup)
 {
        XLogRecPtr      SendRqstPtr;
        XLogRecPtr      startptr;
        XLogRecPtr      endptr;
        Size            nbytes;
-       char            activitymsg[50];
-
-       /* use volatile pointer to prevent code rearrangement */
-       volatile WalSnd *walsnd = MyWalSnd;
+       WalDataMessageHeader msghdr;
 
        /* Attempt to send all records flushed to the disk already */
        SendRqstPtr = GetWriteRecPtr();
 
        /* Quick exit if nothing to do */
-       if (!XLByteLT(sentPtr, SendRqstPtr))
+       if (XLByteLE(SendRqstPtr, sentPtr))
        {
                *caughtup = true;
                return true;
        }
-       /*
-        * Otherwise let the caller know that we're not fully caught up. Unless
-        * there's a huge backlog, we'll be caught up to the current WriteRecPtr
-        * after we've sent everything below, but more WAL could accumulate while
-        * we're busy sending.
-        */
-       *caughtup = false;
 
        /*
-        * Figure out how much to send in one message. If there's less than
+        * Figure out how much to send in one message. If there's no more than
         * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
-        * MAX_SEND_SIZE bytes, but round to page boundary.
+        * MAX_SEND_SIZE bytes, but round to logfile or page boundary.
         *
         * The rounding is not only for performance reasons. Walreceiver
         * relies on the fact that we never split a WAL record across two
         * messages. Since a long WAL record is split at page boundary into
         * continuation records, page boundary is always a safe cut-off point.
-        * We also assume that SendRqstPtr never points in the middle of a WAL
+        * We also assume that SendRqstPtr never points to the middle of a WAL
         * record.
         */
        startptr = sentPtr;
 
        endptr = startptr;
        XLByteAdvance(endptr, MAX_SEND_SIZE);
-       /* round down to page boundary. */
-       endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
-       /* if we went beyond SendRqstPtr, back off */
-       if (XLByteLT(SendRqstPtr, endptr))
-               endptr = SendRqstPtr;
-
-       /*
-        * OK to read and send the slice.
-        *
-        * We don't need to convert the xlogid/xrecoff from host byte order to
-        * network byte order because the both server can be expected to have
-        * the same byte order. If they have different byte order, we don't
-        * reach here.
-        */
-       pq_sendbyte(outMsg, 'w');
-       pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
-
        if (endptr.xlogid != startptr.xlogid)
        {
+               /* Don't cross a logfile boundary within one message */
                Assert(endptr.xlogid == startptr.xlogid + 1);
-               nbytes = endptr.xrecoff + XLogFileSize - startptr.xrecoff;
+               endptr.xlogid = startptr.xlogid;
+               endptr.xrecoff = XLogFileSize;
+       }
+
+       /* if we went beyond SendRqstPtr, back off */
+       if (XLByteLE(SendRqstPtr, endptr))
+       {
+               endptr = SendRqstPtr;
+               *caughtup = true;
        }
        else
-               nbytes = endptr.xrecoff - startptr.xrecoff;
+       {
+               /* round down to page boundary. */
+               endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
+               *caughtup = false;
+       }
 
-       sentPtr = endptr;
+       nbytes = endptr.xrecoff - startptr.xrecoff;
+       Assert(nbytes <= MAX_SEND_SIZE);
 
        /*
-        * Read the log directly into the output buffer to prevent extra
-        * memcpy calls.
+        * OK to read and send the slice.
         */
-       enlargeStringInfo(outMsg, nbytes);
+       msgbuf[0] = 'w';
 
-       XLogRead(&outMsg->data[outMsg->len], startptr, nbytes);
-       outMsg->len += nbytes;
-       outMsg->data[outMsg->len] = '\0';
+       /*
+        * Read the log directly into the output buffer to avoid extra memcpy
+        * calls.
+        */
+       XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
 
-       pq_putmessage('d', outMsg->data, outMsg->len);
-       resetStringInfo(outMsg);
+       /*
+        * We fill the message header last so that the send timestamp is taken
+        * as late as possible.
+        */
+       msghdr.dataStart = startptr;
+       msghdr.walEnd = SendRqstPtr;
+       msghdr.sendTime = GetCurrentTimestamp();
 
-       /* Update shared memory status */
-       SpinLockAcquire(&walsnd->mutex);
-       walsnd->sentPtr = sentPtr;
-       SpinLockRelease(&walsnd->mutex);
+       memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
+
+       pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
 
        /* Flush pending output */
        if (pq_flush())
                return false;
 
+       sentPtr = endptr;
+
+       /* Update shared memory status */
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile WalSnd *walsnd = MyWalSnd;
+
+               SpinLockAcquire(&walsnd->mutex);
+               walsnd->sentPtr = sentPtr;
+               SpinLockRelease(&walsnd->mutex);
+       }
+
        /* Report progress of XLOG streaming in PS display */
-       snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
-                        sentPtr.xlogid, sentPtr.xrecoff);
-       set_ps_display(activitymsg, false);
+       if (update_process_title)
+       {
+               char            activitymsg[50];
+
+               snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
+                                sentPtr.xlogid, sentPtr.xrecoff);
+               set_ps_display(activitymsg, false);
+       }
 
        return true;
 }