While piggy-packing queries to system catalog data,
pool_push_pending_data() is responsible for pushing response from
backend. To judge whether pending data remains or not, it checks
socket fd of backend and dummy close message response (close
complete). Problem is, if the socket is not ready and accidentally
actual (non dummy) close message response is returned,
pool_push_pending_data() believes that there's no pending data any
more. As a result, some of pending data is not recovered after
returning to normal process.
The idea for fix is, use pending message data list. It records
messages from frontend, and it is expected that we will receive same
number of messages (or more -- e.g. execute message). So we can use
the data to make above judgment more robust for the case described
above.
Initial patch is created by Yugo Nagata. Fix to the patch for certain
cases by me.
See:
https://www.pgpool.net/mantisbt/view.php?id=432
for more details.
return backend_id;
}
+/*
+ * Get number of pending message list entries of which target backend is same as specified one.
+ */
+int
+pool_pending_message_get_message_num_by_backend_id(int backend_id)
+{
+ ListCell *cell;
+ ListCell *next;
+ int cnt = 0;
+
+ if (!session_context)
+ {
+ ereport(ERROR,
+ (errmsg("pool_pending_message_get_message_num_by_backend_id: session context is not initialized")));
+ return 0;
+ }
+
+ for (cell = list_head(session_context->pending_messages); cell; cell = next)
+ {
+ POOL_PENDING_MESSAGE *msg = (POOL_PENDING_MESSAGE *) lfirst(cell);
+
+ if (msg->node_ids[0] == backend_id || msg->node_ids[1] == backend_id)
+ cnt++;
+
+ next = lnext(cell);
+ }
+ return cnt;
+}
+
/*
* Dump whole pending message list
*/
extern void pool_check_pending_message_and_reply(POOL_MESSAGE_TYPE type, char kind);
extern POOL_PENDING_MESSAGE * pool_pending_message_find_lastest_by_query_context(POOL_QUERY_CONTEXT * qc);
extern int pool_pending_message_get_target_backend_id(POOL_PENDING_MESSAGE * msg);
+extern int pool_pending_message_get_message_num_by_backend_id(int backend_id);
extern void dump_pending_message(void);
extern void pool_set_major_version(int major);
extern void pool_set_minor_version(int minor);
bool pending_data_existed = false;
static char random_statement[] = "pgpool_non_existent";
+ int num_pending_messages;
+ int num_pushed_messages;
+
if (!pool_get_session_context(true) || !pool_is_doing_extended_query_message())
return false;
(errmsg("pool_push_pending_data: send flush message to %d", con->db_node_id)));
}
+ num_pending_messages = pool_pending_message_get_message_num_by_backend_id(backend->db_node_id);
+ num_pushed_messages = 0;
+
for (;;)
{
int len;
if (!pool_ssl_pending(backend) && pool_read_buffer_is_empty(backend) && kind != 'E')
{
- if (kind != '3' || pending_data_existed)
+ if (kind != '3' || pending_data_existed || num_pushed_messages < num_pending_messages)
pool_set_timeout(-1);
else
pool_set_timeout(0);
pool_set_ignore_till_sync();
break;
}
+ num_pushed_messages++;
}
return data_pushed;
}