Implement flush tracking feature.
authorTatsuo Ishii <ishii@sraoss.co.jp>
Tue, 18 Jan 2022 05:44:49 +0000 (14:44 +0900)
committerTatsuo Ishii <ishii@sraoss.co.jp>
Tue, 18 Jan 2022 05:44:49 +0000 (14:44 +0900)
When a flush message arrives from frontend, any pending message from
backend should be flushed and sent to frontend. In order to do that,
this commit implements "flush tracking" feature. i.e. when a flush
message arrives, pgpool sets "flush pending" flag in each pending
messages. If the response message from backend corresponds to the
pending message with the flush pending flag being set, the message is
immediately flushed to frontend, rather than buffered.

Discussion: https://www.pgpool.net/pipermail/pgpool-general/2022-January/008026.html

src/context/pool_session_context.c
src/include/context/pool_session_context.h
src/protocol/pool_process_query.c
src/protocol/pool_proto_modules.c

index 4b31e4ea84b96f9dc9bc853cf5500512ac1db65b..5d406afb52f7cf561d042eb2566da4ef33327792 100644 (file)
@@ -4,7 +4,7 @@
  * pgpool: a language independent connection pool server for PostgreSQL
  * written by Tatsuo Ishii
  *
- * Copyright (c) 2003-2020     PgPool Global Development Group
+ * Copyright (c) 2003-2022     PgPool Global Development Group
  *
  * Permission to use, copy, modify, and distribute this software and
  * its documentation for any purpose and without fee is hereby
@@ -1192,6 +1192,7 @@ pool_pending_message_create(char kind, int len, char *contents)
        msg->is_rows_returned = false;
        msg->not_forward_to_frontend = false;
        memset(msg->node_ids, false, sizeof(msg->node_ids));
+       msg->flush_pending = false;
 
        MemoryContextSwitchTo(old_context);
 
@@ -1710,6 +1711,24 @@ pool_pending_message_get_message_num_by_backend_id(int backend_id)
        return cnt;
 }
 
+/*
+ * Set flush request flag
+ */
+void
+pool_pending_message_set_flush_request(void)
+{
+       ListCell   *msg_item;
+
+       foreach(msg_item, session_context->pending_messages)
+       {
+               POOL_PENDING_MESSAGE *msg = (POOL_PENDING_MESSAGE *) lfirst(msg_item);
+               msg->flush_pending = true;
+               ereport(LOG,
+                               (errmsg("pool_pending_message_set_flush_request: msg: %s",
+                                               pool_pending_message_type_to_string(msg->type))));
+       }
+}
+
 /*
  * Dump whole pending message list
  */
index 8e2abc670bfd1956a4737452e41cc356074035d2..7877448d92ec8293e1e84b9ed95f4c5594b82639 100644 (file)
@@ -6,7 +6,7 @@
  * pgpool: a language independent connection pool server for PostgreSQL
  * written by Tatsuo Ishii
  *
- * Copyright (c) 2003-2020     PgPool Global Development Group
+ * Copyright (c) 2003-2022     PgPool Global Development Group
  *
  * Permission to use, copy, modify, and distribute this software and
  * its documentation for any purpose and without fee is hereby
@@ -142,6 +142,11 @@ typedef struct
                                                                                         * used by parse_before_bind() */
        bool            node_ids[MAX_NUM_BACKENDS];     /* backend node map which this message was sent to */
        POOL_QUERY_CONTEXT *query_context;      /* query context */
+       /*
+        * If "flush" message arrives, this flag is set to true until all buffered
+        * message for frontend are sent out.
+        */
+       bool            flush_pending;
 }                      POOL_PENDING_MESSAGE;
 
 typedef enum {
@@ -302,6 +307,11 @@ typedef struct
        /* Whether transaction is read only. Only used by Snapshot Isolation mode. */
        SI_STATE        transaction_read_only;
 
+       /*
+        * If true, the current message from backend must be flushed to frontend.
+        * Set by read_kind_from_backend and reset by SimpleForwardToFrontend.
+        */
+       bool            flush_pending;
 }                      POOL_SESSION_CONTEXT;
 
 extern void pool_init_session_context(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend);
@@ -368,6 +378,7 @@ extern void pool_check_pending_message_and_reply(POOL_MESSAGE_TYPE type, char ki
 extern POOL_PENDING_MESSAGE * pool_pending_message_find_lastest_by_query_context(POOL_QUERY_CONTEXT * qc);
 extern int     pool_pending_message_get_target_backend_id(POOL_PENDING_MESSAGE * msg);
 extern int     pool_pending_message_get_message_num_by_backend_id(int backend_id);
+extern void pool_pending_message_set_flush_request(void);
 extern void dump_pending_message(void);
 extern void pool_set_major_version(int major);
 extern void pool_set_minor_version(int minor);
index cfdbb8ae9f62ffba3380621d72a6dd84df3a547d..b996cd0a6a4b90878fccc0b78844cb36dcb1aa0d 100644 (file)
@@ -705,6 +705,10 @@ SimpleForwardToFrontend(char kind, POOL_CONNECTION * frontend,
        char       *p1 = NULL;
        int                     sendlen;
        int                     i;
+       POOL_SESSION_CONTEXT *session_context;
+
+       /* Get session context */
+       session_context = pool_get_session_context(false);
 
        pool_read(MAIN(backend), &len, sizeof(len));
 
@@ -786,11 +790,19 @@ SimpleForwardToFrontend(char kind, POOL_CONNECTION * frontend,
        {
                pool_write_and_flush(frontend, p1, len1);
        }
+       else if (session_context->flush_pending)
+       {
+               pool_write_and_flush(frontend, p1, len1);
+               ereport(DEBUG5,
+                               (errmsg("SimpleForwardToFrontend: flush pending request. kind: %c", kind)));
+       }
        else
        {
                pool_write(frontend, p1, len1);
        }
 
+       session_context->flush_pending = false;
+
        ereport(DEBUG5,
                        (errmsg("SimpleForwardToFrontend: packet:%c length:%d",
                                        kind, len1)));
@@ -3281,6 +3293,7 @@ read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backen
                                pool_pending_message_set_previous_message(msg);
                                pool_pending_message_query_context_dest_set(msg, msg->query_context);
                                session_context->query_context = msg->query_context;
+                               session_context->flush_pending = msg->flush_pending;
 
                                ereport(DEBUG5,
                                                (errmsg("read_kind_from_backend: where_to_send[0]:%d [1]:%d",
@@ -3361,7 +3374,7 @@ read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backen
                                                 errdetail("backend:%d kind:'%c'", i, kind)));
 
                                /*
-                                * Read and discard parameter status and notice messages
+                                * Read and forward notice messages to frontend
                                 */
                                if (kind == 'N')
                                {
@@ -3370,17 +3383,23 @@ read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backen
                                        pool_process_notice_message_from_one_backend(frontend, backend, i, kind);
                                }
 
+                               /*
+                                * Read and forward ParameterStatus messages to frontend
+                                */
                                else if (kind == 'S')
                                {
+                                       int             len2;
+
                                        pool_read(CONNECTION(backend, i), &len, sizeof(len));
+                                       len2 = len;
                                        len = htonl(len) - 4;
                                        p = pool_read2(CONNECTION(backend, i), len);
                                        if (p)
                                        {
                                                value = p + strlen(p) + 1;
-                                               ereport(DEBUG5,
-                                                               (errmsg("reading backend data packet kind"),
-                                                                errdetail("parameter name: %s value: \"%s\"", p, value)));
+                                               ereport(LOG,
+                                                               (errmsg("ParameterStatus message from backend: %d", i),
+                                                                errdetail("parameter name: \"%s\" value: \"%s\"", p, value)));
 
                                                if (IS_MAIN_NODE_ID(i))
                                                {
@@ -3392,7 +3411,10 @@ read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backen
                                                                set_application_name_with_string(pool_find_name(&CONNECTION(backend, i)->params, p, &pos));
                                                        }
                                                }
-
+                                               /* forward to frontend */
+                                               pool_write(frontend, &kind, 1);
+                                               pool_write(frontend, &len2, sizeof(len2));
+                                               pool_write_and_flush(frontend, p, len);
                                        }
                                        else
                                        {
@@ -3401,7 +3423,6 @@ read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backen
                                                                 errdetail("read from backend failed")));
                                        }
                                }
-
                        } while (kind == 'S' || kind == 'N');
 
 #ifdef DEALLOCATE_ERROR_TEST
index 8170bf5c0d662e651cce76bea2867dfdcce0f022..718fc88aae024c1ae76ad47bf906ceb304fb405e 100644 (file)
@@ -2807,6 +2807,7 @@ ProcessFrontendResponse(POOL_CONNECTION * frontend,
                                if (fkind == 'H')
                                {
                                        pool_set_doing_extended_query_message();
+                                       pool_pending_message_set_flush_request();
                                }
                                status = SimpleForwardToBackend(fkind, frontend, backend, len, contents);
 
@@ -3072,6 +3073,9 @@ ProcessBackendResponse(POOL_CONNECTION * frontend,
                                status = SimpleForwardToFrontend(kind, frontend, backend);
                                break;
                }
+
+               pool_get_session_context(false)->flush_pending = false;
+
        }
        else
        {