bdr: Significantly improve the commandfilter.
authorAndres Freund <andres@anarazel.de>
Thu, 1 May 2014 17:01:14 +0000 (19:01 +0200)
committerAndres Freund <andres@anarazel.de>
Thu, 3 Jul 2014 15:55:30 +0000 (17:55 +0200)
Several previously allowed command are now forbidden and vice versa.

contrib/bdr/bdr_commandfilter.c

index 54c981c659b2f8dbfa69a670380dfeaf21a4d606..855ae1b872be9d4bfe87180ba56a4a693f746990 100644 (file)
 #include "catalog/namespace.h"
 
 #include "commands/dbcommands.h"
+#include "commands/event_trigger.h"
+#include "commands/extension.h"
+#include "commands/tablecmds.h"
 
 #include "tcop/utility.h"
 
 #include "utils/guc.h"
 #include "utils/rel.h"
 
+static void error_unsupported_command(const char *cmdtag) __attribute__((noreturn));
+
 /*
 * bdr_commandfilter.c: a ProcessUtility_hook to prevent a cluster from running
 * commands that BDR does not yet support.
@@ -52,12 +57,10 @@ error_on_persistent_rv(RangeVar *rv,
    bool        needswal;
    Relation    rel;
 
-   if (bdr_permit_unsafe_commands)
-       return;
-
    if (rv == NULL)
        ereport(ERROR,
-               (errmsg("Unqualified command %s is unsafe with BDR active.",
+               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                errmsg("Unqualified command %s is unsafe with BDR active.",
                        cmdtag)));
 
    rel = heap_openrv_extended(rv, lockmode, missing_ok);
@@ -68,12 +71,164 @@ error_on_persistent_rv(RangeVar *rv,
        heap_close(rel, lockmode);
        if (needswal)
            ereport(ERROR,
-                (errmsg("%s may only affect UNLOGGED or TEMPORARY tables " \
+                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                 errmsg("%s may only affect UNLOGGED or TEMPORARY tables " \
                         "when BDR is active; %s is a regular table",
                         cmdtag, rv->relname)));
    }
 }
 
+static void
+error_unsupported_command(const char *cmdtag)
+{
+   ereport(ERROR,
+           (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+            errmsg("%s is not support when bdr is active",
+                   cmdtag)));
+}
+
+static void
+filter_CreateStmt(Node *parsetree,
+                 char *completionTag)
+{
+   CreateStmt *stmt;
+   ListCell   *cell;
+
+   stmt = (CreateStmt *) parsetree;
+
+   if (stmt->ofTypename != NULL)
+       error_unsupported_command("CREATE TABLE ... OF TYPE");
+
+   foreach(cell, stmt->tableElts)
+   {
+       Node       *element = lfirst(cell);
+
+       if (nodeTag(element) == T_Constraint)
+       {
+           Constraint *con = (Constraint *) element;
+
+           if (con->contype == CONSTR_EXCLUSION &&
+               stmt->relation->relpersistence != RELPERSISTENCE_TEMP)
+           {
+               ereport(ERROR,
+                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                        errmsg("EXCLUDE constraints are unsafe with BDR active")));
+           }
+       }
+   }
+}
+
+static void
+filter_AlterTableStmt(Node *parsetree,
+                  char *completionTag)
+{
+   AlterTableStmt *astmt;
+   ListCell   *cell;
+   bool        hasInvalid;
+
+   astmt = (AlterTableStmt *) parsetree;
+   hasInvalid = false;
+
+   foreach(cell, astmt->cmds)
+   {
+       AlterTableCmd *stmt;
+
+       Assert(IsA(lfirst(cell), AlterTableCmd));
+       stmt = (AlterTableCmd *) lfirst(cell);
+
+       switch (stmt->subtype)
+       {
+           /*
+            * allowed for now:
+            */
+           case AT_AddColumn:
+               {
+                   ColumnDef *def = (ColumnDef *) stmt->def;
+                   ListCell   *cell;
+
+                   /*
+                    * Error out if there's a default for the new
+                    * column, that requires a table rewrite which
+                    * might be nondeterministic.
+                    */
+                   if (def->raw_default != NULL ||
+                       def->cooked_default != NULL)
+                   {
+                       error_on_persistent_rv(
+                           astmt->relation,
+                           "ALTER TABLE ... ADD COLUMN ... DEFAULT",
+                           AlterTableGetLockLevel(astmt->cmds),
+                           astmt->missing_ok);
+                   }
+
+                   /*
+                    * Column defaults can also be represented as
+                    * constraints.
+                    */
+                   foreach(cell, def->constraints)
+                   {
+                       Constraint *con;
+
+                       Assert(IsA(lfirst(cell), Constraint));
+                       con = (Constraint *) lfirst(cell);
+
+                       if (con->contype == CONSTR_DEFAULT)
+                           error_on_persistent_rv(
+                               astmt->relation,
+                               "ALTER TABLE ... ADD COLUMN ... DEFAULT",
+                               AlterTableGetLockLevel(astmt->cmds),
+                               astmt->missing_ok);
+                   }
+               }
+           case AT_DropColumn:
+           case AT_DropNotNull:
+           case AT_SetNotNull:
+           case AT_ColumnDefault:      /* ALTER COLUMN DEFAULT */
+
+           case AT_ClusterOn:          /* CLUSTER ON */
+           case AT_DropCluster:        /* SET WITHOUT CLUSTER */
+
+           case AT_SetRelOptions:      /* SET (...) */
+           case AT_ResetRelOptions:    /* RESET (...) */
+           case AT_ReplaceRelOptions:  /* replace reloption list */
+           case AT_ReplicaIdentity:
+           case AT_ChangeOwner:
+           case AT_SetStorage:
+               break;
+
+           case AT_DropConstraint:
+               break;
+
+           case AT_SetTableSpace:
+               break;
+
+           case AT_AddConstraint:
+           case AT_ProcessedConstraint:
+               if (IsA(stmt->def, Constraint))
+               {
+                   Constraint *con = (Constraint *) stmt->def;
+
+                   if (con->contype == CONSTR_EXCLUSION)
+                       error_on_persistent_rv(astmt->relation,
+                                              "ALTER TABLE ... ADD CONSTRAINT ... EXCLUDE",
+                                              AlterTableGetLockLevel(astmt->cmds),
+                                              astmt->missing_ok);
+               }
+               break;
+
+           default:
+               hasInvalid = true;
+               break;
+       }
+   }
+
+   if (hasInvalid)
+       error_on_persistent_rv(astmt->relation,
+                              "ALTER TABLE",
+                              AlterTableGetLockLevel(astmt->cmds),
+                              astmt->missing_ok);
+}
+
 static void
 bdr_commandfilter(Node *parsetree,
                  const char *queryString,
@@ -82,145 +237,257 @@ bdr_commandfilter(Node *parsetree,
                  DestReceiver *dest,
                  char *completionTag)
 {
-   ListCell   *cell;
-   AlterTableStmt *alterTableStatement;
-   CreateStmt *createStatement;
-   bool        hasInvalid;
-   Constraint *con;
-   IndexStmt  *indexStmt;
+   /* don't filter if explicitly told so */
+   if (bdr_permit_unsafe_commands)
+       goto done;
 
-   ereport(DEBUG4,
-        (errmsg_internal("bdr_commandfilter ProcessUtility_hook invoked")));
+   /* extension contents aren't individually replicated */
+   if (creating_extension)
+       goto done;
 
+   /* statements handled directly in standard_ProcessUtility */
    switch (nodeTag(parsetree))
    {
-       case T_SecLabelStmt:    /* XXX what about this? */
-           error_on_persistent_rv(((ClusterStmt *) parsetree)->relation,
-                              "SECURITY LABEL", AccessExclusiveLock, false);
-           break;
+       case T_TransactionStmt:
+       case T_PlannedStmt:
+       case T_ClosePortalStmt:
+       case T_FetchStmt:
+       case T_DoStmt:
+       case T_CreateTableSpaceStmt:
+       case T_DropTableSpaceStmt:
+       case T_AlterTableSpaceOptionsStmt:
+       case T_TruncateStmt:
+       case T_CommentStmt: /* XXX: we could replicate these */;
+       case T_SecLabelStmt: /* XXX: we could replicate these */;
+       case T_CopyStmt:
+       case T_PrepareStmt:
+       case T_ExecuteStmt:
+       case T_DeallocateStmt:
+       case T_GrantStmt: /* XXX: we could replicate some of these these */;
+       case T_GrantRoleStmt:
+       case T_CreatedbStmt:
+       case T_AlterDatabaseStmt:
+       case T_AlterDatabaseSetStmt:
+       case T_DropdbStmt:
+       case T_NotifyStmt:
+       case T_ListenStmt:
+       case T_UnlistenStmt:
+       case T_LoadStmt:
+       case T_ClusterStmt: /* XXX: we could replicate these */;
+       case T_VacuumStmt:
+       case T_ExplainStmt:
+       case T_AlterSystemStmt:
+       case T_VariableSetStmt:
+       case T_VariableShowStmt:
+       case T_DiscardStmt:
+       case T_CreateEventTrigStmt:
+       case T_AlterEventTrigStmt:
+       case T_CreateRoleStmt:
+       case T_AlterRoleStmt:
+       case T_AlterRoleSetStmt:
+       case T_DropRoleStmt:
+       case T_ReassignOwnedStmt:
+       case T_LockStmt:
+       case T_ConstraintsSetStmt:
+       case T_CheckPointStmt:
+       case T_ReindexStmt:
+           goto done;
+
+       case T_DropStmt:
+           {
+               DropStmt   *stmt = (DropStmt *) parsetree;
 
-       case T_CreateStmt:
-           createStatement = (CreateStmt *) parsetree;
+               if (EventTriggerSupportsObjectType(stmt->removeType))
+                   break;
+               else
+                   goto done;
+           }
+       case T_RenameStmt:
+           {
+               RenameStmt *stmt = (RenameStmt *) parsetree;
 
-           foreach(cell, createStatement->tableElts)
+               if (EventTriggerSupportsObjectType(stmt->renameType))
+                   break;
+               else
+                   goto done;
+           }
+       case T_AlterObjectSchemaStmt:
            {
-               Node       *element = lfirst(cell);
+               AlterObjectSchemaStmt *stmt = (AlterObjectSchemaStmt *) parsetree;
 
-               if (nodeTag(element) == T_Constraint)
-               {
-                   con = (Constraint *) element;
-                   if (con->contype == CONSTR_EXCLUSION &&
-                       createStatement->relation->relpersistence != RELPERSISTENCE_TEMP)
-                   {
-                       if (!bdr_permit_unsafe_commands)
-                           ereport(ERROR,
-                                   (errmsg("EXCLUDE constraints are unsafe with BDR active")));
-                   }
-               }
+               if (EventTriggerSupportsObjectType(stmt->objectType))
+                   break;
+               else
+                   goto done;
            }
+       case T_AlterOwnerStmt:
+           {
+               AlterOwnerStmt *stmt = (AlterOwnerStmt *) parsetree;
 
+               if (EventTriggerSupportsObjectType(stmt->objectType))
+                   break;
+               else
+                   goto done;
+           }
+       default:
            break;
+   }
 
-       case T_IndexStmt:
-           indexStmt = (IndexStmt *) parsetree;
+   /* all commands handled by ProcessUtilitySlow() */
+   switch (nodeTag(parsetree))
+   {
+       case T_CreateSchemaStmt:
+           break;
+       case T_CreateStmt:
+           filter_CreateStmt(parsetree, completionTag);
 
-           if (indexStmt->whereClause && indexStmt->unique)
-               error_on_persistent_rv(indexStmt->relation,
-                                      "CREATE UNIQUE INDEX ... WHERE",
-                                      AccessExclusiveLock, false);
+       case T_CreateForeignTableStmt:
+           break;
 
-           /*
-            * XXX allow ALTER TABLE statements when stabilized; still forbid *
-            * ALTER TABLE RENAME as well as ALTER TYPE (which seems to be
-            * handled by ALTER TYPE, too)
-            */
        case T_AlterTableStmt:
-           alterTableStatement = (AlterTableStmt *) parsetree;
-           hasInvalid = false;
+           filter_AlterTableStmt(parsetree, completionTag);
+           break;
+
+       case T_AlterDomainStmt:
+           /* XXX: we could support this */
+           error_unsupported_command(completionTag);
+           break;
+
+       case T_DefineStmt:
+           /* XXX: we could support some of these, primarily CREATE TYPE */
+           error_unsupported_command(completionTag);
+           break;
 
-           foreach(cell, alterTableStatement->cmds)
+       case T_IndexStmt:
            {
-               AlterTableCmd *stmt = (AlterTableCmd *) lfirst(cell);
+               IndexStmt  *stmt;
 
-               switch (stmt->subtype)
-               {
-                       /*
-                        * allowed for now:
-                        */
-                   case AT_AddColumn:
-                   case AT_DropColumn:
-                   case AT_DropNotNull:
-                   case AT_SetNotNull:
-                   case AT_ColumnDefault:      /* ALTER COLUMN DEFAULT */
-
-                   case AT_ClusterOn:          /* CLUSTER ON */
-                   case AT_DropCluster:        /* SET WITHOUT CLUSTER */
-
-                   case AT_SetRelOptions:      /* SET (...) */
-                   case AT_ResetRelOptions:    /* RESET (...) */
-                   case AT_ReplaceRelOptions:  /* replace reloption list */
-                       break;
-
-                   case AT_AddConstraint:
-                   case AT_ProcessedConstraint:
-                       if (IsA(stmt->def, Constraint))
-                       {
-                           con = (Constraint *) stmt->def;
-
-                           if (con->contype == CONSTR_EXCLUSION)
-                               error_on_persistent_rv(alterTableStatement->relation,
-                               "ALTER TABLE ... ADD CONSTRAINT ... EXCLUDE",
-                                                      AccessExclusiveLock,
-                                           alterTableStatement->missing_ok);
-                       }
-                       break;
-
-                   default:
-                       hasInvalid = true;
-                       break;
-               }
+               stmt = (IndexStmt *) parsetree;
+
+               if (stmt->whereClause && stmt->unique)
+                   error_on_persistent_rv(stmt->relation,
+                                          "CREATE UNIQUE INDEX ... WHERE",
+                                          AccessExclusiveLock, false);
            }
+       case T_CreateExtensionStmt:
+           break;
+
+       case T_AlterExtensionStmt:
+           /* XXX: we could support some of these */
+           error_unsupported_command(completionTag);
+           break;
 
-           if (hasInvalid)
-               error_on_persistent_rv(alterTableStatement->relation,
-                                      "ALTER TABLE", AccessExclusiveLock,
-                                      alterTableStatement->missing_ok);
+       case T_AlterExtensionContentsStmt:
+           error_unsupported_command(completionTag);
+           break;
+
+       case T_CreateFdwStmt:
+       case T_AlterFdwStmt:
+       case T_CreateForeignServerStmt:
+       case T_AlterForeignServerStmt:
+       case T_CreateUserMappingStmt:
+       case T_AlterUserMappingStmt:
+       case T_DropUserMappingStmt:
+           /* XXX: we should probably support all of these at some point */
+           error_unsupported_command(completionTag);
+           break;
+
+       case T_CompositeTypeStmt:   /* CREATE TYPE (composite) */
+       case T_CreateEnumStmt:      /* CREATE TYPE AS ENUM */
+       case T_CreateRangeStmt:     /* CREATE TYPE AS RANGE */
            break;
 
        case T_AlterEnumStmt:
-           if (!bdr_permit_unsafe_commands)
-               ereport(ERROR,
-                       (errmsg("ALTER TYPE ... ADD VALUE is unsafe with BDR active")));
+       case T_ViewStmt:    /* CREATE VIEW */
+       case T_CreateFunctionStmt:  /* CREATE FUNCTION */
+       case T_AlterFunctionStmt:   /* ALTER FUNCTION */
+       case T_RuleStmt:    /* CREATE RULE */
+       case T_CreateSeqStmt:
+       case T_AlterSeqStmt:
+           break;
+
+       case T_CreateTableAsStmt:
+           error_unsupported_command(completionTag);
+           break;
+
+       case T_RefreshMatViewStmt:
+           /* XXX: might make sense to support or not */
+           error_unsupported_command(completionTag);
+           break;
+
+       case T_CreateTrigStmt:
+           break;
+
+       case T_CreatePLangStmt:
+           error_unsupported_command(completionTag);
+           break;
+
+       case T_CreateDomainStmt:
+           break;
+
+       case T_CreateConversionStmt:
+           error_unsupported_command(completionTag);
+           break;
+
+       case T_CreateCastStmt:
+       case T_CreateOpClassStmt:
+       case T_CreateOpFamilyStmt:
+       case T_AlterOpFamilyStmt:
+           error_unsupported_command(completionTag);
+           break;
+
+       case T_AlterTSDictionaryStmt:
+       case T_AlterTSConfigurationStmt:
+           error_unsupported_command(completionTag);
+           break;
+
+       case T_DropStmt:
+       case T_RenameStmt:
+           /* FIXME: catch unsupported rename operations */
+           break;
+
+       case T_AlterObjectSchemaStmt:
+           error_unsupported_command(completionTag);
+           break;
+
+       case T_AlterTableSpaceMoveStmt:
+           /* XXX: forbid? */
+           break;
+
+       case T_AlterOwnerStmt:
+           /* local only for now*/
+           break;
+
+       case T_DropOwnedStmt:
+           error_unsupported_command(completionTag);
+           break;
+
+       case T_AlterDefaultPrivilegesStmt:
+           break;
+
+       case T_SecLabelStmt:
+           error_unsupported_command(completionTag);
            break;
 
        default:
+           elog(ERROR, "unrecognized node type: %d",
+                (int) nodeTag(parsetree));
            break;
    }
 
+done:
    if (next_ProcessUtility_hook)
-   {
-       ereport(DEBUG4,
-               (errmsg_internal("bdr_commandfilter ProcessUtility_hook " \
-                                "handing off to next hook ")));
-       (*next_ProcessUtility_hook) (parsetree, queryString, context, params,
-                                    dest, completionTag);
-   }
+       next_ProcessUtility_hook(parsetree, queryString, context, params,
+                                dest, completionTag);
    else
-   {
-       ereport(DEBUG4,
-               (errmsg_internal("bdr_commandfilter ProcessUtility_hook " \
-                                "invoking standard_ProcessUtility")));
        standard_ProcessUtility(parsetree, queryString, context, params,
                                dest, completionTag);
-   }
 }
 
 /* Module load */
 void
 init_bdr_commandfilter(void)
 {
-   ereport(DEBUG4, (errcode(ERRCODE_SUCCESSFUL_COMPLETION),
-       errmsg_internal("bdr_commandfilter ProcessUtility_hook installed")));
    next_ProcessUtility_hook = ProcessUtility_hook;
    ProcessUtility_hook = bdr_commandfilter;