Reduce memory usage when large data set is returned from backend.
authorTatsuo Ishii <ishii@sraoss.co.jp>
Tue, 5 Feb 2019 11:59:38 +0000 (20:59 +0900)
committerTatsuo Ishii <ishii@sraoss.co.jp>
Tue, 5 Feb 2019 11:59:38 +0000 (20:59 +0900)
In commit 8640abfc41ff06b1e6d31315239292f4d3d4191d,
pool_wait_till_ready_for_query() was introduced to retrieve all
messages into buffer from backend until it found a "ready for query"
message when extended query protocol is used in streaming replication
mode. It could hit memory allocation limit of palloc(), which is 1GB.

This could be easily reproduced by using pgbench and pgproto for
example.

    pgbench -s 100

    pgproto data:
    'P'     ""      "SELECT * FROM pgbench_accounts"        0
    'B'     ""      ""      0       0       0
    'E'     ""      0
    'S'
    'Y'

To reduce the memory usage, introduce "suspend_reading_from_frontend"
flag in session context so that Pgpool-II does not read any message
after sync message is received. The flag is turned off when a "ready
for query" message is received from backend. Between this, Pgpool-II
reads messages from backend and forward to frontend as usual. This way
we could eliminate the necessity to store messages from backend in
buffer, thus it reduces the memory foot print.

Per bug 462.

src/context/pool_session_context.c
src/include/context/pool_session_context.h
src/protocol/pool_proto_modules.c

index d6514c632e0e5c2177ab6fcdfcc6560932d15ecf..2d19fa9ea84ee5d5c1e94e19a3a1ff1e2eaa897f 100644 (file)
@@ -4,7 +4,7 @@
  * pgpool: a language independent connection pool server for PostgreSQL
  * written by Tatsuo Ishii
  *
- * Copyright (c) 2003-2018     PgPool Global Development Group
+ * Copyright (c) 2003-2019     PgPool Global Development Group
  *
  * Permission to use, copy, modify, and distribute this software and
  * its documentation for any purpose and without fee is hereby
@@ -133,6 +133,9 @@ pool_init_session_context(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * bac
        /* Backends have not ignored messages yet */
        pool_unset_ignore_till_sync();
 
+       /* Unset suspend reading from frontend flag */
+       pool_unset_suspend_reading_from_frontend();
+
        /* Initialize where to send map for PREPARE statements */
 #ifdef NOT_USED
        memset(&session_context->prep_where, 0, sizeof(session_context->prep_where));
@@ -1709,6 +1712,33 @@ pool_get_minor_version(void)
        return 0;
 }
 
+/*
+ * Is suspend_reading_from_frontend flag set?
+ */
+bool
+pool_is_suspend_reading_from_frontend(void)
+{
+       return session_context->suspend_reading_from_frontend;
+}
+
+/*
+ * Set suspend_reading_from_frontend flag.
+ */
+void
+pool_set_suspend_reading_from_frontend(void)
+{
+       session_context->suspend_reading_from_frontend = true;
+}
+
+/*
+ * Unset suspend_reading_from_frontend flag.
+ */
+void
+pool_unset_suspend_reading_from_frontend(void)
+{
+       session_context->suspend_reading_from_frontend = false;
+}
+
 #ifdef NOT_USED
 /*
  * Set preferred "master" node id.
index 7eb48186c2dba201f9bd9305d2a9ff1ae6cc3885..b701551c9b1e456cc6fb1c9966fc913c35c7f816 100644 (file)
@@ -6,7 +6,7 @@
  * pgpool: a language independent connection pool server for PostgreSQL
  * written by Tatsuo Ishii
  *
- * Copyright (c) 2003-2018     PgPool Global Development Group
+ * Copyright (c) 2003-2019     PgPool Global Development Group
  *
  * Permission to use, copy, modify, and distribute this software and
  * its documentation for any purpose and without fee is hereby
@@ -256,6 +256,15 @@ typedef struct
        int                     major;
        /* Protocol minor version number */
        int                     minor;
+
+       /*
+        * Do not read messages from frontend. Used in extended protocol +
+        * streaming replication.  If sync message is received from frontend, this
+        * flag prevent from reading any message from frontend until read for
+        * query message arrives from backend.
+        */
+       bool            suspend_reading_from_frontend;
+
 #ifdef NOT_USED
        /* Preferred "master" node id. Only used for SimpleForwardToFrontend. */
        int                     preferred_master_node_id;
@@ -328,6 +337,10 @@ extern void dump_pending_message(void);
 extern void pool_set_major_version(int major);
 extern void pool_set_minor_version(int minor);
 extern int     pool_get_minor_version(void);
+extern bool pool_is_suspend_reading_from_frontend(void);
+extern void pool_set_suspend_reading_from_frontend(void);
+extern void pool_unset_suspend_reading_from_frontend(void);
+
 #ifdef NOT_USED
 extern void pool_set_preferred_master_node_id(int node_id);
 extern int     pool_get_preferred_master_node_id(void);
index 01336ce5b672696eb54053b0918ad450a2edc270..b146da90d9180e41ea7583fe5b74d61fd892241f 100644 (file)
@@ -3,7 +3,7 @@
  * pgpool: a language independent connection pool server for PostgreSQL
  * written by Tatsuo Ishii
  *
- * Copyright (c) 2003-2018     PgPool Global Development Group
+ * Copyright (c) 2003-2019     PgPool Global Development Group
  *
  * Permission to use, copy, modify, and distribute this software and
  * its documentation for any purpose and without fee is hereby
@@ -94,7 +94,6 @@ static POOL_STATUS close_standby_transactions(POOL_CONNECTION * frontend,
 static char *flatten_set_variable_args(const char *name, List *args);
 static bool
                        process_pg_terminate_backend_func(POOL_QUERY_CONTEXT * query_context);
-static void pool_wait_till_ready_for_query(POOL_CONNECTION_POOL * backend);
 static void pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION * frontend,
                                                                                         POOL_CONNECTION_POOL * backend);
 
@@ -2395,6 +2394,10 @@ ProcessFrontendResponse(POOL_CONNECTION * frontend,
        if (pool_read_buffer_is_empty(frontend) && frontend->no_forward != 0)
                return POOL_CONTINUE;
 
+       /* Are we suspending reading from frontend? */
+       if (pool_is_suspend_reading_from_frontend())
+               return POOL_CONTINUE;
+
        pool_read(frontend, &fkind, 1);
 
        ereport(DEBUG5,
@@ -2543,8 +2546,11 @@ ProcessFrontendResponse(POOL_CONNECTION * frontend,
 
                        if (SL_MODE)
                        {
-                               /* Wait till Ready for query received */
-                               pool_wait_till_ready_for_query(backend);
+                               /*
+                                * From now on suspend to read from frontend until we receive
+                                * ready for query message from backend.
+                                */
+                               pool_set_suspend_reading_from_frontend();
                        }
                        break;
 
@@ -2693,6 +2699,7 @@ ProcessBackendResponse(POOL_CONNECTION * frontend,
                                ereport(DEBUG5,
                                                (errmsg("processing backend response"),
                                                 errdetail("Ready For Query received")));
+                               pool_unset_suspend_reading_from_frontend();
                                status = ReadyForQuery(frontend, backend, true, true);
 #ifdef DEBUG
                                extern bool stop_now;
@@ -2783,6 +2790,7 @@ ProcessBackendResponse(POOL_CONNECTION * frontend,
                                {
                                        pool_set_ignore_till_sync();
                                        pool_unset_query_in_progress();
+                                       pool_unset_suspend_reading_from_frontend();
                                        if (SL_MODE)
                                                pool_discard_except_sync_and_ready_for_query(frontend, backend);
                                }
@@ -3669,6 +3677,7 @@ flatten_set_variable_args(const char *name, List *args)
        return buf.data;
 }
 
+#ifdef NOT_USED
 /* Called when sync message is received.
  * Wait till ready for query received.
  */
@@ -3711,6 +3720,7 @@ pool_wait_till_ready_for_query(POOL_CONNECTION_POOL * backend)
                }
        }
 }
+#endif
 
 /*
  * Called when error response received in streaming replication mode and doing