*
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/postmaster/syslogger.c,v 1.12.4.1 2005/03/12 01:55:14 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/postmaster/syslogger.c,v 1.12.4.2 2007/06/14 01:50:34 adunstan Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include <sys/stat.h>
 #include <sys/time.h>
 
+#include "lib/stringinfo.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "postmaster/postmaster.h"
 #define LBF_MODE       _IOLBF
 #endif
 
+/* 
+ * We read() into a temp buffer twice as big as a chunk, so that any fragment
+ * left after processing can be moved down to the front and we'll still have
+ * room to read a full chunk.
+ */
+#define READ_BUF_SIZE (2 * PIPE_CHUNK_SIZE)
+
 
 /*
  * GUC parameters.     Redirect_stderr cannot be changed after postmaster
  * Private state
  */
 static pg_time_t next_rotation_time;
-
 static bool redirection_done = false;
-
 static bool pipe_eof_seen = false;
-
 static FILE *syslogFile = NULL;
-
 static char *last_file_name = NULL;
 
+/* 
+ * Buffers for saving partial messages from different backends. We don't expect
+ * that there will be very many outstanding at one time, so 20 seems plenty of
+ * leeway. If this array gets full we won't lose messages, but we will lose
+ * the protocol protection against them being partially written or interleaved.
+ *
+ * An inactive buffer has pid == 0 and undefined contents of data.
+ */
+typedef struct
+{
+       int32   pid;                            /* PID of source process */
+       StringInfoData data;            /* accumulated data, as a StringInfo */
+} save_buffer;
+
+#define CHUNK_SLOTS 20
+static save_buffer saved_chunks[CHUNK_SLOTS];
+
 /* These must be exported for EXEC_BACKEND case ... annoying */
 #ifndef WIN32
 int                    syslogPipe[2] = {-1, -1};
 static HANDLE threadHandle = 0;
 static CRITICAL_SECTION sysfileSection;
 #endif
+static void process_pipe_input(char *logbuffer, int *bytes_in_logbuffer);
+static void flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer);
 
 /*
  * Flags set by interrupt handlers for later service in the main loop.
 NON_EXEC_STATIC void
 SysLoggerMain(int argc, char *argv[])
 {
+#ifndef WIN32
+       char            logbuffer[READ_BUF_SIZE];
+       int                     bytes_in_logbuffer = 0;
+#endif
        char       *currentLogDir;
        char       *currentLogFilename;
        int                     currentLogRotationAge;
                bool            time_based_rotation = false;
 
 #ifndef WIN32
-               char            logbuffer[1024];
                int                     bytesRead;
                int                     rc;
                fd_set          rfds;
                else if (rc > 0 && FD_ISSET(syslogPipe[0], &rfds))
                {
                        bytesRead = piperead(syslogPipe[0],
-                                                                logbuffer, sizeof(logbuffer));
-
+                                                                logbuffer + bytes_in_logbuffer,
+                                                                sizeof(logbuffer) - bytes_in_logbuffer);
                        if (bytesRead < 0)
                        {
                                if (errno != EINTR)
                        }
                        else if (bytesRead > 0)
                        {
-                               write_syslogger_file_binary(logbuffer, bytesRead);
+                               bytes_in_logbuffer += bytesRead;
+                               process_pipe_input(logbuffer, &bytes_in_logbuffer);
                                continue;
                        }
                        else
                                 * done.
                                 */
                                pipe_eof_seen = true;
+
+                               /* if there's any data left then force it out now */
+                               flush_pipe_input(logbuffer, &bytes_in_logbuffer);
                        }
                }
 #else                                                  /* WIN32 */
 #endif   /* EXEC_BACKEND */
 
 
+/* --------------------------------
+ *             pipe protocol handling
+ * --------------------------------
+ */
+
+/*
+ * Process data received through the syslogger pipe.
+ *
+ * This routine interprets the log pipe protocol which sends log messages as
+ * (hopefully atomic) chunks - such chunks are detected and reassembled here. 
+ *
+ * The protocol has a header that starts with two nul bytes, then has a 16 bit
+ * length, the pid of the sending process, and a flag to indicate if it is 
+ * the last chunk in a message. Incomplete chunks are saved until we read some
+ * more, and non-final chunks are accumulated until we get the final chunk.
+ *
+ * All of this is to avoid 2 problems:
+ * . partial messages being written to logfiles (messes rotation), and
+ * . messages from different backends being interleaved (messages garbled).
+ *
+ * Any non-protocol messages are written out directly. These should only come
+ * from non-PostgreSQL sources, however (e.g. third party libraries writing to
+ * stderr).
+ *
+ * logbuffer is the data input buffer, and *bytes_in_logbuffer is the number
+ * of bytes present.  On exit, any not-yet-eaten data is left-justified in
+ * logbuffer, and *bytes_in_logbuffer is updated.
+ */
+static void
+process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
+{
+       char   *cursor = logbuffer;
+       int             count = *bytes_in_logbuffer;
+
+       /* While we have enough for a header, process data... */
+       while (count >= (int) sizeof(PipeProtoHeader))
+       {
+               PipeProtoHeader p;
+               int             chunklen;
+
+               /* Do we have a valid header? */
+               memcpy(&p, cursor, sizeof(PipeProtoHeader));
+               if (p.nuls[0] == '\0' && p.nuls[1] == '\0' &&
+                       p.len > 0 && p.len <= PIPE_MAX_PAYLOAD &&
+                       p.pid != 0 &&
+                       (p.is_last == 't' || p.is_last == 'f'))
+               {
+                       chunklen = PIPE_HEADER_SIZE + p.len;
+
+                       /* Fall out of loop if we don't have the whole chunk yet */
+                       if (count < chunklen)
+                               break;
+
+                       if (p.is_last == 'f')
+                       {
+                               /* 
+                                * Save a complete non-final chunk in the per-pid buffer 
+                                * if possible - if not just write it out.
+                                */
+                               int free_slot = -1, existing_slot = -1;
+                               int i;
+                               StringInfo str;
+
+                               for (i = 0; i < CHUNK_SLOTS; i++)
+                               {
+                                       if (saved_chunks[i].pid == p.pid)
+                                       {
+                                               existing_slot = i;
+                                               break;
+                                       }
+                                       if (free_slot < 0 && saved_chunks[i].pid == 0)
+                                               free_slot = i;
+                               }
+                               if (existing_slot >= 0)
+                               {
+                                       str = &(saved_chunks[existing_slot].data);
+                                       appendBinaryStringInfo(str,
+                                                                                  cursor + PIPE_HEADER_SIZE, 
+                                                                                  p.len);
+                               }
+                               else if (free_slot >= 0)
+                               {
+                                       saved_chunks[free_slot].pid = p.pid;
+                                       str = &(saved_chunks[free_slot].data);
+                                       initStringInfo(str);
+                                       appendBinaryStringInfo(str,
+                                                                                  cursor + PIPE_HEADER_SIZE, 
+                                                                                  p.len);
+                               }
+                               else
+                               {
+                                       /* 
+                                        * If there is no free slot we'll just have to take our
+                                        * chances and write out a partial message and hope that
+                                        * it's not followed by something from another pid.
+                                        */
+                                       write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len);
+                               }
+                       }
+                       else
+                       {
+                               /* 
+                                * Final chunk --- add it to anything saved for that pid, and
+                                * either way write the whole thing out.
+                                */
+                               int existing_slot = -1;
+                               int i;
+                               StringInfo str;
+
+                               for (i = 0; i < CHUNK_SLOTS; i++)
+                               {
+                                       if (saved_chunks[i].pid == p.pid)
+                                       {
+                                               existing_slot = i;
+                                               break;
+                                       }
+                               }
+                               if (existing_slot >= 0)
+                               {
+                                       str = &(saved_chunks[existing_slot].data);
+                                       appendBinaryStringInfo(str,
+                                                                                  cursor + PIPE_HEADER_SIZE,
+                                                                                  p.len);
+                                       write_syslogger_file(str->data, str->len);
+                                       saved_chunks[existing_slot].pid = 0;
+                                       pfree(str->data);
+                               }
+                               else
+                               {
+                                       /* The whole message was one chunk, evidently. */
+                                       write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len);
+                               }
+                       }
+
+                       /* Finished processing this chunk */
+                       cursor += chunklen;
+                       count -= chunklen;
+               }
+               else 
+               {
+                       /* Process non-protocol data */
+
+                       /*
+                        * Look for the start of a protocol header.  If found, dump data
+                        * up to there and repeat the loop.  Otherwise, dump it all and
+                        * fall out of the loop.  (Note: we want to dump it all if
+                        * at all possible, so as to avoid dividing non-protocol messages
+                        * across logfiles.  We expect that in many scenarios, a
+                        * non-protocol message will arrive all in one read(), and we
+                        * want to respect the read() boundary if possible.)
+                        */
+                       for (chunklen = 1; chunklen < count; chunklen++)
+                       {
+                               if (cursor[chunklen] == '\0')
+                                       break;
+                       }
+                       write_syslogger_file(cursor, chunklen);
+                       cursor += chunklen;
+                       count -= chunklen;
+               }
+       }
+
+       /* We don't have a full chunk, so left-align what remains in the buffer */
+       if (count > 0 && cursor != logbuffer)
+               memmove(logbuffer, cursor, count);
+       *bytes_in_logbuffer = count;
+}
+
+/*
+ * Force out any buffered data
+ *
+ * This is currently used only at syslogger shutdown, but could perhaps be
+ * useful at other times, so it is careful to leave things in a clean state.
+ */
+static void
+flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
+{
+       int i;
+       StringInfo str;
+
+       /* Dump any incomplete protocol messages */
+       for (i = 0; i < CHUNK_SLOTS; i++)
+       {
+               if (saved_chunks[i].pid != 0)
+               {
+                       str = &(saved_chunks[i].data);
+                       write_syslogger_file(str->data, str->len);
+                       saved_chunks[i].pid = 0;
+                       pfree(str->data);
+               }
+       }
+       /*
+        * Force out any remaining pipe data as-is; we don't bother trying to
+        * remove any protocol headers that may exist in it.
+        */
+       if (*bytes_in_logbuffer > 0)
+               write_syslogger_file(logbuffer, *bytes_in_logbuffer);
+       *bytes_in_logbuffer = 0;
+}
+
+
 /* --------------------------------
  *             logfile routines
  * --------------------------------
 static unsigned int __stdcall
 pipeThread(void *arg)
 {
-       DWORD           bytesRead;
-       char            logbuffer[1024];
+       char            logbuffer[READ_BUF_SIZE];
+       int                     bytes_in_logbuffer = 0;
 
        for (;;)
        {
-               if (!ReadFile(syslogPipe[0], logbuffer, sizeof(logbuffer),
+               DWORD           bytesRead;
+
+               if (!ReadFile(syslogPipe[0],
+                                         logbuffer + bytes_in_logbuffer,
+                                         sizeof(logbuffer) - bytes_in_logbuffer,
                                          &bytesRead, 0))
                {
                        DWORD           error = GetLastError();
                                         errmsg("could not read from logger pipe: %m")));
                }
                else if (bytesRead > 0)
-                       write_syslogger_file_binary(logbuffer, bytesRead);
+               {
+                       bytes_in_logbuffer += bytesRead;
+                       process_pipe_input(logbuffer, &bytes_in_logbuffer);
+               }
        }
 
        /* We exit the above loop only upon detecting pipe EOF */
        pipe_eof_seen = true;
+
+       /* if there's any data left then force it out now */
+       flush_pipe_input(logbuffer, &bytes_in_logbuffer);
+       
        _endthread();
        return 0;
 }
 
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/utils/error/elog.c,v 1.155.4.4 2005/11/05 03:05:05 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/utils/error/elog.c,v 1.155.4.5 2007/06/14 01:50:34 adunstan Exp $
  *
  *-------------------------------------------------------------------------
  */
 static const char *useful_strerror(int errnum);
 static const char *error_severity(int elevel);
 static void append_with_tabs(StringInfo buf, const char *str);
-
+static void write_pipe_chunks(int fd, char *data, int len);
 
 /*
  * errstart --- begin an error-reporting cycle
                        write_eventlog(edata->elevel, buf.data);
                else
 #endif
-                       fprintf(stderr, "%s", buf.data);
+                       if (Redirect_stderr)
+                               write_pipe_chunks(fileno(stderr), buf.data, buf.len);
+                       else
+                               write(fileno(stderr), buf.data, buf.len);
        }
 
        /* If in the syslogger process, try to write messages direct to file */
        pfree(buf.data);
 }
 
+/*
+ * Send data to the syslogger using the chunked protocol
+ */
+static void
+write_pipe_chunks(int fd, char *data, int len)
+{
+       PipeProtoChunk p;
+
+       Assert(len > 0);
+
+       p.proto.nuls[0] = p.proto.nuls[1] = '\0';
+       p.proto.pid = MyProcPid;
+
+       /* write all but the last chunk */
+       while (len > PIPE_MAX_PAYLOAD)
+       {
+               p.proto.is_last = 'f';
+               p.proto.len = PIPE_MAX_PAYLOAD;
+               memcpy(p.proto.data, data, PIPE_MAX_PAYLOAD);
+               write(fd, &p, PIPE_HEADER_SIZE + PIPE_MAX_PAYLOAD);
+               data += PIPE_MAX_PAYLOAD;
+               len -= PIPE_MAX_PAYLOAD;
+       }
+
+       /* write the last chunk */
+       p.proto.is_last = 't';
+       p.proto.len = len;
+       memcpy(p.proto.data, data, len);
+       write(fd, &p, PIPE_HEADER_SIZE + len);
+}
+
 
 /*
  * Write error report to client
 #ifndef WIN32
        /* On Unix, we just fprintf to stderr */
        vfprintf(stderr, fmt, ap);
+       fflush(stderr);
 #else
 
        /*
                write_eventlog(EVENTLOG_ERROR_TYPE, errbuf);
        }
        else
+       {
                /* Not running as service, write to stderr */
                vfprintf(stderr, fmt, ap);
+               fflush(stderr);
+       }
 #endif
        va_end(ap);
 }