*/
void pool_unset_query_in_progress(void)
{
+ POOL_SESSION_CONTEXT *s = pool_get_session_context(false);
+
ereport(DEBUG1,
(errmsg("session context: unsetting query in progress. DONE")));
- pool_get_session_context(false)->in_progress = false;
+ s->in_progress = false;
+
+ /* Restore where_to_send map if neccessary */
+ if (s->need_to_restore_where_to_send)
+ {
+ memcpy(s->query_context->where_to_send,s->where_to_send_save, sizeof(s->where_to_send_save));
+ }
+ s->need_to_restore_where_to_send = false;
}
/*
{
int i;
+ POOL_SESSION_CONTEXT *s = pool_get_session_context(false);
+
+ /* Save where_to_send map */
+ memcpy(s->where_to_send_save, query_context->where_to_send, sizeof(s->where_to_send_save));
+ s->need_to_restore_where_to_send = true;
+
+ /* Rewrite where_to_send map */
memset(query_context->where_to_send, 0, sizeof(query_context->where_to_send));
for (i=0;i<2;i++)
/* If true, we are doing extended query message */
bool doing_extended_query_message;
+ /* If true, we have rewritten where_to_send map in the current query
+ * context. pool_unset_query_in_progress() should restore the data from
+ * where_to_send_save. For now, this is only necessary while doing
+ * extended query protocol and in streaming replication mode.
+ */
+ bool need_to_restore_where_to_send;
+ bool where_to_send_save[MAX_NUM_BACKENDS];
+
/* If true, the command in progress has finished successfully. */
bool command_success;
pool_set_writing_transaction();
}
}
+ pool_unset_query_in_progress();
}
return POOL_CONTINUE;
else
{
POOL_PENDING_MESSAGE *pmsg;
+ bool where_to_send_save[MAX_NUM_BACKENDS];
+ /* Parse_before_bind() may have sent a bind message to the primary
+ * node id. So send the close message to the primary node as well.
+ * Even if not, sending a close message for non existing
+ * statement/portal is harmless. No error will happen.
+ */
if (session_context->load_balance_node_id != PRIMARY_NODE_ID)
{
+ /* save where_to_send map */
+ memcpy(where_to_send_save, query_context->where_to_send, sizeof(where_to_send_save));
+
query_context->where_to_send[PRIMARY_NODE_ID] = true;
query_context->where_to_send[session_context->load_balance_node_id] = true;
}
pool_pending_message_add(pmsg);
pool_pending_message_free_pending_message(pmsg);
+ if (session_context->load_balance_node_id != PRIMARY_NODE_ID)
+ {
+ /* Restore where_to_send map */
+ memcpy(query_context->where_to_send, where_to_send_save, sizeof(where_to_send_save));
+ }
+
#ifdef NOT_USED
dump_pending_message();
#endif
pool_set_doing_extended_query_message();
}
status = SimpleForwardToBackend(fkind, frontend, backend, len, contents);
+
+ if (pool_is_doing_extended_query_message())
+ {
+ pool_unset_doing_extended_query_message();
+ }
+
break;
}