Enhance that the query that begins with "SELECT nextval()" or "SELECT
authorYoshiyuki Asaba <y-asaba at pgfoundry.org>
Fri, 20 Apr 2007 06:14:20 +0000 (06:14 +0000)
committerYoshiyuki Asaba <y-asaba at pgfoundry.org>
Fri, 20 Apr 2007 06:14:20 +0000 (06:14 +0000)
setval()" are always replicated under replication mode.

Then, SELECT statements are only executed by MASTER node if
load_balance_mode is false. If need to replicate select statement, add
a comment in the begining of the query.

Example:
  SELECT foo() -> /*REPLICATION*/ SELECT foo()

pool_process_query.c

index f25e54e01f602b61c3edc1756614b3d6db091a76..15d7b5a0f96714c1ae46cb409e8de40ad2d4fed8 100644 (file)
@@ -115,6 +115,8 @@ static int synchronize(POOL_CONNECTION *cp);
 static void process_reporting(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
 static int reset_backend(POOL_CONNECTION_POOL *backend, int qcnt);
 
+static int is_select_query(char *sql);
+static int is_sequence_query(char *sql);
 static int load_balance_enabled(POOL_CONNECTION_POOL *backend, char *sql);
 static void start_load_balance(POOL_CONNECTION_POOL *backend);
 static void end_load_balance(POOL_CONNECTION_POOL *backend);
@@ -143,6 +145,7 @@ static int replication_was_enabled;         /* replication mode was enabled */
 static int master_slave_was_enabled;   /* master/slave mode was enabled */
 static int internal_transaction_started;               /* to issue table lock command a transaction
                                                                                                   has been started internally */
+static int select_in_transaction = 0; /* non 0 if select query is in transaction */
 static void (*pending_function)(PreparedStatementList *p, PreparedStatement *statement) = NULL;
 static PreparedStatement *pending_prepared_stmt = NULL;
 
@@ -723,6 +726,21 @@ static POOL_STATUS Query(POOL_CONNECTION *frontend,
                MASTER_SLAVE = 0;
                master_slave_dml = 1;
        }
+       else if (REPLICATION && is_select_query(string1) && !is_sequence_query(string1))
+       {
+               int i;
+               /* save backend connection slots */
+               for (i=0;i<backend->num;i++)
+               {
+                       slots[i] = backend->slots[i];
+               }
+
+               /* send query to master only. */
+               replication_was_enabled = 1;
+               REPLICATION = 0;
+               in_load_balance = 1;
+               select_in_transaction = 1;
+       }
 
        /*
         * judge if we need to lock the table
@@ -842,6 +860,22 @@ static POOL_STATUS Execute(POOL_CONNECTION *frontend,
                MASTER_SLAVE = 0;
                master_slave_dml = 1;
        }
+       else if (REPLICATION && is_select_query(stmt->prepared_string) && !is_sequence_query(stmt->prepared_string))
+       {
+               int i;
+
+               /* save backend connection slots */
+               for (i=0;i<backend->num;i++)
+               {
+                       slots[i] = backend->slots[i];
+               }
+
+               /* send query to master only. */
+               replication_was_enabled = 1;
+               REPLICATION = 0;
+               in_load_balance = 1;
+               select_in_transaction = 1;
+       }
 
        for (i = 0;i < backend->num;i++)
        {
@@ -2501,6 +2535,8 @@ POOL_STATUS SimpleForwardToFrontend(char kind, POOL_CONNECTION *frontend, POOL_C
        {
                pending_function(&prepared_list, pending_prepared_stmt);
        }
+       else if (kind == 'C' && select_in_transaction)
+               select_in_transaction = 0;
 
        pending_function = NULL;
        pending_prepared_stmt = NULL;
@@ -2588,6 +2624,19 @@ POOL_STATUS SimpleForwardToFrontend(char kind, POOL_CONNECTION *frontend, POOL_C
                        }
                }
 
+               if (select_in_transaction)
+               {
+                       do_command(SECONDARY(backend), "send invalid query from pgpool to abort transaction.",
+                                          PROTO_MAJOR_V3, 0);
+                       pool_write(SECONDARY(backend), "S", 1);
+                       res1 = htonl(4);
+                       if (pool_write_and_flush(SECONDARY(backend), &res1, sizeof(res1)) < 0)
+                       {
+                               return POOL_END;
+                       }
+                       select_in_transaction = 0;
+               }
+
                for (i = 0;i < backend->num;i++)
                {
                        POOL_CONNECTION *cp = backend->slots[i]->con;
@@ -2942,28 +2991,64 @@ static int reset_backend(POOL_CONNECTION_POOL *backend, int qcnt)
 }
 
 /*
- * return non 0 if load balance is possible
+ * return non 0 if SQL is SELECT statement.
  */
-static int load_balance_enabled(POOL_CONNECTION_POOL *backend, char *sql)
+static int is_select_query(char *sql)
 {
-       if (pool_config.load_balance_mode &&
-               DUAL_MODE &&
-               MAJOR(backend) == PROTO_MAJOR_V3 &&
-               TSTATE(backend) == 'I')
+       if (pool_config.ignore_leading_white_space)
        {
-               if (pool_config.ignore_leading_white_space)
-               {
-                       /* ignore leading white spaces */
-                       while (*sql && isspace(*sql))
-                               sql++;
-               }
+               /* ignore leading white spaces */
+               while (*sql && isspace(*sql))
+                       sql++;
+       }
 
-               if (!strncasecmp(sql, "SELECT", 6))
-                       return 1;
+       return (!strncasecmp(sql, "SELECT", 6));
+
+}
+
+/*
+ * return non 0 if SQL is SELECT statement.
+ */
+static int is_sequence_query(char *sql)
+{
+       if (pool_config.ignore_leading_white_space)
+       {
+               /* ignore leading white spaces */
+               while (*sql && isspace(*sql))
+                       sql++;
        }
+
+       if (strncasecmp(sql, "SELECT", 6))
+               return 0;
+
+       sql += 6;
+       while (*sql && isspace(*sql))
+               sql++;
+
+       /* SELECT NEXTVAL('xxx') */
+       if (*sql && !strncasecmp(sql, "NEXTVAL", 7))
+               return 1;
+
+       /* SELECT SETVAL('xxx') */
+       if (*sql && !strncasecmp(sql, "SETVAL", 6))
+               return 1;
+
        return 0;
 }
 
+/*
+ * return non 0 if load balance is possible
+ */
+static int load_balance_enabled(POOL_CONNECTION_POOL *backend, char *sql)
+{
+       return (pool_config.load_balance_mode &&
+                       DUAL_MODE &&
+                       MAJOR(backend) == PROTO_MAJOR_V3 &&
+                       TSTATE(backend) == 'I' &&
+                       is_select_query(sql) &&
+                       !is_sequence_query(sql));
+}
+
 /*
  * start load balance mode
  */