Code review for LIKE ... INCLUDING INDEXES patch. Fix failure to propagate
authorTom Lane <tgl@sss.pgh.pa.us>
Sat, 1 Dec 2007 23:44:44 +0000 (23:44 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Sat, 1 Dec 2007 23:44:44 +0000 (23:44 +0000)
constraint status of copied indexes (bug #3774), as well as various other
small bugs such as failure to pstrdup when needed.  Allow INCLUDING INDEXES
indexes to be merged with identical declared indexes (perhaps not real useful,
but the code is there and having it not apply to LIKE indexes seems pretty
unorthogonal).  Avoid useless work in generateClonedIndexStmt().  Undo some
poorly chosen API changes, and put a couple of routines in modules that seem
to be better places for them.

17 files changed:
src/backend/access/common/reloptions.c
src/backend/bootstrap/bootparse.y
src/backend/catalog/pg_depend.c
src/backend/commands/indexcmds.c
src/backend/commands/tablecmds.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/equalfuncs.c
src/backend/nodes/outfuncs.c
src/backend/parser/parse_utilcmd.c
src/backend/tcop/utility.c
src/backend/utils/adt/ruleutils.c
src/include/access/reloptions.h
src/include/catalog/dependency.h
src/include/commands/defrem.h
src/include/nodes/parsenodes.h
src/include/utils/builtins.h
src/test/regress/expected/inherit.out

index ec54f5fe7a8411fb5ea7dc466c1ce3594b00ab2c..cecdc8086ffcb5492d6f6d8532d5d56ae879012f 100644 (file)
@@ -18,6 +18,7 @@
 #include "access/reloptions.h"
 #include "catalog/pg_type.h"
 #include "commands/defrem.h"
+#include "nodes/makefuncs.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/rel.h"
@@ -149,6 +150,50 @@ transformRelOptions(Datum oldOptions, List *defList,
 }
 
 
+/*
+ * Convert the text-array format of reloptions into a List of DefElem.
+ * This is the inverse of transformRelOptions().
+ */
+List *
+untransformRelOptions(Datum options)
+{
+       List       *result = NIL;
+       ArrayType  *array;
+       Datum      *optiondatums;
+       int                     noptions;
+       int                     i;
+
+       /* Nothing to do if no options */
+       if (options == (Datum) 0)
+               return result;
+
+       array = DatumGetArrayTypeP(options);
+
+       Assert(ARR_ELEMTYPE(array) == TEXTOID);
+
+       deconstruct_array(array, TEXTOID, -1, false, 'i',
+                                         &optiondatums, NULL, &noptions);
+
+       for (i = 0; i < noptions; i++)
+       {
+               char       *s;
+               char       *p;
+               Node       *val = NULL;
+
+               s = DatumGetCString(DirectFunctionCall1(textout, optiondatums[i]));
+               p = strchr(s, '=');
+               if (p)
+               {
+                       *p++ = '\0';
+                       val = (Node *) makeString(pstrdup(p));
+               }
+               result = lappend(result, makeDefElem(pstrdup(s), val));
+       }
+
+       return result;
+}
+
+
 /*
  * Interpret reloptions that are given in text-array format.
  *
index 8bcd2f25e66c9bea268bd8b0f14cd70cef701a3f..296ccdadab2d333f4b6f9d6c516ed51fab168b27 100644 (file)
@@ -252,7 +252,7 @@ Boot_DeclareIndexStmt:
                                                                LexIDStr($8),
                                                                NULL,
                                                                $10,
-                                                               NULL, NIL, NULL,
+                                                               NULL, NIL,
                                                                false, false, false,
                                                                false, false, true, false, false);
                                        do_end();
@@ -270,7 +270,7 @@ Boot_DeclareUniqueIndexStmt:
                                                                LexIDStr($9),
                                                                NULL,
                                                                $11,
-                                                               NULL, NIL, NULL,
+                                                               NULL, NIL,
                                                                true, false, false,
                                                                false, false, true, false, false);
                                        do_end();
index a40ee4efe55ab44c22d09efab92de0737d61d106..a46a449da2b8264e7493b3659ce17be618c27a9a 100644 (file)
 #include "access/heapam.h"
 #include "catalog/dependency.h"
 #include "catalog/indexing.h"
+#include "catalog/pg_constraint.h"
 #include "catalog/pg_depend.h"
 #include "miscadmin.h"
 #include "utils/fmgroids.h"
+#include "utils/lsyscache.h"
 
 
 static bool isObjectPinned(const ObjectAddress *object, Relation rel);
@@ -260,6 +262,62 @@ changeDependencyFor(Oid classId, Oid objectId,
        return count;
 }
 
+/*
+ * isObjectPinned()
+ *
+ * Test if an object is required for basic database functionality.
+ * Caller must already have opened pg_depend.
+ *
+ * The passed subId, if any, is ignored; we assume that only whole objects
+ * are pinned (and that this implies pinning their components).
+ */
+static bool
+isObjectPinned(const ObjectAddress *object, Relation rel)
+{
+       bool            ret = false;
+       SysScanDesc scan;
+       HeapTuple       tup;
+       ScanKeyData key[2];
+
+       ScanKeyInit(&key[0],
+                               Anum_pg_depend_refclassid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(object->classId));
+
+       ScanKeyInit(&key[1],
+                               Anum_pg_depend_refobjid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(object->objectId));
+
+       scan = systable_beginscan(rel, DependReferenceIndexId, true,
+                                                         SnapshotNow, 2, key);
+
+       /*
+        * Since we won't generate additional pg_depend entries for pinned
+        * objects, there can be at most one entry referencing a pinned object.
+        * Hence, it's sufficient to look at the first returned tuple; we don't
+        * need to loop.
+        */
+       tup = systable_getnext(scan);
+       if (HeapTupleIsValid(tup))
+       {
+               Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
+
+               if (foundDep->deptype == DEPENDENCY_PIN)
+                       ret = true;
+       }
+
+       systable_endscan(scan);
+
+       return ret;
+}
+
+
+/*
+ * Various special-purpose lookups and manipulations of pg_depend.
+ */
+
+
 /*
  * Detect whether a sequence is marked as "owned" by a column
  *
@@ -359,52 +417,120 @@ markSequenceUnowned(Oid seqId)
        heap_close(depRel, RowExclusiveLock);
 }
 
+
 /*
- * isObjectPinned()
- *
- * Test if an object is required for basic database functionality.
- * Caller must already have opened pg_depend.
+ * get_constraint_index
+ *             Given the OID of a unique or primary-key constraint, return the
+ *             OID of the underlying unique index.
  *
- * The passed subId, if any, is ignored; we assume that only whole objects
- * are pinned (and that this implies pinning their components).
+ * Return InvalidOid if the index couldn't be found; this suggests the
+ * given OID is bogus, but we leave it to caller to decide what to do.
  */
-static bool
-isObjectPinned(const ObjectAddress *object, Relation rel)
+Oid
+get_constraint_index(Oid constraintId)
 {
-       bool            ret = false;
+       Oid                     indexId = InvalidOid;
+       Relation        depRel;
+       ScanKeyData key[3];
        SysScanDesc scan;
        HeapTuple       tup;
-       ScanKeyData key[2];
+
+       /* Search the dependency table for the dependent index */
+       depRel = heap_open(DependRelationId, AccessShareLock);
 
        ScanKeyInit(&key[0],
                                Anum_pg_depend_refclassid,
                                BTEqualStrategyNumber, F_OIDEQ,
-                               ObjectIdGetDatum(object->classId));
-
+                               ObjectIdGetDatum(ConstraintRelationId));
        ScanKeyInit(&key[1],
                                Anum_pg_depend_refobjid,
                                BTEqualStrategyNumber, F_OIDEQ,
-                               ObjectIdGetDatum(object->objectId));
+                               ObjectIdGetDatum(constraintId));
+       ScanKeyInit(&key[2],
+                               Anum_pg_depend_refobjsubid,
+                               BTEqualStrategyNumber, F_INT4EQ,
+                               Int32GetDatum(0));
 
-       scan = systable_beginscan(rel, DependReferenceIndexId, true,
-                                                         SnapshotNow, 2, key);
+       scan = systable_beginscan(depRel, DependReferenceIndexId, true,
+                                                         SnapshotNow, 3, key);
 
-       /*
-        * Since we won't generate additional pg_depend entries for pinned
-        * objects, there can be at most one entry referencing a pinned object.
-        * Hence, it's sufficient to look at the first returned tuple; we don't
-        * need to loop.
-        */
-       tup = systable_getnext(scan);
-       if (HeapTupleIsValid(tup))
+       while (HeapTupleIsValid(tup = systable_getnext(scan)))
        {
-               Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
+               Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup);
 
-               if (foundDep->deptype == DEPENDENCY_PIN)
-                       ret = true;
+               /*
+                * We assume any internal dependency of an index on the constraint
+                * must be what we are looking for.  (The relkind test is just
+                * paranoia; there shouldn't be any such dependencies otherwise.)
+                */
+               if (deprec->classid == RelationRelationId &&
+                       deprec->objsubid == 0 &&
+                       deprec->deptype == DEPENDENCY_INTERNAL &&
+                       get_rel_relkind(deprec->objid) == RELKIND_INDEX)
+               {
+                       indexId = deprec->objid;
+                       break;
+               }
        }
 
        systable_endscan(scan);
+       heap_close(depRel, AccessShareLock);
 
-       return ret;
+       return indexId;
+}
+
+/*
+ * get_index_constraint
+ *             Given the OID of an index, return the OID of the owning unique or
+ *             primary-key constraint, or InvalidOid if no such constraint.
+ */
+Oid
+get_index_constraint(Oid indexId)
+{
+       Oid                     constraintId = InvalidOid;
+       Relation        depRel;
+       ScanKeyData key[3];
+       SysScanDesc scan;
+       HeapTuple       tup;
+
+       /* Search the dependency table for the index */
+       depRel = heap_open(DependRelationId, AccessShareLock);
+
+       ScanKeyInit(&key[0],
+                               Anum_pg_depend_classid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(RelationRelationId));
+       ScanKeyInit(&key[1],
+                               Anum_pg_depend_objid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(indexId));
+       ScanKeyInit(&key[2],
+                               Anum_pg_depend_objsubid,
+                               BTEqualStrategyNumber, F_INT4EQ,
+                               Int32GetDatum(0));
+
+       scan = systable_beginscan(depRel, DependDependerIndexId, true,
+                                                         SnapshotNow, 3, key);
+
+       while (HeapTupleIsValid(tup = systable_getnext(scan)))
+       {
+               Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup);
+
+               /*
+                * We assume any internal dependency on a constraint
+                * must be what we are looking for.
+                */
+               if (deprec->refclassid == ConstraintRelationId &&
+                       deprec->refobjsubid == 0 &&
+                       deprec->deptype == DEPENDENCY_INTERNAL)
+               {
+                       constraintId = deprec->refobjid;
+                       break;
+               }
+       }
+
+       systable_endscan(scan);
+       heap_close(depRel, AccessShareLock);
+
+       return constraintId;
 }
index 513b729b232197df7d46ebd55f6a58430cae9117..6410287bc843657776150c6a821ca918131a8798 100644 (file)
@@ -80,8 +80,6 @@ static bool relationHasPrimaryKey(Relation rel);
  *             to index on.
  * 'predicate': the partial-index condition, or NULL if none.
  * 'options': reloptions from WITH (in list-of-DefElem form).
- * 'src_options': reloptions from the source index, if this is a cloned
- *             index produced by CREATE TABLE LIKE ... INCLUDING INDEXES
  * 'unique': make the index enforce uniqueness.
  * 'primary': mark the index as a primary key in the catalogs.
  * 'isconstraint': index is for a PRIMARY KEY or UNIQUE constraint,
@@ -103,7 +101,6 @@ DefineIndex(RangeVar *heapRelation,
                        List *attributeList,
                        Expr *predicate,
                        List *options,
-                       char *src_options,
                        bool unique,
                        bool primary,
                        bool isconstraint,
@@ -396,16 +393,9 @@ DefineIndex(RangeVar *heapRelation,
        }
 
        /*
-        * Parse AM-specific options, convert to text array form, validate.  The
-        * src_options introduced due to using indexes via the "CREATE LIKE
-        * INCLUDING INDEXES" statement also need to be merged here
+        * Parse AM-specific options, convert to text array form, validate.
         */
-       if (src_options)
-               reloptions = unflatten_reloptions(src_options);
-       else
-               reloptions = (Datum) 0;
-
-       reloptions = transformRelOptions(reloptions, options, false, false);
+       reloptions = transformRelOptions((Datum) 0, options, false, false);
 
        (void) index_reloptions(amoptions, reloptions, true);
 
index e91826fd6f7de9642f3b3ecd3136a2ba519bd966..0a9e828c63f9038dfe55d77ec5fe2f0870250e31 100644 (file)
@@ -3795,7 +3795,6 @@ ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
                                stmt->indexParams,              /* parameters */
                                (Expr *) stmt->whereClause,
                                stmt->options,
-                               stmt->src_options,
                                stmt->unique,
                                stmt->primary,
                                stmt->isconstraint,
index 122bc8c7989e56fbacaa8152e267805c86212ad6..36779942ce284b9c0f1aca82e29702d265174eb8 100644 (file)
@@ -2195,7 +2195,6 @@ _copyIndexStmt(IndexStmt *from)
        COPY_STRING_FIELD(tableSpace);
        COPY_NODE_FIELD(indexParams);
        COPY_NODE_FIELD(options);
-       COPY_STRING_FIELD(src_options);
        COPY_NODE_FIELD(whereClause);
        COPY_SCALAR_FIELD(unique);
        COPY_SCALAR_FIELD(primary);
index 4494a1a5a33efd432b092a5f71d28967b9fe061e..6aef05be250d2a85d3fdd2a7c0b376cba03009ef 100644 (file)
@@ -1046,7 +1046,6 @@ _equalIndexStmt(IndexStmt *a, IndexStmt *b)
        COMPARE_STRING_FIELD(tableSpace);
        COMPARE_NODE_FIELD(indexParams);
        COMPARE_NODE_FIELD(options);
-       COMPARE_STRING_FIELD(src_options);
        COMPARE_NODE_FIELD(whereClause);
        COMPARE_SCALAR_FIELD(unique);
        COMPARE_SCALAR_FIELD(primary);
index 638ce48e66ac765d94cbc063b293f95cfb0cd252..13ede370beac00abda826a2576e927fc12ca8dd0 100644 (file)
@@ -1546,7 +1546,6 @@ _outIndexStmt(StringInfo str, IndexStmt *node)
        WRITE_STRING_FIELD(tableSpace);
        WRITE_NODE_FIELD(indexParams);
        WRITE_NODE_FIELD(options);
-       WRITE_STRING_FIELD(src_options);
        WRITE_NODE_FIELD(whereClause);
        WRITE_BOOL_FIELD(unique);
        WRITE_BOOL_FIELD(primary);
index 193004ad6c25a9891180546c4e88311baf7a3bce..6cc7525c751af24a64911a6e1508fdf27018ec5c 100644 (file)
@@ -28,6 +28,8 @@
 
 #include "access/genam.h"
 #include "access/heapam.h"
+#include "access/reloptions.h"
+#include "catalog/dependency.h"
 #include "catalog/heap.h"
 #include "catalog/index.h"
 #include "catalog/namespace.h"
@@ -675,13 +677,15 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
                }
        }
 
+       /*
+        * Likewise, copy indexes if requested
+        */
        if (including_indexes && relation->rd_rel->relhasindex)
        {
-               AttrNumber *attmap;
+               AttrNumber *attmap = varattnos_map_schema(tupleDesc, cxt->columns);
                List       *parent_indexes;
                ListCell   *l;
 
-               attmap = varattnos_map_schema(tupleDesc, cxt->columns);
                parent_indexes = RelationGetIndexList(relation);
 
                foreach(l, parent_indexes)
@@ -693,14 +697,12 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
                        parent_index = index_open(parent_index_oid, AccessShareLock);
 
                        /* Build CREATE INDEX statement to recreate the parent_index */
-                       index_stmt = generateClonedIndexStmt(cxt, parent_index,
-                                                                                                attmap);
+                       index_stmt = generateClonedIndexStmt(cxt, parent_index, attmap);
 
-                       /* Add the new IndexStmt to the create context */
+                       /* Save it in the inh_indexes list for the time being */
                        cxt->inh_indexes = lappend(cxt->inh_indexes, index_stmt);
 
-                       /* Keep our lock on the index till xact commit */
-                       index_close(parent_index, NoLock);
+                       index_close(parent_index, AccessShareLock);
                }
        }
 
@@ -713,54 +715,62 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
 }
 
 /*
- * Generate an IndexStmt entry using information from an already
- * existing index "source_idx".
- *
- * Note: Much of this functionality is cribbed from pg_get_indexdef.
+ * Generate an IndexStmt node using information from an already existing index
+ * "source_idx".  Attribute numbers should be adjusted according to attmap.
  */
 static IndexStmt *
 generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
                                                AttrNumber *attmap)
 {
-       HeapTuple       ht_idx;
+       Oid                     source_relid = RelationGetRelid(source_idx);
        HeapTuple       ht_idxrel;
-       HeapTuple       ht_am;
-       Form_pg_index idxrec;
+       HeapTuple       ht_idx;
        Form_pg_class idxrelrec;
+       Form_pg_index idxrec;
        Form_pg_am      amrec;
-       List       *indexprs = NIL;
+       oidvector  *indclass;
+       IndexStmt  *index;
+       List       *indexprs;
        ListCell   *indexpr_item;
        Oid                     indrelid;
-       Oid                     source_relid;
        int                     keyno;
        Oid                     keycoltype;
-       Datum           indclassDatum;
-       Datum           indoptionDatum;
+       Datum           datum;
        bool            isnull;
-       oidvector  *indclass;
-       int2vector *indoption;
-       IndexStmt  *index;
-       Datum           reloptions;
 
-       source_relid = RelationGetRelid(source_idx);
+       /*
+        * Fetch pg_class tuple of source index.  We can't use the copy in the
+        * relcache entry because it doesn't include optional fields.
+        */
+       ht_idxrel = SearchSysCache(RELOID,
+                                                          ObjectIdGetDatum(source_relid),
+                                                          0, 0, 0);
+       if (!HeapTupleIsValid(ht_idxrel))
+               elog(ERROR, "cache lookup failed for relation %u", source_relid);
+       idxrelrec = (Form_pg_class) GETSTRUCT(ht_idxrel);
 
-       /* Fetch pg_index tuple for source index */
-       ht_idx = SearchSysCache(INDEXRELID,
-                                                       ObjectIdGetDatum(source_relid),
-                                                       0, 0, 0);
-       if (!HeapTupleIsValid(ht_idx))
-               elog(ERROR, "cache lookup failed for index %u", source_relid);
+       /* Fetch pg_index tuple for source index from relcache entry */
+       ht_idx = source_idx->rd_indextuple;
        idxrec = (Form_pg_index) GETSTRUCT(ht_idx);
-
-       Assert(source_relid == idxrec->indexrelid);
        indrelid = idxrec->indrelid;
 
+       /* Fetch pg_am tuple for source index from relcache entry */
+       amrec = source_idx->rd_am;
+
+       /* Must get indclass the hard way, since it's not stored in relcache */
+       datum = SysCacheGetAttr(INDEXRELID, ht_idx,
+                                                       Anum_pg_index_indclass, &isnull);
+       Assert(!isnull);
+       indclass = (oidvector *) DatumGetPointer(datum);
+
+       /* Begin building the IndexStmt */
        index = makeNode(IndexStmt);
+       index->relation = cxt->relation;
+       index->accessMethod = pstrdup(NameStr(amrec->amname));
+       index->tableSpace = get_tablespace_name(source_idx->rd_node.spcNode);
        index->unique = idxrec->indisunique;
-       index->concurrent = false;
        index->primary = idxrec->indisprimary;
-       index->relation = cxt->relation;
-       index->isconstraint = false;
+       index->concurrent = false;
 
        /*
         * We don't try to preserve the name of the source index; instead, just
@@ -768,65 +778,40 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
         */
        index->idxname = NULL;
 
-       /* Must get indclass and indoption the hard way */
-       indclassDatum = SysCacheGetAttr(INDEXRELID, ht_idx,
-                                                                       Anum_pg_index_indclass, &isnull);
-       Assert(!isnull);
-       indclass = (oidvector *) DatumGetPointer(indclassDatum);
-       indoptionDatum = SysCacheGetAttr(INDEXRELID, ht_idx,
-                                                                        Anum_pg_index_indoption, &isnull);
-       Assert(!isnull);
-       indoption = (int2vector *) DatumGetPointer(indoptionDatum);
-
-       /* Fetch pg_class tuple of source index */
-       ht_idxrel = SearchSysCache(RELOID,
-                                                          ObjectIdGetDatum(source_relid),
-                                                          0, 0, 0);
-       if (!HeapTupleIsValid(ht_idxrel))
-               elog(ERROR, "cache lookup failed for relation %u", source_relid);
-
        /*
-        * Store the reloptions for later use by this new index
+        * If the index is marked PRIMARY, it's certainly from a constraint;
+        * else, if it's not marked UNIQUE, it certainly isn't; else, we have
+        * to search pg_depend to see if there's an associated unique constraint.
         */
-       reloptions = SysCacheGetAttr(RELOID, ht_idxrel,
-                                                                Anum_pg_class_reloptions, &isnull);
-       if (!isnull)
-               index->src_options = flatten_reloptions(source_relid);
-
-       idxrelrec = (Form_pg_class) GETSTRUCT(ht_idxrel);
-
-       /* Fetch pg_am tuple for the index's access method */
-       ht_am = SearchSysCache(AMOID,
-                                                  ObjectIdGetDatum(idxrelrec->relam),
-                                                  0, 0, 0);
-       if (!HeapTupleIsValid(ht_am))
-               elog(ERROR, "cache lookup failed for access method %u",
-                        idxrelrec->relam);
-       amrec = (Form_pg_am) GETSTRUCT(ht_am);
-       index->accessMethod = pstrdup(NameStr(amrec->amname));
+       if (index->primary)
+               index->isconstraint = true;
+       else if (!index->unique)
+               index->isconstraint = false;
+       else
+               index->isconstraint = OidIsValid(get_index_constraint(source_relid));
 
        /* Get the index expressions, if any */
-       if (!heap_attisnull(ht_idx, Anum_pg_index_indexprs))
+       datum = SysCacheGetAttr(INDEXRELID, ht_idx,
+                                                       Anum_pg_index_indexprs, &isnull);
+       if (!isnull)
        {
-               Datum           exprsDatum;
-               bool            isnull;
                char       *exprsString;
 
-               exprsDatum = SysCacheGetAttr(INDEXRELID, ht_idx,
-                                                                        Anum_pg_index_indexprs, &isnull);
-               exprsString = DatumGetCString(DirectFunctionCall1(textout,
-                                                                                                                 exprsDatum));
-               Assert(!isnull);
+               exprsString = DatumGetCString(DirectFunctionCall1(textout, datum));
                indexprs = (List *) stringToNode(exprsString);
        }
+       else
+               indexprs = NIL;
 
-       indexpr_item = list_head(indexprs);
+       /* Build the list of IndexElem */
+       index->indexParams = NIL;
 
+       indexpr_item = list_head(indexprs);
        for (keyno = 0; keyno < idxrec->indnatts; keyno++)
        {
                IndexElem  *iparam;
                AttrNumber      attnum = idxrec->indkey.values[keyno];
-               int16           opt = indoption->values[keyno];
+               int16           opt = source_idx->rd_indoption[keyno];
 
                iparam = makeNode(IndexElem);
 
@@ -849,11 +834,14 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
                        if (indexpr_item == NULL)
                                elog(ERROR, "too few entries in indexprs list");
                        indexkey = (Node *) lfirst(indexpr_item);
+                       indexpr_item = lnext(indexpr_item);
+
+                       /* OK to modify indexkey since we are working on a private copy */
                        change_varattnos_of_a_node(indexkey, attmap);
+
                        iparam->name = NULL;
                        iparam->expr = indexkey;
 
-                       indexpr_item = lnext(indexpr_item);
                        keycoltype = exprType(indexkey);
                }
 
@@ -866,40 +854,50 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
                /* Adjust options if necessary */
                if (amrec->amcanorder)
                {
-                       /* If it supports sort ordering, report DESC and NULLS opts */
+                       /*
+                        * If it supports sort ordering, copy DESC and NULLS opts.
+                        * Don't set non-default settings unnecessarily, though,
+                        * so as to improve the chance of recognizing equivalence
+                        * to constraint indexes.
+                        */
                        if (opt & INDOPTION_DESC)
+                       {
                                iparam->ordering = SORTBY_DESC;
-                       if (opt & INDOPTION_NULLS_FIRST)
-                               iparam->nulls_ordering = SORTBY_NULLS_FIRST;
+                               if ((opt & INDOPTION_NULLS_FIRST) == 0)
+                                       iparam->nulls_ordering = SORTBY_NULLS_LAST;
+                       }
+                       else
+                       {
+                               if (opt & INDOPTION_NULLS_FIRST)
+                                       iparam->nulls_ordering = SORTBY_NULLS_FIRST;
+                       }
                }
 
                index->indexParams = lappend(index->indexParams, iparam);
        }
 
-       /* Use the same tablespace as the source index */
-       index->tableSpace = get_tablespace_name(source_idx->rd_node.spcNode);
+       /* Copy reloptions if any */
+       datum = SysCacheGetAttr(RELOID, ht_idxrel,
+                                                       Anum_pg_class_reloptions, &isnull);
+       if (!isnull)
+               index->options = untransformRelOptions(datum);
 
        /* If it's a partial index, decompile and append the predicate */
-       if (!heap_attisnull(ht_idx, Anum_pg_index_indpred))
+       datum = SysCacheGetAttr(INDEXRELID, ht_idx,
+                                                       Anum_pg_index_indpred, &isnull);
+       if (!isnull)
        {
-               Datum           pred_datum;
-               bool            isnull;
                char       *pred_str;
 
                /* Convert text string to node tree */
-               pred_datum = SysCacheGetAttr(INDEXRELID, ht_idx,
-                                                                        Anum_pg_index_indpred, &isnull);
-               Assert(!isnull);
-               pred_str = DatumGetCString(DirectFunctionCall1(textout,
-                                                                                                          pred_datum));
+               pred_str = DatumGetCString(DirectFunctionCall1(textout, datum));
                index->whereClause = (Node *) stringToNode(pred_str);
+               /* Adjust attribute numbers */
                change_varattnos_of_a_node(index->whereClause, attmap);
        }
 
        /* Clean up */
-       ReleaseSysCache(ht_idx);
        ReleaseSysCache(ht_idxrel);
-       ReleaseSysCache(ht_am);
 
        return index;
 }
@@ -924,11 +922,11 @@ get_opclass(Oid opclass, Oid actual_datatype)
                elog(ERROR, "cache lookup failed for opclass %u", opclass);
        opc_rec = (Form_pg_opclass) GETSTRUCT(ht_opc);
 
-       if (!OidIsValid(actual_datatype) ||
-               GetDefaultOpClass(actual_datatype, opc_rec->opcmethod) != opclass)
+       if (GetDefaultOpClass(actual_datatype, opc_rec->opcmethod) != opclass)
        {
+               /* For simplicity, we always schema-qualify the name */
                char       *nsp_name = get_namespace_name(opc_rec->opcnamespace);
-               char       *opc_name = NameStr(opc_rec->opcname);
+               char       *opc_name = pstrdup(NameStr(opc_rec->opcname));
 
                result = list_make2(makeString(nsp_name), makeString(opc_name));
        }
@@ -940,8 +938,8 @@ get_opclass(Oid opclass, Oid actual_datatype)
 
 /*
  * transformIndexConstraints
- *             Handle UNIQUE and PRIMARY KEY constraints, which create
- *             indexes. We also merge index definitions arising from
+ *             Handle UNIQUE and PRIMARY KEY constraints, which create indexes.
+ *             We also merge in any index definitions arising from
  *             LIKE ... INCLUDING INDEXES.
  */
 static void
@@ -960,7 +958,30 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt)
        {
                Constraint *constraint = (Constraint *) lfirst(lc);
 
+               Assert(IsA(constraint, Constraint));
+               Assert(constraint->contype == CONSTR_PRIMARY ||
+                          constraint->contype == CONSTR_UNIQUE);
+
                index = transformIndexConstraint(constraint, cxt);
+
+               indexlist = lappend(indexlist, index);
+       }
+
+       /* Add in any indexes defined by LIKE ... INCLUDING INDEXES */
+       foreach(lc, cxt->inh_indexes)
+       {
+               index = (IndexStmt *) lfirst(lc);
+
+               if (index->primary)
+               {
+                       if (cxt->pkey != NULL)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+                                                errmsg("multiple primary keys for table \"%s\" are not allowed",
+                                                               cxt->relation->relname)));
+                       cxt->pkey = index;
+               }
+
                indexlist = lappend(indexlist, index);
        }
 
@@ -995,8 +1016,11 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt)
                {
                        IndexStmt  *priorindex = lfirst(k);
 
-                       if (equal(index->indexParams, priorindex->indexParams))
+                       if (equal(index->indexParams, priorindex->indexParams) &&
+                               equal(index->whereClause, priorindex->whereClause) &&
+                               strcmp(index->accessMethod, priorindex->accessMethod) == 0)
                        {
+                               priorindex->unique |= index->unique;
                                /*
                                 * If the prior index is as yet unnamed, and this one is
                                 * named, then transfer the name to the prior index. This
@@ -1013,27 +1037,13 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt)
                if (keep)
                        cxt->alist = lappend(cxt->alist, index);
        }
-
-       /* Copy indexes defined by LIKE ... INCLUDING INDEXES */
-       foreach(lc, cxt->inh_indexes)
-       {
-               index = (IndexStmt *) lfirst(lc);
-
-               if (index->primary)
-               {
-                       if (cxt->pkey)
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
-                                                errmsg("multiple primary keys for table \"%s\" are not allowed",
-                                                               cxt->relation->relname)));
-
-                       cxt->pkey = index;
-               }
-
-               cxt->alist = lappend(cxt->alist, index);
-       }
 }
 
+/*
+ * transformIndexConstraint
+ *             Transform one UNIQUE or PRIMARY KEY constraint for
+ *             transformIndexConstraints.
+ */
 static IndexStmt *
 transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
 {
@@ -1041,13 +1051,10 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
        ListCell   *keys;
        IndexElem  *iparam;
 
-       Assert(constraint->contype == CONSTR_PRIMARY ||
-                  constraint->contype == CONSTR_UNIQUE);
-
        index = makeNode(IndexStmt);
+
        index->unique = true;
        index->primary = (constraint->contype == CONSTR_PRIMARY);
-
        if (index->primary)
        {
                if (cxt->pkey != NULL)
@@ -1771,7 +1778,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
 
        /*
         * transformIndexConstraints wants cxt.alist to contain only index
-        * statements, so transfer anything we already have into save_alist.
+        * statements, so transfer anything we already have into save_alist
         * immediately.
         */
        save_alist = cxt.alist;
index a7601bdc71f4f3f6301b34075de40e76b63b8502..efaa20acea1e08526c2607136f2990f6b601911c 100644 (file)
@@ -924,7 +924,6 @@ ProcessUtility(Node *parsetree,
                                                        stmt->indexParams,      /* parameters */
                                                        (Expr *) stmt->whereClause,
                                                        stmt->options,
-                                                       stmt->src_options,
                                                        stmt->unique,
                                                        stmt->primary,
                                                        stmt->isconstraint,
index c60d803021ab8b5394dd361908c897acc7299268..238585835674ba61b39dfd9bc09c93608900afab 100644 (file)
@@ -133,7 +133,6 @@ static char *pg_get_constraintdef_worker(Oid constraintId, bool fullCommand,
                                                        int prettyFlags);
 static char *pg_get_expr_worker(text *expr, Oid relid, char *relname,
                                   int prettyFlags);
-static Oid     get_constraint_index(Oid constraintId);
 static void make_ruledef(StringInfo buf, HeapTuple ruletup, TupleDesc rulettc,
                         int prettyFlags);
 static void make_viewdef(StringInfo buf, HeapTuple ruletup, TupleDesc rulettc,
@@ -195,6 +194,7 @@ static char *generate_relation_name(Oid relid);
 static char *generate_function_name(Oid funcid, int nargs, Oid *argtypes);
 static char *generate_operator_name(Oid operid, Oid arg1, Oid arg2);
 static text *string_to_text(char *str);
+static char *flatten_reloptions(Oid relid);
 
 #define only_marker(rte)  ((rte)->inh ? "" : "ONLY ")
 
@@ -1384,68 +1384,6 @@ pg_get_serial_sequence(PG_FUNCTION_ARGS)
 }
 
 
-/*
- * get_constraint_index
- *             Given the OID of a unique or primary-key constraint, return the
- *             OID of the underlying unique index.
- *
- * Return InvalidOid if the index couldn't be found; this suggests the
- * given OID is bogus, but we leave it to caller to decide what to do.
- */
-static Oid
-get_constraint_index(Oid constraintId)
-{
-       Oid                     indexId = InvalidOid;
-       Relation        depRel;
-       ScanKeyData key[3];
-       SysScanDesc scan;
-       HeapTuple       tup;
-
-       /* Search the dependency table for the dependent index */
-       depRel = heap_open(DependRelationId, AccessShareLock);
-
-       ScanKeyInit(&key[0],
-                               Anum_pg_depend_refclassid,
-                               BTEqualStrategyNumber, F_OIDEQ,
-                               ObjectIdGetDatum(ConstraintRelationId));
-       ScanKeyInit(&key[1],
-                               Anum_pg_depend_refobjid,
-                               BTEqualStrategyNumber, F_OIDEQ,
-                               ObjectIdGetDatum(constraintId));
-       ScanKeyInit(&key[2],
-                               Anum_pg_depend_refobjsubid,
-                               BTEqualStrategyNumber, F_INT4EQ,
-                               Int32GetDatum(0));
-
-       scan = systable_beginscan(depRel, DependReferenceIndexId, true,
-                                                         SnapshotNow, 3, key);
-
-       while (HeapTupleIsValid(tup = systable_getnext(scan)))
-       {
-               Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup);
-
-               /*
-                * We assume any internal dependency of an index on the constraint
-                * must be what we are looking for.  (The relkind test is just
-                * paranoia; there shouldn't be any such dependencies otherwise.)
-                */
-               if (deprec->classid == RelationRelationId &&
-                       deprec->objsubid == 0 &&
-                       deprec->deptype == DEPENDENCY_INTERNAL &&
-                       get_rel_relkind(deprec->objid) == RELKIND_INDEX)
-               {
-                       indexId = deprec->objid;
-                       break;
-               }
-       }
-
-       systable_endscan(scan);
-       heap_close(depRel, AccessShareLock);
-
-       return indexId;
-}
-
-
 /*
  * deparse_expression                  - General utility for deparsing expressions
  *
@@ -5507,7 +5445,7 @@ string_to_text(char *str)
 /*
  * Generate a C string representing a relation's reloptions, or NULL if none.
  */
-char *
+static char *
 flatten_reloptions(Oid relid)
 {
        char       *result = NULL;
@@ -5543,32 +5481,3 @@ flatten_reloptions(Oid relid)
 
        return result;
 }
-
-/*
- * Generate an Array Datum representing a relation's reloptions using
- * a C string
- */
-Datum
-unflatten_reloptions(char *reloptstring)
-{
-       Datum           result = (Datum) 0;
-
-       if (reloptstring)
-       {
-               Datum           sep,
-                                       relopts;
-
-               /*
-                * We want to use text_to_array(reloptstring, ', ') --- but
-                * DirectFunctionCall2(text_to_array) does not work, because
-                * text_to_array() relies on fcinfo to be valid.  So use
-                * OidFunctionCall2.
-                */
-               sep = DirectFunctionCall1(textin, CStringGetDatum(", "));
-               relopts = DirectFunctionCall1(textin, CStringGetDatum(reloptstring));
-
-               result = OidFunctionCall2(F_TEXT_TO_ARRAY, relopts, sep);
-       }
-
-       return result;
-}
index b0a7d3b4aa73d152ce289e5e50f029c26eeb5ece..b6115f8389d2757ad328e4448f4a94aada42b7dc 100644 (file)
@@ -23,6 +23,8 @@
 extern Datum transformRelOptions(Datum oldOptions, List *defList,
                                        bool ignoreOids, bool isReset);
 
+extern List *untransformRelOptions(Datum options);
+
 extern void parseRelOptions(Datum options, int numkeywords,
                                const char *const * keywords,
                                char **values, bool validate);
index 6d9575ac08e4fda6b14e8724816c5daaeed091f4..e468ff5d987077e4ebbe3c61ebe52b3fd2155c52 100644 (file)
@@ -207,6 +207,10 @@ extern bool sequenceIsOwned(Oid seqId, Oid *tableId, int32 *colId);
 
 extern void markSequenceUnowned(Oid seqId);
 
+extern Oid     get_constraint_index(Oid constraintId);
+
+extern Oid     get_index_constraint(Oid indexId);
+
 /* in pg_shdepend.c */
 
 extern void recordSharedDependencyOn(ObjectAddress *depender,
index d648e692aaccd4542ec8c28a3b3a85753a75c890..aa701001c58e8575e6e9784b6c77508af2dc4fe9 100644 (file)
@@ -26,7 +26,6 @@ extern void DefineIndex(RangeVar *heapRelation,
                        List *attributeList,
                        Expr *predicate,
                        List *options,
-                       char *src_options,
                        bool unique,
                        bool primary,
                        bool isconstraint,
index 1a973856af0fcceb39363ce353db5b9a8794d3c4..5a3c7e6ae3f2fb9fcdf7507c78b7bc231c10e25c 100644 (file)
@@ -1540,7 +1540,6 @@ typedef struct IndexStmt
        char       *tableSpace;         /* tablespace, or NULL to use parent's */
        List       *indexParams;        /* a list of IndexElem */
        List       *options;            /* options from WITH clause */
-       char       *src_options;        /* relopts inherited from source index */
        Node       *whereClause;        /* qualification (partial-index predicate) */
        bool            unique;                 /* is index unique? */
        bool            primary;                /* is index on primary key? */
index 84e82d308de7d7fc638f9e9b8b14e278773205c9..f381ff181f3cf6ed3658fb37d0b413067049e5b1 100644 (file)
@@ -566,8 +566,6 @@ extern List *deparse_context_for_plan(Node *outer_plan, Node *inner_plan,
 extern const char *quote_identifier(const char *ident);
 extern char *quote_qualified_identifier(const char *namespace,
                                                   const char *ident);
-extern char *flatten_reloptions(Oid relid);
-extern Datum unflatten_reloptions(char *reloptstring);
 
 /* tid.c */
 extern Datum tidin(PG_FUNCTION_ARGS);
index 40dfaeda902a0c75e0f51b8e66fc86c2190f5319..f81776fe80463dac11247b8cab439d3415f96030 100644 (file)
@@ -634,6 +634,7 @@ SELECT * FROM inhg; /* Two records with three columns in order x=x, xx=text, y=y
 
 DROP TABLE inhg;
 CREATE TABLE inhg (x text, LIKE inhx INCLUDING INDEXES, y text); /* copies indexes */
+NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "inhg_pkey" for table "inhg"
 INSERT INTO inhg VALUES (5, 10);
 INSERT INTO inhg VALUES (20, 10); -- should fail
 ERROR:  duplicate key value violates unique constraint "inhg_pkey"
@@ -647,6 +648,7 @@ CREATE UNIQUE INDEX inhz_xx_idx on inhz (xx) WHERE xx <> 'test';
 /* Ok to create multiple unique indexes */
 CREATE TABLE inhg (x text UNIQUE, LIKE inhz INCLUDING INDEXES);
 NOTICE:  CREATE TABLE / UNIQUE will create implicit index "inhg_x_key" for table "inhg"
+NOTICE:  CREATE TABLE / UNIQUE will create implicit index "inhg_yy_key" for table "inhg"
 INSERT INTO inhg (xx, yy, x) VALUES ('test', 5, 10);
 INSERT INTO inhg (xx, yy, x) VALUES ('test', 10, 15);
 INSERT INTO inhg (xx, yy, x) VALUES ('foo', 10, 15); -- should fail