Clean up two rather nasty bugs in operator selection code.
authorTom Lane <tgl@sss.pgh.pa.us>
Fri, 16 Feb 2001 03:16:58 +0000 (03:16 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Fri, 16 Feb 2001 03:16:58 +0000 (03:16 +0000)
1. If there is exactly one pg_operator entry of the right name and oprkind,
oper() and related routines would return that entry whether its input type
had anything to do with the request or not.  This is just premature
optimization: we shouldn't return the single candidate until after we verify
that it really is a valid candidate, ie, is at least coercion-compatible
with the given types.

2. oper() and related routines only promise a coercion-compatible result.
Unfortunately, there were quite a few callers that assumed the returned
operator is binary-compatible with the given datatype; they would proceed
to call it without making any datatype coercions.  These callers include
sorting, grouping, aggregation, and VACUUM ANALYZE.  In general I think
it is appropriate for these callers to require an exact or binary-compatible
match, so I've added a new routine compatible_oper() that only succeeds if
it can find an operator that doesn't require any run-time conversions.
Callers now call oper() or compatible_oper() depending on whether they are
prepared to deal with type conversion or not.

The upshot of these bugs is revealed by the following silliness in PL/Tcl's
selftest: it creates an operator @< on int4, and then tries to use it to
sort a char(N) column.  The system would let it do that :-( (and evidently
has done so since 6.3 :-( :-().  The result in this case was just a silly
sort order, but the reverse combination would've provoked coredump from
trying to dereference integers.  With this fix you get more reasonable
behavior:
pltcl_test=# select * from T_pkey1 order by key1, key2 using @<;
ERROR:  Unable to identify an operator '@<' for types 'bpchar' and 'bpchar'
        You will have to retype this query using an explicit cast

src/backend/commands/analyze.c
src/backend/executor/nodeAgg.c
src/backend/executor/nodeGroup.c
src/backend/optimizer/path/indxpath.c
src/backend/optimizer/plan/initsplan.c
src/backend/parser/parse_clause.c
src/backend/parser/parse_expr.c
src/backend/parser/parse_oper.c
src/include/parser/parse_oper.h

index e0afbfc321b54c3700ebd32b2879f0e0f3fb7de5..0b3e4096b96848fef5022e554d2be2be0c50e3f8 100644 (file)
@@ -152,7 +152,6 @@ analyze_rel(Oid relid, List *anal_cols2, int MESSAGE_LEVEL)
        for (i = 0; i < attr_cnt; i++)
        {
                Operator        func_operator;
-               Form_pg_operator pgopform;
                VacAttrStats *stats;
 
                stats = &vacattrstats[i];
@@ -167,21 +166,25 @@ analyze_rel(Oid relid, List *anal_cols2, int MESSAGE_LEVEL)
                stats->best_cnt = stats->guess1_cnt = stats->guess1_hits = stats->guess2_hits = 0;
                stats->max_cnt = stats->min_cnt = stats->null_cnt = stats->nonnull_cnt = 0;
 
-               func_operator = oper("=", stats->attr->atttypid, stats->attr->atttypid, true);
+               func_operator = compatible_oper("=",
+                                                                               stats->attr->atttypid,
+                                                                               stats->attr->atttypid,
+                                                                               true);
                if (func_operator != NULL)
                {
-                       pgopform = (Form_pg_operator) GETSTRUCT(func_operator);
-                       fmgr_info(pgopform->oprcode, &(stats->f_cmpeq));
+                       fmgr_info(oprfuncid(func_operator), &(stats->f_cmpeq));
                        ReleaseSysCache(func_operator);
                }
                else
                        stats->f_cmpeq.fn_addr = NULL;
 
-               func_operator = oper("<", stats->attr->atttypid, stats->attr->atttypid, true);
+               func_operator = compatible_oper("<",
+                                                                               stats->attr->atttypid,
+                                                                               stats->attr->atttypid,
+                                                                               true);
                if (func_operator != NULL)
                {
-                       pgopform = (Form_pg_operator) GETSTRUCT(func_operator);
-                       fmgr_info(pgopform->oprcode, &(stats->f_cmplt));
+                       fmgr_info(oprfuncid(func_operator), &(stats->f_cmplt));
                        stats->op_cmplt = oprid(func_operator);
                        ReleaseSysCache(func_operator);
                }
@@ -191,11 +194,13 @@ analyze_rel(Oid relid, List *anal_cols2, int MESSAGE_LEVEL)
                        stats->op_cmplt = InvalidOid;
                }
 
-               func_operator = oper(">", stats->attr->atttypid, stats->attr->atttypid, true);
+               func_operator = compatible_oper(">",
+                                                                               stats->attr->atttypid,
+                                                                               stats->attr->atttypid,
+                                                                               true);
                if (func_operator != NULL)
                {
-                       pgopform = (Form_pg_operator) GETSTRUCT(func_operator);
-                       fmgr_info(pgopform->oprcode, &(stats->f_cmpgt));
+                       fmgr_info(oprfuncid(func_operator), &(stats->f_cmpgt));
                        ReleaseSysCache(func_operator);
                }
                else
index ab6dcc22668ab362cd84871abb74f11b808f9bdf..1190b0362d1a2c55c006209a8f0296be8c1d34be 100644 (file)
@@ -909,21 +909,19 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent)
                         * (Consider COUNT(*).)
                         */
                        Oid                     inputType = exprType(aggref->target);
-                       Operator        eq_operator;
-                       Form_pg_operator pgopform;
+                       Oid                     eq_function;
 
                        peraggstate->inputType = inputType;
                        get_typlenbyval(inputType,
                                                        &peraggstate->inputtypeLen,
                                                        &peraggstate->inputtypeByVal);
 
-                       eq_operator = oper("=", inputType, inputType, true);
-                       if (!HeapTupleIsValid(eq_operator))
+                       eq_function = compatible_oper_funcid("=", inputType, inputType,
+                                                                                                true);
+                       if (!OidIsValid(eq_function))
                                elog(ERROR, "Unable to identify an equality operator for type '%s'",
                                         typeidTypeName(inputType));
-                       pgopform = (Form_pg_operator) GETSTRUCT(eq_operator);
-                       fmgr_info(pgopform->oprcode, &(peraggstate->equalfn));
-                       ReleaseSysCache(eq_operator);
+                       fmgr_info(eq_function, &(peraggstate->equalfn));
                        peraggstate->sortOperator = any_ordering_op(inputType);
                        peraggstate->sortstate = NULL;
                }
index 210becab9573c85e3a59902cb7d6083e902affca..656e5f64900a64145255f96da4453ae2a671a542 100644 (file)
@@ -495,16 +495,13 @@ execTuplesMatchPrepare(TupleDesc tupdesc,
        {
                AttrNumber      att = matchColIdx[i];
                Oid                     typid = tupdesc->attrs[att - 1]->atttypid;
-               Operator        eq_operator;
-               Form_pg_operator pgopform;
+               Oid                     eq_function;
 
-               eq_operator = oper("=", typid, typid, true);
-               if (!HeapTupleIsValid(eq_operator))
+               eq_function = compatible_oper_funcid("=", typid, typid, true);
+               if (!OidIsValid(eq_function))
                        elog(ERROR, "Unable to identify an equality operator for type '%s'",
                                 typeidTypeName(typid));
-               pgopform = (Form_pg_operator) GETSTRUCT(eq_operator);
-               fmgr_info(pgopform->oprcode, &eqfunctions[i]);
-               ReleaseSysCache(eq_operator);
+               fmgr_info(eq_function, &eqfunctions[i]);
        }
 
        return eqfunctions;
index c9f89d16b1736477095670d48dff20a42ee3c858..e345a6b4fb98226cd670f284f671ed4b0c7f616c 100644 (file)
@@ -878,7 +878,7 @@ indexable_operator(Expr *clause, Oid opclass, Oid relam,
         * (In theory this might find a non-semantically-comparable operator,
         * but in practice that seems pretty unlikely for binary-compatible types.)
         */
-       new_op = oper_oid(opname, indexkeytype, indexkeytype, true);
+       new_op = compatible_oper_opid(opname, indexkeytype, indexkeytype, true);
 
        if (OidIsValid(new_op))
        {
index 2108a15533de94b794fdf3d6bc9a4424a30e6858..2da7089c336a3c37d6df01b66a706defeeb2f5a1 100644 (file)
@@ -617,7 +617,7 @@ process_implied_equality(Query *root, Node *item1, Node *item2,
         */
        ltype = exprType(item1);
        rtype = exprType(item2);
-       eq_operator = oper("=", ltype, rtype, true);
+       eq_operator = compatible_oper("=", ltype, rtype, true);
        if (!HeapTupleIsValid(eq_operator))
        {
                /*
index 8fb9b33457736d6875528c33825a2732772784fc..0e8ebb3cd0d7a2fc0763970435350c09e93c0352 100644 (file)
@@ -1155,10 +1155,10 @@ addTargetToSortList(TargetEntry *tle, List *sortlist, List *targetlist,
                sortcl->tleSortGroupRef = assignSortGroupRef(tle, targetlist);
 
                if (opname)
-                       sortcl->sortop = oper_oid(opname,
-                                                                         tle->resdom->restype,
-                                                                         tle->resdom->restype,
-                                                                         false);
+                       sortcl->sortop = compatible_oper_opid(opname,
+                                                                                                 tle->resdom->restype,
+                                                                                                 tle->resdom->restype,
+                                                                                                 false);
                else
                        sortcl->sortop = any_ordering_op(tle->resdom->restype);
 
index fab429c810f2d74404db038a2e2164246e40965d..7097d29362f35c4456b2c27e54da9dfd8ee51097 100644 (file)
@@ -383,6 +383,11 @@ transformExpr(ParseState *pstate, Node *expr, int precedence)
                                                lexpr = lfirst(left_list);
                                                left_list = lnext(left_list);
 
+                                               /*
+                                                * It's OK to use oper() not compatible_oper() here,
+                                                * because make_subplan() will insert type coercion
+                                                * calls if needed.
+                                                */
                                                optup = oper(op,
                                                                         exprType(lexpr),
                                                                         exprType(tent->expr),
index 60b4f7a465db8bb720bab8f8d6cb249fe004fbaf..713db9acfff3fbbf85b2fc1f6d296619d15de038 100644 (file)
@@ -40,15 +40,15 @@ static void unary_op_error(char *op, Oid arg, bool is_left_op);
 
 /* Select an ordering operator for the given datatype */
 Oid
-any_ordering_op(Oid restype)
+any_ordering_op(Oid argtype)
 {
        Oid                     order_opid;
 
-       order_opid = oper_oid("<", restype, restype, true);
+       order_opid = compatible_oper_opid("<", argtype, argtype, true);
        if (!OidIsValid(order_opid))
                elog(ERROR, "Unable to identify an ordering operator '%s' for type '%s'"
                         "\n\tUse an explicit ordering operator or modify the query",
-                        "<", typeidTypeName(restype));
+                        "<", typeidTypeName(argtype));
        return order_opid;
 }
 
@@ -59,6 +59,15 @@ oprid(Operator op)
        return op->t_data->t_oid;
 }
 
+/* given operator tuple, return the underlying function's OID */
+Oid
+oprfuncid(Operator op)
+{
+       Form_pg_operator        pgopform = (Form_pg_operator) GETSTRUCT(op);
+
+       return pgopform->oprcode;
+}
+
 
 /* binary_oper_get_candidates()
  *     given opname, find all possible input type pairs for which an operator
@@ -119,7 +128,7 @@ binary_oper_get_candidates(char *opname,
 
 
 /* oper_select_candidate()
- * Given the input argtype array and more than one candidate
+ * Given the input argtype array and one or more candidates
  * for the function argtype array, attempt to resolve the conflict.
  * Returns the selected argtype array if the conflict can be resolved,
  * otherwise returns NULL.
@@ -593,34 +602,22 @@ oper_inexact(char *op, Oid arg1, Oid arg2)
        if (ncandidates == 0)
                return NULL;
 
-       /* Or found exactly one? Then proceed... */
-       else if (ncandidates == 1)
+       /* Otherwise, check for compatible datatypes, and then try to resolve
+        * the conflict if more than one candidate remains.
+        */
+       inputOids[0] = arg1;
+       inputOids[1] = arg2;
+       targetOids = oper_select_candidate(2, inputOids, candidates);
+       if (targetOids != NULL)
        {
                tup = SearchSysCache(OPERNAME,
                                                         PointerGetDatum(op),
-                                                        ObjectIdGetDatum(candidates->args[0]),
-                                                        ObjectIdGetDatum(candidates->args[1]),
+                                                        ObjectIdGetDatum(targetOids[0]),
+                                                        ObjectIdGetDatum(targetOids[1]),
                                                         CharGetDatum('b'));
-               Assert(HeapTupleIsValid(tup));
        }
-
-       /* Otherwise, multiple operators of the desired types found... */
        else
-       {
-               inputOids[0] = arg1;
-               inputOids[1] = arg2;
-               targetOids = oper_select_candidate(2, inputOids, candidates);
-               if (targetOids != NULL)
-               {
-                       tup = SearchSysCache(OPERNAME,
-                                                                PointerGetDatum(op),
-                                                                ObjectIdGetDatum(targetOids[0]),
-                                                                ObjectIdGetDatum(targetOids[1]),
-                                                                CharGetDatum('b'));
-               }
-               else
-                       tup = NULL;
-       }
+               tup = NULL;
        return (Operator) tup;
 }
 
@@ -628,6 +625,10 @@ oper_inexact(char *op, Oid arg1, Oid arg2)
 /* oper() -- search for a binary operator
  * Given operator name, types of arg1 and arg2, return oper struct.
  *
+ * IMPORTANT: the returned operator (if any) is only promised to be
+ * coercion-compatible with the input datatypes.  Do not use this if
+ * you need an exact- or binary-compatible match; see compatible_oper.
+ *
  * If no matching operator found, return NULL if noError is true,
  * raise an error if it is false.
  *
@@ -653,19 +654,51 @@ oper(char *opname, Oid ltypeId, Oid rtypeId, bool noError)
        return (Operator) NULL;
 }
 
-/* oper_oid() -- get OID of a binary operator
+/* compatible_oper()
+ *     given an opname and input datatypes, find a compatible binary operator
+ *
+ *     This is tighter than oper() because it will not return an operator that
+ *     requires coercion of the input datatypes (but binary-compatible operators
+ *     are accepted).  Otherwise, the semantics are the same.
+ */
+Operator
+compatible_oper(char *op, Oid arg1, Oid arg2, bool noError)
+{
+       Operator        optup;
+       Form_pg_operator        opform;
+
+       /* oper() will find the best available match */
+       optup = oper(op, arg1, arg2, noError);
+       if (optup == (Operator) NULL)
+               return (Operator) NULL; /* must be noError case */
+
+       /* but is it good enough? */
+       opform = (Form_pg_operator) GETSTRUCT(optup);
+       if ((opform->oprleft == arg1 ||
+                IS_BINARY_COMPATIBLE(opform->oprleft, arg1)) &&
+               (opform->oprright == arg2 ||
+                IS_BINARY_COMPATIBLE(opform->oprright, arg2)))
+               return optup;
+
+       if (!noError)
+               op_error(op, arg1, arg2);
+
+       return (Operator) NULL;
+}
+
+/* compatible_oper_opid() -- get OID of a binary operator
  *
  * This is a convenience routine that extracts only the operator OID
- * from the result of oper().  InvalidOid is returned if the lookup
- * fails and noError is true.
+ * from the result of compatible_oper().  InvalidOid is returned if the
+ * lookup fails and noError is true.
  */
 Oid
-oper_oid(char *op, Oid arg1, Oid arg2, bool noError)
+compatible_oper_opid(char *op, Oid arg1, Oid arg2, bool noError)
 {
        Operator        optup;
        Oid                     result;
 
-       optup = oper(op, arg1, arg2, noError);
+       optup = compatible_oper(op, arg1, arg2, noError);
        if (optup != NULL)
        {
                result = oprid(optup);
@@ -675,6 +708,28 @@ oper_oid(char *op, Oid arg1, Oid arg2, bool noError)
        return InvalidOid;
 }
 
+/* compatible_oper_funcid() -- get OID of a binary operator's function
+ *
+ * This is a convenience routine that extracts only the function OID
+ * from the result of compatible_oper().  InvalidOid is returned if the
+ * lookup fails and noError is true.
+ */
+Oid
+compatible_oper_funcid(char *op, Oid arg1, Oid arg2, bool noError)
+{
+       Operator        optup;
+       Oid                     result;
+
+       optup = compatible_oper(op, arg1, arg2, noError);
+       if (optup != NULL)
+       {
+               result = oprfuncid(optup);
+               ReleaseSysCache(optup);
+               return result;
+       }
+       return InvalidOid;
+}
+
 /* unary_oper_get_candidates()
  *     given opname, find all possible types for which
  *     a right/left unary operator named opname exists.
@@ -737,6 +792,10 @@ unary_oper_get_candidates(char *opname,
 
 
 /* Given unary right operator (operator on right), return oper struct
+ *
+ * IMPORTANT: the returned operator (if any) is only promised to be
+ * coercion-compatible with the input datatype.  Do not use this if
+ * you need an exact- or binary-compatible match.
  *
  * Always raises error on failure.
  *
@@ -764,16 +823,11 @@ right_oper(char *op, Oid arg)
                ncandidates = unary_oper_get_candidates(op, &candidates, 'r');
                if (ncandidates == 0)
                        unary_op_error(op, arg, FALSE);
-               else if (ncandidates == 1)
-               {
-                       tup = SearchSysCache(OPERNAME,
-                                                                PointerGetDatum(op),
-                                                                ObjectIdGetDatum(candidates->args[0]),
-                                                                ObjectIdGetDatum(InvalidOid),
-                                                                CharGetDatum('r'));
-               }
                else
                {
+                       /* We must run oper_select_candidate even if only one candidate,
+                        * otherwise we may falsely return a non-type-compatible operator.
+                        */
                        targetOid = oper_select_candidate(1, &arg, candidates);
                        if (targetOid != NULL)
                                tup = SearchSysCache(OPERNAME,
@@ -792,6 +846,10 @@ right_oper(char *op, Oid arg)
 
 
 /* Given unary left operator (operator on left), return oper struct
+ *
+ * IMPORTANT: the returned operator (if any) is only promised to be
+ * coercion-compatible with the input datatype.  Do not use this if
+ * you need an exact- or binary-compatible match.
  *
  * Always raises error on failure.
  *
@@ -819,16 +877,11 @@ left_oper(char *op, Oid arg)
                ncandidates = unary_oper_get_candidates(op, &candidates, 'l');
                if (ncandidates == 0)
                        unary_op_error(op, arg, TRUE);
-               else if (ncandidates == 1)
-               {
-                       tup = SearchSysCache(OPERNAME,
-                                                                PointerGetDatum(op),
-                                                                ObjectIdGetDatum(InvalidOid),
-                                                                ObjectIdGetDatum(candidates->args[0]),
-                                                                CharGetDatum('l'));
-               }
                else
                {
+                       /* We must run oper_select_candidate even if only one candidate,
+                        * otherwise we may falsely return a non-type-compatible operator.
+                        */
                        targetOid = oper_select_candidate(1, &arg, candidates);
                        if (targetOid != NULL)
                                tup = SearchSysCache(OPERNAME,
index be2e5de25deccc4f2ba5abe2fd75432465c19b7b..5c33978ba62ee5c99dbda12bd060208e93589deb 100644 (file)
@@ -1,6 +1,6 @@
 /*-------------------------------------------------------------------------
  *
- * catalog_utils.h
+ * parse_oper.h
  *
  *
  *
 
 typedef HeapTuple Operator;
 
+/* Routines to find operators matching a name and given input types */
+/* NB: the selected operator may require coercion of the input types! */
 extern Operator oper(char *op, Oid arg1, Oid arg2, bool noError);
 extern Operator right_oper(char *op, Oid arg);
 extern Operator left_oper(char *op, Oid arg);
 
-extern Oid     oper_oid(char *op, Oid arg1, Oid arg2, bool noError);
-extern Oid     oprid(Operator op);
+/* Routines to find operators that DO NOT require coercion --- ie, their */
+/* input types are either exactly as given, or binary-compatible */
+extern Operator compatible_oper(char *op, Oid arg1, Oid arg2, bool noError);
+/* currently no need for compatible_left_oper/compatible_right_oper */
+
+/* Convenience routines that call compatible_oper() and return either */
+/* the operator OID or the underlying function OID, or InvalidOid if fail */
+extern Oid compatible_oper_opid(char *op, Oid arg1, Oid arg2, bool noError);
+extern Oid compatible_oper_funcid(char *op, Oid arg1, Oid arg2, bool noError);
 
-extern Oid     any_ordering_op(Oid restype);
+/* Convenience routine that packages a specific call on compatible_oper */
+extern Oid     any_ordering_op(Oid argtype);
+
+/* Extract operator OID or underlying-function OID from an Operator tuple */
+extern Oid     oprid(Operator op);
+extern Oid     oprfuncid(Operator op);
 
 #endif  /* PARSE_OPER_H */