Fix insert_lock with extended query protocol.
authorYoshiyuki Asaba <y-asaba at pgfoundry.org>
Tue, 6 Nov 2007 09:08:53 +0000 (09:08 +0000)
committerYoshiyuki Asaba <y-asaba at pgfoundry.org>
Tue, 6 Nov 2007 09:08:53 +0000 (09:08 +0000)
pool_process_query.c

index f645507dcdf793391924c8482c378deacbd6734f..27354947025fe34ad04743a9ff44f335e52d110a 100644 (file)
@@ -168,6 +168,7 @@ static PreparedStatement *unnamed_statement = NULL;
 static PreparedStatement *unnamed_portal = NULL;
 static int force_replication = 0; /* non 0 if force to replicate query */
 static int prepare_in_session = 0;
+static int receive_sync = 0;
 
 static int is_drop_database(char *query);              /* returns non 0 if this is a DROP DATABASE command */
 static void query_ps_status(char *query, POOL_CONNECTION_POOL *backend);               /* show ps status */
@@ -1008,6 +1009,7 @@ static POOL_STATUS Parse(POOL_CONNECTION *frontend,
        char *name, *stmt;
        int deadlock_detected = 0;
        int checked = 0;
+       POOL_STATUS status;
 
        /* read Parse packet */
        if (pool_read(frontend, &len, sizeof(len)) < 0)
@@ -1055,6 +1057,35 @@ static POOL_STATUS Parse(POOL_CONNECTION *frontend,
                return POOL_END;
        }
 
+       if (REPLICATION && need_insert_lock(backend, stmt))
+       {
+               char kind;
+
+               if (TSTATE(backend) != 'T')
+               {
+                       /* synchronize transaction state */
+                       for (i = 0; i < backend->num; i++)
+                       {
+                               POOL_CONNECTION *cp = backend->slots[i]->con;
+
+                               send_extended_protocol_message(cp, "S", 0, "");
+                       }
+
+                       kind = pool_read_kind(backend);
+                       if (kind != 'Z')
+                               return POOL_END;
+                       if (ReadyForQuery(frontend, backend, 0) != POOL_CONTINUE)
+                               return POOL_END;
+               }
+
+               /* start a transaction if needed and lock the table */
+               status = insert_lock(backend, stmt);
+               if (status != POOL_CONTINUE)
+               {
+                       return status;
+               }
+       }
+
        /* forward Parse message to backends */
        for (i = 0; i < backend->num; i++)
        {
@@ -1182,9 +1213,11 @@ static POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend,
 {
        StartupPacket *sp;
        char psbuf[1024];
+       int len;
+       signed char state;
 
        /* if a transaction is started for insert lock, we need to close it. */
-       if (internal_transaction_started)
+       if (internal_transaction_started && receive_sync == 0)
        {
                int i;
                int len;
@@ -1215,36 +1248,37 @@ static POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend,
                internal_transaction_started = 0;
        }
 
-       if (pool_flush(frontend))
-               return POOL_END;
+       receive_sync = 0;
+
+       if (MAJOR(backend) == PROTO_MAJOR_V3)
+       {
+               if ((len = pool_read_message_length(backend)) < 0)
+                       return POOL_END;
+
+               pool_debug("ReadyForQuery: message length: %d", len);
+
+               state = pool_read_kind(backend);
+               if (state < 0)
+                       return POOL_END;
+
+               /* set transaction state */
+               pool_debug("ReadyForQuery: transaction state: %c", state);
+               MASTER(backend)->tstate = state;
+               if (REPLICATION)
+                       SECONDARY(backend)->tstate = state;
+       }
 
        if (send_ready)
        {
+               if (pool_flush(frontend))
+                       return POOL_END;
+
                pool_write(frontend, "Z", 1);
 
                if (MAJOR(backend) == PROTO_MAJOR_V3)
                {
-                       int len;
-                       signed char state;
-
-                       if ((len = pool_read_message_length(backend)) < 0)
-                               return POOL_END;
-
-                       pool_debug("ReadyForQuery: message length: %d", len);
-
                        len = htonl(len);
                        pool_write(frontend, &len, sizeof(len));
-
-                       state = pool_read_kind(backend);
-                       if (state < 0)
-                               return POOL_END;
-
-                       /* set transaction state */
-                       pool_debug("ReadyForQuery: transaction state: %c", state);
-                       MASTER(backend)->tstate = state;
-                       if (REPLICATION)
-                               SECONDARY(backend)->tstate = state;
-
                        pool_write(frontend, &state, 1);
                }
 
@@ -2935,6 +2969,9 @@ POOL_STATUS SimpleForwardToBackend(char kind, POOL_CONNECTION *frontend, POOL_CO
        char *p;
        char *name = NULL;
 
+       if (kind == 'S') /* Sync message */
+               receive_sync = 1;
+
        if (pool_write(MASTER(backend), &kind, 1))
                return POOL_END;
        if (REPLICATION)
@@ -3594,7 +3631,7 @@ static int need_insert_lock(POOL_CONNECTION_POOL *backend, char *query)
                while (*query && isspace(*query))
                        query++;
        }
-       
+
        /*
         * either insert_lock directive specified and without "NO INSERT LOCK" comment
         * or "INSERT LOCK" comment exists?