From ccc5a4a189b716d3fbc4920fa7551991c8d46e16 Mon Sep 17 00:00:00 2001 From: Yoshiyuki Asaba Date: Tue, 10 Oct 2006 04:31:18 +0000 Subject: [PATCH] Load balancing an extended query protocol. --- pool_process_query.c | 411 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 380 insertions(+), 31 deletions(-) diff --git a/pool_process_query.c b/pool_process_query.c index 192ab03..c620b82 100644 --- a/pool_process_query.c +++ b/pool_process_query.c @@ -41,13 +41,19 @@ #define INIT_STATEMENT_LIST_SIZE 8 +typedef struct { + char *statement_name; + char *portal_name; + char *prepared_string; +} PreparedStatement; + /* * prepared statement list */ typedef struct { int size; int cnt; - char **stmt_list; + PreparedStatement **stmt_list; } PreparedStatementList; static POOL_STATUS NotificationResponse(POOL_CONNECTION *frontend, @@ -116,11 +122,16 @@ static POOL_STATUS do_command(POOL_CONNECTION *backend, char *query, int protoMa static int need_insert_lock(POOL_CONNECTION_POOL *backend, char *query); static POOL_STATUS insert_lock(POOL_CONNECTION_POOL *backend, char *query); static char *get_insert_command_table_name(char *query); +static char *get_execute_command_portal_name(char *query); +static PreparedStatement *get_prepared_command_portal_and_statement(char *query); static char *skip_comment(char *query); -static void add_prepared_list(PreparedStatementList *p, char *name); -static void del_prepared_list(PreparedStatementList *p, char *name); +static void add_prepared_list(PreparedStatementList *p, PreparedStatement *stmt); +static void add_unnamed_portal(PreparedStatementList *p, PreparedStatement *stmt); +static void del_prepared_list(PreparedStatementList *p, PreparedStatement *stmt); static void reset_prepared_list(PreparedStatementList *p); +static PreparedStatement *lookup_prepared_statement_by_statement(PreparedStatementList *p, const char *name); +static PreparedStatement *lookup_prepared_statement_by_portal(PreparedStatementList *p, const char *name); static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList *p, int n); static POOL_CONNECTION_POOL_SLOT *slots[MAX_CONNECTION_SLOTS]; @@ -131,10 +142,12 @@ static int replication_was_enabled; /* replication mode was enabled */ static int master_slave_was_enabled; /* master/slave mode was enabled */ static int internal_transaction_started; /* to issue table lock command a transaction has been started internally */ -static void (*pending_function)(PreparedStatementList *p, char *name) = NULL; -static char *pending_prepared_name = NULL; +static void (*pending_function)(PreparedStatementList *p, PreparedStatement *statement) = NULL; +static PreparedStatement *pending_prepared_stmt = NULL; static PreparedStatementList prepared_list; /* prepared statement name list */ +static PreparedStatement *unnamed_statement = NULL; +static PreparedStatement *unnamed_portal = NULL; static int is_drop_database(char *query); /* returns non 0 if this is a DROP DATABASE command */ POOL_STATUS pool_process_query(POOL_CONNECTION *frontend, @@ -544,7 +557,7 @@ POOL_STATUS pool_process_query(POOL_CONNECTION *frontend, static POOL_STATUS Query(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *query) { - char *string; + char *string, *string1; int len; static char *sq = "show pool_status"; POOL_STATUS status; @@ -613,8 +626,21 @@ static POOL_STATUS Query(POOL_CONNECTION *frontend, } if (frontend && - (strncasecmp("prepare", string, 7) == 0 || - strncasecmp("deallocate", string, 10) == 0)) + (strncasecmp("prepare", string, 7) == 0)) + { + PreparedStatement *stmt; + + stmt = get_prepared_command_portal_and_statement(string); + /* could not get info. probably wrong SQL command */ + if (stmt == NULL) + { + return POOL_CONTINUE; + } + pending_function = add_prepared_list; + pending_prepared_stmt = stmt; + } + else if (frontend && + strncasecmp("deallocate", string, 10) == 0) { char *query = string; char *buf, *name; @@ -630,21 +656,53 @@ static POOL_STATUS Query(POOL_CONNECTION *frontend, buf = strdup(query); name = strtok(buf, "\t\r\n (;"); - if (name && (*string == 'p' || *string == 'P')) + + pending_function = del_prepared_list; + pending_prepared_stmt = malloc(sizeof(PreparedStatement)); + if (pending_prepared_stmt == NULL) { - pending_function = add_prepared_list; - pending_prepared_name = strdup(name); + pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno)); + return POOL_END; } - else if (name && (*string == 'd' || *string == 'D')) + + pending_prepared_stmt->statement_name = strdup(name); + pending_prepared_stmt->portal_name = NULL; + if (pending_prepared_stmt->portal_name == NULL || + pending_prepared_stmt->statement_name == NULL) { - pending_function = del_prepared_list; - pending_prepared_name = strdup(name); + pool_error("SimpleForwardToBackend: strdup failed: %s", strerror(errno)); + return POOL_END; } free(buf); } + if (frontend && + (strncasecmp("execute", string, 7) == 0)) + { + char *portal_name = get_execute_command_portal_name(string); + PreparedStatement *stmt; + + /* could not get portal name. probably wrong SQL command */ + if (portal_name == NULL) + { + return POOL_CONTINUE; + } + + stmt = lookup_prepared_statement_by_statement(&prepared_list, + portal_name); + + if (!stmt) + string1 = string; + else + string1 = stmt->prepared_string; + } + else + { + string1 = string; + } + /* load balance trick */ - if (load_balance_enabled(backend, string)) + if (load_balance_enabled(backend, string1)) start_load_balance(backend); else if (MASTER_SLAVE) { @@ -749,6 +807,7 @@ static POOL_STATUS Execute(POOL_CONNECTION *frontend, int i; char kind; int status; + PreparedStatement *stmt; /* read Execute packet */ if (pool_read(frontend, &len, sizeof(len)) < 0) @@ -759,6 +818,18 @@ static POOL_STATUS Execute(POOL_CONNECTION *frontend, pool_debug("Execute: portal name <%s>", string); + stmt = lookup_prepared_statement_by_portal(&prepared_list, string); + + /* load balance trick */ + if (stmt && load_balance_enabled(backend, stmt->prepared_string)) + start_load_balance(backend); + else if (MASTER_SLAVE) + { + master_slave_was_enabled = 1; + MASTER_SLAVE = 0; + master_slave_dml = 1; + } + for (i = 0;i < backend->num;i++) { POOL_CONNECTION *cp = backend->slots[i]->con; @@ -809,6 +880,17 @@ static POOL_STATUS Execute(POOL_CONNECTION *frontend, return status; pool_flush(frontend); + /* end load balance mode */ + if (in_load_balance) + end_load_balance(backend); + + if (master_slave_dml) + { + MASTER_SLAVE = 1; + master_slave_was_enabled = 0; + master_slave_dml = 0; + } + return POOL_CONTINUE; } @@ -2369,14 +2451,13 @@ POOL_STATUS SimpleForwardToFrontend(char kind, POOL_CONNECTION *frontend, POOL_C * pending prepared statement. */ if ((kind == 'C' || kind == '1' || kind == '3') && - pending_function && pending_prepared_name) + pending_function && pending_prepared_stmt) { - pending_function(&prepared_list, pending_prepared_name); + pending_function(&prepared_list, pending_prepared_stmt); } - free(pending_prepared_name); pending_function = NULL; - pending_prepared_name = NULL; + pending_prepared_stmt = NULL; status = pool_read(MASTER(backend), &len, sizeof(len)); if (status < 0) @@ -2574,8 +2655,10 @@ POOL_STATUS SimpleForwardToBackend(char kind, POOL_CONNECTION *frontend, POOL_CO if (pool_write_and_flush(SECONDARY(backend), p, len)) return POOL_END; - if (kind == 'P' && *p) + if (kind == 'P') /* Parse message? */ { + char *stmt; + name_len = strlen(p) + 3; name = malloc(name_len); if (name == NULL) @@ -2584,8 +2667,68 @@ POOL_STATUS SimpleForwardToBackend(char kind, POOL_CONNECTION *frontend, POOL_CO return POOL_END; } sprintf(name, "\"%s\"", p); - pending_function = add_prepared_list; - pending_prepared_name = name; + + pending_prepared_stmt = malloc(sizeof(PreparedStatement)); + if (pending_prepared_stmt == NULL) + { + pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno)); + return POOL_END; + } + pending_prepared_stmt->portal_name = NULL; + + if (*p) + { + pending_function = add_prepared_list; + pending_prepared_stmt->statement_name = name; + } + else + { + pending_function = add_unnamed_portal; + pending_prepared_stmt->statement_name = NULL; + free(name); + } + + /* copy prepared statement string */ + stmt = p; + stmt += strlen(p) + 1; + pending_prepared_stmt->prepared_string = strdup(stmt); + if (pending_prepared_stmt->prepared_string == NULL) + { + pool_error("SimpleForwardToBackend: strdup failed: %s", strerror(errno)); + return POOL_END; + } + } + else if (kind == 'B') /* Bind message? */ + { + char *stmt_name, *portal_name; + PreparedStatement *stmt; + + portal_name = p; + stmt_name = p + strlen(portal_name) + 1; + + pool_debug("bind message: portal_name %s stmt_name %s", portal_name, stmt_name); + + if (*stmt_name == '\0') + stmt = unnamed_statement; + else + { + char *name = malloc(strlen(stmt_name) + 3); + sprintf(name, "\"%s\"", stmt_name); + stmt = lookup_prepared_statement_by_statement(&prepared_list, name); + free(name); + } + + if (stmt == NULL) + free(name); + else if (*portal_name == '\0') + unnamed_portal = stmt; + else + { + if (stmt->portal_name) + free(stmt->portal_name); + stmt->portal_name = strdup(portal_name); + } + } else if (kind == 'C' && *p == 'S' && *(p + 1)) { @@ -2598,7 +2741,20 @@ POOL_STATUS SimpleForwardToBackend(char kind, POOL_CONNECTION *frontend, POOL_CO } sprintf(name, "\"%s\"", p + 1); pending_function = del_prepared_list; - pending_prepared_name = name; + pending_prepared_stmt = malloc(sizeof(PreparedStatement)); + if (pending_prepared_stmt == NULL) + { + pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno)); + return POOL_END; + } + + pending_prepared_stmt->statement_name = strdup(name); + if (pending_prepared_stmt->statement_name == NULL) + { + pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno)); + return POOL_END; + } + pending_prepared_stmt->prepared_string = NULL; } if (kind == 'P' || kind == 'B' || kind == 'D' || kind == 'C') @@ -3144,6 +3300,141 @@ static char *get_insert_command_table_name(char *query) return table; } +/* + * obtain portal name in EXECUTE statement + */ +static char *get_execute_command_portal_name(char *query) +{ + static char portal[1024]; + char *qbuf; + char *token; + + portal[0] = '\0'; + + /* skip comment */ + query = skip_comment(query); + + if (*query == '\0') + return portal; + + /* skip spaces */ + while (*query && isspace(*query)) + query++; + + /* skip non spaces(EXECUTE) */ + while (*query && !isspace(*query)) + query++; + + /* skip spaces */ + while (*query && isspace(*query)) + query++; + + /* get portal name */ + qbuf = strdup(query); + token = strtok(qbuf, "\r\n\t ("); + + if (token == NULL) + { + pool_error("get_execute_command_portal_name: could not get portal name"); + return NULL; + } + + strncpy(portal, token, sizeof(portal)); + free(qbuf); + + pool_debug("get_execute_command_portal_name: extracted portal name: %s", portal); + + return portal; +} + +/* + * obtain portal name and statement in PREPARED statement + */ +static PreparedStatement *get_prepared_command_portal_and_statement(char *query) +{ + PreparedStatement *stmt; + static char portal[1024]; + char *string = NULL; + char *qbuf; + char *token; + int len; + + portal[0] = '\0'; + + /* skip comment */ + query = skip_comment(query); + + if (*query == '\0') + return NULL; + + /* skip spaces */ + while (*query && isspace(*query)) + query++; + + /* skip non spaces(PREPARED) */ + while (*query && !isspace(*query)) + query++; + + /* skip spaces */ + while (*query && isspace(*query)) + query++; + + /* get portal name */ + qbuf = strdup(query); + token = strtok(qbuf, "\r\n\t ("); + + if (token == NULL) + { + pool_error("get_prepared_command_portal_and_statement: could not get portal name"); + return NULL; + } + + strncpy(portal, token, sizeof(portal)); + free(qbuf); + + /* skip data type list */ + while (*query && *query != ')') + query++; + query++; + + /* skip spaces */ + while (*query && isspace(*query)) + query++; + + /* skip non spaces(AS) */ + while (*query && !isspace(*query)) + query++; + + /* skip spaces */ + while (*query && isspace(*query)) + query++; + + if (!*query) + { + pool_error("get_prepared_command_portal_and_statement: could not get statement"); + return NULL; + } + + len = strlen(query) + 1; + string = malloc(len); + if (string == NULL) + { + pool_error("get_prepared_command_portal_and_statement: malloc failed: %s", strerror(errno)); + return NULL; + } + memcpy(string, query, len); + + stmt = malloc(sizeof(PreparedStatement)); + stmt->statement_name = strdup(portal); + stmt->portal_name = NULL; + stmt->prepared_string = string; + + pool_debug("get_prepared_command_portal_and_statement: extracted portal name: %s portal statement: %s", stmt->portal_name, stmt->prepared_string); + + return stmt; +} + + /* judge if this is a DROP DATABASE command */ static int is_drop_database(char *query) { @@ -3207,7 +3498,7 @@ void init_prepared_list(void) } } -static void add_prepared_list(PreparedStatementList *p, char *name) +static void add_prepared_list(PreparedStatementList *p, PreparedStatement *stmt) { if (p->cnt == p->size) { @@ -3220,27 +3511,45 @@ static void add_prepared_list(PreparedStatementList *p, char *name) } } - p->stmt_list[p->cnt++] = strdup(name); + p->stmt_list[p->cnt++] = stmt; } -static void del_prepared_list(PreparedStatementList *p, char *name) +static void add_unnamed_portal(PreparedStatementList *p, PreparedStatement *stmt) +{ + if (unnamed_statement && unnamed_statement->statement_name == NULL) + { + free(unnamed_statement->prepared_string); + free(unnamed_statement); + } + + unnamed_portal = NULL; + unnamed_statement = stmt; +} + +static void del_prepared_list(PreparedStatementList *p, PreparedStatement *stmt) { int i; for (i = 0; i < p->cnt; i++) { - if (strcmp(p->stmt_list[i], name) == 0) - break; + if (strcmp(p->stmt_list[i]->statement_name, stmt->statement_name) == 0) + break; } + + free(stmt->statement_name); + free(stmt); if (i == p->cnt) return; + free(p->stmt_list[i]->statement_name); + free(p->stmt_list[i]->portal_name); + free(p->stmt_list[i]->prepared_string); free(p->stmt_list[i]); if (i != p->cnt - 1) { memmove(&p->stmt_list[i], &p->stmt_list[i+1], - sizeof(char *) * (p->cnt - i - 1)); + sizeof(PreparedStatement *) * (p->cnt - i - 1)); } p->cnt--; } @@ -3250,10 +3559,50 @@ static void reset_prepared_list(PreparedStatementList *p) int i; for (i = 0; i < p->cnt; i++) + { + free(p->stmt_list[i]->statement_name); + free(p->stmt_list[i]->portal_name); + free(p->stmt_list[i]->prepared_string); free(p->stmt_list[i]); + } p->cnt = 0; } +static PreparedStatement *lookup_prepared_statement_by_statement(PreparedStatementList *p, const char *name) +{ + int i; + + /* unnamed portal? */ + if (name == NULL || name[0] == '\0' || (name[0] == '\"' && name[1] == '\"')) + return unnamed_statement; + + for (i = 0; i < p->cnt; i++) + { + if (strcmp(p->stmt_list[i]->statement_name, name) == 0) + return p->stmt_list[i]; + } + + return NULL; +} + +static PreparedStatement *lookup_prepared_statement_by_portal(PreparedStatementList *p, const char *name) +{ + int i; + + /* unnamed portal? */ + if (name == NULL || name[0] == '\0' || (name[0] == '\"' && name[1] == '\"')) + return unnamed_portal; + + for (i = 0; i < p->cnt; i++) + { + if (p->stmt_list[i]->portal_name && + strcmp(p->stmt_list[i]->portal_name, name) == 0) + return p->stmt_list[i]; + } + + return NULL; +} + static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList *p, int n) { @@ -3263,14 +3612,14 @@ static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList if (p->cnt <= n) return 1; - len = strlen(p->stmt_list[n]) + 12; /* "DEALLOCATE " + '\0' */ + len = strlen(p->stmt_list[n]->statement_name) + 12; /* "DEALLOCATE " + '\0' */ query = malloc(len); if (query == NULL) { pool_error("send_deallocate: malloc failed: %s", strerror(errno)); exit(1); } - sprintf(query, "DEALLOCATE %s", p->stmt_list[n]); + sprintf(query, "DEALLOCATE %s", p->stmt_list[n]->statement_name); if (Query(NULL, backend, query) != POOL_CONTINUE) { free(query); -- 2.39.5