From 51e3562f95b5d61f985142da5b5095d44d35ef29 Mon Sep 17 00:00:00 2001 From: Tatsuo Ishii Date: Tue, 5 Feb 2019 20:59:38 +0900 Subject: [PATCH] Reduce memory usage when large data set is returned from backend. In commit 8640abfc41ff06b1e6d31315239292f4d3d4191d, pool_wait_till_ready_for_query() was introduced to retrieve all messages into buffer from backend until it found a "ready for query" message when extended query protocol is used in streaming replication mode. It could hit memory allocation limit of palloc(), which is 1GB. This could be easily reproduced by using pgbench and pgproto for example. pgbench -s 100 pgproto data: 'P' "" "SELECT * FROM pgbench_accounts" 0 'B' "" "" 0 0 0 'E' "" 0 'S' 'Y' To reduce the memory usage, introduce "suspend_reading_from_frontend" flag in session context so that Pgpool-II does not read any message after sync message is received. The flag is turned off when a "ready for query" message is received from backend. Between this, Pgpool-II reads messages from backend and forward to frontend as usual. This way we could eliminate the necessity to store messages from backend in buffer, thus it reduces the memory foot print. Per bug 462. --- src/context/pool_session_context.c | 32 +++++++++++++++++++++- src/include/context/pool_session_context.h | 15 +++++++++- src/protocol/pool_proto_modules.c | 18 +++++++++--- 3 files changed, 59 insertions(+), 6 deletions(-) diff --git a/src/context/pool_session_context.c b/src/context/pool_session_context.c index d6514c632..2d19fa9ea 100644 --- a/src/context/pool_session_context.c +++ b/src/context/pool_session_context.c @@ -4,7 +4,7 @@ * pgpool: a language independent connection pool server for PostgreSQL * written by Tatsuo Ishii * - * Copyright (c) 2003-2018 PgPool Global Development Group + * Copyright (c) 2003-2019 PgPool Global Development Group * * Permission to use, copy, modify, and distribute this software and * its documentation for any purpose and without fee is hereby @@ -133,6 +133,9 @@ pool_init_session_context(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * bac /* Backends have not ignored messages yet */ pool_unset_ignore_till_sync(); + /* Unset suspend reading from frontend flag */ + pool_unset_suspend_reading_from_frontend(); + /* Initialize where to send map for PREPARE statements */ #ifdef NOT_USED memset(&session_context->prep_where, 0, sizeof(session_context->prep_where)); @@ -1709,6 +1712,33 @@ pool_get_minor_version(void) return 0; } +/* + * Is suspend_reading_from_frontend flag set? + */ +bool +pool_is_suspend_reading_from_frontend(void) +{ + return session_context->suspend_reading_from_frontend; +} + +/* + * Set suspend_reading_from_frontend flag. + */ +void +pool_set_suspend_reading_from_frontend(void) +{ + session_context->suspend_reading_from_frontend = true; +} + +/* + * Unset suspend_reading_from_frontend flag. + */ +void +pool_unset_suspend_reading_from_frontend(void) +{ + session_context->suspend_reading_from_frontend = false; +} + #ifdef NOT_USED /* * Set preferred "master" node id. diff --git a/src/include/context/pool_session_context.h b/src/include/context/pool_session_context.h index 7eb48186c..b701551c9 100644 --- a/src/include/context/pool_session_context.h +++ b/src/include/context/pool_session_context.h @@ -6,7 +6,7 @@ * pgpool: a language independent connection pool server for PostgreSQL * written by Tatsuo Ishii * - * Copyright (c) 2003-2018 PgPool Global Development Group + * Copyright (c) 2003-2019 PgPool Global Development Group * * Permission to use, copy, modify, and distribute this software and * its documentation for any purpose and without fee is hereby @@ -256,6 +256,15 @@ typedef struct int major; /* Protocol minor version number */ int minor; + + /* + * Do not read messages from frontend. Used in extended protocol + + * streaming replication. If sync message is received from frontend, this + * flag prevent from reading any message from frontend until read for + * query message arrives from backend. + */ + bool suspend_reading_from_frontend; + #ifdef NOT_USED /* Preferred "master" node id. Only used for SimpleForwardToFrontend. */ int preferred_master_node_id; @@ -328,6 +337,10 @@ extern void dump_pending_message(void); extern void pool_set_major_version(int major); extern void pool_set_minor_version(int minor); extern int pool_get_minor_version(void); +extern bool pool_is_suspend_reading_from_frontend(void); +extern void pool_set_suspend_reading_from_frontend(void); +extern void pool_unset_suspend_reading_from_frontend(void); + #ifdef NOT_USED extern void pool_set_preferred_master_node_id(int node_id); extern int pool_get_preferred_master_node_id(void); diff --git a/src/protocol/pool_proto_modules.c b/src/protocol/pool_proto_modules.c index 01336ce5b..b146da90d 100644 --- a/src/protocol/pool_proto_modules.c +++ b/src/protocol/pool_proto_modules.c @@ -3,7 +3,7 @@ * pgpool: a language independent connection pool server for PostgreSQL * written by Tatsuo Ishii * - * Copyright (c) 2003-2018 PgPool Global Development Group + * Copyright (c) 2003-2019 PgPool Global Development Group * * Permission to use, copy, modify, and distribute this software and * its documentation for any purpose and without fee is hereby @@ -94,7 +94,6 @@ static POOL_STATUS close_standby_transactions(POOL_CONNECTION * frontend, static char *flatten_set_variable_args(const char *name, List *args); static bool process_pg_terminate_backend_func(POOL_QUERY_CONTEXT * query_context); -static void pool_wait_till_ready_for_query(POOL_CONNECTION_POOL * backend); static void pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend); @@ -2395,6 +2394,10 @@ ProcessFrontendResponse(POOL_CONNECTION * frontend, if (pool_read_buffer_is_empty(frontend) && frontend->no_forward != 0) return POOL_CONTINUE; + /* Are we suspending reading from frontend? */ + if (pool_is_suspend_reading_from_frontend()) + return POOL_CONTINUE; + pool_read(frontend, &fkind, 1); ereport(DEBUG5, @@ -2543,8 +2546,11 @@ ProcessFrontendResponse(POOL_CONNECTION * frontend, if (SL_MODE) { - /* Wait till Ready for query received */ - pool_wait_till_ready_for_query(backend); + /* + * From now on suspend to read from frontend until we receive + * ready for query message from backend. + */ + pool_set_suspend_reading_from_frontend(); } break; @@ -2693,6 +2699,7 @@ ProcessBackendResponse(POOL_CONNECTION * frontend, ereport(DEBUG5, (errmsg("processing backend response"), errdetail("Ready For Query received"))); + pool_unset_suspend_reading_from_frontend(); status = ReadyForQuery(frontend, backend, true, true); #ifdef DEBUG extern bool stop_now; @@ -2783,6 +2790,7 @@ ProcessBackendResponse(POOL_CONNECTION * frontend, { pool_set_ignore_till_sync(); pool_unset_query_in_progress(); + pool_unset_suspend_reading_from_frontend(); if (SL_MODE) pool_discard_except_sync_and_ready_for_query(frontend, backend); } @@ -3669,6 +3677,7 @@ flatten_set_variable_args(const char *name, List *args) return buf.data; } +#ifdef NOT_USED /* Called when sync message is received. * Wait till ready for query received. */ @@ -3711,6 +3720,7 @@ pool_wait_till_ready_for_query(POOL_CONNECTION_POOL * backend) } } } +#endif /* * Called when error response received in streaming replication mode and doing -- 2.39.5