static void process_reporting(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
static int reset_backend(POOL_CONNECTION_POOL *backend, int qcnt);
+static int is_select_query(char *sql);
+static int is_sequence_query(char *sql);
static int load_balance_enabled(POOL_CONNECTION_POOL *backend, char *sql);
static void start_load_balance(POOL_CONNECTION_POOL *backend);
static void end_load_balance(POOL_CONNECTION_POOL *backend);
static int master_slave_was_enabled; /* master/slave mode was enabled */
static int internal_transaction_started; /* to issue table lock command a transaction
has been started internally */
+static int select_in_transaction = 0; /* non 0 if select query is in transaction */
static void (*pending_function)(PreparedStatementList *p, PreparedStatement *statement) = NULL;
static PreparedStatement *pending_prepared_stmt = NULL;
MASTER_SLAVE = 0;
master_slave_dml = 1;
}
+ else if (REPLICATION && is_select_query(string1) && !is_sequence_query(string1))
+ {
+ int i;
+ /* save backend connection slots */
+ for (i=0;i<backend->num;i++)
+ {
+ slots[i] = backend->slots[i];
+ }
+
+ /* send query to master only. */
+ replication_was_enabled = 1;
+ REPLICATION = 0;
+ in_load_balance = 1;
+ select_in_transaction = 1;
+ }
/*
* judge if we need to lock the table
MASTER_SLAVE = 0;
master_slave_dml = 1;
}
+ else if (REPLICATION && is_select_query(stmt->prepared_string) && !is_sequence_query(stmt->prepared_string))
+ {
+ int i;
+
+ /* save backend connection slots */
+ for (i=0;i<backend->num;i++)
+ {
+ slots[i] = backend->slots[i];
+ }
+
+ /* send query to master only. */
+ replication_was_enabled = 1;
+ REPLICATION = 0;
+ in_load_balance = 1;
+ select_in_transaction = 1;
+ }
for (i = 0;i < backend->num;i++)
{
{
pending_function(&prepared_list, pending_prepared_stmt);
}
+ else if (kind == 'C' && select_in_transaction)
+ select_in_transaction = 0;
pending_function = NULL;
pending_prepared_stmt = NULL;
}
}
+ if (select_in_transaction)
+ {
+ do_command(SECONDARY(backend), "send invalid query from pgpool to abort transaction.",
+ PROTO_MAJOR_V3, 0);
+ pool_write(SECONDARY(backend), "S", 1);
+ res1 = htonl(4);
+ if (pool_write_and_flush(SECONDARY(backend), &res1, sizeof(res1)) < 0)
+ {
+ return POOL_END;
+ }
+ select_in_transaction = 0;
+ }
+
for (i = 0;i < backend->num;i++)
{
POOL_CONNECTION *cp = backend->slots[i]->con;
}
/*
- * return non 0 if load balance is possible
+ * return non 0 if SQL is SELECT statement.
*/
-static int load_balance_enabled(POOL_CONNECTION_POOL *backend, char *sql)
+static int is_select_query(char *sql)
{
- if (pool_config.load_balance_mode &&
- DUAL_MODE &&
- MAJOR(backend) == PROTO_MAJOR_V3 &&
- TSTATE(backend) == 'I')
+ if (pool_config.ignore_leading_white_space)
{
- if (pool_config.ignore_leading_white_space)
- {
- /* ignore leading white spaces */
- while (*sql && isspace(*sql))
- sql++;
- }
+ /* ignore leading white spaces */
+ while (*sql && isspace(*sql))
+ sql++;
+ }
- if (!strncasecmp(sql, "SELECT", 6))
- return 1;
+ return (!strncasecmp(sql, "SELECT", 6));
+
+}
+
+/*
+ * return non 0 if SQL is SELECT statement.
+ */
+static int is_sequence_query(char *sql)
+{
+ if (pool_config.ignore_leading_white_space)
+ {
+ /* ignore leading white spaces */
+ while (*sql && isspace(*sql))
+ sql++;
}
+
+ if (strncasecmp(sql, "SELECT", 6))
+ return 0;
+
+ sql += 6;
+ while (*sql && isspace(*sql))
+ sql++;
+
+ /* SELECT NEXTVAL('xxx') */
+ if (*sql && !strncasecmp(sql, "NEXTVAL", 7))
+ return 1;
+
+ /* SELECT SETVAL('xxx') */
+ if (*sql && !strncasecmp(sql, "SETVAL", 6))
+ return 1;
+
return 0;
}
+/*
+ * return non 0 if load balance is possible
+ */
+static int load_balance_enabled(POOL_CONNECTION_POOL *backend, char *sql)
+{
+ return (pool_config.load_balance_mode &&
+ DUAL_MODE &&
+ MAJOR(backend) == PROTO_MAJOR_V3 &&
+ TSTATE(backend) == 'I' &&
+ is_select_query(sql) &&
+ !is_sequence_query(sql));
+}
+
/*
* start load balance mode
*/