Fix bug with sending bind message to wrong target node.
author    Tatsuo Ishii <ishii@postgresql.org>
Wed, 7 Jun 2017 04:53:13 +0000 (13:53 +0900)
committer Tatsuo Ishii <ishii@postgresql.org>
Wed, 7 Jun 2017 04:53:13 +0000 (13:53 +0900)
When a close portal message is received, Pgpool-II forwards it to both
the primary and the load balance node in streaming replication
mode. Later, when a close complete message is returned from the
backend and read_kind_from_backend gets called, it calls
pool_pending_message_query_context_dest_set, which rewrites the
where_to_send map in the current query context accordingly. If a bind
message using the same query is then received, it is forwarded to both
the primary and the load balance node, because the where_to_send map
has been rewritten that way. Unfortunately, the prepared statement
might not exist on the primary node if the query was parsed only on
the load balance node, and so we get an error.
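To put it concretely, the where_to_send map is the per-query-context
routing table that decides which backends a subsequent
extended-protocol message goes to. Below is a minimal, self-contained
sketch of that idea (the type, size and function names here are
illustrative only, not the actual Pgpool-II definitions): once the map
is rewritten so that both nodes are marked, a later Bind for a
statement that was parsed only on the load balance node is also sent
to the primary, where the statement does not exist.

#include <stdbool.h>
#include <stdio.h>

#define NUM_BACKENDS 2          /* illustrative: node 0 = primary, node 1 = standby */

/* Simplified stand-in for the query context's routing map */
typedef struct
{
    bool where_to_send[NUM_BACKENDS];
} SKETCH_QUERY_CONTEXT;

/* Forward a Bind message to every backend marked in where_to_send */
static void
forward_bind(SKETCH_QUERY_CONTEXT *qc, const bool parsed_on[NUM_BACKENDS])
{
    int i;

    for (i = 0; i < NUM_BACKENDS; i++)
    {
        if (!qc->where_to_send[i])
            continue;
        if (parsed_on[i])
            printf("node %d: Bind ok\n", i);
        else
            printf("node %d: ERROR: prepared statement does not exist\n", i);
    }
}

int
main(void)
{
    bool parsed_on[NUM_BACKENDS] = {false, true};   /* S_1 was parsed on the standby only */
    SKETCH_QUERY_CONTEXT qc = {{false, true}};      /* Bind normally goes to the standby only */

    forward_bind(&qc, parsed_on);       /* ok: standby only */

    /* Close handling rewrites the map so that both nodes are marked... */
    qc.where_to_send[0] = true;

    forward_bind(&qc, parsed_on);       /* ...and the primary now reports an error */
    return 0;
}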

The fix is to restore the where_to_send map after processing the close
complete message. For this purpose, a backup area for the where_to_send
map is added to the session context, along with a new flag,
need_to_restore_where_to_send. These new fields are set inside
pool_pending_message_query_context_dest_set, and the where_to_send map
is restored in pool_unset_query_in_progress. There were several places
that did not call pool_unset_query_in_progress; they are fixed to call
it as well.
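Conceptually, the save/restore added here boils down to the following
pattern (again with illustrative names and sizes, not the actual
source):

#include <stdbool.h>
#include <string.h>

#define NUM_BACKENDS 2          /* illustrative size */

typedef struct
{
    bool where_to_send[NUM_BACKENDS];
} SKETCH_QUERY_CONTEXT;

typedef struct
{
    SKETCH_QUERY_CONTEXT *query_context;
    bool where_to_send_save[NUM_BACKENDS];
    bool need_to_restore_where_to_send;
} SKETCH_SESSION_CONTEXT;

/* Close complete processing: back up the map before rewriting it and
 * remember that a restore is pending. */
static void
rewrite_dest_for_close_complete(SKETCH_SESSION_CONTEXT *s, int primary, int lb_node)
{
    SKETCH_QUERY_CONTEXT *qc = s->query_context;

    memcpy(s->where_to_send_save, qc->where_to_send, sizeof(s->where_to_send_save));
    s->need_to_restore_where_to_send = true;

    memset(qc->where_to_send, 0, sizeof(qc->where_to_send));
    qc->where_to_send[primary] = true;
    qc->where_to_send[lb_node] = true;
}

/* End of the query: put the original map back so the next Bind on the
 * same query context is routed as before. */
static void
unset_query_in_progress(SKETCH_SESSION_CONTEXT *s)
{
    if (s->need_to_restore_where_to_send)
        memcpy(s->query_context->where_to_send, s->where_to_send_save,
               sizeof(s->where_to_send_save));
    s->need_to_restore_where_to_send = false;
}

int
main(void)
{
    SKETCH_QUERY_CONTEXT qc = {{false, true}};              /* standby only */
    SKETCH_SESSION_CONTEXT s = {&qc, {false, false}, false};

    rewrite_dest_for_close_complete(&s, 0, 1);  /* Close goes to both nodes */
    unset_query_in_progress(&s);                /* back to standby only */
    return (int) qc.where_to_send[0];           /* 0: primary no longer marked */
}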

Per bug #314.
https://www.pgpool.net/mantisbt/view.php?id=314

To reproduce the problem, the following pgproto data can be used (an
annotated version follows the script). You also need to set the weight
parameter on the primary to 0 and on the standby to 1 to reliably
reproduce the problem.

'P' "S_1" "SELECT 1" 0
'B' "C_1" "S_1" 0 0    0
'E' "C_1" 0
'S'
'Y'

'C' 'P' "C_1"

'B' "C_2" "S_1" 0 0 0
'E' "C_2" 0
'S'
'Y'

'X'
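For reference, here is the same script annotated message by message
(this assumes pgproto's usual notation, where each line is one
frontend protocol message and 'Y' reads backend responses until
ReadyForQuery; the annotations are explanatory and not part of the
pgproto input):

'P' "S_1" "SELECT 1" 0      -- Parse: prepare statement S_1 as "SELECT 1", no parameter types
'B' "C_1" "S_1" 0 0 0       -- Bind: create portal C_1 from S_1 (goes to the load balance node)
'E' "C_1" 0                 -- Execute portal C_1, no row limit
'S'                         -- Sync
'Y'                         -- read responses until ReadyForQuery
'C' 'P' "C_1"               -- Close portal C_1 (forwarded to both nodes)
'B' "C_2" "S_1" 0 0 0       -- Bind: second portal from the same statement S_1
'E' "C_2" 0                 -- Execute portal C_2
'S'                         -- Sync
'Y'                         -- read responses until ReadyForQuery
'X'                         -- Terminate

The second Bind is the one that, before this fix, was forwarded to the
primary node where S_1 had never been parsed, triggering the error
described above.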

src/context/pool_session_context.c
src/include/context/pool_session_context.h
src/protocol/pool_proto_modules.c

index cdb05562b80f05cab3e0808294b9d77253e56f96..010faeec47b54a715379d243d412971be241853a 100644
@@ -232,10 +232,19 @@ void pool_set_query_in_progress(void)
  */
 void pool_unset_query_in_progress(void)
 {
+       POOL_SESSION_CONTEXT *s = pool_get_session_context(false);
+
        ereport(DEBUG1,
                (errmsg("session context: unsetting query in progress. DONE")));
 
-       pool_get_session_context(false)->in_progress = false;
+       s->in_progress = false;
+
+       /* Restore where_to_send map if necessary */
+       if (s->need_to_restore_where_to_send)
+       {
+               memcpy(s->query_context->where_to_send, s->where_to_send_save, sizeof(s->where_to_send_save));
+       }
+       s->need_to_restore_where_to_send = false;
 }
 
 /*
@@ -1070,6 +1079,13 @@ void pool_pending_message_query_context_dest_set(POOL_PENDING_MESSAGE* message,
 {
        int i;
 
+       POOL_SESSION_CONTEXT *s = pool_get_session_context(false);
+
+       /* Save where_to_send map */
+       memcpy(s->where_to_send_save, query_context->where_to_send, sizeof(s->where_to_send_save));
+       s->need_to_restore_where_to_send = true;
+
+       /* Rewrite where_to_send map */
        memset(query_context->where_to_send, 0, sizeof(query_context->where_to_send));
 
        for (i=0;i<2;i++)
index f5d52cbbe3be89f6d9b9ee32676b33a926270032..2f47ff0c1c6b4158bb25eef4fd547721856570bb 100644
@@ -152,6 +152,14 @@ typedef struct {
        /* If true, we are doing extended query message */
        bool doing_extended_query_message;
 
+       /* If true, we have rewritten where_to_send map in the current query
+        * context. pool_unset_query_in_progress() should restore the data from
+        * where_to_send_save.  For now, this is only necessary while doing
+        * extended query protocol and in streaming replication mode.
+        */
+       bool need_to_restore_where_to_send;
+       bool where_to_send_save[MAX_NUM_BACKENDS];
+
        /* If true, the command in progress has finished successfully. */
        bool command_success;
 
index 93f309bf53ee32fdc5ccef21cf954d25ca2b9209..833d0c3ca55a51446dedf5acffaf1f4612b6340e 100644
@@ -905,6 +905,7 @@ POOL_STATUS Execute(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
                                pool_set_writing_transaction();
                        }
                }
+               pool_unset_query_in_progress();
        }
 
        return POOL_CONTINUE;
@@ -1552,9 +1553,18 @@ POOL_STATUS Close(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
        else
        {
                POOL_PENDING_MESSAGE *pmsg;
+               bool where_to_send_save[MAX_NUM_BACKENDS];
 
+               /* Parse_before_bind() may have sent a bind message to the
+                * primary node, so send the close message to the primary node
+                * as well. Even if it did not, sending a close message for a
+                * non-existing statement/portal is harmless; no error will happen.
+                */
                if (session_context->load_balance_node_id != PRIMARY_NODE_ID)
                {
+                       /* save where_to_send map */
+                       memcpy(where_to_send_save, query_context->where_to_send, sizeof(where_to_send_save));
+
                        query_context->where_to_send[PRIMARY_NODE_ID] = true;
                        query_context->where_to_send[session_context->load_balance_node_id] = true;
                }
@@ -1569,6 +1579,12 @@ POOL_STATUS Close(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
                pool_pending_message_add(pmsg);
                pool_pending_message_free_pending_message(pmsg);
 
+               if (session_context->load_balance_node_id != PRIMARY_NODE_ID)
+               {
+                       /* Restore where_to_send map */
+                       memcpy(query_context->where_to_send, where_to_send_save, sizeof(where_to_send_save));
+               }
+
 #ifdef NOT_USED
                dump_pending_message();
 #endif
@@ -2482,6 +2498,12 @@ POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend,
                                        pool_set_doing_extended_query_message();
                                }
                                status = SimpleForwardToBackend(fkind, frontend, backend, len, contents);
+
+                               if (pool_is_doing_extended_query_message())
+                               {
+                                       pool_unset_doing_extended_query_message();
+                               }
+
                                break;
                        }