static PreparedStatement *unnamed_portal = NULL;
static int force_replication = 0; /* non 0 if force to replicate query */
static int prepare_in_session = 0;
+static int receive_sync = 0;
static int is_drop_database(char *query); /* returns non 0 if this is a DROP DATABASE command */
static void query_ps_status(char *query, POOL_CONNECTION_POOL *backend); /* show ps status */
char *name, *stmt;
int deadlock_detected = 0;
int checked = 0;
+ POOL_STATUS status;
/* read Parse packet */
if (pool_read(frontend, &len, sizeof(len)) < 0)
return POOL_END;
}
+ if (REPLICATION && need_insert_lock(backend, stmt))
+ {
+ char kind;
+
+ if (TSTATE(backend) != 'T')
+ {
+ /* synchronize transaction state */
+ for (i = 0; i < backend->num; i++)
+ {
+ POOL_CONNECTION *cp = backend->slots[i]->con;
+
+ send_extended_protocol_message(cp, "S", 0, "");
+ }
+
+ kind = pool_read_kind(backend);
+ if (kind != 'Z')
+ return POOL_END;
+ if (ReadyForQuery(frontend, backend, 0) != POOL_CONTINUE)
+ return POOL_END;
+ }
+
+ /* start a transaction if needed and lock the table */
+ status = insert_lock(backend, stmt);
+ if (status != POOL_CONTINUE)
+ {
+ return status;
+ }
+ }
+
/* forward Parse message to backends */
for (i = 0; i < backend->num; i++)
{
{
StartupPacket *sp;
char psbuf[1024];
+ int len;
+ signed char state;
/* if a transaction is started for insert lock, we need to close it. */
- if (internal_transaction_started)
+ if (internal_transaction_started && receive_sync == 0)
{
int i;
int len;
internal_transaction_started = 0;
}
- if (pool_flush(frontend))
- return POOL_END;
+ receive_sync = 0;
+
+ if (MAJOR(backend) == PROTO_MAJOR_V3)
+ {
+ if ((len = pool_read_message_length(backend)) < 0)
+ return POOL_END;
+
+ pool_debug("ReadyForQuery: message length: %d", len);
+
+ state = pool_read_kind(backend);
+ if (state < 0)
+ return POOL_END;
+
+ /* set transaction state */
+ pool_debug("ReadyForQuery: transaction state: %c", state);
+ MASTER(backend)->tstate = state;
+ if (REPLICATION)
+ SECONDARY(backend)->tstate = state;
+ }
if (send_ready)
{
+ if (pool_flush(frontend))
+ return POOL_END;
+
pool_write(frontend, "Z", 1);
if (MAJOR(backend) == PROTO_MAJOR_V3)
{
- int len;
- signed char state;
-
- if ((len = pool_read_message_length(backend)) < 0)
- return POOL_END;
-
- pool_debug("ReadyForQuery: message length: %d", len);
-
len = htonl(len);
pool_write(frontend, &len, sizeof(len));
-
- state = pool_read_kind(backend);
- if (state < 0)
- return POOL_END;
-
- /* set transaction state */
- pool_debug("ReadyForQuery: transaction state: %c", state);
- MASTER(backend)->tstate = state;
- if (REPLICATION)
- SECONDARY(backend)->tstate = state;
-
pool_write(frontend, &state, 1);
}
char *p;
char *name = NULL;
+ if (kind == 'S') /* Sync message */
+ receive_sync = 1;
+
if (pool_write(MASTER(backend), &kind, 1))
return POOL_END;
if (REPLICATION)
while (*query && isspace(*query))
query++;
}
-
+
/*
* either insert_lock directive specified and without "NO INSERT LOCK" comment
* or "INSERT LOCK" comment exists?