Load balancing an extended query protocol.
authorYoshiyuki Asaba <y-asaba at pgfoundry.org>
Tue, 10 Oct 2006 04:31:18 +0000 (04:31 +0000)
committerYoshiyuki Asaba <y-asaba at pgfoundry.org>
Tue, 10 Oct 2006 04:31:18 +0000 (04:31 +0000)
pool_process_query.c

index 192ab03cbbab397f53e9c446ddd0c3551ded4c29..c620b8267397664bec2765b0fc3f29f190d83595 100644 (file)
 
 #define INIT_STATEMENT_LIST_SIZE 8
 
+typedef struct {
+       char *statement_name;
+       char *portal_name;
+       char *prepared_string;
+} PreparedStatement;
+
 /*
  * prepared statement list
  */
 typedef struct {
        int size;
        int cnt;
-       char **stmt_list;
+       PreparedStatement **stmt_list;
 } PreparedStatementList;
 
 static POOL_STATUS NotificationResponse(POOL_CONNECTION *frontend, 
@@ -116,11 +122,16 @@ static POOL_STATUS do_command(POOL_CONNECTION *backend, char *query, int protoMa
 static int need_insert_lock(POOL_CONNECTION_POOL *backend, char *query);
 static POOL_STATUS insert_lock(POOL_CONNECTION_POOL *backend, char *query);
 static char *get_insert_command_table_name(char *query);
+static char *get_execute_command_portal_name(char *query);
+static PreparedStatement *get_prepared_command_portal_and_statement(char *query);
 static char *skip_comment(char *query);
 
-static void add_prepared_list(PreparedStatementList *p, char *name);
-static void del_prepared_list(PreparedStatementList *p, char *name);
+static void add_prepared_list(PreparedStatementList *p, PreparedStatement *stmt);
+static void add_unnamed_portal(PreparedStatementList *p, PreparedStatement *stmt);
+static void del_prepared_list(PreparedStatementList *p, PreparedStatement *stmt);
 static void reset_prepared_list(PreparedStatementList *p);
+static PreparedStatement *lookup_prepared_statement_by_statement(PreparedStatementList *p, const char *name);
+static PreparedStatement *lookup_prepared_statement_by_portal(PreparedStatementList *p, const char *name);
 static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList *p, int n);
 
 static POOL_CONNECTION_POOL_SLOT *slots[MAX_CONNECTION_SLOTS];
@@ -131,10 +142,12 @@ static int replication_was_enabled;               /* replication mode was enabled */
 static int master_slave_was_enabled;   /* master/slave mode was enabled */
 static int internal_transaction_started;               /* to issue table lock command a transaction
                                                                                                   has been started internally */
-static void (*pending_function)(PreparedStatementList *p, char *name) = NULL;
-static char *pending_prepared_name = NULL;
+static void (*pending_function)(PreparedStatementList *p, PreparedStatement *statement) = NULL;
+static PreparedStatement *pending_prepared_stmt = NULL;
 
 static PreparedStatementList prepared_list; /* prepared statement name list */
+static PreparedStatement *unnamed_statement = NULL;
+static PreparedStatement *unnamed_portal = NULL;
 static int is_drop_database(char *query);              /* returns non 0 if this is a DROP DATABASE command */
 
 POOL_STATUS pool_process_query(POOL_CONNECTION *frontend, 
@@ -544,7 +557,7 @@ POOL_STATUS pool_process_query(POOL_CONNECTION *frontend,
 static POOL_STATUS Query(POOL_CONNECTION *frontend, 
                                                 POOL_CONNECTION_POOL *backend, char *query)
 {
-       char *string;
+       char *string, *string1;
        int len;
        static char *sq = "show pool_status";
        POOL_STATUS status;
@@ -613,8 +626,21 @@ static POOL_STATUS Query(POOL_CONNECTION *frontend,
        }
 
        if (frontend &&
-               (strncasecmp("prepare", string, 7) == 0 ||
-               strncasecmp("deallocate", string, 10) == 0))
+               (strncasecmp("prepare", string, 7) == 0))
+       {
+               PreparedStatement *stmt;
+
+               stmt = get_prepared_command_portal_and_statement(string);
+               /* could not get info. probably wrong SQL command */
+               if (stmt == NULL)
+               {
+                       return POOL_CONTINUE;
+               }
+               pending_function = add_prepared_list;
+               pending_prepared_stmt = stmt;
+       }
+       else if (frontend &&
+                        strncasecmp("deallocate", string, 10) == 0)
        {
                char *query = string;
                char *buf, *name;
@@ -630,21 +656,53 @@ static POOL_STATUS Query(POOL_CONNECTION *frontend,
 
                buf = strdup(query);
                name = strtok(buf, "\t\r\n (;");
-               if (name && (*string == 'p' || *string == 'P'))
+
+               pending_function = del_prepared_list;
+               pending_prepared_stmt = malloc(sizeof(PreparedStatement));
+               if (pending_prepared_stmt == NULL)
                {
-                       pending_function = add_prepared_list;
-                       pending_prepared_name = strdup(name);
+                       pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno));
+                       return POOL_END;
                }
-               else if (name && (*string == 'd' || *string == 'D'))
+
+               pending_prepared_stmt->statement_name = strdup(name);
+               pending_prepared_stmt->portal_name = NULL;
+               if (pending_prepared_stmt->portal_name == NULL ||
+                       pending_prepared_stmt->statement_name == NULL)
                {
-                       pending_function = del_prepared_list;
-                       pending_prepared_name = strdup(name);
+                       pool_error("SimpleForwardToBackend: strdup failed: %s", strerror(errno));
+                       return POOL_END;
                }
                free(buf);
        }
 
+       if (frontend &&
+               (strncasecmp("execute", string, 7) == 0))
+       {
+               char *portal_name = get_execute_command_portal_name(string);
+               PreparedStatement *stmt;
+
+               /* could not get portal name. probably wrong SQL command */
+               if (portal_name == NULL)
+               {
+                       return POOL_CONTINUE;
+               }
+
+               stmt = lookup_prepared_statement_by_statement(&prepared_list,
+                                                                                                         portal_name);
+               
+               if (!stmt)
+                       string1 = string;
+               else
+                       string1 = stmt->prepared_string;
+       }
+       else
+       {
+               string1 = string;
+       }
+
        /* load balance trick */
-       if (load_balance_enabled(backend, string))
+       if (load_balance_enabled(backend, string1))
                start_load_balance(backend);
        else if (MASTER_SLAVE)
        {
@@ -749,6 +807,7 @@ static POOL_STATUS Execute(POOL_CONNECTION *frontend,
        int i;
        char kind;
        int status;
+       PreparedStatement *stmt;
 
        /* read Execute packet */
        if (pool_read(frontend, &len, sizeof(len)) < 0)
@@ -759,6 +818,18 @@ static POOL_STATUS Execute(POOL_CONNECTION *frontend,
 
        pool_debug("Execute: portal name <%s>", string);
 
+       stmt = lookup_prepared_statement_by_portal(&prepared_list, string);
+
+       /* load balance trick */
+       if (stmt && load_balance_enabled(backend, stmt->prepared_string))
+               start_load_balance(backend);
+       else if (MASTER_SLAVE)
+       {
+               master_slave_was_enabled = 1;
+               MASTER_SLAVE = 0;
+               master_slave_dml = 1;
+       }
+
        for (i = 0;i < backend->num;i++)
        {
                POOL_CONNECTION *cp = backend->slots[i]->con;
@@ -809,6 +880,17 @@ static POOL_STATUS Execute(POOL_CONNECTION *frontend,
                return status;
        pool_flush(frontend);
 
+       /* end load balance mode */
+       if (in_load_balance)
+               end_load_balance(backend);
+
+       if (master_slave_dml)
+       {
+               MASTER_SLAVE = 1;
+               master_slave_was_enabled = 0;
+               master_slave_dml = 0;
+       }
+
        return POOL_CONTINUE;
 }
 
@@ -2369,14 +2451,13 @@ POOL_STATUS SimpleForwardToFrontend(char kind, POOL_CONNECTION *frontend, POOL_C
         * pending prepared statement.
         */
        if ((kind == 'C' || kind == '1' || kind == '3') &&
-               pending_function &&     pending_prepared_name)
+               pending_function &&     pending_prepared_stmt)
        {
-               pending_function(&prepared_list, pending_prepared_name);
+               pending_function(&prepared_list, pending_prepared_stmt);
        }
 
-       free(pending_prepared_name);
        pending_function = NULL;
-       pending_prepared_name = NULL;
+       pending_prepared_stmt = NULL;
 
        status = pool_read(MASTER(backend), &len, sizeof(len));
        if (status < 0)
@@ -2574,8 +2655,10 @@ POOL_STATUS SimpleForwardToBackend(char kind, POOL_CONNECTION *frontend, POOL_CO
                if (pool_write_and_flush(SECONDARY(backend), p, len))
                        return POOL_END;
 
-       if (kind == 'P' && *p)
+       if (kind == 'P') /* Parse message? */
        {
+               char *stmt;
+               
                name_len = strlen(p) + 3;
                name = malloc(name_len);
                if (name == NULL)
@@ -2584,8 +2667,68 @@ POOL_STATUS SimpleForwardToBackend(char kind, POOL_CONNECTION *frontend, POOL_CO
                        return POOL_END;
                }
                sprintf(name, "\"%s\"", p);
-               pending_function = add_prepared_list;
-               pending_prepared_name = name;
+
+               pending_prepared_stmt = malloc(sizeof(PreparedStatement));
+               if (pending_prepared_stmt == NULL)
+               {
+                       pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno));
+                       return POOL_END;
+               }
+               pending_prepared_stmt->portal_name = NULL;
+
+               if (*p)
+               {
+                       pending_function = add_prepared_list;
+                       pending_prepared_stmt->statement_name = name;
+               }
+               else
+               {
+                       pending_function = add_unnamed_portal;
+                       pending_prepared_stmt->statement_name = NULL;
+                       free(name);
+               }
+
+               /* copy prepared statement string */
+               stmt = p;
+               stmt += strlen(p) + 1;
+               pending_prepared_stmt->prepared_string = strdup(stmt);
+               if (pending_prepared_stmt->prepared_string == NULL)
+               {
+                       pool_error("SimpleForwardToBackend: strdup failed: %s", strerror(errno));
+                       return POOL_END;
+               }
+       }
+       else if (kind == 'B') /* Bind message? */
+       {
+               char *stmt_name, *portal_name;
+               PreparedStatement *stmt;
+
+               portal_name = p;
+               stmt_name = p + strlen(portal_name) + 1;
+
+               pool_debug("bind message: portal_name %s stmt_name %s", portal_name, stmt_name);
+
+               if (*stmt_name == '\0')
+                       stmt = unnamed_statement;
+               else
+               {
+                       char *name = malloc(strlen(stmt_name) + 3);
+                       sprintf(name, "\"%s\"", stmt_name);
+                       stmt = lookup_prepared_statement_by_statement(&prepared_list, name);
+                       free(name);
+               }
+
+               if (stmt == NULL)
+                       free(name);
+               else if (*portal_name == '\0')
+                       unnamed_portal = stmt;
+               else
+               {
+                       if (stmt->portal_name)
+                               free(stmt->portal_name);
+                       stmt->portal_name = strdup(portal_name);
+               }
+
        }
        else if (kind == 'C' && *p == 'S' && *(p + 1))
        {
@@ -2598,7 +2741,20 @@ POOL_STATUS SimpleForwardToBackend(char kind, POOL_CONNECTION *frontend, POOL_CO
                }
                sprintf(name, "\"%s\"", p + 1);
                pending_function = del_prepared_list;
-               pending_prepared_name = name;
+               pending_prepared_stmt = malloc(sizeof(PreparedStatement));
+               if (pending_prepared_stmt == NULL)
+               {
+                       pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno));
+                       return POOL_END;
+               }
+
+               pending_prepared_stmt->statement_name = strdup(name);
+               if (pending_prepared_stmt->statement_name == NULL)
+               {
+                       pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno));
+                       return POOL_END;
+               }
+               pending_prepared_stmt->prepared_string = NULL;
        }
 
        if (kind == 'P' || kind == 'B' || kind == 'D' || kind == 'C')
@@ -3144,6 +3300,141 @@ static char *get_insert_command_table_name(char *query)
        return table;
 }
 
+/*
+ * obtain portal name in EXECUTE statement
+ */
+static char *get_execute_command_portal_name(char *query)
+{
+       static char portal[1024];
+       char *qbuf;
+       char *token;
+
+       portal[0] = '\0';
+
+       /* skip comment */
+    query = skip_comment(query);
+
+       if (*query == '\0')
+               return portal;
+
+       /* skip spaces */
+       while (*query && isspace(*query))
+               query++;
+
+       /* skip non spaces(EXECUTE) */
+       while (*query && !isspace(*query))
+               query++;
+
+       /* skip spaces */
+       while (*query && isspace(*query))
+               query++;
+
+       /* get portal name */
+       qbuf = strdup(query);
+       token = strtok(qbuf, "\r\n\t (");
+
+       if (token == NULL)
+       {
+               pool_error("get_execute_command_portal_name: could not get portal name");
+               return NULL;
+       }
+
+       strncpy(portal, token, sizeof(portal));
+       free(qbuf);
+
+       pool_debug("get_execute_command_portal_name: extracted portal name: %s", portal);
+
+       return portal;
+}
+
+/*
+ * obtain portal name and statement in PREPARED statement
+ */
+static PreparedStatement *get_prepared_command_portal_and_statement(char *query)
+{
+       PreparedStatement *stmt;
+       static char portal[1024];
+       char *string = NULL;
+       char *qbuf;
+       char *token;
+       int len;
+
+       portal[0] = '\0';
+
+       /* skip comment */
+    query = skip_comment(query);
+
+       if (*query == '\0')
+               return NULL;
+
+       /* skip spaces */
+       while (*query && isspace(*query))
+               query++;
+
+       /* skip non spaces(PREPARED) */
+       while (*query && !isspace(*query))
+               query++;
+
+       /* skip spaces */
+       while (*query && isspace(*query))
+               query++;
+
+       /* get portal name */
+       qbuf = strdup(query);
+       token = strtok(qbuf, "\r\n\t (");
+
+       if (token == NULL)
+       {
+               pool_error("get_prepared_command_portal_and_statement: could not get portal name");
+               return NULL;
+       }
+
+       strncpy(portal, token, sizeof(portal));
+       free(qbuf);
+
+       /* skip data type list */
+       while (*query && *query != ')')
+               query++;
+       query++;
+
+       /* skip spaces */
+       while (*query && isspace(*query))
+               query++;
+
+       /* skip non spaces(AS) */
+       while (*query && !isspace(*query))
+               query++;
+
+       /* skip spaces */
+       while (*query && isspace(*query))
+               query++;
+
+       if (!*query)
+       {
+               pool_error("get_prepared_command_portal_and_statement: could not get statement");
+               return NULL;
+       }
+
+       len = strlen(query) + 1;
+       string = malloc(len);
+       if (string == NULL)
+       {
+               pool_error("get_prepared_command_portal_and_statement: malloc failed: %s", strerror(errno));
+               return NULL;
+       }
+       memcpy(string, query, len);
+
+       stmt = malloc(sizeof(PreparedStatement));
+       stmt->statement_name = strdup(portal);
+       stmt->portal_name = NULL;
+       stmt->prepared_string = string;
+
+       pool_debug("get_prepared_command_portal_and_statement: extracted portal name: %s  portal statement: %s", stmt->portal_name, stmt->prepared_string);
+
+       return stmt;
+}
+
+
 /* judge if this is a DROP DATABASE command */
 static int is_drop_database(char *query)
 {
@@ -3207,7 +3498,7 @@ void init_prepared_list(void)
        }
 }
 
-static void add_prepared_list(PreparedStatementList *p, char *name)
+static void add_prepared_list(PreparedStatementList *p, PreparedStatement *stmt)
 {
        if (p->cnt == p->size)
        {
@@ -3220,27 +3511,45 @@ static void add_prepared_list(PreparedStatementList *p, char *name)
                }
        }
 
-       p->stmt_list[p->cnt++] = strdup(name);
+       p->stmt_list[p->cnt++] = stmt;
 }
 
-static void del_prepared_list(PreparedStatementList *p, char *name)
+static void add_unnamed_portal(PreparedStatementList *p, PreparedStatement *stmt)
+{
+       if (unnamed_statement && unnamed_statement->statement_name == NULL)
+       {
+               free(unnamed_statement->prepared_string);
+               free(unnamed_statement);
+       }
+
+       unnamed_portal = NULL;
+       unnamed_statement = stmt;
+}
+
+static void del_prepared_list(PreparedStatementList *p, PreparedStatement *stmt)
 {
        int i;
 
        for (i = 0; i < p->cnt; i++)
        {
-               if (strcmp(p->stmt_list[i]name) == 0)
-                       break;
+               if (strcmp(p->stmt_list[i]->statement_name, stmt->statement_name) == 0)
+               break;
        }
+
+       free(stmt->statement_name);
+       free(stmt);
        
        if (i == p->cnt)
                return;
 
+       free(p->stmt_list[i]->statement_name);
+       free(p->stmt_list[i]->portal_name);
+       free(p->stmt_list[i]->prepared_string);
        free(p->stmt_list[i]);
        if (i != p->cnt - 1)
        {
                memmove(&p->stmt_list[i], &p->stmt_list[i+1],
-                               sizeof(char *) * (p->cnt - i - 1));
+                               sizeof(PreparedStatement *) * (p->cnt - i - 1));
        }
        p->cnt--;
 }
@@ -3250,10 +3559,50 @@ static void reset_prepared_list(PreparedStatementList *p)
        int i;
 
        for (i = 0; i < p->cnt; i++)
+       {
+               free(p->stmt_list[i]->statement_name);
+               free(p->stmt_list[i]->portal_name);
+               free(p->stmt_list[i]->prepared_string);
                free(p->stmt_list[i]);
+       }
        p->cnt = 0;
 }
 
+static PreparedStatement *lookup_prepared_statement_by_statement(PreparedStatementList *p, const char *name)
+{
+       int i;
+
+       /* unnamed portal? */
+       if (name == NULL || name[0] == '\0' || (name[0] == '\"' && name[1] == '\"'))
+               return unnamed_statement;
+
+       for (i = 0; i < p->cnt; i++)
+       {
+               if (strcmp(p->stmt_list[i]->statement_name, name) == 0)
+                       return p->stmt_list[i];
+       }
+
+       return NULL;
+}
+
+static PreparedStatement *lookup_prepared_statement_by_portal(PreparedStatementList *p, const char *name)
+{
+       int i;
+
+       /* unnamed portal? */
+       if (name == NULL || name[0] == '\0' || (name[0] == '\"' && name[1] == '\"'))
+               return unnamed_portal;
+
+       for (i = 0; i < p->cnt; i++)
+       {
+               if (p->stmt_list[i]->portal_name &&
+                       strcmp(p->stmt_list[i]->portal_name, name) == 0)
+                       return p->stmt_list[i];
+       }
+
+       return NULL;
+}
+
 static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList *p,
                                        int n)
 {
@@ -3263,14 +3612,14 @@ static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList
        if (p->cnt <= n)
                return 1;
        
-       len = strlen(p->stmt_list[n]) + 12; /* "DEALLOCATE " + '\0' */
+       len = strlen(p->stmt_list[n]->statement_name) + 12; /* "DEALLOCATE " + '\0' */
        query = malloc(len);
        if (query == NULL)
        {
                pool_error("send_deallocate: malloc failed: %s", strerror(errno));
                exit(1);
        }
-       sprintf(query, "DEALLOCATE %s", p->stmt_list[n]);
+       sprintf(query, "DEALLOCATE %s", p->stmt_list[n]->statement_name);
        if (Query(NULL, backend, query) != POOL_CONTINUE)
        {
                free(query);