Fix ancient bug of stream write modules.
authorTatsuo Ishii <ishii@postgresql.org>
Tue, 29 Aug 2017 06:28:35 +0000 (15:28 +0900)
committerTatsuo Ishii <ishii@postgresql.org>
Tue, 29 Aug 2017 06:28:35 +0000 (15:28 +0900)
- Fix bug with pool_write_noerror() when requested length exceeds
  remaining write buffer size. This could lead to a buffer overrun
  problem, which has not been reported in the field as far as I know
  though.

- When write buffer is full, pool_flush_it() is called, which could
  write data to socket in the middle of message. I found this by using
  following pgproto data. To fix the problem directly write requested
  data if the write buffer is going to be full.

  'P' "" "SELECT * FROM pgbench_accounts LIMIT 100" 0
  'B' "" "" 0 0 0
  'E' "" 0
  'S'
  'Y'
  'X'

- Enhance performance of pool_unread(). When retrieving large number
  of rows in streaming replication and extended query, pool_unread is
  very slow because it needs to memmove large number of bytes in the
  read buffer. This happens in read_kind_from_backend() since it uses
  pool_unread() to check 'A' packet. To optimize the situation, modify
  pool_unread(). If there's enough room in front of cp->po, copies the
  data there to avoid memmove.

src/utils/pool_stream.c

index f0f31d31f85da907d7092073e01f43cac995d3da..e5da1327cc7d3090f2d87fa6a8adaf07e5d286b9 100644 (file)
@@ -53,6 +53,7 @@ static MemoryContext SwitchToConnectionContext(bool backend_connection);
 #ifdef DEBUG
 static void dump_buffer(char *buf, int len);
 #endif
+static int pool_write_flush(POOL_CONNECTION *cp, void *buf, int len);
 
 static MemoryContext
     SwitchToConnectionContext(bool backend_connection)
@@ -435,11 +436,40 @@ int pool_write_noerror(POOL_CONNECTION *cp, void *buf, int len)
                ereport(DEBUG1,
                                (errmsg("pool_write: to backend: %d kind:%c", cp->db_node_id, c)));
        }
-    
+
+       if (!cp->isbackend)
+       {
+               char c;
+
+               c = ((char *)buf)[0];
+
+               if (len == 1)
+                       ereport(DEBUG1,
+                                       (errmsg("pool_write: to frontend: kind:%c po:%d", c, cp->wbufpo)));
+               else
+                       ereport(DEBUG1,
+                                       (errmsg("pool_write: to frontend: length:%d po:%d", len, cp->wbufpo)));
+       }
+       
        while (len > 0)
        {
                int remainder = WRITEBUFSZ - cp->wbufpo;
-        
+
+               /*
+                * If requested data cannot be added to the write buffer, flush the
+                * buffer and directly write the requested data.  This could avoid
+                * unwanted write in the middle of message boundary.
+                */
+               if (remainder < len)
+               {
+                       if (pool_flush_it(cp) == -1)
+                               return -1;
+
+                       if (pool_write_flush(cp, buf, len) < 0)
+                               return -1;
+                       return 0;
+               }
+
                if (cp->wbufpo >= WRITEBUFSZ)
                {
                        /*
@@ -447,16 +477,13 @@ int pool_write_noerror(POOL_CONNECTION *cp, void *buf, int len)
                         * wbufpo is reset in pool_flush_it().
                         */
                        if (pool_flush_it(cp) == -1)
-                return -1;
+                               return -1;
             remainder = WRITEBUFSZ;
                }
         
                /* check buffer size */
-               if (remainder >= len)
-               {
-                       /* OK, buffer size is enough. */
-                       remainder = len;
-               }
+               remainder = Min(remainder, len);
+
                memcpy(cp->wbuf+cp->wbufpo, buf, remainder);
                cp->wbufpo += remainder;
                buf += remainder;
@@ -485,6 +512,93 @@ int pool_write(POOL_CONNECTION *cp, void *buf, int len)
 }
 
 
+/*
+ * Direct write.
+ * This function does not throws an ereport in case of an error
+ */
+static int pool_write_flush(POOL_CONNECTION *cp, void *buf, int len)
+{
+       int sts;
+       int wlen;
+       int offset;
+       wlen = len;
+
+       ereport(DEBUG1,
+                       (errmsg("pool_write_flush_it: write size: %d", wlen)));
+
+       if (wlen == 0)
+       {
+               return 0;
+       }
+
+       offset = 0;
+
+       for (;;)
+       {
+               errno = 0;
+
+               if (cp->ssl_active > 0)
+               {
+                 sts = pool_ssl_write(cp, buf, wlen);
+               }
+               else
+               {
+                 sts = write(cp->fd, buf, wlen);
+               }
+
+               if (sts > 0)
+               {
+                       wlen -= sts;
+
+                       if (wlen == 0)
+                       {
+                               /* write completed */
+                               break;
+                       }
+
+                       else if (wlen < 0)
+                       {
+                               ereport(WARNING,
+                                               (errmsg("pool_write_flush_it: invalid write size %d", sts)));
+                               return -1;
+                       }
+
+                       else
+                       {
+                               /* need to write remaining data */
+                               ereport(DEBUG1,
+                                               (errmsg("pool_write_flush_it: write retry: %d", wlen)));
+
+                               offset += sts;
+                               continue;
+                       }
+               }
+
+               else if (errno == EAGAIN || errno == EINTR)
+               {
+                       continue;
+               }
+
+               else
+               {
+                       /* If this is the backend stream, report error. Otherwise
+                        * just report debug message.
+                        */
+                       if (cp->isbackend)
+                               ereport(WARNING,
+                                       (errmsg("write on backend %d failed with error :\"%s\"",cp->db_node_id,strerror(errno)),
+                                                errdetail("while trying to write data from offset: %d wlen: %d",offset, wlen)));
+                       else
+                               ereport(DEBUG1,
+                                       (errmsg("write on frontend failed with error :\"%s\"",strerror(errno)),
+                                                errdetail("while trying to write data from offset: %d wlen: %d",offset, wlen)));
+                       return -1;
+               }
+       }
+
+       return 0;
+}
+
 /*
  * flush write buffer
  * This function does not throws an ereport in case of an error
@@ -496,6 +610,9 @@ int pool_flush_it(POOL_CONNECTION *cp)
        int offset;
        wlen = cp->wbufpo;
 
+       ereport(DEBUG1,
+                       (errmsg("pool_flush_it: flush size: %d", wlen)));
+
        if (wlen == 0)
        {
                return 0;
@@ -537,6 +654,9 @@ int pool_flush_it(POOL_CONNECTION *cp)
                        else
                        {
                                /* need to write remaining data */
+                               ereport(DEBUG1,
+                                               (errmsg("pool_flush_it: write retry: %d", wlen)));
+
                                offset += sts;
                                continue;
                        }
@@ -1014,6 +1134,18 @@ int pool_unread(POOL_CONNECTION *cp, void *data, int len)
        int n = cp->len + len;
        int realloc_size;
 
+       /*
+        * Optimization to avoid mmove. If there's enough space in front of
+        * existing data, we can use it.
+        */
+       if (cp->po >= len)
+       {
+               memmove(cp->hp + cp->po - len, data, len);
+               cp->po -= len;
+               cp->len = n;
+               return 0;
+       }
+
        if (cp->bufsz < n)
        {
                realloc_size = (n/READBUFSZ+1)*READBUFSZ;