Allow to use snapshot isolation mode with extended query protocol.
authorTatsuo Ishii <ishii@sraoss.co.jp>
Sat, 27 Jun 2020 08:13:55 +0000 (17:13 +0900)
committerTatsuo Ishii <ishii@sraoss.co.jp>
Sat, 27 Jun 2020 08:13:55 +0000 (17:13 +0900)
Also 030 regression test now tests extended query case by using
pgbench -M extended as well.

src/protocol/pool_proto_modules.c
src/test/regression/tests/030.snapshot_isolation/test.sh

index 2842fb69fe7e80e6cbf53a0951c58733e90f96c8..d2a231225935a4775c2580302c39b6ec93e95805 100644 (file)
@@ -100,6 +100,7 @@ static bool
                        process_pg_terminate_backend_func(POOL_QUERY_CONTEXT * query_context);
 static void pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION * frontend,
                                                                                         POOL_CONNECTION_POOL * backend);
+static void si_get_snapshot(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, Node *node);
 
 /*
  * This is the workhorse of processing the pg_terminate_backend function to
@@ -510,35 +511,7 @@ SimpleQuery(POOL_CONNECTION * frontend,
                 * because it might cause rw-conflict, which in turn causes a
                 * deadlock.
                 */
-               if (pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION &&
-                       TSTATE(backend, MASTER_NODE_ID) == 'T' &&
-                       si_snapshot_aquire_command(node) &&
-                       !si_snapshot_prepared() &&
-                       frontend && frontend->no_forward == 0)
-               {
-                       int     i;
-
-                       si_aquire_snapshot();
-
-                       for (i = 0; i < NUM_BACKENDS; i++)
-                       {
-                               static  char *si_query = "SELECT current_setting('transaction_read_only')";
-                               POOL_SELECT_RESULT *res;
-
-                               do_query(CONNECTION(backend, i), si_query, &res, MAJOR(backend));
-                               if (res)
-                               {
-                                       if (res->data[0] && !strcmp(res->data[0], "on"))
-                                               session_context->transaction_read_only = true;
-                                       else
-                                               session_context->transaction_read_only = false;
-                                       free_select_result(res);
-                               }
-                               per_node_statement_log(backend, i, si_query);
-                       }
-
-                       si_snapshot_aquired();
-               }
+               si_get_snapshot(frontend, backend, node);
 
                /*
                 * pg_terminate function needs special handling, process it if the
@@ -1026,6 +999,15 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
                }
                else
                {
+                       /*
+                        * If commit command and Snapshot Isolation mode, wait for until
+                        * snapshot prepared.
+                        */
+                       if (commit && pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION)
+                       {
+                               si_commit_request();
+                       }
+
                        pool_extended_send_and_wait(query_context, "E", len, contents, -1, MASTER_NODE_ID, false);
                }
 
@@ -1036,6 +1018,17 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
                if (commit)
                {
                        pool_extended_send_and_wait(query_context, "E", len, contents, 1, MASTER_NODE_ID, false);
+
+                       /*
+                        * If we are in the snapshot isolation mode, we need to declare
+                        * that commit has been done. This would wake up other children
+                        * waiting for acquiring snapshot.
+                        */
+                       if (pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION)
+                       {
+                               si_commit_done();
+                               session_context->transaction_read_only = false;
+                       }
                }
        }
        else                                            /* streaming replication mode */
@@ -1254,6 +1247,11 @@ Parse(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
                        }
                }
 
+               /*
+                * Get snapshot if needed.
+                */
+               si_get_snapshot(frontend, backend, node);
+
                /*
                 * Decide where to send query
                 */
@@ -4311,3 +4309,55 @@ pool_read_int(POOL_CONNECTION_POOL * cp)
        }
        return data;
 }
+
+/*
+ * Aquire snapshot in snapshot isolation mode.
+ */
+static void
+si_get_snapshot(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, Node * node)
+{
+       POOL_SESSION_CONTEXT *session_context;
+
+       session_context = pool_get_session_context(true);
+       if (!session_context)
+               return;
+
+       /*
+        * From now on it is possible that query is actually sent to backend.
+        * So we need to acquire snapshot while there's no committing backend
+        * in snapshot isolation mode except while processing reset queries.
+        * For this purpose, we send a query to know whether the transaction
+        * is READ ONLY or not.  Sending actual user's query is not possible
+        * because it might cause rw-conflict, which in turn causes a
+        * deadlock.
+        */
+       if (pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION &&
+               TSTATE(backend, MASTER_NODE_ID) == 'T' &&
+               si_snapshot_aquire_command(node) &&
+               !si_snapshot_prepared() &&
+               frontend && frontend->no_forward == 0)
+       {
+               int     i;
+
+               si_aquire_snapshot();
+
+               for (i = 0; i < NUM_BACKENDS; i++)
+               {
+                       static  char *si_query = "SELECT current_setting('transaction_read_only')";
+                       POOL_SELECT_RESULT *res;
+
+                       do_query(CONNECTION(backend, i), si_query, &res, MAJOR(backend));
+                       if (res)
+                       {
+                               if (res->data[0] && !strcmp(res->data[0], "on"))
+                                       session_context->transaction_read_only = true;
+                               else
+                                       session_context->transaction_read_only = false;
+                               free_select_result(res);
+                       }
+                       per_node_statement_log(backend, i, si_query);
+               }
+
+               si_snapshot_aquired();
+       }
+}
index 93f2f96eda2f5ffa4c4443d2e30837d00cfff59a..ea24f40bde2ce9a6508d80df0c857dd7259c2ef9 100755 (executable)
@@ -40,7 +40,7 @@ DROP TABLE log;
 CREATE TABLE log(i int);
 EOF
 
-# Do updating.
+# Do updating in simple query mode.
 $PGBENCH -n -c 1 -T 30 -f ../inconsistency1.sql&
 
 # Do SELECT INTO while updating. This will create different rows among
@@ -55,11 +55,42 @@ psql -p 11003 -c "\copy log to '11003.txt'"
 cmp 11002.txt 11003.txt >/dev/null
 
 if [ $? != 0 ];then
-    echo "Transaction results are not consistent."
+    echo "Transaction results are inconsistent (simple query)."
     ./shutdownall
     exit 1
 fi
-echo "Transaction results are consistent."
+echo "Transaction results are consistent (simple query)."
+
+$PSQL <<EOF
+DROP TABLE t1;
+CREATE TABLE t1(i int);
+INSERT INTO t1 VALUES(0);
+DROP TABLE log;
+CREATE TABLE log(i int);
+EOF
+
+rm -f 11002.txt 11003.txt
+
+# Do updating in extended query mode.
+$PGBENCH -M extended -n -c 1 -T 30 -f ../inconsistency1.sql&
+
+# Do SELECT INTO while updating. This will create different rows among
+# node 0 log table and node 1 log table if we cannot keep global
+# snapshot isolation visibly.
+$PGBENCH -M extended -n -c 1 -T 30 -f ../inconsistency2.sql&
+wait
+
+# Ok let's see if rows in the log tables are identical.
+psql -p 11002 -c "\copy log to '11002.txt'"
+psql -p 11003 -c "\copy log to '11003.txt'"
+cmp 11002.txt 11003.txt >/dev/null
+
+if [ $? != 0 ];then
+    echo "Transaction results inconsistent (extended query)."
+    ./shutdownall
+    exit 1
+fi
+echo "Transaction results are consistent (extended query)."
 
 ./shutdownall