* pgpool: a language independent connection pool server for PostgreSQL
* written by Tatsuo Ishii
*
- * Copyright (c) 2003-2020 PgPool Global Development Group
+ * Copyright (c) 2003-2022 PgPool Global Development Group
*
* Permission to use, copy, modify, and distribute this software and
* its documentation for any purpose and without fee is hereby
msg->is_rows_returned = false;
msg->not_forward_to_frontend = false;
memset(msg->node_ids, false, sizeof(msg->node_ids));
+ msg->flush_pending = false;
MemoryContextSwitchTo(old_context);
return cnt;
}
+/*
+ * Set flush request flag
+ */
+void
+pool_pending_message_set_flush_request(void)
+{
+ ListCell *msg_item;
+
+ foreach(msg_item, session_context->pending_messages)
+ {
+ POOL_PENDING_MESSAGE *msg = (POOL_PENDING_MESSAGE *) lfirst(msg_item);
+ msg->flush_pending = true;
+ ereport(LOG,
+ (errmsg("pool_pending_message_set_flush_request: msg: %s",
+ pool_pending_message_type_to_string(msg->type))));
+ }
+}
+
/*
* Dump whole pending message list
*/
* pgpool: a language independent connection pool server for PostgreSQL
* written by Tatsuo Ishii
*
- * Copyright (c) 2003-2020 PgPool Global Development Group
+ * Copyright (c) 2003-2022 PgPool Global Development Group
*
* Permission to use, copy, modify, and distribute this software and
* its documentation for any purpose and without fee is hereby
* used by parse_before_bind() */
bool node_ids[MAX_NUM_BACKENDS]; /* backend node map which this message was sent to */
POOL_QUERY_CONTEXT *query_context; /* query context */
+ /*
+ * If "flush" message arrives, this flag is set to true until all buffered
+ * message for frontend are sent out.
+ */
+ bool flush_pending;
} POOL_PENDING_MESSAGE;
typedef enum {
/* Whether transaction is read only. Only used by Snapshot Isolation mode. */
SI_STATE transaction_read_only;
+ /*
+ * If true, the current message from backend must be flushed to frontend.
+ * Set by read_kind_from_backend and reset by SimpleForwardToFrontend.
+ */
+ bool flush_pending;
} POOL_SESSION_CONTEXT;
extern void pool_init_session_context(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend);
extern POOL_PENDING_MESSAGE * pool_pending_message_find_lastest_by_query_context(POOL_QUERY_CONTEXT * qc);
extern int pool_pending_message_get_target_backend_id(POOL_PENDING_MESSAGE * msg);
extern int pool_pending_message_get_message_num_by_backend_id(int backend_id);
+extern void pool_pending_message_set_flush_request(void);
extern void dump_pending_message(void);
extern void pool_set_major_version(int major);
extern void pool_set_minor_version(int minor);
char *p1 = NULL;
int sendlen;
int i;
+ POOL_SESSION_CONTEXT *session_context;
+
+ /* Get session context */
+ session_context = pool_get_session_context(false);
pool_read(MAIN(backend), &len, sizeof(len));
{
pool_write_and_flush(frontend, p1, len1);
}
+ else if (session_context->flush_pending)
+ {
+ pool_write_and_flush(frontend, p1, len1);
+ ereport(DEBUG5,
+ (errmsg("SimpleForwardToFrontend: flush pending request. kind: %c", kind)));
+ }
else
{
pool_write(frontend, p1, len1);
}
+ session_context->flush_pending = false;
+
ereport(DEBUG5,
(errmsg("SimpleForwardToFrontend: packet:%c length:%d",
kind, len1)));
pool_pending_message_set_previous_message(msg);
pool_pending_message_query_context_dest_set(msg, msg->query_context);
session_context->query_context = msg->query_context;
+ session_context->flush_pending = msg->flush_pending;
ereport(DEBUG5,
(errmsg("read_kind_from_backend: where_to_send[0]:%d [1]:%d",
errdetail("backend:%d kind:'%c'", i, kind)));
/*
- * Read and discard parameter status and notice messages
+ * Read and forward notice messages to frontend
*/
if (kind == 'N')
{
pool_process_notice_message_from_one_backend(frontend, backend, i, kind);
}
+ /*
+ * Read and forward ParameterStatus messages to frontend
+ */
else if (kind == 'S')
{
+ int len2;
+
pool_read(CONNECTION(backend, i), &len, sizeof(len));
+ len2 = len;
len = htonl(len) - 4;
p = pool_read2(CONNECTION(backend, i), len);
if (p)
{
value = p + strlen(p) + 1;
- ereport(DEBUG5,
- (errmsg("reading backend data packet kind"),
- errdetail("parameter name: %s value: \"%s\"", p, value)));
+ ereport(LOG,
+ (errmsg("ParameterStatus message from backend: %d", i),
+ errdetail("parameter name: \"%s\" value: \"%s\"", p, value)));
if (IS_MAIN_NODE_ID(i))
{
set_application_name_with_string(pool_find_name(&CONNECTION(backend, i)->params, p, &pos));
}
}
-
+ /* forward to frontend */
+ pool_write(frontend, &kind, 1);
+ pool_write(frontend, &len2, sizeof(len2));
+ pool_write_and_flush(frontend, p, len);
}
else
{
errdetail("read from backend failed")));
}
}
-
} while (kind == 'S' || kind == 'N');
#ifdef DEALLOCATE_ERROR_TEST
if (fkind == 'H')
{
pool_set_doing_extended_query_message();
+ pool_pending_message_set_flush_request();
}
status = SimpleForwardToBackend(fkind, frontend, backend, len, contents);
status = SimpleForwardToFrontend(kind, frontend, backend);
break;
}
+
+ pool_get_session_context(false)->flush_pending = false;
+
}
else
{