From d307d369fae6d817e3cf670c1f824e86a2f8d8e4 Mon Sep 17 00:00:00 2001 From: Tatsuo Ishii Date: Tue, 29 Aug 2017 15:28:35 +0900 Subject: [PATCH] Fix ancient bug of stream write modules. - Fix bug with pool_write_noerror() when requested length exceeds remaining write buffer size. This could lead to a buffer overrun problem, which has not been reported in the field as far as I know though. - When write buffer is full, pool_flush_it() is called, which could write data to socket in the middle of message. I found this by using following pgproto data. To fix the problem directly write requested data if the write buffer is going to be full. 'P' "" "SELECT * FROM pgbench_accounts LIMIT 100" 0 'B' "" "" 0 0 0 'E' "" 0 'S' 'Y' 'X' - Enhance performance of pool_unread(). When retrieving large number of rows in streaming replication and extended query, pool_unread is very slow because it needs to memmove large number of bytes in the read buffer. This happens in read_kind_from_backend() since it uses pool_unread() to check 'A' packet. To optimize the situation, modify pool_unread(). If there's enough room in front of cp->po, copies the data there to avoid memmove. --- src/utils/pool_stream.c | 148 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 140 insertions(+), 8 deletions(-) diff --git a/src/utils/pool_stream.c b/src/utils/pool_stream.c index f0f31d31f..e5da1327c 100644 --- a/src/utils/pool_stream.c +++ b/src/utils/pool_stream.c @@ -53,6 +53,7 @@ static MemoryContext SwitchToConnectionContext(bool backend_connection); #ifdef DEBUG static void dump_buffer(char *buf, int len); #endif +static int pool_write_flush(POOL_CONNECTION *cp, void *buf, int len); static MemoryContext SwitchToConnectionContext(bool backend_connection) @@ -435,11 +436,40 @@ int pool_write_noerror(POOL_CONNECTION *cp, void *buf, int len) ereport(DEBUG1, (errmsg("pool_write: to backend: %d kind:%c", cp->db_node_id, c))); } - + + if (!cp->isbackend) + { + char c; + + c = ((char *)buf)[0]; + + if (len == 1) + ereport(DEBUG1, + (errmsg("pool_write: to frontend: kind:%c po:%d", c, cp->wbufpo))); + else + ereport(DEBUG1, + (errmsg("pool_write: to frontend: length:%d po:%d", len, cp->wbufpo))); + } + while (len > 0) { int remainder = WRITEBUFSZ - cp->wbufpo; - + + /* + * If requested data cannot be added to the write buffer, flush the + * buffer and directly write the requested data. This could avoid + * unwanted write in the middle of message boundary. + */ + if (remainder < len) + { + if (pool_flush_it(cp) == -1) + return -1; + + if (pool_write_flush(cp, buf, len) < 0) + return -1; + return 0; + } + if (cp->wbufpo >= WRITEBUFSZ) { /* @@ -447,16 +477,13 @@ int pool_write_noerror(POOL_CONNECTION *cp, void *buf, int len) * wbufpo is reset in pool_flush_it(). */ if (pool_flush_it(cp) == -1) - return -1; + return -1; remainder = WRITEBUFSZ; } /* check buffer size */ - if (remainder >= len) - { - /* OK, buffer size is enough. */ - remainder = len; - } + remainder = Min(remainder, len); + memcpy(cp->wbuf+cp->wbufpo, buf, remainder); cp->wbufpo += remainder; buf += remainder; @@ -485,6 +512,93 @@ int pool_write(POOL_CONNECTION *cp, void *buf, int len) } +/* + * Direct write. + * This function does not throws an ereport in case of an error + */ +static int pool_write_flush(POOL_CONNECTION *cp, void *buf, int len) +{ + int sts; + int wlen; + int offset; + wlen = len; + + ereport(DEBUG1, + (errmsg("pool_write_flush_it: write size: %d", wlen))); + + if (wlen == 0) + { + return 0; + } + + offset = 0; + + for (;;) + { + errno = 0; + + if (cp->ssl_active > 0) + { + sts = pool_ssl_write(cp, buf, wlen); + } + else + { + sts = write(cp->fd, buf, wlen); + } + + if (sts > 0) + { + wlen -= sts; + + if (wlen == 0) + { + /* write completed */ + break; + } + + else if (wlen < 0) + { + ereport(WARNING, + (errmsg("pool_write_flush_it: invalid write size %d", sts))); + return -1; + } + + else + { + /* need to write remaining data */ + ereport(DEBUG1, + (errmsg("pool_write_flush_it: write retry: %d", wlen))); + + offset += sts; + continue; + } + } + + else if (errno == EAGAIN || errno == EINTR) + { + continue; + } + + else + { + /* If this is the backend stream, report error. Otherwise + * just report debug message. + */ + if (cp->isbackend) + ereport(WARNING, + (errmsg("write on backend %d failed with error :\"%s\"",cp->db_node_id,strerror(errno)), + errdetail("while trying to write data from offset: %d wlen: %d",offset, wlen))); + else + ereport(DEBUG1, + (errmsg("write on frontend failed with error :\"%s\"",strerror(errno)), + errdetail("while trying to write data from offset: %d wlen: %d",offset, wlen))); + return -1; + } + } + + return 0; +} + /* * flush write buffer * This function does not throws an ereport in case of an error @@ -496,6 +610,9 @@ int pool_flush_it(POOL_CONNECTION *cp) int offset; wlen = cp->wbufpo; + ereport(DEBUG1, + (errmsg("pool_flush_it: flush size: %d", wlen))); + if (wlen == 0) { return 0; @@ -537,6 +654,9 @@ int pool_flush_it(POOL_CONNECTION *cp) else { /* need to write remaining data */ + ereport(DEBUG1, + (errmsg("pool_flush_it: write retry: %d", wlen))); + offset += sts; continue; } @@ -1014,6 +1134,18 @@ int pool_unread(POOL_CONNECTION *cp, void *data, int len) int n = cp->len + len; int realloc_size; + /* + * Optimization to avoid mmove. If there's enough space in front of + * existing data, we can use it. + */ + if (cp->po >= len) + { + memmove(cp->hp + cp->po - len, data, len); + cp->po -= len; + cp->len = n; + return 0; + } + if (cp->bufsz < n) { realloc_size = (n/READBUFSZ+1)*READBUFSZ; -- 2.39.5