Reduce the number of roundtrips in the implementation of statement rollack.
authorHiroshi Inoue <h-inoue@dream.email.ne.jp>
Fri, 9 Jun 2017 02:05:43 +0000 (11:05 +0900)
committerHiroshi Inoue <h-inoue@dream.email.ne.jp>
Mon, 12 Jun 2017 02:54:56 +0000 (11:54 +0900)
Change the current sequence of operations
SAVEPOINT -> execute -> RELEASE
 to
(RELEASE;)SAVEPOINT -> execute
.

Note that there are 2 risks to RELEASE an internal savepoint. One is to RELEASE the savepoint invalitated due to manually issued ROLLBACK or RELEASE. Another is to invalitate manual SAVEPOINTs unexpectedly by RELEASing the internal savepoint.
Currently *RELEASE* related stuff is #ifdef'd.

connection.c
connection.h
execute.c
statement.c
statement.h

index f4c2380659ecaac88c1e7f12c3d328aa0acee7e4..c72f94dce07b49a855d278ae65f9da2cc5210c3f 100644 (file)
@@ -448,12 +448,12 @@ CC_examine_global_transaction(ConnectionClass *self)
 }
 
 
-static const char *bgncmd = "BEGIN";
-static const char *cmtcmd = "COMMIT";
-static const char *rbkcmd = "ROLLBACK";
-static const char *svpcmd = "SAVEPOINT";
-static const char *per_query_svp = "_per_query_svp_";
-static const char *rlscmd = "RELEASE";
+CSTR   bgncmd = "BEGIN";
+CSTR   cmtcmd = "COMMIT";
+CSTR   rbkcmd = "ROLLBACK";
+CSTR   svpcmd = "SAVEPOINT";
+CSTR   per_query_svp = "_per_query_svp_";
+CSTR   rlscmd = "RELEASE";
 
 /*
  * Used to begin a transaction.
@@ -1476,6 +1476,8 @@ void  CC_on_commit(ConnectionClass *conn)
        CC_set_no_trans(conn);
        CC_set_no_manual_trans(conn);
    }
+   conn->internal_svp = conn->is_in_internal_op = 0;
+   CC_start_stmt(conn);
    CC_clear_cursors(conn, FALSE);
    CONNLOCK_RELEASE(conn);
    CC_discard_marked_objects(conn);
@@ -1506,6 +1508,8 @@ mylog("CC_on_abort in opt=%x\n", opt);
            set_no_trans = TRUE;
        }
    }
+   conn->internal_svp = conn->is_in_internal_op = 0;
+   CC_start_stmt(conn);
    CC_clear_cursors(conn, TRUE);
    if (0 != (opt & CONN_DEAD))
    {
@@ -1537,8 +1541,13 @@ mylog("CC_on_abort in opt=%x\n", opt);
 void   CC_on_abort_partial(ConnectionClass *conn)
 {
 mylog("CC_on_abort_partial in\n");
-   ProcessRollback(conn, TRUE, TRUE);
    CONNLOCK_ACQUIRE(conn);
+   if (!conn->is_in_internal_op)   /* manually rolling back to */
+   {
+       conn->internal_svp = 0; /* possibly an internal savepoint is invalid */
+       mylog(" %s:reset an internal savepoint\n", __FUNCTION__);
+   }
+   ProcessRollback(conn, TRUE, TRUE);
    CC_discard_marked_objects(conn);
    CONNLOCK_RELEASE(conn);
 }
@@ -1695,7 +1704,7 @@ CC_send_query_append(ConnectionClass *self, const char *query, QueryInfo *qi, UD
        if (stmt)
        {
            StatementClass  *astmt = SC_get_ancestor(stmt);
-           if (!SC_accessed_db(astmt))
+           if (!CC_started_rbpoint(self))
            {
                if (SQL_ERROR == SetStatementSvp(astmt))
                {
@@ -1713,7 +1722,9 @@ CC_send_query_append(ConnectionClass *self, const char *query, QueryInfo *qi, UD
        + strlen(svpcmd) + 1 + strlen(per_query_svp) + 1
        + query_len
        + (appendq ? (1 + strlen(appendq)) : 0)
+#ifdef _RELEASE_INTERNAL_SAVEPOINT
        + 1 + strlen(rlscmd) + strlen(per_query_svp)
+#endif /* _RELEASE_INTERNAL_SAVEPOINT */
        + 1;
    query_buf = malloc(query_buf_len);
    if (!query_buf)
@@ -1737,10 +1748,12 @@ CC_send_query_append(ConnectionClass *self, const char *query, QueryInfo *qi, UD
    {
        snprintfcat(query_buf, query_buf_len, ";%s", appendq);
    }
+#ifdef _RELEASE_INTERNAL_SAVEPOINT
    if (query_rollback)
    {
        snprintfcat(query_buf, query_buf_len, ";%s %s", rlscmd, per_query_svp);
    }
+#endif /* _RELEASE_INTERNAL_SAVEPOINT */
 
    /* Set up notice receiver */
    nrarg.conn = self;
@@ -1787,6 +1800,22 @@ CC_send_query_append(ConnectionClass *self, const char *query, QueryInfo *qi, UD
                /* read in the return message from the backend */
                cmdbuffer = PQcmdStatus(pgres);
                mylog("send_query: ok - 'C' - %s\n", cmdbuffer);
+#ifdef _RELEASE_INTERNAL_SAVEPOINT
+               /*
+                * There are 2 risks to RELEASE an internal savepoint.
+                * One is to RELEASE the savepoint invalitated
+                * due to manually issued ROLLBACK or RELEASE.
+                * Another is to invalitate manual SAVEPOINTs unexpectedly
+                * by RELEASing the internal savepoint.
+                */
+               if (!self->is_in_internal_op && self->internal_svp)
+               {
+                   if (0 == stricmp(cmdbuffer, rlscmd) ||
+                       0 == stricmp(cmdbuffer, rbkcmd) ||
+                       0 == stricmp(cmdbuffer, svpcmd))
+                       self->internal_svp = 0;
+               }
+#endif /* _RELEASE_INTERNAL_SAVEPOINT */
 
                if (query_completed)    /* allow for "show" style notices */
                {
@@ -1998,11 +2027,26 @@ cleanup:
            if (CC_is_in_error_trans(self))
            {
                char tmpsqlbuf[100];
+#ifdef _RELEASE_INTERNAL_SAVEPOINT
+               SPRINTF_FIXED(tmpsqlbuf,
+                       "%s TO %s"
+                       ";%s %s"
+                       , rbkcmd, per_query_svp
+                       , rlscmd, per_query_svp);
+#else
                SPRINTF_FIXED(tmpsqlbuf,
-                        "%s TO %s; %s %s",
-                        rbkcmd, per_query_svp,
-                        rlscmd, per_query_svp);
+                       "%s TO %s"
+                       , rbkcmd, per_query_svp);
+#endif /* _RELEASE_INTERNAL_SAVEPOINT */
+               self->is_in_internal_op = 1;
                pgres = PQexec(self->pqconn, tmpsqlbuf);
+               self->is_in_internal_op = 0;
+               /*
+                * Though it's not appropriate to
+                * call PQExec for a multiple command,
+                * it seems OK because we don't check
+                * the result here.
+                */
            }
        }
        else if (CC_is_in_error_trans(self))
index 339907f609d19e6fa2c0f3cd2d9f410b2e0df831..5bc0f717ef62acf7d89ed8e448cc9b97b4c276c2 100644 (file)
@@ -305,6 +305,16 @@ struct ConnectionClass_
    char        unicode;
    char        result_uncommitted;
    char        lo_is_domain;
+   char        current_schema_valid;   /* is current_schema valid? TRUE when
+                        * current_schema == NULL means it's
+                        * really NULL, while FALSE means it's
+                        * unknown */
+   /* for per statement rollback */
+   char        internal_svp;       /* is set? */
+   char        is_in_internal_op;  /* an operation as to internal savepoint in progress */
+   unsigned char   lock_CC_for_rb;
+   unsigned char   rbonerr;
+
    char        *original_client_encoding;
    char        *locale_encoding;
    char        *server_encoding;
@@ -313,10 +323,6 @@ struct ConnectionClass_
    SQLUINTEGER isolation;      /* isolation level initially unknown */
    SQLUINTEGER server_isolation;   /* isolation at server initially unknown */
    char        *current_schema;
-   char        current_schema_valid;   /* is current_schema valid? TRUE when
-                                        * current_schema == NULL means it's
-                                        * really NULL, while FALSE means it's
-                                        * unknown */
    StatementClass *unnamed_prepared_stmt;
    Int2        max_identifier_length;
    Int2        num_discardp;
@@ -374,7 +380,16 @@ enum {
 #define    CC_set_dtc_isolated(x)  ((x)->gTranInfo |= DTC_ISOLATED)
 #define    CC_is_idle_in_global_transaction(x) (0 != ((x)->gTranInfo & DTC_PREPARE_REQUESTED) || (x)->gTranInfo == DTC_IN_PROGRESS)
 #endif /* _HANDLE_ENLIST_IN_DTC_ */
-
+/* statement callback */
+#define CC_start_stmt(a)        ((a)->rbonerr = 0)
+#define CC_start_tc_stmt(a)     ((a)->rbonerr = (1L << 1))
+#define CC_is_tc_stmt(a)        (((a)->rbonerr & (1L << 1)) != 0)
+#define CC_start_rb_stmt(a)     ((a)->rbonerr = (1L << 2))
+#define CC_is_rb_stmt(a)        (((a)->rbonerr & (1L << 2)) != 0)
+#define CC_set_accessed_db(a)   ((a)->rbonerr |= (1L << 3))
+#define CC_accessed_db(a)       (((a)->rbonerr & (1L << 3)) != 0)
+#define CC_start_rbpoint(a)     ((a)->rbonerr |= (1L << 4))
+#define CC_started_rbpoint(a)   (((a)->rbonerr & (1L << 4)) != 0)
 
 /* prototypes */
 ConnectionClass *CC_Constructor(void);
@@ -446,6 +461,8 @@ enum {
 #define    NO_TRANS        1L
 #define    CONN_DEAD       (1L << 1) /* connection is no longer valid */
 
+#define    _RELEASE_INTERNAL_SAVEPOINT
+
 #ifdef __cplusplus
 }
 #endif
index cd8f270b6ba0c8a8bd9b37a59bafa1bda9a17ff0..65d3bb94d637eb428dc8ddf1489282f6a3d87123 100644 (file)
--- a/execute.c
+++ b/execute.c
@@ -393,6 +393,13 @@ int HowToPrepareBeforeExec(StatementClass *stmt, BOOL checkOnly)
    return nCallParse;
 }
 
+static
+const char *GetSvpName(const ConnectionClass *conn, char *wrk, int wrksize)
+{
+   snprintf(wrk, wrksize, "_EXEC_SVP_%p_%d", conn, conn->internal_svp);
+   return wrk;
+}
+
 /*
  * The execution after all parameters were resolved.
  */
@@ -597,10 +604,10 @@ SetStatementSvp(StatementClass *stmt)
    if (CC_is_in_error_trans(conn))
        return ret;
 
-   if (0 == stmt->lock_CC_for_rb)
+   if (0 == conn->lock_CC_for_rb)
    {
        ENTER_CONN_CS(conn);
-       stmt->lock_CC_for_rb++;
+       conn->lock_CC_for_rb++;
    }
    switch (stmt->statement_type)
    {
@@ -608,26 +615,36 @@ SetStatementSvp(StatementClass *stmt)
        case STMT_TYPE_TRANSACTION:
            return ret;
    }
-   if (!SC_accessed_db(stmt))
+   if (!CC_started_rbpoint(conn))
    {
        BOOL    need_savep = FALSE;
 
        if (SC_is_rb_stmt(stmt))
        {
-           if (CC_is_in_trans(conn))
+           if (CC_is_in_trans(conn)) /* needless to issue SAVEPOINT before the 1st command */
            {
                need_savep = TRUE;
            }
        }
        if (need_savep)
        {
-           SPRINTF_FIXED(esavepoint, "_EXEC_SVP_%p", stmt);
-           SPRINTF_FIXED(cmd, "SAVEPOINT %s", esavepoint);
+           int internal_svp = conn->internal_svp;
+
+           cmd[0] = '\0';
+#ifdef _RELEASE_INTERNAL_SAVEPOINT
+           if (conn->internal_svp)
+               SPRINTF_FIXED(cmd, "RELEASE %s;", GetSvpName(conn, esavepoint, sizeof(esavepoint)));
+#endif /* _RELEASE_INTERNAL_SAVEPOINT */
+           conn->internal_svp = 1;
+           SPRINTFCAT_FIXED(cmd, "SAVEPOINT %s", GetSvpName(conn, esavepoint, sizeof(esavepoint)));
+           conn->internal_svp = internal_svp;
+           conn->is_in_internal_op = 1;
            res = CC_send_query(conn, cmd, NULL, 0, NULL);
+           conn->is_in_internal_op = 0;
            if (QR_command_maybe_successful(res))
            {
-               SC_set_accessed_db(stmt);
-               SC_start_rbpoint(stmt);
+               conn->internal_svp = 1;
+               CC_start_rbpoint(conn);
                ret = SQL_SUCCESS;
            }
            else
@@ -637,10 +654,9 @@ SetStatementSvp(StatementClass *stmt)
            }
            QR_Destructor(res);
        }
-       else
-           SC_set_accessed_db(stmt);
+       CC_set_accessed_db(conn);
    }
-inolog("%s:%p->accessed=%d\n", func, stmt, SC_accessed_db(stmt));
+inolog("%s:%p->accessed=%d\n", func, conn, CC_accessed_db(conn));
    return ret;
 }
 
@@ -648,12 +664,12 @@ RETCODE
 DiscardStatementSvp(StatementClass *stmt, RETCODE ret, BOOL errorOnly)
 {
    CSTR    func = "DiscardStatementSvp";
-   char    esavepoint[32], cmd[64];
+   char    cmd[64];
    ConnectionClass *conn = SC_get_conn(stmt);
    QResultClass *res;
    BOOL    cmd_success, start_stmt = FALSE;
 
-inolog("%s:%p->accessed=%d is_in=%d is_rb=%d is_tc=%d\n", func, stmt, SC_accessed_db(stmt),
+inolog("%s:%p->accessed=%d is_in=%d is_rb=%d is_tc=%d\n", func, conn, CC_accessed_db(conn),
 CC_is_in_trans(conn), SC_is_rb_stmt(stmt), SC_is_tc_stmt(stmt));
    switch (ret)
    {
@@ -667,17 +683,20 @@ CC_is_in_trans(conn), SC_is_rb_stmt(stmt), SC_is_tc_stmt(stmt));
                start_stmt = TRUE;
            break;
    }
-   if (!SC_accessed_db(stmt) || !CC_is_in_trans(conn))
+   if (!CC_accessed_db(conn) || !CC_is_in_trans(conn))
        goto cleanup;
    if (!SC_is_rb_stmt(stmt) && !SC_is_tc_stmt(stmt))
        goto cleanup;
-   SPRINTF_FIXED(esavepoint, "_EXEC_SVP_%p", stmt);
    if (SQL_ERROR == ret)
    {
-       if (SC_started_rbpoint(stmt))
+       if (CC_started_rbpoint(conn) && conn->internal_svp)
        {
-           SPRINTF_FIXED(cmd, "ROLLBACK to %s", esavepoint);
+           char esavepoint[32];
+
+           SPRINTF_FIXED(cmd, "ROLLBACK to %s", GetSvpName(conn, esavepoint, sizeof(esavepoint)));
+           conn->is_in_internal_op = 1;
            res = CC_send_query(conn, cmd, NULL, IGNORE_ABORT_ON_CONN, NULL);
+           conn->is_in_internal_op = 0;
            cmd_success = QR_command_maybe_successful(res);
            QR_Destructor(res);
            if (!cmd_success)
@@ -696,19 +715,6 @@ CC_is_in_trans(conn), SC_is_rb_stmt(stmt), SC_is_tc_stmt(stmt));
    else if (errorOnly)
        return ret;
 inolog("ret=%d\n", ret);
-   if (SQL_NEED_DATA != ret && SC_started_rbpoint(stmt))
-   {
-       SPRINTF_FIXED(cmd, "RELEASE %s", esavepoint);
-       res = CC_send_query(conn, cmd, NULL, IGNORE_ABORT_ON_CONN, NULL);
-       cmd_success = QR_command_maybe_successful(res);
-       QR_Destructor(res);
-       if (!cmd_success)
-       {
-           SC_set_error(stmt, STMT_INTERNAL_ERROR, "internal RELEASE failed", func);
-           CC_abort(conn);
-           ret = SQL_ERROR;
-       }
-   }
 cleanup:
 #ifdef NOT_USED
    if (!SC_is_prepare_statement(stmt) && ONCE_DESCRIBED == stmt->prepared)
@@ -716,12 +722,12 @@ cleanup:
 #endif
    if (start_stmt || SQL_ERROR == ret)
    {
-       if (stmt->lock_CC_for_rb > 0)
+       while (conn->lock_CC_for_rb > 0)
        {
            LEAVE_CONN_CS(conn);
-           stmt->lock_CC_for_rb--;
+           conn->lock_CC_for_rb--;
        }
-       SC_start_stmt(stmt);
+       CC_start_stmt(conn);
    }
    return ret;
 }
index 66ef2310bbd434647e527422778b9048c9c8e4dc..da326ffb6332c342d40b494e6605869c2bd47487 100644 (file)
@@ -417,7 +417,6 @@ SC_Constructor(ConnectionClass *conn)
        rv->exec_current_row = -1;
        rv->put_data = FALSE;
        rv->ref_CC_error = FALSE;
-       rv->lock_CC_for_rb = 0;
        rv->join_info = 0;
        rv->curr_param_result = 0;
        SC_init_parse_method(rv);
@@ -446,7 +445,7 @@ SC_Constructor(ConnectionClass *conn)
                rv, SQL_ATTR_IMP_PARAM_DESC);
 
        rv->miscinfo = 0;
-       rv->rbonerr = 0;
+       rv->rb_or_tc = 0;
        SC_reset_updatable(rv);
        rv->diag_row_count = 0;
        rv->stmt_time = 0;
@@ -688,14 +687,6 @@ SC_initialize_stmts(StatementClass *self, BOOL initializeOriginal)
    ProcessedStmt *pstmt;
    ProcessedStmt *next_pstmt;
 
-   if (self->lock_CC_for_rb > 0)
-   {
-       while (self->lock_CC_for_rb > 0)
-       {
-           LEAVE_CONN_CS(conn);
-           self->lock_CC_for_rb--;
-       }
-   }
    if (initializeOriginal)
    {
        if (self->statement)
@@ -2365,7 +2356,7 @@ RequestStart(StatementClass *stmt, ConnectionClass *conn, const char *func)
        SC_set_error(stmt, STMT_COMMUNICATION_ERROR, "The connection has been lost", __FUNCTION__);
        return SQL_ERROR;
    }
-   if (SC_accessed_db(stmt))
+   if (CC_started_rbpoint(conn))
        return TRUE;
    if (SQL_ERROR == SetStatementSvp(stmt))
    {
@@ -2412,7 +2403,7 @@ libpq_bind_and_exec(StatementClass *stmt)
    if (!RequestStart(stmt, conn, func))
        return NULL;
 
-   if (CC_is_in_trans(conn) && !SC_accessed_db(stmt))
+   if (CC_is_in_trans(conn) && !CC_started_rbpoint(conn))
    {
        if (SQL_ERROR == SetStatementSvp(stmt))
        {
index 304284bb998a7e056210b1164503d15f5be05898..9da7712554e005d40da3367c7250fe67b5b6346d 100644 (file)
@@ -263,11 +263,11 @@ struct StatementClass_
    po_ind_t    external;   /* Allocated via SQLAllocHandle() */
    po_ind_t    transition_status;  /* Transition status */
    po_ind_t    multi_statement; /* -1:unknown 0:single 1:multi */
-   po_ind_t    rbonerr;    /* rollback on error */
+   po_ind_t    rb_or_tc;   /* rollback on error */
    po_ind_t    discard_output_params;   /* discard output parameters on parse stage */
    po_ind_t    cancel_info;    /* cancel information */
    po_ind_t    ref_CC_error;   /* refer to CC_error ? */
-   po_ind_t    lock_CC_for_rb; /* lock CC for statement rollback ? */
+// po_ind_t    lock_CC_for_rb; /* lock CC for statement rollback ? */
    po_ind_t    join_info;  /* have joins ? */
    po_ind_t    parse_method;   /* parse_statement is forced or ? */
    po_ind_t    curr_param_result; /* current param result is set ? */
@@ -436,15 +436,10 @@ enum
 #define SC_set_outer_join(a)   ((a)->join_info |= STMT_HAS_OUTER_JOIN)
 #define SC_set_inner_join(a)   ((a)->join_info |= STMT_HAS_INNER_JOIN)
 
-#define SC_start_stmt(a)   ((a)->rbonerr = 0)
-#define SC_start_tc_stmt(a)    ((a)->rbonerr = (1L << 1))
-#define SC_is_tc_stmt(a)   (((a)->rbonerr & (1L << 1)) != 0)
-#define SC_start_rb_stmt(a)    ((a)->rbonerr = (1L << 2))
-#define SC_is_rb_stmt(a)   (((a)->rbonerr & (1L << 2)) != 0)
-#define SC_set_accessed_db(a)  ((a)->rbonerr |= (1L << 3))
-#define SC_accessed_db(a)  (((a)->rbonerr & (1L << 3)) != 0)
-#define SC_start_rbpoint(a)    ((a)->rbonerr |= (1L << 4))
-#define SC_started_rbpoint(a)  (((a)->rbonerr & (1L << 4)) != 0)
+#define SC_start_tc_stmt(a)    ((a)->rb_or_tc = (1L << 1))
+#define SC_is_tc_stmt(a)   (((a)->rb_or_tc & (1L << 1)) != 0)
+#define SC_start_rb_stmt(a)    ((a)->rb_or_tc = (1L << 2))
+#define SC_is_rb_stmt(a)   (((a)->rb_or_tc & (1L << 2)) != 0)
 #define SC_unref_CC_error(a)   (((a)->ref_CC_error) = FALSE)
 #define SC_ref_CC_error(a) (((a)->ref_CC_error) = TRUE)
 #define SC_can_parse_statement(a) (STMT_TYPE_SELECT == (a)->statement_type)